This repository was archived by the owner on Oct 16, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathAverageDelayAggregator.java
More file actions
54 lines (47 loc) · 1.83 KB
/
AverageDelayAggregator.java
File metadata and controls
54 lines (47 loc) · 1.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package flink_dsp.query1;
import org.apache.flink.api.common.functions.AggregateFunction;
import utility.BusData;
import utility.accumulators.AverageDelayAccumulator;
import java.util.Date;
/**
* Class used to aggregate data for the first query
*/
public class AverageDelayAggregator implements AggregateFunction<BusData, AverageDelayAccumulator, AverageDelayOutcome> {
/**
* Function that initializes the AverageDelayAccumulator
* @return a new accumulator
*/
public AverageDelayAccumulator createAccumulator() {
return new AverageDelayAccumulator();
}
/**
* Function called to aggregate the busData's information to the accumulator
* @param busData contains all the information to be aggregated
* @param accumulator contains aggregated values so far
* @return updated accumulator
*/
public AverageDelayAccumulator add(BusData busData, AverageDelayAccumulator accumulator) {
accumulator.add(busData.getBoro(), busData.getDelay(), 1L);
return accumulator;
}
/**
* Function called to merge two accumulators, it adds acc2's boro information to acc1
* @param acc1 first accumulator to be merged
* @param acc2 second accumulator to be merged
* @return merged accumulator
*/
public AverageDelayAccumulator merge(AverageDelayAccumulator acc1, AverageDelayAccumulator acc2) {
acc2.getBoroMap().forEach((k, v) -> acc1.add(k, v.getTotal(), v.getCounter()));
return acc1;
}
/**
* Called at the end of the computation, used to gain the result from the accumulator
* @param accumulator containing all the information
* @return an AverageDelayOutcome with the results
*/
public AverageDelayOutcome getResult(AverageDelayAccumulator accumulator) {
AverageDelayOutcome outcome = new AverageDelayOutcome();
accumulator.getBoroMap().forEach((k, v) -> outcome.addMean(k, v.getTotal() / v.getCounter()));
return outcome;
}
}