Added detection of traffic flowing using StatisticService.
Change-Id: I2044ec16fd722d953d0e2b2c955e4da2b1dab663
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
index 64a56ca..cbc4262 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
@@ -25,7 +25,7 @@
*/
public interface FlowRuleProvider extends Provider {
- static final int POLL_INTERVAL = 5;
+ static final int POLL_INTERVAL = 1;
/**
* Instructs the provider to apply the specified flow rules to their
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleStatisticStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleStatisticStore.java
new file mode 100644
index 0000000..0d4da5c
--- /dev/null
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleStatisticStore.java
@@ -0,0 +1,210 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.store.trivial.impl;
+
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.flow.FlowEntry;
+import org.onlab.onos.net.flow.FlowRule;
+import org.onlab.onos.net.flow.instructions.Instruction;
+import org.onlab.onos.net.flow.instructions.Instructions;
+import org.onlab.onos.net.statistic.StatisticStore;
+import org.slf4j.Logger;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+
+/**
+ * Maintains statistics using RPC calls to collect stats from remote instances
+ * on demand.
+ */
+@Component(immediate = true)
+@Service
+public class SimpleStatisticStore implements StatisticStore {
+
+ private final Logger log = getLogger(getClass());
+
+ private Map<ConnectPoint, InternalStatisticRepresentation>
+ representations = new ConcurrentHashMap<>();
+
+ private Map<ConnectPoint, Set<FlowEntry>> previous = new ConcurrentHashMap<>();
+ private Map<ConnectPoint, Set<FlowEntry>> current = new ConcurrentHashMap<>();
+
+ @Activate
+ public void activate() {
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ @Override
+ public void prepareForStatistics(FlowRule rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+ InternalStatisticRepresentation rep;
+ synchronized (representations) {
+ rep = getOrCreateRepresentation(cp);
+ }
+ rep.prepare();
+ }
+
+ @Override
+ public synchronized void removeFromStatistics(FlowRule rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+ InternalStatisticRepresentation rep = representations.get(cp);
+ if (rep != null) {
+ rep.remove(rule);
+ }
+ Set<FlowEntry> values = current.get(cp);
+ if (values != null) {
+ values.remove(rule);
+ }
+ values = previous.get(cp);
+ if (values != null) {
+ values.remove(rule);
+ }
+
+ }
+
+ @Override
+ public void addOrUpdateStatistic(FlowEntry rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+ InternalStatisticRepresentation rep = representations.get(cp);
+ if (rep != null && rep.submit(rule)) {
+ updatePublishedStats(cp, rep.get());
+ }
+ }
+
+ private synchronized void updatePublishedStats(ConnectPoint cp,
+ Set<FlowEntry> flowEntries) {
+ Set<FlowEntry> curr = current.get(cp);
+ if (curr == null) {
+ curr = new HashSet<>();
+ }
+ previous.put(cp, curr);
+ current.put(cp, flowEntries);
+
+ }
+
+ @Override
+ public Set<FlowEntry> getCurrentStatistic(ConnectPoint connectPoint) {
+ return getCurrentStatisticInternal(connectPoint);
+ }
+
+ private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
+ return current.get(connectPoint);
+ }
+
+ @Override
+ public Set<FlowEntry> getPreviousStatistic(ConnectPoint connectPoint) {
+ return getPreviousStatisticInternal(connectPoint);
+ }
+
+ private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
+ return previous.get(connectPoint);
+ }
+
+ private InternalStatisticRepresentation getOrCreateRepresentation(ConnectPoint cp) {
+
+ if (representations.containsKey(cp)) {
+ return representations.get(cp);
+ } else {
+ InternalStatisticRepresentation rep = new InternalStatisticRepresentation();
+ representations.put(cp, rep);
+ return rep;
+ }
+
+ }
+
+ private ConnectPoint buildConnectPoint(FlowRule rule) {
+ PortNumber port = getOutput(rule);
+ if (port == null) {
+ log.warn("Rule {} has no output.", rule);
+ return null;
+ }
+ ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
+ return cp;
+ }
+
+ private PortNumber getOutput(FlowRule rule) {
+ for (Instruction i : rule.treatment().instructions()) {
+ if (i.type() == Instruction.Type.OUTPUT) {
+ Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
+ return out.port();
+ }
+ if (i.type() == Instruction.Type.DROP) {
+ return PortNumber.P0;
+ }
+ }
+ return null;
+ }
+
+ private class InternalStatisticRepresentation {
+
+ private final AtomicInteger counter = new AtomicInteger(0);
+ private final Set<FlowEntry> rules = new HashSet<>();
+
+ public void prepare() {
+ counter.incrementAndGet();
+ }
+
+ public synchronized void remove(FlowRule rule) {
+ rules.remove(rule);
+ counter.decrementAndGet();
+ }
+
+ public synchronized boolean submit(FlowEntry rule) {
+ if (rules.contains(rule)) {
+ rules.remove(rule);
+ }
+ rules.add(rule);
+ if (counter.get() == 0) {
+ return true;
+ } else {
+ return counter.decrementAndGet() == 0;
+ }
+ }
+
+ public synchronized Set<FlowEntry> get() {
+ counter.set(rules.size());
+ return Sets.newHashSet(rules);
+ }
+
+ }
+
+}
diff --git a/web/gui/src/main/java/org/onlab/onos/gui/GuiWebSocketServlet.java b/web/gui/src/main/java/org/onlab/onos/gui/GuiWebSocketServlet.java
index aa00a863..37120d9 100644
--- a/web/gui/src/main/java/org/onlab/onos/gui/GuiWebSocketServlet.java
+++ b/web/gui/src/main/java/org/onlab/onos/gui/GuiWebSocketServlet.java
@@ -37,7 +37,7 @@
private ServiceDirectory directory = new DefaultServiceDirectory();
- private final Set<TopologyWebSocket> sockets = new HashSet<>();
+ private final Set<TopologyViewWebSocket> sockets = new HashSet<>();
private final Timer timer = new Timer();
private final TimerTask pruner = new Pruner();
@@ -49,7 +49,7 @@
@Override
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) {
- TopologyWebSocket socket = new TopologyWebSocket(directory);
+ TopologyViewWebSocket socket = new TopologyViewWebSocket(directory);
synchronized (sockets) {
sockets.add(socket);
}
@@ -61,9 +61,9 @@
@Override
public void run() {
synchronized (sockets) {
- Iterator<TopologyWebSocket> it = sockets.iterator();
+ Iterator<TopologyViewWebSocket> it = sockets.iterator();
while (it.hasNext()) {
- TopologyWebSocket socket = it.next();
+ TopologyViewWebSocket socket = it.next();
if (socket.isIdle()) {
it.remove();
socket.close();
diff --git a/web/gui/src/main/java/org/onlab/onos/gui/TopologyMessages.java b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewMessages.java
similarity index 96%
rename from web/gui/src/main/java/org/onlab/onos/gui/TopologyMessages.java
rename to web/gui/src/main/java/org/onlab/onos/gui/TopologyViewMessages.java
index cad7228..dcc2355 100644
--- a/web/gui/src/main/java/org/onlab/onos/gui/TopologyMessages.java
+++ b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewMessages.java
@@ -49,6 +49,8 @@
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkService;
import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.net.statistic.Load;
+import org.onlab.onos.net.statistic.StatisticService;
import org.onlab.osgi.ServiceDirectory;
import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
@@ -75,9 +77,9 @@
/**
* Facility for creating messages bound for the topology viewer.
*/
-public abstract class TopologyMessages {
+public abstract class TopologyViewMessages {
- protected static final Logger log = LoggerFactory.getLogger(TopologyMessages.class);
+ protected static final Logger log = LoggerFactory.getLogger(TopologyViewMessages.class);
private static final ProviderId PID = new ProviderId("core", "org.onlab.onos.core", true);
private static final String COMPACT = "%s/%s-%s/%s";
@@ -89,7 +91,7 @@
protected final HostService hostService;
protected final MastershipService mastershipService;
protected final IntentService intentService;
-// protected final StatisticService statService;
+ protected final StatisticService statService;
protected final ObjectMapper mapper = new ObjectMapper();
@@ -101,7 +103,7 @@
*
* @param directory service directory
*/
- protected TopologyMessages(ServiceDirectory directory) {
+ protected TopologyViewMessages(ServiceDirectory directory) {
this.directory = checkNotNull(directory, "Directory cannot be null");
clusterService = directory.get(ClusterService.class);
deviceService = directory.get(DeviceService.class);
@@ -109,7 +111,7 @@
hostService = directory.get(HostService.class);
mastershipService = directory.get(MastershipService.class);
intentService = directory.get(IntentService.class);
-// statService = directory.get(StatisticService.class);
+ statService = directory.get(StatisticService.class);
}
// Retrieves the payload from the specified event.
@@ -408,14 +410,15 @@
if (links != null) {
ArrayNode labels = mapper.createArrayNode();
- boolean hasTraffic = true; // FIXME
+ boolean hasTraffic = false;
for (Link link : links) {
linksNode.add(compactLinkString(link));
-// Load load = statService.load(link);
+ Load load = statService.load(link);
String label = "";
-// if (load.rate() > 0) {
-// label = load.toString();
-// }
+ if (load.rate() > 0) {
+ hasTraffic = true;
+ label = load.toString();
+ }
labels.add(label);
}
pathNode.put("class", hasTraffic ? type + " animated" : type);
diff --git a/web/gui/src/main/java/org/onlab/onos/gui/TopologyWebSocket.java b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
similarity index 98%
rename from web/gui/src/main/java/org/onlab/onos/gui/TopologyWebSocket.java
rename to web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
index af28086..cc5756c 100644
--- a/web/gui/src/main/java/org/onlab/onos/gui/TopologyWebSocket.java
+++ b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
@@ -66,8 +66,8 @@
/**
* Web socket capable of interacting with the GUI topology view.
*/
-public class TopologyWebSocket
- extends TopologyMessages
+public class TopologyViewWebSocket
+ extends TopologyViewMessages
implements WebSocket.OnTextMessage, WebSocket.OnControl {
private static final long MAX_AGE_MS = 15000;
@@ -78,7 +78,7 @@
private static final String APP_ID = "org.onlab.onos.gui";
- private static final long TRAFFIC_FREQUENCY_SEC = 5000;
+ private static final long TRAFFIC_FREQUENCY_SEC = 1000;
private final ApplicationId appId;
@@ -104,7 +104,7 @@
*
* @param directory service directory
*/
- public TopologyWebSocket(ServiceDirectory directory) {
+ public TopologyViewWebSocket(ServiceDirectory directory) {
super(directory);
appId = directory.get(CoreService.class).registerApplication(APP_ID);
}