Updated accumulator documentation and refactored names to remove the event heritage.
Change-Id: I2238ab1215281702e670a406fb901ba8a4ef85ce
diff --git a/core/api/src/main/java/org/onosproject/event/AbstractEventAccumulator.java b/core/api/src/main/java/org/onosproject/event/AbstractEventAccumulator.java
deleted file mode 100644
index f3fc65f..0000000
--- a/core/api/src/main/java/org/onosproject/event/AbstractEventAccumulator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.event;
-
-import org.onlab.util.AbstractAccumulator;
-
-import java.util.Timer;
-
-/**
- * Base implementation of an event accumulator. It allows triggering based on
- * event inter-arrival time threshold, maximum batch life threshold and maximum
- * batch size.
- */
-public abstract class AbstractEventAccumulator
- extends AbstractAccumulator<Event>
- implements EventAccumulator {
-
- /**
- * Creates an event accumulator capable of triggering on the specified
- * thresholds.
- *
- * @param timer timer to use for scheduling check-points
- * @param maxEvents maximum number of events to accumulate before
- * processing is triggered
- * @param maxBatchMillis maximum number of millis allowed since the first
- * event before processing is triggered
- * @param maxIdleMillis maximum number millis between events before
- * processing is triggered
- */
- protected AbstractEventAccumulator(Timer timer, int maxEvents,
- int maxBatchMillis, int maxIdleMillis) {
- super(timer, maxEvents, maxBatchMillis, maxIdleMillis);
- }
-}
diff --git a/core/api/src/main/java/org/onosproject/event/EventAccumulator.java b/core/api/src/main/java/org/onosproject/event/EventAccumulator.java
deleted file mode 100644
index 78acfa0..0000000
--- a/core/api/src/main/java/org/onosproject/event/EventAccumulator.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.event;
-
-import org.onlab.util.Accumulator;
-
-/**
- * Abstraction of an accumulator capable of collecting events and at some
- * point in time triggers processing of all previously accumulated events.
- */
-public interface EventAccumulator extends Accumulator<Event> {
-}
diff --git a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentAccumulator.java b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentAccumulator.java
index 7eef748..09f5511 100644
--- a/core/net/src/main/java/org/onosproject/net/intent/impl/IntentAccumulator.java
+++ b/core/net/src/main/java/org/onosproject/net/intent/impl/IntentAccumulator.java
@@ -53,8 +53,8 @@
}
@Override
- public void processEvents(List<IntentData> ops) {
- delegate.execute(reduce(ops));
+ public void processItems(List<IntentData> items) {
+ delegate.execute(reduce(items));
// FIXME kick off the work
//for (IntentData data : opMap.values()) {}
}
diff --git a/core/net/src/main/java/org/onosproject/net/topology/impl/DefaultTopologyProvider.java b/core/net/src/main/java/org/onosproject/net/topology/impl/DefaultTopologyProvider.java
index 6137fa4..83e1f91 100644
--- a/core/net/src/main/java/org/onosproject/net/topology/impl/DefaultTopologyProvider.java
+++ b/core/net/src/main/java/org/onosproject/net/topology/impl/DefaultTopologyProvider.java
@@ -16,7 +16,6 @@
package org.onosproject.net.topology.impl;
import com.google.common.collect.ImmutableList;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -25,9 +24,9 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onosproject.event.AbstractEventAccumulator;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
import org.onosproject.event.Event;
-import org.onosproject.event.EventAccumulator;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
@@ -51,9 +50,9 @@
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.util.Tools.namedThreads;
import static org.onosproject.core.CoreService.CORE_PROVIDER_ID;
import static org.onosproject.net.device.DeviceEvent.Type.*;
-import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -104,7 +103,7 @@
private DeviceListener deviceListener = new InternalDeviceListener();
private LinkListener linkListener = new InternalLinkListener();
- private EventAccumulator accumulator;
+ private Accumulator<Event> accumulator;
private ExecutorService executor;
/**
@@ -245,18 +244,15 @@
}
// Event accumulator for paced triggering of topology assembly.
- private class TopologyChangeAccumulator
- extends AbstractEventAccumulator implements EventAccumulator {
-
+ private class TopologyChangeAccumulator extends AbstractAccumulator<Event> {
TopologyChangeAccumulator() {
super(TIMER, maxEvents, maxBatchMs, maxIdleMs);
}
@Override
- public void processEvents(List<Event> events) {
- triggerTopologyBuild(events);
+ public void processItems(List<Event> items) {
+ triggerTopologyBuild(items);
}
-
}
// Task for building topology data in a separate thread.
diff --git a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
index f417562..00a5a2c 100644
--- a/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/AbstractAccumulator.java
@@ -27,64 +27,63 @@
import static com.google.common.base.Preconditions.checkNotNull;
/**
- * Base implementation of an event accumulator. It allows triggering based on
- * event inter-arrival time threshold, maximum batch life threshold and maximum
+ * Base implementation of an item accumulator. It allows triggering based on
+ * item inter-arrival time threshold, maximum batch life threshold and maximum
* batch size.
*/
-// FIXME refactor the names here
public abstract class AbstractAccumulator<T> implements Accumulator<T> {
private Logger log = LoggerFactory.getLogger(AbstractAccumulator.class);
private final Timer timer;
- private final int maxEvents;
+ private final int maxItems;
private final int maxBatchMillis;
private final int maxIdleMillis;
private TimerTask idleTask = new ProcessorTask();
private TimerTask maxTask = new ProcessorTask();
- private List<T> events = Lists.newArrayList();
+ private List<T> items = Lists.newArrayList();
/**
- * Creates an event accumulator capable of triggering on the specified
+ * Creates an item accumulator capable of triggering on the specified
* thresholds.
*
- * @param timer timer to use for scheduling check-points
- * @param maxEvents maximum number of events to accumulate before
- * processing is triggered
- * @param maxBatchMillis maximum number of millis allowed since the first
- * event before processing is triggered
- * @param maxIdleMillis maximum number millis between events before
- * processing is triggered
+ * @param timer timer to use for scheduling check-points
+ * @param maxItems maximum number of items to accumulate before
+ * processing is triggered
+ * @param maxBatchMillis maximum number of millis allowed since the first
+ * item before processing is triggered
+ * @param maxIdleMillis maximum number millis between items before
+ * processing is triggered
*/
- protected AbstractAccumulator(Timer timer, int maxEvents,
+ protected AbstractAccumulator(Timer timer, int maxItems,
int maxBatchMillis, int maxIdleMillis) {
this.timer = checkNotNull(timer, "Timer cannot be null");
- checkArgument(maxEvents > 1, "Maximum number of events must be > 1");
+ checkArgument(maxItems > 1, "Maximum number of items must be > 1");
checkArgument(maxBatchMillis > 0, "Maximum millis must be positive");
checkArgument(maxIdleMillis > 0, "Maximum idle millis must be positive");
- this.maxEvents = maxEvents;
+ this.maxItems = maxItems;
this.maxBatchMillis = maxBatchMillis;
this.maxIdleMillis = maxIdleMillis;
}
@Override
- public synchronized void add(T event) {
+ public synchronized void add(T item) {
idleTask = cancelIfActive(idleTask);
- events.add(checkNotNull(event, "Event cannot be null"));
+ items.add(checkNotNull(item, "Item cannot be null"));
- // Did we hit the max event threshold?
- if (events.size() == maxEvents) {
+ // Did we hit the max item threshold?
+ if (items.size() == maxItems) {
maxTask = cancelIfActive(maxTask);
schedule(1);
} else {
- // Otherwise, schedule idle task and if this is a first event
+ // Otherwise, schedule idle task and if this is a first item
// also schedule the max batch age task.
idleTask = schedule(maxIdleMillis);
- if (events.size() == 1) {
+ if (items.size() == 1) {
maxTask = schedule(maxBatchMillis);
}
}
@@ -105,24 +104,24 @@
return task;
}
- // Task for triggering processing of accumulated events
+ // Task for triggering processing of accumulated items
private class ProcessorTask extends TimerTask {
@Override
public void run() {
try {
idleTask = cancelIfActive(idleTask);
maxTask = cancelIfActive(maxTask);
- processEvents(finalizeCurrentBatch());
+ processItems(finalizeCurrentBatch());
} catch (Exception e) {
log.warn("Unable to process batch due to {}", e);
}
}
}
- // Demotes and returns the current batch of events and promotes a new one.
+ // Demotes and returns the current batch of items and promotes a new one.
private synchronized List<T> finalizeCurrentBatch() {
- List<T> toBeProcessed = events;
- events = Lists.newArrayList();
+ List<T> toBeProcessed = items;
+ items = Lists.newArrayList();
return toBeProcessed;
}
@@ -136,18 +135,18 @@
}
/**
- * Returns the maximum number of events allowed to accumulate before
+ * Returns the maximum number of items allowed to accumulate before
* processing is triggered.
*
- * @return max number of events
+ * @return max number of items
*/
- public int maxEvents() {
- return maxEvents;
+ public int maxItems() {
+ return maxItems;
}
/**
* Returns the maximum number of millis allowed to expire since the first
- * event before processing is triggered.
+ * item before processing is triggered.
*
* @return max number of millis a batch is allowed to last
*/
@@ -157,9 +156,9 @@
/**
* Returns the maximum number of millis allowed to expire since the last
- * event arrival before processing is triggered.
+ * item arrival before processing is triggered.
*
- * @return max number of millis since the last event
+ * @return max number of millis since the last item
*/
public int maxIdleMillis() {
return maxIdleMillis;
diff --git a/utils/misc/src/main/java/org/onlab/util/Accumulator.java b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
index 897bddf..568e38c 100644
--- a/utils/misc/src/main/java/org/onlab/util/Accumulator.java
+++ b/utils/misc/src/main/java/org/onlab/util/Accumulator.java
@@ -18,25 +18,28 @@
import java.util.List;
/**
- * Abstraction of an accumulator capable of collecting events and at some
- * point in time triggers processing of all previously accumulated events.
+ * Abstraction of an accumulator capable of collecting items and at some
+ * point in time triggers processing of all previously accumulated items.
+ *
+ * @param <T> item type
*/
public interface Accumulator<T> {
/**
- * Adds an event to the current batch. This operation may, or may not
- * trigger processing of the current batch of events.
+ * Adds an item to the current batch. This operation may, or may not
+ * trigger processing of the current batch of items.
*
- * @param event event to be added to the current batch
+ * @param item item to be added to the current batch
*/
- void add(T event);
+ void add(T item);
/**
- * Processes the specified list of accumulated events.
+ * Processes the specified list of accumulated items.
*
- * @param events list of accumulated events
+ * @param items list of accumulated items
*/
- void processEvents(List<T> events);
+ void processItems(List<T> items);
//TODO consider a blocking version that required consumer participation
+
}
diff --git a/core/api/src/test/java/org/onosproject/event/AbstractEventAccumulatorTest.java b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
similarity index 69%
rename from core/api/src/test/java/org/onosproject/event/AbstractEventAccumulatorTest.java
rename to utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
index 67163dc5..b2b0b1f 100644
--- a/core/api/src/test/java/org/onosproject/event/AbstractEventAccumulatorTest.java
+++ b/utils/misc/src/test/java/org/onlab/util/AbstractAccumulatorTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2014 Open Networking Laboratory
+ * Copyright 2015 Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.event;
+package org.onlab.util;
import org.junit.Ignore;
import org.junit.Test;
@@ -21,16 +21,13 @@
import java.util.List;
import java.util.Timer;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import static org.onlab.junit.TestTools.delay;
-import static org.onosproject.event.TestEvent.Type.FOO;
/**
* Tests the operation of the accumulator.
*/
-public class AbstractEventAccumulatorTest {
+public class AbstractAccumulatorTest {
private final Timer timer = new Timer();
@@ -38,7 +35,7 @@
public void basics() throws Exception {
TestAccumulator accumulator = new TestAccumulator();
assertEquals("incorrect timer", timer, accumulator.timer());
- assertEquals("incorrect max events", 5, accumulator.maxEvents());
+ assertEquals("incorrect max events", 5, accumulator.maxItems());
assertEquals("incorrect max ms", 100, accumulator.maxBatchMillis());
assertEquals("incorrect idle ms", 50, accumulator.maxIdleMillis());
}
@@ -46,12 +43,12 @@
@Test
public void eventTrigger() {
TestAccumulator accumulator = new TestAccumulator();
- accumulator.add(new TestEvent(FOO, "a"));
- accumulator.add(new TestEvent(FOO, "b"));
- accumulator.add(new TestEvent(FOO, "c"));
- accumulator.add(new TestEvent(FOO, "d"));
+ accumulator.add(new TestItem("a"));
+ accumulator.add(new TestItem("b"));
+ accumulator.add(new TestItem("c"));
+ accumulator.add(new TestItem("d"));
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
- accumulator.add(new TestEvent(FOO, "e"));
+ accumulator.add(new TestItem("e"));
delay(20);
assertFalse("should have fired", accumulator.batch.isEmpty());
assertEquals("incorrect batch", "abcde", accumulator.batch);
@@ -61,16 +58,16 @@
@Test
public void timeTrigger() {
TestAccumulator accumulator = new TestAccumulator();
- accumulator.add(new TestEvent(FOO, "a"));
+ accumulator.add(new TestItem("a"));
delay(30);
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
- accumulator.add(new TestEvent(FOO, "b"));
+ accumulator.add(new TestItem("b"));
delay(30);
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
- accumulator.add(new TestEvent(FOO, "c"));
+ accumulator.add(new TestItem("c"));
delay(30);
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
- accumulator.add(new TestEvent(FOO, "d"));
+ accumulator.add(new TestItem("d"));
delay(30);
assertFalse("should have fired", accumulator.batch.isEmpty());
assertEquals("incorrect batch", "abcd", accumulator.batch);
@@ -79,15 +76,23 @@
@Test
public void idleTrigger() {
TestAccumulator accumulator = new TestAccumulator();
- accumulator.add(new TestEvent(FOO, "a"));
+ accumulator.add(new TestItem("a"));
assertTrue("should not have fired yet", accumulator.batch.isEmpty());
- accumulator.add(new TestEvent(FOO, "b"));
+ accumulator.add(new TestItem("b"));
delay(80);
assertFalse("should have fired", accumulator.batch.isEmpty());
assertEquals("incorrect batch", "ab", accumulator.batch);
}
- private class TestAccumulator extends AbstractEventAccumulator {
+ private class TestItem {
+ private final String s;
+
+ public TestItem(String s) {
+ this.s = s;
+ }
+ }
+
+ private class TestAccumulator extends AbstractAccumulator<TestItem> {
String batch = "";
@@ -96,10 +101,11 @@
}
@Override
- public void processEvents(List<Event> events) {
- for (Event event : events) {
- batch += event.subject();
+ public void processItems(List<TestItem> items) {
+ for (TestItem item : items) {
+ batch += item.s;
}
}
}
+
}
diff --git a/web/gui/src/main/java/org/onosproject/gui/TopologyViewWebSocket.java b/web/gui/src/main/java/org/onosproject/gui/TopologyViewWebSocket.java
index c3ffc1d..44ad026 100644
--- a/web/gui/src/main/java/org/onosproject/gui/TopologyViewWebSocket.java
+++ b/web/gui/src/main/java/org/onosproject/gui/TopologyViewWebSocket.java
@@ -20,14 +20,14 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.eclipse.jetty.websocket.WebSocket;
import org.onlab.osgi.ServiceDirectory;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.event.AbstractEventAccumulator;
import org.onosproject.event.Event;
-import org.onosproject.event.EventAccumulator;
import org.onosproject.mastership.MastershipAdminService;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipListener;
@@ -120,7 +120,7 @@
private final IntentListener intentListener = new InternalIntentListener();
private final FlowRuleListener flowListener = new InternalFlowListener();
- private final EventAccumulator eventAccummulator = new InternalEventAccummulator();
+ private final Accumulator<Event> eventAccummulator = new InternalEventAccummulator();
private TimerTask trafficTask;
private ObjectNode trafficEvent;
@@ -721,13 +721,13 @@
}
// Accumulates events to drive methodic update of the summary pane.
- private class InternalEventAccummulator extends AbstractEventAccumulator {
+ private class InternalEventAccummulator extends AbstractAccumulator<Event> {
protected InternalEventAccummulator() {
super(new Timer("topo-summary"), MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
}
@Override
- public void processEvents(List<Event> events) {
+ public void processItems(List<Event> items) {
try {
if (summaryEvent != null) {
sendMessage(summmaryMessage(0));