blob: fb3e4f08afe162e93757986a529e8f255702706b [file] [log] [blame]
Brian O'Connorcff03322015-02-03 15:28:59 -08001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2015-present Open Networking Laboratory
Brian O'Connorcff03322015-02-03 15:28:59 -08003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.util;
17
18import com.google.common.collect.Lists;
19import org.slf4j.Logger;
20import org.slf4j.LoggerFactory;
21
22import java.util.List;
23import java.util.Timer;
24import java.util.TimerTask;
25
26import static com.google.common.base.Preconditions.checkArgument;
27import static com.google.common.base.Preconditions.checkNotNull;
28
29/**
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080030 * Base implementation of an item accumulator. It allows triggering based on
31 * item inter-arrival time threshold, maximum batch life threshold and maximum
Brian O'Connorcff03322015-02-03 15:28:59 -080032 * batch size.
33 */
Brian O'Connorcff03322015-02-03 15:28:59 -080034public abstract class AbstractAccumulator<T> implements Accumulator<T> {
35
36 private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
37
38 private final Timer timer;
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080039 private final int maxItems;
Brian O'Connorcff03322015-02-03 15:28:59 -080040 private final int maxBatchMillis;
41 private final int maxIdleMillis;
42
Ray Milkey71081a62015-08-17 17:44:01 -070043 private volatile TimerTask idleTask = new ProcessorTask();
44 private volatile TimerTask maxTask = new ProcessorTask();
Brian O'Connorcff03322015-02-03 15:28:59 -080045
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080046 private List<T> items = Lists.newArrayList();
Brian O'Connorcff03322015-02-03 15:28:59 -080047
48 /**
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080049 * Creates an item accumulator capable of triggering on the specified
Brian O'Connorcff03322015-02-03 15:28:59 -080050 * thresholds.
51 *
Thomas Vachuskaa82341c2015-08-25 17:36:59 -070052 * @param timer timer to use for scheduling check-points
53 * @param maxItems maximum number of items to accumulate before
54 * processing is triggered
55 * @param maxBatchMillis maximum number of millis allowed since the first
56 * item before processing is triggered
57 * @param maxIdleMillis maximum number millis between items before
58 * processing is triggered
Brian O'Connorcff03322015-02-03 15:28:59 -080059 */
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080060 protected AbstractAccumulator(Timer timer, int maxItems,
Brian O'Connorcff03322015-02-03 15:28:59 -080061 int maxBatchMillis, int maxIdleMillis) {
62 this.timer = checkNotNull(timer, "Timer cannot be null");
63
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080064 checkArgument(maxItems > 1, "Maximum number of items must be > 1");
Brian O'Connorcff03322015-02-03 15:28:59 -080065 checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
66 checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
67
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080068 this.maxItems = maxItems;
Brian O'Connorcff03322015-02-03 15:28:59 -080069 this.maxBatchMillis = maxBatchMillis;
70 this.maxIdleMillis = maxIdleMillis;
71 }
72
73 @Override
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080074 public synchronized void add(T item) {
Brian O'Connorcff03322015-02-03 15:28:59 -080075 idleTask = cancelIfActive(idleTask);
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080076 items.add(checkNotNull(item, "Item cannot be null"));
Brian O'Connorcff03322015-02-03 15:28:59 -080077
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080078 // Did we hit the max item threshold?
Thomas Vachuska75af68a2015-02-22 12:13:52 -080079 if (items.size() >= maxItems) {
Brian O'Connorcff03322015-02-03 15:28:59 -080080 maxTask = cancelIfActive(maxTask);
Thomas Vachuskaa82341c2015-08-25 17:36:59 -070081 scheduleNow();
Brian O'Connorcff03322015-02-03 15:28:59 -080082 } else {
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080083 // Otherwise, schedule idle task and if this is a first item
Brian O'Connorcff03322015-02-03 15:28:59 -080084 // also schedule the max batch age task.
85 idleTask = schedule(maxIdleMillis);
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080086 if (items.size() == 1) {
Brian O'Connorcff03322015-02-03 15:28:59 -080087 maxTask = schedule(maxBatchMillis);
88 }
89 }
90 }
91
Thomas Vachuskaa82341c2015-08-25 17:36:59 -070092 /**
93 * Finalizes the current batch, if ready, and schedules a new processor
94 * in the immediate future.
95 */
96 private void scheduleNow() {
97 if (isReady()) {
98 TimerTask task = new ProcessorTask(finalizeCurrentBatch());
99 timer.schedule(task, 1);
100 }
101 }
102
103 /**
104 * Schedules a new processor task given number of millis in the future.
105 * Batch finalization is deferred to time of execution.
106 */
Brian O'Connorcff03322015-02-03 15:28:59 -0800107 private TimerTask schedule(int millis) {
108 TimerTask task = new ProcessorTask();
109 timer.schedule(task, millis);
110 return task;
111 }
112
Thomas Vachuskaa82341c2015-08-25 17:36:59 -0700113 /**
114 * Cancels the specified task if it is active.
115 */
Brian O'Connorcff03322015-02-03 15:28:59 -0800116 private TimerTask cancelIfActive(TimerTask task) {
117 if (task != null) {
118 task.cancel();
119 }
120 return task;
121 }
122
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800123 // Task for triggering processing of accumulated items
Brian O'Connorcff03322015-02-03 15:28:59 -0800124 private class ProcessorTask extends TimerTask {
Thomas Vachuskaa82341c2015-08-25 17:36:59 -0700125
126 private final List<T> items;
127
128 // Creates a new processor task with deferred batch finalization.
129 ProcessorTask() {
130 this.items = null;
131 }
132
133 // Creates a new processor task with pre-emptive batch finalization.
134 ProcessorTask(List<T> items) {
135 this.items = items;
136 }
137
Brian O'Connorcff03322015-02-03 15:28:59 -0800138 @Override
139 public void run() {
Ray Milkey71081a62015-08-17 17:44:01 -0700140 synchronized (AbstractAccumulator.this) {
141 idleTask = cancelIfActive(idleTask);
142 }
Thomas Vachuska75af68a2015-02-22 12:13:52 -0800143 if (isReady()) {
144 try {
Ray Milkey71081a62015-08-17 17:44:01 -0700145 synchronized (AbstractAccumulator.this) {
146 maxTask = cancelIfActive(maxTask);
147 }
Thomas Vachuskaa82341c2015-08-25 17:36:59 -0700148 List<T> batch = items != null ? items : finalizeCurrentBatch();
149 if (!batch.isEmpty()) {
150 processItems(batch);
Simon Hunt8d22c4b2015-08-06 16:24:43 -0700151 }
Thomas Vachuska75af68a2015-02-22 12:13:52 -0800152 } catch (Exception e) {
Thomas Vachuska91caf1a2015-08-26 10:14:29 -0700153 log.warn("Unable to process batch due to", e);
Thomas Vachuska75af68a2015-02-22 12:13:52 -0800154 }
155 } else {
Ray Milkey71081a62015-08-17 17:44:01 -0700156 synchronized (AbstractAccumulator.this) {
157 idleTask = schedule(maxIdleMillis);
158 }
Brian O'Connorcff03322015-02-03 15:28:59 -0800159 }
160 }
161 }
162
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800163 // Demotes and returns the current batch of items and promotes a new one.
Brian O'Connorcff03322015-02-03 15:28:59 -0800164 private synchronized List<T> finalizeCurrentBatch() {
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800165 List<T> toBeProcessed = items;
166 items = Lists.newArrayList();
Brian O'Connorcff03322015-02-03 15:28:59 -0800167 return toBeProcessed;
168 }
169
Thomas Vachuska75af68a2015-02-22 12:13:52 -0800170 @Override
171 public boolean isReady() {
172 return true;
173 }
174
Brian O'Connorcff03322015-02-03 15:28:59 -0800175 /**
176 * Returns the backing timer.
177 *
178 * @return backing timer
179 */
180 public Timer timer() {
181 return timer;
182 }
183
184 /**
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800185 * Returns the maximum number of items allowed to accumulate before
Brian O'Connorcff03322015-02-03 15:28:59 -0800186 * processing is triggered.
187 *
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800188 * @return max number of items
Brian O'Connorcff03322015-02-03 15:28:59 -0800189 */
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800190 public int maxItems() {
191 return maxItems;
Brian O'Connorcff03322015-02-03 15:28:59 -0800192 }
193
194 /**
195 * Returns the maximum number of millis allowed to expire since the first
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800196 * item before processing is triggered.
Brian O'Connorcff03322015-02-03 15:28:59 -0800197 *
198 * @return max number of millis a batch is allowed to last
199 */
200 public int maxBatchMillis() {
201 return maxBatchMillis;
202 }
203
204 /**
205 * Returns the maximum number of millis allowed to expire since the last
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800206 * item arrival before processing is triggered.
Brian O'Connorcff03322015-02-03 15:28:59 -0800207 *
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800208 * @return max number of millis since the last item
Brian O'Connorcff03322015-02-03 15:28:59 -0800209 */
210 public int maxIdleMillis() {
211 return maxIdleMillis;
212 }
Thomas Vachuska75af68a2015-02-22 12:13:52 -0800213
Brian O'Connorcff03322015-02-03 15:28:59 -0800214}