Fixing various issues and re-tuning the topology view summary and traffic updates.

Change-Id: I8822fcf77cfa507788241c5bda98ef4741b284b4
diff --git a/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
index 57781de..d624d04 100644
--- a/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
+++ b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
@@ -24,6 +24,9 @@
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.core.ApplicationId;
 import org.onlab.onos.core.CoreService;
+import org.onlab.onos.event.AbstractEventAccumulator;
+import org.onlab.onos.event.Event;
+import org.onlab.onos.event.EventAccumulator;
 import org.onlab.onos.mastership.MastershipEvent;
 import org.onlab.onos.mastership.MastershipListener;
 import org.onlab.onos.net.ConnectPoint;
@@ -36,6 +39,8 @@
 import org.onlab.onos.net.device.DeviceListener;
 import org.onlab.onos.net.flow.DefaultTrafficSelector;
 import org.onlab.onos.net.flow.DefaultTrafficTreatment;
+import org.onlab.onos.net.flow.FlowRuleEvent;
+import org.onlab.onos.net.flow.FlowRuleListener;
 import org.onlab.onos.net.flow.TrafficSelector;
 import org.onlab.onos.net.flow.TrafficTreatment;
 import org.onlab.onos.net.host.HostEvent;
@@ -84,8 +89,7 @@
 
     private static final String APP_ID = "org.onlab.onos.gui";
 
-    private static final long SUMMARY_FREQUENCY_SEC = 3000;
-    private static final long TRAFFIC_FREQUENCY_SEC = 1500;
+    private static final long TRAFFIC_FREQUENCY_SEC = 2000;
 
     private static final Comparator<? super ControllerNode> NODE_COMPARATOR =
             new Comparator<ControllerNode>() {
@@ -95,6 +99,13 @@
                 }
             };
 
+
+    private final Timer timer = new Timer("topology-view");
+
+    private static final int MAX_EVENTS = 500;
+    private static final int MAX_BATCH_MS = 1000;
+    private static final int MAX_IDLE_MS = 500;
+
     private final ApplicationId appId;
 
     private Connection connection;
@@ -106,16 +117,14 @@
     private final LinkListener linkListener = new InternalLinkListener();
     private final HostListener hostListener = new InternalHostListener();
     private final IntentListener intentListener = new InternalIntentListener();
+    private final FlowRuleListener flowListener = new InternalFlowListener();
 
-    // Timers and objects being monitored
-    private final Timer timer = new Timer("topology-view");
+    private final EventAccumulator eventAccummulator = new InternalEventAccummulator();
 
+    private boolean summaryEnabled = true;
     private TimerTask trafficTask;
     private ObjectNode trafficEvent;
 
-    private TimerTask summaryTask;
-    private ObjectNode summaryEvent;
-
     private long lastActive = System.currentTimeMillis();
     private boolean listenersRemoved = false;
 
@@ -128,7 +137,6 @@
      */
     public TopologyViewWebSocket(ServiceDirectory directory) {
         super(directory);
-
         intentFilter = new TopologyViewIntentFilter(intentService, deviceService,
                                                     hostService, linkService);
         appId = directory.get(CoreService.class).registerApplication(APP_ID);
@@ -431,21 +439,13 @@
 
     // Subscribes for summary messages.
     private synchronized void requestSummary(ObjectNode event) {
-        if (summaryTask == null) {
-            summaryEvent = event;
-            summaryTask = new SummaryMonitor();
-            timer.schedule(summaryTask, SUMMARY_FREQUENCY_SEC, SUMMARY_FREQUENCY_SEC);
-        }
+        summaryEnabled = true;
         sendMessage(summmaryMessage(number(event, "sid")));
     }
 
     // Cancels sending summary messages.
     private synchronized void cancelSummary(ObjectNode event) {
-        if (summaryTask != null) {
-            summaryTask.cancel();
-            summaryTask = null;
-            summaryEvent = null;
-        }
+        summaryEnabled = false;
     }
 
 
@@ -457,6 +457,7 @@
         linkService.addListener(linkListener);
         hostService.addListener(hostListener);
         intentService.addListener(intentListener);
+        flowService.addListener(flowListener);
     }
 
     // Removes all internal listeners.
@@ -469,6 +470,7 @@
             linkService.removeListener(linkListener);
             hostService.removeListener(hostListener);
             intentService.removeListener(intentListener);
+            flowService.removeListener(flowListener);
         }
     }
 
@@ -495,6 +497,7 @@
         @Override
         public void event(DeviceEvent event) {
             sendMessage(deviceMessage(event));
+            eventAccummulator.add(event);
         }
     }
 
@@ -503,6 +506,7 @@
         @Override
         public void event(LinkEvent event) {
             sendMessage(linkMessage(event));
+            eventAccummulator.add(event);
         }
     }
 
@@ -511,6 +515,7 @@
         @Override
         public void event(HostEvent event) {
             sendMessage(hostMessage(event));
+            eventAccummulator.add(event);
         }
     }
 
@@ -521,33 +526,55 @@
             if (trafficEvent != null) {
                 requestTraffic(trafficEvent);
             }
+            eventAccummulator.add(event);
+        }
+    }
+
+    // Flow rule event listener.
+    private class InternalFlowListener implements FlowRuleListener {
+        @Override
+        public void event(FlowRuleEvent event) {
+            eventAccummulator.add(event);
         }
     }
 
     private class TrafficMonitor extends TimerTask {
         @Override
         public void run() {
-            if (trafficEvent != null) {
-                String type = string(trafficEvent, "event", "unknown");
-                if (type.equals("requestAllTraffic")) {
-                    requestAllTraffic(trafficEvent);
-                } else if (type.equals("requestDeviceLinkFlows")) {
-                    requestDeviceLinkFlows(trafficEvent);
-                } else {
-                    requestTraffic(trafficEvent);
+            try {
+                if (trafficEvent != null) {
+                    String type = string(trafficEvent, "event", "unknown");
+                    if (type.equals("requestAllTraffic")) {
+                        requestAllTraffic(trafficEvent);
+                    } else if (type.equals("requestDeviceLinkFlows")) {
+                        requestDeviceLinkFlows(trafficEvent);
+                    } else {
+                        requestTraffic(trafficEvent);
+                    }
                 }
+            } catch (Exception e) {
+                log.warn("Unable to handle traffic request due to {}", e.getMessage());
             }
         }
     }
 
-    private class SummaryMonitor extends TimerTask {
+    // Accumulates events to drive methodic update of the summary pane.
+    private class InternalEventAccummulator extends AbstractEventAccumulator {
+        protected InternalEventAccummulator() {
+            super(new Timer("topo-summary"), MAX_EVENTS, MAX_BATCH_MS, MAX_IDLE_MS);
+        }
+
         @Override
-        public void run() {
-            if (summaryEvent != null) {
-                requestSummary(summaryEvent);
+        public void processEvents(List<Event> events) {
+            try {
+                if (summaryEnabled) {
+                    sendMessage(summmaryMessage(0));
+                }
+            } catch (Exception e) {
+                log.warn("Unable to handle summary request due to {}", e.getMessage());
             }
+
         }
     }
-
 }