| /* |
| * Copyright 2015 Open Networking Laboratory |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.onlab.util; |
| |
| import com.google.common.collect.Lists; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.List; |
| import java.util.Timer; |
| import java.util.TimerTask; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| |
| /** |
| * Base implementation of an item accumulator. It allows triggering based on |
| * item inter-arrival time threshold, maximum batch life threshold and maximum |
| * batch size. |
| */ |
| public abstract class AbstractAccumulator<T> implements Accumulator<T> { |
| |
| private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class); |
| |
| private final Timer timer; |
| private final int maxItems; |
| private final int maxBatchMillis; |
| private final int maxIdleMillis; |
| |
| private volatile TimerTask idleTask = new ProcessorTask(); |
| private volatile TimerTask maxTask = new ProcessorTask(); |
| |
| private List<T> items = Lists.newArrayList(); |
| |
| /** |
| * Creates an item accumulator capable of triggering on the specified |
| * thresholds. |
| * |
| * @param timer timer to use for scheduling check-points |
| * @param maxItems maximum number of items to accumulate before |
| * processing is triggered |
| * @param maxBatchMillis maximum number of millis allowed since the first |
| * item before processing is triggered |
| * @param maxIdleMillis maximum number millis between items before |
| * processing is triggered |
| */ |
| protected AbstractAccumulator(Timer timer, int maxItems, |
| int maxBatchMillis, int maxIdleMillis) { |
| this.timer = checkNotNull(timer, "Timer cannot be null"); |
| |
| checkArgument(maxItems > 1, "Maximum number of items must be > 1"); |
| checkArgument(maxBatchMillis > 0, "Maximum millis must be positive"); |
| checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive"); |
| |
| this.maxItems = maxItems; |
| this.maxBatchMillis = maxBatchMillis; |
| this.maxIdleMillis = maxIdleMillis; |
| } |
| |
| @Override |
| public synchronized void add(T item) { |
| idleTask = cancelIfActive(idleTask); |
| items.add(checkNotNull(item, "Item cannot be null")); |
| |
| // Did we hit the max item threshold? |
| if (items.size() >= maxItems) { |
| maxTask = cancelIfActive(maxTask); |
| scheduleNow(); |
| } else { |
| // Otherwise, schedule idle task and if this is a first item |
| // also schedule the max batch age task. |
| idleTask = schedule(maxIdleMillis); |
| if (items.size() == 1) { |
| maxTask = schedule(maxBatchMillis); |
| } |
| } |
| } |
| |
| /** |
| * Finalizes the current batch, if ready, and schedules a new processor |
| * in the immediate future. |
| */ |
| private void scheduleNow() { |
| if (isReady()) { |
| TimerTask task = new ProcessorTask(finalizeCurrentBatch()); |
| timer.schedule(task, 1); |
| } |
| } |
| |
| /** |
| * Schedules a new processor task given number of millis in the future. |
| * Batch finalization is deferred to time of execution. |
| */ |
| private TimerTask schedule(int millis) { |
| TimerTask task = new ProcessorTask(); |
| timer.schedule(task, millis); |
| return task; |
| } |
| |
| /** |
| * Cancels the specified task if it is active. |
| */ |
| private TimerTask cancelIfActive(TimerTask task) { |
| if (task != null) { |
| task.cancel(); |
| } |
| return task; |
| } |
| |
| // Task for triggering processing of accumulated items |
| private class ProcessorTask extends TimerTask { |
| |
| private final List<T> items; |
| |
| // Creates a new processor task with deferred batch finalization. |
| ProcessorTask() { |
| this.items = null; |
| } |
| |
| // Creates a new processor task with pre-emptive batch finalization. |
| ProcessorTask(List<T> items) { |
| this.items = items; |
| } |
| |
| @Override |
| public void run() { |
| synchronized (AbstractAccumulator.this) { |
| idleTask = cancelIfActive(idleTask); |
| } |
| if (isReady()) { |
| try { |
| synchronized (AbstractAccumulator.this) { |
| maxTask = cancelIfActive(maxTask); |
| } |
| List<T> batch = items != null ? items : finalizeCurrentBatch(); |
| if (!batch.isEmpty()) { |
| processItems(batch); |
| } |
| } catch (Exception e) { |
| log.warn("Unable to process batch due to", e); |
| } |
| } else { |
| synchronized (AbstractAccumulator.this) { |
| idleTask = schedule(maxIdleMillis); |
| } |
| } |
| } |
| } |
| |
| // Demotes and returns the current batch of items and promotes a new one. |
| private synchronized List<T> finalizeCurrentBatch() { |
| List<T> toBeProcessed = items; |
| items = Lists.newArrayList(); |
| return toBeProcessed; |
| } |
| |
| @Override |
| public boolean isReady() { |
| return true; |
| } |
| |
| /** |
| * Returns the backing timer. |
| * |
| * @return backing timer |
| */ |
| public Timer timer() { |
| return timer; |
| } |
| |
| /** |
| * Returns the maximum number of items allowed to accumulate before |
| * processing is triggered. |
| * |
| * @return max number of items |
| */ |
| public int maxItems() { |
| return maxItems; |
| } |
| |
| /** |
| * Returns the maximum number of millis allowed to expire since the first |
| * item before processing is triggered. |
| * |
| * @return max number of millis a batch is allowed to last |
| */ |
| public int maxBatchMillis() { |
| return maxBatchMillis; |
| } |
| |
| /** |
| * Returns the maximum number of millis allowed to expire since the last |
| * item arrival before processing is triggered. |
| * |
| * @return max number of millis since the last item |
| */ |
| public int maxIdleMillis() { |
| return maxIdleMillis; |
| } |
| |
| } |