Skip to content

Commit 7b6fdc7

Browse files
wu-sheng and claude committed
Shared KubernetesClient singleton to fix thread leak and churn
- Add SharedKubernetesClient enum singleton with KubernetesHttpClientFactory that uses virtual threads on JDK 25+ or a single fixed thread on JDK <25.
- Replace all KubernetesClientBuilder().build() calls (9 sites across 7 files) with the shared instance to eliminate per-call thread churn.
- Fix KubernetesCoordinator client leak (never closed, selector thread persisted).
- Consolidate kubernetes-client dependencies in query-graphql-plugin and configuration-k8s-configmap to use library-kubernetes-support.
- Fix benchmark health check to use curlimages/curl pod (OAP JRE 25 image does not include curl).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 506f60d commit 7b6fdc7

File tree

13 files changed

+164
-67
lines changed

13 files changed

+164
-67
lines changed

benchmarks/envs-setup/istio-cluster_oap-banyandb/setup.sh

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,9 +356,12 @@ OAP_PODS_CHECK=($(kubectl -n "$NAMESPACE" get pods -l app=skywalking,component=o
356356
EXPECTED_NODES=${#OAP_PODS_CHECK[@]}
357357
CLUSTER_HEALTHY=true
358358

359+
CURL_IMAGE="curlimages/curl:latest"
359360
for pod in "${OAP_PODS_CHECK[@]}"; do
360361
log " Checking $pod..."
361-
METRICS=$(kubectl -n "$NAMESPACE" exec "$pod" -c oap -- curl -s http://localhost:1234/metrics 2>/dev/null)
362+
POD_IP=$(kubectl -n "$NAMESPACE" get pod "$pod" -o jsonpath='{.status.podIP}')
363+
METRICS=$(kubectl -n "$NAMESPACE" run "health-check-${pod##*-}" --rm -i --restart=Never \
364+
--image="$CURL_IMAGE" -- curl -s "http://${POD_IP}:1234/metrics" 2>/dev/null) || METRICS=""
362365
REMOTE_OUT=$(echo "$METRICS" | grep '^remote_out_count{' || true)
363366

364367
if [ -z "$REMOTE_OUT" ]; then

oap-server/server-cluster-plugin/cluster-kubernetes-plugin/src/main/java/org/apache/skywalking/oap/server/cluster/plugin/kubernetes/KubernetesCoordinator.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
package org.apache.skywalking.oap.server.cluster.plugin.kubernetes;
2020

2121
import com.linecorp.armeria.client.Endpoint;
22-
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
2322
import lombok.extern.slf4j.Slf4j;
23+
import org.apache.skywalking.library.kubernetes.SharedKubernetesClient;
2424
import org.apache.skywalking.oap.server.core.CoreModule;
2525
import org.apache.skywalking.oap.server.core.cluster.ClusterCoordinator;
2626
import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
@@ -72,8 +72,8 @@ private EndpointGroup createEndpointGroup() {
7272
if (port == -1) {
7373
port = manager.find(CoreModule.NAME).provider().getService(ConfigService.class).getGRPCPort();
7474
}
75-
final var kubernetesClient = new KubernetesClientBuilder().build();
76-
final var builder = KubernetesLabelSelectorEndpointGroup.builder(kubernetesClient);
75+
final var builder = KubernetesLabelSelectorEndpointGroup.builder(
76+
SharedKubernetesClient.INSTANCE.get());
7777

7878
if (StringUtil.isNotBlank(config.getNamespace())) {
7979
builder.namespace(config.getNamespace());

oap-server/server-configuration/configuration-k8s-configmap/pom.xml

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,9 @@
3737
<version>${project.version}</version>
3838
</dependency>
3939
<dependency>
40-
<groupId>io.fabric8</groupId>
41-
<artifactId>kubernetes-client</artifactId>
42-
</dependency>
43-
<dependency>
44-
<groupId>io.fabric8</groupId>
45-
<artifactId>kubernetes-httpclient-jdk</artifactId>
40+
<groupId>org.apache.skywalking</groupId>
41+
<artifactId>library-kubernetes-support</artifactId>
42+
<version>${project.version}</version>
4643
</dependency>
4744
<dependency>
4845
<groupId>org.apache.skywalking</groupId>

oap-server/server-configuration/configuration-k8s-configmap/src/main/java/org/apache/skywalking/oap/server/configuration/configmap/ConfigurationConfigmapInformer.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
package org.apache.skywalking.oap.server.configuration.configmap;
2020

2121
import io.fabric8.kubernetes.api.model.ConfigMap;
22-
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
2322
import io.fabric8.kubernetes.client.informers.cache.Lister;
23+
import org.apache.skywalking.library.kubernetes.SharedKubernetesClient;
2424
import lombok.extern.slf4j.Slf4j;
2525

2626
import java.util.HashMap;
@@ -32,19 +32,13 @@ public class ConfigurationConfigmapInformer {
3232
private final Lister<ConfigMap> configMapLister;
3333

3434
public ConfigurationConfigmapInformer(ConfigmapConfigurationSettings settings) {
35-
final var client = new KubernetesClientBuilder().build();
36-
final var informer = client
35+
final var informer = SharedKubernetesClient.INSTANCE.get()
3736
.configMaps()
3837
.inNamespace(settings.getNamespace())
3938
.withLabelSelector(settings.getLabelSelector())
4039
.inform();
4140

4241
configMapLister = new Lister<>(informer.getIndexer());
43-
44-
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
45-
informer.stop();
46-
client.close();
47-
}));
4842
}
4943

5044
public Map<String, String> configMapData() {

oap-server/server-library/library-kubernetes-support/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@
2828
<artifactId>library-kubernetes-support</artifactId>
2929

3030
<dependencies>
31+
<dependency>
32+
<groupId>org.apache.skywalking</groupId>
33+
<artifactId>library-util</artifactId>
34+
<version>${project.version}</version>
35+
</dependency>
3136
<dependency>
3237
<groupId>io.fabric8</groupId>
3338
<artifactId>kubernetes-client</artifactId>

oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesEndpoints.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.common.cache.CacheLoader;
2424
import com.google.common.cache.LoadingCache;
2525
import io.fabric8.kubernetes.api.model.Endpoints;
26-
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
2726
import lombok.SneakyThrows;
2827
import org.slf4j.LoggerFactory;
2928

@@ -43,8 +42,8 @@ public enum KubernetesEndpoints {
4342
.expireAfterWrite(Duration.ofMinutes(3));
4443

4544
endpoints = cacheBuilder.build(CacheLoader.from(() -> {
46-
try (final var kubernetesClient = new KubernetesClientBuilder().build()) {
47-
return kubernetesClient
45+
try {
46+
return SharedKubernetesClient.INSTANCE.get()
4847
.endpoints()
4948
.inAnyNamespace()
5049
.list()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.library.kubernetes;
20+
21+
import io.fabric8.kubernetes.client.jdkhttp.JdkHttpClientFactory;
22+
import org.apache.skywalking.oap.server.library.util.VirtualThreads;
23+
24+
import java.net.http.HttpClient;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
28+
/**
29+
* Custom {@link JdkHttpClientFactory} that configures the JDK {@link HttpClient}
30+
* with a minimal executor to reduce thread usage.
31+
*
32+
* <ul>
33+
* <li>JDK 25+: virtual-thread-per-task executor (0 platform threads)</li>
34+
* <li>JDK &lt; 25: single-thread fixed pool named {@code K8sClient-executor-0}</li>
35+
* </ul>
36+
*
37+
* <p>The JDK {@code HttpClient} always creates 1 internal {@code SelectorManager}
38+
* thread regardless. This factory controls only the executor threads.
39+
*/
40+
final class KubernetesHttpClientFactory extends JdkHttpClientFactory {
41+
42+
@Override
43+
protected void additionalConfig(final HttpClient.Builder builder) {
44+
final ExecutorService executor = VirtualThreads.createExecutor(
45+
"K8sClient-executor",
46+
() -> Executors.newFixedThreadPool(1, r -> {
47+
final Thread t = new Thread(r, "K8sClient-executor-0");
48+
t.setDaemon(true);
49+
return t;
50+
})
51+
);
52+
builder.executor(executor);
53+
}
54+
}

oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesPods.java

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.common.cache.CacheLoader;
2424
import com.google.common.cache.LoadingCache;
2525
import io.fabric8.kubernetes.api.model.Pod;
26-
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
2726
import lombok.SneakyThrows;
2827

2928
import java.time.Duration;
@@ -44,30 +43,26 @@ public enum KubernetesPods {
4443
podByIP = cacheBuilder.build(new CacheLoader<>() {
4544
@Override
4645
public Optional<Pod> load(String ip) {
47-
try (final var kubernetesClient = new KubernetesClientBuilder().build()) {
48-
return kubernetesClient
49-
.pods()
50-
.inAnyNamespace()
51-
.withField("status.podIP", ip)
52-
.list()
53-
.getItems()
54-
.stream()
55-
.findFirst();
56-
}
46+
return SharedKubernetesClient.INSTANCE.get()
47+
.pods()
48+
.inAnyNamespace()
49+
.withField("status.podIP", ip)
50+
.list()
51+
.getItems()
52+
.stream()
53+
.findFirst();
5754
}
5855
});
5956

6057
podByObjectID = cacheBuilder.build(new CacheLoader<>() {
6158
@Override
6259
public Optional<Pod> load(ObjectID objectID) {
63-
try (final var kubernetesClient = new KubernetesClientBuilder().build()) {
64-
return Optional.ofNullable(
65-
kubernetesClient
66-
.pods()
67-
.inNamespace(objectID.namespace())
68-
.withName(objectID.name())
69-
.get());
70-
}
60+
return Optional.ofNullable(
61+
SharedKubernetesClient.INSTANCE.get()
62+
.pods()
63+
.inNamespace(objectID.namespace())
64+
.withName(objectID.name())
65+
.get());
7166
}
7267
});
7368
}

oap-server/server-library/library-kubernetes-support/src/main/java/org/apache/skywalking/library/kubernetes/KubernetesServices.java

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.google.common.cache.CacheLoader;
2424
import com.google.common.cache.LoadingCache;
2525
import io.fabric8.kubernetes.api.model.Service;
26-
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
2726
import lombok.SneakyThrows;
2827
import org.slf4j.LoggerFactory;
2928

@@ -45,8 +44,8 @@ public enum KubernetesServices {
4544
.expireAfterWrite(Duration.ofMinutes(3));
4645

4746
services = cacheBuilder.build(CacheLoader.from(() -> {
48-
try (final var kubernetesClient = new KubernetesClientBuilder().build()) {
49-
return kubernetesClient
47+
try {
48+
return SharedKubernetesClient.INSTANCE.get()
5049
.services()
5150
.inAnyNamespace()
5251
.list()
@@ -60,14 +59,12 @@ public enum KubernetesServices {
6059
serviceByID = cacheBuilder.build(new CacheLoader<>() {
6160
@Override
6261
public Optional<Service> load(ObjectID id) {
63-
try (final var kubernetesClient = new KubernetesClientBuilder().build()) {
64-
return Optional.ofNullable(
65-
kubernetesClient
66-
.services()
67-
.inNamespace(id.namespace())
68-
.withName(id.name())
69-
.get());
70-
}
62+
return Optional.ofNullable(
63+
SharedKubernetesClient.INSTANCE.get()
64+
.services()
65+
.inNamespace(id.namespace())
66+
.withName(id.name())
67+
.get());
7168
}
7269
});
7370
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.skywalking.library.kubernetes;
20+
21+
import io.fabric8.kubernetes.client.KubernetesClient;
22+
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
23+
24+
/**
25+
* Shared {@link KubernetesClient} singleton. All modules that need Kubernetes API
26+
* access should use this instead of creating their own client instances.
27+
*
28+
* <p>Each {@code KubernetesClient} spawns internal JDK {@code HttpClient} threads
29+
* (NIO selector, executor pool). Sharing a single client eliminates thread churn
30+
* from repeated client creation in Guava cache loaders.
31+
*
32+
* <p>Thread footprint (per JDK version):
33+
* <ul>
34+
* <li>JDK 25+: 1 SelectorManager + virtual thread executor = ~1 platform thread</li>
35+
* <li>JDK &lt; 25: 1 SelectorManager + 1 fixed executor thread = 2 platform threads</li>
36+
* </ul>
37+
*/
38+
public enum SharedKubernetesClient {
39+
INSTANCE;
40+
41+
private final KubernetesClient client;
42+
43+
SharedKubernetesClient() {
44+
client = new KubernetesClientBuilder()
45+
.withHttpClientFactory(new KubernetesHttpClientFactory())
46+
.build();
47+
Runtime.getRuntime().addShutdownHook(
48+
new Thread(client::close, "K8sClient-shutdown"));
49+
}
50+
51+
public KubernetesClient get() {
52+
return client;
53+
}
54+
}

0 commit comments

Comments
 (0)