Refactor OpenstackTelemetry App for better readability

Change-Id: I93353de31fb9671d8670ee44fc248fe7f36ac12b
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java
index 35547b2..8b28797 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java
@@ -35,14 +35,19 @@
 public class TinaFlowInfoByteBufferCodec extends ByteBufferCodec<FlowInfo> {
 
     private static final int MESSAGE_SIZE = 88;
+    private static final String OF_PREFIX = "of:";
 
     @Override
     public ByteBuffer encode(FlowInfo flowInfo) {
 
         ByteBuffer byteBuffer = ByteBuffer.allocate(MESSAGE_SIZE);
 
+        String  deviceId = flowInfo.deviceId().toString();
+        short switchId = (short) Integer.parseInt(deviceId.substring(3,
+                                                  deviceId.length()), 16);
+
         byteBuffer.put(flowInfo.flowType())
-                .putShort(Short.valueOf(flowInfo.deviceId().toString()))
+                .putShort(switchId)
                 .putInt(flowInfo.inputInterfaceId())
                 .putInt(flowInfo.outputInterfaceId())
                 .putShort(flowInfo.vlanId().toShort())
@@ -67,7 +72,8 @@
     public FlowInfo decode(ByteBuffer byteBuffer) {
 
         byte flowType = byteBuffer.get();
-        DeviceId deviceId = DeviceId.deviceId(String.valueOf(byteBuffer.getShort()));
+        String deviceIdStr = String.format("%016x", byteBuffer.getShort());
+        DeviceId deviceId = DeviceId.deviceId(OF_PREFIX + deviceIdStr);
         int inputInterfaceId = byteBuffer.getInt();
         int outputInterfaceId = byteBuffer.getInt();
         VlanId vlanId = VlanId.vlanId(byteBuffer.getShort());
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
new file mode 100644
index 0000000..3b849d1
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.codec;
+
+import org.onosproject.openstacktelemetry.api.FlowInfo;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+/**
+ * Tina Message ByteBuffer Codec.
+ */
+public class TinaMessageByteBufferCodec {
+
+    private static final int HEADER_SIZE = 8;
+    private static final int ENTRY_SIZE = 88;
+    private static final int MILLISECONDS = 1000;
+    private static final short KAFKA_MESSAGE_TYPE = 1;
+
+    /**
+     * Encodes a collection flow infos into byte buffer.
+     *
+     * @param flowInfos a collection of flow info
+     * @return encoded byte buffer
+     */
+    public ByteBuffer encode(Set<FlowInfo> flowInfos) {
+        ByteBuffer byteBuffer =
+                ByteBuffer.allocate(HEADER_SIZE + flowInfos.size() * ENTRY_SIZE);
+
+        byteBuffer.put(buildMessageHeader(flowInfos));
+        byteBuffer.put(buildMessageBody(flowInfos));
+
+        return byteBuffer;
+    }
+
+    private byte[] buildMessageHeader(Set<FlowInfo> flowInfos) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+        byteBuffer.putShort((short) flowInfos.size());
+        byteBuffer.putShort(KAFKA_MESSAGE_TYPE);
+        byteBuffer.putInt((int) (System.currentTimeMillis() / MILLISECONDS));
+
+        return byteBuffer.array();
+    }
+
+    private byte[] buildMessageBody(Set<FlowInfo> flowInfos) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(flowInfos.size() * ENTRY_SIZE);
+
+        TinaFlowInfoByteBufferCodec codec = new TinaFlowInfoByteBufferCodec();
+        flowInfos.forEach(flowInfo -> byteBuffer.put(codec.encode(flowInfo)));
+
+        return byteBuffer.array();
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
index 74a5016..3f56362 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
@@ -141,6 +141,16 @@
     }
 
     @Override
+    public boolean roughEquals(FlowInfo flowInfo) {
+        return deviceId.equals(flowInfo.deviceId()) &&
+                srcIp.equals(flowInfo.srcIp()) &&
+                dstIp.equals(flowInfo.dstIp()) &&
+                srcPort.equals(flowInfo.srcPort()) &&
+                dstPort.equals(flowInfo.dstPort()) &&
+                (protocol == flowInfo.protocol());
+    }
+
+    @Override
     public boolean equals(Object obj) {
         if (this == obj) {
             return true;
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index 4629a4e..da492a9 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -21,7 +21,6 @@
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.onosproject.openstacktelemetry.api.ByteBufferCodec;
 import org.onosproject.openstacktelemetry.api.FlowInfo;
 import org.onosproject.openstacktelemetry.api.GrpcTelemetryService;
 import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryService;
@@ -29,12 +28,13 @@
 import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.RestTelemetryService;
 import org.onosproject.openstacktelemetry.api.TelemetryService;
-import org.onosproject.openstacktelemetry.codec.TinaFlowInfoByteBufferCodec;
+import org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Set;
 
 /**
  * Openstack telemetry manager.
@@ -45,6 +45,9 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final String KAFKA_TOPIC = "sona.flow";
+    private static final String KAFKA_KEY = "flowdata";
+
     private List<TelemetryService> telemetryServices = Lists.newArrayList();
 
     @Activate
@@ -68,45 +71,42 @@
     }
 
     @Override
-    public void publish(FlowInfo flowInfo) {
+    public void publish(Set<FlowInfo> flowInfos) {
         telemetryServices.forEach(service -> {
 
-
             if (service instanceof GrpcTelemetryManager) {
-                invokeGrpcPublisher((GrpcTelemetryService) service, flowInfo);
+                invokeGrpcPublisher((GrpcTelemetryService) service, flowInfos);
             }
 
             if (service instanceof InfluxDbTelemetryManager) {
-                invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfo);
+                invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfos);
             }
 
             if (service instanceof KafkaTelemetryManager) {
-                invokeKafkaPublisher((KafkaTelemetryService) service, flowInfo);
+                invokeKafkaPublisher((KafkaTelemetryService) service, flowInfos);
             }
 
             if (service instanceof RestTelemetryManager) {
-                invokeRestPublisher((RestTelemetryService) service, flowInfo);
+                invokeRestPublisher((RestTelemetryService) service, flowInfos);
             }
-
         });
     }
 
-    private void invokeGrpcPublisher(GrpcTelemetryService service, FlowInfo flowInfo) {
+    private void invokeGrpcPublisher(GrpcTelemetryService service, Set<FlowInfo> flowInfos) {
         // TODO: need provide implementation
     }
 
-    private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, FlowInfo flowInfo) {
+    private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, Set<FlowInfo> flowInfos) {
         // TODO: need provide implementation
     }
 
-    private void invokeKafkaPublisher(KafkaTelemetryService service, FlowInfo flowInfo) {
-        ByteBufferCodec codec = new TinaFlowInfoByteBufferCodec();
-        ByteBuffer buffer = codec.encode(flowInfo);
-        service.publish(new ProducerRecord<>("sona.flow", "flowdata", buffer.array()));
+    private void invokeKafkaPublisher(KafkaTelemetryService service, Set<FlowInfo> flowInfos) {
+        TinaMessageByteBufferCodec codec = new TinaMessageByteBufferCodec();
+        ByteBuffer buffer = codec.encode(flowInfos);
+        service.publish(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY, buffer.array()));
     }
 
-    private void invokeRestPublisher(RestTelemetryService service, FlowInfo flowInfo) {
+    private void invokeRestPublisher(RestTelemetryService service, Set<FlowInfo> flowInfos) {
         // TODO: need provide implementation
     }
-
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index c0646ad..e2f505c 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.openstacktelemetry.impl;
 
+import com.google.common.collect.Sets;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -22,9 +23,6 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.osgi.DefaultServiceDirectory;
-import org.onlab.packet.Ethernet;
-import org.onlab.packet.IPv4;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
@@ -45,27 +43,39 @@
 import org.onosproject.net.flow.IndexTableId;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.flow.criteria.Criterion;
 import org.onosproject.net.flow.criteria.IPCriterion;
 import org.onosproject.net.flow.criteria.IPProtocolCriterion;
 import org.onosproject.net.flow.criteria.TcpPortCriterion;
 import org.onosproject.net.flow.criteria.UdpPortCriterion;
 import org.onosproject.net.host.HostService;
 import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
 import org.onosproject.openstacktelemetry.api.StatsFlowRule;
 import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
 import org.onosproject.openstacktelemetry.api.StatsInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 
+import static org.onlab.packet.Ethernet.TYPE_IPV4;
+import static org.onlab.packet.IPv4.PROTOCOL_TCP;
+import static org.onlab.packet.IPv4.PROTOCOL_UDP;
+import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
+import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_SRC;
+import static org.onosproject.net.flow.criteria.Criterion.Type.IP_PROTO;
+import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_DST;
+import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_SRC;
+import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_DST;
+import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_SRC;
+import static org.onosproject.openstacknetworking.api.Constants.DHCP_ARP_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.FORWARDING_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.STAT_INBOUND_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TABLE;
 import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
 
-
 /**
  * Flow rule manager for network statistics of a VM.
  */
@@ -77,7 +87,7 @@
 
     private static final byte FLOW_TYPE_SONA = 1; // VLAN
 
-    public static final int MILLISECONDS = 1000;
+    private static final int MILLISECONDS = 1000;
     private static final int REFRESH_INTERVAL = 5;
 
     private ApplicationId appId;
@@ -91,11 +101,16 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected HostService hostService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected OpenstackTelemetryService telemetryService;
+
     private Timer timer;
     private TimerTask task;
-    private OpenstackTelemetryManager osTelemetryManager;
 
-    Set<FlowInfo> gFlowInfoSet = new HashSet<>();
+    private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
     private int loopCount = 0;
 
     private static final int SOURCE_ID = 1;
@@ -104,51 +119,29 @@
     private static final int METRIC_PRIORITY_SOURCE  = SOURCE_ID * PRIORITY_BASE;
     private static final int METRIC_PRIORITY_TARGET  = TARGET_ID * PRIORITY_BASE;
 
-    public static final int    FLOW_TABLE_VM_SOURCE  =  0; // STAT_INBOUND_TABLE
-    public static final int    FLOW_TABLE_DHCP_ARP   =  1; // DHCP_ARP_TABLE
-    public static final int    FLOW_TABLE_VM_TARGET  = 49; // STAT_OUTBOUND_TABLE
-    public static final int    FLOW_TABLE_FORWARDING = 50; // FORWARDING_TABLE
-
-    static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
+    private static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
 
     public StatsFlowRuleManager() {
-        log.info("Object is instantiated");
         this.timer = new Timer("openstack-telemetry-sender");
     }
 
     @Activate
     protected void activate() {
         appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
-        log.info("Application is activated");
-        osTelemetryManager = new OpenstackTelemetryManager();
+
         this.start();
+
+        log.info("Started");
     }
 
     @Deactivate
     protected void deactivate() {
-        log.info("Application is deactivated");
-    }
-
-    private class InternalTimerTask extends TimerTask {
-        @Override
-        public void run() {
-            log.debug("Timger Task Thread Starts ({})", loopCount++);
-            try {
-                Set<FlowInfo> flowInfoSet = getFlowRule();
-                for (FlowInfo flowInfo: flowInfoSet) {
-                    log.info("Publish FlowInfo to NMS: {}", flowInfo.toString());
-                    osTelemetryManager.publish(flowInfo);
-                }
-            } catch (Exception ex) {
-                log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
-            }
-        }
+        log.info("Stopped");
     }
 
     @Override
     public void start() {
         log.info("Start publishing thread");
-        Set<FlowInfo>  gFlowInfoSet = getFlowRule();
         task = new InternalTimerTask();
         timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL,
                                   MILLISECONDS * REFRESH_INTERVAL);
@@ -161,65 +154,86 @@
         task = null;
     }
 
-    public void connectTables(
-                            DeviceId deviceId,
-                            int fromTable,
-                            int toTable,
-                            StatsFlowRule statsFlowRule,
-                            int rulePriority,
-                            boolean installFlag) {
-        try {
-            log.debug("Table Transition: {} -> {}", fromTable, toTable);
-            int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
-            int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
-            int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
+    @Override
+    public void createStatFlowRule(StatsFlowRule statsFlowRule) {
 
-            TrafficSelector.Builder selector;
-            if (statsFlowRule == null) {
-                selector = DefaultTrafficSelector.builder();
-            } else {
-                selector = DefaultTrafficSelector.builder()
-                                       .matchEthType(Ethernet.TYPE_IPV4)
-                                       .matchIPSrc(statsFlowRule.srcIpPrefix())
-                                       .matchIPDst(statsFlowRule.dstIpPrefix());
-                if (statsFlowRule.ipProtocol() == IPv4.PROTOCOL_TCP) {
-                    selector = selector.matchIPProtocol(statsFlowRule.ipProtocol())
-                                       .matchTcpSrc(statsFlowRule.srcTpPort())
-                                       .matchTcpDst(statsFlowRule.dstTpPort());
-                } else if (statsFlowRule.ipProtocol() == IPv4.PROTOCOL_UDP) {
-                    selector = selector.matchIPProtocol(statsFlowRule.ipProtocol())
-                                       .matchUdpSrc(statsFlowRule.srcTpPort())
-                                       .matchUdpDst(statsFlowRule.dstTpPort());
-                }
-            }
+        setStatFlowRule(statsFlowRule, true);
 
-            TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
-            treatment.transition(toTable);
-            FlowRule flowRule = DefaultFlowRule.builder()
-                                               .forDevice(deviceId)
-                                               .withSelector(selector.build())
-                                               .withTreatment(treatment.build())
-                                               .withPriority(prefixLength)
-                                               .fromApp(appId)
-                                               .makePermanent()
-                                               .forTable(fromTable)
-                                               .build();
-            applyRule(flowRule, installFlag);
-        } catch (Exception ex) {
-            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+        log.info("Install stat flow rule for SrcIp:{} DstIp:{}",
+                                        statsFlowRule.srcIpPrefix().toString(),
+                                        statsFlowRule.dstIpPrefix().toString());
+    }
+
+    @Override
+    public void deleteStatFlowRule(StatsFlowRule statsFlowRule) {
+        // FIXME: following code might not be necessary
+        flowRuleService.removeFlowRulesById(appId);
+
+        setStatFlowRule(statsFlowRule, false);
+
+        log.info("Remove stat flow rule for SrcIp:{} DstIp:{}",
+                                        statsFlowRule.srcIpPrefix().toString(),
+                                        statsFlowRule.dstIpPrefix().toString());
+    }
+
+    private void connectTables(DeviceId deviceId, int fromTable, int toTable,
+                               StatsFlowRule statsFlowRule, int rulePriority,
+                               boolean install) {
+
+        log.debug("Table Transition: {} -> {}", fromTable, toTable);
+        int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
+        int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
+        int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
+        byte protocol = statsFlowRule.ipProtocol();
+
+        TrafficSelector.Builder selectorBuilder =
+                                        DefaultTrafficSelector.builder()
+                                        .matchEthType(TYPE_IPV4)
+                                        .matchIPSrc(statsFlowRule.srcIpPrefix())
+                                        .matchIPDst(statsFlowRule.dstIpPrefix());
+
+        if (protocol == PROTOCOL_TCP) {
+            selectorBuilder = selectorBuilder
+                                        .matchIPProtocol(statsFlowRule.ipProtocol())
+                                        .matchTcpSrc(statsFlowRule.srcTpPort())
+                                        .matchTcpDst(statsFlowRule.dstTpPort());
+
+        } else if (protocol == PROTOCOL_UDP) {
+            selectorBuilder = selectorBuilder
+                                        .matchIPProtocol(statsFlowRule.ipProtocol())
+                                        .matchUdpSrc(statsFlowRule.srcTpPort())
+                                        .matchUdpDst(statsFlowRule.dstTpPort());
+        } else {
+            log.warn("Unsupported protocol {}", statsFlowRule.ipProtocol());
         }
+
+        TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+
+        treatmentBuilder.transition(toTable);
+
+        FlowRule flowRule = DefaultFlowRule.builder()
+                                        .forDevice(deviceId)
+                                        .withSelector(selectorBuilder.build())
+                                        .withTreatment(treatmentBuilder.build())
+                                        .withPriority(prefixLength)
+                                        .fromApp(appId)
+                                        .makePermanent()
+                                        .forTable(fromTable)
+                                        .build();
+
+        applyRule(flowRule, install);
     }
 
     /**
-     * Apply FlowRule to switch.
+     * Installs stats related flow rule to switch.
      *
-     * @param flowRule FlowRule
-     * @param install Flag to install or not
+     * @param flowRule flow rule
+     * @param install flag to install or not
      */
     private void applyRule(FlowRule flowRule, boolean install) {
-        log.debug("Apply flow rule to bridge device");
         FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
-        flowOpsBuilder = install ? flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
+        flowOpsBuilder = install ?
+                flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
 
         flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
             @Override
@@ -235,184 +249,97 @@
     }
 
     /**
-     * Craete a flow rule.
+     * Gets a set of the flow infos.
      *
-     * @param flowRule  flow rule for Openstack VMs
+     * @return a set of flow infos
      */
-    @Override
-    public void createFlowRule(StatsFlowRule flowRule) {
-        try {
-            log.debug("Create Flow Rule. SrcIp:{} DstIp:{}",
-                      flowRule.srcIpPrefix().toString(),
-                      flowRule.dstIpPrefix().toString());
+    public Set<FlowInfo> getFlowInfo() {
+        Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
 
-            // To make a inversed flow rule.
-            DefaultStatsFlowRule.Builder inverseFlowRuleBuilder
-                                        = DefaultStatsFlowRule
-                                            .builder()
-                                            .srcIpPrefix(flowRule.dstIpPrefix())
-                                            .dstIpPrefix(flowRule.srcIpPrefix())
-                                            .ipProtocol(flowRule.ipProtocol())
-                                            .srcTpPort(flowRule.dstTpPort())
-                                            .dstTpPort(flowRule.srcTpPort());
-            StatsFlowRule inverseFlowRule = inverseFlowRuleBuilder.build();
-            DeviceService deviceService = DefaultServiceDirectory.getService(DeviceService.class);
-            Iterable<Device> devices = deviceService.getDevices();
-            for (Device d : devices) {
-                log.debug("Device: {}", d.toString());
-                if (d.type() == Device.Type.CONTROLLER) {
-                    log.info("Don't create flow rule for 'DeviceType=CONTROLLER' ({})",
-                             d.id().toString());
-                    continue;
-                }
-                connectTables(d.id(), FLOW_TABLE_VM_SOURCE, FLOW_TABLE_DHCP_ARP,
-                              flowRule, METRIC_PRIORITY_SOURCE, true);
-                connectTables(d.id(), FLOW_TABLE_VM_TARGET, FLOW_TABLE_FORWARDING,
-                              inverseFlowRule, METRIC_PRIORITY_TARGET, true);
+        // obtain all flow rule entries installed by telemetry app
+        for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
+            FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
+            TrafficSelector selector = entry.selector();
+
+            IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
+            IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
+            IPProtocolCriterion ipProtocol =
+                                (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
+
+            log.debug("[FlowInfo]  TableID:{}  SRC_IP:{}  DST_IP:{}  Pkt:{}  Byte:{}",
+                                                ((IndexTableId) entry.table()).id(),
+                                                srcIp.ip().toString(),
+                                                dstIp.ip().toString(),
+                                                entry.packets(),
+                                                entry.bytes());
+
+            fBuilder.withFlowType(FLOW_TYPE_SONA)
+                    .withSrcIp(srcIp.ip())
+                    .withDstIp(dstIp.ip())
+                    .withProtocol((byte) ipProtocol.protocol());
+
+            if (ipProtocol.protocol() == PROTOCOL_TCP) {
+                TcpPortCriterion tcpSrc =
+                                (TcpPortCriterion) selector.getCriterion(TCP_SRC);
+                TcpPortCriterion tcpDst =
+                                (TcpPortCriterion) selector.getCriterion(TCP_DST);
+
+                log.debug("TCP SRC Port: {}, DST Port: {}",
+                                                    tcpSrc.tcpPort().toInt(),
+                                                    tcpDst.tcpPort().toInt());
+
+                fBuilder.withSrcPort(tcpSrc.tcpPort());
+                fBuilder.withDstPort(tcpDst.tcpPort());
+
+            } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
+
+                UdpPortCriterion udpSrc =
+                                (UdpPortCriterion) selector.getCriterion(UDP_SRC);
+                UdpPortCriterion udpDst =
+                                (UdpPortCriterion) selector.getCriterion(UDP_DST);
+
+                log.debug("UDP SRC Port: {}, DST Port: {}",
+                                                    udpSrc.udpPort().toInt(),
+                                                    udpDst.udpPort().toInt());
+
+                fBuilder.withSrcPort(udpSrc.udpPort());
+                fBuilder.withDstPort(udpDst.udpPort());
+            } else {
+                log.debug("Other protocol: {}", ipProtocol.protocol());
             }
-        } catch (Exception ex) {
-            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+
+            fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
+                    .withDstMac(getMacAddress(dstIp.ip().address()))
+                    .withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
+                    .withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
+                    .withVlanId(getVlanId(srcIp.ip().address()))
+                    .withDeviceId(entry.deviceId());
+
+            StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
+
+            // TODO: need to collect error and drop packets stats
+            // TODO: need to make the refresh interval configurable
+            sBuilder.withStartupTime(0)
+                    .withCurrAccPkts((int) entry.packets())
+                    .withCurrAccBytes(entry.bytes())
+                    .withErrorPkts((short) 0)
+                    .withDropPkts((short) 0)
+                    .withLstPktOffset(REFRESH_INTERVAL * MILLISECONDS);
+
+            fBuilder.withStatsInfo(sBuilder.build());
+
+            FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
+
+            flowInfos.add(flowInfo);
+
+            log.debug("FlowInfo: \n{}", flowInfo.toString());
         }
+
+        return flowInfos;
     }
 
     /**
-     * Get FlowRule.
-     *
-     * @param flowRule Flow rule for a VM
-     * @return Set of FlowInfo
-     */
-    public Set<FlowInfo> getFlowRule(StatsFlowRule flowRule) {
-        Set<FlowInfo> flowInfoSet = new HashSet<>();
-        log.info("Get flow rule: {}", flowRule.toString());
-        // TODO  Make a implementation here.
-        return flowInfoSet;
-    }
-
-    /**
-     * Delete FlowRule for StatsInfo.
-     *
-     * @param flowRule  Flow rule for Openstack VM
-     */
-    @Override
-    public void deleteFlowRule(StatsFlowRule flowRule) {
-        log.debug("Delete Flow Rule: {}", flowRule.toString());
-        flowRuleService = DefaultServiceDirectory.getService(FlowRuleService.class);
-        flowRuleService.removeFlowRulesById(appId);
-        // TODO  Write a implementation code here
-
-        try {
-            log.debug("Delete Flow Rule. SrcIp:{} DstIp:{}",
-                      flowRule.srcIpPrefix().toString(),
-                      flowRule.dstIpPrefix().toString());
-
-            // To make a inversed flow rule.
-            DefaultStatsFlowRule.Builder inverseFlowRuleBuilder
-                                        = DefaultStatsFlowRule
-                                            .builder()
-                                            .srcIpPrefix(flowRule.dstIpPrefix())
-                                            .dstIpPrefix(flowRule.srcIpPrefix())
-                                            .ipProtocol(flowRule.ipProtocol())
-                                            .srcTpPort(flowRule.dstTpPort())
-                                            .dstTpPort(flowRule.srcTpPort());
-            StatsFlowRule inverseFlowRule = inverseFlowRuleBuilder.build();
-            DeviceService deviceService = DefaultServiceDirectory.getService(DeviceService.class);
-            Iterable<Device> devices = deviceService.getDevices();
-            for (Device d : devices) {
-                log.debug("Device: {}", d.toString());
-                if (d.type() == Device.Type.CONTROLLER) {
-                    log.info("Don't care for 'DeviceType=CONTROLLER' ({})",
-                             d.id().toString());
-                    continue;
-                }
-                connectTables(d.id(), FLOW_TABLE_VM_SOURCE, FLOW_TABLE_DHCP_ARP,
-                              flowRule, METRIC_PRIORITY_SOURCE, false);
-                connectTables(d.id(), FLOW_TABLE_VM_TARGET, FLOW_TABLE_FORWARDING,
-                              inverseFlowRule, METRIC_PRIORITY_TARGET, false);
-            }
-        } catch (Exception ex) {
-            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
-        }
-    }
-
-    /**
-     * Get a list of the FlowRule Store.
-     *
-     * @return list of Flow Rule
-     */
-    public Set<FlowInfo> getFlowRule() {
-        log.debug("Get Flow Information List");
-        Set<FlowInfo> flowInfoSet = new HashSet<>();
-        try {
-            flowRuleService = DefaultServiceDirectory.getService(FlowRuleService.class);
-            Iterable<FlowEntry> flowEntries = flowRuleService.getFlowEntriesById(appId);
-
-            for (FlowEntry entry : flowEntries) {
-                FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
-                IPCriterion srcIpCriterion =
-                        (IPCriterion) entry.selector().getCriterion(Criterion.Type.IPV4_SRC);
-                IPCriterion dstIpCriterion =
-                        (IPCriterion) entry.selector().getCriterion(Criterion.Type.IPV4_DST);
-                IPProtocolCriterion ipProtocolCriterion =
-                        (IPProtocolCriterion) entry.selector().getCriterion(Criterion.Type.IP_PROTO);
-
-                log.debug("[FlowInfo]  TableID:{}  SRC_IP:{}  DST_IP:{}  Pkt:{}  Byte:{}",
-                         ((IndexTableId) entry.table()).id(),
-                         srcIpCriterion.ip().toString(), dstIpCriterion.ip().toString(),
-                         entry.packets(), entry.bytes());
-
-                fBuilder.withFlowType(FLOW_TYPE_SONA).withSrcIp(srcIpCriterion.ip())
-                        .withDstIp(dstIpCriterion.ip())
-                        .withProtocol((byte) ipProtocolCriterion.protocol());
-
-                if (ipProtocolCriterion.protocol() == IPv4.PROTOCOL_TCP) {
-                    TcpPortCriterion tcpSrcCriterion =
-                            (TcpPortCriterion) entry.selector().getCriterion(Criterion.Type.TCP_SRC);
-                    TcpPortCriterion tcpDstCriterion =
-                            (TcpPortCriterion) entry.selector().getCriterion(Criterion.Type.TCP_DST);
-                    log.debug("TCP SRC Port: {}   Dst Port: {}",
-                                tcpSrcCriterion.tcpPort().toInt(), tcpDstCriterion.tcpPort().toInt());
-                    fBuilder.withSrcPort(tcpSrcCriterion.tcpPort());
-                    fBuilder.withDstPort(tcpDstCriterion.tcpPort());
-                } else if (ipProtocolCriterion.protocol() == IPv4.PROTOCOL_UDP) {
-                    UdpPortCriterion udpSrcCriterion =
-                            (UdpPortCriterion) entry.selector().getCriterion(Criterion.Type.UDP_SRC);
-                    UdpPortCriterion udpDstCriterion =
-                            (UdpPortCriterion) entry.selector().getCriterion(Criterion.Type.UDP_DST);
-                    log.debug("UDP SRC Port: {}   Dst Port: {}",
-                                udpSrcCriterion.udpPort().toInt(), udpDstCriterion.udpPort().toInt());
-                    fBuilder.withSrcPort(udpSrcCriterion.udpPort());
-                    fBuilder.withDstPort(udpDstCriterion.udpPort());
-                } else {
-                    log.debug("Other protocol: {}", ipProtocolCriterion.protocol());
-                }
-
-                fBuilder.withSrcMac(getMacAddress(srcIpCriterion.ip().address()))
-                        .withDstMac(getMacAddress(dstIpCriterion.ip().address()))
-                        .withInputInterfaceId(getInterfaceId(srcIpCriterion.ip().address()))
-                        .withOutputInterfaceId(getInterfaceId(dstIpCriterion.ip().address()))
-                        .withVlanId(getVlanId(srcIpCriterion.ip().address()))
-                        .withDeviceId(entry.deviceId());
-
-                StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
-                sBuilder.withStartupTime(0)
-                        .withCurrAccPkts((int) entry.packets()).withCurrAccBytes(entry.bytes())
-                        .withErrorPkts((short) 0).withDropPkts((short) 0)
-                        .withLstPktOffset(REFRESH_INTERVAL * MILLISECONDS);
-
-                fBuilder.withStatsInfo(sBuilder.build());
-
-                FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
-                flowInfoSet.add(flowInfo);
-                log.debug("FlowInfo: \n{}", flowInfo.toString());
-            }
-        } catch (Exception ex) {
-            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
-        }
-        return flowInfoSet;
-    }
-
-    /**
-     * Merge old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
+     * Merges old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
      *
      * @param flowInfo current FlowInfo object
      * @param fBuilder Builder for FlowInfo
@@ -422,39 +349,53 @@
     private FlowInfo mergeFlowInfo(FlowInfo flowInfo,
                                    FlowInfo.Builder fBuilder,
                                    StatsInfo.Builder sBuilder) {
-        try {
-            log.debug("Current FlowInfo:\n{}", flowInfo.toString());
-            for (FlowInfo gFlowInfo: gFlowInfoSet) {
-                log.debug("Old FlowInfo:\n{}", gFlowInfo.toString());
-                if (gFlowInfo.deviceId().equals(flowInfo.deviceId()) &&
-                        gFlowInfo.srcIp().equals(flowInfo.srcIp()) &&
-                        gFlowInfo.dstIp().equals(flowInfo.dstIp()) &&
-                        gFlowInfo.srcPort().equals(flowInfo.srcPort()) &&
-                        gFlowInfo.dstPort().equals(flowInfo.dstPort()) &&
-                        (gFlowInfo.protocol() == flowInfo.protocol())
-                        ) {
-                    // Get old StatsInfo object and merge the value to current object.
-                    StatsInfo oldStatsInfo = gFlowInfo.statsInfo();
-                    sBuilder.withPrevAccPkts(oldStatsInfo.currAccPkts());
-                    sBuilder.withPrevAccBytes(oldStatsInfo.currAccBytes());
-                    FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
-                    gFlowInfoSet.remove(gFlowInfo);
-                    gFlowInfoSet.add(newFlowInfo);
-                    log.info("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
-                    return newFlowInfo;
-                }
+        for (FlowInfo gFlowInfo : gFlowInfoSet) {
+            log.debug("Old FlowInfo:\n{}", gFlowInfo.toString());
+            if (gFlowInfo.roughEquals(flowInfo)) {
+
+                // Get old StatsInfo object and merge the value to current object.
+                StatsInfo oldStatsInfo = gFlowInfo.statsInfo();
+                sBuilder.withPrevAccPkts(oldStatsInfo.currAccPkts());
+                sBuilder.withPrevAccBytes(oldStatsInfo.currAccBytes());
+                FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build())
+                        .build();
+
+                gFlowInfoSet.remove(gFlowInfo);
+                gFlowInfoSet.add(newFlowInfo);
+                log.info("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
+                return newFlowInfo;
             }
-            // No such record, then build the FlowInfo object and return this object.
-            log.info("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
-            FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
-            gFlowInfoSet.add(newFlowInfo);
-            return newFlowInfo;
-        } catch (Exception ex) {
-            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
         }
-        log.debug("Add this FlowInfo {}", flowInfo.toString());
-        gFlowInfoSet.add(flowInfo);
-        return flowInfo;
+
+        // No such record, then build the FlowInfo object and return this object.
+        log.info("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
+        FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
+        gFlowInfoSet.add(newFlowInfo);
+        return newFlowInfo;
+    }
+
+    private void setStatFlowRule(StatsFlowRule statsFlowRule, boolean install) {
+        StatsFlowRule inverseFlowRule = DefaultStatsFlowRule.builder()
+                                        .srcIpPrefix(statsFlowRule.dstIpPrefix())
+                                        .dstIpPrefix(statsFlowRule.srcIpPrefix())
+                                        .ipProtocol(statsFlowRule.ipProtocol())
+                                        .srcTpPort(statsFlowRule.dstTpPort())
+                                        .dstTpPort(statsFlowRule.srcTpPort())
+                                        .build();
+
+        // FIXME: install stat flow rules for all devices for now
+        // need to query the device where the host with the given IP located
+        for (Device d : deviceService.getDevices()) {
+            if (d.type() == Device.Type.CONTROLLER) {
+                log.info("Not provide stats for 'CONTROLLER' ({})", d.id().toString());
+                continue;
+            }
+
+            connectTables(d.id(), STAT_INBOUND_TABLE, DHCP_ARP_TABLE,
+                            statsFlowRule, METRIC_PRIORITY_SOURCE, install);
+            connectTables(d.id(), STAT_OUTBOUND_TABLE, FORWARDING_TABLE,
+                            inverseFlowRule, METRIC_PRIORITY_TARGET, install);
+        }
     }
 
     /**
@@ -463,14 +404,10 @@
      * @param ipAddress IP Address of host
      * @return VLAN ID
      */
-    public VlanId getVlanId(IpAddress ipAddress) {
-        try {
-            if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
-                Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
-                return host.vlan();
-            }
-        } catch (Exception ex) {
-            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+    private VlanId getVlanId(IpAddress ipAddress) {
+        if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+            Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
+            return host.vlan();
         }
         return VlanId.vlanId();
     }
@@ -481,14 +418,10 @@
      * @param ipAddress IP Address of host
      * @return Interface ID of Switch
      */
-    public int getInterfaceId(IpAddress ipAddress) {
-        try {
-            if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
-                Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
-                return (int) host.location().port().toLong();
-            }
-        } catch (Exception ex) {
-            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+    private int getInterfaceId(IpAddress ipAddress) {
+        if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+            Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
+            return (int) host.location().port().toLong();
         }
         return -1;
     }
@@ -499,15 +432,24 @@
      * @param ipAddress IP Address of host
      * @return MAC Address of host
      */
-    public MacAddress getMacAddress(IpAddress ipAddress) {
-        try {
-            if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
-                Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
-                return host.mac();
-            }
-        } catch (Exception ex) {
-            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+    private MacAddress getMacAddress(IpAddress ipAddress) {
+        if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+            Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
+            return host.mac();
         }
+
         return NO_HOST_MAC;
     }
+
+    private class InternalTimerTask extends TimerTask {
+        @Override
+        public void run() {
+            log.debug("Timer Task Thread Starts ({})", loopCount++);
+            try {
+                telemetryService.publish(getFlowInfo());
+            } catch (Exception ex) {
+                log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+            }
+        }
+    }
 }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
index 9b5209b..2efab8b 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
@@ -61,7 +61,8 @@
     private static final String JSON_NODE_FLOW_RULE = "rules";
     private static final String FLOW_RULE_ID = "STATS_FLOW_RULE_ID";
 
-    private final StatsFlowRuleAdminService statsFlowRuleService = get(StatsFlowRuleAdminService.class);
+    private final StatsFlowRuleAdminService
+                    statsFlowRuleService = get(StatsFlowRuleAdminService.class);
 
     @Context
     private UriInfo uriInfo;
@@ -81,7 +82,7 @@
 
         readNodeConfiguration(input).forEach(flowRule -> {
                 log.debug("FlowRule: {}", flowRule.toString());
-                statsFlowRuleService.createFlowRule(flowRule);
+                statsFlowRuleService.createStatFlowRule(flowRule);
             });
 
         UriBuilder locationBuilder = uriInfo.getBaseUriBuilder()
@@ -102,13 +103,12 @@
 
         readNodeConfiguration(input).forEach(flowRule -> {
             log.debug("FlowRule: {}", flowRule.toString());
-            statsFlowRuleService.deleteFlowRule(flowRule);
+            statsFlowRuleService.deleteStatFlowRule(flowRule);
         });
 
         return ok(root).build();
     }
 
-
     /**
      * Get flow rules which is installed on ONOS.
      *
@@ -120,7 +120,6 @@
         return ok(root).build();
     }
 
-
     /**
      * Get flow information list.
      *
@@ -133,7 +132,7 @@
         log.info("GET BULK FLOW RULE");
 
         Set<FlowInfo> flowInfoSet;
-        flowInfoSet = statsFlowRuleService.getFlowRule();
+        flowInfoSet = statsFlowRuleService.getFlowInfo();
 
         log.info("\n\n======================================================\n" +
                  "FlowInfo Set: \n{}" +
@@ -160,7 +159,6 @@
         return ok(root).build();
     }
 
-
     private Set<StatsFlowRule> readNodeConfiguration(InputStream input) {
         log.info("Input JSON Data: \n\t\t{}", input.toString());
         Set<StatsFlowRule> flowRuleSet = Sets.newHashSet();
@@ -187,17 +185,4 @@
 
         return flowRuleSet;
     }
-
-    /**
-     * OpenstackTelemetryImpl method.
-     *
-     * @return 200 OK
-     *
-     * @onos.rsModel dummy
-     */
-    @GET
-    @Produces(MediaType.APPLICATION_JSON)
-    public Response dummy() {
-        return ok(root).build();
-    }
 }