blob: f8666334ccb8f5f75fac737b18a759c74919fe75 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
tomcbff9392014-09-10 00:45:23 -070016package org.onlab.onos.event;
17
18import com.google.common.collect.Lists;
19
20import java.util.List;
21import java.util.Timer;
22import java.util.TimerTask;
23
24import static com.google.common.base.Preconditions.checkArgument;
25import static com.google.common.base.Preconditions.checkNotNull;
26
27/**
28 * Base implementation of an event accumulator. It allows triggering based on
29 * event inter-arrival time threshold, maximum batch life threshold and maximum
30 * batch size.
31 */
32public abstract class AbstractEventAccumulator implements EventAccumulator {
33
34 private final Timer timer;
35 private final int maxEvents;
36 private final int maxBatchMillis;
37 private final int maxIdleMillis;
38
39 private TimerTask idleTask = new ProcessorTask();
40 private TimerTask maxTask = new ProcessorTask();
41
42 private List<Event> events = Lists.newArrayList();
43
44 /**
45 * Creates an event accumulator capable of triggering on the specified
46 * thresholds.
47 *
48 * @param timer timer to use for scheduling check-points
49 * @param maxEvents maximum number of events to accumulate before
50 * processing is triggered
51 * @param maxBatchMillis maximum number of millis allowed since the first
52 * event before processing is triggered
53 * @param maxIdleMillis maximum number millis between events before
54 * processing is triggered
55 */
56 protected AbstractEventAccumulator(Timer timer, int maxEvents,
57 int maxBatchMillis, int maxIdleMillis) {
58 this.timer = checkNotNull(timer, "Timer cannot be null");
59
60 checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
61 checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
62 checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
63
64 this.maxEvents = maxEvents;
65 this.maxBatchMillis = maxBatchMillis;
66 this.maxIdleMillis = maxIdleMillis;
67 }
68
69 @Override
70 public void add(Event event) {
71 idleTask = cancelIfActive(idleTask);
72 events.add(event);
73
74 // Did we hit the max event threshold?
75 if (events.size() == maxEvents) {
76 maxTask = cancelIfActive(maxTask);
77 schedule(1);
78 } else {
79 // Otherwise, schedule idle task and if this is a first event
80 // also schedule the max batch age task.
81 idleTask = schedule(maxIdleMillis);
82 if (events.size() == 1) {
83 maxTask = schedule(maxBatchMillis);
84 }
85 }
86 }
87
88 // Schedules a new processor task given number of millis in the future.
89 private TimerTask schedule(int millis) {
90 TimerTask task = new ProcessorTask();
91 timer.schedule(task, millis);
92 return task;
93 }
94
95 // Cancels the specified task if it is active.
96 private TimerTask cancelIfActive(TimerTask task) {
97 if (task != null) {
98 task.cancel();
99 }
100 return task;
101 }
102
103 // Task for triggering processing of accumulated events
104 private class ProcessorTask extends TimerTask {
105 @Override
106 public void run() {
107 idleTask = cancelIfActive(idleTask);
108 maxTask = cancelIfActive(maxTask);
109 processEvents(finalizeCurrentBatch());
110 }
111 }
112
113 // Demotes and returns the current batch of events and promotes a new one.
114 private synchronized List<Event> finalizeCurrentBatch() {
115 List<Event> toBeProcessed = events;
116 events = Lists.newArrayList();
117 return toBeProcessed;
118 }
119
120 /**
121 * Returns the backing timer.
122 *
123 * @return backing timer
124 */
125 public Timer timer() {
126 return timer;
127 }
128
129 /**
130 * Returns the maximum number of events allowed to accumulate before
131 * processing is triggered.
132 *
133 * @return max number of events
134 */
135 public int maxEvents() {
136 return maxEvents;
137 }
138
139 /**
140 * Returns the maximum number of millis allowed to expire since the first
141 * event before processing is triggered.
142 *
143 * @return max number of millis a batch is allowed to last
144 */
145 public int maxBatchMillis() {
146 return maxBatchMillis;
147 }
148
149 /**
150 * Returns the maximum number of millis allowed to expire since the last
151 * event arrival before processing is triggered.
152 *
153 * @return max number of millis since the last event
154 */
155 public int maxIdleMillis() {
156 return maxIdleMillis;
157 }
158}