This repository was archived by the owner on Oct 16, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy path FlinkDSPMain.java
More file actions
83 lines (70 loc) · 2.97 KB
/
FlinkDSPMain.java
File metadata and controls
83 lines (70 loc) · 2.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import flink_dsp.query1.Query1TopologyBuilder;
import flink_dsp.query2.Query2TopologyBuilder;
import flink_dsp.query3.Query3TopologyBuilder;
import kafka_pubsub.KafkaClusterConfig;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import scala.Tuple2;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.Locale;
import java.util.Properties;
import java.util.regex.Pattern;
import static kafka_pubsub.KafkaClusterConfig.FLINK_TOPIC;
/**
 * Entry point that starts the Flink data stream processing job: it consumes raw
 * ';'-separated records from Kafka, extracts each record's event timestamp, and
 * feeds the resulting timestamped stream into the three query topologies.
 */
public class FlinkDSPMain {

    private static final String CONSUMER_GROUP_ID = "single-flink-consumer";

    // Splits on ';' only when outside double quotes; compiled once instead of
    // being recompiled for every incoming record.
    private static final Pattern FIELD_SPLITTER =
            Pattern.compile(";(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)");

    // Thread-safe replacement for the legacy SimpleDateFormat (which is not
    // thread-safe and was previously allocated per record). Being static, it is
    // not serialized with the anonymous FlatMapFunction below.
    private static final DateTimeFormatter EVENT_TIME_FORMAT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS", Locale.US);

    public static void main(String[] args) {
        // setup flink environment (local, with web UI for inspection)
        Configuration conf = new Configuration();
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // add the Kafka source and handle watermarks
        Properties props = KafkaClusterConfig.getFlinkSourceProperties(CONSUMER_GROUP_ID);
        DataStream<Tuple2<Long, String>> stream = environment
                .addSource(new FlinkKafkaConsumer<>(FLINK_TOPIC, new SimpleStringSchema(), props))
                // extract the event timestamp (8th ';'-separated field) and pair it with the raw record
                .flatMap(new FlatMapFunction<String, Tuple2<Long, String>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<Long, String>> collector) {
                        String[] info = FIELD_SPLITTER.split(s, -1);
                        try {
                            // local date-time interpreted in the system default zone,
                            // matching the previous SimpleDateFormat behaviour
                            long eventTime = LocalDateTime.parse(info[7], EVENT_TIME_FORMAT)
                                    .atZone(ZoneId.systemDefault())
                                    .toInstant()
                                    .toEpochMilli();
                            collector.collect(new Tuple2<>(eventTime, s));
                        } catch (DateTimeParseException | ArrayIndexOutOfBoundsException ignored) {
                            // a malformed or truncated record carries no usable event
                            // time: drop it on purpose instead of failing the job
                        }
                    }
                })
                // assign a timestamp to every tuple to enable the watermarking system
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<Long, String>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<Long, String> tuple) {
                        // use the event time extracted above; Kafka's auto-watermark
                        // generation is tied to offsets, not to event time
                        return tuple._1();
                    }
                })
                .name("stream-source");

        // build the three query topologies on top of the shared source stream
        Query1TopologyBuilder.buildTopology(stream);
        Query2TopologyBuilder.buildTopology(stream);
        Query3TopologyBuilder.buildTopology(stream);

        try {
            // execute the environment for DSP
            environment.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}