Added detection of traffic flowing using StatisticService.
Change-Id: I2044ec16fd722d953d0e2b2c955e4da2b1dab663
diff --git a/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
new file mode 100644
index 0000000..cc5756c
--- /dev/null
+++ b/web/gui/src/main/java/org/onlab/onos/gui/TopologyViewWebSocket.java
@@ -0,0 +1,514 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onlab.onos.gui;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.eclipse.jetty.websocket.WebSocket;
+import org.onlab.onos.cluster.ClusterEvent;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.core.ApplicationId;
+import org.onlab.onos.core.CoreService;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.Host;
+import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceListener;
+import org.onlab.onos.net.flow.DefaultTrafficSelector;
+import org.onlab.onos.net.flow.DefaultTrafficTreatment;
+import org.onlab.onos.net.host.HostEvent;
+import org.onlab.onos.net.host.HostListener;
+import org.onlab.onos.net.intent.HostToHostIntent;
+import org.onlab.onos.net.intent.Intent;
+import org.onlab.onos.net.intent.IntentEvent;
+import org.onlab.onos.net.intent.IntentListener;
+import org.onlab.onos.net.intent.MultiPointToSinglePointIntent;
+import org.onlab.onos.net.intent.OpticalConnectivityIntent;
+import org.onlab.onos.net.intent.PathIntent;
+import org.onlab.onos.net.intent.PointToPointIntent;
+import org.onlab.onos.net.link.LinkEvent;
+import org.onlab.onos.net.link.LinkListener;
+import org.onlab.osgi.ServiceDirectory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.onos.cluster.ClusterEvent.Type.INSTANCE_ADDED;
+import static org.onlab.onos.net.DeviceId.deviceId;
+import static org.onlab.onos.net.HostId.hostId;
+import static org.onlab.onos.net.device.DeviceEvent.Type.DEVICE_ADDED;
+import static org.onlab.onos.net.host.HostEvent.Type.HOST_ADDED;
+import static org.onlab.onos.net.intent.IntentState.INSTALLED;
+import static org.onlab.onos.net.link.LinkEvent.Type.LINK_ADDED;
+
+/**
+ * Web socket capable of interacting with the GUI topology view.
+ */
+public class TopologyViewWebSocket
+ extends TopologyViewMessages
+ implements WebSocket.OnTextMessage, WebSocket.OnControl {
+
+ private static final long MAX_AGE_MS = 15000;
+
+ private static final byte PING = 0x9;
+ private static final byte PONG = 0xA;
+ private static final byte[] PING_DATA = new byte[]{(byte) 0xde, (byte) 0xad};
+
+ private static final String APP_ID = "org.onlab.onos.gui";
+
+ private static final long TRAFFIC_FREQUENCY_SEC = 1000;
+
+ private final ApplicationId appId;
+
+ private Connection connection;
+ private FrameConnection control;
+
+ private final ClusterEventListener clusterListener = new InternalClusterListener();
+ private final DeviceListener deviceListener = new InternalDeviceListener();
+ private final LinkListener linkListener = new InternalLinkListener();
+ private final HostListener hostListener = new InternalHostListener();
+ private final IntentListener intentListener = new InternalIntentListener();
+
+ // Intents that are being monitored for the GUI
+ private ObjectNode monitorRequest;
+ private final Timer timer = new Timer("intent-traffic-monitor");
+ private final TimerTask timerTask = new IntentTrafficMonitor();
+
+ private long lastActive = System.currentTimeMillis();
+ private boolean listenersRemoved = false;
+
+ /**
+ * Creates a new web-socket for serving data to GUI topology view.
+ *
+ * @param directory service directory
+ */
+ public TopologyViewWebSocket(ServiceDirectory directory) {
+ super(directory);
+ appId = directory.get(CoreService.class).registerApplication(APP_ID);
+ }
+
+ /**
+ * Issues a close on the connection.
+ */
+ synchronized void close() {
+ removeListeners();
+ if (connection.isOpen()) {
+ connection.close();
+ }
+ }
+
+ /**
+ * Indicates if this connection is idle.
+ *
+ * @return true if idle or closed
+ */
+ synchronized boolean isIdle() {
+ boolean idle = (System.currentTimeMillis() - lastActive) > MAX_AGE_MS;
+ if (idle || !connection.isOpen()) {
+ return true;
+ }
+ try {
+ control.sendControl(PING, PING_DATA, 0, PING_DATA.length);
+ } catch (IOException e) {
+ log.warn("Unable to send ping message due to: ", e);
+ }
+ return false;
+ }
+
+ @Override
+ public void onOpen(Connection connection) {
+ log.info("GUI client connected");
+ this.connection = connection;
+ this.control = (FrameConnection) connection;
+ addListeners();
+ timer.schedule(timerTask, TRAFFIC_FREQUENCY_SEC, TRAFFIC_FREQUENCY_SEC);
+
+ sendAllInstances();
+ sendAllDevices();
+ sendAllLinks();
+ sendAllHosts();
+ }
+
+ @Override
+ public synchronized void onClose(int closeCode, String message) {
+ removeListeners();
+ timer.cancel();
+ log.info("GUI client disconnected");
+ }
+
+ @Override
+ public boolean onControl(byte controlCode, byte[] data, int offset, int length) {
+ lastActive = System.currentTimeMillis();
+ return true;
+ }
+
+ @Override
+ public void onMessage(String data) {
+ lastActive = System.currentTimeMillis();
+ try {
+ ObjectNode event = (ObjectNode) mapper.reader().readTree(data);
+ String type = string(event, "event", "unknown");
+ if (type.equals("requestDetails")) {
+ requestDetails(event);
+ } else if (type.equals("updateMeta")) {
+ updateMetaUi(event);
+ } else if (type.equals("addHostIntent")) {
+ createHostIntent(event);
+ } else if (type.equals("requestTraffic")) {
+ requestTraffic(event);
+ } else if (type.equals("cancelTraffic")) {
+ cancelTraffic(event);
+ }
+ } catch (Exception e) {
+ log.warn("Unable to parse GUI request {} due to {}", data, e);
+ }
+ }
+
+ // Sends the specified data to the client.
+ private synchronized void sendMessage(ObjectNode data) {
+ try {
+ if (connection.isOpen()) {
+ connection.sendMessage(data.toString());
+ }
+ } catch (IOException e) {
+ log.warn("Unable to send message {} to GUI due to {}", data, e);
+ }
+ }
+
+ // Sends all controller nodes to the client as node-added messages.
+ private void sendAllInstances() {
+ for (ControllerNode node : clusterService.getNodes()) {
+ sendMessage(instanceMessage(new ClusterEvent(INSTANCE_ADDED, node)));
+ }
+ }
+
+ // Sends all devices to the client as device-added messages.
+ private void sendAllDevices() {
+ for (Device device : deviceService.getDevices()) {
+ sendMessage(deviceMessage(new DeviceEvent(DEVICE_ADDED, device)));
+ }
+ }
+
+ // Sends all links to the client as link-added messages.
+ private void sendAllLinks() {
+ for (Link link : linkService.getLinks()) {
+ sendMessage(linkMessage(new LinkEvent(LINK_ADDED, link)));
+ }
+ }
+
+ // Sends all hosts to the client as host-added messages.
+ private void sendAllHosts() {
+ for (Host host : hostService.getHosts()) {
+ sendMessage(hostMessage(new HostEvent(HOST_ADDED, host)));
+ }
+ }
+
+ // Sends back device or host details.
+ private void requestDetails(ObjectNode event) {
+ ObjectNode payload = payload(event);
+ String type = string(payload, "class", "unknown");
+ long sid = number(event, "sid");
+
+ if (type.equals("device")) {
+ sendMessage(deviceDetails(deviceId(string(payload, "id")), sid));
+ } else if (type.equals("host")) {
+ sendMessage(hostDetails(hostId(string(payload, "id")), sid));
+ }
+ }
+
+ // Creates host-to-host intent.
+ private void createHostIntent(ObjectNode event) {
+ ObjectNode payload = payload(event);
+ long id = number(event, "sid");
+ // TODO: add protection against device ids and non-existent hosts.
+ HostId one = hostId(string(payload, "one"));
+ HostId two = hostId(string(payload, "two"));
+
+ HostToHostIntent hostIntent = new HostToHostIntent(appId, one, two,
+ DefaultTrafficSelector.builder().build(),
+ DefaultTrafficTreatment.builder().build());
+ monitorRequest = event;
+ intentService.submit(hostIntent);
+ }
+
+ // Sends traffic message.
+ private synchronized void requestTraffic(ObjectNode event) {
+ ObjectNode payload = payload(event);
+ long sid = number(event, "sid");
+ monitorRequest = event;
+
+ // Get the set of selected hosts and their intents.
+ Set<Host> hosts = getHosts((ArrayNode) payload.path("ids"));
+ Set<Intent> intents = findPathIntents(hosts);
+
+ // If there is a hover node, include it in the hosts and find intents.
+ String hover = string(payload, "hover");
+ Set<Intent> hoverIntents;
+ if (!isNullOrEmpty(hover)) {
+ addHost(hosts, hostId(hover));
+ hoverIntents = findPathIntents(hosts);
+ intents.removeAll(hoverIntents);
+
+ // Send an initial message to highlight all links of all monitored intents.
+ sendMessage(trafficMessage(sid,
+ new TrafficClass("primary", hoverIntents),
+ new TrafficClass("secondary", intents)));
+
+ } else {
+ // Send an initial message to highlight all links of all monitored intents.
+ sendMessage(trafficMessage(sid, new TrafficClass("primary", intents)));
+ }
+ }
+
+ // Cancels sending traffic messages.
+ private void cancelTraffic(ObjectNode event) {
+ sendMessage(trafficMessage(number(event, "sid")));
+ monitorRequest = null;
+ }
+
+ // Finds all path (host-to-host or point-to-point) intents that pertains
+ // to the given hosts.
+ private Set<Intent> findPathIntents(Set<Host> hosts) {
+ // Derive from this the set of edge connect points.
+ Set<ConnectPoint> edgePoints = getEdgePoints(hosts);
+
+ // Iterate over all intents and produce a set that contains only those
+ // intents that target all selected hosts or derived edge connect points.
+ return getIntents(hosts, edgePoints);
+ }
+
+ // Produces a set of intents that target all selected hosts or connect points.
+ private Set<Intent> getIntents(Set<Host> hosts, Set<ConnectPoint> edgePoints) {
+ Set<Intent> intents = new HashSet<>();
+ if (hosts.isEmpty()) {
+ return intents;
+ }
+
+ Set<OpticalConnectivityIntent> opticalIntents = new HashSet<>();
+
+ for (Intent intent : intentService.getIntents()) {
+ if (intentService.getIntentState(intent.id()) == INSTALLED) {
+ boolean isRelevant = false;
+ if (intent instanceof HostToHostIntent) {
+ isRelevant = isIntentRelevant((HostToHostIntent) intent, hosts);
+ } else if (intent instanceof PointToPointIntent) {
+ isRelevant = isIntentRelevant((PointToPointIntent) intent, edgePoints);
+ } else if (intent instanceof MultiPointToSinglePointIntent) {
+ isRelevant = isIntentRelevant((MultiPointToSinglePointIntent) intent, edgePoints);
+ } else if (intent instanceof OpticalConnectivityIntent) {
+ opticalIntents.add((OpticalConnectivityIntent) intent);
+ }
+ // TODO: add other intents, e.g. SinglePointToMultiPointIntent
+
+ if (isRelevant) {
+ intents.add(intent);
+ }
+ }
+ }
+
+ for (OpticalConnectivityIntent intent : opticalIntents) {
+ if (isIntentRelevant(intent, intents)) {
+ intents.add(intent);
+ }
+ }
+ return intents;
+ }
+
+ // Indicates whether the specified intent involves all of the given hosts.
+ private boolean isIntentRelevant(HostToHostIntent intent, Set<Host> hosts) {
+ for (Host host : hosts) {
+ HostId id = host.id();
+ // Bail if intent does not involve this host.
+ if (!id.equals(intent.one()) && !id.equals(intent.two())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Indicates whether the specified intent involves all of the given edge points.
+ private boolean isIntentRelevant(PointToPointIntent intent,
+ Set<ConnectPoint> edgePoints) {
+ for (ConnectPoint point : edgePoints) {
+ // Bail if intent does not involve this edge point.
+ if (!point.equals(intent.egressPoint()) &&
+ !point.equals(intent.ingressPoint())) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Indicates whether the specified intent involves all of the given edge points.
+ private boolean isIntentRelevant(MultiPointToSinglePointIntent intent,
+ Set<ConnectPoint> edgePoints) {
+ for (ConnectPoint point : edgePoints) {
+ // Bail if intent does not involve this edge point.
+ if (!point.equals(intent.egressPoint()) &&
+ !intent.ingressPoints().contains(point)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Indicates whether the specified intent involves all of the given edge points.
+ private boolean isIntentRelevant(OpticalConnectivityIntent opticalIntent,
+ Set<Intent> intents) {
+ Link ccSrc = getFirstLink(opticalIntent.getSrcConnectPoint(), false);
+ Link ccDst = getFirstLink(opticalIntent.getDst(), true);
+
+ for (Intent intent : intents) {
+ List<Intent> installables = intentService.getInstallableIntents(intent.id());
+ for (Intent installable : installables) {
+ if (installable instanceof PathIntent) {
+ List<Link> links = ((PathIntent) installable).path().links();
+ if (links.size() == 3) {
+ Link tunnel = links.get(1);
+ if (tunnel.src().equals(ccSrc.src()) &&
+ tunnel.dst().equals(ccDst.dst())) {
+ return true;
+ }
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private Link getFirstLink(ConnectPoint point, boolean ingress) {
+ for (Link link : linkService.getLinks(point)) {
+ if (point.equals(ingress ? link.src() : link.dst())) {
+ return link;
+ }
+ }
+ return null;
+ }
+
+ // Produces a set of all host ids listed in the specified JSON array.
+ private Set<Host> getHosts(ArrayNode array) {
+ Set<Host> hosts = new HashSet<>();
+ if (array != null) {
+ for (JsonNode node : array) {
+ try {
+ addHost(hosts, hostId(node.asText()));
+ } catch (IllegalArgumentException e) {
+ log.debug("Skipping ID {}", node.asText());
+ }
+ }
+ }
+ return hosts;
+ }
+
+ private void addHost(Set<Host> hosts, HostId hostId) {
+ Host host = hostService.getHost(hostId);
+ if (host != null) {
+ hosts.add(host);
+ }
+ }
+
+ // Produces a set of edge points from the specified set of hosts.
+ private Set<ConnectPoint> getEdgePoints(Set<Host> hosts) {
+ Set<ConnectPoint> edgePoints = new HashSet<>();
+ for (Host host : hosts) {
+ edgePoints.add(host.location());
+ }
+ return edgePoints;
+ }
+
+
+ // Adds all internal listeners.
+ private void addListeners() {
+ clusterService.addListener(clusterListener);
+ deviceService.addListener(deviceListener);
+ linkService.addListener(linkListener);
+ hostService.addListener(hostListener);
+ intentService.addListener(intentListener);
+ }
+
+ // Removes all internal listeners.
+ private synchronized void removeListeners() {
+ if (!listenersRemoved) {
+ listenersRemoved = true;
+ clusterService.removeListener(clusterListener);
+ deviceService.removeListener(deviceListener);
+ linkService.removeListener(linkListener);
+ hostService.removeListener(hostListener);
+ intentService.removeListener(intentListener);
+ }
+ }
+
+ // Cluster event listener.
+ private class InternalClusterListener implements ClusterEventListener {
+ @Override
+ public void event(ClusterEvent event) {
+ sendMessage(instanceMessage(event));
+ }
+ }
+
+ // Device event listener.
+ private class InternalDeviceListener implements DeviceListener {
+ @Override
+ public void event(DeviceEvent event) {
+ sendMessage(deviceMessage(event));
+ }
+ }
+
+ // Link event listener.
+ private class InternalLinkListener implements LinkListener {
+ @Override
+ public void event(LinkEvent event) {
+ sendMessage(linkMessage(event));
+ }
+ }
+
+ // Host event listener.
+ private class InternalHostListener implements HostListener {
+ @Override
+ public void event(HostEvent event) {
+ sendMessage(hostMessage(event));
+ }
+ }
+
+ // Intent event listener.
+ private class InternalIntentListener implements IntentListener {
+ @Override
+ public void event(IntentEvent event) {
+ if (monitorRequest != null) {
+ requestTraffic(monitorRequest);
+ }
+ }
+ }
+
+ private class IntentTrafficMonitor extends TimerTask {
+ @Override
+ public void run() {
+ if (monitorRequest != null) {
+ requestTraffic(monitorRequest);
+ }
+ }
+ }
+}
+