blob: 00a5a2cbd9f4318c6a295fcc1c51c2cc88a07128 [file] [log] [blame]
Brian O'Connorcff03322015-02-03 15:28:59 -08001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.util;
17
18import com.google.common.collect.Lists;
19import org.slf4j.Logger;
20import org.slf4j.LoggerFactory;
21
22import java.util.List;
23import java.util.Timer;
24import java.util.TimerTask;
25
26import static com.google.common.base.Preconditions.checkArgument;
27import static com.google.common.base.Preconditions.checkNotNull;
28
29/**
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080030 * Base implementation of an item accumulator. It allows triggering based on
31 * item inter-arrival time threshold, maximum batch life threshold and maximum
Brian O'Connorcff03322015-02-03 15:28:59 -080032 * batch size.
33 */
Brian O'Connorcff03322015-02-03 15:28:59 -080034public abstract class AbstractAccumulator<T> implements Accumulator<T> {
35
36 private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
37
38 private final Timer timer;
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080039 private final int maxItems;
Brian O'Connorcff03322015-02-03 15:28:59 -080040 private final int maxBatchMillis;
41 private final int maxIdleMillis;
42
43 private TimerTask idleTask = new ProcessorTask();
44 private TimerTask maxTask = new ProcessorTask();
45
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080046 private List<T> items = Lists.newArrayList();
Brian O'Connorcff03322015-02-03 15:28:59 -080047
48 /**
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080049 * Creates an item accumulator capable of triggering on the specified
Brian O'Connorcff03322015-02-03 15:28:59 -080050 * thresholds.
51 *
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080052 * @param timer timer to use for scheduling check-points
53 * @param maxItems maximum number of items to accumulate before
54 * processing is triggered
55 * @param maxBatchMillis maximum number of millis allowed since the first
56 * item before processing is triggered
57 * @param maxIdleMillis maximum number millis between items before
58 * processing is triggered
Brian O'Connorcff03322015-02-03 15:28:59 -080059 */
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080060 protected AbstractAccumulator(Timer timer, int maxItems,
Brian O'Connorcff03322015-02-03 15:28:59 -080061 int maxBatchMillis, int maxIdleMillis) {
62 this.timer = checkNotNull(timer, "Timer cannot be null");
63
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080064 checkArgument(maxItems > 1, "Maximum number of items must be > 1");
Brian O'Connorcff03322015-02-03 15:28:59 -080065 checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
66 checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
67
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080068 this.maxItems = maxItems;
Brian O'Connorcff03322015-02-03 15:28:59 -080069 this.maxBatchMillis = maxBatchMillis;
70 this.maxIdleMillis = maxIdleMillis;
71 }
72
73 @Override
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080074 public synchronized void add(T item) {
Brian O'Connorcff03322015-02-03 15:28:59 -080075 idleTask = cancelIfActive(idleTask);
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080076 items.add(checkNotNull(item, "Item cannot be null"));
Brian O'Connorcff03322015-02-03 15:28:59 -080077
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080078 // Did we hit the max item threshold?
79 if (items.size() == maxItems) {
Brian O'Connorcff03322015-02-03 15:28:59 -080080 maxTask = cancelIfActive(maxTask);
81 schedule(1);
82 } else {
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080083 // Otherwise, schedule idle task and if this is a first item
Brian O'Connorcff03322015-02-03 15:28:59 -080084 // also schedule the max batch age task.
85 idleTask = schedule(maxIdleMillis);
Thomas Vachuskaecb63c52015-02-19 10:03:48 -080086 if (items.size() == 1) {
Brian O'Connorcff03322015-02-03 15:28:59 -080087 maxTask = schedule(maxBatchMillis);
88 }
89 }
90 }
91
92 // Schedules a new processor task given number of millis in the future.
93 private TimerTask schedule(int millis) {
94 TimerTask task = new ProcessorTask();
95 timer.schedule(task, millis);
96 return task;
97 }
98
99 // Cancels the specified task if it is active.
100 private TimerTask cancelIfActive(TimerTask task) {
101 if (task != null) {
102 task.cancel();
103 }
104 return task;
105 }
106
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800107 // Task for triggering processing of accumulated items
Brian O'Connorcff03322015-02-03 15:28:59 -0800108 private class ProcessorTask extends TimerTask {
109 @Override
110 public void run() {
111 try {
112 idleTask = cancelIfActive(idleTask);
113 maxTask = cancelIfActive(maxTask);
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800114 processItems(finalizeCurrentBatch());
Brian O'Connorcff03322015-02-03 15:28:59 -0800115 } catch (Exception e) {
Brian O'Connor6b6d0c12015-02-18 20:53:18 -0800116 log.warn("Unable to process batch due to {}", e);
Brian O'Connorcff03322015-02-03 15:28:59 -0800117 }
118 }
119 }
120
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800121 // Demotes and returns the current batch of items and promotes a new one.
Brian O'Connorcff03322015-02-03 15:28:59 -0800122 private synchronized List<T> finalizeCurrentBatch() {
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800123 List<T> toBeProcessed = items;
124 items = Lists.newArrayList();
Brian O'Connorcff03322015-02-03 15:28:59 -0800125 return toBeProcessed;
126 }
127
128 /**
129 * Returns the backing timer.
130 *
131 * @return backing timer
132 */
133 public Timer timer() {
134 return timer;
135 }
136
137 /**
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800138 * Returns the maximum number of items allowed to accumulate before
Brian O'Connorcff03322015-02-03 15:28:59 -0800139 * processing is triggered.
140 *
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800141 * @return max number of items
Brian O'Connorcff03322015-02-03 15:28:59 -0800142 */
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800143 public int maxItems() {
144 return maxItems;
Brian O'Connorcff03322015-02-03 15:28:59 -0800145 }
146
147 /**
148 * Returns the maximum number of millis allowed to expire since the first
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800149 * item before processing is triggered.
Brian O'Connorcff03322015-02-03 15:28:59 -0800150 *
151 * @return max number of millis a batch is allowed to last
152 */
153 public int maxBatchMillis() {
154 return maxBatchMillis;
155 }
156
157 /**
158 * Returns the maximum number of millis allowed to expire since the last
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800159 * item arrival before processing is triggered.
Brian O'Connorcff03322015-02-03 15:28:59 -0800160 *
Thomas Vachuskaecb63c52015-02-19 10:03:48 -0800161 * @return max number of millis since the last item
Brian O'Connorcff03322015-02-03 15:28:59 -0800162 */
163 public int maxIdleMillis() {
164 return maxIdleMillis;
165 }
166}