blob: f4175622536c02bf3994241d169e34cf1c7b702c [file] [log] [blame]
Brian O'Connorcff03322015-02-03 15:28:59 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.util;
17
18import com.google.common.collect.Lists;
19import org.slf4j.Logger;
20import org.slf4j.LoggerFactory;
21
22import java.util.List;
23import java.util.Timer;
24import java.util.TimerTask;
25
26import static com.google.common.base.Preconditions.checkArgument;
27import static com.google.common.base.Preconditions.checkNotNull;
28
29/**
30 * Base implementation of an event accumulator. It allows triggering based on
31 * event inter-arrival time threshold, maximum batch life threshold and maximum
32 * batch size.
33 */
34// FIXME refactor the names here
35public abstract class AbstractAccumulator<T> implements Accumulator<T> {
36
37 private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
38
39 private final Timer timer;
40 private final int maxEvents;
41 private final int maxBatchMillis;
42 private final int maxIdleMillis;
43
44 private TimerTask idleTask = new ProcessorTask();
45 private TimerTask maxTask = new ProcessorTask();
46
47 private List<T> events = Lists.newArrayList();
48
49 /**
50 * Creates an event accumulator capable of triggering on the specified
51 * thresholds.
52 *
53 * @param timer timer to use for scheduling check-points
54 * @param maxEvents maximum number of events to accumulate before
55 * processing is triggered
56 * @param maxBatchMillis maximum number of millis allowed since the first
57 * event before processing is triggered
58 * @param maxIdleMillis maximum number millis between events before
59 * processing is triggered
60 */
61 protected AbstractAccumulator(Timer timer, int maxEvents,
62 int maxBatchMillis, int maxIdleMillis) {
63 this.timer = checkNotNull(timer, "Timer cannot be null");
64
65 checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
66 checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
67 checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
68
69 this.maxEvents = maxEvents;
70 this.maxBatchMillis = maxBatchMillis;
71 this.maxIdleMillis = maxIdleMillis;
72 }
73
74 @Override
Brian O'Connor6b6d0c12015-02-18 20:53:18 -080075 public synchronized void add(T event) {
Brian O'Connorcff03322015-02-03 15:28:59 -080076 idleTask = cancelIfActive(idleTask);
Brian O'Connor6b6d0c12015-02-18 20:53:18 -080077 events.add(checkNotNull(event, "Event cannot be null"));
Brian O'Connorcff03322015-02-03 15:28:59 -080078
79 // Did we hit the max event threshold?
80 if (events.size() == maxEvents) {
81 maxTask = cancelIfActive(maxTask);
82 schedule(1);
83 } else {
84 // Otherwise, schedule idle task and if this is a first event
85 // also schedule the max batch age task.
86 idleTask = schedule(maxIdleMillis);
87 if (events.size() == 1) {
88 maxTask = schedule(maxBatchMillis);
89 }
90 }
91 }
92
93 // Schedules a new processor task given number of millis in the future.
94 private TimerTask schedule(int millis) {
95 TimerTask task = new ProcessorTask();
96 timer.schedule(task, millis);
97 return task;
98 }
99
100 // Cancels the specified task if it is active.
101 private TimerTask cancelIfActive(TimerTask task) {
102 if (task != null) {
103 task.cancel();
104 }
105 return task;
106 }
107
108 // Task for triggering processing of accumulated events
109 private class ProcessorTask extends TimerTask {
110 @Override
111 public void run() {
112 try {
113 idleTask = cancelIfActive(idleTask);
114 maxTask = cancelIfActive(maxTask);
115 processEvents(finalizeCurrentBatch());
116 } catch (Exception e) {
Brian O'Connor6b6d0c12015-02-18 20:53:18 -0800117 log.warn("Unable to process batch due to {}", e);
Brian O'Connorcff03322015-02-03 15:28:59 -0800118 }
119 }
120 }
121
122 // Demotes and returns the current batch of events and promotes a new one.
123 private synchronized List<T> finalizeCurrentBatch() {
124 List<T> toBeProcessed = events;
125 events = Lists.newArrayList();
126 return toBeProcessed;
127 }
128
129 /**
130 * Returns the backing timer.
131 *
132 * @return backing timer
133 */
134 public Timer timer() {
135 return timer;
136 }
137
138 /**
139 * Returns the maximum number of events allowed to accumulate before
140 * processing is triggered.
141 *
142 * @return max number of events
143 */
144 public int maxEvents() {
145 return maxEvents;
146 }
147
148 /**
149 * Returns the maximum number of millis allowed to expire since the first
150 * event before processing is triggered.
151 *
152 * @return max number of millis a batch is allowed to last
153 */
154 public int maxBatchMillis() {
155 return maxBatchMillis;
156 }
157
158 /**
159 * Returns the maximum number of millis allowed to expire since the last
160 * event arrival before processing is triggered.
161 *
162 * @return max number of millis since the last event
163 */
164 public int maxIdleMillis() {
165 return maxIdleMillis;
166 }
167}