/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onlab.util;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicReference;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

31/**
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080032 * Base implementation of an item accumulator. It allows triggering based on
33 * item inter-arrival time threshold, maximum batch life threshold and maximum
Brian O'Connorcff03322015-02-03 15:28:59 -080034 * batch size.
35 */
Brian O'Connorcff03322015-02-03 15:28:59 -080036public abstract class AbstractAccumulator<T> implements Accumulator<T> {
37
38 private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
39
40 private final Timer timer;
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080041 private final int maxItems;
Brian O'Connorcff03322015-02-03 15:28:59 -080042 private final int maxBatchMillis;
43 private final int maxIdleMillis;
44
Brian O'Connorc590ebb2016-12-08 18:16:41 -080045 private final AtomicReference<TimerTask> idleTask = new AtomicReference<>();
46 private final AtomicReference<TimerTask> maxTask = new AtomicReference<>();
Brian O'Connorcff03322015-02-03 15:28:59 -080047
Brian O'Connorc590ebb2016-12-08 18:16:41 -080048 private final List<T> items;
Brian O'Connorcff03322015-02-03 15:28:59 -080049
50 /**
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080051 * Creates an item accumulator capable of triggering on the specified
Brian O'Connorcff03322015-02-03 15:28:59 -080052 * thresholds.
53 *
Thomas Vachuskaa82341c2015-08-25 17:36:59 -070054 * @param timer timer to use for scheduling check-points
55 * @param maxItems maximum number of items to accumulate before
56 * processing is triggered
Brian O'Connorc590ebb2016-12-08 18:16:41 -080057 * <p>
58 * NB: It is possible that processItems will contain
59 * more than maxItems under high load or if isReady()
60 * can return false.
61 * </p>
Thomas Vachuskaa82341c2015-08-25 17:36:59 -070062 * @param maxBatchMillis maximum number of millis allowed since the first
63 * item before processing is triggered
64 * @param maxIdleMillis maximum number millis between items before
65 * processing is triggered
Brian O'Connorcff03322015-02-03 15:28:59 -080066 */
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080067 protected AbstractAccumulator(Timer timer, int maxItems,
Brian O'Connorcff03322015-02-03 15:28:59 -080068 int maxBatchMillis, int maxIdleMillis) {
69 this.timer = checkNotNull(timer, "Timer cannot be null");
70
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080071 checkArgument(maxItems > 1, "Maximum number of items must be > 1");
Brian O'Connorcff03322015-02-03 15:28:59 -080072 checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
73 checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
74
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080075 this.maxItems = maxItems;
Brian O'Connorcff03322015-02-03 15:28:59 -080076 this.maxBatchMillis = maxBatchMillis;
77 this.maxIdleMillis = maxIdleMillis;
Brian O'Connorc590ebb2016-12-08 18:16:41 -080078
79 items = Lists.newArrayListWithExpectedSize(maxItems);
Brian O'Connorcff03322015-02-03 15:28:59 -080080 }
81
82 @Override
Brian O'Connorc590ebb2016-12-08 18:16:41 -080083 public void add(T item) {
84 final int sizeAtTimeOfAdd;
85 synchronized (items) {
86 items.add(item);
87 sizeAtTimeOfAdd = items.size();
88 }
89
90 /*
91 WARNING: It is possible that the item that was just added to the list
92 has been processed by an existing idle task at this point.
93
94 By rescheduling the following timers, it is possible that a
95 superfluous maxTask is generated now OR that the idle task and max
96 task are scheduled at their specified delays. This could result in
97 calls to processItems sooner than expected.
98 */
Brian O'Connorcff03322015-02-03 15:28:59 -080099
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800100 // Did we hit the max item threshold?
Brian O'Connorc590ebb2016-12-08 18:16:41 -0800101 if (sizeAtTimeOfAdd >= maxItems) {
102 if (maxIdleMillis < maxBatchMillis) {
103 cancelTask(idleTask);
104 }
105 rescheduleTask(maxTask, 0 /* now! */);
Brian O'Connorcff03322015-02-03 15:28:59 -0800106 } else {
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800107 // Otherwise, schedule idle task and if this is a first item
Brian O'Connorcff03322015-02-03 15:28:59 -0800108 // also schedule the max batch age task.
Brian O'Connorc590ebb2016-12-08 18:16:41 -0800109 if (maxIdleMillis < maxBatchMillis) {
110 rescheduleTask(idleTask, maxIdleMillis);
111 }
112 if (sizeAtTimeOfAdd == 1) {
113 rescheduleTask(maxTask, maxBatchMillis);
Brian O'Connorcff03322015-02-03 15:28:59 -0800114 }
115 }
116 }
117
Thomas Vachuskaa82341c2015-08-25 17:36:59 -0700118 /**
Brian O'Connorc590ebb2016-12-08 18:16:41 -0800119 * Reschedules the specified task, cancelling existing one if applicable.
120 *
121 * @param taskRef task reference
122 * @param millis delay in milliseconds
Thomas Vachuskaa82341c2015-08-25 17:36:59 -0700123 */
Brian O'Connorc590ebb2016-12-08 18:16:41 -0800124 private void rescheduleTask(AtomicReference<TimerTask> taskRef, long millis) {
125 ProcessorTask newTask = new ProcessorTask();
126 timer.schedule(newTask, millis);
127 swapAndCancelTask(taskRef, newTask);
Thomas Vachuskaa82341c2015-08-25 17:36:59 -0700128 }
129
130 /**
Brian O'Connorc590ebb2016-12-08 18:16:41 -0800131 * Cancels the specified task if it has not run or is not running.
132 *
133 * @param taskRef task reference
Thomas Vachuskaa82341c2015-08-25 17:36:59 -0700134 */
Brian O'Connorc590ebb2016-12-08 18:16:41 -0800135 private void cancelTask(AtomicReference<TimerTask> taskRef) {
136 swapAndCancelTask(taskRef, null);
Brian O'Connorcff03322015-02-03 15:28:59 -0800137 }
138
Thomas Vachuskaa82341c2015-08-25 17:36:59 -0700139 /**
Brian O'Connorc590ebb2016-12-08 18:16:41 -0800140 * Sets the new task and attempts to cancelTask the old one.
141 *
142 * @param taskRef task reference
143 * @param newTask new task
Thomas Vachuskaa82341c2015-08-25 17:36:59 -0700144 */
Brian O'Connorc590ebb2016-12-08 18:16:41 -0800145 private void swapAndCancelTask(AtomicReference<TimerTask> taskRef,
146 TimerTask newTask) {
147 TimerTask oldTask = taskRef.getAndSet(newTask);
148 if (oldTask != null) {
149 oldTask.cancel();
Brian O'Connorcff03322015-02-03 15:28:59 -0800150 }
Brian O'Connorcff03322015-02-03 15:28:59 -0800151 }
152
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800153 // Task for triggering processing of accumulated items
Brian O'Connorcff03322015-02-03 15:28:59 -0800154 private class ProcessorTask extends TimerTask {
155 @Override
156 public void run() {
Brian O'Connorc590ebb2016-12-08 18:16:41 -0800157 try {
158 if (isReady()) {
159
160 List<T> batch = finalizeCurrentBatch();
Thomas Vachuskaa82341c2015-08-25 17:36:59 -0700161 if (!batch.isEmpty()) {
162 processItems(batch);
Simon Hunt8d22c4b2015-08-06 16:24:43 -0700163 }
Brian O'Connorc590ebb2016-12-08 18:16:41 -0800164 } else {
165 rescheduleTask(idleTask, maxIdleMillis);
Thomas Vachuska75af68a2015-02-22 12:13:52 -0800166 }
Brian O'Connorc590ebb2016-12-08 18:16:41 -0800167 } catch (Exception e) {
168 log.warn("Unable to process batch due to", e);
Brian O'Connorcff03322015-02-03 15:28:59 -0800169 }
170 }
171 }
172
Brian O'Connorc590ebb2016-12-08 18:16:41 -0800173 /**
174 * Returns an immutable copy of the existing items and clear the list.
175 *
176 * @return list of existing items
177 */
178 private List<T> finalizeCurrentBatch() {
179 List<T> finalizedList;
180 synchronized (items) {
181 finalizedList = ImmutableList.copyOf(items);
182 items.clear();
183 /*
184 * To avoid reprocessing being triggered on an empty list.
185 */
186 cancelTask(maxTask);
187 cancelTask(idleTask);
188 }
189 return finalizedList;
Brian O'Connorcff03322015-02-03 15:28:59 -0800190 }
191
Thomas Vachuska75af68a2015-02-22 12:13:52 -0800192 @Override
193 public boolean isReady() {
194 return true;
195 }
196
Brian O'Connorcff03322015-02-03 15:28:59 -0800197 /**
198 * Returns the backing timer.
199 *
200 * @return backing timer
201 */
202 public Timer timer() {
203 return timer;
204 }
205
206 /**
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800207 * Returns the maximum number of items allowed to accumulate before
Brian O'Connorcff03322015-02-03 15:28:59 -0800208 * processing is triggered.
209 *
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800210 * @return max number of items
Brian O'Connorcff03322015-02-03 15:28:59 -0800211 */
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800212 public int maxItems() {
213 return maxItems;
Brian O'Connorcff03322015-02-03 15:28:59 -0800214 }
215
216 /**
217 * Returns the maximum number of millis allowed to expire since the first
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800218 * item before processing is triggered.
Brian O'Connorcff03322015-02-03 15:28:59 -0800219 *
220 * @return max number of millis a batch is allowed to last
221 */
222 public int maxBatchMillis() {
223 return maxBatchMillis;
224 }
225
226 /**
227 * Returns the maximum number of millis allowed to expire since the last
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800228 * item arrival before processing is triggered.
Brian O'Connorcff03322015-02-03 15:28:59 -0800229 *
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800230 * @return max number of millis since the last item
Brian O'Connorcff03322015-02-03 15:28:59 -0800231 */
232 public int maxIdleMillis() {
233 return maxIdleMillis;
234 }
Thomas Vachuska75af68a2015-02-22 12:13:52 -0800235
Brian O'Connorcff03322015-02-03 15:28:59 -0800236}