/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onlab.onos.event;

import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

29/**
30 * Base implementation of an event accumulator. It allows triggering based on
31 * event inter-arrival time threshold, maximum batch life threshold and maximum
32 * batch size.
33 */
34public abstract class AbstractEventAccumulator implements EventAccumulator {
35
Thomas Vachuska5bde31f2014-11-25 15:29:18 -080036 private Logger log = LoggerFactory.getLogger(AbstractEventAccumulator.class);
37
tomcbff9392014-09-10 00:45:23 -070038 private final Timer timer;
39 private final int maxEvents;
40 private final int maxBatchMillis;
41 private final int maxIdleMillis;
42
43 private TimerTask idleTask = new ProcessorTask();
44 private TimerTask maxTask = new ProcessorTask();
45
46 private List<Event> events = Lists.newArrayList();
47
48 /**
49 * Creates an event accumulator capable of triggering on the specified
50 * thresholds.
51 *
52 * @param timer timer to use for scheduling check-points
53 * @param maxEvents maximum number of events to accumulate before
54 * processing is triggered
55 * @param maxBatchMillis maximum number of millis allowed since the first
56 * event before processing is triggered
57 * @param maxIdleMillis maximum number millis between events before
58 * processing is triggered
59 */
60 protected AbstractEventAccumulator(Timer timer, int maxEvents,
61 int maxBatchMillis, int maxIdleMillis) {
62 this.timer = checkNotNull(timer, "Timer cannot be null");
63
64 checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
65 checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
66 checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
67
68 this.maxEvents = maxEvents;
69 this.maxBatchMillis = maxBatchMillis;
70 this.maxIdleMillis = maxIdleMillis;
71 }
72
73 @Override
74 public void add(Event event) {
75 idleTask = cancelIfActive(idleTask);
76 events.add(event);
77
78 // Did we hit the max event threshold?
79 if (events.size() == maxEvents) {
80 maxTask = cancelIfActive(maxTask);
81 schedule(1);
82 } else {
83 // Otherwise, schedule idle task and if this is a first event
84 // also schedule the max batch age task.
85 idleTask = schedule(maxIdleMillis);
86 if (events.size() == 1) {
87 maxTask = schedule(maxBatchMillis);
88 }
89 }
90 }
91
92 // Schedules a new processor task given number of millis in the future.
93 private TimerTask schedule(int millis) {
94 TimerTask task = new ProcessorTask();
95 timer.schedule(task, millis);
96 return task;
97 }
98
99 // Cancels the specified task if it is active.
100 private TimerTask cancelIfActive(TimerTask task) {
101 if (task != null) {
102 task.cancel();
103 }
104 return task;
105 }
106
107 // Task for triggering processing of accumulated events
108 private class ProcessorTask extends TimerTask {
109 @Override
110 public void run() {
Thomas Vachuska5bde31f2014-11-25 15:29:18 -0800111 try {
112 idleTask = cancelIfActive(idleTask);
113 maxTask = cancelIfActive(maxTask);
114 processEvents(finalizeCurrentBatch());
115 } catch (Exception e) {
116 log.warn("Unable to process batch due to {}", e.getMessage());
117 }
tomcbff9392014-09-10 00:45:23 -0700118 }
119 }
120
121 // Demotes and returns the current batch of events and promotes a new one.
122 private synchronized List<Event> finalizeCurrentBatch() {
123 List<Event> toBeProcessed = events;
124 events = Lists.newArrayList();
125 return toBeProcessed;
126 }
127
128 /**
129 * Returns the backing timer.
130 *
131 * @return backing timer
132 */
133 public Timer timer() {
134 return timer;
135 }
136
137 /**
138 * Returns the maximum number of events allowed to accumulate before
139 * processing is triggered.
140 *
141 * @return max number of events
142 */
143 public int maxEvents() {
144 return maxEvents;
145 }
146
147 /**
148 * Returns the maximum number of millis allowed to expire since the first
149 * event before processing is triggered.
150 *
151 * @return max number of millis a batch is allowed to last
152 */
153 public int maxBatchMillis() {
154 return maxBatchMillis;
155 }
156
157 /**
158 * Returns the maximum number of millis allowed to expire since the last
159 * event arrival before processing is triggered.
160 *
161 * @return max number of millis since the last event
162 */
163 public int maxIdleMillis() {
164 return maxIdleMillis;
165 }
166}