 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
2121import org .apache .atlas .notification .AtlasNotificationMessageDeserializer ;
2222import org .apache .atlas .notification .NotificationInterface ;
2323import org .apache .commons .collections .MapUtils ;
24+ import org .apache .kafka .clients .consumer .ConsumerRecord ;
25+ import org .apache .kafka .clients .consumer .ConsumerRecords ;
26+ import org .apache .kafka .clients .consumer .KafkaConsumer ;
27+ import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
28+ import org .apache .kafka .common .TopicPartition ;
2429import org .slf4j .Logger ;
2530import org .slf4j .LoggerFactory ;
2631
2934import java .util .List ;
3035import java .util .Map ;
3136
32- import org .apache .kafka .clients .consumer .ConsumerRecord ;
33- import org .apache .kafka .clients .consumer .ConsumerRecords ;
34- import org .apache .kafka .clients .consumer .KafkaConsumer ;
35- import org .apache .kafka .common .TopicPartition ;
36- import org .apache .kafka .clients .consumer .OffsetAndMetadata ;
37-
3837/**
3938 * Kafka specific notification consumer.
4039 *
@@ -45,7 +44,7 @@ public class AtlasKafkaConsumer<T> extends AbstractNotificationConsumer<T> {
4544
4645 private final KafkaConsumer kafkaConsumer ;
4746 private final boolean autoCommitEnabled ;
48- private long pollTimeoutMilliSeconds = 1000L ;
47+ private final long pollTimeoutMilliSeconds ;
4948
5049 public AtlasKafkaConsumer (NotificationInterface .NotificationType notificationType , KafkaConsumer kafkaConsumer , boolean autoCommitEnabled , long pollTimeoutMilliSeconds ) {
5150 this (notificationType .getDeserializer (), kafkaConsumer , autoCommitEnabled , pollTimeoutMilliSeconds );
@@ -59,32 +58,11 @@ public AtlasKafkaConsumer(AtlasNotificationMessageDeserializer<T> deserializer,
5958 this .pollTimeoutMilliSeconds = pollTimeoutMilliSeconds ;
6059 }
6160
62- public List <AtlasKafkaMessage <T >> receive () {
63- return this .receive (this .pollTimeoutMilliSeconds );
64- }
65-
66- @ Override
67- public List <AtlasKafkaMessage <T >> receive (long timeoutMilliSeconds ) {
68- return receive (this .pollTimeoutMilliSeconds , null );
69- }
70-
71- @ Override
72- public List <AtlasKafkaMessage <T >> receiveWithCheckedCommit (Map <TopicPartition , Long > lastCommittedPartitionOffset ) {
73- return receive (this .pollTimeoutMilliSeconds , lastCommittedPartitionOffset );
74- }
75-
76- @ Override
77- public List <AtlasKafkaMessage <T >> receiveRawRecordsWithCheckedCommit (Map <TopicPartition , Long > lastCommittedPartitionOffset ) {
78- return receiveRawRecords (this .pollTimeoutMilliSeconds , lastCommittedPartitionOffset );
79- }
80-
81-
8261 @ Override
8362 public void commit (TopicPartition partition , long offset ) {
8463 if (!autoCommitEnabled ) {
85- if (LOG .isDebugEnabled ()) {
86- LOG .info (" commiting the offset ==>> " + offset );
87- }
64+ LOG .debug (" commiting the offset ==>> {}" , offset );
65+
8866 kafkaConsumer .commitSync (Collections .singletonMap (partition , new OffsetAndMetadata (offset )));
8967 }
9068 }
@@ -103,6 +81,25 @@ public void wakeup() {
10381 }
10482 }
10583
84+ public List <AtlasKafkaMessage <T >> receive () {
85+ return this .receive (this .pollTimeoutMilliSeconds );
86+ }
87+
88+ @ Override
89+ public List <AtlasKafkaMessage <T >> receive (long timeoutMilliSeconds ) {
90+ return receive (this .pollTimeoutMilliSeconds , null );
91+ }
92+
93+ @ Override
94+ public List <AtlasKafkaMessage <T >> receiveWithCheckedCommit (Map <TopicPartition , Long > lastCommittedPartitionOffset ) {
95+ return receive (this .pollTimeoutMilliSeconds , lastCommittedPartitionOffset );
96+ }
97+
98+ @ Override
99+ public List <AtlasKafkaMessage <T >> receiveRawRecordsWithCheckedCommit (Map <TopicPartition , Long > lastCommittedPartitionOffset ) {
100+ return receiveRawRecords (this .pollTimeoutMilliSeconds , lastCommittedPartitionOffset );
101+ }
102+
106103 private List <AtlasKafkaMessage <T >> receiveRawRecords (long timeoutMilliSeconds , Map <TopicPartition , Long > lastCommittedPartitionOffset ) {
107104 return receive (timeoutMilliSeconds , lastCommittedPartitionOffset , true );
108105 }
@@ -112,7 +109,7 @@ private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPa
112109 }
113110
114111 private List <AtlasKafkaMessage <T >> receive (long timeoutMilliSeconds , Map <TopicPartition , Long > lastCommittedPartitionOffset , boolean isRawDataRequired ) {
115- List <AtlasKafkaMessage <T >> messages = new ArrayList ();
112+ List <AtlasKafkaMessage <T >> messages = new ArrayList <> ();
116113
117114 ConsumerRecords <?, ?> records = kafkaConsumer != null ? kafkaConsumer .poll (timeoutMilliSeconds ) : null ;
118115
@@ -127,10 +124,9 @@ private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPa
127124 if (MapUtils .isNotEmpty (lastCommittedPartitionOffset )
128125 && lastCommittedPartitionOffset .containsKey (topicPartition )
129126 && record .offset () < lastCommittedPartitionOffset .get (topicPartition )) {
130-
131127 commit (topicPartition , record .offset ());
132128 LOG .info ("Skipping already processed message: topic={}, partition={} offset={}. Last processed offset={}" ,
133- record .topic (), record .partition (), record .offset (), lastCommittedPartitionOffset .get (topicPartition ));
129+ record .topic (), record .partition (), record .offset (), lastCommittedPartitionOffset .get (topicPartition ));
134130 continue ;
135131 }
136132
@@ -147,21 +143,18 @@ private List<AtlasKafkaMessage<T>> receive(long timeoutMilliSeconds, Map<TopicPa
147143 continue ;
148144 }
149145
150- AtlasKafkaMessage kafkaMessage = null ;
146+ AtlasKafkaMessage kafkaMessage ;
151147
152148 if (isRawDataRequired ) {
153- kafkaMessage = new AtlasKafkaMessage (message , record .offset (), record .topic (), record .partition (),
154- deserializer .getMsgCreated (), deserializer .getSpooled (), deserializer .getSource (), record .value ().toString ());
149+ kafkaMessage = new AtlasKafkaMessage (message , record .offset (), record .topic (), record .partition (), deserializer .getMsgCreated (), deserializer .getSpooled (), deserializer .getSource (), record .value ().toString ());
155150 } else {
156- kafkaMessage = new AtlasKafkaMessage (message , record .offset (), record .topic (), record .partition (),
157- deserializer .getMsgCreated (), deserializer .getSpooled (), deserializer .getSource ());
151+ kafkaMessage = new AtlasKafkaMessage (message , record .offset (), record .topic (), record .partition (), deserializer .getMsgCreated (), deserializer .getSpooled (), deserializer .getSource ());
158152 }
159153
160154 messages .add (kafkaMessage );
161155 }
162156 }
163157
164158 return messages ;
165-
166159 }
167160}
0 commit comments