Support to publish port TX and RX stats in openstacktelemetry

Change-Id: I368fb676e4817cd01e5782a3b37170e2b9a5c6bd
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java
index 7fed73a..dcf682d 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java
@@ -44,7 +44,7 @@
      *
      * @return a set of flow infos
      */
-    Set<FlowInfo> getFlowInfo();
+    Set<FlowInfo> getFlowInfos();
 
     /**
      * Deletes stat flow rule.
diff --git a/apps/openstacktelemetry/app/BUCK b/apps/openstacktelemetry/app/BUCK
index 97bc0f9..4cc3d98 100644
--- a/apps/openstacktelemetry/app/BUCK
+++ b/apps/openstacktelemetry/app/BUCK
@@ -10,6 +10,7 @@
     '//lib:jersey-client',
     '//cli:onos-cli',
     '//lib:org.apache.karaf.shell.console',
+    '//apps/openstacknode/api:onos-apps-openstacknode-api',
     '//apps/openstacknetworking/api:onos-apps-openstacknetworking-api',
     '//apps/openstacktelemetry/api:onos-apps-openstacktelemetry-api',
     '//lib:kafka-clients',
diff --git a/apps/openstacktelemetry/app/pom.xml b/apps/openstacktelemetry/app/pom.xml
index 5b0f8bf..920958e 100644
--- a/apps/openstacktelemetry/app/pom.xml
+++ b/apps/openstacktelemetry/app/pom.xml
@@ -87,6 +87,12 @@
 
         <dependency>
             <groupId>org.onosproject</groupId>
+            <artifactId>onos-apps-openstacknode-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
             <artifactId>onlab-osgi</artifactId>
             <version>${project.version}</version>
         </dependency>
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
index 228bc2b..2ced23b 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
@@ -25,6 +25,9 @@
  */
 public class TinaMessageByteBufferCodec {
 
+    public static final String KAFKA_TOPIC = "sona.flow";
+    public static final String KAFKA_KEY = "flowdata";
+
     private static final int HEADER_SIZE = 8;
     private static final int ENTRY_SIZE = 88;
     private static final int MILLISECONDS = 1000;
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index d1de54a..56a0e81 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -124,7 +124,7 @@
     public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
 
         if (producer == null) {
-            log.warn("Kafka telemetry service has not been enabled!");
+            log.debug("Kafka telemetry service has not been enabled!");
             return null;
         }
 
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index da492a9..f3f386f 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -36,6 +36,9 @@
 import java.util.List;
 import java.util.Set;
 
+import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_KEY;
+import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_TOPIC;
+
 /**
  * Openstack telemetry manager.
  */
@@ -45,9 +48,6 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private static final String KAFKA_TOPIC = "sona.flow";
-    private static final String KAFKA_KEY = "flowdata";
-
     private List<TelemetryService> telemetryServices = Lists.newArrayList();
 
     @Activate
@@ -89,6 +89,8 @@
             if (service instanceof RestTelemetryManager) {
                 invokeRestPublisher((RestTelemetryService) service, flowInfos);
             }
+
+            log.trace("Publishing Flow Infos {}", flowInfos);
         });
     }
 
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index 9025224..a7ead8e 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -16,7 +16,6 @@
 package org.onosproject.openstacktelemetry.impl;
 
 import com.google.common.collect.Sets;
-import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -29,12 +28,17 @@
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.SharedScheduledExecutorService;
+import org.onlab.util.SharedScheduledExecutors;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Host;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.device.PortStatistics;
 import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -51,7 +55,11 @@
 import org.onosproject.net.flow.criteria.TcpPortCriterion;
 import org.onosproject.net.flow.criteria.UdpPortCriterion;
 import org.onosproject.net.host.HostService;
+import org.onosproject.openstacknetworking.api.InstancePort;
+import org.onosproject.openstacknetworking.api.InstancePortService;
 import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
+import org.onosproject.openstacknode.api.OpenstackNode;
+import org.onosproject.openstacknode.api.OpenstackNodeService;
 import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.StatsFlowRule;
@@ -62,10 +70,12 @@
 import org.slf4j.LoggerFactory;
 
 import java.util.Dictionary;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.onlab.packet.Ethernet.TYPE_IPV4;
 import static org.onlab.packet.IPv4.PROTOCOL_TCP;
@@ -83,6 +93,7 @@
 import static org.onosproject.openstacknetworking.api.Constants.VTAP_FLAT_OUTBOUND_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
 import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
+import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
 import static org.onosproject.openstacktelemetry.api.Constants.FLAT;
 import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
 import static org.onosproject.openstacktelemetry.api.Constants.VLAN;
@@ -101,16 +112,28 @@
     private static final byte FLOW_TYPE_SONA = 1; // VLAN
 
     private static final long MILLISECONDS = 1000L;
+    private static final long INITIAL_DELAY = 5L;
     private static final long REFRESH_INTERVAL = 5L;
+    private static final TimeUnit TIME_UNIT_SECOND = TimeUnit.SECONDS;
 
     private static final String REVERSE_PATH_STATS = "reversePathStats";
     private static final String EGRESS_STATS = "egressStats";
+    private static final String PORT_STATS = "portStats";
 
     private static final boolean DEFAULT_REVERSE_PATH_STATS = false;
     private static final boolean DEFAULT_EGRESS_STATS = false;
+    private static final boolean DEFAULT_PORT_STATS = true;
 
     private static final String MAC_NOT_NULL = "MAC should not be null";
 
+    private static final String ARBITRARY_IP = "0.0.0.0/32";
+    private static final int ARBITRARY_LENGTH = 32;
+    private static final String ARBITRARY_MAC = "00:00:00:00:00:00";
+    private static final int ARBITRARY_IN_INTF = 0;
+    private static final int ARBITRARY_OUT_INTF = 0;
+
+    private static final boolean RECOVER_FROM_FAILURE = true;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
 
@@ -121,6 +144,9 @@
     protected HostService hostService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService componentConfigService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -130,6 +156,12 @@
     protected OpenstackNetworkService osNetworkService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected InstancePortService instPortService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected OpenstackNodeService osNodeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackTelemetryService telemetryService;
 
     @Property(name = REVERSE_PATH_STATS, boolValue = DEFAULT_REVERSE_PATH_STATS,
@@ -142,12 +174,16 @@
                     "collecting the flow-based stats for egress port.")
     private boolean egressStats = DEFAULT_EGRESS_STATS;
 
+    @Property(name = PORT_STATS, boolValue = DEFAULT_PORT_STATS,
+            label = "A flag which indicates whether to collect port TX & RX stats.")
+    private boolean portStats = DEFAULT_PORT_STATS;
+
     private ApplicationId appId;
-    private Timer timer;
-    private TimerTask task;
+    private TelemetryCollector collector;
+    private SharedScheduledExecutorService executor;
+    private ScheduledFuture result;
 
     private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
-    private int loopCount = 0;
 
     private static final int SOURCE_ID = 1;
     private static final int TARGET_ID = 2;
@@ -157,15 +193,12 @@
 
     private static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
 
-    public StatsFlowRuleManager() {
-        this.timer = new Timer("openstack-telemetry-sender");
-    }
-
     @Activate
     protected void activate() {
         appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
 
         componentConfigService.registerProperties(getClass());
+        executor = SharedScheduledExecutors.getSingleThreadExecutor();
 
         this.start();
 
@@ -177,6 +210,8 @@
 
         componentConfigService.unregisterProperties(getClass(), false);
 
+        flowRuleService.removeFlowRulesById(appId);
+
         log.info("Stopped");
     }
 
@@ -190,16 +225,17 @@
     @Override
     public void start() {
         log.info("Start publishing thread");
-        task = new InternalTimerTask();
-        timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL,
-                MILLISECONDS * REFRESH_INTERVAL);
+        collector = new TelemetryCollector();
+
+        result = executor.scheduleAtFixedRate(collector, INITIAL_DELAY,
+                        REFRESH_INTERVAL, TIME_UNIT_SECOND, RECOVER_FROM_FAILURE);
     }
 
     @Override
     public void stop() {
         log.info("Stop data publishing thread");
-        task.cancel();
-        task = null;
+        result.cancel(true);
+        collector = null;
     }
 
     @Override
@@ -210,12 +246,221 @@
 
     @Override
     public void deleteStatFlowRule(StatsFlowRule statsFlowRule) {
-        // FIXME: following code might not be necessary
-        flowRuleService.removeFlowRulesById(appId);
 
         setStatFlowRule(statsFlowRule, false);
     }
 
+    /**
+     * Gets a set of the flow infos.
+     *
+     * @return a set of flow infos
+     */
+    @Override
+    public Set<FlowInfo> getFlowInfos() {
+        Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
+
+        // obtain all flow rule entries installed by telemetry app
+        for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
+            FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
+            TrafficSelector selector = entry.selector();
+
+            IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
+            IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
+            IPProtocolCriterion ipProtocol =
+                    (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
+
+            log.debug("[FlowInfo]  TableID:{}  SRC_IP:{}  DST_IP:{}  Pkt:{}  Byte:{}",
+                    ((IndexTableId) entry.table()).id(),
+                    srcIp.ip().toString(),
+                    dstIp.ip().toString(),
+                    entry.packets(),
+                    entry.bytes());
+
+            fBuilder.withFlowType(FLOW_TYPE_SONA)
+                    .withSrcIp(srcIp.ip())
+                    .withDstIp(dstIp.ip());
+
+            if (ipProtocol != null) {
+                fBuilder.withProtocol((byte) ipProtocol.protocol());
+
+                if (ipProtocol.protocol() == PROTOCOL_TCP) {
+                    TcpPortCriterion tcpSrc =
+                            (TcpPortCriterion) selector.getCriterion(TCP_SRC);
+                    TcpPortCriterion tcpDst =
+                            (TcpPortCriterion) selector.getCriterion(TCP_DST);
+
+                    log.debug("TCP SRC Port: {}, DST Port: {}",
+                            tcpSrc.tcpPort().toInt(),
+                            tcpDst.tcpPort().toInt());
+
+                    fBuilder.withSrcPort(tcpSrc.tcpPort());
+                    fBuilder.withDstPort(tcpDst.tcpPort());
+
+                } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
+
+                    UdpPortCriterion udpSrc =
+                            (UdpPortCriterion) selector.getCriterion(UDP_SRC);
+                    UdpPortCriterion udpDst =
+                            (UdpPortCriterion) selector.getCriterion(UDP_DST);
+
+                    log.debug("UDP SRC Port: {}, DST Port: {}",
+                                                    udpSrc.udpPort().toInt(),
+                                                    udpDst.udpPort().toInt());
+
+                    fBuilder.withSrcPort(udpSrc.udpPort());
+                    fBuilder.withDstPort(udpDst.udpPort());
+                } else {
+                    log.debug("Other protocol: {}", ipProtocol.protocol());
+                }
+            }
+
+            fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
+                    .withDstMac(getMacAddress(dstIp.ip().address()))
+                    .withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
+                    .withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
+                    .withVlanId(getVlanId(srcIp.ip().address()))
+                    .withDeviceId(entry.deviceId());
+
+            StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
+
+            // TODO: need to collect error and drop packets stats
+            // TODO: need to make the refresh interval configurable
+            sBuilder.withStartupTime(System.currentTimeMillis())
+                    .withFstPktArrTime(System.currentTimeMillis())
+                    .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
+                    .withCurrAccPkts((int) entry.packets())
+                    .withCurrAccBytes(entry.bytes())
+                    .withErrorPkts((short) 0)
+                    .withDropPkts((short) 0);
+
+            fBuilder.withStatsInfo(sBuilder.build());
+
+            FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
+
+            flowInfos.add(flowInfo);
+
+            log.debug("FlowInfo: \n{}", flowInfo.toString());
+        }
+
+        return flowInfos;
+    }
+
+    /**
+     * Gets a set of flow infos by referring to destination VM port.
+     *
+     * @return flow infos
+     */
+    private Set<FlowInfo> getDstPortBasedFlowInfos() {
+        Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
+        Set<PortNumber> instPortNums = instPortService.instancePorts()
+                                                .stream()
+                                                .map(InstancePort::portNumber)
+                                                .collect(Collectors.toSet());
+        Set<DeviceId> deviceIds = osNodeService.completeNodes(COMPUTE)
+                                                .stream()
+                                                .map(OpenstackNode::intgBridge)
+                                                .collect(Collectors.toSet());
+
+        deviceIds.forEach(d -> {
+            List<PortStatistics> stats =
+                                deviceService.getPortStatistics(d)
+                                .stream()
+                                .filter(s -> instPortNums.contains(s.portNumber()))
+                                .collect(Collectors.toList());
+
+            stats.forEach(s -> {
+                InstancePort instPort = getInstancePort(d, s.portNumber());
+                flowInfos.add(buildTxPortInfo(instPort, s));
+                flowInfos.add(buildRxPortInfo(instPort, s));
+            });
+        });
+
+        return flowInfos;
+    }
+
+    /**
+     * Obtains the flow info generated by TX port.
+     *
+     * @param instPort instance port
+     * @param stat port statistics
+     * @return flow info
+     */
+    private FlowInfo buildTxPortInfo(InstancePort instPort, PortStatistics stat) {
+        FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
+
+        fBuilder.withFlowType(FLOW_TYPE_SONA)
+                .withSrcIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
+                .withDstIp(IpPrefix.valueOf(ARBITRARY_IP))
+                .withSrcMac(instPort.macAddress())
+                .withDstMac(MacAddress.valueOf(ARBITRARY_MAC))
+                .withDeviceId(instPort.deviceId())
+                .withInputInterfaceId(ARBITRARY_IN_INTF)
+                .withOutputInterfaceId(ARBITRARY_OUT_INTF)
+                .withVlanId(VlanId.vlanId());
+
+        StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
+        sBuilder.withStartupTime(System.currentTimeMillis())
+                .withFstPktArrTime(System.currentTimeMillis())
+                .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
+                .withCurrAccPkts((int) stat.packetsSent())
+                .withCurrAccBytes(stat.bytesSent())
+                .withErrorPkts((short) stat.packetsTxErrors())
+                .withDropPkts((short) stat.packetsTxDropped());
+
+        fBuilder.withStatsInfo(sBuilder.build());
+
+        return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
+    }
+
+    /**
+     * Obtains the flow info generated by RX port.
+     *
+     * @param instPort instance port
+     * @param stat port statistics
+     * @return flow info
+     */
+    private FlowInfo buildRxPortInfo(InstancePort instPort, PortStatistics stat) {
+        FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
+
+        fBuilder.withFlowType(FLOW_TYPE_SONA)
+                .withSrcIp(IpPrefix.valueOf(ARBITRARY_IP))
+                .withDstIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
+                .withSrcMac(MacAddress.valueOf(ARBITRARY_MAC))
+                .withDstMac(instPort.macAddress())
+                .withDeviceId(instPort.deviceId())
+                .withInputInterfaceId(ARBITRARY_IN_INTF)
+                .withOutputInterfaceId(ARBITRARY_OUT_INTF)
+                .withVlanId(VlanId.vlanId());
+
+        StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
+        sBuilder.withStartupTime(System.currentTimeMillis())
+                .withFstPktArrTime(System.currentTimeMillis())
+                .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
+                .withCurrAccPkts((int) stat.packetsReceived())
+                .withCurrAccBytes(stat.bytesReceived())
+                .withErrorPkts((short) stat.packetsRxErrors())
+                .withDropPkts((short) stat.packetsRxDropped());
+
+        fBuilder.withStatsInfo(sBuilder.build());
+
+        return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
+    }
+
+    /**
+     * Obtains instance port which associated with the given device identifier
+     * and port number.
+     *
+     * @param deviceId      device identifier
+     * @param portNumber    port number
+     * @return instance port
+     */
+    private InstancePort getInstancePort(DeviceId deviceId, PortNumber portNumber) {
+        return instPortService.instancePorts().stream()
+                                .filter(p -> p.deviceId().equals(deviceId))
+                                .filter(p -> p.portNumber().equals(portNumber))
+                                .findFirst().orElse(null);
+    }
+
     private void connectTables(DeviceId deviceId, int fromTable, int toTable,
                                StatsFlowRule statsFlowRule, int rulePriority,
                                boolean install) {
@@ -278,111 +523,19 @@
         flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
             @Override
             public void onSuccess(FlowRuleOperations ops) {
-                log.debug("Provisioned vni or forwarding table: \n {}", ops.toString());
+                log.debug("Install rules for telemetry stats: \n {}",
+                                                                ops.toString());
             }
 
             @Override
             public void onError(FlowRuleOperations ops) {
-                log.debug("Failed to provision vni or forwarding table: \n {}", ops.toString());
+                log.debug("Failed to install rules for telemetry stats: \n {}",
+                                                                ops.toString());
             }
         }));
     }
 
     /**
-     * Gets a set of the flow infos.
-     *
-     * @return a set of flow infos
-     */
-    public Set<FlowInfo> getFlowInfo() {
-        Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
-
-        // obtain all flow rule entries installed by telemetry app
-        for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
-            FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
-            TrafficSelector selector = entry.selector();
-
-            IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
-            IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
-            IPProtocolCriterion ipProtocol =
-                    (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
-
-            log.debug("[FlowInfo]  TableID:{}  SRC_IP:{}  DST_IP:{}  Pkt:{}  Byte:{}",
-                    ((IndexTableId) entry.table()).id(),
-                    srcIp.ip().toString(),
-                    dstIp.ip().toString(),
-                    entry.packets(),
-                    entry.bytes());
-
-            fBuilder.withFlowType(FLOW_TYPE_SONA)
-                    .withSrcIp(srcIp.ip())
-                    .withDstIp(dstIp.ip());
-
-            if (ipProtocol != null) {
-                fBuilder.withProtocol((byte) ipProtocol.protocol());
-
-                if (ipProtocol.protocol() == PROTOCOL_TCP) {
-                    TcpPortCriterion tcpSrc =
-                            (TcpPortCriterion) selector.getCriterion(TCP_SRC);
-                    TcpPortCriterion tcpDst =
-                            (TcpPortCriterion) selector.getCriterion(TCP_DST);
-
-                    log.debug("TCP SRC Port: {}, DST Port: {}",
-                            tcpSrc.tcpPort().toInt(),
-                            tcpDst.tcpPort().toInt());
-
-                    fBuilder.withSrcPort(tcpSrc.tcpPort());
-                    fBuilder.withDstPort(tcpDst.tcpPort());
-
-                } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
-
-                    UdpPortCriterion udpSrc =
-                            (UdpPortCriterion) selector.getCriterion(UDP_SRC);
-                    UdpPortCriterion udpDst =
-                            (UdpPortCriterion) selector.getCriterion(UDP_DST);
-
-                    log.debug("UDP SRC Port: {}, DST Port: {}",
-                            udpSrc.udpPort().toInt(),
-                            udpDst.udpPort().toInt());
-
-                    fBuilder.withSrcPort(udpSrc.udpPort());
-                    fBuilder.withDstPort(udpDst.udpPort());
-                } else {
-                    log.debug("Other protocol: {}", ipProtocol.protocol());
-                }
-            }
-
-            fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
-                    .withDstMac(getMacAddress(dstIp.ip().address()))
-                    .withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
-                    .withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
-                    .withVlanId(getVlanId(srcIp.ip().address()))
-                    .withDeviceId(entry.deviceId());
-
-            StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
-
-            // TODO: need to collect error and drop packets stats
-            // TODO: need to make the refresh interval configurable
-            sBuilder.withStartupTime(System.currentTimeMillis())
-                    .withFstPktArrTime(System.currentTimeMillis())
-                    .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
-                    .withCurrAccPkts((int) entry.packets())
-                    .withCurrAccBytes(entry.bytes())
-                    .withErrorPkts((short) 0)
-                    .withDropPkts((short) 0);
-
-            fBuilder.withStatsInfo(sBuilder.build());
-
-            FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
-
-            flowInfos.add(flowInfo);
-
-            log.debug("FlowInfo: \n{}", flowInfo.toString());
-        }
-
-        return flowInfos;
-    }
-
-    /**
      * Merges old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
      *
      * @param flowInfo current FlowInfo object
@@ -515,9 +668,7 @@
             Optional<Host> host = hostService.getHostsByIp(ipAddress).stream().findAny();
             return host.map(host1 -> host1.location().deviceId()).orElse(null);
         } else {
-            log.warn("Failed to get DeviceID which is connected to {}. " +
-                            "The destination is either a bare-metal or located out of DC",
-                    ipAddress.toString());
+            log.debug("No DeviceID is associated to {}", ipAddress.toString());
             return null;
         }
     }
@@ -593,29 +744,54 @@
             egressStats = egressStatsConfigured;
             log.info("Configured. Egress stats flag is {}", egressStats);
         }
+
+        Boolean portStatsConfigured = getBooleanProperty(properties, PORT_STATS);
+        if (portStatsConfigured == null) {
+            portStats = DEFAULT_PORT_STATS;
+            log.info("Port stats flag is NOT " +
+                    "configured, default value is {}", portStats);
+        } else {
+            portStats = portStatsConfigured;
+            log.info("Configured. Port stats flag is {}", portStats);
+        }
     }
 
-    private class InternalTimerTask extends TimerTask {
+    private class TelemetryCollector implements Runnable {
         @Override
         public void run() {
-            log.debug("Timer task thread starts ({})", loopCount++);
-
             Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
 
             // we only let the master controller of the device where the
-            // stats flow rules are installed send kafka message
-            getFlowInfo().forEach(f -> {
-                DeviceId deviceId = getDeviceId(f.srcIp().address());
-                if (mastershipService.isLocalMaster(deviceId)) {
+            // stats flow rules are installed send stats message
+            getFlowInfos().forEach(f -> {
+                if (checkSrcDstLocalMaster(f)) {
                     filteredFlowInfos.add(f);
                 }
             });
 
-            try {
-                telemetryService.publish(filteredFlowInfos);
-            } catch (Exception ex) {
-                log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+            // we only let the master controller of the device where the port
+            // is located to send stats message
+            if (portStats) {
+                getDstPortBasedFlowInfos().forEach(f -> {
+                    if (checkSrcDstLocalMaster(f)) {
+                        filteredFlowInfos.add(f);
+                    }
+                });
             }
+
+            telemetryService.publish(filteredFlowInfos);
+        }
+
+        private boolean checkSrcDstLocalMaster(FlowInfo info) {
+            DeviceId srcDeviceId = getDeviceId(info.srcIp().address());
+            DeviceId dstDeviceId = getDeviceId(info.dstIp().address());
+
+            boolean isSrcLocalMaster = srcDeviceId != null &&
+                    mastershipService.isLocalMaster(srcDeviceId);
+            boolean isDstLocalMaster = dstDeviceId != null &&
+                    mastershipService.isLocalMaster(dstDeviceId);
+
+            return isSrcLocalMaster || isDstLocalMaster;
         }
     }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
index 2efab8b..419b789 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
@@ -30,6 +30,7 @@
 import org.slf4j.LoggerFactory;
 
 import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
 import javax.ws.rs.Path;
@@ -98,6 +99,9 @@
      * @param input openstack flow rule JSON input stream
      * @return 200 OK if processing is correct.
      */
+    @DELETE
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
     public Response deleteBulkFlowRule(InputStream input) {
         log.info("DELETE BULK FLOW RULE: {}", input.toString());
 
@@ -132,7 +136,7 @@
         log.info("GET BULK FLOW RULE");
 
         Set<FlowInfo> flowInfoSet;
-        flowInfoSet = statsFlowRuleService.getFlowInfo();
+        flowInfoSet = statsFlowRuleService.getFlowInfos();
 
         log.info("\n\n======================================================\n" +
                  "FlowInfo Set: \n{}" +
@@ -151,11 +155,10 @@
     }
 
     @GET
-    @Path("list/{src_ip_prefix}/{dst_ip_prefix}")
+    @Path("list/{srcIpPrefix}/{dstIpPrefix}")
     @Produces(MediaType.APPLICATION_JSON)
-    public Response getFlowRule(
-            @PathParam("src_ip_prefix") String srcIpPrefix,
-            @PathParam("dst_ip_prefix") String dstIpPrefix) {
+    public Response getFlowRule(@PathParam("srcIpPrefix") String srcIpPrefix,
+                                @PathParam("dstIpPrefix") String dstIpPrefix) {
         return ok(root).build();
     }