blob: c5bfa231bbe8769cdb17f179e54d0cb2c41b2efa [file] [log] [blame]
tomcbff9392014-09-10 00:45:23 -07001package org.onlab.onos.event;
2
3import com.google.common.collect.Lists;
4
5import java.util.List;
6import java.util.Timer;
7import java.util.TimerTask;
8
9import static com.google.common.base.Preconditions.checkArgument;
10import static com.google.common.base.Preconditions.checkNotNull;
11
12/**
13 * Base implementation of an event accumulator. It allows triggering based on
14 * event inter-arrival time threshold, maximum batch life threshold and maximum
15 * batch size.
16 */
17public abstract class AbstractEventAccumulator implements EventAccumulator {
18
19 private final Timer timer;
20 private final int maxEvents;
21 private final int maxBatchMillis;
22 private final int maxIdleMillis;
23
24 private TimerTask idleTask = new ProcessorTask();
25 private TimerTask maxTask = new ProcessorTask();
26
27 private List<Event> events = Lists.newArrayList();
28
29 /**
30 * Creates an event accumulator capable of triggering on the specified
31 * thresholds.
32 *
33 * @param timer timer to use for scheduling check-points
34 * @param maxEvents maximum number of events to accumulate before
35 * processing is triggered
36 * @param maxBatchMillis maximum number of millis allowed since the first
37 * event before processing is triggered
38 * @param maxIdleMillis maximum number millis between events before
39 * processing is triggered
40 */
41 protected AbstractEventAccumulator(Timer timer, int maxEvents,
42 int maxBatchMillis, int maxIdleMillis) {
43 this.timer = checkNotNull(timer, "Timer cannot be null");
44
45 checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
46 checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
47 checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
48
49 this.maxEvents = maxEvents;
50 this.maxBatchMillis = maxBatchMillis;
51 this.maxIdleMillis = maxIdleMillis;
52 }
53
54 @Override
55 public void add(Event event) {
56 idleTask = cancelIfActive(idleTask);
57 events.add(event);
58
59 // Did we hit the max event threshold?
60 if (events.size() == maxEvents) {
61 maxTask = cancelIfActive(maxTask);
62 schedule(1);
63 } else {
64 // Otherwise, schedule idle task and if this is a first event
65 // also schedule the max batch age task.
66 idleTask = schedule(maxIdleMillis);
67 if (events.size() == 1) {
68 maxTask = schedule(maxBatchMillis);
69 }
70 }
71 }
72
73 // Schedules a new processor task given number of millis in the future.
74 private TimerTask schedule(int millis) {
75 TimerTask task = new ProcessorTask();
76 timer.schedule(task, millis);
77 return task;
78 }
79
80 // Cancels the specified task if it is active.
81 private TimerTask cancelIfActive(TimerTask task) {
82 if (task != null) {
83 task.cancel();
84 }
85 return task;
86 }
87
88 // Task for triggering processing of accumulated events
89 private class ProcessorTask extends TimerTask {
90 @Override
91 public void run() {
92 idleTask = cancelIfActive(idleTask);
93 maxTask = cancelIfActive(maxTask);
94 processEvents(finalizeCurrentBatch());
95 }
96 }
97
98 // Demotes and returns the current batch of events and promotes a new one.
99 private synchronized List<Event> finalizeCurrentBatch() {
100 List<Event> toBeProcessed = events;
101 events = Lists.newArrayList();
102 return toBeProcessed;
103 }
104
105 /**
106 * Returns the backing timer.
107 *
108 * @return backing timer
109 */
110 public Timer timer() {
111 return timer;
112 }
113
114 /**
115 * Returns the maximum number of events allowed to accumulate before
116 * processing is triggered.
117 *
118 * @return max number of events
119 */
120 public int maxEvents() {
121 return maxEvents;
122 }
123
124 /**
125 * Returns the maximum number of millis allowed to expire since the first
126 * event before processing is triggered.
127 *
128 * @return max number of millis a batch is allowed to last
129 */
130 public int maxBatchMillis() {
131 return maxBatchMillis;
132 }
133
134 /**
135 * Returns the maximum number of millis allowed to expire since the last
136 * event arrival before processing is triggered.
137 *
138 * @return max number of millis since the last event
139 */
140 public int maxIdleMillis() {
141 return maxIdleMillis;
142 }
143}