Authors: Marco Balletti, Francesco Marino
Folder containing the input dataset as a CSV file (dataset.csv).
Folder containing scripts and files for a container-based execution of the project architecture:
- `start-dockers.sh` creates the Kafka cluster and the necessary Kafka topics,
- `stop-dockers.sh` stops and deletes the Kafka cluster after deleting the created topics and
- `docker-compose.yml` is the Docker Compose file used to create the container infrastructure.
Folder containing benchmark results (under the Benchmark directory), the project report and the presentation slides.
Folder containing Flink computation results as CSV files:
- `query1_daily.csv` containing the output of the first query evaluated over daily windows,
- `query1_weekly.csv` containing the output of the first query evaluated over weekly windows,
- `query1_monthly.csv` containing the output of the first query evaluated over monthly windows,
- `query2_daily.csv` containing the output of the second query evaluated over daily windows,
- `query2_weekly.csv` containing the output of the second query evaluated over weekly windows,
- `query3_daily.csv` containing the output of the third query evaluated over daily windows and
- `query3_weekly.csv` containing the output of the third query evaluated over weekly windows.
Results are computed over the entire dataset.
This directory contains in its subdirectories Java code for:
- creation of a Kafka topic producer for the input data,
- creation of a Flink topology to run a DSP analysis of the three queries,
- creation of a Kafka Streams topology to run an alternative DSP analysis of the same three queries and
- creation of several Kafka topic consumers for saving the DSP output.
It is recommended to open the entire directory with an IDE for better code navigation. The Java project was developed using JetBrains' IntelliJ IDEA.
In the main folder there are the processing architecture launchers:
- `ConsumersLauncher.java` launches the consumers for the Kafka Streams and Flink outputs,
- `FlinkDSPMain.java` starts the Flink data stream processing,
- `KafkaStreamsDSPMain.java` starts the Kafka Streams processing and
- `ProducerLauncher.java` starts a producer that reads from file and publishes tuples to Kafka topics, simulating a real-time data source.
This package contains classes for building and executing the queries' topologies using Flink as the DSP framework.
- `AverageDelayAggregator.java` aggregates data for the first query using daily, weekly and monthly windows,
- `AverageDelayOutcome.java` represents the aggregation result,
- `AverageDelayProcessWindow.java` correctly sets the windows' start times,
- `MonthlyWindowAssigner.java` contains a custom tumbling window assigner that separates tuples by event-time month (necessary because months differ in duration) and
- `Query1TopologyBuilder.java` builds the topology of the first query.
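Because months span 28 to 31 days, a fixed-size tumbling window cannot align to calendar months; the assigner must compute each window's bounds from the event's own month. The calendar arithmetic can be sketched with plain `java.time` (this is an illustrative sketch, not the project's `MonthlyWindowAssigner` code; the class and method names are invented for the example):

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.ZoneOffset;

// Sketch: compute the event-time bounds of a calendar-month tumbling window.
public class MonthlyWindowSketch {

    // Start of the month containing the event, as epoch millis in the given zone.
    public static long windowStart(long eventTimeMillis, ZoneId zone) {
        LocalDate day = Instant.ofEpochMilli(eventTimeMillis).atZone(zone).toLocalDate();
        return day.withDayOfMonth(1).atStartOfDay(zone).toInstant().toEpochMilli();
    }

    // Exclusive end: start of the following month (28 to 31 days later).
    public static long windowEnd(long eventTimeMillis, ZoneId zone) {
        LocalDate day = Instant.ofEpochMilli(eventTimeMillis).atZone(zone).toLocalDate();
        return day.withDayOfMonth(1).plusMonths(1).atStartOfDay(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        ZoneId utc = ZoneOffset.UTC;
        long feb15 = LocalDate.of(2020, 2, 15).atStartOfDay(utc).toInstant().toEpochMilli();
        long days = (windowEnd(feb15, utc) - windowStart(feb15, utc)) / 86_400_000L;
        System.out.println("February 2020 window spans " + days + " days"); // prints 29 (leap year)
    }
}
```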
- `ReasonRankingAggregator.java` aggregates data for the second query using daily and weekly windows,
- `ReasonRankingOutcome.java` represents the aggregation result,
- `ReasonRankingProcessWindow.java` correctly sets the windows' start times and
- `Query2TopologyBuilder.java` builds the topology of the second query.
- `CompanyRankingAggregator.java` aggregates data for the third query using daily and weekly windows,
- `CompanyRankingOutcome.java` represents the aggregation result,
- `CompanyRankingProcessWindow.java` correctly sets the windows' start times and
- `Query3TopologyBuilder.java` builds the topology of the third query.
This package contains configurations for the Kafka publish-subscribe service and classes for instantiating consumers and producers:
- `KafkaClusterConfig.java` contains the topic names and the property builders (for publishers and subscribers),
- `KafkaParametricConsumer.java` creates and starts consumers registered to the Kafka topics (holding the DSP outputs) and
- `KafkaSingleProducer.java` creates a producer that publishes the DSP input tuples to Kafka topics.
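A property builder of this kind typically centralizes the broker address and the (de)serializer choices so producers and consumers stay consistent. The following is a minimal sketch of that pattern, not the project's actual `KafkaClusterConfig`; the broker address, serializer classes and settings are assumed values:

```java
import java.util.Properties;

// Sketch of centralized Kafka property builders (all values are assumptions).
public class KafkaConfigSketch {

    public static final String BOOTSTRAP_SERVERS = "localhost:9092"; // assumed broker address

    // Properties for a producer publishing String keys and values.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all"); // wait for full acknowledgement before considering a publish done
        return props;
    }

    // Properties for a consumer in the given consumer group.
    public static Properties consumerProps(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest"); // read topics from the beginning
        return props;
    }
}
```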
This package contains classes for building and executing the queries' topologies using Kafka Streams as the DSP library, plus KafkaStreamsConfig.java, which provides the properties for running the stream processing library.
This package contains classes for queries' topologies creation:
- `Query1TopologyBuilder.java` builds the topology of the first query,
- `Query2TopologyBuilder.java` builds the topology of the second query and
- `Query3TopologyBuilder.java` builds the topology of the third query.
This package contains custom Kafka Streams windows:
- `CustomTimeWindows.java` is an abstract class representing a generic custom-duration time window,
- `DailyTimeWindows.java` implements a daily time window aligned to a given time zone,
- `MonthlyTimeWindows.java` implements a monthly time window aligned to the first day of the month in a given time zone and
- `WeeklyTimeWindows.java` implements a weekly time window that starts on Monday and ends on Sunday, aligned to a given time zone.
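The core of such a Monday-aligned weekly window is mapping each event timestamp to the Monday 00:00 (in the configured zone) that opens its week. A minimal sketch of that alignment logic with plain `java.time` (illustrative only; class and method names are invented, and the zone is just an example):

```java
import java.time.DayOfWeek;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneId;
import java.time.temporal.TemporalAdjusters;

// Sketch: align an event timestamp to the start of its Monday-based week.
public class WeeklyWindowSketch {

    // Epoch millis of the Monday 00:00 that opens the week containing the event.
    public static long weekStart(long eventTimeMillis, ZoneId zone) {
        LocalDate day = Instant.ofEpochMilli(eventTimeMillis).atZone(zone).toLocalDate();
        LocalDate monday = day.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
        return monday.atStartOfDay(zone).toInstant().toEpochMilli();
    }
}
```

Anchoring the window to a `ZoneId` rather than to raw epoch time is what keeps day and week boundaries correct across daylight-saving changes.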
This package contains classes needed for queries' execution support, in particular:
- `BusData.java` is the structure representing the tuple information needed for evaluation,
- `DataCommonTransformation.java` contains common methods needed for query processing and
- `OutputFormatter.java` formats query outcomes for publication on Kafka.
This package contains classes used as accumulators for both Flink and Kafka Streams processing:
- `AverageDelayAccumulator.java` used for average delay statistics grouped by neighbourhood (first query),
- `AverageDelayStatistics.java` used to maintain per-neighbourhood delay information (first query),
- `CompanyRankingAccumulator.java` used for ranking company names by delay (third query) and
- `ReasonRankingAccumulator.java` needed for the delay reason rankings (second query).
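Both Flink aggregate functions and Kafka Streams aggregations need an accumulator that supports adding one tuple and merging two partial results. A minimal sketch of a per-key sum/count accumulator of the kind the first query needs (names and structure are assumptions, not the project's actual classes):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: per-neighbourhood delay accumulator with add and merge operations.
public class AverageDelaySketch {

    // Maps a neighbourhood name to {sum of delays, number of tuples}.
    private final Map<String, long[]> stats = new HashMap<>();

    // Folds one tuple into the accumulator.
    public void add(String neighbourhood, long delayMinutes) {
        long[] s = stats.computeIfAbsent(neighbourhood, k -> new long[2]);
        s[0] += delayMinutes;
        s[1] += 1;
    }

    // Merges another partial accumulator into this one (needed when the
    // framework combines per-partition partial results).
    public AverageDelaySketch merge(AverageDelaySketch other) {
        other.stats.forEach((n, s) -> {
            long[] mine = stats.computeIfAbsent(n, k -> new long[2]);
            mine[0] += s[0];
            mine[1] += s[1];
        });
        return this;
    }

    // Average delay for one neighbourhood, 0 if no tuples were seen.
    public double average(String neighbourhood) {
        long[] s = stats.get(neighbourhood);
        return (s == null || s[1] == 0) ? 0.0 : (double) s[0] / s[1];
    }
}
```

Keeping sum and count (instead of a running average) is what makes `merge` exact.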
This package contains utilities for latency and throughput evaluation:
- `BenchmarkFlinkSink.java` is a sink that can be used in the Flink topology to evaluate performance and
- `SynchronizedCounter.java` is a static counter for benchmark evaluation (it counts tuples and elapsed time).
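The idea behind such a counter is simple: every sink invocation increments a shared, thread-safe count, and throughput is the count divided by the elapsed time. A minimal sketch (this is not the project's `SynchronizedCounter`; names are invented, and the real class is static rather than instance-based):

```java
// Sketch: thread-safe tuple counter from which throughput can be derived.
public class ThroughputCounterSketch {

    private long count = 0;
    private final long startNanos = System.nanoTime(); // measurement start

    // Called by the benchmark sink once per tuple.
    public synchronized void tupleProcessed() {
        count++;
    }

    public synchronized long count() {
        return count;
    }

    // Throughput over the whole measurement so far.
    public synchronized double tuplesPerSecond() {
        double elapsedSeconds = (System.nanoTime() - startNanos) / 1e9;
        return elapsedSeconds == 0 ? 0.0 : count / elapsedSeconds;
    }
}
```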
This package contains utilities for delay string parsing and delay type ranking:
- `DelayFixes.java` is an enum for correcting wrongly converted strings in the dataset,
- `DelayFormatException.java` is a custom Java exception for failures in extracting information from delay strings,
- `DealyInfo.java` represents a single parsed delay entry,
- `DelayParsingUtility.java` contains the delay string parsing logic and
- `DelayScorer.java` assigns a score based on delay and reason (third query).
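To make the parsing step concrete, here is a deliberately simplified sketch: it recognizes only an assumed format like "20 min" or "1 hour" and throws when nothing can be extracted (the role `DelayFormatException` plays in the project). The real dataset strings are messier, and the real utility handles many more cases; everything below is invented for illustration:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch: extract a delay in minutes from a free-form delay string.
public class DelayParseSketch {

    // Assumed format: a number followed by "min" or "hour" (case-insensitive).
    private static final Pattern DELAY =
            Pattern.compile("(\\d+)\\s*(min|hour)", Pattern.CASE_INSENSITIVE);

    public static int delayMinutes(String raw) {
        Matcher m = DELAY.matcher(raw);
        if (!m.find()) {
            // Stand-in for the project's DelayFormatException.
            throw new IllegalArgumentException("unparsable delay: " + raw);
        }
        int value = Integer.parseInt(m.group(1));
        return m.group(2).toLowerCase().startsWith("hour") ? value * 60 : value;
    }
}
```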
This package contains data serialization and deserialization utilities:
- `FlinkStringToKafkaSerializer.java` serializes Flink output strings for publication to Kafka topics,
- `JsonPOJODeserializer.java` deserializes custom objects from JSON,
- `JsonPOJOSerializer.java` serializes custom objects to JSON and
- `SerDesBuilders.java` builds ser-des for Kafka Streams.