Adding configurability to the even accumulator for the topology provider.
Change-Id: I35ede9a62782dc6a2e55b8895aeec6ece8836960
diff --git a/core/net/pom.xml b/core/net/pom.xml
index 7e7f4ad..3a2d052 100644
--- a/core/net/pom.xml
+++ b/core/net/pom.xml
@@ -58,6 +58,11 @@
</dependency>
<dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.compendium</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
diff --git a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
index 214fdff..ac9fd00 100644
--- a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
+++ b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
@@ -15,9 +15,12 @@
*/
package org.onlab.onos.net.topology.impl;
+import com.google.common.collect.ImmutableList;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
@@ -36,13 +39,16 @@
import org.onlab.onos.net.topology.TopologyProvider;
import org.onlab.onos.net.topology.TopologyProviderRegistry;
import org.onlab.onos.net.topology.TopologyProviderService;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import java.util.Collections;
+import java.util.Dictionary;
import java.util.List;
import java.util.Timer;
import java.util.concurrent.ExecutorService;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.onlab.onos.core.CoreService.CORE_PROVIDER_ID;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
@@ -59,16 +65,27 @@
public class DefaultTopologyProvider extends AbstractProvider
implements TopologyProvider {
- // TODO: make these configurable
- private static final int MAX_EVENTS = 100;
- private static final int MAX_IDLE_MS = 5;
- private static final int MAX_BATCH_MS = 50;
private static final int MAX_THREADS = 8;
+ private static final int DEFAULT_MAX_EVENTS = 100;
+ private static final int DEFAULT_MAX_BATCH_MS = 50;
+ private static final int DEFAULT_MAX_IDLE_MS = 5;
// FIXME: Replace with a system-wide timer instance;
// TODO: Convert to use HashedWheelTimer or produce a variant of that; then decide which we want to adopt
private static final Timer TIMER = new Timer();
+ @Property(name = "maxEvents", intValue = DEFAULT_MAX_EVENTS,
+ label = "Maximum number of events to accumulate")
+ private int maxEvents = DEFAULT_MAX_EVENTS;
+
+ @Property(name = "maxIdleMs", intValue = DEFAULT_MAX_IDLE_MS,
+ label = "Maximum number of millis between events")
+ private int maxIdleMs = DEFAULT_MAX_IDLE_MS;
+
+ @Property(name = "maxBatchMs", intValue = DEFAULT_MAX_BATCH_MS,
+ label = "Maximum number of millis for whole batch")
+ private int maxBatchMs = DEFAULT_MAX_BATCH_MS;
+
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -97,9 +114,9 @@
}
@Activate
- public synchronized void activate() {
+ public synchronized void activate(ComponentContext context) {
executor = newFixedThreadPool(MAX_THREADS, namedThreads("topo-build-%d"));
- accumulator = new TopologyChangeAccumulator();
+ modified(context);
providerService = providerRegistry.register(this);
deviceService.addListener(deviceListener);
@@ -111,7 +128,7 @@
}
@Deactivate
- public synchronized void deactivate() {
+ public synchronized void deactivate(ComponentContext context) {
isStarted = false;
deviceService.removeListener(deviceListener);
@@ -125,6 +142,41 @@
log.info("Stopped");
}
+ @Modified
+ public void modified(ComponentContext context) {
+ if (context == null) {
+ accumulator = new TopologyChangeAccumulator();
+ return;
+ }
+
+ Dictionary properties = context.getProperties();
+ int newMaxEvents, newMaxBatchMs, newMaxIdleMs;
+ try {
+ String s = (String) properties.get("maxEvents");
+ newMaxEvents = isNullOrEmpty(s) ? maxEvents : Integer.parseInt(s);
+
+ s = (String) properties.get("maxBatchMs");
+ newMaxBatchMs = isNullOrEmpty(s) ? maxBatchMs : Integer.parseInt(s);
+
+ s = (String) properties.get("maxIdleMs");
+ newMaxIdleMs = isNullOrEmpty(s) ? maxIdleMs : Integer.parseInt(s);
+ } catch (Exception e) {
+ newMaxEvents = DEFAULT_MAX_EVENTS;
+ newMaxBatchMs = DEFAULT_MAX_BATCH_MS;
+ newMaxIdleMs = DEFAULT_MAX_IDLE_MS;
+ }
+
+ if (newMaxEvents != maxEvents || newMaxBatchMs != maxBatchMs || newMaxIdleMs != maxIdleMs) {
+ maxEvents = newMaxEvents;
+ maxBatchMs = newMaxBatchMs;
+ maxIdleMs = newMaxIdleMs;
+ accumulator = maxEvents > 1 ? new TopologyChangeAccumulator() : null;
+ log.info("Reconfigured with maxEvents = {}; maxBatchMs = {}; maxIdleMs = {}",
+ maxEvents, maxBatchMs, maxIdleMs);
+ }
+ }
+
+
@Override
public void triggerRecompute() {
triggerTopologyBuild(Collections.<Event>emptyList());
@@ -154,6 +206,14 @@
}
}
+ private void processEvent(Event event) {
+ if (accumulator != null) {
+ accumulator.add(event);
+ } else {
+ triggerTopologyBuild(ImmutableList.of(event));
+ }
+ }
+
// Callback for device events
private class InternalDeviceListener implements DeviceListener {
@Override
@@ -161,7 +221,7 @@
DeviceEvent.Type type = event.type();
if (type == DEVICE_ADDED || type == DEVICE_REMOVED ||
type == DEVICE_AVAILABILITY_CHANGED) {
- accumulator.add(event);
+ processEvent(event);
}
}
}
@@ -170,7 +230,7 @@
private class InternalLinkListener implements LinkListener {
@Override
public void event(LinkEvent event) {
- accumulator.add(event);
+ processEvent(event);
}
}
@@ -179,7 +239,7 @@
extends AbstractEventAccumulator implements EventAccumulator {
TopologyChangeAccumulator() {
- super(TIMER, MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
+ super(TIMER, maxEvents, maxBatchMs, maxIdleMs);
}
@Override
diff --git a/core/net/src/test/java/org/onlab/onos/net/topology/impl/DefaultTopologyProviderTest.java b/core/net/src/test/java/org/onlab/onos/net/topology/impl/DefaultTopologyProviderTest.java
index 3ea1abb..56594233 100644
--- a/core/net/src/test/java/org/onlab/onos/net/topology/impl/DefaultTopologyProviderTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/topology/impl/DefaultTopologyProviderTest.java
@@ -67,12 +67,12 @@
provider.deviceService = deviceService;
provider.linkService = linkService;
provider.providerRegistry = topologyService;
- provider.activate();
+ provider.activate(null);
}
@After
public void tearDown() {
- provider.deactivate();
+ provider.deactivate(null);
provider.providerRegistry = null;
provider.deviceService = null;
provider.linkService = null;