Add a message handler for Openstack Telemetry view

Change-Id: I2803ac6e8f3c90e005bc73c43a5b867934daa80f
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
index 6033483..2f67050 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
@@ -32,6 +32,8 @@
  * Implementation class of FlowInfo.
  */
 public final class DefaultFlowInfo implements FlowInfo {
+    private static final String INGRESS_STATS = "Ingress Port :";
+    private static final String EGRESS_STATS = "Egress Port :";
 
     private final byte flowType;
     private final DeviceId deviceId;
@@ -185,6 +187,20 @@
     }
 
     @Override
+    public String uniqueFlowInfoKey() {
+        if (srcIp.address().isZero() || dstIp.address().isZero()) {
+            if (!srcIp.address().isZero()) {
+                return INGRESS_STATS + srcIp.toString();
+            }
+            if (!dstIp.address().isZero()) {
+                return EGRESS_STATS + dstIp.toString();
+            }
+        }
+        return srcIp.toString() + ":" + srcPort.toString() + " -> " +
+                dstIp.toString() + ":" + dstPort.toString();
+    }
+
+    @Override
     public String toString() {
         return toStringHelper(this)
                 .add("flowType", flowType)
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index 3a92bdf..79cb4c7 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.openstacktelemetry.impl;
 
+import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -46,7 +47,6 @@
 import org.onosproject.net.flow.FlowRuleOperations;
 import org.onosproject.net.flow.FlowRuleOperationsContext;
 import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.flow.IndexTableId;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.IPCriterion;
@@ -69,8 +69,11 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Dictionary;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -93,6 +96,7 @@
 import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
 import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
+import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_DATA_POINT_SIZE;
 import static org.onosproject.openstacktelemetry.api.Constants.FLAT;
 import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
 import static org.onosproject.openstacktelemetry.api.Constants.VLAN;
@@ -123,11 +127,10 @@
     private static final boolean DEFAULT_EGRESS_STATS = false;
     private static final boolean DEFAULT_PORT_STATS = true;
 
-    private static final String MAC_NOT_NULL = "MAC should not be null";
-
     private static final String ARBITRARY_IP = "0.0.0.0/32";
     private static final int ARBITRARY_LENGTH = 32;
     private static final String ARBITRARY_MAC = "00:00:00:00:00:00";
+    private static final MacAddress NO_HOST_MAC = MacAddress.valueOf(ARBITRARY_MAC);
     private static final int ARBITRARY_IN_INTF = 0;
     private static final int ARBITRARY_OUT_INTF = 0;
 
@@ -182,6 +185,7 @@
     private ScheduledFuture result;
 
     private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
+    private final Map<String, Queue<FlowInfo>> flowInfoMap = Maps.newConcurrentMap();
 
     private static final int SOURCE_ID = 1;
     private static final int TARGET_ID = 2;
@@ -189,8 +193,6 @@
     private static final int METRIC_PRIORITY_SOURCE  = SOURCE_ID * PRIORITY_BASE;
     private static final int METRIC_PRIORITY_TARGET  = TARGET_ID * PRIORITY_BASE;
 
-    private static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
-
     @Activate
     protected void activate() {
         appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
@@ -264,13 +266,6 @@
             IPProtocolCriterion ipProtocol =
                     (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
 
-            log.debug("[FlowInfo]  TableID:{}  SRC_IP:{}  DST_IP:{}  Pkt:{}  Byte:{}",
-                    ((IndexTableId) entry.table()).id(),
-                    srcIp.ip().toString(),
-                    dstIp.ip().toString(),
-                    entry.packets(),
-                    entry.bytes());
-
             fBuilder.withFlowType(FLOW_TYPE_SONA)
                     .withSrcIp(srcIp.ip())
                     .withDstIp(dstIp.ip());
@@ -283,25 +278,13 @@
                             (TcpPortCriterion) selector.getCriterion(TCP_SRC);
                     TcpPortCriterion tcpDst =
                             (TcpPortCriterion) selector.getCriterion(TCP_DST);
-
-                    log.debug("TCP SRC Port: {}, DST Port: {}",
-                            tcpSrc.tcpPort().toInt(),
-                            tcpDst.tcpPort().toInt());
-
                     fBuilder.withSrcPort(tcpSrc.tcpPort());
                     fBuilder.withDstPort(tcpDst.tcpPort());
-
                 } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
-
                     UdpPortCriterion udpSrc =
                             (UdpPortCriterion) selector.getCriterion(UDP_SRC);
                     UdpPortCriterion udpDst =
                             (UdpPortCriterion) selector.getCriterion(UDP_DST);
-
-                    log.debug("UDP SRC Port: {}, DST Port: {}",
-                                                    udpSrc.udpPort().toInt(),
-                                                    udpDst.udpPort().toInt());
-
                     fBuilder.withSrcPort(udpSrc.udpPort());
                     fBuilder.withDstPort(udpDst.udpPort());
                 } else {
@@ -387,7 +370,7 @@
                 .withSrcIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
                 .withDstIp(IpPrefix.valueOf(ARBITRARY_IP))
                 .withSrcMac(instPort.macAddress())
-                .withDstMac(MacAddress.valueOf(ARBITRARY_MAC))
+                .withDstMac(NO_HOST_MAC)
                 .withDeviceId(instPort.deviceId())
                 .withInputInterfaceId(ARBITRARY_IN_INTF)
                 .withOutputInterfaceId(ARBITRARY_OUT_INTF)
@@ -420,7 +403,7 @@
         fBuilder.withFlowType(FLOW_TYPE_SONA)
                 .withSrcIp(IpPrefix.valueOf(ARBITRARY_IP))
                 .withDstIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
-                .withSrcMac(MacAddress.valueOf(ARBITRARY_MAC))
+                .withSrcMac(NO_HOST_MAC)
                 .withDstMac(instPort.macAddress())
                 .withDeviceId(instPort.deviceId())
                 .withInputInterfaceId(ARBITRARY_IN_INTF)
@@ -460,7 +443,6 @@
                                StatsFlowRule statsFlowRule, int rulePriority,
                                boolean install) {
 
-        log.debug("Table Transition: {} -> {}", fromTable, toTable);
         int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
         int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
         int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
@@ -711,6 +693,26 @@
         return NO_HOST_MAC;
     }
 
+    private void enqFlowInfo(FlowInfo flowInfo) {
+        String key = flowInfo.uniqueFlowInfoKey();
+        Queue<FlowInfo> queue = flowInfoMap.get(key);
+        if (queue == null) {
+            Queue<FlowInfo> newQueue = new LinkedList<FlowInfo>();
+            newQueue.offer(flowInfo);
+            flowInfoMap.put(key, newQueue);
+            return;
+        }
+        queue.offer(flowInfo);
+
+        while (queue.size() > DEFAULT_DATA_POINT_SIZE) {
+            queue.remove(); // Removes a garbage data in the queue.
+        }
+    }
+
+    public Map<String, Queue<FlowInfo>> getFlowInfoMap() {
+        return flowInfoMap;
+    }
+
     /**
      * Extracts properties from the component configuration context.
      *
@@ -775,6 +777,11 @@
             }
 
             telemetryService.publish(filteredFlowInfos);
+
+            // TODO: Refactor the following code to "TelemetryService" style.
+            filteredFlowInfos.forEach(flowInfo -> {
+                enqFlowInfo(flowInfo);
+            });
         }
 
         private boolean checkSrcDstLocalMaster(FlowInfo info) {