This repository was archived by the owner on Oct 16, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathKafkaClusterConfig.java
More file actions
142 lines (120 loc) · 6.42 KB
/
KafkaClusterConfig.java
File metadata and controls
142 lines (120 loc) · 6.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package kafka_pubsub;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* Class with all the topics' name and the getter for all the kind of properties
*/
/**
 * Central configuration for the Kafka cluster used by the Flink and Kafka Streams jobs.
 *
 * <p>Holds the names of every input/output topic, the broker addresses, and static
 * factory methods building the {@link Properties} for the various producers and
 * consumers. All consumers are configured for exactly-once reads
 * ({@code isolation.level=read_committed}) and all producers for idempotent writes.
 *
 * <p>This is a non-instantiable utility class: all members are static.
 */
public final class KafkaClusterConfig {

    // ---- topics produced/consumed by the Flink pipeline ----
    public static final String FLINK_TOPIC = "flink-topic";
    public static final String FLINK_QUERY_1_DAILY_TOPIC = "flink-output-topic-query1-daily";
    public static final String FLINK_QUERY_1_WEEKLY_TOPIC = "flink-output-topic-query1-weekly";
    public static final String FLINK_QUERY_1_MONTHLY_TOPIC = "flink-output-topic-query1-monthly";
    public static final String FLINK_QUERY_2_DAILY_TOPIC = "flink-output-topic-query2-daily";
    public static final String FLINK_QUERY_2_WEEKLY_TOPIC = "flink-output-topic-query2-weekly";
    public static final String FLINK_QUERY_3_DAILY_TOPIC = "flink-output-topic-query3-daily";
    public static final String FLINK_QUERY_3_WEEKLY_TOPIC = "flink-output-topic-query3-weekly";

    // ---- topics produced/consumed by the Kafka Streams pipeline ----
    public static final String KAFKA_STREAMS_TOPIC = "kafka-streams-topic";
    public static final String KAFKA_QUERY_1_DAILY_TOPIC = "kafka-streams-output-topic-query1-daily";
    public static final String KAFKA_QUERY_1_WEEKLY_TOPIC = "kafka-streams-output-topic-query1-weekly";
    public static final String KAFKA_QUERY_1_MONTHLY_TOPIC = "kafka-streams-output-topic-query1-monthly";
    public static final String KAFKA_QUERY_2_DAILY_TOPIC = "kafka-streams-output-topic-query2-daily";
    public static final String KAFKA_QUERY_2_WEEKLY_TOPIC = "kafka-streams-output-topic-query2-weekly";
    public static final String KAFKA_QUERY_3_DAILY_TOPIC = "kafka-streams-output-topic-query3-daily";
    public static final String KAFKA_QUERY_3_WEEKLY_TOPIC = "kafka-streams-output-topic-query3-weekly";

    // NOTE(review): public arrays are mutable — callers could overwrite entries.
    // Kept as String[] for backward compatibility; consider List.of(...) if callers can migrate.
    public static final String[] FLINK_TOPICS = {FLINK_QUERY_1_DAILY_TOPIC, FLINK_QUERY_1_WEEKLY_TOPIC,
            FLINK_QUERY_1_MONTHLY_TOPIC, FLINK_QUERY_2_DAILY_TOPIC, FLINK_QUERY_2_WEEKLY_TOPIC,
            FLINK_QUERY_3_DAILY_TOPIC, FLINK_QUERY_3_WEEKLY_TOPIC};
    public static final String[] KAFKA_TOPICS = {KAFKA_QUERY_1_DAILY_TOPIC, KAFKA_QUERY_1_WEEKLY_TOPIC,
            KAFKA_QUERY_1_MONTHLY_TOPIC, KAFKA_QUERY_2_DAILY_TOPIC, KAFKA_QUERY_2_WEEKLY_TOPIC,
            KAFKA_QUERY_3_DAILY_TOPIC, KAFKA_QUERY_3_WEEKLY_TOPIC};

    // if the consumer has no committed offset for a partition, start from the first record
    private static final String CONSUMER_FIRST_OFFSET = "earliest";
    // exactly-once: idempotent production ...
    private static final boolean ENABLE_PRODUCER_EXACTLY_ONCE = true;
    // ... and read_committed isolation on the consumer side
    private static final String ENABLE_CONSUMER_EXACTLY_ONCE = "read_committed";

    // ---- brokers ----
    public static final String KAFKA_BROKER_1 = "localhost:9092";
    public static final String KAFKA_BROKER_2 = "localhost:9093";
    public static final String KAFKA_BROKER_3 = "localhost:9094";

    /** Comma-separated bootstrap-server list covering all three brokers. */
    public static final String BOOTSTRAP_SERVERS = KAFKA_BROKER_1 + "," + KAFKA_BROKER_2 + "," + KAFKA_BROKER_3;

    /** Utility class: prevent instantiation. */
    private KafkaClusterConfig() {
        throw new AssertionError("KafkaClusterConfig is not instantiable");
    }

    /**
     * Builds the common consumer configuration shared by every consumer factory:
     * brokers, group id, earliest-offset reset, read_committed isolation and a
     * String value deserializer; only the key deserializer varies per caller.
     *
     * @param consumerGroupId      id of the consumer group
     * @param keyDeserializerClass fully-qualified key deserializer class name
     * @return the assembled consumer properties
     */
    private static Properties baseConsumerProperties(String consumerGroupId, String keyDeserializerClass) {
        Properties props = new Properties();
        // specify brokers
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // set consumer group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        // start reading from beginning of partition if no offset was created
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, CONSUMER_FIRST_OFFSET);
        // exactly once semantic
        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, ENABLE_CONSUMER_EXACTLY_ONCE);
        // key and value deserializers
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }

    /**
     * Builds the common producer configuration shared by every producer factory:
     * brokers, client id and idempotent (exactly-once) production.
     *
     * @param producerId producer's client id
     * @return the assembled producer properties
     */
    private static Properties baseProducerProperties(String producerId) {
        Properties props = new Properties();
        // specify brokers
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        // set producer id
        props.put(ProducerConfig.CLIENT_ID_CONFIG, producerId);
        // exactly once semantic
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, ENABLE_PRODUCER_EXACTLY_ONCE);
        return props;
    }

    /**
     * Creates properties for a Kafka Consumer representing the Flink stream source
     * (Long keys, String values).
     *
     * @param consumerGroupId id of consumer group
     * @return created properties
     */
    public static Properties getFlinkSourceProperties(String consumerGroupId) {
        return baseConsumerProperties(consumerGroupId, LongDeserializer.class.getName());
    }

    /**
     * Creates properties for a Kafka Producer representing the one Flink processing sink.
     * No serializers are set here: the Flink connector supplies its own.
     *
     * @param producerId producer's id
     * @return created properties
     */
    public static Properties getFlinkSinkProperties(String producerId) {
        return baseProducerProperties(producerId);
    }

    /**
     * Creates properties for a Kafka Consumer representing one output subscriber
     * (String keys, String values).
     *
     * @param consumerGroupId id of consumer group
     * @return created properties
     */
    public static Properties getKafkaParametricConsumerProperties(String consumerGroupId) {
        return baseConsumerProperties(consumerGroupId, StringDeserializer.class.getName());
    }

    /**
     * Creates properties for a Kafka Producer representing the entire stream
     * processing source (Long keys, String values).
     *
     * @param producerId producer's id
     * @return created properties
     */
    public static Properties getKafkaSingleProducerProperties(String producerId) {
        Properties props = baseProducerProperties(producerId);
        // key and value serializers
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return props;
    }
}