Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public static class ConnectCluster {
String password;
String keystoreLocation;
String keystorePassword;
String consumerNamePattern = "connect-%s";
}

@Data
Expand Down
27 changes: 24 additions & 3 deletions api/src/main/java/io/kafbat/ui/mapper/KafkaConnectMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import javax.annotation.Nullable;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.openapitools.jackson.nullable.JsonNullable;

@Mapper(componentModel = "spring")
public interface KafkaConnectMapper {
Expand All @@ -48,6 +49,7 @@ default ClusterInfo toClient(KafkaConnectState state) {
ConnectorDTO fromClient(Connector connector);

default ConnectorDTO fromClient(Connector connector,
ClustersProperties.ConnectCluster properties,
String connect,
ConnectorTopics topics,
Map<String, Object> sanitizedConfigs,
Expand All @@ -73,6 +75,9 @@ default ConnectorDTO fromClient(Connector connector,
}
}
}
result.setConsumer(JsonNullable.of(
properties.getConsumerNamePattern().formatted(connector.getName())
));

return result;
}
Expand All @@ -91,7 +96,19 @@ ConnectorPluginConfigValidationResponseDTO fromClient(
io.kafbat.ui.connect.model.ConnectorPluginConfigValidationResponse
connectorPluginConfigValidationResponse);

default InternalConnectorInfo fromClient(String connect, ExpandedConnector connector, @Nullable List<String> topics) {

default InternalConnectorInfo fromClient(ClustersProperties.ConnectCluster connect,
                                         ExpandedConnector connector, @Nullable List<String> topics) {
  // Convenience overload: unwrap the cluster config into the name/pattern form.
  String connectName = connect.getName();
  String consumerPattern = connect.getConsumerNamePattern();
  return fromClient(connectName, consumerPattern, connector, topics);
}

default InternalConnectorInfo fromClient(ConnectDTO connect, ExpandedConnector connector,
                                         @Nullable List<String> topics) {
  // Convenience overload: delegate with the DTO's name and consumer-name pattern.
  String connectName = connect.getName();
  String consumerPattern = connect.getConsumerNamePattern();
  return fromClient(connectName, consumerPattern, connector, topics);
}

default InternalConnectorInfo fromClient(String connectName, String consumerGroupPattern,
ExpandedConnector connector, @Nullable List<String> topics) {
Objects.requireNonNull(connector.getInfo());
Objects.requireNonNull(connector.getStatus());
List<TaskDTO> tasks = List.of();
Expand All @@ -113,14 +130,16 @@ default InternalConnectorInfo fromClient(String connect, ExpandedConnector conne
}

ConnectorDTO connectorDto = fromClient(connector.getInfo())
.connect(connect)
.connect(connectName)
.consumer(consumerGroupPattern.formatted(connector.getInfo().getName()))
.status(fromClient(connector.getStatus().getConnector()));

return InternalConnectorInfo.builder()
.connector(connectorDto)
.config(connector.getInfo().getConfig())
.tasks(tasks)
.topics(topics)
.consumer(consumerGroupPattern.formatted(connector.getInfo().getName()))
.build();
}

Expand Down Expand Up @@ -174,7 +193,8 @@ default ConnectDTO toKafkaConnect(
.failedTasksCount(failedTasksCount)
.version(clusterInfo.getVersion())
.commit(clusterInfo.getCommit())
.clusterId(clusterInfo.getKafkaClusterId());
.clusterId(clusterInfo.getKafkaClusterId())
.consumerNamePattern(connect.getConsumerNamePattern());
}

default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo) {
Expand All @@ -193,6 +213,7 @@ default FullConnectorInfoDTO fullConnectorInfo(InternalConnectorInfo connectInfo
.topics(connectInfo.getTopics())
.status(connector.getStatus())
.tasksCount(tasks.size())
.consumer(connectInfo.getConsumer())
.failedTasksCount(failedTasksCount);
}

Expand Down
1 change: 1 addition & 0 deletions api/src/main/java/io/kafbat/ui/model/KafkaCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class KafkaCluster {
private final MetricsScraper metricsScrapping;
private final ReactiveFailover<KafkaSrClientApi> schemaRegistryClient;
private final Map<String, ReactiveFailover<KafkaConnectClientApi>> connectsClients;
private final Map<String, ClustersProperties.ConnectCluster> connectsConfigs;
private final ReactiveFailover<KsqlApiClient> ksqlClient;
private final ReactiveFailover<PrometheusClientApi> prometheusStorageClient;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ public class InternalConnectorInfo {
private final Map<String, Object> config;
private final List<TaskDTO> tasks;
private final List<String> topics;
private final String consumer;
}
10 changes: 10 additions & 0 deletions api/src/main/java/io/kafbat/ui/service/KafkaClusterFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
Expand Down Expand Up @@ -88,6 +90,7 @@ public KafkaCluster create(ClustersProperties properties,
}
if (connectClientsConfigured(clusterProperties)) {
builder.connectsClients(connectClients(clusterProperties));
builder.connectsConfigs(connectConfigs(clusterProperties));
}
if (ksqlConfigured(clusterProperties)) {
builder.ksqlClient(ksqlClient(clusterProperties));
Expand Down Expand Up @@ -211,6 +214,13 @@ private Map<String, ReactiveFailover<KafkaConnectClientApi>> connectClients(
return connects;
}

/**
 * Indexes the cluster's configured Connect clusters by their name.
 * Like the previous implementation, duplicate connect names will surface as an
 * IllegalStateException from Collectors.toMap.
 */
private Map<String, ClustersProperties.ConnectCluster> connectConfigs(ClustersProperties.Cluster clusterProperties) {
  return clusterProperties.getKafkaConnect()
      .stream()
      .collect(Collectors.toMap(connect -> connect.getName(), connect -> connect));
}

private ReactiveFailover<KafkaConnectClientApi> connectClient(ClustersProperties.Cluster cluster,
ClustersProperties.ConnectCluster connectCluster) {
return ReactiveFailover.create(
Expand Down
75 changes: 56 additions & 19 deletions api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import io.kafbat.ui.model.FullConnectorInfoDTO;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.model.NewConnectorDTO;
import io.kafbat.ui.model.Statistics;
import io.kafbat.ui.model.TaskDTO;
import io.kafbat.ui.model.TaskIdDTO;
import io.kafbat.ui.model.connect.InternalConnectorInfo;
import io.kafbat.ui.service.index.KafkaConnectNgramFilter;
import io.kafbat.ui.service.metrics.scrape.KafkaConnectState;
import io.kafbat.ui.service.metrics.scrape.ScrapedClusterState;
import io.kafbat.ui.util.ReactiveFailover;
import jakarta.validation.Valid;
import java.util.HashMap;
Expand All @@ -35,6 +37,7 @@
import java.util.function.Predicate;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.openapitools.jackson.nullable.JsonNullable;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Flux;
Expand Down Expand Up @@ -105,8 +108,8 @@ private Flux<InternalConnectorInfo> getConnectConnectors(
return getConnectorsWithErrorsSuppress(cluster, connect.getName()).flatMapMany(connectors ->
Flux.fromStream(
connectors.values().stream().map(c ->
kafkaConnectMapper.fromClient(connect.getName(), c, null)
)
kafkaConnectMapper.fromClient(connect, c, null)
).map(i -> checkConsumerGroup(cluster, i))
)
);
}
Expand All @@ -124,8 +127,8 @@ public Flux<FullConnectorInfoDTO> getAllConnectors(final KafkaCluster cluster,
connect.getName(),
e.getKey()
).map(topics ->
kafkaConnectMapper.fromClient(connect.getName(), e.getValue(), topics.getTopics())
)
kafkaConnectMapper.fromClient(connect, e.getValue(), topics.getTopics())
).map(i -> checkConsumerGroup(cluster, i))
)
)
).map(kafkaConnectMapper::fullConnectorInfo)
Expand All @@ -142,10 +145,10 @@ public Flux<KafkaConnectState> scrapeAllConnects(KafkaCluster cluster) {
return Flux.fromIterable(connectClusters.orElse(List.of())).flatMap(c ->
getClusterInfo(cluster, c.getName()).map(info ->
kafkaConnectMapper.toKafkaConnect(c, List.of(), info, false)
).onErrorResume((t) -> Mono.just(new ConnectDTO().name(c.getName())))
).onErrorResume((_) -> Mono.just(new ConnectDTO().name(c.getName())))
).flatMap(connect ->
getConnectorsWithErrorsSuppress(cluster, connect.getName())
.onErrorResume(t -> Mono.just(Map.of()))
.onErrorResume(_ -> Mono.just(Map.of()))
.flatMapMany(connectors ->
Flux.fromIterable(connectors.entrySet())
.flatMap(e ->
Expand All @@ -154,7 +157,7 @@ public Flux<KafkaConnectState> scrapeAllConnects(KafkaCluster cluster) {
connect.getName(),
e.getKey()
).map(topics ->
kafkaConnectMapper.fromClient(connect.getName(), e.getValue(), topics.getTopics())
kafkaConnectMapper.fromClient(connect, e.getValue(), topics.getTopics())
)
)
).collectList().map(connectors -> kafkaConnectMapper.toScrapeState(connect, connectors))
Expand All @@ -178,7 +181,7 @@ public Mono<ConnectorTopics> getConnectorTopics(KafkaCluster cluster, String con
.map(result -> result.get(connectorName))
// old Connect API versions don't have this endpoint, setting empty list for
// backward-compatibility
.onErrorResume(Exception.class, e -> Mono.just(new ConnectorTopics().topics(List.of())));
.onErrorResume(Exception.class, _ -> Mono.just(new ConnectorTopics().topics(List.of())));
}

public Mono<Map<String, ExpandedConnector>> getConnectors(KafkaCluster cluster, String connectName) {
Expand Down Expand Up @@ -228,17 +231,18 @@ public Mono<ConnectorDTO> getConnector(KafkaCluster cluster, String connectName,
client.getConnector(connectorName),
getConnectorTopics(cluster, connectName, connectorName),
client.getConnectorStatus(connectorName).onErrorResume(WebClientResponseException.NotFound.class,
e -> emptyStatus(connectorName))
_ -> emptyStatus(connectorName))
)
.map(t ->
kafkaConnectMapper.fromClient(
t.getT1(),
cluster.getConnectsConfigs().get(connectName),
connectName,
t.getT2(),
kafkaConfigSanitizer.sanitizeConnectorConfig(t.getT1().getConfig()),
t.getT3()
)
)
).map(c -> checkConsumerGroup(cluster, c))
);
}

Expand All @@ -263,7 +267,7 @@ public Mono<ConnectorDTO> setConnectorConfig(KafkaCluster cluster, String connec
.mono(c ->
requestBody
.flatMap(body -> c.setConnectorConfig(connectorName, body))
.map(connector -> kafkaConnectMapper.fromClient(connector)));
.map(kafkaConnectMapper::fromClient));
}

public Mono<Void> deleteConnector(
Expand All @@ -277,7 +281,7 @@ public Mono<Void> updateConnectorState(KafkaCluster cluster, String connectName,
return api(cluster, connectName)
.mono(client -> switch (action) {
case RESTART -> client.restartConnector(connectorName, false, false);
case RESTART_ALL_TASKS -> restartTasks(cluster, connectName, connectorName, task -> true);
case RESTART_ALL_TASKS -> restartTasks(cluster, connectName, connectorName, _ -> true);
case RESTART_FAILED_TASKS -> restartTasks(cluster, connectName, connectorName,
t -> t.getStatus().getState() == ConnectorTaskStatusDTO.FAILED);
case PAUSE -> client.pauseConnector(connectorName);
Expand All @@ -303,13 +307,13 @@ public Flux<TaskDTO> getConnectorTasks(KafkaCluster cluster, String connectName,
return api(cluster, connectName)
.flux(client ->
client.getConnectorTasks(connectorName)
.onErrorResume(WebClientResponseException.NotFound.class, e -> Flux.empty())
.onErrorResume(WebClientResponseException.NotFound.class, _ -> Flux.empty())
.map(kafkaConnectMapper::fromClient)
.flatMap(task ->
client
.getConnectorTaskStatus(connectorName,
Optional.ofNullable(task.getId()).map(TaskIdDTO::getTask).orElseThrow()
).onErrorResume(WebClientResponseException.NotFound.class, e -> Mono.empty())
).onErrorResume(WebClientResponseException.NotFound.class, _ -> Mono.empty())
.map(kafkaConnectMapper::fromClient)
.map(task::status)
));
Expand Down Expand Up @@ -352,11 +356,11 @@ public Mono<Void> resetConnectorOffsets(KafkaCluster cluster, String connectName
return api(cluster, connectName)
.mono(client -> client.resetConnectorOffsets(connectorName))
.onErrorResume(WebClientResponseException.NotFound.class,
e -> {
_ -> {
throw new NotFoundException("Connector %s not found in %s".formatted(connectorName, connectName));
})
.onErrorResume(WebClientResponseException.BadRequest.class,
e -> {
_ -> {
throw new ConnectorOffsetsResetException(
"Failed to reset offsets of connector %s of %s. Make sure it is STOPPED first."
.formatted(connectorName, connectName));
Expand All @@ -381,11 +385,44 @@ public Flux<FullConnectorInfoDTO> getTopicConnectors(KafkaCluster cluster, Strin
connectors.entrySet()
.stream()
.filter(c -> entry.getValue().contains(c.getKey()))
.map(c -> kafkaConnectMapper.fromClient(entry.getKey(), c.getValue(), null))
.map(kafkaConnectMapper::fullConnectorInfo)
.toList()
.map(c ->
kafkaConnectMapper.fromClient(
cluster.getConnectsConfigs().get(entry.getKey()), c.getValue(), null)
).map(i ->
checkConsumerGroup(cluster, i)
).map(kafkaConnectMapper::fullConnectorInfo).toList()
)
).flatMap(Flux::fromIterable);
}

/**
 * Resolves the connector's consumer group name against the scraped cluster state:
 * if the group does not exist (or no statistics are cached), the consumer is cleared.
 */
private InternalConnectorInfo checkConsumerGroup(KafkaCluster cluster, InternalConnectorInfo info) {
  String consumer = info.getConsumer();
  if (consumer == null) {
    // Nothing mapped — leave the info untouched.
    return info;
  }
  String resolved = getConsumerGroup(cluster, consumer).orElse(null);
  return info.toBuilder().consumer(resolved).build();
}

/**
 * Resolves the DTO's consumer group name against the scraped cluster state,
 * replacing it with {@code JsonNullable.undefined()} when the group does not exist.
 *
 * <p>Bug fix: the previous guard was {@code dto.getConsumer() == null && dto.getConsumer().isPresent()},
 * which is unsatisfiable (short-circuit makes the conjunction always false). An undefined
 * {@code JsonNullable} therefore fell through to {@code get()}, which throws
 * {@code NoSuchElementException}. The intended guard is "null OR not present".
 */
private ConnectorDTO checkConsumerGroup(KafkaCluster cluster, ConnectorDTO dto) {
  if (dto.getConsumer() == null || !dto.getConsumer().isPresent()) {
    // No consumer name was mapped for this connector — nothing to resolve.
    return dto;
  }

  dto.setConsumer(
      getConsumerGroup(cluster, dto.getConsumer().get())
          .map(JsonNullable::of)
          .orElse(JsonNullable.undefined())
  );
  return dto;
}

/**
 * Looks up a consumer group by name in the cached cluster statistics.
 * Returns empty when statistics are missing, any intermediate value is null,
 * or the group is unknown.
 */
private Optional<String> getConsumerGroup(KafkaCluster cluster, String consumerGroupName) {
  Statistics statistics = statisticsCache.get(cluster);
  if (statistics == null || statistics.getClusterState() == null) {
    return Optional.empty();
  }
  var groupStates = statistics.getClusterState().getConsumerGroupsStates();
  if (groupStates == null) {
    return Optional.empty();
  }
  ScrapedClusterState.ConsumerGroupState state = groupStates.get(consumerGroupName);
  return state == null ? Optional.empty() : Optional.ofNullable(state.group());
}
}
28 changes: 23 additions & 5 deletions api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.kafbat.ui.model.ConnectorStateDTO;
import io.kafbat.ui.model.ConnectorStatusDTO;
import io.kafbat.ui.model.ConnectorTypeDTO;
import io.kafbat.ui.model.FullConnectorInfoDTO;
import io.kafbat.ui.model.InternalTopic;
import io.kafbat.ui.model.KafkaCluster;
import io.kafbat.ui.model.NewConnectorDTO;
Expand All @@ -23,11 +24,14 @@
import io.kafbat.ui.service.TopicsService;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.openapitools.jackson.nullable.JsonNullable;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.test.web.reactive.server.WebTestClient;
Expand Down Expand Up @@ -83,8 +87,15 @@ void setUp() {
"test.password", "test-credentials")))
.exchange()
.expectStatus().isOk();
// Force cache refresh
statisticsService.updateCache(kafkaCluster).block();

// Force cache refresh and wait for the consumer to start
Awaitility.await().until(() ->
statisticsService.updateCache(kafkaCluster).map(v ->
Optional.ofNullable(
v.getClusterState().getConsumerGroupsStates().get("connect-" + connectorName)
).isPresent()
).block()
);
}

@AfterEach
Expand All @@ -102,9 +113,15 @@ void shouldListAllConnectors() {
.uri("/api/clusters/{clusterName}/connectors", LOCAL)
.exchange()
.expectStatus().isOk()
.expectBody()
.jsonPath(String.format("$[?(@.name == '%s')]", connectorName))
.exists();
.expectBodyList(FullConnectorInfoDTO.class)
.value(connectors -> {
assertThat(connectors)
.anyMatch(connector -> connector.getName().equals(connectorName));
FullConnectorInfoDTO createdConnector =
connectors.stream().filter(connector -> connector.getName().equals(connectorName))
.findFirst().orElseThrow();
assertThat(createdConnector.getConsumer()).isEqualTo(JsonNullable.of("connect-" + connectorName));
});
}

@Test
Expand Down Expand Up @@ -203,6 +220,7 @@ void shouldRetrieveConnector() {
.task(0)))
.type(ConnectorTypeDTO.SINK)
.name(connectorName)
.consumer("connect-" + connectorName)
.config(config);
webTestClient.get()
.uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL,
Expand Down
Loading
Loading