blob: e9e3cc80b84f9d286914701e409aa5d6f775dd3c [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onlab.util;
17
18import com.google.common.collect.Lists;
19import org.slf4j.Logger;
20import org.slf4j.LoggerFactory;
21
22import java.util.List;
23import java.util.Timer;
24import java.util.TimerTask;
25
26import static com.google.common.base.Preconditions.checkArgument;
27import static com.google.common.base.Preconditions.checkNotNull;
28
29/**
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080030 * Base implementation of an item accumulator. It allows triggering based on
31 * item inter-arrival time threshold, maximum batch life threshold and maximum
Brian O'Connorcff03322015-02-03 15:28:59 -080032 * batch size.
33 */
Brian O'Connorcff03322015-02-03 15:28:59 -080034public abstract class AbstractAccumulator<T> implements Accumulator<T> {
35
36 private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
37
38 private final Timer timer;
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080039 private final int maxItems;
Brian O'Connorcff03322015-02-03 15:28:59 -080040 private final int maxBatchMillis;
41 private final int maxIdleMillis;
42
Ray Milkey71081a62015-08-17 17:44:01 -070043 private volatile TimerTask idleTask = new ProcessorTask();
44 private volatile TimerTask maxTask = new ProcessorTask();
Brian O'Connorcff03322015-02-03 15:28:59 -080045
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080046 private List<T> items = Lists.newArrayList();
Brian O'Connorcff03322015-02-03 15:28:59 -080047
48 /**
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080049 * Creates an item accumulator capable of triggering on the specified
Brian O'Connorcff03322015-02-03 15:28:59 -080050 * thresholds.
51 *
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080052 * @param timer timer to use for scheduling check-points
53 * @param maxItems maximum number of items to accumulate before
54 * processing is triggered
55 * @param maxBatchMillis maximum number of millis allowed since the first
56 * item before processing is triggered
57 * @param maxIdleMillis maximum number millis between items before
58 * processing is triggered
Brian O'Connorcff03322015-02-03 15:28:59 -080059 */
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080060 protected AbstractAccumulator(Timer timer, int maxItems,
Brian O'Connorcff03322015-02-03 15:28:59 -080061 int maxBatchMillis, int maxIdleMillis) {
62 this.timer = checkNotNull(timer, "Timer cannot be null");
63
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080064 checkArgument(maxItems > 1, "Maximum number of items must be > 1");
Brian O'Connorcff03322015-02-03 15:28:59 -080065 checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
66 checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
67
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080068 this.maxItems = maxItems;
Brian O'Connorcff03322015-02-03 15:28:59 -080069 this.maxBatchMillis = maxBatchMillis;
70 this.maxIdleMillis = maxIdleMillis;
71 }
72
73 @Override
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080074 public synchronized void add(T item) {
Brian O'Connorcff03322015-02-03 15:28:59 -080075 idleTask = cancelIfActive(idleTask);
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080076 items.add(checkNotNull(item, "Item cannot be null"));
Brian O'Connorcff03322015-02-03 15:28:59 -080077
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080078 // Did we hit the max item threshold?
Thomas Vachuska75af68a2015-02-22 12:13:52 -080079 if (items.size() >= maxItems) {
Brian O'Connorcff03322015-02-03 15:28:59 -080080 maxTask = cancelIfActive(maxTask);
81 schedule(1);
82 } else {
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080083 // Otherwise, schedule idle task and if this is a first item
Brian O'Connorcff03322015-02-03 15:28:59 -080084 // also schedule the max batch age task.
85 idleTask = schedule(maxIdleMillis);
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080086 if (items.size() == 1) {
Brian O'Connorcff03322015-02-03 15:28:59 -080087 maxTask = schedule(maxBatchMillis);
88 }
89 }
90 }
91
92 // Schedules a new processor task given number of millis in the future.
93 private TimerTask schedule(int millis) {
94 TimerTask task = new ProcessorTask();
95 timer.schedule(task, millis);
96 return task;
97 }
98
99 // Cancels the specified task if it is active.
100 private TimerTask cancelIfActive(TimerTask task) {
101 if (task != null) {
102 task.cancel();
103 }
104 return task;
105 }
106
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800107 // Task for triggering processing of accumulated items
Brian O'Connorcff03322015-02-03 15:28:59 -0800108 private class ProcessorTask extends TimerTask {
109 @Override
110 public void run() {
Ray Milkey71081a62015-08-17 17:44:01 -0700111 synchronized (AbstractAccumulator.this) {
112 idleTask = cancelIfActive(idleTask);
113 }
Thomas Vachuska75af68a2015-02-22 12:13:52 -0800114 if (isReady()) {
115 try {
Ray Milkey71081a62015-08-17 17:44:01 -0700116 synchronized (AbstractAccumulator.this) {
117 maxTask = cancelIfActive(maxTask);
118 }
Simon Hunt8d22c4b2015-08-06 16:24:43 -0700119 List<T> items = finalizeCurrentBatch();
120 if (!items.isEmpty()) {
121 processItems(items);
122 }
Thomas Vachuska75af68a2015-02-22 12:13:52 -0800123 } catch (Exception e) {
124 log.warn("Unable to process batch due to {}", e);
125 }
126 } else {
Ray Milkey71081a62015-08-17 17:44:01 -0700127 synchronized (AbstractAccumulator.this) {
128 idleTask = schedule(maxIdleMillis);
129 }
Brian O'Connorcff03322015-02-03 15:28:59 -0800130 }
131 }
132 }
133
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800134 // Demotes and returns the current batch of items and promotes a new one.
Brian O'Connorcff03322015-02-03 15:28:59 -0800135 private synchronized List<T> finalizeCurrentBatch() {
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800136 List<T> toBeProcessed = items;
137 items = Lists.newArrayList();
Brian O'Connorcff03322015-02-03 15:28:59 -0800138 return toBeProcessed;
139 }
140
Thomas Vachuska75af68a2015-02-22 12:13:52 -0800141 @Override
142 public boolean isReady() {
143 return true;
144 }
145
Brian O'Connorcff03322015-02-03 15:28:59 -0800146 /**
147 * Returns the backing timer.
148 *
149 * @return backing timer
150 */
151 public Timer timer() {
152 return timer;
153 }
154
155 /**
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800156 * Returns the maximum number of items allowed to accumulate before
Brian O'Connorcff03322015-02-03 15:28:59 -0800157 * processing is triggered.
158 *
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800159 * @return max number of items
Brian O'Connorcff03322015-02-03 15:28:59 -0800160 */
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800161 public int maxItems() {
162 return maxItems;
Brian O'Connorcff03322015-02-03 15:28:59 -0800163 }
164
165 /**
166 * Returns the maximum number of millis allowed to expire since the first
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800167 * item before processing is triggered.
Brian O'Connorcff03322015-02-03 15:28:59 -0800168 *
169 * @return max number of millis a batch is allowed to last
170 */
171 public int maxBatchMillis() {
172 return maxBatchMillis;
173 }
174
175 /**
176 * Returns the maximum number of millis allowed to expire since the last
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800177 * item arrival before processing is triggered.
Brian O'Connorcff03322015-02-03 15:28:59 -0800178 *
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800179 * @return max number of millis since the last item
Brian O'Connorcff03322015-02-03 15:28:59 -0800180 */
181 public int maxIdleMillis() {
182 return maxIdleMillis;
183 }
Thomas Vachuska75af68a2015-02-22 12:13:52 -0800184
Brian O'Connorcff03322015-02-03 15:28:59 -0800185}