Netflow support for ONOS

Change-Id: I6c55168229f63d70c97fc6e19169d120a39de3fe
diff --git a/apps/netflow/app/BUILD b/apps/netflow/app/BUILD
new file mode 100644
index 0000000..747d61d
--- /dev/null
+++ b/apps/netflow/app/BUILD
@@ -0,0 +1,10 @@
+COMPILE_DEPS = CORE_DEPS + [
+    "//core/store/serializers:onos-core-serializers",
+    "//apps/netflow/api:onos-apps-netflow-api",
+    "@io_netty_netty_common//jar",
+    "@io_netty_netty//jar",
+]
+
+osgi_jar(
+    deps = COMPILE_DEPS,
+)
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java
new file mode 100644
index 0000000..8dd434c
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.base.MoreObjects;
+
+import org.onosproject.netflow.DataRecord;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowTemplateField;
+import org.onosproject.netflow.DataDeserializer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+/**
+ * A Flow Data Record is a data record that contains values of the Flow.
+ * parameters corresponding to a Template Record.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public class DataFlowRecord extends DataRecord {
+
+    /*
+    0                   1                   2                   3
+    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |   Record 1 - Field Value 1    |   Record 1 - Field Value 2    |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |   Record 1 - Field Value 3    |             ...               |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |   Record 2 - Field Value 1    |   Record 2 - Field Value 2    |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |   Record 2 - Field Value 3    |             ...               |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+     */
+
+    private TemplateId templateId;
+
+    private List<Flow> flows;
+
+    public DataFlowRecord(Builder builder) {
+        this.templateId = builder.templateId;
+        this.flows = builder.flows;
+    }
+
+    /**
+     * Returns unique template ID.
+     * Template Records is given a unique Template ID.
+     * This uniqueness is local to the Observation
+     * Domain that generated the Template ID.  Template IDs 0-255 are
+     * reserved for Template FlowSets, Options FlowSets, and other
+     * reserved FlowSets yet to be created.
+     *
+     * @return list of flowsets
+     */
+    @Override
+    public TemplateId getTemplateId() {
+        return this.templateId;
+    }
+
+    /**
+     * Returns type of data flow.
+     *
+     * @return type of data flow
+     */
+    public List<Flow> getFlows() {
+        return flows;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 29 * hash + Objects.hashCode(this.templateId);
+        hash = 29 * hash + Objects.hashCode(this.flows);
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final DataFlowRecord other = (DataFlowRecord) obj;
+        if (!Objects.equals(this.templateId, other.templateId)) {
+            return false;
+        }
+        return Objects.equals(this.flows, other.flows);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("templateId", templateId)
+                .add("flows", flows)
+                .toString();
+    }
+
+    /**
+     * Deserializer function for data flow record.
+     *
+     * @return deserializer function
+     */
+    public static DataDeserializer<DataFlowRecord> deserializer() {
+        return (data, offset, length, template) -> {
+            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
+
+            Predicate<FlowTemplateField> isValidTemplate = t
+                    -> Objects.nonNull(t) && Objects.nonNull(t.getFlowField()) && t.getLength() > 0;
+
+            BiPredicate<ByteBuffer, Integer> isValidBuffer = (b, l)
+                    -> b.hasRemaining() && b.remaining() >= l;
+
+            Function<FlowTemplateField, Flow> parser = (f) -> {
+
+                if (!isValidTemplate.test(f) && isValidBuffer.test(bb, f.getLength())) {
+                    throw new IllegalStateException("Invalid data set");
+                }
+                return new Flow.Builder()
+                        .field(f.getFlowField())
+                        .value(f.getFlowField().getParser().apply(bb, f.getLength()))
+                        .build();
+
+            };
+            DataTemplateRecord templateRecord = (DataTemplateRecord) template;
+            Builder builder = new Builder()
+                    .templateId(templateRecord.getTemplateId());
+            long count = templateRecord.getFields().stream()
+                    .filter(Objects::nonNull)
+                    .map(t -> builder.flow(parser.apply(t)))
+                    .count();
+
+            if (count != templateRecord.getFiledCount()) {
+                throw new IllegalStateException("Invalid parsing fields");
+            }
+            return builder.build();
+        };
+    }
+
+    /**
+     * Builder for data flow record.
+     */
+    private static class Builder {
+
+        private TemplateId templateId;
+
+        private List<Flow> flows = new LinkedList<>();
+
+        /**
+         * Setter for unique template ID.
+         *
+         * @param templateId template id.
+         * @return this class builder.
+         */
+        public Builder templateId(TemplateId templateId) {
+            this.templateId = templateId;
+            return this;
+        }
+
+        /**
+         * Setter for data flow.
+         *
+         * @param flow data flow.
+         * @return this class builder.
+         */
+        public Builder flow(Flow flow) {
+            this.flows.add(flow);
+            return this;
+        }
+
+        /**
+         * Setter for list of data flow.
+         *
+         * @param flows list of data flow.
+         * @return this class builder.
+         */
+        public Builder flows(List<Flow> flows) {
+            this.flows = flows;
+            return this;
+        }
+
+        /**
+         * Checks arguments for data flow record.
+         */
+        private void checkArguments() {
+            checkNotNull(templateId, "TemplateId cannot be null");
+        }
+
+        /**
+         * Builds data flow record.
+         *
+         * @return data flow record.
+         */
+        public DataFlowRecord build() {
+            checkArguments();
+            return new DataFlowRecord(this);
+        }
+
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java
new file mode 100644
index 0000000..7183915
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java
@@ -0,0 +1,280 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import com.google.common.base.MoreObjects;
+
+import org.onlab.packet.DeserializationException;
+import org.onlab.packet.Deserializer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * A Data FlowSet is one or more records, of the same type, that are
+ * grouped together in an Export Packet.  Each record is either a Flow
+ * Data Record or an Options Data Record previously defined by a
+ * Template Record or an Options Template Record.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public final class DataFlowSet extends FlowSet {
+
+    /*
+    0                   1                   2                   3
+    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |   FlowSet ID = Template ID    |          Length               |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |   Record 1 - Field Value 1    |   Record 1 - Field Value 2    |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |   Record 1 - Field Value 3    |             ...               |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |   Record 2 - Field Value 1    |   Record 2 - Field Value 2    |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |   Record 2 - Field Value 3    |             ...               |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |   Record 3 - Field Value 1    |             ...               |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |              ...              |            Padding            |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+     */
+
+    private int flowSetId;
+
+    private int length;
+
+    private byte[] data;
+
+    private DataFlowSet(Builder builder) {
+        this.flowSetId = builder.flowSetId;
+        this.length = builder.length;
+        this.data = builder.data;
+    }
+
+    private List<DataFlowRecord> dataFlow = new LinkedList<>();
+
+    /**
+     * Returns flowset id.
+     * Each Data FlowSet is associated with a FlowSet ID.  The FlowSet
+     * ID maps to a (previously generated) Template ID
+     *
+     * @return flowset id
+     */
+    @Override
+    public int getFlowSetId() {
+        return this.flowSetId;
+    }
+
+    /**
+     * Returns length of this FlowSet.
+     * Length is the sum of the lengths
+     * of the FlowSet ID, Length itself, all Flow Records within this
+     * FlowSet, and the padding bytes, if any.
+     *
+     * @return length of the flowset
+     */
+    @Override
+    public int getLength() {
+        return this.length;
+    }
+
+    /**
+     * Returns list of data flow records.
+     *
+     * @return list of data flow records
+     */
+    public List<DataFlowRecord> getDataFlow() {
+        return dataFlow;
+    }
+
+    /**
+     * Set data flow record.
+     *
+     * @param dataFlow data flow record
+     */
+    public void setDataFlow(DataFlowRecord dataFlow) {
+        this.dataFlow.add(dataFlow);
+    }
+
+    /**
+     * Returns type of flowset.
+     *
+     * @return type of flowset
+     */
+    @Override
+    public Type getType() {
+        return Type.DATA_FLOWSET;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 3;
+        hash = 79 * hash + this.flowSetId;
+        hash = 79 * hash + this.length;
+        hash = 79 * hash + Objects.hashCode(this.dataFlow);
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final DataFlowSet other = (DataFlowSet) obj;
+        if (this.flowSetId != other.flowSetId) {
+            return false;
+        }
+        if (this.length != other.length) {
+            return false;
+        }
+        return Objects.equals(this.dataFlow, other.dataFlow);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("flowSetId", flowSetId)
+                .add("length", length)
+                .add("data", data)
+                .toString();
+    }
+
+    /**
+     * Deserializer function for data flow set.
+     *
+     * @return deserializer function
+     */
+    public static Deserializer<DataFlowSet> deserializer() {
+        return (data, offset, length) -> {
+            Function<ByteBuffer, byte[]> readBytes = b -> {
+                if (b.remaining() == b.limit()) {
+                    return null;
+                }
+                byte[] bytes = new byte[b.remaining()];
+                b.get(bytes);
+                return bytes;
+            };
+            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
+            return new Builder()
+                    .flowSetId(bb.getShort())
+                    .length(bb.getShort())
+                    .data(readBytes.apply(bb))
+                    .build();
+
+        };
+    }
+
+    /**
+     * Data eserializer function for data flow record.
+     *
+     * @param template data template record
+     * @throws DeserializationException if unable to deserialize data
+     */
+    public void dataDeserializer(DataTemplateRecord template) throws DeserializationException {
+        dataFlow = new LinkedList<>();
+        ByteBuffer bb = ByteBuffer.wrap(data, 0, data.length);
+
+        while (bb.hasRemaining()) {
+            if (bb.remaining() < template.getValueLength()) {
+                break;
+            }
+            byte[] dataRecord = new byte[template.getValueLength()];
+            bb.get(dataRecord);
+            this.setDataFlow(DataFlowRecord.deserializer().deserialize(
+                    data, 0, template.getValueLength(), template));
+        }
+
+    }
+
+    /**
+     * Builder for data flow set.
+     */
+    private static class Builder {
+
+        private int flowSetId;
+
+        private int length;
+
+        private byte[] data;
+
+        /**
+         * Setter for flowset id.
+         *
+         * @param flowSetId flowset id.
+         * @return this class builder.
+         */
+        public Builder flowSetId(int flowSetId) {
+            this.flowSetId = flowSetId;
+            return this;
+        }
+
+        /**
+         * Setter for length of this FlowSet.
+         *
+         * @param length length of this FlowSet.
+         * @return this class builder.
+         */
+        public Builder length(int length) {
+            this.length = length;
+            return this;
+        }
+
+        /**
+         * Setter for flow data.
+         *
+         * @param data flow data.
+         * @return this class builder.
+         */
+        public Builder data(byte[] data) {
+            this.data = data;
+            return this;
+        }
+
+        /**
+         * Checks arguments for data flow set.
+         */
+        private void checkArguments() {
+            checkState(flowSetId != 0, "Invalid data flowset id.");
+            checkState(length != 0, "Invalid data flowset length.");
+            checkNotNull(data, "Data flow set cannot be null");
+
+        }
+
+        /**
+         * Builds data flowset.
+         *
+         * @return data flowset.
+         */
+        public DataFlowSet build() {
+            checkArguments();
+            return new DataFlowSet(this);
+        }
+
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java
new file mode 100644
index 0000000..dc98531
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.base.MoreObjects;
+
+import org.onlab.packet.Deserializer;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.FlowTemplateField;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Template Record defines the structure and interpretation.
+ * of fields in an Options Data Record, including defining the scope
+ * within which the Options Data Record is relevant.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public final class DataTemplateRecord extends TemplateRecord {
+
+    /*
+        0                   1                   2                   3
+    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |      Template ID 256          |         Field Count           |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |        Field Type 1           |         Field Length 1        |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |        Field Type 2           |         Field Length 2        |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+     */
+
+    private TemplateId templateId;
+
+    private int filedCount;
+
+    private List<FlowTemplateField> fields;
+
+    private DataTemplateRecord(Builder builder) {
+        this.templateId = builder.templateId;
+        this.filedCount = builder.filedCount;
+        this.fields = builder.fields;
+    }
+
+    /**
+     * Returns template record's template id.
+     * Template Records is given a unique Template ID.
+     * This uniqueness is local to the Observation
+     * Domain that generated the Template ID.  Template IDs 0-255 are
+     * reserved for Template FlowSets, Options FlowSets, and other
+     * reserved FlowSets yet to be created.
+     *
+     * @return list of flowsets
+     */
+    @Override
+    public TemplateId getTemplateId() {
+        return templateId;
+    }
+
+    /**
+     * Returns number of fields in this Template Record.
+     *
+     * @return field count
+     */
+    public int getFiledCount() {
+        return filedCount;
+    }
+
+    /**
+     * Returns list of flow template fields.
+     *
+     * @return list of flow template fields
+     */
+    public List<FlowTemplateField> getFields() {
+        return fields;
+    }
+
+    public int getValueLength() {
+        Optional.ofNullable(fields)
+                .orElseThrow(() -> new IllegalStateException("Invalid fields"));
+        return fields.stream()
+                .filter(Objects::nonNull)
+                .map(FlowTemplateField::getLength)
+                .collect(Collectors.summingInt(Integer::intValue));
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("templateId", templateId)
+                .add("filedCount", filedCount)
+                .add("fields", fields)
+                .toString();
+    }
+
+    /**
+     * Data deserializer function for data template record.
+     *
+     * @return data deserializer function
+     */
+    public static Deserializer<DataTemplateRecord> deserializer() {
+        return (data, offset, length) -> {
+            Predicate<ByteBuffer> isValidBuffer = b -> b.remaining() < FlowSet.FIELD_LENTH;
+            Function<ByteBuffer, FlowTemplateField> parse = (b)
+                    -> {
+                if (isValidBuffer.test(b)) {
+                    throw new IllegalStateException("Invalid buffersize");
+                }
+                return new FlowTemplateField.Builder()
+                        .flowField(b.getShort())
+                        .length(b.getShort())
+                        .build();
+            };
+
+            Builder builder = new Builder();
+            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
+            if (isValidBuffer.test(bb)) {
+                throw new IllegalStateException("Invalid buffersize");
+            }
+            builder.templateId(bb.getShort())
+                    .filedCount(bb.getShort());
+            IntStream.rangeClosed(1, builder.filedCount).forEach(i -> builder.templateField(parse.apply(bb)));
+            return builder.build();
+        };
+    }
+
+    /**
+     * Builder for data template record.
+     */
+    private static class Builder {
+
+        private TemplateId templateId;
+
+        private int filedCount;
+
+        private List<FlowTemplateField> fields = new LinkedList<>();
+
+        /**
+         * Setter for template record's template id.
+         *
+         * @param templateId template record's template id.
+         * @return this class builder.
+         */
+        public Builder templateId(int templateId) {
+            this.templateId = new TemplateId((templateId));
+            return this;
+        }
+
+        /**
+         * Setter for number of fields in this Template Record.
+         *
+         * @param filedCount number of fields in this Template Record.
+         * @return this class builder.
+         */
+        public Builder filedCount(int filedCount) {
+            this.filedCount = filedCount;
+            return this;
+        }
+
+        /**
+         * Setter for list of flow template fields.
+         *
+         * @param fields list of flow template fields.
+         * @return this class builder.
+         */
+        public Builder templateFields(List<FlowTemplateField> fields) {
+            this.fields = fields;
+            return this;
+        }
+
+        /**
+         * Setter for  flow template fields.
+         *
+         * @param field flow template fields.
+         * @return this class builder.
+         */
+        public Builder templateField(FlowTemplateField field) {
+            this.fields.add(field);
+            return this;
+        }
+
+        /**
+         * Checks arguments for data template record.
+         */
+        private void checkArguments() {
+            checkState(filedCount != 0, "Invalid template filed count.");
+            checkNotNull(templateId, "Template Id cannot be null.");
+
+        }
+
+        /**
+         * Builds data template record.
+         *
+         * @return data template record.
+         */
+        public DataTemplateRecord build() {
+            checkArguments();
+            return new DataTemplateRecord(this);
+        }
+
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java
new file mode 100644
index 0000000..1b292a7
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.onlab.packet.BasePacket;
+import org.onlab.packet.DeserializationException;
+import org.onlab.packet.Deserializer;
+
+/**
+ * FlowSet is a generic term for a collection of Flow Records that have.
+ * a similar structure.  In an Export Packet, one or more FlowSets
+ * follow the Packet Header.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public abstract class FlowSet extends BasePacket {
+
+    public static final int FLOW_SET_HEADER_LENTH = 4;
+
+    public static final int FIELD_LENTH = 4;
+
+    public static final int RECORD_HEADER_LENGTH = 4;
+
+    /**
+     * FlowSets type
+     * FlowSets: Template FlowSet, Options Template FlowSet, and Data FlowSet.
+     */
+    public enum Type {
+
+        TEMPLATE_FLOWSET(0, TemplateFlowSet.deserializer()),
+        OPTIONAL_TEMPLATE_FLOWSET(1, OptionalTemplateFlowSet.deserializer()),
+        DATA_FLOWSET(Integer.MAX_VALUE, DataFlowSet.deserializer());
+
+        private final int flowSetId;
+        private final Deserializer deserializer;
+
+        Type(int flowSetId, Deserializer deserializer) {
+            this.flowSetId = flowSetId;
+            this.deserializer = deserializer;
+        }
+
+        private static Map<Integer, Type> parser = new ConcurrentHashMap<>();
+
+        static {
+            Arrays.stream(Type.values()).forEach(type -> parser.put(type.flowSetId, type));
+        }
+
+        public static Type getType(int flowSetId) throws DeserializationException {
+            if (flowSetId < 0) {
+                throw new DeserializationException("Invalid trap type");
+            }
+            return Optional.of(flowSetId)
+                    .filter(id -> parser.containsKey(id))
+                    .map(id -> parser.get(id))
+                    .orElse(DATA_FLOWSET);
+        }
+
+        public Deserializer getDecoder() {
+            return this.deserializer;
+        }
+
+    }
+
+    public abstract Type getType();
+
+    public abstract int getFlowSetId();
+
+    public abstract int getLength();
+
+    @Override
+    public byte[] serialize() {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
new file mode 100644
index 0000000..df5ffce
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+/**
+ * Channel handler deals with the netfow exporter connection and dispatches messages
+ * from netfow exporter to the appropriate locations.
+ */
+public class NeflowChannelHandler extends SimpleChannelHandler {
+
+    private Channel channel;
+
+    /**
+     * Create a new netflow channelHandler instance.
+     */
+    NeflowChannelHandler() {
+    }
+
+    /**
+     * Netflow channel connect to netflow exporter.
+     *
+     * @param ctx   channel handler context
+     * @param event channel state event
+     * @throws Exception on error while connecting channel
+     */
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception {
+        channel = event.getChannel();
+    }
+
+    /**
+     * Netflow channel disconnect to netflow exporter.
+     *
+     * @param ctx   channel handler context
+     * @param event channel state event
+     * @throws Exception on error while disconnecting channel
+     */
+    @Override
+    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception {
+        channel = event.getChannel();
+    }
+
+    /**
+     * Netflow channel exception to netflow exporter.
+     *
+     * @param ctx   channel handler context
+     * @param event channel exception event
+     * @throws Exception on error while parsing exception
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) throws Exception {
+        //TODO exception handler
+    }
+
+    /**
+     * Netflow message receive from netflow exporter.
+     *
+     * @param ctx   channel handler context
+     * @param event channel message event
+     * @throws Exception on error while parsing exception
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
+        //TODO Netflow message store to netflow distributed store
+        NetFlowPacket packet = (NetFlowPacket) event.getMessage();
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
new file mode 100644
index 0000000..be6ae9a
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
@@ -0,0 +1,372 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import com.google.common.base.MoreObjects;
+import java.util.Objects;
+
+import org.onlab.packet.Deserializer;
+import org.onlab.packet.BasePacket;
+
+import static com.google.common.base.Preconditions.checkState;
+
+
+/**
+ * An Netflow Packet consists of a Packet Header followed by one or more
+ * FlowSets.  The FlowSets can be any of the possible three types:
+ * Template, Data, or Options Template.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public final class NetFlowPacket extends BasePacket {
+
+    /*
+     +--------+-------------------------------------------+
+     |        | +----------+ +---------+ +----------+     |
+     | Packet | | Template | | Data    | | Options  |     |
+     | Header | | FlowSet  | | FlowSet | | Template | ... |
+     |        | |          | |         | | FlowSet  |     |
+     |        | +----------+ +---------+ +----------+     |
+     +--------+-------------------------------------------+
+
+   The Packet Header format is specified as:
+
+    0                   1                   2                   3
+    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |       Version Number          |            Count              |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |                           sysUpTime                           |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |                           UNIX Secs                           |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |                       Sequence Number                         |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |                        Source ID                              |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+*/
+
+
+    private int version;
+    private int count;
+    private long sysUptime;
+    private long timestamp;
+    private int flowSequence;
+    private int sourceId;
+    private List<FlowSet> flowSets;
+
+    private NetFlowPacket(Builder builder) {
+        this.version = builder.version;
+        this.count = builder.count;
+        this.sysUptime = builder.sysUptime;
+        this.timestamp = builder.timestamp;
+        this.flowSequence = builder.flowSequence;
+        this.sourceId = builder.sourceId;
+        this.flowSets = builder.flowSets;
+    }
+
+    /**
+     * Returns Version of Flow Record format exported in this packet.
+     *
+     * @return version number
+     */
+    public int getVersion() {
+        return version;
+    }
+
+    /**
+     * Returns total number of records in the Export Packet.
+     *
+     * @return total number of recoreds
+     */
+    public int getCount() {
+        return count;
+    }
+
+    /**
+     * Returns time in milliseconds since this device was first booted.
+     *
+     * @return system up time
+     */
+    public long getSysUptime() {
+        return sysUptime;
+    }
+
+    /**
+     * Returns packet timestamp.
+     * Time in seconds since 0000 UTC 1970, at which the Export Packet
+     * leaves the Exporter
+     *
+     * @return packet time stamp
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Returns flow packet sequence number.
+     * Incremental sequence counter of all Export Packets sent from
+     * the current Observation Domain by the Exporter.  This value
+     * MUST be cumulative, and SHOULD be used by the Collector to
+     * identify whether any Export Packets have been missed.
+     *
+     * @return packet sequence number
+     */
+    public int getFlowSequence() {
+        return flowSequence;
+    }
+
+    /**
+     * Returns A 32-bit value that identifies the Exporter Observation Domain.
+     *
+     * @return exporter source id
+     */
+    public int getSourceId() {
+        return sourceId;
+    }
+
+    /**
+     * Returns list of flowsets.
+     *
+     * @return list of flowsets
+     */
+    public List<FlowSet> getFlowSets() {
+        return flowSets;
+    }
+
+    /**
+     * Deserializer function for netflow packets.
+     *
+     * @return deserializer function
+     */
+    public static Deserializer<NetFlowPacket> deserializer() {
+        return (data, offset, length) -> {
+            System.out.println(length);
+            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
+            Builder builder = new Builder();
+            builder.version(bb.getShort())
+                    .count(bb.getShort())
+                    .sysUptime(bb.getInt())
+                    .timestamp(bb.getInt())
+                    .flowSequence(bb.getInt())
+                    .sourceId(bb.getInt());
+            while (bb.hasRemaining()) {
+
+                int flowSetId = bb.getShort();
+                int flowSetLength = bb.getShort();
+                bb.position(bb.position() - FlowSet.FLOW_SET_HEADER_LENTH);
+                byte[] flowSet;
+                if (bb.remaining() < flowSetLength) {
+                    break;
+                }
+
+                flowSet = new byte[flowSetLength];
+                bb.get(flowSet);
+
+                builder.flowSet((FlowSet) FlowSet.Type.getType(flowSetId).getDecoder()
+                        .deserialize(flowSet, 0, flowSetLength));
+            }
+            return builder.build();
+        };
+    }
+
+    @Override
+    public byte[] serialize() {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 3;
+        hash = 59 * hash + this.version;
+        hash = 59 * hash + this.count;
+        hash = 59 * hash + (int) (this.timestamp ^ (this.timestamp >>> 32));
+        hash = 59 * hash + this.flowSequence;
+        hash = 59 * hash + this.sourceId;
+        hash = 59 * hash + Objects.hashCode(this.flowSets);
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final NetFlowPacket other = (NetFlowPacket) obj;
+        if (this.version != other.version) {
+            return false;
+        }
+        if (this.count != other.count) {
+            return false;
+        }
+        if (this.timestamp != other.timestamp) {
+            return false;
+        }
+        if (this.flowSequence != other.flowSequence) {
+            return false;
+        }
+        if (this.sourceId != other.sourceId) {
+            return false;
+        }
+        return Objects.equals(this.flowSets, other.flowSets);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("version", version)
+                .add("count", count)
+                .add("sysUptime", sysUptime)
+                .add("timestamp", timestamp)
+                .add("flowSequence", flowSequence)
+                .add("sourceId", sourceId)
+                .add("flowSets", flowSets)
+                .toString();
+    }
+
+    /**
+     * Builder for netflow packet.
+     */
+    private static class Builder {
+
+        private int version;
+        private int count;
+        private long sysUptime;
+        private long timestamp;
+        private int flowSequence;
+        private int sourceId;
+        private List<FlowSet> flowSets = new LinkedList<>();
+
+        /**
+         * Setter Version of Flow Record format exported in this packet.
+         *
+         * @param version number.
+         * @return this class builder.
+         */
+        public Builder version(int version) {
+            this.version = version;
+            return this;
+        }
+
+        /**
+         * Setter for total number of records in the Export Packet.
+         *
+         * @param count flow record count.
+         * @return this class builder.
+         */
+        public Builder count(int count) {
+            this.count = count;
+            return this;
+        }
+
+        /**
+         * Setter for time in milliseconds since this device was first booted.
+         *
+         * @param sysUptime system up time.
+         * @return this class builder.
+         */
+        public Builder sysUptime(long sysUptime) {
+            this.sysUptime = sysUptime;
+            return this;
+        }
+
+        /**
+         * Setter for packet timestamp.
+         *
+         * @param timestamp packet timestamp.
+         * @return this class builder.
+         */
+        public Builder timestamp(long timestamp) {
+            this.timestamp = timestamp;
+            return this;
+        }
+
+        /**
+         * Setter for flow sequence.
+         *
+         * @param flowSequence flow sequence.
+         * @return this class builder.
+         */
+        public Builder flowSequence(int flowSequence) {
+            this.flowSequence = flowSequence;
+            return this;
+        }
+
+        /**
+         * Setter for sourceId.
+         *
+         * @param sourceId exporter sourceid.
+         * @return this class builder.
+         */
+        public Builder sourceId(int sourceId) {
+            this.sourceId = sourceId;
+            return this;
+        }
+
+        /**
+         * Setter for list of flowsets.
+         *
+         * @param flowSets list of flowsets.
+         * @return this class builder.
+         */
+        public Builder flowSets(List<FlowSet> flowSets) {
+            this.flowSets = flowSets;
+            return this;
+        }
+
+        /**
+         * Setter for flowset.
+         *
+         * @param flowSet flowset.
+         * @return this class builder.
+         */
+        public Builder flowSet(FlowSet flowSet) {
+            this.flowSets.add(flowSet);
+            return this;
+        }
+
+        /**
+         * Checks arguments for netflow packet.
+         */
+        private void checkArguments() {
+            checkState(version != 0, "Invalid Version.");
+            checkState(count != 0, "Invalid record count.");
+            checkState(sysUptime != 0, "Invalid system up time.");
+            checkState(timestamp != 0, "Invalid flow timestamp.");
+        }
+
+        /**
+         * Builds Netflow packet.
+         *
+         * @return Netflowpacket.
+         */
+        public NetFlowPacket build() {
+            checkArguments();
+            return new NetFlowPacket(this);
+        }
+
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
new file mode 100644
index 0000000..8dad97d
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decode an netflow message from a Channel, for use in a netty pipeline.
+ */
+public class NetflowMessageDecoder extends FrameDecoder {
+
+    private static final Logger log = LoggerFactory.getLogger(NetflowMessageDecoder.class);
+
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
+
+        try {
+            if (buffer.readableBytes() > 0) {
+                byte[] bytes = new byte[buffer.readableBytes()];
+                buffer.readBytes(bytes);
+                ctx.setAttachment(null);
+                return NetFlowPacket.deserializer().deserialize(bytes, 0, bytes.length);
+            }
+        } catch (Exception e) {
+            log.error("Netflow message decode error");
+            buffer.resetReaderIndex();
+            buffer.discardReadBytes();
+
+        }
+        return null;
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
new file mode 100644
index 0000000..178f6e6
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+
+
+/**
+ * Creates a ChannelPipeline for a server-side netflow message channel.
+ */
+public class NetflowPipelineFactory implements ChannelPipelineFactory {
+
+    /**
+     * Constructor to initialize the values.
+     */
+    public NetflowPipelineFactory() {
+        super();
+    }
+
+    /**
+     * Get server-side pipe line channel.
+     *
+     * @return ChannelPipeline server-side pipe line channel
+     * @throws Exception on while getting pipe line
+     */
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+        NeflowChannelHandler handler = new NeflowChannelHandler();
+
+        ChannelPipeline pipeline = Channels.pipeline();
+        pipeline.addLast("netflowmessagedecoder", new NetflowMessageDecoder());
+        pipeline.addLast("ActiveHandler", handler);
+        return pipeline;
+    }
+
+}
+
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java
new file mode 100644
index 0000000..9a00bb2
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import java.util.List;
+
+import org.onlab.packet.Deserializer;
+
+
+/**
+ * The Options Template Record (and its corresponding Options Data
+ * Record) is used to supply information about the NetFlow process
+ * configuration or NetFlow process specific data, rather than supplying
+ * information about IP Flows.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public class OptionalTemplateFlowSet extends FlowSet {
+
+    /*
+        0                   1                   2                   3
+    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |       FlowSet ID = 1          |          Length               |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |         Template ID           |      Option Scope Length      |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |        Option Length          |       Scope 1 Field Type      |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |     Scope 1 Field Length      |               ...             |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |     Scope N Field Length      |      Option 1 Field Type      |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |     Option 1 Field Length     |             ...               |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |     Option M Field Length     |           Padding             |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+     */
+
+    private int flowSetId;
+
+    private int length;
+
+    private List<DataTemplateRecord> records;
+
+    /**
+     * Returns flowset type.
+     *
+     * @return flowset type
+     */
+    @Override
+    public Type getType() {
+        return Type.OPTIONAL_TEMPLATE_FLOWSET;
+    }
+
+    /**
+     * Returns flowset id.
+     * FlowSet ID value of 1 is reserved for the Options Template.
+     *
+     * @return flow set ID
+     */
+    public int getFlowSetId() {
+        return flowSetId;
+    }
+
+    /**
+     * Returns total length of this FlowSet.
+     * Each Options Template FlowSet
+     * MAY contain multiple Options Template Records.  Thus, the
+     * Length value MUST be used to determine the position of the next
+     * FlowSet record, which could be either a Template FlowSet or
+     * Data FlowSet.
+     *
+     * @return flow set ID
+     */
+    public int getLength() {
+        return length;
+    }
+
+    /**
+     * Returns list of optional data template records.
+     *
+     * @return list of optional data template records
+     */
+    public List<DataTemplateRecord> getRecords() {
+        return records;
+    }
+
+    /**
+     * Deserializer function for data option template flowset.
+     *
+     * @return data deserializer function
+     */
+    public static Deserializer<OptionalTemplateFlowSet> deserializer() {
+        return (data, offset, length) -> {
+            //TODO parse optional template
+            return new OptionalTemplateFlowSet();
+        };
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java
new file mode 100644
index 0000000..b6bc51e
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java
@@ -0,0 +1,279 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+import org.onlab.packet.DeserializationException;
+import org.onlab.packet.Deserializer;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * One of the essential elements in the NetFlow format is the Template
+ * FlowSet.  Templates greatly enhance the flexibility of the Flow
+ * Record format because they allow the NetFlow Collector to process
+ * Flow Records without necessarily knowing the interpretation of all
+ * the data in the Flow Record.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public final class TemplateFlowSet extends FlowSet {
+
+    /*
+        0                   1                   2                   3
+    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |       FlowSet ID = 0          |          Length               |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |      Template ID 256          |         Field Count           |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |        Field Type 1           |         Field Length 1        |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |        Field Type 2           |         Field Length 2        |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |             ...               |              ...              |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |        Field Type N           |         Field Length N        |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |      Template ID 257          |         Field Count           |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |        Field Type 1           |         Field Length 1        |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |        Field Type 2           |         Field Length 2        |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |             ...               |              ...              |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |        Field Type M           |         Field Length M        |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |             ...               |              ...              |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |        Template ID K          |         Field Count           |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   |             ...               |              ...              |
+   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+     */
+
+    private int flowSetId;
+
+    private int length;
+
+    private List<DataTemplateRecord> records;
+
+    private TemplateFlowSet(Builder builder) {
+        this.records = builder.records;
+        this.length = builder.length;
+        this.flowSetId = builder.flowSetId;
+    }
+
+    /**
+     * Return template flow set id.
+     * FlowSet ID value of 0 is reserved for the Template FlowSet.
+     *
+     * @return flow set ID
+     */
+    @Override
+    public int getFlowSetId() {
+        return this.flowSetId;
+    }
+
+    /**
+     * Returns total length of this flowSet.
+     *
+     * @return length of flowset
+     */
+    @Override
+    public int getLength() {
+        return this.length;
+    }
+
+    /**
+     * Returns list of flow records.
+     *
+     * @return list of flow records
+     */
+    public List<DataTemplateRecord> getRecords() {
+        return records;
+    }
+
+    /**
+     * Returns type of the flowset.
+     *
+     * @return type of the flowset
+     */
+    @Override
+    public Type getType() {
+        return Type.TEMPLATE_FLOWSET;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 53 * hash + this.flowSetId;
+        hash = 53 * hash + this.length;
+        hash = 53 * hash + Objects.hashCode(this.records);
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final TemplateFlowSet other = (TemplateFlowSet) obj;
+        if (this.flowSetId != other.flowSetId) {
+            return false;
+        }
+        if (this.length != other.length) {
+            return false;
+        }
+        return Objects.equals(this.records, other.records);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("flowSetId", flowSetId)
+                .add("length", length)
+                .add("records", records)
+                .toString();
+    }
+
+    /**
+     * Data deserializer function for template flow set.
+     *
+     * @return data deserializer function
+     */
+    public static Deserializer<TemplateFlowSet> deserializer() {
+        return (data, offset, length) -> {
+
+            Predicate<ByteBuffer> isValidBuffer = b -> b.remaining() < FlowSet.FIELD_LENTH;
+
+            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
+            if (isValidBuffer.test(bb)) {
+                throw new DeserializationException("Invalid buffer size");
+            }
+            Builder builder = new Builder()
+                    .flowSetId(bb.getShort())
+                    .length(bb.getShort());
+            while (bb.hasRemaining()) {
+                if (isValidBuffer.test(bb)) {
+                    break;
+                }
+                int templateId = bb.getShort();
+                int fieldCount = bb.getShort();
+                int bufferLength = (fieldCount * FlowSet.FIELD_LENTH) + FlowSet.FIELD_LENTH;
+                byte[] record = new byte[bufferLength];
+                bb.position(bb.position() - FlowSet.FIELD_LENTH);
+                if (bb.remaining() < bufferLength) {
+                    break;
+                }
+                bb.get(record);
+                builder.templateRecord(DataTemplateRecord.deserializer().deserialize(record, 0, bufferLength));
+
+            }
+            return builder.build();
+        };
+    }
+
+    /**
+     * Builder for template flow set.
+     */
+    private static class Builder {
+
+        private int flowSetId;
+
+        private int length;
+
+        private List<DataTemplateRecord> records = new LinkedList<>();
+
+        /**
+         * Setter for template flow set id.
+         *
+         * @param flowSetId template flow set id.
+         * @return this class builder.
+         */
+        public Builder flowSetId(int flowSetId) {
+            this.flowSetId = flowSetId;
+            return this;
+        }
+
+        /**
+         * Setter for total length of this flowSet.
+         *
+         * @param length total length of this flowSet.
+         * @return this class builder.
+         */
+        public Builder length(int length) {
+            this.length = length;
+            return this;
+        }
+
+        /**
+         * Setter for list of flow records.
+         *
+         * @param records list of flow records.
+         * @return this class builder.
+         */
+        public Builder templateRecords(List<DataTemplateRecord> records) {
+            this.records = records;
+            return this;
+        }
+
+        /**
+         * Setter for flow records.
+         *
+         * @param record flow records.
+         * @return this class builder.
+         */
+        public Builder templateRecord(DataTemplateRecord record) {
+            this.records.add(record);
+            return this;
+        }
+
+        /**
+         * Checks arguments for template flow set.
+         */
+        private void checkArguments() {
+            checkState(flowSetId == 0, "Invalid flow set id.");
+            checkState(length == 0, "Invalid flow set length.");
+
+        }
+
+        /**
+         * Builds template flow set.
+         *
+         * @return template flow set.
+         */
+        public TemplateFlowSet build() {
+            checkArguments();
+            return new TemplateFlowSet(this);
+        }
+
+    }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java
new file mode 100644
index 0000000..db66292
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import org.onlab.packet.BasePacket;
+
+import org.onosproject.netflow.FlowRecord;
+/**
+ * A Template Record defines the structure and interpretation of fields.
+ * in a Flow Data Record.
+ */
+public abstract class TemplateRecord extends BasePacket implements FlowRecord {
+
+    @Override
+    public byte[] serialize() {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/package-info.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/package-info.java
new file mode 100644
index 0000000..bd4bf71
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation classes for sample application that assigns and manages netflow collector.
+ */
+package org.onosproject.netflow.impl;
\ No newline at end of file