Added detection of traffic flowing using StatisticService.

Change-Id: I2044ec16fd722d953d0e2b2c955e4da2b1dab663
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
index 64a56ca..cbc4262 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
@@ -25,7 +25,7 @@
  */
 public interface FlowRuleProvider extends Provider {
 
-    static final int POLL_INTERVAL = 5;
+    static final int POLL_INTERVAL = 1;
 
     /**
      * Instructs the provider to apply the specified flow rules to their
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleStatisticStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleStatisticStore.java
new file mode 100644
index 0000000..0d4da5c
--- /dev/null
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleStatisticStore.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.store.trivial.impl;
+
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.flow.FlowEntry;
+import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.instructions.Instruction;
+import org.onlab.onos.net.flow.instructions.Instructions;
+import org.onlab.onos.net.statistic.StatisticStore;
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+
+/**
+ * Maintains statistics using RPC calls to collect stats from remote instances
+ * on demand.
+ */
+@Component(immediate = true)
+@Service
+public class SimpleStatisticStore implements StatisticStore {
+
+    private final Logger log = getLogger(getClass());
+
+    private Map<ConnectPoint, InternalStatisticRepresentation>
+            representations = new ConcurrentHashMap<>();
+
+    private Map<ConnectPoint, Set<FlowEntry>> previous = new ConcurrentHashMap<>();
+    private Map<ConnectPoint, Set<FlowEntry>> current = new ConcurrentHashMap<>();
+
+    @Activate
+    public void activate() {
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public void prepareForStatistics(FlowRule rule) {
+        ConnectPoint cp = buildConnectPoint(rule);
+        if (cp == null) {
+            return;
+        }
+        InternalStatisticRepresentation rep;
+        synchronized (representations) {
+            rep = getOrCreateRepresentation(cp);
+        }
+        rep.prepare();
+    }
+
+    @Override
+    public synchronized void removeFromStatistics(FlowRule rule) {
+        ConnectPoint cp = buildConnectPoint(rule);
+        if (cp == null) {
+            return;
+        }
+        InternalStatisticRepresentation rep = representations.get(cp);
+        if (rep != null) {
+            rep.remove(rule);
+        }
+        Set<FlowEntry> values = current.get(cp);
+        if (values != null) {
+            values.remove(rule);
+        }
+        values = previous.get(cp);
+        if (values != null) {
+            values.remove(rule);
+        }
+
+    }
+
+    @Override
+    public void addOrUpdateStatistic(FlowEntry rule) {
+        ConnectPoint cp = buildConnectPoint(rule);
+        if (cp == null) {
+            return;
+        }
+        InternalStatisticRepresentation rep = representations.get(cp);
+        if (rep != null && rep.submit(rule)) {
+            updatePublishedStats(cp, rep.get());
+        }
+    }
+
+    private synchronized void updatePublishedStats(ConnectPoint cp,
+                                                   Set<FlowEntry> flowEntries) {
+        Set<FlowEntry> curr = current.get(cp);
+        if (curr == null) {
+            curr = new HashSet<>();
+        }
+        previous.put(cp, curr);
+        current.put(cp, flowEntries);
+
+    }
+
+    @Override
+    public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
+        return getCurrentStatisticInternal(connectPoint);
+    }
+
+    private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
+        return current.get(connectPoint);
+    }
+
+    @Override
+    public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
+        return getPreviousStatisticInternal(connectPoint);
+    }
+
+    private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
+        return previous.get(connectPoint);
+    }
+
+    private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
+
+        if (representations.containsKey(cp)) {
+            return representations.get(cp);
+        } else {
+            InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
+            representations.put(cp, rep);
+            return rep;
+        }
+
+    }
+
+    private ConnectPoint buildConnectPoint(FlowRule rule) {
+        PortNumber port = getOutput(rule);
+        if (port == null) {
+            log.warn("Rule {} has no output.", rule);
+            return null;
+        }
+        ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
+        return cp;
+    }
+
+    private PortNumber getOutput(FlowRule rule) {
+        for (Instruction i : rule.treatment().instructions()) {
+            if (i.type() == Instruction.Type.OUTPUT) {
+                Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
+                return out.port();
+            }
+            if (i.type() == Instruction.Type.DROP) {
+                return PortNumber.P0;
+            }
+        }
+        return null;
+    }
+
+    private class InternalStatisticRepresentation {
+
+        private final AtomicInteger counter = new AtomicInteger(0);
+        private final Set<FlowEntry> rules = new HashSet<>();
+
+        public void prepare() {
+            counter.incrementAndGet();
+        }
+
+        public synchronized void remove(FlowRule rule) {
+            rules.remove(rule);
+            counter.decrementAndGet();
+        }
+
+        public synchronized boolean submit(FlowEntry rule) {
+            if (rules.contains(rule)) {
+                rules.remove(rule);
+            }
+            rules.add(rule);
+            if (counter.get() == 0) {
+                return true;
+            } else {
+                return counter.decrementAndGet() == 0;
+            }
+        }
+
+        public synchronized Set<FlowEntry> get() {
+            counter.set(rules.size());
+            return Sets.newHashSet(rules);
+        }
+
+    }
+
+}
diff --git a/web/gui/src/main/java/org/onlab/onos/gui/GuiWebSocketServlet.java b/web/gui/src/main/java/org/onlab/onos/gui/GuiWebSocketServlet.java
index aa00a863..37120d9 100644
--- a/web/gui/src/main/java/org/onlab/onos/gui/GuiWebSocketServlet.java
+++ b/web/gui/src/main/java/org/onlab/onos/gui/GuiWebSocketServlet.java
@@ -37,7 +37,7 @@
 
     private ServiceDirectory directory = new DefaultServiceDirectory();
 
-    private final Set<TopologyWebSocket> sockets = new HashSet<>();
+    private final Set<TopologyViewWebSocket> sockets = new HashSet<>();
     private final Timer timer = new Timer();
     private final TimerTask pruner = new Pruner();
 
@@ -49,7 +49,7 @@
 
     @Override
     public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
-        TopologyWebSocket socket = new TopologyWebSocket(directory);
+        TopologyViewWebSocket socket = new TopologyViewWebSocket(directory);
         synchronized (sockets) {
             sockets.add(socket);
         }
@@ -61,9 +61,9 @@
         @Override
         public void run() {
             synchronized (sockets) {
-                Iterator<TopologyWebSocket> it = sockets.iterator();
+                Iterator<TopologyViewWebSocket> it = sockets.iterator();
                 while (it.hasNext()) {
-                    TopologyWebSocket socket = it.next();
+                    TopologyViewWebSocket socket = it.next();
                     if (socket.isIdle()) {
                         it.remove();
                         socket.close();
diff --git a/web/gui/src/main/java/org/onlab/onos/gui/TopologyMessages.java b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewMessages.java
similarity index 96%
rename from web/gui/src/main/java/org/onlab/onos/gui/TopologyMessages.java
rename to web/gui/src/main/java/org/onlab/onos/gui/TopologyViewMessages.java
index cad7228..dcc2355 100644
--- a/web/gui/src/main/java/org/onlab/onos/gui/TopologyMessages.java
+++ b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewMessages.java
@@ -49,6 +49,8 @@
 import org.onlab.onos.net.link.LinkEvent;
 import org.onlab.onos.net.link.LinkService;
 import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.net.statistic.Load;
+import org.onlab.onos.net.statistic.StatisticService;
 import org.onlab.osgi.ServiceDirectory;
 import org.onlab.packet.IpAddress;
 import org.slf4j.Logger;
@@ -75,9 +77,9 @@
 /**
  * Facility for creating messages bound for the topology viewer.
  */
-public abstract class TopologyMessages {
+public abstract class TopologyViewMessages {
 
-    protected static final Logger log = LoggerFactory.getLogger(TopologyMessages.class);
+    protected static final Logger log = LoggerFactory.getLogger(TopologyViewMessages.class);
 
     private static final ProviderId PID = new ProviderId("core", "org.onlab.onos.core", true);
     private static final String COMPACT = "%s/%s-%s/%s";
@@ -89,7 +91,7 @@
     protected final HostService hostService;
     protected final MastershipService mastershipService;
     protected final IntentService intentService;
-//    protected final StatisticService statService;
+    protected final StatisticService statService;
 
     protected final ObjectMapper mapper = new ObjectMapper();
 
@@ -101,7 +103,7 @@
      *
      * @param directory service directory
      */
-    protected TopologyMessages(ServiceDirectory directory) {
+    protected TopologyViewMessages(ServiceDirectory directory) {
         this.directory = checkNotNull(directory, "Directory cannot be null");
         clusterService = directory.get(ClusterService.class);
         deviceService = directory.get(DeviceService.class);
@@ -109,7 +111,7 @@
         hostService = directory.get(HostService.class);
         mastershipService = directory.get(MastershipService.class);
         intentService = directory.get(IntentService.class);
-//        statService = directory.get(StatisticService.class);
+        statService = directory.get(StatisticService.class);
     }
 
     // Retrieves the payload from the specified event.
@@ -408,14 +410,15 @@
 
         if (links != null) {
             ArrayNode labels = mapper.createArrayNode();
-            boolean hasTraffic = true; // FIXME
+            boolean hasTraffic = false;
             for (Link link : links) {
                 linksNode.add(compactLinkString(link));
-//                Load load = statService.load(link);
+                Load load = statService.load(link);
                 String label = "";
-//                if (load.rate() > 0) {
-//                    label = load.toString();
-//                }
+                if (load.rate() > 0) {
+                    hasTraffic = true;
+                    label = load.toString();
+                }
                 labels.add(label);
             }
             pathNode.put("class", hasTraffic ? type + " animated" : type);
diff --git a/web/gui/src/main/java/org/onlab/onos/gui/TopologyWebSocket.java b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
similarity index 98%
rename from web/gui/src/main/java/org/onlab/onos/gui/TopologyWebSocket.java
rename to web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
index af28086..cc5756c 100644
--- a/web/gui/src/main/java/org/onlab/onos/gui/TopologyWebSocket.java
+++ b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
@@ -66,8 +66,8 @@
 /**
  * Web socket capable of interacting with the GUI topology view.
  */
-public class TopologyWebSocket
-        extends TopologyMessages
+public class TopologyViewWebSocket
+        extends TopologyViewMessages
         implements WebSocket.OnTextMessage, WebSocket.OnControl {
 
     private static final long MAX_AGE_MS = 15000;
@@ -78,7 +78,7 @@
 
     private static final String APP_ID = "org.onlab.onos.gui";
 
-    private static final long TRAFFIC_FREQUENCY_SEC = 5000;
+    private static final long TRAFFIC_FREQUENCY_SEC = 1000;
 
     private final ApplicationId appId;
 
@@ -104,7 +104,7 @@
      *
      * @param directory service directory
      */
-    public TopologyWebSocket(ServiceDirectory directory) {
+    public TopologyViewWebSocket(ServiceDirectory directory) {
         super(directory);
         appId = directory.get(CoreService.class).registerApplication(APP_ID);
     }