Fixing various issues and re-tuning.

Change-Id: I8822fcf77cfa507788241c5bda98ef4741b284b4
diff --git a/cli/src/main/java/org/onlab/onos/cli/BalanceMastersCommand.java b/cli/src/main/java/org/onlab/onos/cli/BalanceMastersCommand.java
index a3f3c23..77d0c16 100644
--- a/cli/src/main/java/org/onlab/onos/cli/BalanceMastersCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/BalanceMastersCommand.java
@@ -23,13 +23,15 @@
 import org.onlab.onos.mastership.MastershipAdminService;
 import org.onlab.onos.mastership.MastershipService;
 import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.net.device.DeviceService;
 
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
 import static com.google.common.collect.Lists.newArrayList;
+import static org.onlab.onos.net.MastershipRole.MASTER;
 
 /**
  * Forces device mastership rebalancing.
@@ -50,73 +52,62 @@
 
         // Create buckets reflecting current ownership.
         for (ControllerNode node : nodes) {
-            controllerDevices.putAll(node, mastershipService.getDevicesOf(node.id()));
+            Set<DeviceId> devicesOf = mastershipService.getDevicesOf(node.id());
+            controllerDevices.putAll(node, devicesOf);
+            print("Node %s has %d devices.", node.id(), devicesOf.size());
         }
 
-        int bucketCount = nodes.size();
-        for (int i = 0; i < bucketCount / 2; i++) {
+        int rounds = nodes.size();
+        for (int i = 0; i < rounds; i++) {
             // Iterate over the buckets and find the smallest and the largest.
-            ControllerNode smallest = findSmallestBucket(controllerDevices);
-            ControllerNode largest = findLargestBucket(controllerDevices);
-            balanceBuckets(smallest, largest, controllerDevices,
-                           mastershipService, adminService);
+            ControllerNode smallest = findBucket(true, nodes, controllerDevices);
+            ControllerNode largest = findBucket(false, nodes, controllerDevices);
+            balanceBuckets(smallest, largest, controllerDevices, adminService);
         }
     }
 
-    private ControllerNode findSmallestBucket(Multimap<ControllerNode, DeviceId> controllerDevices) {
-        int minSize = Integer.MAX_VALUE;
-        ControllerNode minNode = null;
-        for (ControllerNode node : controllerDevices.keySet()) {
+    private ControllerNode findBucket(boolean min, Collection<ControllerNode> nodes,
+                                      Multimap<ControllerNode, DeviceId> controllerDevices) {
+        int xSize = min ? Integer.MAX_VALUE : -1;
+        ControllerNode xNode = null;
+        for (ControllerNode node : nodes) {
             int size = controllerDevices.get(node).size();
-            if (size < minSize) {
-                minSize = size;
-                minNode = node;
+            if ((min && size < xSize) || (!min && size > xSize)) {
+                xSize = size;
+                xNode = node;
             }
         }
-        return minNode;
-    }
-
-    private ControllerNode findLargestBucket(Multimap<ControllerNode, DeviceId> controllerDevices) {
-        int maxSize = -1;
-        ControllerNode maxNode = null;
-        for (ControllerNode node : controllerDevices.keySet()) {
-            int size = controllerDevices.get(node).size();
-            if (size >= maxSize) {
-                maxSize = size;
-                maxNode = node;
-            }
-        }
-        return maxNode;
+        return xNode;
     }
 
     // FIXME: enhance to better handle cases where smallest cannot take any of the devices from largest
 
     private void balanceBuckets(ControllerNode smallest, ControllerNode largest,
                                 Multimap<ControllerNode, DeviceId> controllerDevices,
-                                MastershipService mastershipService,
                                 MastershipAdminService adminService) {
         Collection<DeviceId> minBucket = controllerDevices.get(smallest);
         Collection<DeviceId> maxBucket = controllerDevices.get(largest);
+        int bucketCount = controllerDevices.keySet().size();
+        int deviceCount = get(DeviceService.class).getDeviceCount();
 
         int delta = (maxBucket.size() - minBucket.size()) / 2;
+        delta = Math.min(deviceCount / bucketCount, delta);
 
-        print("Attempting to move %d nodes from %s to %s...",
-              delta, largest.id(), smallest.id());
+        if (delta > 0) {
+            print("Attempting to move %d nodes from %s to %s...",
+                  delta, largest.id(), smallest.id());
 
-        int i = 0;
-        Iterator<DeviceId> it = maxBucket.iterator();
-        while (it.hasNext() && i < delta) {
-            DeviceId deviceId = it.next();
-
-            // Check that the transfer can happen for the current element.
-            if (mastershipService.getNodesFor(deviceId).backups().contains(smallest.id())) {
-                print("Setting %s as the new master for %s", smallest.id(), deviceId);
-                adminService.setRole(smallest.id(), deviceId, MastershipRole.MASTER);
+            int i = 0;
+            Iterator<DeviceId> it = maxBucket.iterator();
+            while (it.hasNext() && i < delta) {
+                DeviceId deviceId = it.next();
+                print("Setting %s as the master for %s", smallest.id(), deviceId);
+                adminService.setRole(smallest.id(), deviceId, MASTER);
+                controllerDevices.put(smallest, deviceId);
+                it.remove();
                 i++;
             }
         }
-
-        controllerDevices.removeAll(smallest);
     }
 
 }
diff --git a/core/api/src/main/java/org/onlab/onos/event/AbstractEventAccumulator.java b/core/api/src/main/java/org/onlab/onos/event/AbstractEventAccumulator.java
index f866633..39c1103 100644
--- a/core/api/src/main/java/org/onlab/onos/event/AbstractEventAccumulator.java
+++ b/core/api/src/main/java/org/onlab/onos/event/AbstractEventAccumulator.java
@@ -16,6 +16,8 @@
 package org.onlab.onos.event;
 
 import com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Timer;
@@ -31,6 +33,8 @@
  */
 public abstract class AbstractEventAccumulator implements EventAccumulator {
 
+    private Logger log = LoggerFactory.getLogger(AbstractEventAccumulator.class);
+
     private final Timer timer;
     private final int maxEvents;
     private final int maxBatchMillis;
@@ -104,9 +108,13 @@
     private class ProcessorTask extends TimerTask {
         @Override
         public void run() {
-            idleTask = cancelIfActive(idleTask);
-            maxTask = cancelIfActive(maxTask);
-            processEvents(finalizeCurrentBatch());
+            try {
+                idleTask = cancelIfActive(idleTask);
+                maxTask = cancelIfActive(maxTask);
+                processEvents(finalizeCurrentBatch());
+            } catch (Exception e) {
+                log.warn("Unable to process batch due to {}", e.getMessage());
+            }
         }
     }
 
diff --git a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
index ac9fd00..3f6e285 100644
--- a/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
+++ b/core/net/src/main/java/org/onlab/onos/net/topology/impl/DefaultTopologyProvider.java
@@ -66,13 +66,13 @@
         implements TopologyProvider {
 
     private static final int MAX_THREADS = 8;
-    private static final int DEFAULT_MAX_EVENTS = 100;
-    private static final int DEFAULT_MAX_BATCH_MS = 50;
-    private static final int DEFAULT_MAX_IDLE_MS = 5;
+    private static final int DEFAULT_MAX_EVENTS = 200;
+    private static final int DEFAULT_MAX_BATCH_MS = 60;
+    private static final int DEFAULT_MAX_IDLE_MS = 30;
 
     // FIXME: Replace with a system-wide timer instance;
     // TODO: Convert to use HashedWheelTimer or produce a variant of that; then decide which we want to adopt
-    private static final Timer TIMER = new Timer();
+    private static final Timer TIMER = new Timer("topo-event-batching");
 
     @Property(name = "maxEvents", intValue = DEFAULT_MAX_EVENTS,
             label = "Maximum number of events to accumulate")
@@ -122,6 +122,9 @@
         deviceService.addListener(deviceListener);
         linkService.addListener(linkListener);
 
+        log.info("Configured with maxEvents = {}; maxBatchMs = {}; maxIdleMs = {}",
+                 maxEvents, maxBatchMs, maxIdleMs);
+
         isStarted = true;
         triggerRecompute();
         log.info("Started");
diff --git a/tools/test/bin/onos-topo-cfg-all b/tools/test/bin/onos-topo-cfg-all
new file mode 100755
index 0000000..adf84c1
--- /dev/null
+++ b/tools/test/bin/onos-topo-cfg-all
@@ -0,0 +1,15 @@
+#!/bin/bash
+# -----------------------------------------------------------------------------
+# ONOS topology configuration uploader.
+# -----------------------------------------------------------------------------
+
+[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
+. $ONOS_ROOT/tools/build/envDefaults
+
+nodes=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
+
+for node in $nodes; do
+    printf "$node..."
+    onos-topo-cfg $node $1
+done
+printf "\n"
diff --git a/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
index 57781de..d624d04 100644
--- a/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
+++ b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
@@ -24,6 +24,9 @@
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.core.ApplicationId;
 import org.onlab.onos.core.CoreService;
+import org.onlab.onos.event.AbstractEventAccumulator;
+import org.onlab.onos.event.Event;
+import org.onlab.onos.event.EventAccumulator;
 import org.onlab.onos.mastership.MastershipEvent;
 import org.onlab.onos.mastership.MastershipListener;
 import org.onlab.onos.net.ConnectPoint;
@@ -36,6 +39,8 @@
 import org.onlab.onos.net.device.DeviceListener;
 import org.onlab.onos.net.flow.DefaultTrafficSelector;
 import org.onlab.onos.net.flow.DefaultTrafficTreatment;
+import org.onlab.onos.net.flow.FlowRuleEvent;
+import org.onlab.onos.net.flow.FlowRuleListener;
 import org.onlab.onos.net.flow.TrafficSelector;
 import org.onlab.onos.net.flow.TrafficTreatment;
 import org.onlab.onos.net.host.HostEvent;
@@ -84,8 +89,7 @@
 
     private static final String APP_ID = "org.onlab.onos.gui";
 
-    private static final long SUMMARY_FREQUENCY_SEC = 3000;
-    private static final long TRAFFIC_FREQUENCY_SEC = 1500;
+    private static final long TRAFFIC_FREQUENCY_SEC = 2000;
 
     private static final Comparator<? super ControllerNode> NODE_COMPARATOR =
             new Comparator<ControllerNode>() {
@@ -95,6 +99,13 @@
                 }
             };
 
+
+    private final Timer timer = new Timer("topology-view");
+
+    private static final int MAX_EVENTS = 500;
+    private static final int MAX_BATCH_MS = 1000;
+    private static final int MAX_IDLE_MS = 500;
+
     private final ApplicationId appId;
 
     private Connection connection;
@@ -106,16 +117,14 @@
     private final LinkListener linkListener = new InternalLinkListener();
     private final HostListener hostListener = new InternalHostListener();
     private final IntentListener intentListener = new InternalIntentListener();
+    private final FlowRuleListener flowListener = new InternalFlowListener();
 
-    // Timers and objects being monitored
-    private final Timer timer = new Timer("topology-view");
+    private final EventAccumulator eventAccummulator = new InternalEventAccummulator();
 
+    private boolean summaryEnabled = true;
     private TimerTask trafficTask;
     private ObjectNode trafficEvent;
 
-    private TimerTask summaryTask;
-    private ObjectNode summaryEvent;
-
     private long lastActive = System.currentTimeMillis();
     private boolean listenersRemoved = false;
 
@@ -128,7 +137,6 @@
      */
     public TopologyViewWebSocket(ServiceDirectory directory) {
         super(directory);
-
         intentFilter = new TopologyViewIntentFilter(intentService, deviceService,
                                                     hostService, linkService);
         appId = directory.get(CoreService.class).registerApplication(APP_ID);
@@ -431,21 +439,13 @@
 
     // Subscribes for summary messages.
     private synchronized void requestSummary(ObjectNode event) {
-        if (summaryTask == null) {
-            summaryEvent = event;
-            summaryTask = new SummaryMonitor();
-            timer.schedule(summaryTask, SUMMARY_FREQUENCY_SEC, SUMMARY_FREQUENCY_SEC);
-        }
+        summaryEnabled = true;
         sendMessage(summmaryMessage(number(event, "sid")));
     }
 
     // Cancels sending summary messages.
     private synchronized void cancelSummary(ObjectNode event) {
-        if (summaryTask != null) {
-            summaryTask.cancel();
-            summaryTask = null;
-            summaryEvent = null;
-        }
+        summaryEnabled = false;
     }
 
 
@@ -457,6 +457,7 @@
         linkService.addListener(linkListener);
         hostService.addListener(hostListener);
         intentService.addListener(intentListener);
+        flowService.addListener(flowListener);
     }
 
     // Removes all internal listeners.
@@ -469,6 +470,7 @@
             linkService.removeListener(linkListener);
             hostService.removeListener(hostListener);
             intentService.removeListener(intentListener);
+            flowService.removeListener(flowListener);
         }
     }
 
@@ -495,6 +497,7 @@
         @Override
         public void event(DeviceEvent event) {
             sendMessage(deviceMessage(event));
+            eventAccummulator.add(event);
         }
     }
 
@@ -503,6 +506,7 @@
         @Override
         public void event(LinkEvent event) {
             sendMessage(linkMessage(event));
+            eventAccummulator.add(event);
         }
     }
 
@@ -511,6 +515,7 @@
         @Override
         public void event(HostEvent event) {
             sendMessage(hostMessage(event));
+            eventAccummulator.add(event);
         }
     }
 
@@ -521,33 +526,55 @@
             if (trafficEvent != null) {
                 requestTraffic(trafficEvent);
             }
+            eventAccummulator.add(event);
+        }
+    }
+
+    // Intent event listener.
+    private class InternalFlowListener implements FlowRuleListener {
+        @Override
+        public void event(FlowRuleEvent event) {
+            eventAccummulator.add(event);
         }
     }
 
     private class TrafficMonitor extends TimerTask {
         @Override
         public void run() {
-            if (trafficEvent != null) {
-                String type = string(trafficEvent, "event", "unknown");
-                if (type.equals("requestAllTraffic")) {
-                    requestAllTraffic(trafficEvent);
-                } else if (type.equals("requestDeviceLinkFlows")) {
-                    requestDeviceLinkFlows(trafficEvent);
-                } else {
-                    requestTraffic(trafficEvent);
+            try {
+                if (trafficEvent != null) {
+                    String type = string(trafficEvent, "event", "unknown");
+                    if (type.equals("requestAllTraffic")) {
+                        requestAllTraffic(trafficEvent);
+                    } else if (type.equals("requestDeviceLinkFlows")) {
+                        requestDeviceLinkFlows(trafficEvent);
+                    } else {
+                        requestTraffic(trafficEvent);
+                    }
                 }
+            } catch (Exception e) {
+                log.warn("Unable to handle traffic request due to {}", e.getMessage());
             }
         }
     }
 
-    private class SummaryMonitor extends TimerTask {
+    // Accummulates events to drive methodic update of the summary pane.
+    private class InternalEventAccummulator extends AbstractEventAccumulator {
+        protected InternalEventAccummulator() {
+            super(new Timer("topo-summary"), MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
+        }
+
         @Override
-        public void run() {
-            if (summaryEvent != null) {
-                requestSummary(summaryEvent);
+        public void processEvents(List<Event> events) {
+            try {
+                if (summaryEnabled) {
+                    sendMessage(summmaryMessage(0));
+                }
+            } catch (Exception e) {
+                log.warn("Unable to handle summary request due to {}", e.getMessage());
             }
+
         }
     }
-
 }
 
diff --git a/web/gui/src/main/webapp/topo.js b/web/gui/src/main/webapp/topo.js
index eb05dbf..e212f95 100644
--- a/web/gui/src/main/webapp/topo.js
+++ b/web/gui/src/main/webapp/topo.js
@@ -356,6 +356,9 @@
             hideInstances();
         } else if (summaryPane.isVisible()) {
             cancelSummary();
+            stopAntTimer();
+        } else {
+            hoverMode = hoverModeFlows;
         }
     }