Netflow store and cli implementation

Implemented the store and CLI of the NetFlow version 9 collector.

Reference :  https://datatracker.ietf.org/doc/html/rfc3954

Change-Id: I0b2ec932c28bc3b36042c7a725a41956cc859fcd
diff --git a/apps/netflow/app/BUILD b/apps/netflow/app/BUILD
index 747d61d..c1a4601 100644
--- a/apps/netflow/app/BUILD
+++ b/apps/netflow/app/BUILD
@@ -1,4 +1,4 @@
-COMPILE_DEPS = CORE_DEPS + [
+COMPILE_DEPS = CORE_DEPS + KRYO + CLI + [
     "//core/store/serializers:onos-core-serializers",
     "//apps/netflow/api:onos-apps-netflow-api",
     "@io_netty_netty_common//jar",
@@ -6,5 +6,6 @@
 ]
 
 osgi_jar(
+    karaf_command_packages = ["org.onosproject.netflow.cli"],
     deps = COMPILE_DEPS,
 )
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java
new file mode 100644
index 0000000..760d40b
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.api.console.Completer;
+import org.apache.karaf.shell.api.console.CommandLine;
+import org.apache.karaf.shell.api.console.Session;
+import org.apache.karaf.shell.support.completers.StringsCompleter;
+
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Karaf shell completer offering the flow field names reported by FlowField.
+ */
+@Service
+public class FlowFieldCompleter implements Completer {
+
+    @Override
+    public int complete(Session session, CommandLine commandLine, List<String> candidates) {
+        // Fill a fresh delegate with all field names and let it do the
+        // actual matching against the current command line.
+        StringsCompleter completer = new StringsCompleter();
+        SortedSet<String> choices = completer.getStrings();
+        Set<String> fieldNames = FlowField.getAllFlowField();
+        choices.addAll(fieldNames);
+
+        return completer.complete(session, commandLine, candidates);
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java
new file mode 100644
index 0000000..7a5db0d
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Map;
+import java.util.Collection;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+
+/**
+ * Lists netflow data flowset records: those of one template when a positive
+ * template id is given, otherwise (no argument or 0) those of every template.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-dataflow",
+        description = "Lists all data flowsets received from netflow exporter.")
+public class NetflowDataflowCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "templateid",
+            description = "Data flowset template id",
+            required = false, multiValued = false)
+    int templateID = 0;
+
+    @Override
+    protected void doExecute() {
+        NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+
+        if (templateID < 0) {
+            print("Invalid template ID");
+            return;
+        }
+        if (templateID > 0) {
+            List<DataFlowRecord> dataFlowSet = controller.getDataFlowSet(TemplateId.valueOf(templateID));
+            if (dataFlowSet.isEmpty()) {
+                // Data records are looked up here, so report missing data, not a missing template.
+                print("Data not found");
+            } else {
+                dataFlowSet.forEach(this::printDataflowSet);
+            }
+            return;
+        }
+        Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+        if (dataFlowSets.isEmpty()) {
+            print("Data not found");
+            return;
+        }
+        dataFlowSets.values().stream().flatMap(Collection::stream).forEach(this::printDataflowSet);
+    }
+
+    /**
+     * Prints the template id of a record followed by one line per flow.
+     *
+     * @param dataFlowRecord data flowset record to print
+     */
+    private void printDataflowSet(DataFlowRecord dataFlowRecord) {
+        print("Template ID : %d", dataFlowRecord.getTemplateId().getId());
+        dataFlowRecord.getFlows().forEach(dataflow ->
+                print("Field : %s,  Value : %s", dataflow.getField().name(), dataflow.getValue().toString()));
+        print("\n");
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java
new file mode 100644
index 0000000..c23acdf
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.DataFlowRecord;
+
+/**
+ * CLI command printing an overview of the template flowsets and data
+ * flowsets currently returned by the netflow controller.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-summary",
+        description = "Summary of template flowsets and data flowsets.")
+public class NetflowSummaryCommand extends AbstractShellCommand {
+
+    @Override
+    protected void doExecute() {
+        NetflowController netflow = AbstractShellCommand.get(NetflowController.class);
+
+        // Template section: the ids of all known templates.
+        Set<DataTemplateRecord> templateRecords = netflow.getTemplateFlowSet();
+        if (!templateRecords.isEmpty()) {
+            Set<Integer> ids = templateRecords.stream()
+                    .map(tpl -> tpl.getTemplateId().getId())
+                    .collect(Collectors.toSet());
+            printTemplateflowSet(ids);
+        } else {
+            print("Template not found");
+        }
+
+        // Data section: number of records per template.
+        Map<TemplateId, List<DataFlowRecord>> recordsByTemplate = netflow.getDataFlowSet();
+        if (!recordsByTemplate.isEmpty()) {
+            printDataflowSet(recordsByTemplate);
+        } else {
+            print("Data not found");
+        }
+    }
+
+    /**
+     * Prints how many templates exist together with their ids.
+     *
+     * @param templateIds template ids
+     */
+    private void printTemplateflowSet(Set<Integer> templateIds) {
+        int count = templateIds.size();
+        print("Number of templates : %d, Template IDs : %s", count, templateIds.toString());
+    }
+
+    /**
+     * Prints, for every template id, the number of data flow records stored for it.
+     *
+     * @param dataflowMap data flow records keyed by template id
+     */
+    private void printDataflowSet(Map<TemplateId, List<DataFlowRecord>> dataflowMap) {
+        for (Map.Entry<TemplateId, List<DataFlowRecord>> entry : dataflowMap.entrySet()) {
+            print("Template ID : %d, Data flow count : %d", entry.getKey().getId(), entry.getValue().size());
+        }
+        print("\n");
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java
new file mode 100644
index 0000000..abfbb73
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.Set;
+import java.util.Optional;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataTemplateRecord;
+
+/**
+ * CLI command that prints the template flowsets known to the netflow controller,
+ * either a single one selected by a positive template id or all of them.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-template",
+        description = "Lists all template flowsets received from netflow exporter.")
+public class NetflowTemplateCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "templateID",
+            description = "Netflow template ID",
+            required = false, multiValued = false)
+    int templateID = 0;
+
+    @Override
+    protected void doExecute() {
+        NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+        if (templateID < 0) {
+            print("Invalid template ID");
+            return;
+        }
+
+        if (templateID == 0) {
+            // No specific id requested: print every template flowset.
+            Set<DataTemplateRecord> allTemplates = controller.getTemplateFlowSet();
+            if (allTemplates.isEmpty()) {
+                print("Default template not found");
+                return;
+            }
+            allTemplates.forEach(this::printTemplate);
+            return;
+        }
+
+        // A positive id selects exactly one template flowset.
+        Optional<DataTemplateRecord> match = controller.getTemplateFlowSet(TemplateId.valueOf(templateID));
+        if (!match.isPresent()) {
+            print("Default template not found");
+            return;
+        }
+        printTemplate(match.get());
+    }
+
+    /**
+     * Prints the id and field count of a template, then one line per template field.
+     *
+     * @param template template flow set.
+     */
+    private void printTemplate(DataTemplateRecord template) {
+        print("Template ID : %d, Field Count : %d", template.getTemplateId().getId(), template.getFiledCount());
+        template.getFields().forEach(entry ->
+                print("Field : %s, Length : %d", entry.getFlowField().name(), entry.getLength()));
+        print("\n");
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java
new file mode 100644
index 0000000..51cefa1
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Optional;
+import java.util.Map;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+
+/**
+ * CLI command that prints only those data flowset records in which the given
+ * flow field carries the given value (compared on the value's string form).
+ */
+@Service
+@Command(scope = "onos", name = "netflow-traffic-filter",
+        description = "Lists all filtered data flowsets received from netflow exporter.")
+public class NetflowTrafficFilterCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "field", description = "flow field",
+            required = false, multiValued = false)
+    @Completion(FlowFieldCompleter.class)
+    protected String field = null;
+
+    @Argument(index = 1, name = "value", description = "flow value",
+            required = false, multiValued = false)
+    protected String value = null;
+
+    @Override
+    protected void doExecute() {
+        // Both arguments are optional for the shell, so reject a missing one here
+        // instead of letting FlowField.valueOf(null) fail with a NullPointerException.
+        if (field == null || value == null) {
+            print("Flow field and value are required");
+            return;
+        }
+
+        final FlowField flowField;
+        try {
+            flowField = FlowField.valueOf(field);
+        } catch (IllegalArgumentException e) {
+            print("Invalid flow field : %s", field);
+            return;
+        }
+
+        NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+        Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+        if (dataFlowSets.isEmpty()) {
+            print("Data not found");
+            return;
+        }
+        // Compare the unwrapped value: Optional.toString() yields "Optional[...]"
+        // and therefore could never equal the requested value.
+        dataFlowSets.values()
+                .stream()
+                .flatMap(Collection::stream)
+                .filter(df -> getFieldValue(df.getFlows(), flowField::equals)
+                        .map(Object::toString)
+                        .filter(value::equals)
+                        .isPresent())
+                .forEach(this::printDataflowSet);
+    }
+
+    /**
+     * Get flow field value from collection of flows.
+     * Returns the value of any flow whose field satisfies the given predicate.
+     *
+     * @param flows collection of flows
+     * @param field flow field predicate
+     * @return value of a matching flow, or empty when no flow matches
+     */
+    private Optional<Object> getFieldValue(List<Flow> flows, Predicate<FlowField> field) {
+        return flows.stream()
+                .filter(flow -> field.test(flow.getField()))
+                .map(Flow::getValue)
+                .findAny();
+    }
+
+    /**
+     * Prints the template id of a record followed by one line per flow.
+     *
+     * @param dataFlowRecord data flowset record.
+     */
+    private void printDataflowSet(DataFlowRecord dataFlowRecord) {
+        print("Template ID : %d", dataFlowRecord.getTemplateId().getId());
+        dataFlowRecord.getFlows().forEach(dataflow -> {
+            print("Field : %s, Value : %s",
+                    dataflow.getField().name(),
+                    dataflow.getValue().toString());
+        });
+        print("\n");
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java
new file mode 100644
index 0000000..fec9ba7
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Optional;
+import java.util.Map;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.HashMap;
+import java.util.function.Predicate;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Flow traffic summary report.
+ * Groups the stored data flowset records per host pair, per protocol and per
+ * destination port, and prints the accumulated IN_BYTES / IN_PKTS of each group.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-traffic-summary",
+        description = "Summary of data flowset records based on in/out bytes/packets, protocols or applications.")
+public class NetflowTrafficSummaryCommand extends AbstractShellCommand {
+
+    // Grouping key used when a record does not carry the field it is grouped by.
+    private static final String UNKNOWN = "unknown";
+
+    @Override
+    protected void doExecute() {
+        NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+        Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+        if (dataFlowSets.isEmpty()) {
+            print("Data not found");
+            return;
+        }
+
+        // Group all records by their "src  =>  dst" host pair.
+        Map<String, List<DataFlowRecord>> ds = dataFlowSets.values()
+                .stream()
+                .flatMap(Collection::stream)
+                .collect(Collectors.groupingBy(d -> hostWiseFlow(d.getFlows())));
+        printHostTraffic(ds);
+
+        // Split every host pair by transport protocol.
+        Map<String, Map<String, List<DataFlowRecord>>> protocol = new HashMap<>();
+        ds.forEach((hosts, records) -> protocol.put(hosts, protocolWiseFlow(records)));
+        printProtocolTraffic(protocol);
+
+        // Split every host pair by L4 destination port.
+        Map<String, Map<String, List<DataFlowRecord>>> application = new HashMap<>();
+        ds.forEach((hosts, records) -> application.put(hosts, applicationWiseFlow(records)));
+        printApplicationTraffic(application);
+    }
+
+    /**
+     * Host wise flow filter.
+     * Builds the "src  =>  dst" grouping key from the ipv4 or ipv6 source and
+     * destination address of a record; a missing address is shown as "unknown"
+     * instead of failing the whole command.
+     *
+     * @param flows collection of flows
+     * @return grouping key describing the flow direction from src ip to dst ip
+     */
+    private String hostWiseFlow(List<Flow> flows) {
+        Predicate<FlowField> srcIpParser = f -> (f.equals(FlowField.IPV4_SRC_ADDR) ||
+                f.equals(FlowField.IPV6_SRC_ADDR));
+        Predicate<FlowField> dstIpParser = f -> (f.equals(FlowField.IPV4_DST_ADDR) ||
+                f.equals(FlowField.IPV6_DST_ADDR));
+        String srcIp = getFieldValue(flows, srcIpParser).map(Object::toString).orElse(UNKNOWN);
+        String dstIp = getFieldValue(flows, dstIpParser).map(Object::toString).orElse(UNKNOWN);
+        return srcIp + "  =>  " + dstIp;
+    }
+
+    /**
+     * Summary of all ingress bytes.
+     *
+     * @param dataFlowRecords collection of data flowset records
+     * @return sum of the IN_BYTES values, as long so that large counters do not overflow
+     */
+    private long totalInBytes(List<DataFlowRecord> dataFlowRecords) {
+        return dataFlowRecords.stream()
+                .mapToLong(data -> inBytes(data.getFlows()))
+                .sum();
+    }
+
+    /**
+     * Summary of all ingress packets.
+     *
+     * @param dataFlowRecords collection of data flowset records
+     * @return sum of the IN_PKTS values, as long so that large counters do not overflow
+     */
+    private long totalInPackets(List<DataFlowRecord> dataFlowRecords) {
+        return dataFlowRecords.stream()
+                .mapToLong(data -> inPackets(data.getFlows()))
+                .sum();
+    }
+
+    /**
+     * Application protocol wise flow filter.
+     * Records are grouped by L4 destination port; records without one share the "unknown" key.
+     *
+     * @param dataFlowRecords collection of data flowset records
+     * @return records grouped by destination port
+     */
+    private Map<String, List<DataFlowRecord>> applicationWiseFlow(List<DataFlowRecord> dataFlowRecords) {
+        return dataFlowRecords.stream()
+                .collect(Collectors.groupingBy(d -> getFieldValue(d.getFlows(), FlowField.L4_DST_PORT)
+                        .map(Object::toString).orElse(UNKNOWN)));
+    }
+
+    /**
+     * Transport protocol wise flow filter.
+     * Records are grouped by protocol; records without one share the "unknown" key.
+     *
+     * @param dataFlowRecords collection of data flowset records
+     * @return records grouped by protocol
+     */
+    private Map<String, List<DataFlowRecord>> protocolWiseFlow(List<DataFlowRecord> dataFlowRecords) {
+        return dataFlowRecords.stream()
+                .collect(Collectors.groupingBy(d -> getFieldValue(d.getFlows(), FlowField.PROTOCOL)
+                        .map(Object::toString).orElse(UNKNOWN)));
+    }
+
+    /**
+     * Filter ingress bytes from flows and convert them to long.
+     *
+     * @param flows collection of flows.
+     * @return IN_BYTES value, or 0 when the record has no such field
+     */
+    private long inBytes(List<Flow> flows) {
+        return getFieldValue(flows, FlowField.IN_BYTES).map(v -> Long.parseLong(v.toString())).orElse(0L);
+    }
+
+    /**
+     * Filter ingress packets from flows and convert them to long.
+     *
+     * @param flows collection of flows.
+     * @return IN_PKTS value, or 0 when the record has no such field
+     */
+    private long inPackets(List<Flow> flows) {
+        return getFieldValue(flows, FlowField.IN_PKTS).map(v -> Long.parseLong(v.toString())).orElse(0L);
+    }
+
+    /**
+     * Get flow field value from collection of flows.
+     * get flow field value which is matching to the given flow field.
+     *
+     * @param flows collection of flows
+     * @param field flow field
+     * @return value of a flow with the given field, or empty when there is none
+     */
+    private Optional<Object> getFieldValue(List<Flow> flows, FlowField field) {
+        return flows.stream()
+                .filter(flow -> flow.getField() == field)
+                .map(Flow::getValue)
+                .findAny();
+    }
+
+    /**
+     * Get flow field value from collection of flows.
+     * get flow field value which is matching to the given flow field predicates.
+     *
+     * @param flows collection of flows
+     * @param field flow field predicates
+     * @return value of a flow whose field satisfies the predicate, or empty when there is none
+     */
+    private Optional<Object> getFieldValue(List<Flow> flows, Predicate<FlowField> field) {
+        return flows.stream()
+                .filter(flow -> field.test(flow.getField()))
+                .map(Flow::getValue)
+                .findAny();
+    }
+
+    /**
+     * Adds host wise traffic summary in specified row wise.
+     *
+     * @param hosts mapping of host and traffic details.
+     */
+    private void printHostTraffic(Map<String, List<DataFlowRecord>> hosts) {
+        print("\nHost wise traffic flow");
+        hosts.forEach((direction, records) ->
+                print("Traffic flow : %s, Total bytes : %d, Total packets : %d",
+                        direction, totalInBytes(records), totalInPackets(records)));
+        print("\n");
+    }
+
+    /**
+     * Adds protocol wise traffic summary in specified row wise.
+     *
+     * @param protocol mapping of protocol and traffic details.
+     */
+    private void printProtocolTraffic(Map<String, Map<String, List<DataFlowRecord>>> protocol) {
+        print("Protocol wise traffic flow");
+        protocol.forEach((direction, byProtocol) -> byProtocol.forEach((name, records) ->
+                print("Traffic flow : %s, Protocol : %s, Total bytes : %d, Total packets : %d",
+                        direction, name, totalInBytes(records), totalInPackets(records))));
+        print("\n");
+    }
+
+    /**
+     * Adds application wise traffic summary in specified row wise.
+     *
+     * @param applications mapping of application and traffic details.
+     */
+    private void printApplicationTraffic(Map<String, Map<String, List<DataFlowRecord>>> applications) {
+        print("Application wise traffic flow");
+        applications.forEach((direction, byPort) -> byPort.forEach((port, records) ->
+                print("Traffic flow : %s, Application : %s, Total bytes : %d, Total packets : %d",
+                        direction, port, totalInBytes(records), totalInPackets(records))));
+        print("\n");
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java
new file mode 100644
index 0000000..f7183d6
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * CLI implementation for netflow collector.
+ */
+package org.onosproject.netflow.cli;
\ No newline at end of file
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java
new file mode 100644
index 0000000..8f17232
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netflow.impl;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onosproject.netflow.NetflowController;
+
+/**
+ * The main controller class. Handles all setup and network listeners -
+ * Ownership of netflow message receiver.
+ */
+public class Controller {
+
+    private static final Logger log = LoggerFactory.getLogger(Controller.class);
+
+    private ChannelGroup channelGroup;
+    private Channel serverChannel;
+
+    // Configuration options
+    protected static final short NETFLOW_PORT_NUM = 2055;
+    private final int workerThreads = 16;
+
+    // Start time of the controller
+    private long systemStartTime;
+
+    private ChannelFactory serverExecFactory;
+
+    private static final int BUFFER_SIZE = 5 * 1024;
+
+    private NetflowController controller;
+
+    /**
+     * Constructor to initialize the values.
+     *
+     * @param controller netflow controller instance
+     */
+    public Controller(NetflowController controller) {
+        this.controller = controller;
+    }
+
+    /**
+     * To get system start time.
+     *
+     * @return system start time in milliseconds
+     */
+    public long getSystemStartTime() {
+        return systemStartTime;
+    }
+
+    /**
+     * Records the current time as the system start time.
+     */
+    public void init() {
+        this.systemStartTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Gets run time memory.
+     *
+     * @return map holding the JVM "total" and "free" memory in bytes
+     */
+    public Map<String, Long> getMemory() {
+        Map<String, Long> m = new HashMap<>();
+        Runtime runtime = Runtime.getRuntime();
+        m.put("total", runtime.totalMemory());
+        m.put("free", runtime.freeMemory());
+        return m;
+    }
+
+    /**
+     * Gets UP time.
+     *
+     * @return JVM uptime in milliseconds
+     */
+    public Long getUptime() {
+        RuntimeMXBean rb = ManagementFactory.getRuntimeMXBean();
+        return rb.getUptime();
+    }
+
+    /**
+     * Binds the UDP listener that receives messages from netflow exporters.
+     */
+    public void run() {
+        try {
+            final ConnectionlessBootstrap bootstrap = createServerBootStrap();
+
+            bootstrap.setOption("reuseAddress", false);
+            bootstrap.setOption("child.reuseAddress", false);
+            bootstrap.setOption("readBufferSize", BUFFER_SIZE); // 5 KiB
+            bootstrap.setOption("receiveBufferSizePredictor",
+                    new FixedReceiveBufferSizePredictor(BUFFER_SIZE));
+            bootstrap.setOption("receiveBufferSizePredictorFactory",
+                    new FixedReceiveBufferSizePredictorFactory(BUFFER_SIZE));
+            ChannelPipelineFactory pfact = new NetflowPipelineFactory(this.controller);
+
+            bootstrap.setPipelineFactory(pfact);
+            InetSocketAddress inetSocketAddress = new InetSocketAddress(NETFLOW_PORT_NUM);
+            channelGroup = new DefaultChannelGroup();
+            serverChannel = bootstrap.bind(inetSocketAddress);
+            channelGroup.add(serverChannel);
+            log.info("Listening for netflow exporter connection on {}", inetSocketAddress);
+        } catch (Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    /**
+     * Creates server boot strap.
+     *
+     * @return connectionless bootstrap backed by a newly created datagram channel factory
+     */
+    private ConnectionlessBootstrap createServerBootStrap() {
+        if (workerThreads == 0) {
+            serverExecFactory = new NioDatagramChannelFactory(Executors.newFixedThreadPool(2));
+        } else {
+            serverExecFactory = new NioDatagramChannelFactory(Executors.newFixedThreadPool(2), workerThreads);
+        }
+        return new ConnectionlessBootstrap(serverExecFactory);
+    }
+
+    /**
+     * Stops the netflow collector.
+     * Closes the bound channel, then releases the threads held by the channel
+     * factory; safe to call when the collector was never started.
+     */
+    public void stop() {
+        log.info("Stopped");
+        if (channelGroup != null) {
+            channelGroup.close().awaitUninterruptibly();
+        }
+        if (serverExecFactory != null) {
+            serverExecFactory.releaseExternalResources();
+        }
+    }
+
+    /**
+     * Starts the netflow collector.
+     */
+    public void start() {
+        log.info("Started");
+        this.run();
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java
deleted file mode 100644
index 8dd434c..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.function.BiPredicate;
-import java.util.function.Function;
-import java.util.function.Predicate;
-
-import com.google.common.base.MoreObjects;
-
-import org.onosproject.netflow.DataRecord;
-import org.onosproject.netflow.TemplateId;
-import org.onosproject.netflow.Flow;
-import org.onosproject.netflow.FlowTemplateField;
-import org.onosproject.netflow.DataDeserializer;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-
-/**
- * A Flow Data Record is a data record that contains values of the Flow.
- * parameters corresponding to a Template Record.
- * Ref: https://www.ietf.org/rfc/rfc3954.txt
- */
-public class DataFlowRecord extends DataRecord {
-
-    /*
-    0                   1                   2                   3
-    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |   Record 1 - Field Value 1    |   Record 1 - Field Value 2    |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |   Record 1 - Field Value 3    |             ...               |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |   Record 2 - Field Value 1    |   Record 2 - Field Value 2    |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |   Record 2 - Field Value 3    |             ...               |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-     */
-
-    private TemplateId templateId;
-
-    private List<Flow> flows;
-
-    public DataFlowRecord(Builder builder) {
-        this.templateId = builder.templateId;
-        this.flows = builder.flows;
-    }
-
-    /**
-     * Returns unique template ID.
-     * Template Records is given a unique Template ID.
-     * This uniqueness is local to the Observation
-     * Domain that generated the Template ID.  Template IDs 0-255 are
-     * reserved for Template FlowSets, Options FlowSets, and other
-     * reserved FlowSets yet to be created.
-     *
-     * @return list of flowsets
-     */
-    @Override
-    public TemplateId getTemplateId() {
-        return this.templateId;
-    }
-
-    /**
-     * Returns type of data flow.
-     *
-     * @return type of data flow
-     */
-    public List<Flow> getFlows() {
-        return flows;
-    }
-
-    @Override
-    public int hashCode() {
-        int hash = 7;
-        hash = 29 * hash + Objects.hashCode(this.templateId);
-        hash = 29 * hash + Objects.hashCode(this.flows);
-        return hash;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-        final DataFlowRecord other = (DataFlowRecord) obj;
-        if (!Objects.equals(this.templateId, other.templateId)) {
-            return false;
-        }
-        return Objects.equals(this.flows, other.flows);
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(getClass())
-                .add("templateId", templateId)
-                .add("flows", flows)
-                .toString();
-    }
-
-    /**
-     * Deserializer function for data flow record.
-     *
-     * @return deserializer function
-     */
-    public static DataDeserializer<DataFlowRecord> deserializer() {
-        return (data, offset, length, template) -> {
-            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
-
-            Predicate<FlowTemplateField> isValidTemplate = t
-                    -> Objects.nonNull(t) && Objects.nonNull(t.getFlowField()) && t.getLength() > 0;
-
-            BiPredicate<ByteBuffer, Integer> isValidBuffer = (b, l)
-                    -> b.hasRemaining() && b.remaining() >= l;
-
-            Function<FlowTemplateField, Flow> parser = (f) -> {
-
-                if (!isValidTemplate.test(f) && isValidBuffer.test(bb, f.getLength())) {
-                    throw new IllegalStateException("Invalid data set");
-                }
-                return new Flow.Builder()
-                        .field(f.getFlowField())
-                        .value(f.getFlowField().getParser().apply(bb, f.getLength()))
-                        .build();
-
-            };
-            DataTemplateRecord templateRecord = (DataTemplateRecord) template;
-            Builder builder = new Builder()
-                    .templateId(templateRecord.getTemplateId());
-            long count = templateRecord.getFields().stream()
-                    .filter(Objects::nonNull)
-                    .map(t -> builder.flow(parser.apply(t)))
-                    .count();
-
-            if (count != templateRecord.getFiledCount()) {
-                throw new IllegalStateException("Invalid parsing fields");
-            }
-            return builder.build();
-        };
-    }
-
-    /**
-     * Builder for data flow record.
-     */
-    private static class Builder {
-
-        private TemplateId templateId;
-
-        private List<Flow> flows = new LinkedList<>();
-
-        /**
-         * Setter for unique template ID.
-         *
-         * @param templateId template id.
-         * @return this class builder.
-         */
-        public Builder templateId(TemplateId templateId) {
-            this.templateId = templateId;
-            return this;
-        }
-
-        /**
-         * Setter for data flow.
-         *
-         * @param flow data flow.
-         * @return this class builder.
-         */
-        public Builder flow(Flow flow) {
-            this.flows.add(flow);
-            return this;
-        }
-
-        /**
-         * Setter for list of data flow.
-         *
-         * @param flows list of data flow.
-         * @return this class builder.
-         */
-        public Builder flows(List<Flow> flows) {
-            this.flows = flows;
-            return this;
-        }
-
-        /**
-         * Checks arguments for data flow record.
-         */
-        private void checkArguments() {
-            checkNotNull(templateId, "TemplateId cannot be null");
-        }
-
-        /**
-         * Builds data flow record.
-         *
-         * @return data flow record.
-         */
-        public DataFlowRecord build() {
-            checkArguments();
-            return new DataFlowRecord(this);
-        }
-
-    }
-
-}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java
deleted file mode 100644
index 7183915..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.function.Function;
-
-import com.google.common.base.MoreObjects;
-
-import org.onlab.packet.DeserializationException;
-import org.onlab.packet.Deserializer;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * A Data FlowSet is one or more records, of the same type, that are
- * grouped together in an Export Packet.  Each record is either a Flow
- * Data Record or an Options Data Record previously defined by a
- * Template Record or an Options Template Record.
- * Ref: https://www.ietf.org/rfc/rfc3954.txt
- */
-public final class DataFlowSet extends FlowSet {
-
-    /*
-    0                   1                   2                   3
-    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |   FlowSet ID = Template ID    |          Length               |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |   Record 1 - Field Value 1    |   Record 1 - Field Value 2    |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |   Record 1 - Field Value 3    |             ...               |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |   Record 2 - Field Value 1    |   Record 2 - Field Value 2    |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |   Record 2 - Field Value 3    |             ...               |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |   Record 3 - Field Value 1    |             ...               |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |              ...              |            Padding            |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-     */
-
-    private int flowSetId;
-
-    private int length;
-
-    private byte[] data;
-
-    private DataFlowSet(Builder builder) {
-        this.flowSetId = builder.flowSetId;
-        this.length = builder.length;
-        this.data = builder.data;
-    }
-
-    private List<DataFlowRecord> dataFlow = new LinkedList<>();
-
-    /**
-     * Returns flowset id.
-     * Each Data FlowSet is associated with a FlowSet ID.  The FlowSet
-     * ID maps to a (previously generated) Template ID
-     *
-     * @return flowset id
-     */
-    @Override
-    public int getFlowSetId() {
-        return this.flowSetId;
-    }
-
-    /**
-     * Returns length of this FlowSet.
-     * Length is the sum of the lengths
-     * of the FlowSet ID, Length itself, all Flow Records within this
-     * FlowSet, and the padding bytes, if any.
-     *
-     * @return length of the flowset
-     */
-    @Override
-    public int getLength() {
-        return this.length;
-    }
-
-    /**
-     * Returns list of data flow records.
-     *
-     * @return list of data flow records
-     */
-    public List<DataFlowRecord> getDataFlow() {
-        return dataFlow;
-    }
-
-    /**
-     * Set data flow record.
-     *
-     * @param dataFlow data flow record
-     */
-    public void setDataFlow(DataFlowRecord dataFlow) {
-        this.dataFlow.add(dataFlow);
-    }
-
-    /**
-     * Returns type of flowset.
-     *
-     * @return type of flowset
-     */
-    @Override
-    public Type getType() {
-        return Type.DATA_FLOWSET;
-    }
-
-    @Override
-    public int hashCode() {
-        int hash = 3;
-        hash = 79 * hash + this.flowSetId;
-        hash = 79 * hash + this.length;
-        hash = 79 * hash + Objects.hashCode(this.dataFlow);
-        return hash;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-        final DataFlowSet other = (DataFlowSet) obj;
-        if (this.flowSetId != other.flowSetId) {
-            return false;
-        }
-        if (this.length != other.length) {
-            return false;
-        }
-        return Objects.equals(this.dataFlow, other.dataFlow);
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(getClass())
-                .add("flowSetId", flowSetId)
-                .add("length", length)
-                .add("data", data)
-                .toString();
-    }
-
-    /**
-     * Deserializer function for data flow set.
-     *
-     * @return deserializer function
-     */
-    public static Deserializer<DataFlowSet> deserializer() {
-        return (data, offset, length) -> {
-            Function<ByteBuffer, byte[]> readBytes = b -> {
-                if (b.remaining() == b.limit()) {
-                    return null;
-                }
-                byte[] bytes = new byte[b.remaining()];
-                b.get(bytes);
-                return bytes;
-            };
-            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
-            return new Builder()
-                    .flowSetId(bb.getShort())
-                    .length(bb.getShort())
-                    .data(readBytes.apply(bb))
-                    .build();
-
-        };
-    }
-
-    /**
-     * Data eserializer function for data flow record.
-     *
-     * @param template data template record
-     * @throws DeserializationException if unable to deserialize data
-     */
-    public void dataDeserializer(DataTemplateRecord template) throws DeserializationException {
-        dataFlow = new LinkedList<>();
-        ByteBuffer bb = ByteBuffer.wrap(data, 0, data.length);
-
-        while (bb.hasRemaining()) {
-            if (bb.remaining() < template.getValueLength()) {
-                break;
-            }
-            byte[] dataRecord = new byte[template.getValueLength()];
-            bb.get(dataRecord);
-            this.setDataFlow(DataFlowRecord.deserializer().deserialize(
-                    data, 0, template.getValueLength(), template));
-        }
-
-    }
-
-    /**
-     * Builder for data flow set.
-     */
-    private static class Builder {
-
-        private int flowSetId;
-
-        private int length;
-
-        private byte[] data;
-
-        /**
-         * Setter for flowset id.
-         *
-         * @param flowSetId flowset id.
-         * @return this class builder.
-         */
-        public Builder flowSetId(int flowSetId) {
-            this.flowSetId = flowSetId;
-            return this;
-        }
-
-        /**
-         * Setter for length of this FlowSet.
-         *
-         * @param length length of this FlowSet.
-         * @return this class builder.
-         */
-        public Builder length(int length) {
-            this.length = length;
-            return this;
-        }
-
-        /**
-         * Setter for flow data.
-         *
-         * @param data flow data.
-         * @return this class builder.
-         */
-        public Builder data(byte[] data) {
-            this.data = data;
-            return this;
-        }
-
-        /**
-         * Checks arguments for data flow set.
-         */
-        private void checkArguments() {
-            checkState(flowSetId != 0, "Invalid data flowset id.");
-            checkState(length != 0, "Invalid data flowset length.");
-            checkNotNull(data, "Data flow set cannot be null");
-
-        }
-
-        /**
-         * Builds data flowset.
-         *
-         * @return data flowset.
-         */
-        public DataFlowSet build() {
-            checkArguments();
-            return new DataFlowSet(this);
-        }
-
-    }
-
-}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java
deleted file mode 100644
index dc98531..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import com.google.common.base.MoreObjects;
-
-import org.onlab.packet.Deserializer;
-import org.onosproject.netflow.TemplateId;
-import org.onosproject.netflow.FlowTemplateField;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Template Record defines the structure and interpretation.
- * of fields in an Options Data Record, including defining the scope
- * within which the Options Data Record is relevant.
- * Ref: https://www.ietf.org/rfc/rfc3954.txt
- */
-public final class DataTemplateRecord extends TemplateRecord {
-
-    /*
-        0                   1                   2                   3
-    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |      Template ID 256          |         Field Count           |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |        Field Type 1           |         Field Length 1        |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |        Field Type 2           |         Field Length 2        |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-     */
-
-    private TemplateId templateId;
-
-    private int filedCount;
-
-    private List<FlowTemplateField> fields;
-
-    private DataTemplateRecord(Builder builder) {
-        this.templateId = builder.templateId;
-        this.filedCount = builder.filedCount;
-        this.fields = builder.fields;
-    }
-
-    /**
-     * Returns template record's template id.
-     * Template Records is given a unique Template ID.
-     * This uniqueness is local to the Observation
-     * Domain that generated the Template ID.  Template IDs 0-255 are
-     * reserved for Template FlowSets, Options FlowSets, and other
-     * reserved FlowSets yet to be created.
-     *
-     * @return list of flowsets
-     */
-    @Override
-    public TemplateId getTemplateId() {
-        return templateId;
-    }
-
-    /**
-     * Returns number of fields in this Template Record.
-     *
-     * @return field count
-     */
-    public int getFiledCount() {
-        return filedCount;
-    }
-
-    /**
-     * Returns list of flow template fields.
-     *
-     * @return list of flow template fields
-     */
-    public List<FlowTemplateField> getFields() {
-        return fields;
-    }
-
-    public int getValueLength() {
-        Optional.ofNullable(fields)
-                .orElseThrow(() -> new IllegalStateException("Invalid fields"));
-        return fields.stream()
-                .filter(Objects::nonNull)
-                .map(FlowTemplateField::getLength)
-                .collect(Collectors.summingInt(Integer::intValue));
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(getClass())
-                .add("templateId", templateId)
-                .add("filedCount", filedCount)
-                .add("fields", fields)
-                .toString();
-    }
-
-    /**
-     * Data deserializer function for data template record.
-     *
-     * @return data deserializer function
-     */
-    public static Deserializer<DataTemplateRecord> deserializer() {
-        return (data, offset, length) -> {
-            Predicate<ByteBuffer> isValidBuffer = b -> b.remaining() < FlowSet.FIELD_LENTH;
-            Function<ByteBuffer, FlowTemplateField> parse = (b)
-                    -> {
-                if (isValidBuffer.test(b)) {
-                    throw new IllegalStateException("Invalid buffersize");
-                }
-                return new FlowTemplateField.Builder()
-                        .flowField(b.getShort())
-                        .length(b.getShort())
-                        .build();
-            };
-
-            Builder builder = new Builder();
-            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
-            if (isValidBuffer.test(bb)) {
-                throw new IllegalStateException("Invalid buffersize");
-            }
-            builder.templateId(bb.getShort())
-                    .filedCount(bb.getShort());
-            IntStream.rangeClosed(1, builder.filedCount).forEach(i -> builder.templateField(parse.apply(bb)));
-            return builder.build();
-        };
-    }
-
-    /**
-     * Builder for data template record.
-     */
-    private static class Builder {
-
-        private TemplateId templateId;
-
-        private int filedCount;
-
-        private List<FlowTemplateField> fields = new LinkedList<>();
-
-        /**
-         * Setter for template record's template id.
-         *
-         * @param templateId template record's template id.
-         * @return this class builder.
-         */
-        public Builder templateId(int templateId) {
-            this.templateId = new TemplateId((templateId));
-            return this;
-        }
-
-        /**
-         * Setter for number of fields in this Template Record.
-         *
-         * @param filedCount number of fields in this Template Record.
-         * @return this class builder.
-         */
-        public Builder filedCount(int filedCount) {
-            this.filedCount = filedCount;
-            return this;
-        }
-
-        /**
-         * Setter for list of flow template fields.
-         *
-         * @param fields list of flow template fields.
-         * @return this class builder.
-         */
-        public Builder templateFields(List<FlowTemplateField> fields) {
-            this.fields = fields;
-            return this;
-        }
-
-        /**
-         * Setter for  flow template fields.
-         *
-         * @param field flow template fields.
-         * @return this class builder.
-         */
-        public Builder templateField(FlowTemplateField field) {
-            this.fields.add(field);
-            return this;
-        }
-
-        /**
-         * Checks arguments for data template record.
-         */
-        private void checkArguments() {
-            checkState(filedCount != 0, "Invalid template filed count.");
-            checkNotNull(templateId, "Template Id cannot be null.");
-
-        }
-
-        /**
-         * Builds data template record.
-         *
-         * @return data template record.
-         */
-        public DataTemplateRecord build() {
-            checkArguments();
-            return new DataTemplateRecord(this);
-        }
-
-    }
-
-}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java
new file mode 100644
index 0000000..90d75dd
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.Optional;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.onosproject.netflow.NetflowStore;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+import org.onosproject.netflow.FlowTemplateField;
+import org.onosproject.netflow.SourceId;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataRecord;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.FlowSet;
+import org.onosproject.netflow.DataFlowSet;
+import org.onosproject.netflow.TemplateRecord;
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.OptionalTemplateFlowSet;
+import org.onosproject.netflow.TemplateFlowSet;
+import org.onlab.packet.BasePacket;
+
+/**
+ * Manages inventory of Netflow template and data flowset to distribute
+ * information.
+ */
+@Component(immediate = true, service = NetflowStore.class)
+public class DistributedNetflowStore implements NetflowStore {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private EventuallyConsistentMap<TemplateId, DataTemplateRecord> templateFlowSet;
+    private EventuallyConsistentMap<TemplateId, OptionalTemplateFlowSet> optionalTemplateFlowSet;
+    private EventuallyConsistentMap<TemplateId, List<DataFlowRecord>> dataFlowSet;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Activate
+    protected void activate() {
+        // One serializer is shared by all three maps; each type is registered exactly once.
+        KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.MISC)
+                .register(List.class)
+                .register(LinkedList.class)
+                .register(ArrayList.class)
+                .register(HashSet.class)
+                .register(BasePacket.class)
+                .register(Flow.class)
+                .register(FlowField.class)
+                .register(FlowTemplateField.class)
+                .register(SourceId.class)
+                .register(TemplateId.class)
+                .register(DataRecord.class)
+                .register(DataFlowRecord.class)
+                .register(FlowSet.class)
+                .register(DataFlowSet.class)
+                .register(TemplateRecord.class)
+                .register(DataTemplateRecord.class)
+                .register(NetFlowPacket.class)
+                .register(OptionalTemplateFlowSet.class)
+                .register(TemplateFlowSet.class);
+
+        templateFlowSet = storageService.<TemplateId, DataTemplateRecord>eventuallyConsistentMapBuilder()
+                .withSerializer(serializer)
+                .withName("netflow-templateflowset")
+                .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+                .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+                .withTombstonesDisabled()
+                .build();
+
+        optionalTemplateFlowSet = storageService.<TemplateId, OptionalTemplateFlowSet>eventuallyConsistentMapBuilder()
+                .withSerializer(serializer)
+                .withName("netflow-optionaltemplateflowset")
+                .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+                .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+                .withTombstonesDisabled()
+                .build();
+
+        dataFlowSet = storageService.<TemplateId, List<DataFlowRecord>>eventuallyConsistentMapBuilder()
+                .withSerializer(serializer)
+                .withName("netflow-dataflowset")
+                .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+                .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+                .withTombstonesDisabled()
+                .build();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactive() {
+        log.info("Stopped"); // NOTE(review): maps built in activate() are not released here - confirm
+    }
+
+    /**
+     * Looks up the template record stored under the given template id.
+     *
+     * @param templateId template id used as the lookup key.
+     * @return the matching data template record, or an empty optional when none is stored.
+     */
+    @Override
+    public Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId) {
+        DataTemplateRecord stored = templateFlowSet.get(templateId);
+        return Optional.ofNullable(stored);
+    }
+
+    /**
+     * Returns every template record currently held in the store.
+     * The result is an immutable snapshot, like the optional-template getter.
+     *
+     * @return set of data template records; empty when no template is stored.
+     */
+    @Override
+    public Set<DataTemplateRecord> getTemplateFlowSet() {
+        // Collectors.toSet() promises nothing about mutability; pin it explicitly.
+        return ImmutableSet.copyOf(templateFlowSet.values());
+    }
+
+    /**
+     * Looks up the optional template flowset stored under the given template id.
+     *
+     * @param templateId template id used as the lookup key.
+     * @return the matching optional template flowset, or an empty optional when none is stored.
+     */
+    @Override
+    public Optional<OptionalTemplateFlowSet> getOptionalTemplateFlowSet(TemplateId templateId) {
+        OptionalTemplateFlowSet stored = optionalTemplateFlowSet.get(templateId);
+        return Optional.ofNullable(stored);
+    }
+
+    /**
+     * Returns an immutable snapshot of all optional template flowsets in the store.
+     *
+     * @return set of optional template flowsets; empty when none is stored.
+     */
+    @Override
+    public Set<OptionalTemplateFlowSet> getOptionalTemplateFlowSet() {
+        ImmutableSet.Builder<OptionalTemplateFlowSet> snapshot = ImmutableSet.builder();
+        return snapshot.addAll(optionalTemplateFlowSet.values()).build();
+    }
+
+    /**
+     * Returns the data flow records stored under the given template id.
+     * A stored entry is handed back as an immutable copy.
+     *
+     * @param templateId template id used as the lookup key.
+     * @return list of data flow records; a new empty list when the id has no entry.
+     */
+    @Override
+    public List<DataFlowRecord> getDataFlowSet(TemplateId templateId) {
+        List<DataFlowRecord> stored = dataFlowSet.get(templateId);
+        if (Objects.isNull(stored)) {
+            return Lists.newArrayList();
+        }
+        return ImmutableList.copyOf(stored);
+    }
+
+    /**
+     * Gets all data flow records held in the store, keyed by template id.
+     *
+     * @return new map from a template id to its data flow records; the value lists are not copied.
+     */
+    @Override
+    public Map<TemplateId, List<DataFlowRecord>> getDataFlowSet() {
+        return dataFlowSet.entrySet().stream()
+                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    /**
+     * Stores a template flowset record in the store.
+     * The record is put under the template id it reports itself, so a record
+     * previously stored under that id is replaced.
+     *
+     * @param templateRecord template flowset record.
+     */
+    @Override
+    public void updateTemplateFlowSet(DataTemplateRecord templateRecord) {
+        templateFlowSet.put(templateRecord.getTemplateId(), templateRecord);
+    }
+
+    /**
+     * Stores an optional template flowset under the template id it reports itself.
+     * An optional template previously stored under that id is replaced.
+     * Note: the parameter hides the templateFlowSet member used by other methods of this class.
+     *
+     * @param templateFlowSet optional template flowset.
+     */
+    @Override
+    public void updateOptionalTemplateFlowSet(OptionalTemplateFlowSet templateFlowSet) {
+        optionalTemplateFlowSet.put(templateFlowSet.getTemplateId(), templateFlowSet);
+    }
+
+    /**
+     * Appends a data flow record to the records stored under its template id.
+     * The stored list is never modified in place: a new list holding the
+     * previous records plus the new one is built and stored in its stead.
+     *
+     * @param dataFlowRecord data flow record.
+     */
+    @Override
+    public void addDataFlowSet(DataFlowRecord dataFlowRecord) {
+        dataFlowSet.compute(dataFlowRecord.getTemplateId(), (id, existing) -> {
+            // No entry yet for this template id: start from an empty list.
+            List<DataFlowRecord> updated = existing == null
+                    ? new ArrayList<>()
+                    : new ArrayList<>(existing);
+            updated.add(dataFlowRecord);
+            return updated;
+        });
+    }
+
+    /**
+     * Removes the template flowset stored under the given template id, if the store has one.
+     *
+     * @param templateId template id.
+     */
+    @Override
+    public void clearTemplateFlowSet(TemplateId templateId) {
+        if (!templateFlowSet.containsKey(templateId)) {
+            return;
+        }
+        templateFlowSet.remove(templateId);
+    }
+
+    /**
+     * Removes every template flowset from the store.
+     * Optional template and data flowsets are left untouched.
+     */
+    @Override
+    public void clearTemplateFlowSet() {
+        templateFlowSet.clear();
+    }
+
+    /**
+     * Removes the optional template flowset stored under the given template id, if the store has one.
+     *
+     * @param templateId template id.
+     */
+    @Override
+    public void clearOptionalTemplateFlowSet(TemplateId templateId) {
+        if (!optionalTemplateFlowSet.containsKey(templateId)) {
+            return;
+        }
+        optionalTemplateFlowSet.remove(templateId);
+    }
+
+    /**
+     * Removes every optional template flowset from the store.
+     * Template and data flowsets are left untouched.
+     */
+    @Override
+    public void clearOptionalTemplateFlowSet() {
+        optionalTemplateFlowSet.clear();
+    }
+
+    /**
+     * Removes the data flow records stored under the given template id, if the store has any.
+     *
+     * @param templateId template id.
+     */
+    @Override
+    public void clearDataFlowSet(TemplateId templateId) {
+        if (!dataFlowSet.containsKey(templateId)) {
+            return;
+        }
+        dataFlowSet.remove(templateId);
+    }
+
+    /**
+     * Removes every data flowset from the store.
+     * Template and optional template flowsets are left untouched.
+     */
+    @Override
+    public void clearDataFlowSet() {
+        dataFlowSet.clear();
+    }
+
+    /**
+     * Empties the store: template, optional template and data flowsets are
+     * all removed, in that order, through the per-kind clear methods.
+     */
+    @Override
+    public void clearAllFlowSet() {
+        clearTemplateFlowSet();
+        clearOptionalTemplateFlowSet();
+        clearDataFlowSet();
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java
deleted file mode 100644
index 1b292a7..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.onlab.packet.BasePacket;
-import org.onlab.packet.DeserializationException;
-import org.onlab.packet.Deserializer;
-
-/**
- * FlowSet is a generic term for a collection of Flow Records that have.
- * a similar structure.  In an Export Packet, one or more FlowSets
- * follow the Packet Header.
- * Ref: https://www.ietf.org/rfc/rfc3954.txt
- */
-public abstract class FlowSet extends BasePacket {
-
-    public static final int FLOW_SET_HEADER_LENTH = 4;
-
-    public static final int FIELD_LENTH = 4;
-
-    public static final int RECORD_HEADER_LENGTH = 4;
-
-    /**
-     * FlowSets type
-     * FlowSets: Template FlowSet, Options Template FlowSet, and Data FlowSet.
-     */
-    public enum Type {
-
-        TEMPLATE_FLOWSET(0, TemplateFlowSet.deserializer()),
-        OPTIONAL_TEMPLATE_FLOWSET(1, OptionalTemplateFlowSet.deserializer()),
-        DATA_FLOWSET(Integer.MAX_VALUE, DataFlowSet.deserializer());
-
-        private final int flowSetId;
-        private final Deserializer deserializer;
-
-        Type(int flowSetId, Deserializer deserializer) {
-            this.flowSetId = flowSetId;
-            this.deserializer = deserializer;
-        }
-
-        private static Map<Integer, Type> parser = new ConcurrentHashMap<>();
-
-        static {
-            Arrays.stream(Type.values()).forEach(type -> parser.put(type.flowSetId, type));
-        }
-
-        public static Type getType(int flowSetId) throws DeserializationException {
-            if (flowSetId < 0) {
-                throw new DeserializationException("Invalid trap type");
-            }
-            return Optional.of(flowSetId)
-                    .filter(id -> parser.containsKey(id))
-                    .map(id -> parser.get(id))
-                    .orElse(DATA_FLOWSET);
-        }
-
-        public Deserializer getDecoder() {
-            return this.deserializer;
-        }
-
-    }
-
-    public abstract Type getType();
-
-    public abstract int getFlowSetId();
-
-    public abstract int getLength();
-
-    @Override
-    public byte[] serialize() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-
-}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
index df5ffce..27eb8ea 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.netflow.impl;
 
+import java.util.Optional;
 import org.jboss.netty.channel.ChannelHandlerContext;
 import org.jboss.netty.channel.ChannelStateEvent;
 import org.jboss.netty.channel.ExceptionEvent;
@@ -22,18 +23,35 @@
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.SimpleChannelHandler;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.FlowSet;
+import org.onosproject.netflow.TemplateFlowSet;
+import org.onosproject.netflow.DataFlowSet;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.NetflowController;
+
 /**
  * Channel handler deals with the netfow exporter connection and dispatches messages
  * from netfow exporter to the appropriate locations.
  */
 public class NeflowChannelHandler extends SimpleChannelHandler {
 
+    private static final Logger log = LoggerFactory.getLogger(NeflowChannelHandler.class);
+
     private Channel channel;
 
+    private NetflowController controller;
+
     /**
      * Create a new netflow channelHandler instance.
+     *
+     * @param controller netflow controller that receives the template and data flow records of each message.
      */
-    NeflowChannelHandler() {
+    NeflowChannelHandler(NetflowController controller) {
+        this.controller = controller;
     }
 
     /**
@@ -81,8 +99,39 @@
      */
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
-        //TODO Netflow message store to netflow distributed store
-        NetFlowPacket packet = (NetFlowPacket) event.getMessage();
+        try {
+            NetFlowPacket netFlowPacket = (NetFlowPacket) event.getMessage();
+            // Templates first, so data flowsets of this same packet can be decoded against them.
+            netFlowPacket.getFlowSets()
+                    .stream()
+                    .filter(n -> n.getType() == FlowSet.Type.TEMPLATE_FLOWSET)
+                    .map(t -> (TemplateFlowSet) t)
+                    .flatMap(t -> t.getRecords().stream())
+                    .forEach(controller::addTemplateFlowSet);
+
+            netFlowPacket.getFlowSets()
+                    .stream()
+                    .filter(n -> n.getType() == FlowSet.Type.DATA_FLOWSET)
+                    .map(t -> (DataFlowSet) t)
+                    .forEach(data -> {
+                        Optional<DataTemplateRecord> template = controller
+                                .getTemplateFlowSet(TemplateId.valueOf(data.getFlowSetId()));
+                        if (!template.isPresent()) {
+                            log.debug("Netflow data flowset {} skipped, template not found", data.getFlowSetId());
+                            return;
+                        }
+                        try {
+                            data.dataDeserializer(template.get());
+                            data.getDataFlow().stream().forEach(controller::updateDataFlowSet);
+                        } catch (Exception ex) {
+                            log.error("Netflow dataflow deserializer exception ", ex);
+                        }
+                    });
+            // Logged once per received packet, hence debug rather than info.
+            log.debug("Netflow message received {}", netFlowPacket);
+        } catch (Exception er) {
+            log.error("Netflow message deserializer exception ", er);
+        }
     }
 
 }
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
index be6ae9a..bbe0c0f 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
@@ -23,6 +23,7 @@
 
 import org.onlab.packet.Deserializer;
 import org.onlab.packet.BasePacket;
+import org.onosproject.netflow.FlowSet;
 
 import static com.google.common.base.Preconditions.checkState;
 
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java
new file mode 100644
index 0000000..6d33f61
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import org.onosproject.netflow.NetflowController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import org.onosproject.netflow.NetflowStore;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.DataTemplateRecord;
+
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Netflow controller implementation.
+ * It creates and starts a Controller when the component is activated and
+ * forwards every template and data flowset operation to the netflow store.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+@Component(immediate = true, service = NetflowController.class)
+public class NetflowControllerImpl implements NetflowController {
+
+    private static final Logger log = LoggerFactory.getLogger(NetflowControllerImpl.class);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected NetflowStore store;
+
+    @Activate
+    public void activate() {
+        // The Controller is handed this instance; its reference stays local to this method.
+        Controller ctrl = new Controller(this);
+        ctrl.start();
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        // NOTE(review): the Controller started in activate() is never stopped here - confirm this is intended.
+        log.info("Stopped");
+    }
+
+    /**
+     * Add template flowset to controller.
+     * The record is passed to the store's updateTemplateFlowSet, so a template
+     * already known under the same template id is expected to be replaced.
+     *
+     * @param templateRecord template flowset record.
+     */
+    @Override
+    public void addTemplateFlowSet(DataTemplateRecord templateRecord) {
+        store.updateTemplateFlowSet(templateRecord);
+    }
+
+    /**
+     * Hands a data flow record to the store.
+     * Despite the method name this delegates to the store's addDataFlowSet.
+     *
+     * @param dataFlowRecord data flowset record.
+     */
+    @Override
+    public void updateDataFlowSet(DataFlowRecord dataFlowRecord) {
+        store.addDataFlowSet(dataFlowRecord);
+    }
+
+    /**
+     * Get template flowset from controller.
+     * The lookup by template id is delegated to the store.
+     *
+     * @param templateId template id.
+     * @return optional of data template record, optional will be empty if template not found.
+     */
+    @Override
+    public Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId) {
+        return store.getTemplateFlowSet(templateId);
+    }
+
+    /**
+     * Get data flowset from controller.
+     * The lookup by template id is delegated to the store.
+     *
+     * @param templateId template id.
+     * @return list of data flow record, list will be empty if template id not matched.
+     */
+    @Override
+    public List<DataFlowRecord> getDataFlowSet(TemplateId templateId) {
+        return store.getDataFlowSet(templateId);
+    }
+
+    /**
+     * Get all template flowsets from controller.
+     * The request is delegated to the store.
+     *
+     * @return set of data template record, set will be empty if templates not found in the store.
+     */
+    @Override
+    public Set<DataTemplateRecord> getTemplateFlowSet() {
+        return store.getTemplateFlowSet();
+    }
+
+    /**
+     * Get all data flowsets from controller.
+     * The request is delegated to the store; no template id filter is applied.
+     *
+     * @return mapping from a template id to data flow record.
+     */
+    @Override
+    public Map<TemplateId, List<DataFlowRecord>> getDataFlowSet() {
+        return store.getDataFlowSet();
+    }
+
+}
\ No newline at end of file
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
index 8dad97d..6807a5c 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
@@ -42,7 +42,7 @@
                 return NetFlowPacket.deserializer().deserialize(bytes, 0, bytes.length);
             }
         } catch (Exception e) {
-            log.error("Netflow message decode error");
+            log.error("Netflow message decode error ", e);
             buffer.resetReaderIndex();
             buffer.discardReadBytes();
 
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
index 178f6e6..9f5ef2e 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
@@ -18,18 +18,23 @@
 import org.jboss.netty.channel.ChannelPipeline;
 import org.jboss.netty.channel.ChannelPipelineFactory;
 import org.jboss.netty.channel.Channels;
-
+import org.onosproject.netflow.NetflowController;
 
 /**
  * Creates a ChannelPipeline for a server-side netflow message channel.
  */
 public class NetflowPipelineFactory implements ChannelPipelineFactory {
 
+    private NetflowController controller;
+
     /**
      * Constructor to initialize the values.
+     *
+     * @param controller netflow controller handed to the channel handler of each pipeline created.
      */
-    public NetflowPipelineFactory() {
+    public NetflowPipelineFactory(NetflowController controller) {
         super();
+        this.controller = controller;
     }
 
     /**
@@ -40,7 +45,7 @@
      */
     @Override
     public ChannelPipeline getPipeline() throws Exception {
-        NeflowChannelHandler handler = new NeflowChannelHandler();
+        NeflowChannelHandler handler = new NeflowChannelHandler(controller);
 
         ChannelPipeline pipeline = Channels.pipeline();
         pipeline.addLast("netflowmessagedecoder", new NetflowMessageDecoder());
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java
deleted file mode 100644
index 9a00bb2..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import java.util.List;
-
-import org.onlab.packet.Deserializer;
-
-
-/**
- * The Options Template Record (and its corresponding Options Data
- * Record) is used to supply information about the NetFlow process
- * configuration or NetFlow process specific data, rather than supplying
- * information about IP Flows.
- * Ref: https://www.ietf.org/rfc/rfc3954.txt
- */
-public class OptionalTemplateFlowSet extends FlowSet {
-
-    /*
-        0                   1                   2                   3
-    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |       FlowSet ID = 1          |          Length               |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |         Template ID           |      Option Scope Length      |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |        Option Length          |       Scope 1 Field Type      |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |     Scope 1 Field Length      |               ...             |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |     Scope N Field Length      |      Option 1 Field Type      |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |     Option 1 Field Length     |             ...               |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |     Option M Field Length     |           Padding             |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-     */
-
-    private int flowSetId;
-
-    private int length;
-
-    private List<DataTemplateRecord> records;
-
-    /**
-     * Returns flowset type.
-     *
-     * @return flowset type
-     */
-    @Override
-    public Type getType() {
-        return Type.OPTIONAL_TEMPLATE_FLOWSET;
-    }
-
-    /**
-     * Returns flowset id.
-     * FlowSet ID value of 1 is reserved for the Options Template.
-     *
-     * @return flow set ID
-     */
-    public int getFlowSetId() {
-        return flowSetId;
-    }
-
-    /**
-     * Returns total length of this FlowSet.
-     * Each Options Template FlowSet
-     * MAY contain multiple Options Template Records.  Thus, the
-     * Length value MUST be used to determine the position of the next
-     * FlowSet record, which could be either a Template FlowSet or
-     * Data FlowSet.
-     *
-     * @return flow set ID
-     */
-    public int getLength() {
-        return length;
-    }
-
-    /**
-     * Returns list of optional data template records.
-     *
-     * @return list of optional data template records
-     */
-    public List<DataTemplateRecord> getRecords() {
-        return records;
-    }
-
-    /**
-     * Deserializer function for data option template flowset.
-     *
-     * @return data deserializer function
-     */
-    public static Deserializer<OptionalTemplateFlowSet> deserializer() {
-        return (data, offset, length) -> {
-            //TODO parse optional template
-            return new OptionalTemplateFlowSet();
-        };
-    }
-
-}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java
deleted file mode 100644
index b6bc51e..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.function.Predicate;
-import java.util.Objects;
-
-import com.google.common.base.MoreObjects;
-
-import org.onlab.packet.DeserializationException;
-import org.onlab.packet.Deserializer;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * One of the essential elements in the NetFlow format is the Template
- * FlowSet.  Templates greatly enhance the flexibility of the Flow
- * Record format because they allow the NetFlow Collector to process
- * Flow Records without necessarily knowing the interpretation of all
- * the data in the Flow Record.
- * Ref: https://www.ietf.org/rfc/rfc3954.txt
- */
-public final class TemplateFlowSet extends FlowSet {
-
-    /*
-        0                   1                   2                   3
-    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |       FlowSet ID = 0          |          Length               |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |      Template ID 256          |         Field Count           |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |        Field Type 1           |         Field Length 1        |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |        Field Type 2           |         Field Length 2        |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |             ...               |              ...              |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |        Field Type N           |         Field Length N        |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |      Template ID 257          |         Field Count           |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |        Field Type 1           |         Field Length 1        |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |        Field Type 2           |         Field Length 2        |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |             ...               |              ...              |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |        Field Type M           |         Field Length M        |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |             ...               |              ...              |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |        Template ID K          |         Field Count           |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-   |             ...               |              ...              |
-   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
-     */
-
-    private int flowSetId;
-
-    private int length;
-
-    private List<DataTemplateRecord> records;
-
-    private TemplateFlowSet(Builder builder) {
-        this.records = builder.records;
-        this.length = builder.length;
-        this.flowSetId = builder.flowSetId;
-    }
-
-    /**
-     * Return template flow set id.
-     * FlowSet ID value of 0 is reserved for the Template FlowSet.
-     *
-     * @return flow set ID
-     */
-    @Override
-    public int getFlowSetId() {
-        return this.flowSetId;
-    }
-
-    /**
-     * Returns total length of this flowSet.
-     *
-     * @return length of flowset
-     */
-    @Override
-    public int getLength() {
-        return this.length;
-    }
-
-    /**
-     * Returns list of flow records.
-     *
-     * @return list of flow records
-     */
-    public List<DataTemplateRecord> getRecords() {
-        return records;
-    }
-
-    /**
-     * Returns type of the flowset.
-     *
-     * @return type of the flowset
-     */
-    @Override
-    public Type getType() {
-        return Type.TEMPLATE_FLOWSET;
-    }
-
-    @Override
-    public int hashCode() {
-        int hash = 7;
-        hash = 53 * hash + this.flowSetId;
-        hash = 53 * hash + this.length;
-        hash = 53 * hash + Objects.hashCode(this.records);
-        return hash;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null) {
-            return false;
-        }
-        if (getClass() != obj.getClass()) {
-            return false;
-        }
-        final TemplateFlowSet other = (TemplateFlowSet) obj;
-        if (this.flowSetId != other.flowSetId) {
-            return false;
-        }
-        if (this.length != other.length) {
-            return false;
-        }
-        return Objects.equals(this.records, other.records);
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(getClass())
-                .add("flowSetId", flowSetId)
-                .add("length", length)
-                .add("records", records)
-                .toString();
-    }
-
-    /**
-     * Data deserializer function for template flow set.
-     *
-     * @return data deserializer function
-     */
-    public static Deserializer<TemplateFlowSet> deserializer() {
-        return (data, offset, length) -> {
-
-            Predicate<ByteBuffer> isValidBuffer = b -> b.remaining() < FlowSet.FIELD_LENTH;
-
-            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
-            if (isValidBuffer.test(bb)) {
-                throw new DeserializationException("Invalid buffer size");
-            }
-            Builder builder = new Builder()
-                    .flowSetId(bb.getShort())
-                    .length(bb.getShort());
-            while (bb.hasRemaining()) {
-                if (isValidBuffer.test(bb)) {
-                    break;
-                }
-                int templateId = bb.getShort();
-                int fieldCount = bb.getShort();
-                int bufferLength = (fieldCount * FlowSet.FIELD_LENTH) + FlowSet.FIELD_LENTH;
-                byte[] record = new byte[bufferLength];
-                bb.position(bb.position() - FlowSet.FIELD_LENTH);
-                if (bb.remaining() < bufferLength) {
-                    break;
-                }
-                bb.get(record);
-                builder.templateRecord(DataTemplateRecord.deserializer().deserialize(record, 0, bufferLength));
-
-            }
-            return builder.build();
-        };
-    }
-
-    /**
-     * Builder for template flow set.
-     */
-    private static class Builder {
-
-        private int flowSetId;
-
-        private int length;
-
-        private List<DataTemplateRecord> records = new LinkedList<>();
-
-        /**
-         * Setter for template flow set id.
-         *
-         * @param flowSetId template flow set id.
-         * @return this class builder.
-         */
-        public Builder flowSetId(int flowSetId) {
-            this.flowSetId = flowSetId;
-            return this;
-        }
-
-        /**
-         * Setter for total length of this flowSet.
-         *
-         * @param length total length of this flowSet.
-         * @return this class builder.
-         */
-        public Builder length(int length) {
-            this.length = length;
-            return this;
-        }
-
-        /**
-         * Setter for list of flow records.
-         *
-         * @param records list of flow records.
-         * @return this class builder.
-         */
-        public Builder templateRecords(List<DataTemplateRecord> records) {
-            this.records = records;
-            return this;
-        }
-
-        /**
-         * Setter for flow records.
-         *
-         * @param record flow records.
-         * @return this class builder.
-         */
-        public Builder templateRecord(DataTemplateRecord record) {
-            this.records.add(record);
-            return this;
-        }
-
-        /**
-         * Checks arguments for template flow set.
-         */
-        private void checkArguments() {
-            checkState(flowSetId == 0, "Invalid flow set id.");
-            checkState(length == 0, "Invalid flow set length.");
-
-        }
-
-        /**
-         * Builds template flow set.
-         *
-         * @return template flow set.
-         */
-        public TemplateFlowSet build() {
-            checkArguments();
-            return new TemplateFlowSet(this);
-        }
-
-    }
-
-}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java
deleted file mode 100644
index db66292..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import org.onlab.packet.BasePacket;
-
-import org.onosproject.netflow.FlowRecord;
-/**
- * A Template Record defines the structure and interpretation of fields.
- * in a Flow Data Record.
- */
-public abstract class TemplateRecord extends BasePacket implements FlowRecord {
-
-    @Override
-    public byte[] serialize() {
-        throw new UnsupportedOperationException("Not supported yet.");
-    }
-}