From bb86e5aa0c78415502faad92464148f2e449e24d Mon Sep 17 00:00:00 2001 From: German Osin Date: Sat, 7 Feb 2026 11:32:57 +0100 Subject: [PATCH 1/2] BE: Issue 1475 Expose connector consumer group if present --- .../kafbat/ui/config/ClustersProperties.java | 1 + .../kafbat/ui/mapper/KafkaConnectMapper.java | 27 ++++++- .../java/io/kafbat/ui/model/KafkaCluster.java | 1 + .../model/connect/InternalConnectorInfo.java | 1 + .../ui/service/KafkaClusterFactory.java | 10 +++ .../ui/service/KafkaConnectService.java | 75 ++++++++++++++----- .../kafbat/ui/KafkaConnectServiceTests.java | 28 +++++-- .../index/KafkaConnectNgramFilterTest.java | 14 ++-- contract-typespec/api/kafka-connect.tsp | 3 + 9 files changed, 128 insertions(+), 32 deletions(-) diff --git a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java index 2f089129e..aabd1b5f9 100644 --- a/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java +++ b/api/src/main/java/io/kafbat/ui/config/ClustersProperties.java @@ -157,6 +157,7 @@ public static class ConnectCluster { String password; String keystoreLocation; String keystorePassword; + String consumerNamePattern = "connect-%s"; } @Data diff --git a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java index 7b671f2b5..b2168e8bf 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java @@ -31,6 +31,7 @@ import javax.annotation.Nullable; import org.mapstruct.Mapper; import org.mapstruct.Mapping; +import org.openapitools.jackson.nullable.JsonNullable; @Mapper(componentModel = "spring") public interface KafkaConnectMapper { @@ -48,6 +49,7 @@ default ClusterInfo toClient(KafkaConnectState state) { ConnectorDTO fromClient(Connector connector); default ConnectorDTO fromClient(Connector connector, + ClustersProperties.ConnectCluster properties, 
String connect, ConnectorTopics topics, Map sanitizedConfigs, @@ -73,6 +75,9 @@ default ConnectorDTO fromClient(Connector connector, } } } + result.setConsumer(JsonNullable.of( + properties.getConsumerNamePattern().formatted(connector.getName()) + )); return result; } @@ -91,7 +96,19 @@ ConnectorPluginConfigValidationResponseDTO fromClient( io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse connectorPluginConfigValidationResponse); - default InternalConnectorInfo fromClient(String connect, ExpandedConnector connector, @Nullable List topics) { + + default InternalConnectorInfo fromClient(ClustersProperties.ConnectCluster connect, + ExpandedConnector connector, @Nullable List topics) { + return fromClient(connect.getName(), connect.getConsumerNamePattern(), connector, topics); + } + + default InternalConnectorInfo fromClient(ConnectDTO connect, ExpandedConnector connector, + @Nullable List topics) { + return fromClient(connect.getName(), connect.getConsumerNamePattern(), connector, topics); + } + + default InternalConnectorInfo fromClient(String connectName, String consumerGroupPattern, + ExpandedConnector connector, @Nullable List topics) { Objects.requireNonNull(connector.getInfo()); Objects.requireNonNull(connector.getStatus()); List tasks = List.of(); @@ -113,7 +130,8 @@ default InternalConnectorInfo fromClient(String connect, ExpandedConnector conne } ConnectorDTO connectorDto = fromClient(connector.getInfo()) - .connect(connect) + .connect(connectName) + .consumer(consumerGroupPattern.formatted(connector.getInfo().getName())) .status(fromClient(connector.getStatus().getConnector())); return InternalConnectorInfo.builder() @@ -121,6 +139,7 @@ default InternalConnectorInfo fromClient(String connect, ExpandedConnector conne .config(connector.getInfo().getConfig()) .tasks(tasks) .topics(topics) + .consumer(consumerGroupPattern.formatted(connector.getInfo().getName())) .build(); } @@ -174,7 +193,8 @@ default ConnectDTO toKafkaConnect( 
.failedTasksCount(failedTasksCount) .version(clusterInfo.getVersion()) .commit(clusterInfo.getCommit()) - .clusterId(clusterInfo.getKafkaClusterId()); + .clusterId(clusterInfo.getKafkaClusterId()) + .consumerNamePattern(connect.getConsumerNamePattern()); } default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) { @@ -193,6 +213,7 @@ default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo .topics(connectInfo.getTopics()) .status(connector.getStatus()) .tasksCount(tasks.size()) + .consumer(connectInfo.getConsumer()) .failedTasksCount(failedTasksCount); } diff --git a/api/src/main/java/io/kafbat/ui/model/KafkaCluster.java b/api/src/main/java/io/kafbat/ui/model/KafkaCluster.java index 8fad56a49..5848c9bb7 100644 --- a/api/src/main/java/io/kafbat/ui/model/KafkaCluster.java +++ b/api/src/main/java/io/kafbat/ui/model/KafkaCluster.java @@ -35,6 +35,7 @@ public class KafkaCluster { private final MetricsScraper metricsScrapping; private final ReactiveFailover schemaRegistryClient; private final Map> connectsClients; + private final Map connectsConfigs; private final ReactiveFailover ksqlClient; private final ReactiveFailover prometheusStorageClient; } diff --git a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java index 6c884316a..cb8ea80c2 100644 --- a/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java +++ b/api/src/main/java/io/kafbat/ui/model/connect/InternalConnectorInfo.java @@ -14,4 +14,5 @@ public class InternalConnectorInfo { private final Map config; private final List tasks; private final List topics; + private final String consumer; } diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java index 5a0c5201a..42f73c6bd 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java +++ 
b/api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java @@ -30,6 +30,8 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; +import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -83,6 +85,7 @@ public KafkaCluster create(ClustersProperties properties, } if (connectClientsConfigured(clusterProperties)) { builder.connectsClients(connectClients(clusterProperties)); + builder.connectsConfigs(connectConfigs(clusterProperties)); } if (ksqlConfigured(clusterProperties)) { builder.ksqlClient(ksqlClient(clusterProperties)); @@ -206,6 +209,13 @@ private Map> connectClients( return connects; } + private Map connectConfigs(ClustersProperties.Cluster clusterProperties) { + return clusterProperties.getKafkaConnect().stream().collect(Collectors.toMap( + ClustersProperties.ConnectCluster::getName, + Function.identity() + )); + } + private ReactiveFailover connectClient(ClustersProperties.Cluster cluster, ClustersProperties.ConnectCluster connectCluster) { return ReactiveFailover.create( diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java index 6c1623226..d79b008d6 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java @@ -21,11 +21,13 @@ import io.kafbat.ui.model.FullConnectorInfoDTO; import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.model.NewConnectorDTO; +import io.kafbat.ui.model.Statistics; import io.kafbat.ui.model.TaskDTO; import io.kafbat.ui.model.TaskIdDTO; import io.kafbat.ui.model.connect.InternalConnectorInfo; import io.kafbat.ui.service.index.KafkaConnectNgramFilter; import io.kafbat.ui.service.metrics.scrape.KafkaConnectState; +import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState; import 
io.kafbat.ui.util.ReactiveFailover; import jakarta.validation.Valid; import java.util.HashMap; @@ -35,6 +37,7 @@ import java.util.function.Predicate; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; +import org.openapitools.jackson.nullable.JsonNullable; import org.springframework.stereotype.Service; import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Flux; @@ -105,8 +108,8 @@ private Flux getConnectConnectors( return getConnectorsWithErrorsSuppress(cluster, connect.getName()).flatMapMany(connectors -> Flux.fromStream( connectors.values().stream().map(c -> - kafkaConnectMapper.fromClient(connect.getName(), c, null) - ) + kafkaConnectMapper.fromClient(connect, c, null) + ).map(i -> checkConsumerGroup(cluster, i)) ) ); } @@ -124,8 +127,8 @@ public Flux getAllConnectors(final KafkaCluster cluster, connect.getName(), e.getKey() ).map(topics -> - kafkaConnectMapper.fromClient(connect.getName(), e.getValue(), topics.getTopics()) - ) + kafkaConnectMapper.fromClient(connect, e.getValue(), topics.getTopics()) + ).map(i -> checkConsumerGroup(cluster, i)) ) ) ).map(kafkaConnectMapper::fullConnectorInfo) @@ -142,10 +145,10 @@ public Flux scrapeAllConnects(KafkaCluster cluster) { return Flux.fromIterable(connectClusters.orElse(List.of())).flatMap(c -> getClusterInfo(cluster, c.getName()).map(info -> kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false) - ).onErrorResume((t) -> Mono.just(new ConnectDTO().name(c.getName()))) + ).onErrorResume((_) -> Mono.just(new ConnectDTO().name(c.getName()))) ).flatMap(connect -> getConnectorsWithErrorsSuppress(cluster, connect.getName()) - .onErrorResume(t -> Mono.just(Map.of())) + .onErrorResume(_ -> Mono.just(Map.of())) .flatMapMany(connectors -> Flux.fromIterable(connectors.entrySet()) .flatMap(e -> @@ -154,7 +157,7 @@ public Flux scrapeAllConnects(KafkaCluster cluster) { connect.getName(), e.getKey() ).map(topics -> - 
kafkaConnectMapper.fromClient(connect.getName(), e.getValue(), topics.getTopics()) + kafkaConnectMapper.fromClient(connect, e.getValue(), topics.getTopics()) ) ) ).collectList().map(connectors -> kafkaConnectMapper.toScrapeState(connect, connectors)) @@ -178,7 +181,7 @@ public Mono getConnectorTopics(KafkaCluster cluster, String con .map(result -> result.get(connectorName)) // old Connect API versions don't have this endpoint, setting empty list for // backward-compatibility - .onErrorResume(Exception.class, e -> Mono.just(new ConnectorTopics().topics(List.of()))); + .onErrorResume(Exception.class, _ -> Mono.just(new ConnectorTopics().topics(List.of()))); } public Mono> getConnectors(KafkaCluster cluster, String connectName) { @@ -228,17 +231,18 @@ public Mono getConnector(KafkaCluster cluster, String connectName, client.getConnector(connectorName), getConnectorTopics(cluster, connectName, connectorName), client.getConnectorStatus(connectorName).onErrorResume(WebClientResponseException.NotFound.class, - e -> emptyStatus(connectorName)) + _ -> emptyStatus(connectorName)) ) .map(t -> kafkaConnectMapper.fromClient( t.getT1(), + cluster.getConnectsConfigs().get(connectName), connectName, t.getT2(), kafkaConfigSanitizer.sanitizeConnectorConfig(t.getT1().getConfig()), t.getT3() ) - ) + ).map(c -> checkConsumerGroup(cluster, c)) ); } @@ -263,7 +267,7 @@ public Mono setConnectorConfig(KafkaCluster cluster, String connec .mono(c -> requestBody .flatMap(body -> c.setConnectorConfig(connectorName, body)) - .map(connector -> kafkaConnectMapper.fromClient(connector))); + .map(kafkaConnectMapper::fromClient)); } public Mono deleteConnector( @@ -277,7 +281,7 @@ public Mono updateConnectorState(KafkaCluster cluster, String connectName, return api(cluster, connectName) .mono(client -> switch (action) { case RESTART -> client.restartConnector(connectorName, false, false); - case RESTART_ALL_TASKS -> restartTasks(cluster, connectName, connectorName, task -> true); + case 
RESTART_ALL_TASKS -> restartTasks(cluster, connectName, connectorName, _ -> true); case RESTART_FAILED_TASKS -> restartTasks(cluster, connectName, connectorName, t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED); case PAUSE -> client.pauseConnector(connectorName); @@ -303,13 +307,13 @@ public Flux getConnectorTasks(KafkaCluster cluster, String connectName, return api(cluster, connectName) .flux(client -> client.getConnectorTasks(connectorName) - .onErrorResume(WebClientResponseException.NotFound.class, e -> Flux.empty()) + .onErrorResume(WebClientResponseException.NotFound.class, _ -> Flux.empty()) .map(kafkaConnectMapper::fromClient) .flatMap(task -> client .getConnectorTaskStatus(connectorName, Optional.ofNullable(task.getId()).map(TaskIdDTO::getTask).orElseThrow() - ).onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty()) + ).onErrorResume(WebClientResponseException.NotFound.class, _ -> Mono.empty()) .map(kafkaConnectMapper::fromClient) .map(task::status) )); @@ -352,11 +356,11 @@ public Mono resetConnectorOffsets(KafkaCluster cluster, String connectName return api(cluster, connectName) .mono(client -> client.resetConnectorOffsets(connectorName)) .onErrorResume(WebClientResponseException.NotFound.class, - e -> { + _ -> { throw new NotFoundException("Connector %s not found in %s".formatted(connectorName, connectName)); }) .onErrorResume(WebClientResponseException.BadRequest.class, - e -> { + _ -> { throw new ConnectorOffsetsResetException( "Failed to reset offsets of connector %s of %s. Make sure it is STOPPED first." 
.formatted(connectorName, connectName)); @@ -381,11 +385,44 @@ public Flux getTopicConnectors(KafkaCluster cluster, Strin connectors.entrySet() .stream() .filter(c -> entry.getValue().contains(c.getKey())) - .map(c -> kafkaConnectMapper.fromClient(entry.getKey(), c.getValue(), null)) - .map(kafkaConnectMapper::fullConnectorInfo) - .toList() + .map(c -> + kafkaConnectMapper.fromClient( + cluster.getConnectsConfigs().get(entry.getKey()), c.getValue(), null) + ).map(i -> + checkConsumerGroup(cluster, i) + ).map(kafkaConnectMapper::fullConnectorInfo).toList() ) ).flatMap(Flux::fromIterable); + } + + private InternalConnectorInfo checkConsumerGroup(KafkaCluster cluster, InternalConnectorInfo info) { + if (info.getConsumer() == null) { + return info; + } + + return info.toBuilder().consumer( + getConsumerGroup(cluster, info.getConsumer()).orElse(null) + ).build(); + } + + private ConnectorDTO checkConsumerGroup(KafkaCluster cluster, ConnectorDTO dto) { + if (dto.getConsumer() == null || !dto.getConsumer().isPresent()) { + return dto; + } + + dto.setConsumer( + getConsumerGroup(cluster, dto.getConsumer().get()) + .map(JsonNullable::of) + .orElse(JsonNullable.undefined()) + ); + return dto; + } + private Optional getConsumerGroup(KafkaCluster cluster, String consumerGroupName) { + return Optional.ofNullable(statisticsCache.get(cluster)) + .map(Statistics::getClusterState) + .map(ScrapedClusterState::getConsumerGroupsStates) + .map(cg -> cg.get(consumerGroupName)) + .map(ScrapedClusterState.ConsumerGroupState::group); } } diff --git a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java index 2951c9684..75e1fd265 100644 --- a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java +++ b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java @@ -13,6 +13,7 @@ import io.kafbat.ui.model.ConnectorStateDTO; import io.kafbat.ui.model.ConnectorStatusDTO; import io.kafbat.ui.model.ConnectorTypeDTO;
+import io.kafbat.ui.model.FullConnectorInfoDTO; import io.kafbat.ui.model.InternalTopic; import io.kafbat.ui.model.KafkaCluster; import io.kafbat.ui.model.NewConnectorDTO; @@ -23,11 +24,14 @@ import io.kafbat.ui.service.TopicsService; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.UUID; import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.openapitools.jackson.nullable.JsonNullable; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.ParameterizedTypeReference; import org.springframework.test.web.reactive.server.WebTestClient; @@ -83,8 +87,15 @@ void setUp() { "test.password", "test-credentials"))) .exchange() .expectStatus().isOk(); - // Force cache refresh - statisticsService.updateCache(kafkaCluster).block(); + + // Force cache refresh and wait for the consumer to start + Awaitility.await().until(() -> + statisticsService.updateCache(kafkaCluster).map(v -> + Optional.ofNullable( + v.getClusterState().getConsumerGroupsStates().get("connect-" + connectorName) + ).isPresent() + ).block() + ); } @AfterEach @@ -102,9 +113,15 @@ void shouldListAllConnectors() { .uri("/api/clusters/{clusterName}/connectors", LOCAL) .exchange() .expectStatus().isOk() - .expectBody() - .jsonPath(String.format("$[?(@.name == '%s')]", connectorName)) - .exists(); + .expectBodyList(FullConnectorInfoDTO.class) + .value(connectors -> { + assertThat(connectors) + .anyMatch(connector -> connector.getName().equals(connectorName)); + FullConnectorInfoDTO createdConnector = + connectors.stream().filter(connector -> connector.getName().equals(connectorName)) + .findFirst().orElseThrow(); + assertThat(createdConnector.getConsumer()).isEqualTo(JsonNullable.of("connect-" + connectorName)); + }); } @Test @@ -203,6 +220,7 @@ void shouldRetrieveConnector() { .task(0))) 
.type(ConnectorTypeDTO.SINK) .name(connectorName) + .consumer("connect-"+connectorName) .config(config); webTestClient.get() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL, diff --git a/api/src/test/java/io/kafbat/ui/service/index/KafkaConnectNgramFilterTest.java b/api/src/test/java/io/kafbat/ui/service/index/KafkaConnectNgramFilterTest.java index df1b14fe4..5c1f943b9 100644 --- a/api/src/test/java/io/kafbat/ui/service/index/KafkaConnectNgramFilterTest.java +++ b/api/src/test/java/io/kafbat/ui/service/index/KafkaConnectNgramFilterTest.java @@ -9,6 +9,7 @@ import java.util.List; import java.util.Map; import java.util.stream.IntStream; +import org.openapitools.jackson.nullable.JsonNullable; class KafkaConnectNgramFilterTest extends AbstractNgramFilterTest { @@ -29,7 +30,8 @@ protected List items() { List.of(), new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, "worker-1", "reason"), 1, - 0)).toList(); + 0, + JsonNullable.of("connect-connector-"+i))).toList(); } @Override @@ -54,7 +56,8 @@ protected List sortedItems() { List.of(), new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, null, "reason"), 1, - 0 + 0, + null ), new FullConnectorInfoDTO( "pay-connect", @@ -64,7 +67,8 @@ protected List sortedItems() { List.of(), new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, null, "reason"), 1, - 0 + 0, + null ) ); } @@ -85,7 +89,7 @@ protected List sortedResult(List ite List.of(), new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, null, "reason"), 1, - 0), + 0, null), new FullConnectorInfoDTO( "connect-pay", "connector-pay", @@ -94,6 +98,6 @@ protected List sortedResult(List ite List.of(), new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, null, "reason"), 1, - 0)); + 0, null)); } } diff --git a/contract-typespec/api/kafka-connect.tsp b/contract-typespec/api/kafka-connect.tsp index ff4897a2f..0bd394889 100644 --- a/contract-typespec/api/kafka-connect.tsp +++ b/contract-typespec/api/kafka-connect.tsp @@ -182,6 +182,7 @@ model Connect { 
version?: string | null; commit?: string | null; clusterId?: string | null; + consumerNamePattern?: string; } model ConnectorConfig is Record; @@ -246,6 +247,7 @@ model Connector { status: ConnectorStatus; connect: string; topics?: string[]; + consumer?: string | null; } enum ConnectorAction { @@ -317,6 +319,7 @@ model FullConnectorInfo { status: ConnectorStatus; tasksCount?: integer; failedTasksCount?: integer; + consumer?: string | null; } enum ConnectorColumnsToSort { From ae4f059017864cc557a3baefe86a657474b5927a Mon Sep 17 00:00:00 2001 From: German Osin Date: Sat, 7 Feb 2026 11:40:31 +0100 Subject: [PATCH 2/2] Fixed styles --- .../java/io/kafbat/ui/KafkaConnectServiceTests.java | 12 ++++++------ .../service/index/KafkaConnectNgramFilterTest.java | 3 +-- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java index 75e1fd265..71e76709f 100644 --- a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java +++ b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java @@ -90,11 +90,11 @@ void setUp() { // Force cache refresh and wait for the consumer to start Awaitility.await().until(() -> - statisticsService.updateCache(kafkaCluster).map(v -> - Optional.ofNullable( - v.getClusterState().getConsumerGroupsStates().get("connect-" + connectorName) - ).isPresent() - ).block() + statisticsService.updateCache(kafkaCluster).map(v -> + Optional.ofNullable( + v.getClusterState().getConsumerGroupsStates().get("connect-" + connectorName) + ).isPresent() + ).block() ); } @@ -220,7 +220,7 @@ void shouldRetrieveConnector() { .task(0))) .type(ConnectorTypeDTO.SINK) .name(connectorName) - .consumer("connect-"+connectorName) + .consumer("connect-" + connectorName) .config(config); webTestClient.get() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL, diff --git 
a/api/src/test/java/io/kafbat/ui/service/index/KafkaConnectNgramFilterTest.java b/api/src/test/java/io/kafbat/ui/service/index/KafkaConnectNgramFilterTest.java index 5c1f943b9..75b3910e4 100644 --- a/api/src/test/java/io/kafbat/ui/service/index/KafkaConnectNgramFilterTest.java +++ b/api/src/test/java/io/kafbat/ui/service/index/KafkaConnectNgramFilterTest.java @@ -9,7 +9,6 @@ import java.util.List; import java.util.Map; import java.util.stream.IntStream; -import org.openapitools.jackson.nullable.JsonNullable; class KafkaConnectNgramFilterTest extends AbstractNgramFilterTest { @@ -31,7 +30,7 @@ protected List items() { new ConnectorStatusDTO(ConnectorStateDTO.RUNNING, "worker-1", "reason"), 1, 0, - JsonNullable.of("connect-connector-"+i))).toList(); + null)).toList(); } @Override