Netflow store and cli implementation

Implemented netflow version 9 of netflow collector store and cli.

Reference :  https://datatracker.ietf.org/doc/html/rfc3954

Change-Id: I0b2ec932c28bc3b36042c7a725a41956cc859fcd
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/DataFlowRecord.java
similarity index 95%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/DataFlowRecord.java
index 8dd434c..7f29f43 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/DataFlowRecord.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
 
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
@@ -25,12 +25,6 @@
 
 import com.google.common.base.MoreObjects;
 
-import org.onosproject.netflow.DataRecord;
-import org.onosproject.netflow.TemplateId;
-import org.onosproject.netflow.Flow;
-import org.onosproject.netflow.FlowTemplateField;
-import org.onosproject.netflow.DataDeserializer;
-
 import static com.google.common.base.Preconditions.checkNotNull;
 
 
@@ -118,7 +112,7 @@
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
                 .add("templateId", templateId)
-                .add("flows", flows)
+                .add("\n\nflows", flows)
                 .toString();
     }
 
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/DataFlowSet.java
similarity index 97%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/DataFlowSet.java
index 7183915..b569387 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/DataFlowSet.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
 
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
@@ -161,6 +161,7 @@
                 .add("flowSetId", flowSetId)
                 .add("length", length)
                 .add("data", data)
+                .add("dataFlow", dataFlow)
                 .toString();
     }
 
@@ -206,7 +207,7 @@
             byte[] dataRecord = new byte[template.getValueLength()];
             bb.get(dataRecord);
             this.setDataFlow(DataFlowRecord.deserializer().deserialize(
-                    data, 0, template.getValueLength(), template));
+                    dataRecord, 0, template.getValueLength(), template));
         }
 
     }
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/DataTemplateRecord.java
similarity index 88%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/DataTemplateRecord.java
index dc98531..4230a10 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/DataTemplateRecord.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
 
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
@@ -28,8 +28,6 @@
 import com.google.common.base.MoreObjects;
 
 import org.onlab.packet.Deserializer;
-import org.onosproject.netflow.TemplateId;
-import org.onosproject.netflow.FlowTemplateField;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
@@ -117,6 +115,37 @@
                 .toString();
     }
 
+
+    @Override
+    public int hashCode() {
+        int hash = 3;
+        hash = 79 * hash + Objects.hashCode(this.templateId);
+        hash = 79 * hash + this.filedCount;
+        hash = 79 * hash + Objects.hashCode(this.fields);
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final DataTemplateRecord other = (DataTemplateRecord) obj;
+        if (this.filedCount != other.filedCount) {
+            return false;
+        }
+        if (!Objects.equals(this.templateId, other.templateId)) {
+            return false;
+        }
+        return Objects.equals(this.fields, other.fields);
+    }
+
     /**
      * Data deserializer function for data template record.
      *
diff --git a/apps/netflow/api/src/main/java/org/onosproject/netflow/FlowField.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/FlowField.java
index 207903c..fcaa120 100644
--- a/apps/netflow/api/src/main/java/org/onosproject/netflow/FlowField.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/FlowField.java
@@ -18,18 +18,24 @@
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Map;
+import java.util.Set;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.BiFunction;
+import java.util.stream.Collectors;
 
 import static org.onosproject.netflow.NetflowUtils.VAR_INT_LONG;
 import static org.onosproject.netflow.NetflowUtils.VAR_BYTE;
 import static org.onosproject.netflow.NetflowUtils.NULL;
 import static org.onosproject.netflow.NetflowUtils.VAR_SHORT;
-import static org.onosproject.netflow.NetflowUtils.VAR_IP_ADDRESS;
+import static org.onosproject.netflow.NetflowUtils.VAR_IPV4_ADDRESS;
+import static org.onosproject.netflow.NetflowUtils.VAR_IPV6_ADDRESS;
 import static org.onosproject.netflow.NetflowUtils.VAR_SHORT_INT;
 import static org.onosproject.netflow.NetflowUtils.VAR_INT;
 import static org.onosproject.netflow.NetflowUtils.VAR_MAC;
+import static org.onosproject.netflow.NetflowUtils.VAR_UNSIGNED_BYTE;
+import static org.onosproject.netflow.NetflowUtils.VAR_UNSIGNED_INT;
+import static org.onosproject.netflow.NetflowUtils.VAR_UNSIGNED_SHORT;;
 
 /**
  * The flow fields are a selection of Packet Header
@@ -44,34 +50,34 @@
      * https://www.ietf.org/rfc/rfc3954.txt
      * Section :- Field Type Definitions.
      */
-    IN_BYTES(1, VAR_INT_LONG),
-    IN_PKTS(2, VAR_INT_LONG),
+    IN_BYTES(1, VAR_UNSIGNED_INT),
+    IN_PKTS(2, VAR_UNSIGNED_INT),
     FLOWS(3, VAR_INT_LONG),
     PROTOCOL(4, VAR_BYTE),
     SRC_TOS(5, VAR_BYTE),
     TCP_FLAGS(6, VAR_BYTE),
-    L4_SRC_PORT(7, VAR_SHORT),
-    IPV4_SRC_ADDR(8, VAR_IP_ADDRESS),
+    L4_SRC_PORT(7, VAR_UNSIGNED_SHORT),
+    IPV4_SRC_ADDR(8, VAR_IPV4_ADDRESS),
     SRC_MASK(9, VAR_BYTE),
-    INPUT_SNMP(10, VAR_SHORT_INT),
-    L4_DST_PORT(11, VAR_SHORT),
-    IPV4_DST_ADDR(12, VAR_IP_ADDRESS),
+    INPUT_SNMP(10, VAR_UNSIGNED_SHORT),
+    L4_DST_PORT(11, VAR_UNSIGNED_SHORT),
+    IPV4_DST_ADDR(12, VAR_IPV4_ADDRESS),
     DST_MASK(13, VAR_BYTE),
-    OUTPUT_SNMP(14, VAR_SHORT_INT),
-    IPV4_NEXT_HOP(15, VAR_IP_ADDRESS),
+    OUTPUT_SNMP(14, VAR_UNSIGNED_SHORT),
+    IPV4_NEXT_HOP(15, VAR_IPV4_ADDRESS),
     SRC_AS(16, VAR_SHORT_INT),
     DST_AST(17, VAR_SHORT_INT),
-    BGP_IPV4_NEXT_HOP(18, VAR_IP_ADDRESS),
+    BGP_IPV4_NEXT_HOP(18, VAR_IPV4_ADDRESS),
     MUL_DST_PKTS(19, VAR_INT_LONG),
     MUL_DST_BYTES(20, VAR_INT_LONG),
-    LAST_SWITCHED(21, VAR_INT),
-    FIRST_SWITCHED(22, VAR_INT),
+    LAST_SWITCHED(21, VAR_UNSIGNED_INT),
+    FIRST_SWITCHED(22, VAR_UNSIGNED_INT),
     OUT_BYTES(23, VAR_INT_LONG),
     OUT_PKTS(24, VAR_INT_LONG),
     MIN_PKT_LNGTH(25, NULL),
     MAX_PKT_LNGTH(26, NULL),
-    IPV6_SRC_ADDR(27, VAR_IP_ADDRESS),
-    IPV6_DST_ADDR(28, VAR_IP_ADDRESS),
+    IPV6_SRC_ADDR(27, VAR_IPV6_ADDRESS),
+    IPV6_DST_ADDR(28, VAR_IPV6_ADDRESS),
     IPV6_SRC_MASK(29, VAR_BYTE),
     IPV6_DST_MASK(30, VAR_BYTE),
     IPV6_FLOW_LABEL(31, NULL),
@@ -89,7 +95,7 @@
     IPV4_SRC_PREFIX(44, NULL),
     IPV4_DST_PREFIX(45, NULL),
     MPLS_TOP_LABEL_TYPE(46, VAR_BYTE),
-    MPLS_TOP_LABEL_IP_ADDR(47, VAR_IP_ADDRESS),
+    MPLS_TOP_LABEL_IP_ADDR(47, VAR_IPV4_ADDRESS),
     FLOW_SAMPLER_ID(48, VAR_BYTE),
     FLOW_SAMPLER_MODE(49, VAR_BYTE),
     FLOW_SAMPLER_RANDOM_INTERVAL(50, NULL),
@@ -101,10 +107,10 @@
     OUT_DST_MAC(57, VAR_MAC),
     SRC_VLAN(58, VAR_SHORT),
     DST_VLAN(59, VAR_SHORT),
-    IP_PROTOCOL_VERSION(60, VAR_BYTE),
+    IP_PROTOCOL_VERSION(60, VAR_UNSIGNED_BYTE),
     DIRECTION(61, VAR_BYTE),
-    IPV6_NEXT_HOP(62, VAR_IP_ADDRESS),
-    BPG_IPV6_NEXT_HOP(63, VAR_IP_ADDRESS),
+    IPV6_NEXT_HOP(62, VAR_IPV6_ADDRESS),
+    BPG_IPV6_NEXT_HOP(63, VAR_IPV6_ADDRESS),
     IPV6_OPTION_HEADERS(64, VAR_INT),
     MPLS_LABEL_1(70, NULL),
     MPLS_LABEL_2(71, NULL),
@@ -139,6 +145,7 @@
     LAYER2_PACKET_SECTION_SIZE(103, NULL),
     LAYER2_PACKET_SECTION_DATA(104, NULL);
 
+
     final int fieldID;
     final BiFunction<ByteBuffer, Integer, Object> parser;
     private static Map<Integer, FlowField> fields = new ConcurrentHashMap<>();
@@ -158,6 +165,12 @@
                 .map(id -> fields.get(id));
     }
 
+    public static Set<String> getAllFlowField() {
+        return fields.values().stream()
+                .map(f -> f.name())
+                .collect(Collectors.toSet());
+    }
+
     public BiFunction<ByteBuffer, Integer, Object> getParser() {
         return this.parser;
     }
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/FlowSet.java
similarity index 98%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/FlowSet.java
index 1b292a7..e6c5a50 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/FlowSet.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
 
 import java.util.Arrays;
 import java.util.Map;
diff --git a/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowController.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowController.java
new file mode 100644
index 0000000..91b3b62
--- /dev/null
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowController.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Map;
+
+/**
+ * It control and manage the netflow traffic.
+ * it is Collecting, Storing and analyzing NetFlow data
+ * it can help to understand which applications, and protocols
+ * may be consuming the most network bandwidth by tracking processes,
+ * protocols, times of day, and traffic routing.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public interface NetflowController {
+
+    /**
+     * Add template flowset to controller.
+     * Add new template to controller if it not exist,
+     * otherwise it will replace the existing template.
+     *
+     * @param templateRecord template flowset record.
+     */
+    void addTemplateFlowSet(DataTemplateRecord templateRecord);
+
+    /**
+     * Update data flowset to controller.
+     * it will update new data flowset to store.
+     *
+     * @param dataFlowRecord data flowset record.
+     */
+    void updateDataFlowSet(DataFlowRecord dataFlowRecord);
+
+    /**
+     * Get template flowset from controller.
+     * it will fetch current template which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return optional of data template record, optional will be empty if template not found.
+     */
+    Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId);
+
+    /**
+     * Get data flowset from controller.
+     * it will fetch current data flowset which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return list of data flow record, list will be empty if template id not matched.
+     */
+    List<DataFlowRecord> getDataFlowSet(TemplateId templateId);
+
+    /**
+     * Get all template flowsets from controller.
+     * it will fetch all templates from store.
+     *
+     * @return set of data template record, set will be empty if templates not found in the store.
+     */
+    Set<DataTemplateRecord> getTemplateFlowSet();
+
+    /**
+     * Get data flowset from controller.
+     * it will fetch current data flowset which is matching to the template id from store.
+     *
+     * @return mapping from a template id to data flow record.
+     */
+    Map<TemplateId, List<DataFlowRecord>> getDataFlowSet();
+}
diff --git a/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowStore.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowStore.java
new file mode 100644
index 0000000..5dc5719
--- /dev/null
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowStore.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Manages inventory of Netflow template and data flowset to distribute
+ * information.
+ */
+public interface NetflowStore {
+
+    /**
+     * Get template flowset from store.
+     * it will fetch current template which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return optional of data template record, optional will be empty if template not found.
+     */
+    Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId);
+
+    /**
+     * Get set of template flowsets from store.
+     * it will fetch all template flowsets from store.
+     *
+     * @return set of data template record, set will be empty if templates not found in the store.
+     */
+    Set<DataTemplateRecord> getTemplateFlowSet();
+
+    /**
+     * Get optional template flowset from store.
+     * it will fetch current optional template which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return optional of optional template flowset, optional will be empty if template not found.
+     */
+    Optional<OptionalTemplateFlowSet> getOptionalTemplateFlowSet(TemplateId templateId);
+
+    /**
+     * Get set of optional template flowsets from store.
+     * it will fetch all optional template flowsets from store.
+     *
+     * @return set of optional template flowsets, set will be empty if templates not found in the store.
+     */
+    Set<OptionalTemplateFlowSet> getOptionalTemplateFlowSet();
+
+    /**
+     * Get data flowset from store.
+     * it will fetch current data flowset which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return list of data flow record, list will be empty if template id not matched.
+     */
+    List<DataFlowRecord> getDataFlowSet(TemplateId templateId);
+
+    /**
+     * Get data flowset from store.
+     * it will fetch current data flowset which is matching to the template id from store.
+     *
+     * @return mapping from a template id to data flow record.
+     */
+    Map<TemplateId, List<DataFlowRecord>> getDataFlowSet();
+
+    /**
+     * Update template flowset to the store.
+     * Add new template to store if it not exist,
+     * otherwise it will replace the existing template.
+     *
+     * @param templateRecord template flowset record.
+     */
+    void updateTemplateFlowSet(DataTemplateRecord templateRecord);
+
+    /**
+     * Update optional template flowset to the store.
+     * Add new optional template to store if it not exist,
+     * otherwise it will replace the existing optional template.
+     *
+     * @param optionalTemplateFlowSet optional template flowset.
+     */
+    void updateOptionalTemplateFlowSet(OptionalTemplateFlowSet optionalTemplateFlowSet);
+
+    /**
+     * Add data flow record to the store.
+     * Add new data flow record to store
+     *
+     * @param dataFlowRecord data flow record.
+     */
+    void addDataFlowSet(DataFlowRecord dataFlowRecord);
+
+    /**
+     * Remove template flowset from store.
+     * it will remove template flowset which is matching to the given template id from store.
+     *
+     * @param templateId template id.
+     */
+    void clearTemplateFlowSet(TemplateId templateId);
+
+    /**
+     * Remove all template flowset from store.
+     * it will remove all template flowsets from store.
+     */
+    void clearTemplateFlowSet();
+
+    /**
+     * Remove optional template flowset from store.
+     * it will remove optional template which is matching to the given template id from store.
+     *
+     * @param templateId template id.
+     */
+    void clearOptionalTemplateFlowSet(TemplateId templateId);
+
+    /**
+     * Remove all optional template flowset from store.
+     * it will remove all optional template flowsets from store.
+     */
+    void clearOptionalTemplateFlowSet();
+
+    /**
+     * Remove data flowset from store.
+     * it will remove dataflowset which is matching to the given template id from store.
+     *
+     * @param templateId template id.
+     */
+    void clearDataFlowSet(TemplateId templateId);
+
+    /**
+     * Remove all data flowset from store.
+     * it will remove all data flowsets from store.
+     */
+    void clearDataFlowSet();
+
+    /**
+     * Remove template, optional template and data flowsets from store.
+     * it will remove all flowsets from store.
+     */
+    void clearAllFlowSet();
+
+}
\ No newline at end of file
diff --git a/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowUtils.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowUtils.java
index 7e75dc7..ebadd3d 100644
--- a/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowUtils.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowUtils.java
@@ -15,10 +15,9 @@
  */
 package org.onosproject.netflow;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.function.BiFunction;
+import org.onlab.packet.IpAddress;
 
 
 /**
@@ -32,13 +31,38 @@
         return bb.get();
     };
 
+    public static final BiFunction<ByteBuffer, Integer, Object> VAR_UNSIGNED_BYTE = (bb, len) -> {
+        return getUnsignedByte(bb);
+    };
+
     public static final BiFunction<ByteBuffer, Integer, Object> VAR_SHORT = (bb, len) -> {
         return bb.getShort();
     };
 
+    public static final BiFunction<ByteBuffer, Integer, Object> VAR_UNSIGNED_SHORT = (bb, len) -> {
+        return asUnsignedShort(bb);
+    };
+
+    public static short getUnsignedByte(ByteBuffer bb) {
+        return ((short) (bb.get() & 0xFF));
+    }
+
+    public static int asUnsignedShort(ByteBuffer bb) {
+        return bb.getShort() & 0xFFFF;
+    }
+
+    public static long getUnsignedInt(ByteBuffer bb) {
+        return ((long) bb.getInt() & 0xFFFFFFFFL);
+    }
+
     public static final BiFunction<ByteBuffer, Integer, Object> VAR_INT = (bb, len) -> {
         return bb.getInt();
     };
+
+    public static final BiFunction<ByteBuffer, Integer, Object> VAR_UNSIGNED_INT = (bb, len) -> {
+        return getUnsignedInt(bb);
+    };
+
     public static final BiFunction<ByteBuffer, Integer, Object> VAR_FLOAT = (bb, len) -> {
         return bb.getFloat();
     };
@@ -52,6 +76,12 @@
         return null;
     };
 
+    public static final BiFunction<ByteBuffer, Integer, byte[]> BYTE_ARRAY = (bb, len) -> {
+        byte[] bytes = new byte[len];
+        bb.get(bytes);
+        return bytes;
+    };
+
     public static final BiFunction<ByteBuffer, Integer, Object> VAR_INT_LONG = (bb, len) -> {
         if (len == 4) {
             return bb.getInt();
@@ -66,8 +96,14 @@
         return bb.getInt();
     };
 
-    public static final BiFunction<ByteBuffer, Integer, Object> VAR_IP_ADDRESS = (bb, len) -> {
-        return toInetAddress(bb, len);
+    public static final BiFunction<ByteBuffer, Integer, Object> VAR_IPV4_ADDRESS = (bb, len) -> {
+        byte[] value = getByteArray(bb, len);
+        return IpAddress.valueOf(IpAddress.Version.INET, value).toInetAddress().getHostAddress();
+    };
+
+    public static final BiFunction<ByteBuffer, Integer, Object> VAR_IPV6_ADDRESS = (bb, len) -> {
+        byte[] value = getByteArray(bb, len);
+        return IpAddress.valueOf(IpAddress.Version.INET6, value).toInetAddress().getHostAddress();
     };
 
     public static final BiFunction<ByteBuffer, Integer, Object> VAR_MAC = (bb, len) -> {
@@ -77,18 +113,10 @@
         return null;
     };
 
-    public static InetAddress toInetAddress(ByteBuffer bb, int length) {
-        byte[] address = new byte[length];
-        bb.get(address);
-
-        InetAddress ipAddress = null;
-        try {
-            ipAddress = InetAddress.getByAddress(address);
-        } catch (UnknownHostException e) {
-            throw new IllegalArgumentException("Invalid host buffer");
-        }
-
-        return ipAddress;
+    private static byte[] getByteArray(ByteBuffer bb, int length) {
+        byte[] bytes = new byte[length];
+        bb.get(bytes);
+        return bytes;
     }
 
 }
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/OptionalTemplateFlowSet.java
similarity index 87%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/OptionalTemplateFlowSet.java
index 9a00bb2..068a814 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/OptionalTemplateFlowSet.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
 
 import java.util.List;
 
@@ -53,6 +53,8 @@
 
     private int length;
 
+    private TemplateId templateId;
+
     private List<DataTemplateRecord> records;
 
     /**
@@ -90,6 +92,20 @@
     }
 
     /**
+     * Returns template record's template id.
+     * Template Records is given a unique Template ID.
+     * This uniqueness is local to the Observation
+     * Domain that generated the Template ID.  Template IDs 0-255 are
+     * reserved for Template FlowSets, Options FlowSets, and other
+     * reserved FlowSets yet to be created.
+     *
+     * @return list of flowsets
+     */
+    public TemplateId getTemplateId() {
+        return templateId;
+    }
+
+    /**
      * Returns list of optional data template records.
      *
      * @return list of optional data template records
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateFlowSet.java
similarity index 95%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateFlowSet.java
index b6bc51e..6ba1d66 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateFlowSet.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
 
 import java.nio.ByteBuffer;
 import java.util.LinkedList;
@@ -26,8 +26,6 @@
 import org.onlab.packet.DeserializationException;
 import org.onlab.packet.Deserializer;
 
-import static com.google.common.base.Preconditions.checkState;
-
 /**
  * One of the essential elements in the NetFlow format is the Template
  * FlowSet.  Templates greatly enhance the flexibility of the Flow
@@ -256,21 +254,11 @@
         }
 
         /**
-         * Checks arguments for template flow set.
-         */
-        private void checkArguments() {
-            checkState(flowSetId == 0, "Invalid flow set id.");
-            checkState(length == 0, "Invalid flow set length.");
-
-        }
-
-        /**
          * Builds template flow set.
          *
          * @return template flow set.
          */
         public TemplateFlowSet build() {
-            checkArguments();
             return new TemplateFlowSet(this);
         }
 
diff --git a/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateId.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateId.java
index bf6f4a2..8ec8d07 100644
--- a/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateId.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateId.java
@@ -15,7 +15,7 @@
  */
 package org.onosproject.netflow;
 
-import com.google.common.base.MoreObjects;
+import org.onlab.util.Identifier;
 
 /**
  * Template Records is given a unique Template ID.
@@ -26,23 +26,34 @@
  * FlowSets are numbered from 256 to 65535.
  * Ref: https://www.ietf.org/rfc/rfc3954.txt
  */
-public class TemplateId {
+public class TemplateId extends Identifier<Integer> {
 
-    private int id;
-
+    /**
+     * Default constructor.
+     *
+     * @param id template id.
+     */
     public TemplateId(int id) {
-        this.id = id;
+        super(id);
     }
 
+    /**
+     * Get the value of the template id.
+     *
+     * @return the value of the template id.
+     */
     public int getId() {
-        return id;
+        return identifier;
     }
 
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(getClass())
-                .add("id", id)
-                .toString();
+    /**
+     * Get the value of the templateid object.
+     *
+     * @param id template id.
+     * @return the value of the templateid object.
+     */
+    public static TemplateId valueOf(int id) {
+        return new TemplateId(id);
     }
 
 }
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateRecord.java
similarity index 91%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateRecord.java
index db66292..744db66 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateRecord.java
@@ -13,11 +13,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
 
 import org.onlab.packet.BasePacket;
 
-import org.onosproject.netflow.FlowRecord;
 /**
  * A Template Record defines the structure and interpretation of fields.
  * in a Flow Data Record.
diff --git a/apps/netflow/app/BUILD b/apps/netflow/app/BUILD
index 747d61d..c1a4601 100644
--- a/apps/netflow/app/BUILD
+++ b/apps/netflow/app/BUILD
@@ -1,4 +1,4 @@
-COMPILE_DEPS = CORE_DEPS + [
+COMPILE_DEPS = CORE_DEPS + KRYO + CLI + [
     "//core/store/serializers:onos-core-serializers",
     "//apps/netflow/api:onos-apps-netflow-api",
     "@io_netty_netty_common//jar",
@@ -6,5 +6,6 @@
 ]
 
 osgi_jar(
+    karaf_command_packages = ["org.onosproject.netflow.cli"],
     deps = COMPILE_DEPS,
 )
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java
new file mode 100644
index 0000000..760d40b
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.api.console.Completer;
+import org.apache.karaf.shell.api.console.CommandLine;
+import org.apache.karaf.shell.api.console.Session;
+import org.apache.karaf.shell.support.completers.StringsCompleter;
+
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Flow field completer.
+ */
+@Service
+public class FlowFieldCompleter implements Completer {
+
+    @Override
+    public int complete(Session session, CommandLine commandLine, List<String> candidates) {
+
+        StringsCompleter delegate = new StringsCompleter();
+        Set<String> flowfields = FlowField.getAllFlowField();
+        SortedSet<String> strings = delegate.getStrings();
+        for (String field : flowfields) {
+            strings.add(field);
+        }
+        return delegate.complete(session, commandLine, candidates);
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java
new file mode 100644
index 0000000..7a5db0d
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Map;
+import java.util.Collection;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+
+/**
+ * Lists all netflow data flowsets.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-dataflow",
+        description = "Lists all data flowsets received from netflow exporter.")
+public class NetflowDataflowCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "templateid",
+            description = "Data flowset template id",
+            required = false, multiValued = false)
+    int templateID = 0;
+
+    @Override
+    protected void doExecute() {
+        NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+
+        if (templateID < 0) {
+            print("Invalid template ID");
+            return;
+        }
+        if (templateID > 0) {
+            List<DataFlowRecord> dataFlowSet = controller.getDataFlowSet(TemplateId.valueOf(templateID));
+            if (!dataFlowSet.isEmpty()) {
+                dataFlowSet.stream().forEach(data -> printDataflowSet(data));
+            } else {
+                print("Default template not found");
+            }
+        } else {
+            Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+            if (dataFlowSets.isEmpty()) {
+                print("Default template not found");
+            } else {
+                dataFlowSets.values().stream().flatMap(Collection::stream).forEach(data -> printDataflowSet(data));
+            }
+        }
+    }
+
+    /**
+     * Adds data flowset details in specified row wise.
+     *
+     * @param dataFlowRecord data flowset record.
+     */
+    private void printDataflowSet(DataFlowRecord dataFlowRecord) {
+        print("Template ID : %d", dataFlowRecord.getTemplateId().getId());
+        dataFlowRecord.getFlows().forEach(dataflow -> {
+            print("Field : %s,  Value : %s", dataflow.getField().name(),
+                    dataflow.getValue().toString());
+        });
+        print("\n");
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java
new file mode 100644
index 0000000..c23acdf
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.DataFlowRecord;
+
+/**
+ * Netflow overall summary report.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-summary",
+        description = "Summary of template flowsets and data flowsets.")
+public class NetflowSummaryCommand extends AbstractShellCommand {
+
+    @Override
+    protected void doExecute() {
+        NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+        Set<DataTemplateRecord> templates = controller.getTemplateFlowSet();
+        if (templates.isEmpty()) {
+            print("Template not found");
+        } else {
+            Set<Integer> templateIds = templates.stream()
+                    .map(DataTemplateRecord::getTemplateId)
+                    .map(TemplateId::getId)
+                    .collect(Collectors.toSet());
+            printTemplateflowSet(templateIds);
+
+        }
+        Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+        if (dataFlowSets.isEmpty()) {
+            print("Data not found");
+        } else {
+            printDataflowSet(dataFlowSets);
+        }
+    }
+
+    /**
+     * Adds template flowset summary in specified row wise.
+     *
+     * @param templateIds template ids
+     */
+    private void printTemplateflowSet(Set<Integer> templateIds) {
+        print("Number of templates : %d, Template IDs : %s",
+                templateIds.size(),
+                templateIds.toString());
+    }
+
+    /**
+     * Adds data flowset summary in specified row wise.
+     *
+     * @param dataflowMap data flow map
+     */
+    private void printDataflowSet(Map<TemplateId, List<DataFlowRecord>> dataflowMap) {
+        dataflowMap.entrySet().forEach(entry -> {
+            print("Template ID : %d, Data flow count : %d",
+                    entry.getKey().getId(),
+                    entry.getValue().size());
+        });
+        print("\n");
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java
new file mode 100644
index 0000000..abfbb73
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.Set;
+import java.util.Optional;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataTemplateRecord;
+
+/**
+ * Lists all netflow template flowsets.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-template",
+        description = "Lists all template flowsets received from netflow exporter.")
+public class NetflowTemplateCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "templateID",
+            description = "Netflow template ID",
+            required = false, multiValued = false)
+    int templateID = 0;
+
+    @Override
+    protected void doExecute() {
+        NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+        if (templateID < 0) {
+            print("Invalid template ID");
+            return;
+        }
+        if (templateID > 0) {
+            Optional<DataTemplateRecord> template = controller.getTemplateFlowSet(TemplateId.valueOf(templateID));
+            if (template.isPresent()) {
+                printTemplate(template.get());
+            } else {
+                print("Default template not found");
+            }
+        } else {
+            Set<DataTemplateRecord> templates = controller.getTemplateFlowSet();
+            if (templates.isEmpty()) {
+                print("Default template not found");
+            } else {
+                templates.stream().forEach(template -> printTemplate(template));
+            }
+        }
+    }
+
+    /**
+     * Adds template flowset details in specified row wise.
+     *
+     * @param template template flow set.
+     */
+    private void printTemplate(DataTemplateRecord template) {
+        print("Template ID : %d, Field Count : %d",
+                template.getTemplateId().getId(),
+                template.getFiledCount());
+        template.getFields().forEach(flowTemplateField -> {
+            print("Field : %s, Length : %d",
+                    flowTemplateField.getFlowField().name(),
+                    flowTemplateField.getLength());
+        });
+        print("\n");
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java
new file mode 100644
index 0000000..51cefa1
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Optional;
+import java.util.Map;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Lists all filtered data flowsets.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-traffic-filter",
+        description = "Lists all filtered data flowsets received from netflow exporter.")
+public class NetflowTrafficFilterCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "field", description = "flow field",
+            required = false, multiValued = false)
+    @Completion(FlowFieldCompleter.class)
+    protected String field = null;
+
+    @Argument(index = 1, name = "value", description = "flow value",
+            required = false, multiValued = false)
+    protected String value = null;
+
+    @Override
+    protected void doExecute() {
+        NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+        Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+        if (dataFlowSets.isEmpty()) {
+            print("Data not found");
+            return;
+        }
+        dataFlowSets.values()
+                .stream()
+                .flatMap(Collection::stream)
+                .filter(df -> {
+                    Optional<Object> fieldValue = getFieldValue(df.getFlows(), field);
+                    if (fieldValue.isPresent() && fieldValue.toString().equals(value)) {
+                        return true;
+                    }
+                    return false;
+                })
+                .forEach(df -> prinDataflowSet(df));
+    }
+
+    /**
+     * Get flow field value from collection of flows.
+     * get flow field value which is matching to the given flow field.
+     *
+     * @param flows collection of flows
+     * @param field flow field
+     */
+    private Optional<Object> getFieldValue(List<Flow> flows, String field) {
+        FlowField flowField = FlowField.valueOf(field);
+        return flows.stream()
+                .filter(flow -> flow.getField() == flowField)
+                .map(Flow::getValue)
+                .findAny();
+    }
+
+    /**
+     * Get flow field value from collection of flows.
+     * get flow field value which is matching to the given flow field predicates.
+     *
+     * @param flows collection of flows
+     * @param field flow field predicates
+     */
+    private Optional<Object> getFieldValue(List<Flow> flows, Predicate<FlowField> field) {
+        return flows.stream()
+                .filter(flow -> field.test(flow.getField()))
+                .map(Flow::getValue)
+                .findAny();
+    }
+
+
+    /**
+     * Adds data flowset record details in specified row wise.
+     *
+     * @param dataFlowRecord data flowset record.
+     */
+    private void prinDataflowSet(DataFlowRecord dataFlowRecord) {
+        print("Template ID : %d", dataFlowRecord.getTemplateId().getId());
+        dataFlowRecord.getFlows().forEach(dataflow -> {
+            print("Field : %s, Value : %s",
+                    dataflow.getField().name(),
+                    dataflow.getValue().toString());
+        });
+        print("\n");
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java
new file mode 100644
index 0000000..fec9ba7
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Optional;
+import java.util.Map;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.HashMap;
+import java.util.function.Predicate;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Flow traffic summary report.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-traffic-summary",
+        description = "Summary of data flowset records based on in/out bytes/packets, protocols or applications.")
+public class NetflowTrafficSummaryCommand extends AbstractShellCommand {
+
+    @Override
+    protected void doExecute() {
+        NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+        Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+        if (dataFlowSets.isEmpty()) {
+            print("Data not found");
+            return;
+        }
+
+        Map<String, List<DataFlowRecord>> ds = dataFlowSets.values()
+                .stream()
+                .flatMap(Collection::stream)
+                .collect(Collectors.groupingBy(d -> hostWiseFlow(d.getFlows())));
+        printHostTraffic(ds);
+
+        Map<String, Map<String, List<DataFlowRecord>>> protocol = new HashMap<>();
+        ds.entrySet().forEach(entry -> {
+            protocol.put(entry.getKey(), protocolWiseFlow(entry.getValue()));
+        });
+        printPrtocolTraffic(protocol);
+
+        Map<String, Map<String, List<DataFlowRecord>>> application = new HashMap<>();
+        ds.entrySet().forEach(entry -> {
+            application.put(entry.getKey(), applicationWiseFlow(entry.getValue()));
+        });
+        printApplicationTraffic(application);
+    }
+
+    /**
+     * Host wise flow filter.
+     * ipv4 and ipv6 host flow fields will be filtered and flow direction
+     * added from src ip to dst ip
+     *
+     * @param flows collection of flows
+     */
+    private String hostWiseFlow(List<Flow> flows) {
+        Predicate<FlowField> srcIpParser = f -> (f.equals(FlowField.IPV4_SRC_ADDR) ||
+                f.equals(FlowField.IPV6_SRC_ADDR));
+        Predicate<FlowField> dstIpParser = f -> (f.equals(FlowField.IPV4_DST_ADDR) ||
+                f.equals(FlowField.IPV6_DST_ADDR));
+        String srcIp = getFieldValue(flows, srcIpParser).get().toString();
+        String dstIp = getFieldValue(flows, dstIpParser).get().toString();
+        return srcIp + "  =>  " + dstIp;
+    }
+
+    /**
+     * Summary of all ingress bytes.
+     *
+     * @param dataFlowRecords collection of data flowset recored.
+     */
+    private int totalInBytes(List<DataFlowRecord> dataFlowRecords) {
+        return dataFlowRecords.stream()
+                .map(data -> inBytes(data.getFlows()))
+                .collect(Collectors.summingInt(Integer::intValue));
+    }
+
+    /**
+     * Summary of all ingress packets.
+     *
+     * @param dataFlowRecords collection of data flowset recored.
+     */
+    private int totalInPackets(List<DataFlowRecord> dataFlowRecords) {
+        return dataFlowRecords.stream()
+                .map(data -> inPackets(data.getFlows()))
+                .collect(Collectors.summingInt(Integer::intValue));
+    }
+
+    /**
+     * Application protocol wise flow filter.
+     *
+     * @param dataFlowRecords collection of data flowset recored.
+     */
+    private Map<String, List<DataFlowRecord>> applicationWiseFlow(List<DataFlowRecord> dataFlowRecords) {
+        return dataFlowRecords.stream()
+                .collect(Collectors.groupingBy(d -> getFieldValue(d.getFlows(),
+                        FlowField.L4_DST_PORT).get().toString()));
+
+    }
+
+    /**
+     * Transport protocol wise flow filter.
+     *
+     * @param dataFlowRecords collection of data flowset recored.
+     */
+    private Map<String, List<DataFlowRecord>> protocolWiseFlow(List<DataFlowRecord> dataFlowRecords) {
+        return dataFlowRecords.stream()
+                .collect(Collectors.groupingBy(d -> getFieldValue(d.getFlows(), FlowField.PROTOCOL).get().toString()));
+
+    }
+
+    /**
+     * Filter ingress bytes from flows and type cast to integer.
+     *
+     * @param flows collection of flows.
+     */
+    private int inBytes(List<Flow> flows) {
+        return Integer.parseInt(getFieldValue(flows, FlowField.IN_BYTES).get().toString());
+    }
+
+    /**
+     * Filter ingress packets from flows and type cast to integer.
+     *
+     * @param flows collection of flows.
+     */
+    private int inPackets(List<Flow> flows) {
+        return Integer.parseInt(getFieldValue(flows, FlowField.IN_PKTS).get().toString());
+    }
+
+    /**
+     * Get flow field value from collection of flows.
+     * get flow field value which is matching to the given flow field.
+     *
+     * @param flows collection of flows
+     * @param field flow field
+     */
+    private Optional<Object> getFieldValue(List<Flow> flows, FlowField field) {
+        return flows.stream()
+                .filter(flow -> flow.getField() == field)
+                .map(Flow::getValue)
+                .findAny();
+    }
+
+    /**
+     * Get flow field value from collection of flows.
+     * get flow field value which is matching to the given flow field predicates.
+     *
+     * @param flows collection of flows
+     * @param field flow field predicates
+     */
+    private Optional<Object> getFieldValue(List<Flow> flows, Predicate<FlowField> field) {
+        return flows.stream()
+                .filter(flow -> field.test(flow.getField()))
+                .map(Flow::getValue)
+                .findAny();
+    }
+
+    /**
+     * Adds host wise traffic summary in specified row wise.
+     *
+     * @param hosts mapping of host and traffic details.
+     */
+    private void printHostTraffic(Map<String, List<DataFlowRecord>> hosts) {
+        print("\nHost wise traffic flow");
+        hosts.entrySet().forEach(entry -> {
+            print("Traffic flow : %s, Total bytes : %d, Total packets : %d",
+                    entry.getKey(),
+                    totalInBytes(entry.getValue()),
+                    totalInPackets(entry.getValue()));
+        });
+        print("\n");
+    }
+
+    /**
+     * Adds protocol wise traffic summary in specified row wise.
+     *
+     * @param protocol mapping of portocol and traffic details.
+     */
+    private void printPrtocolTraffic(Map<String, Map<String, List<DataFlowRecord>>> protocol) {
+        print("Protocol wise traffic flow");
+        protocol.entrySet().forEach(entry -> {
+            entry.getValue().entrySet().forEach(port -> {
+                print("Traffic flow : %s, Protocol : %s, Total bytes : %d, Total packets : %d",
+                        entry.getKey(),
+                        port.getKey(),
+                        totalInBytes(port.getValue()),
+                        totalInPackets(port.getValue()));
+            });
+        });
+        print("\n");
+    }
+
+    /**
+     * Adds application wise traffic summary in specified row wise.
+     *
+     * @param applications mapping of application and traffic details.
+     */
+    private void printApplicationTraffic(Map<String, Map<String, List<DataFlowRecord>>> applications) {
+        print("Application wise traffic flow");
+        applications.entrySet().forEach(entry -> {
+            entry.getValue().entrySet().forEach(port -> {
+                print("Traffic flow : %s, Application : %s, Total bytes : %d, Total packets : %d",
+                        entry.getKey(),
+                        port.getKey(),
+                        totalInBytes(port.getValue()),
+                        totalInPackets(port.getValue()));
+            });
+        });
+        print("\n");
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java
new file mode 100644
index 0000000..f7183d6
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * CLI implementation for netflow collector.
+ */
+package org.onosproject.netflow.cli;
\ No newline at end of file
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java
new file mode 100644
index 0000000..8f17232
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netflow.impl;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onosproject.netflow.NetflowController;
+
+/**
+ * The main controller class. Handles all setup and network listeners -
+ * Ownership of netflow message receiver.
+ */
+public class Controller {
+
+    private static final Logger log = LoggerFactory.getLogger(Controller.class);
+
+    private ChannelGroup channelGroup;
+    private Channel serverChannel;
+
+    // Configuration options
+    protected static final short NETFLOW_PORT_NUM = 2055;
+    private final int workerThreads = 16;
+
+    // Start time of the controller
+    private long systemStartTime;
+
+    private ChannelFactory serverExecFactory;
+
+    private static final int BUFFER_SIZE = 5 * 1024;
+
+    private NetflowController controller;
+
+    /**
+     * Constructor to initialize the values.
+     *
+     * @param controller netflow controller instance
+     */
+    public Controller(NetflowController controller) {
+        this.controller = controller;
+    }
+
+    /**
+     * To get system start time.
+     *
+     * @return system start time in milliseconds
+     */
+    public long getSystemStartTime() {
+        return (this.systemStartTime);
+    }
+
+    /**
+     * Initialize timer.
+     */
+    public void init() {
+        this.systemStartTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Gets run time memory.
+     *
+     * @return run time memory
+     */
+    public Map<String, Long> getMemory() {
+        Map<String, Long> m = new HashMap<>();
+        Runtime runtime = Runtime.getRuntime();
+        m.put("total", runtime.totalMemory());
+        m.put("free", runtime.freeMemory());
+        return m;
+    }
+
+    /**
+     * Gets UP time.
+     *
+     * @return UP time
+     */
+    public Long getUptime() {
+        RuntimeMXBean rb = ManagementFactory.getRuntimeMXBean();
+        return rb.getUptime();
+    }
+
+    /**
+     * Netflow collector it will receive message from netflow exporter.
+     */
+    public void run() {
+
+        try {
+
+            final ConnectionlessBootstrap bootstrap = createServerBootStrap();
+
+            bootstrap.setOption("reuseAddress", false);
+            bootstrap.setOption("child.reuseAddress", false);
+            bootstrap.setOption("readBufferSize", BUFFER_SIZE); //15M
+            bootstrap.setOption("receiveBufferSizePredictor",
+                    new FixedReceiveBufferSizePredictor(BUFFER_SIZE));
+            bootstrap.setOption("receiveBufferSizePredictorFactory",
+                    new FixedReceiveBufferSizePredictorFactory(BUFFER_SIZE));
+            ChannelPipelineFactory pfact = new NetflowPipelineFactory(this.controller);
+
+            bootstrap.setPipelineFactory(pfact);
+            InetSocketAddress inetSocketAddress = new InetSocketAddress(NETFLOW_PORT_NUM);
+            channelGroup = new DefaultChannelGroup();
+            serverChannel = bootstrap.bind(inetSocketAddress);
+            channelGroup.add(serverChannel);
+            log.info("Listening for netflow exporter connection on {}", inetSocketAddress);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Creates server boot strap.
+     *
+     * @return ServerBootStrap
+     */
+    private ConnectionlessBootstrap createServerBootStrap() {
+
+        if (workerThreads == 0) {
+            serverExecFactory = new NioDatagramChannelFactory(
+                    Executors.newFixedThreadPool(2));
+            return new ConnectionlessBootstrap(serverExecFactory);
+        } else {
+            serverExecFactory = new NioDatagramChannelFactory(
+                    Executors.newFixedThreadPool(2),
+                    workerThreads);
+            return new ConnectionlessBootstrap(serverExecFactory);
+        }
+    }
+
+    /**
+     * Stops the netflow collector.
+     */
+    public void stop() {
+        log.info("Stopped");
+        channelGroup.close();
+    }
+
+    /**
+     * Starts the netflow collector.
+     */
+    public void start() {
+        log.info("Started");
+        this.run();
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java
new file mode 100644
index 0000000..90d75dd
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.Optional;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.onosproject.netflow.NetflowStore;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+import org.onosproject.netflow.FlowTemplateField;
+import org.onosproject.netflow.SourceId;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataRecord;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.FlowSet;
+import org.onosproject.netflow.DataFlowSet;
+import org.onosproject.netflow.TemplateRecord;
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.OptionalTemplateFlowSet;
+import org.onosproject.netflow.TemplateFlowSet;
+import org.onlab.packet.BasePacket;
+
+/**
+ * Manages inventory of Netflow template and data flowset to distribute
+ * information.
+ */
+@Component(immediate = true, service = NetflowStore.class)
+public class DistributedNetflowStore implements NetflowStore {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private EventuallyConsistentMap<TemplateId, DataTemplateRecord> templateFlowSet;
+    private EventuallyConsistentMap<TemplateId, OptionalTemplateFlowSet> optionalTemplateFlowSet;
+    private EventuallyConsistentMap<TemplateId, List<DataFlowRecord>> dataFlowSet;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Activate
+    protected void activate() {
+        KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.MISC)
+                .register(List.class)
+                .register(LinkedList.class)
+                .register(ArrayList.class)
+                .register(HashSet.class)
+                .register(BasePacket.class)
+                .register(Flow.class)
+                .register(FlowField.class)
+                .register(FlowTemplateField.class)
+                .register(SourceId.class)
+                .register(TemplateId.class)
+                .register(DataRecord.class)
+                .register(DataFlowRecord.class)
+                .register(FlowSet.class)
+                .register(DataFlowSet.class)
+                .register(TemplateRecord.class)
+                .register(DataTemplateRecord.class)
+                .register(NetFlowPacket.class)
+                .register(OptionalTemplateFlowSet.class)
+                .register(TemplateFlowSet.class)
+                .register(TemplateRecord.class);
+
+        templateFlowSet = storageService.<TemplateId, DataTemplateRecord>eventuallyConsistentMapBuilder()
+                .withSerializer(serializer)
+                .withName("netflow-templateflowset")
+                .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+                .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+                .withTombstonesDisabled()
+                .build();
+
+        optionalTemplateFlowSet = storageService.<TemplateId, OptionalTemplateFlowSet>eventuallyConsistentMapBuilder()
+                .withSerializer(serializer)
+                .withName("netflow-optionaltemplateflowset")
+                .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+                .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+                .withTombstonesDisabled()
+                .build();
+
+        dataFlowSet = storageService.<TemplateId, List<DataFlowRecord>>eventuallyConsistentMapBuilder()
+                .withSerializer(serializer)
+                .withName("netflow-dataflowset")
+                .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+                .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+                .withTombstonesDisabled()
+                .build();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactive() {
+        log.info("Stopped");
+    }
+
+    /**
+     * Get template flowset from store.
+     * it will fetch current template which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return optional of data template record, optional will be empty if template not found.
+     */
+    @Override
+    public Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId) {
+        return Optional.ofNullable(templateFlowSet.get(templateId));
+    }
+
+    /**
+     * Get set of template flowsets from store.
+     * it will fetch all template flowsets from store.
+     *
+     * @return set of data template record, set will be empty if templates not found in the store.
+     */
+    @Override
+    public Set<DataTemplateRecord> getTemplateFlowSet() {
+        return templateFlowSet.values().stream()
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Get optional template flowset from store.
+     * it will fetch current optional template which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return optional of optional template flowset, optional will be empty if template not found.
+     */
+    @Override
+    public Optional<OptionalTemplateFlowSet> getOptionalTemplateFlowSet(TemplateId templateId) {
+        return Optional.ofNullable(optionalTemplateFlowSet.get(templateId));
+    }
+
+    /**
+     * Get set of optional template flowsets from store.
+     * it will fetch all optional template flowsets from store.
+     *
+     * @return set of optional template flowsets, set will be empty if templates not found in the store.
+     */
+    @Override
+    public Set<OptionalTemplateFlowSet> getOptionalTemplateFlowSet() {
+        return ImmutableSet.copyOf(optionalTemplateFlowSet.values());
+    }
+
+    /**
+     * Get data flowset from store.
+     * it will fetch current data flowset which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return list of data flow record, list will be empty if template id not matched.
+     */
+    @Override
+    public List<DataFlowRecord> getDataFlowSet(TemplateId templateId) {
+        List<DataFlowRecord> dataRecord = dataFlowSet.get(templateId);
+        if (!Objects.isNull(dataRecord)) {
+            return ImmutableList.copyOf(dataRecord);
+        }
+        return Lists.newArrayList();
+    }
+
+    /**
+     * Get data flowset from store.
+     * it will fetch current data flowset which is matching to the template id from store.
+     *
+     * @return mapping from a template id to data flow record.
+     */
+    @Override
+    public Map<TemplateId, List<DataFlowRecord>> getDataFlowSet() {
+        return dataFlowSet.entrySet().stream().collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
+    }
+
+    /**
+     * Update template flowset to the store.
+     * Add new template to store if it not exist,
+     * otherwise it will replace the existing template.
+     *
+     * @param templateRecord template flowset record.
+     */
+    @Override
+    public void updateTemplateFlowSet(DataTemplateRecord templateRecord) {
+        templateFlowSet.put(templateRecord.getTemplateId(), templateRecord);
+    }
+
+    /**
+     * Update optional template flowset to the store.
+     * Add new optional template to store if it not exist,
+     * otherwise it will replace the existing optional template.
+     *
+     * @param templateFlowSet optional template flowset.
+     */
+    @Override
+    public void updateOptionalTemplateFlowSet(OptionalTemplateFlowSet templateFlowSet) {
+        optionalTemplateFlowSet.put(templateFlowSet.getTemplateId(), templateFlowSet);
+    }
+
+    /**
+     * Add data flow record to the store.
+     * Add new data flow record to store
+     *
+     * @param dataFlowRecord data flow record.
+     */
+    @Override
+    public void addDataFlowSet(DataFlowRecord dataFlowRecord) {
+        dataFlowSet.compute(dataFlowRecord.getTemplateId(),
+                (id, flowSet) -> {
+                    List<DataFlowRecord> newSet = new ArrayList<>();
+                    if (flowSet != null) {
+                        newSet.addAll(flowSet);
+                    }
+                    newSet.add(dataFlowRecord);
+                    return newSet;
+                });
+    }
+
+    /**
+     * Remove template flowset from store.
+     * it will remove template flowset which is matching to the given template id from store.
+     *
+     * @param templateId template id.
+     */
+    @Override
+    public void clearTemplateFlowSet(TemplateId templateId) {
+        if (templateFlowSet.containsKey(templateId)) {
+            templateFlowSet.remove(templateId);
+        }
+    }
+
+    /**
+     * Remove all template flowset from store.
+     * it will remove all template flowsets from store.
+     */
+    @Override
+    public void clearTemplateFlowSet() {
+        templateFlowSet.clear();
+    }
+
+    /**
+     * Remove optional template flowset from store.
+     * it will remove optional template which is matching to the given template id from store.
+     *
+     * @param templateId template id.
+     */
+    @Override
+    public void clearOptionalTemplateFlowSet(TemplateId templateId) {
+        if (optionalTemplateFlowSet.containsKey(templateId)) {
+            optionalTemplateFlowSet.remove(templateId);
+        }
+    }
+
+    /**
+     * Remove all optional template flowset from store.
+     * it will remove all optional template flowsets from store.
+     */
+    @Override
+    public void clearOptionalTemplateFlowSet() {
+        optionalTemplateFlowSet.clear();
+    }
+
+    /**
+     * Remove data flowset from store.
+     * it will remove dataflowset which is matching to the given template id from store.
+     *
+     * @param templateId template id.
+     */
+    @Override
+    public void clearDataFlowSet(TemplateId templateId) {
+        if (dataFlowSet.containsKey(templateId)) {
+            dataFlowSet.remove(templateId);
+        }
+    }
+
+    /**
+     * Remove all data flowset from store.
+     * it will remove all data flowsets from store.
+     */
+    @Override
+    public void clearDataFlowSet() {
+        dataFlowSet.clear();
+    }
+
+    /**
+     * Remove template, optional template and data flowsets from store.
+     * it will remove all flowsets from store.
+     */
+    @Override
+    public void clearAllFlowSet() {
+        templateFlowSet.clear();
+        optionalTemplateFlowSet.clear();
+        dataFlowSet.clear();
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
index df5ffce..27eb8ea 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.netflow.impl;
 
+import java.util.Optional;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
@@ -22,18 +23,35 @@
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.SimpleChannelHandler;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.FlowSet;
+import org.onosproject.netflow.TemplateFlowSet;
+import org.onosproject.netflow.DataFlowSet;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.NetflowController;
+
 /**
  * Channel handler deals with the netfow exporter connection and dispatches messages
  * from netfow exporter to the appropriate locations.
  */
 public class NeflowChannelHandler extends SimpleChannelHandler {
 
+    private static final Logger log = LoggerFactory.getLogger(NeflowChannelHandler.class);
+
     private Channel channel;
 
+    private NetflowController controller;
+
     /**
      * Create a new netflow channelHandler instance.
+     *
+     * @param controller netflow controller.
      */
-    NeflowChannelHandler() {
+    NeflowChannelHandler(NetflowController controller) {
+        this.controller = controller;
     }
 
     /**
@@ -81,8 +99,39 @@
      */
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
-        //TODO Netflow message store to netflow distributed store
-        NetFlowPacket packet = (NetFlowPacket) event.getMessage();
+        try {
+            NetFlowPacket netFlowPacket = (NetFlowPacket) event.getMessage();
+            netFlowPacket.getFlowSets()
+                    .stream()
+                    .filter(n -> n.getType() == FlowSet.Type.TEMPLATE_FLOWSET)
+                    .map(t -> (TemplateFlowSet) t)
+                    .flatMap(t -> t.getRecords().stream())
+                    .forEach(t -> controller.addTemplateFlowSet(t));
+
+            netFlowPacket.getFlowSets()
+                    .stream()
+                    .filter(n -> n.getType() == FlowSet.Type.DATA_FLOWSET)
+                    .map(t -> (DataFlowSet) t)
+                    .forEach(data -> {
+                        Optional<DataTemplateRecord> template = controller
+                                .getTemplateFlowSet(TemplateId.valueOf(data.getFlowSetId()));
+                        if (!template.isPresent()) {
+                            return;
+                        }
+                        try {
+                            data.dataDeserializer(template.get());
+                            data.getDataFlow()
+                                    .stream()
+                                    .forEach(dataflow -> controller.updateDataFlowSet(dataflow));
+                        } catch (Exception ex) {
+                            log.error("Netflow dataflow deserializer exception ", ex);
+                        }
+                    });
+            log.info("Netflow message received {}", netFlowPacket);
+        } catch (Exception er) {
+            log.error("Netflow message deserializer exception ", er);
+        }
+
     }
 
 }
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
index be6ae9a..bbe0c0f 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
@@ -23,6 +23,7 @@
 
 import org.onlab.packet.Deserializer;
 import org.onlab.packet.BasePacket;
+import org.onosproject.netflow.FlowSet;
 
 import static com.google.common.base.Preconditions.checkState;
 
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java
new file mode 100644
index 0000000..6d33f61
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import org.onosproject.netflow.NetflowController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import org.onosproject.netflow.NetflowStore;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.DataTemplateRecord;
+
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * It control and manage the netflow traffic.
+ * it is Collecting, Storing and analyzing NetFlow data
+ * it can help to understand which applications, and protocols
+ * may be consuming the most network bandwidth by tracking processes,
+ * protocols, times of day, and traffic routing.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+@Component(immediate = true, service = NetflowController.class)
+public class NetflowControllerImpl implements NetflowController {
+
+    private static final Logger log = LoggerFactory.getLogger(NetflowControllerImpl.class);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected NetflowStore store;
+
+    @Activate
+    public void activate() {
+        Controller ctrl = new Controller(this);
+        ctrl.start();
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    /**
+     * Add template flowset to controller.
+     * Add new template to controller if it not exist,
+     * otherwise it will replace the existing template.
+     *
+     * @param templateRecord template flowset record.
+     */
+    @Override
+    public void addTemplateFlowSet(DataTemplateRecord templateRecord) {
+        store.updateTemplateFlowSet(templateRecord);
+    }
+
+    /**
+     * Update data flowset to controller.
+     * it will update new data flowset to store.
+     *
+     * @param dataFlowRecord data flowset record.
+     */
+    @Override
+    public void updateDataFlowSet(DataFlowRecord dataFlowRecord) {
+        store.addDataFlowSet(dataFlowRecord);
+    }
+
+    /**
+     * Get template flowset from controller.
+     * it will fetch current template which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return optional of data template record, optional will be empty if template not found.
+     */
+    @Override
+    public Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId) {
+        return store.getTemplateFlowSet(templateId);
+    }
+
+    /**
+     * Get data flowset from controller.
+     * it will fetch current data flowset which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return list of data flow record, list will be empty if template id not matched.
+     */
+    @Override
+    public List<DataFlowRecord> getDataFlowSet(TemplateId templateId) {
+        return store.getDataFlowSet(templateId);
+    }
+
+    /**
+     * Get all template flowsets from controller.
+     * it will fetch all templates from store.
+     *
+     * @return set of data template record, set will be empty if templates not found in the store.
+     */
+    @Override
+    public Set<DataTemplateRecord> getTemplateFlowSet() {
+        return store.getTemplateFlowSet();
+    }
+
+    /**
+     * Get data flowset from controller.
+     * it will fetch current data flowset which is matching to the template id from store.
+     *
+     * @return mapping from a template id to data flow record.
+     */
+    @Override
+    public Map<TemplateId, List<DataFlowRecord>> getDataFlowSet() {
+        return store.getDataFlowSet();
+    }
+
+}
\ No newline at end of file
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
index 8dad97d..6807a5c 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
@@ -42,7 +42,7 @@
                 return NetFlowPacket.deserializer().deserialize(bytes, 0, bytes.length);
             }
         } catch (Exception e) {
-            log.error("Netflow message decode error");
+            log.error("Netflow message decode error ", e);
             buffer.resetReaderIndex();
             buffer.discardReadBytes();
 
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
index 178f6e6..9f5ef2e 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
@@ -18,18 +18,23 @@
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
-
+import org.onosproject.netflow.NetflowController;
 
 /**
  * Creates a ChannelPipeline for a server-side netflow message channel.
  */
 public class NetflowPipelineFactory implements ChannelPipelineFactory {
 
+    private NetflowController controller;
+
     /**
      * Constructor to initialize the values.
+     *
+     * @param controller netflow controller.
      */
-    public NetflowPipelineFactory() {
+    public NetflowPipelineFactory(NetflowController controller) {
         super();
+        this.controller = controller;
     }
 
     /**
@@ -40,7 +45,7 @@
      */
     @Override
     public ChannelPipeline getPipeline() throws Exception {
-        NeflowChannelHandler handler = new NeflowChannelHandler();
+        NeflowChannelHandler handler = new NeflowChannelHandler(controller);
 
         ChannelPipeline pipeline = Channels.pipeline();
         pipeline.addLast("netflowmessagedecoder", new NetflowMessageDecoder());