4545import javax.inject.Inject;
4646
4747import java.util.ArrayList;
48- import java.util.Arrays;
4948import java.util.Collections;
5049import java.util.HashMap;
5150import java.util.List;
5251import java.util.Map;
5352import java.util.Properties;
5453import java.util.concurrent.Future;
55- import java.util.stream.Stream;
5654
5755import static org.apache.atlas.security.SecurityProperties.TLS_ENABLED;
5856import static org.apache.atlas.security.SecurityProperties.TRUSTSTORE_PASSWORD_KEY;
@@ -83,7 +81,7 @@ public class KafkaNotification extends AbstractNotification implements Service {
8381 private static final boolean SORT_NOT_NEEDED = false;
8482
8583 private static final Map<NotificationType, String> PRODUCER_TOPIC_MAP = new HashMap<>();
86- private static Map<NotificationType, String[] > CONSUMER_TOPICS_MAP = new HashMap<>();
84+ private static final Map<NotificationType, List< String> > CONSUMER_TOPICS_MAP = new HashMap<>();
8785
8886 private final Properties properties;
8987 private final Long pollTimeOutMs;
@@ -154,7 +152,7 @@ protected KafkaNotification(Properties properties) {
154152 LOG.info("<== KafkaNotification()");
155153 }
156154
157- public static String[] trimAndPurge(String[] strings) {
155+ public static List< String> trimAndPurge(String[] strings) {
158156 List<String> ret = new ArrayList<>();
159157
160158 if (strings != null) {
@@ -167,7 +165,7 @@ public static String[] trimAndPurge(String[] strings) {
167165 }
168166 }
169167
170- return ret.toArray(new String[ret.size()]) ;
168+ return ret;
171169 }
172170
173171 @Override
@@ -188,10 +186,7 @@ public void stop() {
188186
189187 @Override
190188 public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers) {
191- boolean enableAutoCommit = Boolean.parseBoolean(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable", "false")));
192- if (notificationType.equals(NotificationType.ASYNC_IMPORT)) {
193- enableAutoCommit = true;
194- }
189+ boolean enableAutoCommit = notificationType.equals(NotificationType.ASYNC_IMPORT) || Boolean.parseBoolean(properties.getProperty("enable.auto.commit", properties.getProperty("auto.commit.enable", "false")));
195190 return createConsumers(notificationType, numConsumers, enableAutoCommit);
196191 }
197192
@@ -215,13 +210,18 @@ public void close() {
215210 }
216211
217212 @Override
218- public void closeConsumer(NotificationType notificationType) {
219- List<KafkaConsumer> notificationConsumers = this.consumers.get(notificationType);
220- for (final KafkaConsumer consumer : notificationConsumers) {
221- consumer.unsubscribe();
222- consumer.close();
223- }
224- this.consumers.remove(notificationType);
213+ public void closeConsumer(NotificationType notificationTypeToClose, String topic) {
214+ this.consumers.computeIfPresent(notificationTypeToClose, (notificationType, notificationConsumers) -> {
215+ notificationConsumers.removeIf(consumer -> {
216+ if (consumer.subscription().contains(topic)) {
217+ consumer.unsubscribe();
218+ consumer.close();
219+ return true;
220+ }
221+ return false;
222+ });
223+ return notificationConsumers.isEmpty() ? null : notificationConsumers;
224+ });
225225 }
226226
227227 // ----- NotificationInterface -------------------------------------------
@@ -243,16 +243,16 @@ public boolean isReady(NotificationType notificationType) {
243243 public <T> List<NotificationConsumer<T>> createConsumers(NotificationType notificationType, int numConsumers, boolean autoCommitEnabled) {
244244 LOG.info("==> KafkaNotification.createConsumers(notificationType={}, numConsumers={}, autoCommitEnabled={})", notificationType, numConsumers, autoCommitEnabled);
245245
246- String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
246+ List< String> topics = CONSUMER_TOPICS_MAP.get(notificationType);
247247
248- if (numConsumers < topics.length ) {
249- LOG.warn("consumers count {} is fewer than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics.", numConsumers, topics.length , topics.length );
248+ if (numConsumers < topics.size() ) {
249+ LOG.warn("consumers count {} is fewer than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics.", numConsumers, topics.size() , topics.size() );
250250
251- numConsumers = topics.length ;
252- } else if (numConsumers > topics.length ) {
253- LOG.warn("consumers count {} is higher than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics", numConsumers, topics.length , topics.length );
251+ numConsumers = topics.size() ;
252+ } else if (numConsumers > topics.size() ) {
253+ LOG.warn("consumers count {} is higher than number of topics {}. Creating {} consumers, so that consumer count is equal to number of topics", numConsumers, topics.size() , topics.size() );
254254
255- numConsumers = topics.length ;
255+ numConsumers = topics.size() ;
256256 }
257257
258258 List<KafkaConsumer> notificationConsumers = this.consumers.get(notificationType);
@@ -318,11 +318,7 @@ public Properties getConsumerProperties(NotificationType notificationType) {
318318 String groupId = properties.getProperty(notificationType.toString().toLowerCase() + "." + CONSUMER_GROUP_ID_PROPERTY);
319319
320320 if (StringUtils.isEmpty(groupId)) {
321- if (!notificationType.equals(NotificationType.ASYNC_IMPORT)) {
322- groupId = "atlas";
323- } else {
324- groupId = "atlas-import";
325- }
321+ groupId = notificationType.equals(NotificationType.ASYNC_IMPORT) ? "atlas-import" : "atlas";
326322 }
327323
328324 if (StringUtils.isEmpty(groupId)) {
@@ -343,8 +339,8 @@ public KafkaConsumer getOrCreateKafkaConsumer(KafkaConsumer existingConsumer, Pr
343339
344340 try {
345341 if (ret == null || !isKafkaConsumerOpen(ret)) {
346- String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
347- String topic = topics[ idxConsumer % topics.length] ;
342+ List< String> topics = CONSUMER_TOPICS_MAP.get(notificationType);
343+ String topic = topics.get( idxConsumer % topics.size()) ;
348344
349345 LOG.debug("Creating new KafkaConsumer for topic : {}, index : {}", topic, idxConsumer);
350346
@@ -452,39 +448,29 @@ private KafkaProducer getOrCreateProducerByCriteria(Object producerCriteria, Map
452448
453449 @Override
454450 public void addTopicToNotificationType(NotificationType notificationType, String topic) {
455- String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
456- String[] updatedTopics;
457- if (topics == null) {
458- updatedTopics = new String[] {topic};
459- } else {
460- updatedTopics = Stream.concat(Arrays.stream(topics), Stream.of(topic)).toArray(String[]::new);
461- }
462- CONSUMER_TOPICS_MAP.put(notificationType, updatedTopics);
451+ CONSUMER_TOPICS_MAP.computeIfAbsent(notificationType, k -> new ArrayList<>()).add(topic);
463452 }
464453
465454 @Override
466455 public void closeProducer(NotificationType notificationType, String topic) {
467- KafkaProducer producerToClose = producersByTopic.get(topic);
468- if (producerToClose != null) {
469- producersByTopic.remove(topic);
470- producerToClose.close();
471- }
456+ producersByTopic.computeIfPresent(topic, (key, producer) -> {
457+ // Close the KafkaProducer before removal
458+ producer.close();
459+ // Returning null removes the key from the map
460+ return null;
461+ });
472462 PRODUCER_TOPIC_MAP.remove(notificationType, topic);
473463 }
474464
475465 @Override
476- public void deleteTopics (NotificationType notificationType, String topicName) {
466+ public void deleteTopic (NotificationType notificationType, String topicName) {
477467 try (AdminClient adminClient = AdminClient.create(this.properties)) {
478468 adminClient.deleteTopics(Collections.singleton(topicName));
479469 }
480- String[] topics = CONSUMER_TOPICS_MAP.get(notificationType);
481- String[] updatedTopics;
482- if (topics == null) {
483- updatedTopics = new String[] {};
484- } else {
485- updatedTopics = Arrays.stream(topics).filter(topic -> !topic.equals(topicName)).toArray(String[]::new);
486- }
487- CONSUMER_TOPICS_MAP.put(notificationType, updatedTopics);
470+ CONSUMER_TOPICS_MAP.computeIfPresent(notificationType, (key, topics) -> {
471+ topics.remove(topicName);
472+ return topics.isEmpty() ? null : topics;
473+ });
488474 }
489475
490476 // kafka-client doesn't have method to check if consumer is open, hence checking list topics and catching exception
0 commit comments