Add flow rule interface, builder, admin manager for statistics.

 Add API for removing flow rule.

Change-Id: If642a2ec8546f73da7234197ad19a97b6a1dc9da
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRule.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRule.java
new file mode 100644
index 0000000..a5fd61a
--- /dev/null
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRule.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.api;
+
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.TpPort;
+
+/**
+ * Flow Rule Interface for Statistics.
+ */
+public interface StatsFlowRule {
+    /**
+     * Returns IP Prefix of Source VM.
+     *
+     * @return srcIpPrefix
+     */
+    IpPrefix srcIpPrefix();
+
+    /**
+     * Returns IP Prefix of Destination VM.
+     *
+     * @return dstIpPrefix
+     */
+    IpPrefix dstIpPrefix();
+
+
+    /**
+     * Returns IP protocol.
+     *
+     * @return ipProtocol
+     */
+    byte ipProtocol();
+
+    /**
+     * Returns source transport port.
+     *
+     * @return srcTpPort
+     */
+
+    TpPort srcTpPort();
+
+    /**
+     * Returns destination transport port.
+     *
+     * @return dstTpPort
+     */
+    TpPort dstTpPort();
+
+
+
+
+    /**
+     * Builder of new flow rule entities.
+     */
+    interface Builder {
+
+        /**
+         * Builds an immutable openstack flow rule instance.
+         *
+         * @return openstack flow rule instance
+         */
+        StatsFlowRule build();
+
+        /**
+         * Returns openstack flow rule builder with supplied srcIpPrefix.
+         *
+         * @param srcIpPrefix Source IP address
+         * @return openstack flow rule builder
+         */
+        Builder srcIpPrefix(IpPrefix srcIpPrefix);
+
+
+        /**
+         * Returns openstack flow rule builder with supplied srcIpPrefix.
+         *
+         * @param dstIpPrefix Destination IP Prefix
+         * @return openstack flow rule builder
+         */
+        Builder dstIpPrefix(IpPrefix dstIpPrefix);
+
+
+        /**
+         * Returns openstack flow rule builder with supplied ipProtocol.
+         *
+         * @param ipProtocol IP protocol number
+         * @return openstack flow rule builder
+         */
+        Builder ipProtocol(byte ipProtocol);
+
+
+        /**
+         * Returns openstack flow rule builder with supplied srcTpPort.
+         *
+         * @param srcTpPort Source transport port number
+         * @return openstack flow rule builder
+         */
+        Builder srcTpPort(TpPort srcTpPort);
+
+        /**
+         * Returns openstack flow rule builder with supplied dstTpPort.
+         *
+         * @param dstTpPort Destination transport port number
+         * @return openstack flow rule builder
+         */
+        Builder dstTpPort(TpPort dstTpPort);
+    }
+}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java
new file mode 100644
index 0000000..c0e5e5d
--- /dev/null
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.api;
+
+import java.util.Set;
+
+/**
+ * Admin service API for making a flow rule.
+ */
+public interface StatsFlowRuleAdminService {
+
+    /**
+     * Start this service.
+     */
+    void start();
+
+    /**
+     * Stop this service.
+     */
+    void stop();
+
+    /**
+     * Craete a flow rule.
+     *
+     * @param flowRule  Flow rule for a VM
+     */
+    void createFlowRule(StatsFlowRule flowRule);
+
+    /**
+     * Get flow rule list.
+     * @return flow rule list.
+     */
+    Set<FlowInfo> getFlowRule();
+
+    /**
+     * Get flow rule list.
+     * @param flowRule Flow rule for a VM
+     * @return flow rule list.
+     */
+    Set<FlowInfo> getFlowRule(StatsFlowRule flowRule);
+
+    /**
+     * Delete the flow rule.
+     *
+     * @param flowRule  Flow rule for Openstack VM
+     */
+    void deleteFlowRule(StatsFlowRule flowRule);
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/StatsFlowRuleJsonCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/StatsFlowRuleJsonCodec.java
new file mode 100644
index 0000000..8fc8143
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/StatsFlowRuleJsonCodec.java
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.codec;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.TpPort;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.JsonCodec;
+import org.onosproject.openstacktelemetry.api.StatsFlowRule;
+import org.onosproject.openstacktelemetry.impl.DefaultStatsFlowRule;
+import org.slf4j.Logger;
+
+import org.onlab.packet.IpPrefix;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+public class StatsFlowRuleJsonCodec extends JsonCodec<StatsFlowRule> {
+
+    private final Logger log = getLogger(getClass());
+
+    public static final String SRC_IP_PREFIX = "srcIpPrefix";
+    public static final String DST_IP_PREFIX = "dstIpPrefix";
+    public static final String IP_PROTOCOL   = "ipProtocol";
+    public static final String SRC_TP_PORT   = "srcTpPort";
+    public static final String DST_TP_PORT   = "dstTpPort";
+
+    public ObjectNode encode(StatsFlowRule flowRule, CodecContext context) {
+        checkNotNull(flowRule, "FlowInfo cannot be null");
+        ObjectNode result = context.mapper().createObjectNode()
+                            .put(SRC_IP_PREFIX, flowRule.srcIpPrefix().toString())
+                            .put(DST_IP_PREFIX, flowRule.dstIpPrefix().toString())
+                            .put(IP_PROTOCOL, flowRule.ipProtocol())
+                            .put(SRC_TP_PORT, flowRule.srcTpPort().toString())
+                            .put(DST_TP_PORT, flowRule.dstTpPort().toString());
+        return result;
+    }
+
+    @Override
+    public StatsFlowRule decode(ObjectNode json, CodecContext context) {
+        if (json == null || !json.isObject()) {
+            return null;
+        }
+        try {
+            String srcIpPrefix = json.get(SRC_IP_PREFIX).asText();
+            String dstIpPrefix = json.get(DST_IP_PREFIX).asText();
+            String tmpIpProtocol = new String("");
+            int srcTpPort  = 0;
+            int dstTpPort  = 0;
+
+            DefaultStatsFlowRule.Builder flowRuleBuilder;
+
+            byte ipProtocol = 0;
+            if (json.get(IP_PROTOCOL) == null) {
+                log.info("ipProtocol: null");
+                flowRuleBuilder = DefaultStatsFlowRule.builder()
+                        .srcIpPrefix(IpPrefix.valueOf(srcIpPrefix))
+                        .dstIpPrefix(IpPrefix.valueOf(dstIpPrefix));
+            } else {
+                tmpIpProtocol = json.get(IP_PROTOCOL).asText().toUpperCase();
+                srcTpPort     = json.get(SRC_TP_PORT).asInt();
+                dstTpPort     = json.get(DST_TP_PORT).asInt();
+                if (tmpIpProtocol.equals("TCP")) {
+                    ipProtocol = IPv4.PROTOCOL_TCP;
+                } else if (tmpIpProtocol.equals("UDP")) {
+                    ipProtocol = IPv4.PROTOCOL_UDP;
+                } else {
+                    ipProtocol = 0;
+                }
+
+                flowRuleBuilder = DefaultStatsFlowRule.builder()
+                        .srcIpPrefix(IpPrefix.valueOf(srcIpPrefix))
+                        .dstIpPrefix(IpPrefix.valueOf(dstIpPrefix))
+                        .ipProtocol(ipProtocol)
+                        .srcTpPort(TpPort.tpPort(srcTpPort))
+                        .dstTpPort(TpPort.tpPort(dstTpPort));
+            }
+
+            log.debug("StatsFlowRule after building from JSON:\n{}",
+                     flowRuleBuilder.build().toString());
+
+            return flowRuleBuilder.build();
+        } catch (Exception ex) {
+            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+        }
+        return null;
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
new file mode 100644
index 0000000..6328e64
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.base.MoreObjects;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.TpPort;
+import org.onosproject.openstacktelemetry.api.StatsFlowRule;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public final class DefaultStatsFlowRule implements StatsFlowRule {
+    private final IpPrefix srcIpPrefix;
+    private final IpPrefix dstIpPrefix;
+    private final byte     ipProtocol;
+    private final TpPort srcTpPort;
+    private final TpPort   dstTpPort;
+
+    private static final String NOT_NULL_MSG = "Element % cannot be null";
+
+    protected DefaultStatsFlowRule(IpPrefix srcIpPrefix,
+                                   IpPrefix dstIpPrefix,
+                                   byte ipProtoco,
+                                   TpPort srcTpPort,
+                                   TpPort dstTpPort) {
+        this.srcIpPrefix = srcIpPrefix;
+        this.dstIpPrefix = dstIpPrefix;
+        this.ipProtocol  = ipProtoco;
+        this.srcTpPort   = srcTpPort;
+        this.dstTpPort   = dstTpPort;
+    }
+
+    @Override
+    public IpPrefix srcIpPrefix() {
+        return srcIpPrefix;
+    }
+
+    @Override
+    public IpPrefix dstIpPrefix() {
+        return dstIpPrefix;
+    }
+
+    @Override
+    public byte ipProtocol() {
+        return ipProtocol;
+    }
+
+    @Override
+    public TpPort srcTpPort() {
+        return srcTpPort;
+    }
+
+    @Override
+    public TpPort dstTpPort() {
+        return dstTpPort;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("srcIpPrefix", srcIpPrefix)
+                .add("dstIpPrefix", dstIpPrefix)
+                .add("ipProtocol", ipProtocol)
+                .add("srcTpPort", srcTpPort)
+                .add("dstTpPort", dstTpPort)
+                .toString();
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public static Builder from(StatsFlowRule flowRule) {
+        return new Builder()
+                .srcIpPrefix(flowRule.srcIpPrefix())
+                .dstIpPrefix(flowRule.dstIpPrefix())
+                .ipProtocol(flowRule.ipProtocol())
+                .srcTpPort(flowRule.srcTpPort())
+                .dstTpPort(flowRule.dstTpPort());
+    }
+
+    /**
+     * A builder class for openstack flow rule.
+     */
+    public static final class Builder implements StatsFlowRule.Builder {
+        private IpPrefix srcIpPrefix;
+        private IpPrefix dstIpPrefix;
+        private byte     ipProtocol;
+        private TpPort   srcTpPort;
+        private TpPort   dstTpPort;
+
+        // private constructor not intended to use from external
+        private Builder() {
+        }
+
+        @Override
+        public DefaultStatsFlowRule build() {
+            checkArgument(srcIpPrefix != null, NOT_NULL_MSG, "Source IP Prefix");
+            checkArgument(dstIpPrefix != null, NOT_NULL_MSG, "Destination IP Prefix");
+
+            return new DefaultStatsFlowRule(srcIpPrefix,
+                                                dstIpPrefix,
+                                                ipProtocol,
+                                                srcTpPort,
+                                                dstTpPort);
+        }
+
+        @Override
+        public Builder srcIpPrefix(IpPrefix srcIpPrefix) {
+            this.srcIpPrefix = srcIpPrefix;
+            return this;
+        }
+
+        @Override
+        public Builder dstIpPrefix(IpPrefix dstIpPrefix) {
+            this.dstIpPrefix = dstIpPrefix;
+            return this;
+        }
+
+        @Override
+        public Builder ipProtocol(byte ipProtocol) {
+            this.ipProtocol = ipProtocol;
+            return this;
+        }
+
+        @Override
+        public Builder srcTpPort(TpPort srcTpPort) {
+            this.srcTpPort = srcTpPort;
+            return this;
+        }
+
+        @Override
+        public Builder dstTpPort(TpPort dstTpPort) {
+            this.dstTpPort = dstTpPort;
+            return this;
+        }
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
index fa8d9fe..7e8a102 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
@@ -91,7 +91,7 @@
             producer = null;
         }
 
-        log.info("Kafka producer has Stopped");
+        log.info("InfluxDB producer has Stopped");
     }
 
     @Override
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
new file mode 100644
index 0000000..c0646ad
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -0,0 +1,513 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.osgi.DefaultServiceDirectory;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.IndexTableId;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.IPCriterion;
+import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.criteria.TcpPortCriterion;
+import org.onosproject.net.flow.criteria.UdpPortCriterion;
+import org.onosproject.net.host.HostService;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.StatsFlowRule;
+import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
+import org.onosproject.openstacktelemetry.api.StatsInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
+
+
+/**
+ * Flow rule manager for network statistics of a VM.
+ */
+@Component(immediate = true)
+@Service
+public class StatsFlowRuleManager implements StatsFlowRuleAdminService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final byte FLOW_TYPE_SONA = 1; // VLAN
+
+    public static final int MILLISECONDS = 1000;
+    private static final int REFRESH_INTERVAL = 5;
+
+    private ApplicationId appId;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected FlowRuleService flowRuleService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected HostService hostService;
+
+    private Timer timer;
+    private TimerTask task;
+    private OpenstackTelemetryManager osTelemetryManager;
+
+    Set<FlowInfo> gFlowInfoSet = new HashSet<>();
+    private int loopCount = 0;
+
+    private static final int SOURCE_ID = 1;
+    private static final int TARGET_ID = 2;
+    private static final int PRIORITY_BASE = 10000;
+    private static final int METRIC_PRIORITY_SOURCE  = SOURCE_ID * PRIORITY_BASE;
+    private static final int METRIC_PRIORITY_TARGET  = TARGET_ID * PRIORITY_BASE;
+
+    public static final int    FLOW_TABLE_VM_SOURCE  =  0; // STAT_INBOUND_TABLE
+    public static final int    FLOW_TABLE_DHCP_ARP   =  1; // DHCP_ARP_TABLE
+    public static final int    FLOW_TABLE_VM_TARGET  = 49; // STAT_OUTBOUND_TABLE
+    public static final int    FLOW_TABLE_FORWARDING = 50; // FORWARDING_TABLE
+
+    static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
+
+    public StatsFlowRuleManager() {
+        log.info("Object is instantiated");
+        this.timer = new Timer("openstack-telemetry-sender");
+    }
+
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
+        log.info("Application is activated");
+        osTelemetryManager = new OpenstackTelemetryManager();
+        this.start();
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        log.info("Application is deactivated");
+    }
+
+    private class InternalTimerTask extends TimerTask {
+        @Override
+        public void run() {
+            log.debug("Timger Task Thread Starts ({})", loopCount++);
+            try {
+                Set<FlowInfo> flowInfoSet = getFlowRule();
+                for (FlowInfo flowInfo: flowInfoSet) {
+                    log.info("Publish FlowInfo to NMS: {}", flowInfo.toString());
+                    osTelemetryManager.publish(flowInfo);
+                }
+            } catch (Exception ex) {
+                log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+            }
+        }
+    }
+
+    @Override
+    public void start() {
+        log.info("Start publishing thread");
+        Set<FlowInfo>  gFlowInfoSet = getFlowRule();
+        task = new InternalTimerTask();
+        timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL,
+                                  MILLISECONDS * REFRESH_INTERVAL);
+    }
+
+    @Override
+    public void stop() {
+        log.info("Stop data publishing thread");
+        task.cancel();
+        task = null;
+    }
+
+    public void connectTables(
+                            DeviceId deviceId,
+                            int fromTable,
+                            int toTable,
+                            StatsFlowRule statsFlowRule,
+                            int rulePriority,
+                            boolean installFlag) {
+        try {
+            log.debug("Table Transition: {} -> {}", fromTable, toTable);
+            int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
+            int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
+            int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
+
+            TrafficSelector.Builder selector;
+            if (statsFlowRule == null) {
+                selector = DefaultTrafficSelector.builder();
+            } else {
+                selector = DefaultTrafficSelector.builder()
+                                       .matchEthType(Ethernet.TYPE_IPV4)
+                                       .matchIPSrc(statsFlowRule.srcIpPrefix())
+                                       .matchIPDst(statsFlowRule.dstIpPrefix());
+                if (statsFlowRule.ipProtocol() == IPv4.PROTOCOL_TCP) {
+                    selector = selector.matchIPProtocol(statsFlowRule.ipProtocol())
+                                       .matchTcpSrc(statsFlowRule.srcTpPort())
+                                       .matchTcpDst(statsFlowRule.dstTpPort());
+                } else if (statsFlowRule.ipProtocol() == IPv4.PROTOCOL_UDP) {
+                    selector = selector.matchIPProtocol(statsFlowRule.ipProtocol())
+                                       .matchUdpSrc(statsFlowRule.srcTpPort())
+                                       .matchUdpDst(statsFlowRule.dstTpPort());
+                }
+            }
+
+            TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
+            treatment.transition(toTable);
+            FlowRule flowRule = DefaultFlowRule.builder()
+                                               .forDevice(deviceId)
+                                               .withSelector(selector.build())
+                                               .withTreatment(treatment.build())
+                                               .withPriority(prefixLength)
+                                               .fromApp(appId)
+                                               .makePermanent()
+                                               .forTable(fromTable)
+                                               .build();
+            applyRule(flowRule, installFlag);
+        } catch (Exception ex) {
+            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+        }
+    }
+
+    /**
+     * Apply FlowRule to switch.
+     *
+     * @param flowRule FlowRule
+     * @param install Flag to install or not
+     */
+    private void applyRule(FlowRule flowRule, boolean install) {
+        log.debug("Apply flow rule to bridge device");
+        FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
+        flowOpsBuilder = install ? flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
+
+        flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
+            @Override
+            public void onSuccess(FlowRuleOperations ops) {
+                log.debug("Provisioned vni or forwarding table: \n {}", ops.toString());
+            }
+
+            @Override
+            public void onError(FlowRuleOperations ops) {
+                log.debug("Failed to provision vni or forwarding table: \n {}", ops.toString());
+            }
+        }));
+    }
+
+    /**
+     * Craete a flow rule.
+     *
+     * @param flowRule  flow rule for Openstack VMs
+     */
+    @Override
+    public void createFlowRule(StatsFlowRule flowRule) {
+        try {
+            log.debug("Create Flow Rule. SrcIp:{} DstIp:{}",
+                      flowRule.srcIpPrefix().toString(),
+                      flowRule.dstIpPrefix().toString());
+
+            // To make a inversed flow rule.
+            DefaultStatsFlowRule.Builder inverseFlowRuleBuilder
+                                        = DefaultStatsFlowRule
+                                            .builder()
+                                            .srcIpPrefix(flowRule.dstIpPrefix())
+                                            .dstIpPrefix(flowRule.srcIpPrefix())
+                                            .ipProtocol(flowRule.ipProtocol())
+                                            .srcTpPort(flowRule.dstTpPort())
+                                            .dstTpPort(flowRule.srcTpPort());
+            StatsFlowRule inverseFlowRule = inverseFlowRuleBuilder.build();
+            DeviceService deviceService = DefaultServiceDirectory.getService(DeviceService.class);
+            Iterable<Device> devices = deviceService.getDevices();
+            for (Device d : devices) {
+                log.debug("Device: {}", d.toString());
+                if (d.type() == Device.Type.CONTROLLER) {
+                    log.info("Don't create flow rule for 'DeviceType=CONTROLLER' ({})",
+                             d.id().toString());
+                    continue;
+                }
+                connectTables(d.id(), FLOW_TABLE_VM_SOURCE, FLOW_TABLE_DHCP_ARP,
+                              flowRule, METRIC_PRIORITY_SOURCE, true);
+                connectTables(d.id(), FLOW_TABLE_VM_TARGET, FLOW_TABLE_FORWARDING,
+                              inverseFlowRule, METRIC_PRIORITY_TARGET, true);
+            }
+        } catch (Exception ex) {
+            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+        }
+    }
+
+    /**
+     * Get FlowRule.
+     *
+     * @param flowRule Flow rule for a VM
+     * @return Set of FlowInfo
+     */
+    public Set<FlowInfo> getFlowRule(StatsFlowRule flowRule) {
+        Set<FlowInfo> flowInfoSet = new HashSet<>();
+        log.info("Get flow rule: {}", flowRule.toString());
+        // TODO  Make a implementation here.
+        return flowInfoSet;
+    }
+
+    /**
+     * Delete FlowRule for StatsInfo.
+     *
+     * @param flowRule  Flow rule for Openstack VM
+     */
+    @Override
+    public void deleteFlowRule(StatsFlowRule flowRule) {
+        log.debug("Delete Flow Rule: {}", flowRule.toString());
+        flowRuleService = DefaultServiceDirectory.getService(FlowRuleService.class);
+        flowRuleService.removeFlowRulesById(appId);
+        // TODO  Write a implementation code here
+
+        try {
+            log.debug("Delete Flow Rule. SrcIp:{} DstIp:{}",
+                      flowRule.srcIpPrefix().toString(),
+                      flowRule.dstIpPrefix().toString());
+
+            // To make a inversed flow rule.
+            DefaultStatsFlowRule.Builder inverseFlowRuleBuilder
+                                        = DefaultStatsFlowRule
+                                            .builder()
+                                            .srcIpPrefix(flowRule.dstIpPrefix())
+                                            .dstIpPrefix(flowRule.srcIpPrefix())
+                                            .ipProtocol(flowRule.ipProtocol())
+                                            .srcTpPort(flowRule.dstTpPort())
+                                            .dstTpPort(flowRule.srcTpPort());
+            StatsFlowRule inverseFlowRule = inverseFlowRuleBuilder.build();
+            DeviceService deviceService = DefaultServiceDirectory.getService(DeviceService.class);
+            Iterable<Device> devices = deviceService.getDevices();
+            for (Device d : devices) {
+                log.debug("Device: {}", d.toString());
+                if (d.type() == Device.Type.CONTROLLER) {
+                    log.info("Don't care for 'DeviceType=CONTROLLER' ({})",
+                             d.id().toString());
+                    continue;
+                }
+                connectTables(d.id(), FLOW_TABLE_VM_SOURCE, FLOW_TABLE_DHCP_ARP,
+                              flowRule, METRIC_PRIORITY_SOURCE, false);
+                connectTables(d.id(), FLOW_TABLE_VM_TARGET, FLOW_TABLE_FORWARDING,
+                              inverseFlowRule, METRIC_PRIORITY_TARGET, false);
+            }
+        } catch (Exception ex) {
+            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+        }
+    }
+
+    /**
+     * Get a list of the FlowRule Store.
+     *
+     * @return list of Flow Rule
+     */
+    public Set<FlowInfo> getFlowRule() {
+        log.debug("Get Flow Information List");
+        Set<FlowInfo> flowInfoSet = new HashSet<>();
+        try {
+            flowRuleService = DefaultServiceDirectory.getService(FlowRuleService.class);
+            Iterable<FlowEntry> flowEntries = flowRuleService.getFlowEntriesById(appId);
+
+            for (FlowEntry entry : flowEntries) {
+                FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
+                IPCriterion srcIpCriterion =
+                        (IPCriterion) entry.selector().getCriterion(Criterion.Type.IPV4_SRC);
+                IPCriterion dstIpCriterion =
+                        (IPCriterion) entry.selector().getCriterion(Criterion.Type.IPV4_DST);
+                IPProtocolCriterion ipProtocolCriterion =
+                        (IPProtocolCriterion) entry.selector().getCriterion(Criterion.Type.IP_PROTO);
+
+                log.debug("[FlowInfo]  TableID:{}  SRC_IP:{}  DST_IP:{}  Pkt:{}  Byte:{}",
+                         ((IndexTableId) entry.table()).id(),
+                         srcIpCriterion.ip().toString(), dstIpCriterion.ip().toString(),
+                         entry.packets(), entry.bytes());
+
+                fBuilder.withFlowType(FLOW_TYPE_SONA).withSrcIp(srcIpCriterion.ip())
+                        .withDstIp(dstIpCriterion.ip())
+                        .withProtocol((byte) ipProtocolCriterion.protocol());
+
+                if (ipProtocolCriterion.protocol() == IPv4.PROTOCOL_TCP) {
+                    TcpPortCriterion tcpSrcCriterion =
+                            (TcpPortCriterion) entry.selector().getCriterion(Criterion.Type.TCP_SRC);
+                    TcpPortCriterion tcpDstCriterion =
+                            (TcpPortCriterion) entry.selector().getCriterion(Criterion.Type.TCP_DST);
+                    log.debug("TCP SRC Port: {}   Dst Port: {}",
+                                tcpSrcCriterion.tcpPort().toInt(), tcpDstCriterion.tcpPort().toInt());
+                    fBuilder.withSrcPort(tcpSrcCriterion.tcpPort());
+                    fBuilder.withDstPort(tcpDstCriterion.tcpPort());
+                } else if (ipProtocolCriterion.protocol() == IPv4.PROTOCOL_UDP) {
+                    UdpPortCriterion udpSrcCriterion =
+                            (UdpPortCriterion) entry.selector().getCriterion(Criterion.Type.UDP_SRC);
+                    UdpPortCriterion udpDstCriterion =
+                            (UdpPortCriterion) entry.selector().getCriterion(Criterion.Type.UDP_DST);
+                    log.debug("UDP SRC Port: {}   Dst Port: {}",
+                                udpSrcCriterion.udpPort().toInt(), udpDstCriterion.udpPort().toInt());
+                    fBuilder.withSrcPort(udpSrcCriterion.udpPort());
+                    fBuilder.withDstPort(udpDstCriterion.udpPort());
+                } else {
+                    log.debug("Other protocol: {}", ipProtocolCriterion.protocol());
+                }
+
+                fBuilder.withSrcMac(getMacAddress(srcIpCriterion.ip().address()))
+                        .withDstMac(getMacAddress(dstIpCriterion.ip().address()))
+                        .withInputInterfaceId(getInterfaceId(srcIpCriterion.ip().address()))
+                        .withOutputInterfaceId(getInterfaceId(dstIpCriterion.ip().address()))
+                        .withVlanId(getVlanId(srcIpCriterion.ip().address()))
+                        .withDeviceId(entry.deviceId());
+
+                StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
+                sBuilder.withStartupTime(0)
+                        .withCurrAccPkts((int) entry.packets()).withCurrAccBytes(entry.bytes())
+                        .withErrorPkts((short) 0).withDropPkts((short) 0)
+                        .withLstPktOffset(REFRESH_INTERVAL * MILLISECONDS);
+
+                fBuilder.withStatsInfo(sBuilder.build());
+
+                FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
+                flowInfoSet.add(flowInfo);
+                log.debug("FlowInfo: \n{}", flowInfo.toString());
+            }
+        } catch (Exception ex) {
+            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+        }
+        return flowInfoSet;
+    }
+
+    /**
+     * Merge old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
+     *
+     * @param flowInfo current FlowInfo object
+     * @param fBuilder Builder for FlowInfo
+     * @param sBuilder Builder for StatsInfo
+     * @return Merged FlowInfo object
+     */
+    private FlowInfo mergeFlowInfo(FlowInfo flowInfo,
+                                   FlowInfo.Builder fBuilder,
+                                   StatsInfo.Builder sBuilder) {
+        try {
+            log.debug("Current FlowInfo:\n{}", flowInfo.toString());
+            for (FlowInfo gFlowInfo: gFlowInfoSet) {
+                log.debug("Old FlowInfo:\n{}", gFlowInfo.toString());
+                if (gFlowInfo.deviceId().equals(flowInfo.deviceId()) &&
+                        gFlowInfo.srcIp().equals(flowInfo.srcIp()) &&
+                        gFlowInfo.dstIp().equals(flowInfo.dstIp()) &&
+                        gFlowInfo.srcPort().equals(flowInfo.srcPort()) &&
+                        gFlowInfo.dstPort().equals(flowInfo.dstPort()) &&
+                        (gFlowInfo.protocol() == flowInfo.protocol())
+                        ) {
+                    // Get old StatsInfo object and merge the value to current object.
+                    StatsInfo oldStatsInfo = gFlowInfo.statsInfo();
+                    sBuilder.withPrevAccPkts(oldStatsInfo.currAccPkts());
+                    sBuilder.withPrevAccBytes(oldStatsInfo.currAccBytes());
+                    FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
+                    gFlowInfoSet.remove(gFlowInfo);
+                    gFlowInfoSet.add(newFlowInfo);
+                    log.info("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
+                    return newFlowInfo;
+                }
+            }
+            // No such record, then build the FlowInfo object and return this object.
+            log.info("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
+            FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
+            gFlowInfoSet.add(newFlowInfo);
+            return newFlowInfo;
+        } catch (Exception ex) {
+            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+        }
+        log.debug("Add this FlowInfo {}", flowInfo.toString());
+        gFlowInfoSet.add(flowInfo);
+        return flowInfo;
+    }
+
+    /**
+     * Get VLAN ID with respect to IP Address.
+     *
+     * @param ipAddress IP Address of host
+     * @return VLAN ID
+     */
+    public VlanId getVlanId(IpAddress ipAddress) {
+        try {
+            if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+                Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
+                return host.vlan();
+            }
+        } catch (Exception ex) {
+            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+        }
+        return VlanId.vlanId();
+    }
+
+    /**
+     * Get Interface ID of Switch which is connected to a host.
+     *
+     * @param ipAddress IP Address of host
+     * @return Interface ID of Switch
+     */
+    public int getInterfaceId(IpAddress ipAddress) {
+        try {
+            if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+                Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
+                return (int) host.location().port().toLong();
+            }
+        } catch (Exception ex) {
+            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+        }
+        return -1;
+    }
+
+    /**
+     * Get MAC Address of host.
+     *
+     * @param ipAddress IP Address of host
+     * @return MAC Address of host
+     */
+    public MacAddress getMacAddress(IpAddress ipAddress) {
+        try {
+            if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+                Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
+                return host.mac();
+            }
+        } catch (Exception ex) {
+            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+        }
+        return NO_HOST_MAC;
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java
index 30737ac..5b3ec7e 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryCodecRegister.java
@@ -22,8 +22,10 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onosproject.codec.CodecService;
 import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.StatsFlowRule;
 import org.onosproject.openstacktelemetry.api.StatsInfo;
 import org.onosproject.openstacktelemetry.codec.FlowInfoJsonCodec;
+import org.onosproject.openstacktelemetry.codec.StatsFlowRuleJsonCodec;
 import org.onosproject.openstacktelemetry.codec.StatsInfoJsonCodec;
 
 import static org.slf4j.LoggerFactory.getLogger;
@@ -43,6 +45,7 @@
     protected void activate() {
         codecService.registerCodec(StatsInfo.class, new StatsInfoJsonCodec());
         codecService.registerCodec(FlowInfo.class, new FlowInfoJsonCodec());
+        codecService.registerCodec(StatsFlowRule.class, new StatsFlowRuleJsonCodec());
 
         log.info("Started");
     }
@@ -51,6 +54,7 @@
     protected void deactivate() {
         codecService.unregisterCodec(StatsInfo.class);
         codecService.unregisterCodec(FlowInfo.class);
+        codecService.unregisterCodec(StatsFlowRule.class);
 
         log.info("Stopped");
     }
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
index f4af16d..9b5209b 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
@@ -15,14 +15,37 @@
  */
 package org.onosproject.openstacktelemetry.web;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.onosproject.codec.JsonCodec;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.StatsFlowRule;
+import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
+import org.onosproject.openstacktelemetry.codec.FlowInfoJsonCodec;
 import org.onosproject.rest.AbstractWebResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import javax.ws.rs.Consumes;
 import javax.ws.rs.GET;
+import javax.ws.rs.POST;
 import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
+import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.UriBuilder;
+import javax.ws.rs.core.UriInfo;
+import java.io.InputStream;
+import java.util.Set;
+
+import static com.fasterxml.jackson.databind.SerializationFeature.INDENT_OUTPUT;
+import static javax.ws.rs.core.Response.created;
+import static org.onlab.util.Tools.readTreeFromStream;
 
 /**
  * Handles REST API call of openstack telemetry.
@@ -31,8 +54,140 @@
 @Path("telemetry")
 public class OpenstackTelemetryWebResource extends AbstractWebResource {
 
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
     private final ObjectNode root = mapper().createObjectNode();
 
+    private static final String JSON_NODE_FLOW_RULE = "rules";
+    private static final String FLOW_RULE_ID = "STATS_FLOW_RULE_ID";
+
+    private final StatsFlowRuleAdminService statsFlowRuleService = get(StatsFlowRuleAdminService.class);
+
+    @Context
+    private UriInfo uriInfo;
+
+    /**
+     * Creates a flow rule for metric.
+     *
+     * @param input openstack flow rule JSON input stream
+     * @return 201 CREATED if the JSON is correct,
+     *         400 BAD_REQUEST if the JSON is malformed.
+     */
+    @POST
+    @Consumes(MediaType.APPLICATION_JSON)
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response createBulkFlowRule(InputStream input) {
+        log.info("CREATE BULK FLOW RULE: {}", input.toString());
+
+        readNodeConfiguration(input).forEach(flowRule -> {
+                log.debug("FlowRule: {}", flowRule.toString());
+                statsFlowRuleService.createFlowRule(flowRule);
+            });
+
+        UriBuilder locationBuilder = uriInfo.getBaseUriBuilder()
+                                            .path(JSON_NODE_FLOW_RULE)
+                                            .path(FLOW_RULE_ID);
+
+        return created(locationBuilder.build()).build();
+    }
+
+    /**
+     * Delete flow rules.
+     *
+     * @param input openstack flow rule JSON input stream
+     * @return 200 OK if processing is correct.
+     */
+    public Response deleteBulkFlowRule(InputStream input) {
+        log.info("DELETE BULK FLOW RULE: {}", input.toString());
+
+        readNodeConfiguration(input).forEach(flowRule -> {
+            log.debug("FlowRule: {}", flowRule.toString());
+            statsFlowRuleService.deleteFlowRule(flowRule);
+        });
+
+        return ok(root).build();
+    }
+
+
+    /**
+     * Get flow rules which is installed on ONOS.
+     *
+     * @return 200 OK
+     */
+    public Response readBulkFlowRule() {
+        log.info("READ BULK FLOW RULE");
+
+        return ok(root).build();
+    }
+
+
+    /**
+     * Get flow information list.
+     *
+     * @return Flow information list
+     */
+    @GET
+    @Path("list")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getFlowInfoBulk() {
+        log.info("GET BULK FLOW RULE");
+
+        Set<FlowInfo> flowInfoSet;
+        flowInfoSet = statsFlowRuleService.getFlowRule();
+
+        log.info("\n\n======================================================\n" +
+                 "FlowInfo Set: \n{}" +
+                 "\n\n======================================================\n",
+                 flowInfoSet);
+
+        JsonCodec<FlowInfo> flowInfoCodec = new FlowInfoJsonCodec();
+
+        ObjectNode nodeJson;
+        int idx = 0;
+        for (FlowInfo flowInfo: flowInfoSet) {
+            nodeJson = flowInfoCodec.encode(flowInfo, this);
+            root.put("FlowInfo" + String.valueOf(idx++), nodeJson.toString());
+        }
+        return ok(root).build();
+    }
+
+    @GET
+    @Path("list/{src_ip_prefix}/{dst_ip_prefix}")
+    @Produces(MediaType.APPLICATION_JSON)
+    public Response getFlowRule(
+            @PathParam("src_ip_prefix") String srcIpPrefix,
+            @PathParam("dst_ip_prefix") String dstIpPrefix) {
+        return ok(root).build();
+    }
+
+
+    private Set<StatsFlowRule> readNodeConfiguration(InputStream input) {
+        log.info("Input JSON Data: \n\t\t{}", input.toString());
+        Set<StatsFlowRule> flowRuleSet = Sets.newHashSet();
+        try {
+            JsonNode jsonTree = readTreeFromStream(mapper().enable(INDENT_OUTPUT), input);
+            ArrayNode nodes = (ArrayNode) jsonTree.path(JSON_NODE_FLOW_RULE);
+            nodes.forEach(node -> {
+                try {
+                    ObjectNode objectNode = node.deepCopy();
+                    log.debug("ObjectNode: {}", objectNode.toString());
+                    StatsFlowRule statsFlowRule = codec(StatsFlowRule.class)
+                                                        .decode(objectNode, this);
+                    log.debug("StatsFlowRule: {}", statsFlowRule.toString());
+                    flowRuleSet.add(statsFlowRule);
+                } catch (Exception ex) {
+                    log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+                    throw new IllegalArgumentException();
+                }
+            });
+        } catch (Exception ex) {
+            log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+            throw new IllegalArgumentException(ex);
+        }
+
+        return flowRuleSet;
+    }
+
     /**
      * OpenstackTelemetryImpl method.
      *