Skip to content

Commit 8e28aed

Browse files
committed
add pod informer
1 parent 47a23cb commit 8e28aed

File tree

2 files changed

+71
-0
lines changed

2 files changed

+71
-0
lines changed

src/main/java/tech/stackable/hadoop/StackableTopologyProvider.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@
55
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
66
import io.fabric8.kubernetes.client.KubernetesClientException;
77
import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext;
8+
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
9+
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
810
import java.net.InetAddress;
911
import java.net.UnknownHostException;
1012
import java.util.*;
13+
import java.util.concurrent.ExecutionException;
14+
import java.util.concurrent.Future;
1115
import java.util.stream.Collectors;
1216
import org.apache.hadoop.net.DNSToSwitchMapping;
1317
import org.slf4j.Logger;
@@ -29,11 +33,13 @@ public class StackableTopologyProvider implements DNSToSwitchMapping {
2933
// Default values
3034
public static final String DEFAULT_RACK = "/defaultRack";
3135
private static final int CACHE_EXPIRY_DEFAULT_SECONDS = 5 * 60;
36+
private static final int INFORMER_POLL_SECONDS = 30;
3237
// Cache on first usage (not on start-up to avoid attempts before listeners are available)
3338
private String listenerVersion;
3439

3540
private final KubernetesClient client;
3641
private final List<TopologyLabel> labels;
42+
private final SharedInformerFactory sharedInformerFactory;
3743

3844
// Caching layers
3945
private final TopologyCache cache;
@@ -43,6 +49,8 @@ public StackableTopologyProvider() {
4349
this.client = new KubernetesClientBuilder().build();
4450
this.cache = new TopologyCache(getCacheExpiration(), CACHE_EXPIRY_DEFAULT_SECONDS);
4551
this.labels = TopologyLabel.initializeTopologyLabels();
52+
this.sharedInformerFactory = client.informers();
53+
startPodInformer();
4654

4755
logInitializationStatus();
4856
}
@@ -149,6 +157,10 @@ private String tryNodeOrListenerOrPod(
149157
if (dataNodeIp != null) {
150158
return dataNodeIp;
151159
} else {
160+
// If a simple dataNode lookup does not work, we have to decide whether we
161+
// want to have the overhead of fetching the listeners, or of fetching all
162+
// pods in the namespace. Opt for listeners first, as there are typically
163+
// fewer of them.
152164
String resolvedListener = tryResolveListener(name);
153165
if (resolvedListener != null) {
154166
return resolvedListener;
@@ -553,4 +565,59 @@ private Map<String, Map<String, String>> buildPodLabelMap(List<Pod> dataNodes) {
553565
}
554566
return result;
555567
}
568+
569+
// ============================================================================
570+
// INFORMERS
571+
// ============================================================================
572+
573+
private void startPodInformer() {
574+
client
575+
.pods()
576+
.inNamespace(client.getNamespace())
577+
.inform(
578+
new ResourceEventHandler<>() {
579+
@Override
580+
public void onAdd(Pod pod) {
581+
cache.putPod(pod.getMetadata().getName(), pod);
582+
for (PodIP ip : pod.getStatus().getPodIPs()) {
583+
cache.putPod(ip.getIp(), pod);
584+
}
585+
LOG.info("Pod {} added", pod.getMetadata().getName());
586+
}
587+
588+
@Override
589+
public void onUpdate(Pod oldPod, Pod newPod) {
590+
cache.putPod(oldPod.getMetadata().getName(), newPod);
591+
for (PodIP ip : oldPod.getStatus().getPodIPs()) {
592+
cache.putPod(ip.getIp(), newPod);
593+
}
594+
LOG.info("Pod {} updated", oldPod.getMetadata().getName());
595+
}
596+
597+
@Override
598+
public void onDelete(Pod pod, boolean deletedFinalStateUnknown) {
599+
cache.deletePod(pod.getMetadata().getName());
600+
for (PodIP ip : pod.getStatus().getPodIPs()) {
601+
cache.deletePod(ip.getIp());
602+
}
603+
LOG.info("Pod {} deleted", pod.getMetadata().getName());
604+
}
605+
},
606+
INFORMER_POLL_SECONDS * 1000L);
607+
608+
Future<Void> future = sharedInformerFactory.startAllRegisteredInformers();
609+
610+
try {
611+
// this will block until complete
612+
LOG.debug("Waiting for informer registration to complete...");
613+
future.get();
614+
} catch (InterruptedException e) {
615+
LOG.error("Pod Informer initialization was interrupted", e);
616+
throw new RuntimeException(e);
617+
} catch (ExecutionException e) {
618+
LOG.error("Pod Informer initialization encountered an exception", e);
619+
throw new RuntimeException(e);
620+
}
621+
LOG.info("Pod Informer initialized.");
622+
}
556623
}

src/main/java/tech/stackable/hadoop/TopologyCache.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,10 @@ void putPod(String name, Pod pod) {
6969
pods.put(name, pod);
7070
}
7171

72+
void deletePod(String name) {
73+
pods.invalidate(name);
74+
}
75+
7276
boolean hasAllPods(List<String> names) {
7377
return names.stream().noneMatch(name -> pods.getIfPresent(name) == null);
7478
}

0 commit comments

Comments
 (0)