Adding configurability to the even accumulator for the topology provider.

Change-Id: I35ede9a62782dc6a2e55b8895aeec6ece8836960
diff --git a/core/net/pom.xml b/core/net/pom.xml
index 7e7f4ad..3a2d052 100644
--- a/core/net/pom.xml
+++ b/core/net/pom.xml
@@ -58,6 +58,11 @@
         </dependency>
 
         <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.compendium</artifactId>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.felix</groupId>
             <artifactId>org.apache.felix.scr.annotations</artifactId>
         </dependency>
diff --git a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
index 214fdff..ac9fd00 100644
--- a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
+++ b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
@@ -15,9 +15,12 @@
  */
 package org.onlab.onos.net.topology.impl;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
@@ -36,13 +39,16 @@
 import org.onlab.onos.net.topology.TopologyProvider;
 import org.onlab.onos.net.topology.TopologyProviderRegistry;
 import org.onlab.onos.net.topology.TopologyProviderService;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
 import java.util.Collections;
+import java.util.Dictionary;
 import java.util.List;
 import java.util.Timer;
 import java.util.concurrent.ExecutorService;
 
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.concurrent.Executors.newFixedThreadPool;
 import static org.onlab.onos.core.CoreService.CORE_PROVIDER_ID;
 import static org.onlab.onos.net.device.DeviceEvent.Type.*;
@@ -59,16 +65,27 @@
 public class DefaultTopologyProvider extends AbstractProvider
         implements TopologyProvider {
 
-    // TODO: make these configurable
-    private static final int MAX_EVENTS = 100;
-    private static final int MAX_IDLE_MS = 5;
-    private static final int MAX_BATCH_MS = 50;
     private static final int MAX_THREADS = 8;
+    private static final int DEFAULT_MAX_EVENTS = 100;
+    private static final int DEFAULT_MAX_BATCH_MS = 50;
+    private static final int DEFAULT_MAX_IDLE_MS = 5;
 
     // FIXME: Replace with a system-wide timer instance;
     // TODO: Convert to use HashedWheelTimer or produce a variant of that; then decide which we want to adopt
     private static final Timer TIMER = new Timer();
 
+    @Property(name = "maxEvents", intValue = DEFAULT_MAX_EVENTS,
+            label = "Maximum number of events to accumulate")
+    private int maxEvents = DEFAULT_MAX_EVENTS;
+
+    @Property(name = "maxIdleMs", intValue = DEFAULT_MAX_IDLE_MS,
+            label = "Maximum number of millis between events")
+    private int maxIdleMs = DEFAULT_MAX_IDLE_MS;
+
+    @Property(name = "maxBatchMs", intValue = DEFAULT_MAX_BATCH_MS,
+            label = "Maximum number of millis for whole batch")
+    private int maxBatchMs = DEFAULT_MAX_BATCH_MS;
+
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -97,9 +114,9 @@
     }
 
     @Activate
-    public synchronized void activate() {
+    public synchronized void activate(ComponentContext context) {
         executor = newFixedThreadPool(MAX_THREADS, namedThreads("topo-build-%d"));
-        accumulator = new TopologyChangeAccumulator();
+        modified(context);
 
         providerService = providerRegistry.register(this);
         deviceService.addListener(deviceListener);
@@ -111,7 +128,7 @@
     }
 
     @Deactivate
-    public synchronized void deactivate() {
+    public synchronized void deactivate(ComponentContext context) {
         isStarted = false;
 
         deviceService.removeListener(deviceListener);
@@ -125,6 +142,41 @@
         log.info("Stopped");
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        if (context == null) {
+            accumulator = new TopologyChangeAccumulator();
+            return;
+        }
+
+        Dictionary properties = context.getProperties();
+        int newMaxEvents, newMaxBatchMs, newMaxIdleMs;
+        try {
+            String s = (String) properties.get("maxEvents");
+            newMaxEvents = isNullOrEmpty(s) ? maxEvents : Integer.parseInt(s);
+
+            s = (String) properties.get("maxBatchMs");
+            newMaxBatchMs = isNullOrEmpty(s) ? maxBatchMs : Integer.parseInt(s);
+
+            s = (String) properties.get("maxIdleMs");
+            newMaxIdleMs = isNullOrEmpty(s) ? maxIdleMs : Integer.parseInt(s);
+        } catch (Exception e) {
+            newMaxEvents = DEFAULT_MAX_EVENTS;
+            newMaxBatchMs = DEFAULT_MAX_BATCH_MS;
+            newMaxIdleMs = DEFAULT_MAX_IDLE_MS;
+        }
+
+        if (newMaxEvents != maxEvents || newMaxBatchMs != maxBatchMs || newMaxIdleMs != maxIdleMs) {
+            maxEvents = newMaxEvents;
+            maxBatchMs = newMaxBatchMs;
+            maxIdleMs = newMaxIdleMs;
+            accumulator = maxEvents > 1 ? new TopologyChangeAccumulator() : null;
+            log.info("Reconfigured with maxEvents = {}; maxBatchMs = {}; maxIdleMs = {}",
+                     maxEvents, maxBatchMs, maxIdleMs);
+        }
+    }
+
+
     @Override
     public void triggerRecompute() {
         triggerTopologyBuild(Collections.<Event>emptyList());
@@ -154,6 +206,14 @@
         }
     }
 
+    private void processEvent(Event event) {
+        if (accumulator != null) {
+            accumulator.add(event);
+        } else {
+            triggerTopologyBuild(ImmutableList.of(event));
+        }
+    }
+
     // Callback for device events
     private class InternalDeviceListener implements DeviceListener {
         @Override
@@ -161,7 +221,7 @@
             DeviceEvent.Type type = event.type();
             if (type == DEVICE_ADDED || type == DEVICE_REMOVED ||
                     type == DEVICE_AVAILABILITY_CHANGED) {
-                accumulator.add(event);
+                processEvent(event);
             }
         }
     }
@@ -170,7 +230,7 @@
     private class InternalLinkListener implements LinkListener {
         @Override
         public void event(LinkEvent event) {
-            accumulator.add(event);
+            processEvent(event);
         }
     }
 
@@ -179,7 +239,7 @@
             extends AbstractEventAccumulator implements EventAccumulator {
 
         TopologyChangeAccumulator() {
-            super(TIMER, MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
+            super(TIMER, maxEvents, maxBatchMs, maxIdleMs);
         }
 
         @Override
diff --git a/core/net/src/test/java/org/onlab/onos/net/topology/impl/DefaultTopologyProviderTest.java b/core/net/src/test/java/org/onlab/onos/net/topology/impl/DefaultTopologyProviderTest.java
index 3ea1abb..56594233 100644
--- a/core/net/src/test/java/org/onlab/onos/net/topology/impl/DefaultTopologyProviderTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/topology/impl/DefaultTopologyProviderTest.java
@@ -67,12 +67,12 @@
         provider.deviceService = deviceService;
         provider.linkService = linkService;
         provider.providerRegistry = topologyService;
-        provider.activate();
+        provider.activate(null);
     }
 
     @After
     public void tearDown() {
-        provider.deactivate();
+        provider.deactivate(null);
         provider.providerRegistry = null;
         provider.deviceService = null;
         provider.linkService = null;