Restructuring netflow protocol
Changed netflow structure from
//apps/netflow to //apps/ipflow-monitor/netflow
Change-Id: I11722b2c68105137306755e890da0dd8ba7245df
diff --git a/apps/ipflow-monitor/netflow/BUILD b/apps/ipflow-monitor/netflow/BUILD
new file mode 100644
index 0000000..51f149e
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/BUILD
@@ -0,0 +1,13 @@
+BUNDLES = [
+ "//apps/ipflow-monitor/netflow/api:onos-apps-ipflow-monitor-netflow-api",
+ "//apps/ipflow-monitor/netflow/app:onos-apps-ipflow-monitor-netflow-app",
+]
+# Declares the netflow ONOS application; the bundles listed above are packaged with it.
+onos_app(
+ category = "Traffic Monitoring",
+ description = "Provides handling of netflow traffic.",
+ included_bundles = BUNDLES,
+ origin = "ONOS Community",
+ title = "Netflow traffic monitoring",
+ url = "http://onosproject.org",
+)
diff --git a/apps/ipflow-monitor/netflow/api/BUILD b/apps/ipflow-monitor/netflow/api/BUILD
new file mode 100644
index 0000000..f72e3bc
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/BUILD
@@ -0,0 +1,3 @@
+osgi_jar_with_tests(
+ deps = CORE_DEPS,
+)
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataDeserializer.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataDeserializer.java
new file mode 100644
index 0000000..5821361
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataDeserializer.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netflow;
+
+import org.onlab.packet.IPacket;
+import org.onlab.packet.DeserializationException;
+/** Deserializer for netflow data packets that are decoded with the help of a template record. */
+@FunctionalInterface
+public interface DataDeserializer<U extends IPacket> {
+ /**
+ * Deserialize a netflow data flowset packet object from a byte array.
+ *
+ * @param data input array to take packet bytes from
+ * @param offset index where this packet header begins in the byte array
+ * @param length length of the packet header
+ * @param template data template
+ * @return a deserialized packet object
+ * @throws DeserializationException if the packet cannot be deserialized
+ * from the input
+ */
+ U deserialize(byte[] data, int offset, int length, FlowRecord template) throws DeserializationException;
+}
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataFlowRecord.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataFlowRecord.java
new file mode 100644
index 0000000..7f29f43
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataFlowRecord.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import com.google.common.base.MoreObjects;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+
+/**
+ * A Flow Data Record is a data record that contains values of the Flow
+ * parameters corresponding to a Template Record.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public class DataFlowRecord extends DataRecord {
+
+ /*
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Record 1 - Field Value 1 | Record 1 - Field Value 2 |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Record 1 - Field Value 3 | ... |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Record 2 - Field Value 1 | Record 2 - Field Value 2 |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Record 2 - Field Value 3 | ... |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ */
+
+ private TemplateId templateId;
+
+ private List<Flow> flows;
+
+ public DataFlowRecord(Builder builder) {
+ this.templateId = builder.templateId;
+ this.flows = builder.flows;
+ }
+
+ /**
+ * Returns unique template ID.
+ * Each Template Record is given a unique Template ID.
+ * This uniqueness is local to the Observation
+ * Domain that generated the Template ID. Template IDs 0-255 are
+ * reserved for Template FlowSets, Options FlowSets, and other
+ * reserved FlowSets yet to be created.
+ *
+ * @return template id
+ */
+ @Override
+ public TemplateId getTemplateId() {
+ return this.templateId;
+ }
+
+ /**
+ * Returns the flows (field and value pairs) held by this record.
+ *
+ * @return list of flows
+ */
+ public List<Flow> getFlows() {
+ return flows;
+ }
+
+    /** Combines the hashes of the template id and of the flow list. */
+    @Override
+    public int hashCode() {
+        int templateHash = Objects.hashCode(templateId);
+        int flowsHash = Objects.hashCode(flows);
+        return 29 * (29 * 7 + templateHash) + flowsHash;
+    }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final DataFlowRecord other = (DataFlowRecord) obj;
+ if (!Objects.equals(this.templateId, other.templateId)) {
+ return false;
+ }
+ return Objects.equals(this.flows, other.flows);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("templateId", templateId)
+ .add("\n\nflows", flows)
+ .toString();
+ }
+
+    /**
+     * Deserializer function for data flow record.
+     * Fields are parsed from the input in the order given by the template.
+     *
+     * @return deserializer function
+     */
+    public static DataDeserializer<DataFlowRecord> deserializer() {
+        return (data, offset, length, template) -> {
+            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
+            // A usable template field is non-null, has a flow field and a positive length.
+            Predicate<FlowTemplateField> isValidTemplate = t
+                    -> Objects.nonNull(t) && Objects.nonNull(t.getFlowField()) && t.getLength() > 0;
+            // The buffer must still hold at least as many bytes as the field declares.
+            BiPredicate<ByteBuffer, Integer> isValidBuffer = (b, l)
+                    -> b.hasRemaining() && b.remaining() >= l;
+            Function<FlowTemplateField, Flow> parser = (f) -> {
+                // Reject the field unless BOTH checks pass; the template check runs first so a
+                // field without a flow field is never dereferenced below.
+                if (!(isValidTemplate.test(f) && isValidBuffer.test(bb, f.getLength()))) {
+                    throw new IllegalStateException("Invalid data set");
+                }
+                return new Flow.Builder()
+                        .field(f.getFlowField())
+                        .value(f.getFlowField().getParser().apply(bb, f.getLength()))
+                        .build();
+            };
+            DataTemplateRecord templateRecord = (DataTemplateRecord) template;
+            Builder builder = new Builder()
+                    .templateId(templateRecord.getTemplateId());
+            // forEach always runs the parser; a count() terminal is allowed to skip earlier stages.
+            templateRecord.getFields().stream()
+                    .filter(Objects::nonNull)
+                    .map(parser)
+                    .forEach(builder::flow);
+            if (builder.flows.size() != templateRecord.getFiledCount()) {
+                throw new IllegalStateException("Invalid parsing fields");
+            }
+            return builder.build();
+        };
+    }
+
+ /**
+ * Builder for data flow record.
+ */
+ private static class Builder {
+
+ private TemplateId templateId;
+
+ private List<Flow> flows = new LinkedList<>();
+
+ /**
+ * Setter for unique template ID.
+ *
+ * @param templateId template id.
+ * @return this class builder.
+ */
+ public Builder templateId(TemplateId templateId) {
+ this.templateId = templateId;
+ return this;
+ }
+
+ /**
+ * Setter for data flow.
+ *
+ * @param flow data flow.
+ * @return this class builder.
+ */
+ public Builder flow(Flow flow) {
+ this.flows.add(flow);
+ return this;
+ }
+
+ /**
+ * Setter for list of data flow.
+ *
+ * @param flows list of data flow.
+ * @return this class builder.
+ */
+ public Builder flows(List<Flow> flows) {
+ this.flows = flows;
+ return this;
+ }
+
+ /**
+ * Checks arguments for data flow record.
+ */
+ private void checkArguments() {
+ checkNotNull(templateId, "TemplateId cannot be null");
+ }
+
+ /**
+ * Builds data flow record.
+ *
+ * @return data flow record.
+ */
+ public DataFlowRecord build() {
+ checkArguments();
+ return new DataFlowRecord(this);
+ }
+
+ }
+
+}
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataFlowSet.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataFlowSet.java
new file mode 100644
index 0000000..b569387
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataFlowSet.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import com.google.common.base.MoreObjects;
+
+import org.onlab.packet.DeserializationException;
+import org.onlab.packet.Deserializer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * A Data FlowSet is one or more records, of the same type, that are
+ * grouped together in an Export Packet. Each record is either a Flow
+ * Data Record or an Options Data Record previously defined by a
+ * Template Record or an Options Template Record.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public final class DataFlowSet extends FlowSet {
+
+ /*
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | FlowSet ID = Template ID | Length |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Record 1 - Field Value 1 | Record 1 - Field Value 2 |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Record 1 - Field Value 3 | ... |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Record 2 - Field Value 1 | Record 2 - Field Value 2 |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Record 2 - Field Value 3 | ... |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Record 3 - Field Value 1 | ... |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | ... | Padding |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ */
+
+ private int flowSetId;
+
+ private int length;
+
+ private byte[] data;
+
+ private DataFlowSet(Builder builder) {
+ this.flowSetId = builder.flowSetId;
+ this.length = builder.length;
+ this.data = builder.data;
+ }
+
+ private List<DataFlowRecord> dataFlow = new LinkedList<>();
+
+ /**
+ * Returns flowset id.
+ * Each Data FlowSet is associated with a FlowSet ID. The FlowSet
+ * ID maps to a (previously generated) Template ID
+ *
+ * @return flowset id
+ */
+ @Override
+ public int getFlowSetId() {
+ return this.flowSetId;
+ }
+
+ /**
+ * Returns length of this FlowSet.
+ * Length is the sum of the lengths
+ * of the FlowSet ID, Length itself, all Flow Records within this
+ * FlowSet, and the padding bytes, if any.
+ *
+ * @return length of the flowset
+ */
+ @Override
+ public int getLength() {
+ return this.length;
+ }
+
+ /**
+ * Returns list of data flow records.
+ *
+ * @return list of data flow records
+ */
+ public List<DataFlowRecord> getDataFlow() {
+ return dataFlow;
+ }
+
+ /**
+ * Appends a data flow record to this flowset's list of records.
+ *
+ * @param dataFlow data flow record
+ */
+ public void setDataFlow(DataFlowRecord dataFlow) {
+ this.dataFlow.add(dataFlow);
+ }
+
+ /**
+ * Returns type of flowset.
+ *
+ * @return type of flowset
+ */
+ @Override
+ public Type getType() {
+ return Type.DATA_FLOWSET;
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 3;
+ hash = 79 * hash + this.flowSetId;
+ hash = 79 * hash + this.length;
+ hash = 79 * hash + Objects.hashCode(this.dataFlow);
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final DataFlowSet other = (DataFlowSet) obj;
+ if (this.flowSetId != other.flowSetId) {
+ return false;
+ }
+ if (this.length != other.length) {
+ return false;
+ }
+ return Objects.equals(this.dataFlow, other.dataFlow);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("flowSetId", flowSetId)
+ .add("length", length)
+ .add("data", data)
+ .add("dataFlow", dataFlow)
+ .toString();
+ }
+
+    /**
+     * Deserializer function for data flow set.
+     *
+     * @return deserializer function
+     */
+    public static Deserializer<DataFlowSet> deserializer() {
+        return (data, offset, length) -> {
+            // Copies whatever is left in the buffer; records are decoded later against a template.
+            Function<ByteBuffer, byte[]> readBytes = b -> {
+                byte[] bytes = new byte[b.remaining()];
+                b.get(bytes);
+                return bytes;
+            };
+            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
+            // Flowset id and length are two 16-bit fields; fail cleanly on truncated input.
+            if (bb.remaining() < 2 * Short.BYTES) {
+                throw new DeserializationException("Data flowset header is truncated");
+            }
+            return new Builder()
+                    .flowSetId(bb.getShort())
+                    .length(Short.toUnsignedInt(bb.getShort()))
+                    .data(readBytes.apply(bb)).build();
+        };
+    }
+
+    /**
+     * Deserializes the raw flowset data into data flow records using the given template.
+     *
+     * @param template data template record
+     * @throws DeserializationException if the record length is not positive or data cannot be deserialized
+     */
+    public void dataDeserializer(DataTemplateRecord template) throws DeserializationException {
+        dataFlow = new LinkedList<>();
+        ByteBuffer bb = ByteBuffer.wrap(data, 0, data.length);
+        int recordLength = template.getValueLength();
+        if (recordLength <= 0) {
+            // A zero-length record never consumes bytes, so the loop below would never terminate.
+            throw new DeserializationException("Invalid data record length: " + recordLength);
+        }
+        while (bb.remaining() >= recordLength) {
+            byte[] dataRecord = new byte[recordLength];
+            bb.get(dataRecord);
+            this.setDataFlow(DataFlowRecord.deserializer().deserialize(
+                    dataRecord, 0, recordLength, template));
+        }
+    }
+
+ /**
+ * Builder for data flow set.
+ */
+ private static class Builder {
+
+ private int flowSetId;
+
+ private int length;
+
+ private byte[] data;
+
+ /**
+ * Setter for flowset id.
+ *
+ * @param flowSetId flowset id.
+ * @return this class builder.
+ */
+ public Builder flowSetId(int flowSetId) {
+ this.flowSetId = flowSetId;
+ return this;
+ }
+
+ /**
+ * Setter for length of this FlowSet.
+ *
+ * @param length length of this FlowSet.
+ * @return this class builder.
+ */
+ public Builder length(int length) {
+ this.length = length;
+ return this;
+ }
+
+ /**
+ * Setter for flow data.
+ *
+ * @param data flow data.
+ * @return this class builder.
+ */
+ public Builder data(byte[] data) {
+ this.data = data;
+ return this;
+ }
+
+ /**
+ * Checks arguments for data flow set.
+ */
+ private void checkArguments() {
+ checkState(flowSetId != 0, "Invalid data flowset id.");
+ checkState(length != 0, "Invalid data flowset length.");
+ checkNotNull(data, "Data flow set cannot be null");
+
+ }
+
+ /**
+ * Builds data flowset.
+ *
+ * @return data flowset.
+ */
+ public DataFlowSet build() {
+ checkArguments();
+ return new DataFlowSet(this);
+ }
+
+ }
+
+}
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataRecord.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataRecord.java
new file mode 100644
index 0000000..c584044
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataRecord.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import org.onlab.packet.BasePacket;
+
+/**
+ * A Flow Data Record is a data record that contains values of the Flow
+ * parameters corresponding to a Template Record.
+ */
+public abstract class DataRecord extends BasePacket implements FlowRecord {
+ /** Serialization of data records is not implemented; this method always throws. */
+ @Override
+ public byte[] serialize() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+}
+
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataTemplateRecord.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataTemplateRecord.java
new file mode 100644
index 0000000..4230a10
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/DataTemplateRecord.java
@@ -0,0 +1,256 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.base.MoreObjects;
+
+import org.onlab.packet.Deserializer;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Template Record defines the structure and interpretation
+ * of fields in a Flow Data Record, that is the type and length
+ * of every field carried by a data record that refers to it.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public final class DataTemplateRecord extends TemplateRecord {
+
+ /*
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Template ID 256 | Field Count |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Field Type 1 | Field Length 1 |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Field Type 2 | Field Length 2 |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ */
+
+ private TemplateId templateId;
+
+ private int filedCount;
+
+ private List<FlowTemplateField> fields;
+
+ private DataTemplateRecord(Builder builder) {
+ this.templateId = builder.templateId;
+ this.filedCount = builder.filedCount;
+ this.fields = builder.fields;
+ }
+
+ /**
+ * Returns template record's template id.
+ * Each Template Record is given a unique Template ID.
+ * This uniqueness is local to the Observation
+ * Domain that generated the Template ID. Template IDs 0-255 are
+ * reserved for Template FlowSets, Options FlowSets, and other
+ * reserved FlowSets yet to be created.
+ *
+ * @return template id
+ */
+ @Override
+ public TemplateId getTemplateId() {
+ return templateId;
+ }
+
+ /**
+ * Returns number of fields in this Template Record.
+ *
+ * @return field count
+ */
+ public int getFiledCount() {
+ return filedCount;
+ }
+
+ /**
+ * Returns list of flow template fields.
+ *
+ * @return list of flow template fields
+ */
+ public List<FlowTemplateField> getFields() {
+ return fields;
+ }
+
+    /** Returns the summed length of all non-null template fields. */
+    public int getValueLength() {
+        List<FlowTemplateField> present = Optional.ofNullable(fields)
+                .orElseThrow(() -> new IllegalStateException("Invalid fields"));
+        return present.stream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.summingInt(FlowTemplateField::getLength));
+    }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("templateId", templateId)
+ .add("filedCount", filedCount)
+ .add("fields", fields)
+ .toString();
+ }
+
+
+ @Override
+ public int hashCode() {
+ int hash = 3;
+ hash = 79 * hash + Objects.hashCode(this.templateId);
+ hash = 79 * hash + this.filedCount;
+ hash = 79 * hash + Objects.hashCode(this.fields);
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final DataTemplateRecord other = (DataTemplateRecord) obj;
+ if (this.filedCount != other.filedCount) {
+ return false;
+ }
+ if (!Objects.equals(this.templateId, other.templateId)) {
+ return false;
+ }
+ return Objects.equals(this.fields, other.fields);
+ }
+
+    /**
+     * Data deserializer function for data template record.
+     *
+     * @return data deserializer function
+     */
+    public static Deserializer<DataTemplateRecord> deserializer() {
+        return (data, offset, length) -> {
+            // True when fewer than FlowSet.FIELD_LENTH bytes remain in the buffer.
+            Predicate<ByteBuffer> isTruncated = b -> b.remaining() < FlowSet.FIELD_LENTH;
+            Function<ByteBuffer, FlowTemplateField> parse = b -> {
+                if (isTruncated.test(b)) {
+                    throw new IllegalStateException("Invalid buffersize");
+                }
+                return new FlowTemplateField.Builder()
+                        .flowField(b.getShort())
+                        .length(b.getShort())
+                        .build();
+            };
+            Builder builder = new Builder();
+            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
+            if (isTruncated.test(bb)) {
+                throw new IllegalStateException("Invalid buffersize");
+            }
+            // Unsigned 16-bit count: a sign-extended negative value would skip the loop and leave no fields.
+            builder.templateId(bb.getShort())
+                    .filedCount(Short.toUnsignedInt(bb.getShort()));
+            IntStream.rangeClosed(1, builder.filedCount).forEach(i -> builder.templateField(parse.apply(bb)));
+            return builder.build();
+        };
+    }
+
+ /**
+ * Builder for data template record.
+ */
+ private static class Builder {
+
+ private TemplateId templateId;
+
+ private int filedCount;
+
+ private List<FlowTemplateField> fields = new LinkedList<>();
+
+ /**
+ * Setter for template record's template id.
+ *
+ * @param templateId template record's template id.
+ * @return this class builder.
+ */
+ public Builder templateId(int templateId) {
+ this.templateId = new TemplateId((templateId));
+ return this;
+ }
+
+ /**
+ * Setter for number of fields in this Template Record.
+ *
+ * @param filedCount number of fields in this Template Record.
+ * @return this class builder.
+ */
+ public Builder filedCount(int filedCount) {
+ this.filedCount = filedCount;
+ return this;
+ }
+
+ /**
+ * Setter for list of flow template fields.
+ *
+ * @param fields list of flow template fields.
+ * @return this class builder.
+ */
+ public Builder templateFields(List<FlowTemplateField> fields) {
+ this.fields = fields;
+ return this;
+ }
+
+ /**
+ * Setter for flow template fields.
+ *
+ * @param field flow template fields.
+ * @return this class builder.
+ */
+ public Builder templateField(FlowTemplateField field) {
+ this.fields.add(field);
+ return this;
+ }
+
+ /**
+ * Checks arguments for data template record.
+ */
+ private void checkArguments() {
+ checkState(filedCount != 0, "Invalid template filed count.");
+ checkNotNull(templateId, "Template Id cannot be null.");
+
+ }
+
+ /**
+ * Builds data template record.
+ *
+ * @return data template record.
+ */
+ public DataTemplateRecord build() {
+ checkArguments();
+ return new DataTemplateRecord(this);
+ }
+
+ }
+
+}
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/Flow.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/Flow.java
new file mode 100644
index 0000000..2c09e71
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/Flow.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Flow is a collection of Flow Data Record(s),
+ * each containing a set of field values. The Type and
+ * Length of the fields have been previously defined in the
+ * Template Record referenced by the FlowSet ID or Template ID.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public final class Flow {
+
+ /*
+ 0 1 2 3
+ 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ | Record 1 - Field Value 1 | Record 1 - Field Value 2 |
+ +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+ */
+
+ private FlowField field;
+
+ private Object value;
+
+ private Flow(Builder builder) {
+ this.field = builder.field;
+ this.value = builder.value;
+ }
+
+ /**
+ * The getter for flow fields.
+ *
+ * @return flow field
+ */
+ public FlowField getField() {
+ return field;
+ }
+
+ /**
+ * Returns flow value.
+ *
+ * @return flow value
+ */
+ public Object getValue() {
+ return value;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#hashCode()
+ */
+ @Override
+ public int hashCode() {
+ int hash = 5;
+ hash = 23 * hash + Objects.hashCode(this.field);
+ hash = 23 * hash + Objects.hashCode(this.value);
+ return hash;
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see java.lang.Object#equals(java.lang.Object)
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final Flow other = (Flow) obj;
+ if (this.field != other.field) {
+ return false;
+ }
+ return Objects.equals(this.value, other.value);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("field", field)
+ .add("value", value)
+ .toString();
+ }
+
+ /**
+ * Flow data value builder.
+ */
+ public static class Builder {
+
+ private FlowField field;
+
+ private Object value;
+
+ /**
+ * Setter for flow fields.
+ *
+ * @param field flow field.
+ * @return this class builder.
+ */
+ public Builder field(FlowField field) {
+ this.field = field;
+ return this;
+ }
+
+ /**
+ * Setter for flow data value.
+ *
+ * @param value flow data value.
+ * @return this class builder.
+ */
+ public Builder value(Object value) {
+ this.value = value;
+ return this;
+ }
+
+ /**
+ * Checks arguments for flow data value.
+ */
+ private void checkArguments() {
+ checkNotNull(field, "flow field cannot be null");
+ checkNotNull(value, "value cannot be null");
+ }
+
+ /**
+ * Builds data flow.
+ *
+ * @return data flow object.
+ */
+ public Flow build() {
+ checkArguments();
+ return new Flow(this);
+ }
+
+ }
+
+}
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/FlowField.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/FlowField.java
new file mode 100644
index 0000000..fcaa120
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/FlowField.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Set;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+
+import static org.onosproject.netflow.NetflowUtils.VAR_INT_LONG;
+import static org.onosproject.netflow.NetflowUtils.VAR_BYTE;
+import static org.onosproject.netflow.NetflowUtils.NULL;
+import static org.onosproject.netflow.NetflowUtils.VAR_SHORT;
+import static org.onosproject.netflow.NetflowUtils.VAR_IPV4_ADDRESS;
+import static org.onosproject.netflow.NetflowUtils.VAR_IPV6_ADDRESS;
+import static org.onosproject.netflow.NetflowUtils.VAR_SHORT_INT;
+import static org.onosproject.netflow.NetflowUtils.VAR_INT;
+import static org.onosproject.netflow.NetflowUtils.VAR_MAC;
+import static org.onosproject.netflow.NetflowUtils.VAR_UNSIGNED_BYTE;
+import static org.onosproject.netflow.NetflowUtils.VAR_UNSIGNED_INT;
+ import static org.onosproject.netflow.NetflowUtils.VAR_UNSIGNED_SHORT;
+
+/**
+ * The flow fields are a selection of Packet Header
+ * fields, lookup results (for example, the autonomous system numbers or
+ * the subnet masks), and properties of the packet such as length.
+ */
+ public enum FlowField {
+
+     /**
+      * Flow field type definition reference.
+      * RFC reference :- rfc3954
+      * https://www.ietf.org/rfc/rfc3954.txt
+      * Section :- Field Type Definitions.
+      */
+     IN_BYTES(1, VAR_UNSIGNED_INT),
+     IN_PKTS(2, VAR_UNSIGNED_INT),
+     FLOWS(3, VAR_INT_LONG),
+     PROTOCOL(4, VAR_BYTE),
+     SRC_TOS(5, VAR_BYTE),
+     TCP_FLAGS(6, VAR_BYTE),
+     L4_SRC_PORT(7, VAR_UNSIGNED_SHORT),
+     IPV4_SRC_ADDR(8, VAR_IPV4_ADDRESS),
+     SRC_MASK(9, VAR_BYTE),
+     INPUT_SNMP(10, VAR_UNSIGNED_SHORT),
+     L4_DST_PORT(11, VAR_UNSIGNED_SHORT),
+     IPV4_DST_ADDR(12, VAR_IPV4_ADDRESS),
+     DST_MASK(13, VAR_BYTE),
+     OUTPUT_SNMP(14, VAR_UNSIGNED_SHORT),
+     IPV4_NEXT_HOP(15, VAR_IPV4_ADDRESS),
+     SRC_AS(16, VAR_SHORT_INT),
+     DST_AST(17, VAR_SHORT_INT), // RFC 3954 calls this DST_AS; name kept for API compatibility
+     BGP_IPV4_NEXT_HOP(18, VAR_IPV4_ADDRESS),
+     MUL_DST_PKTS(19, VAR_INT_LONG),
+     MUL_DST_BYTES(20, VAR_INT_LONG),
+     LAST_SWITCHED(21, VAR_UNSIGNED_INT),
+     FIRST_SWITCHED(22, VAR_UNSIGNED_INT),
+     OUT_BYTES(23, VAR_INT_LONG),
+     OUT_PKTS(24, VAR_INT_LONG),
+     MIN_PKT_LNGTH(25, NULL),
+     MAX_PKT_LNGTH(26, NULL),
+     IPV6_SRC_ADDR(27, VAR_IPV6_ADDRESS),
+     IPV6_DST_ADDR(28, VAR_IPV6_ADDRESS),
+     IPV6_SRC_MASK(29, VAR_BYTE),
+     IPV6_DST_MASK(30, VAR_BYTE),
+     IPV6_FLOW_LABEL(31, NULL),
+     ICMP_TYPE(32, VAR_SHORT),
+     MUL_IGMP_TYPE(33, VAR_BYTE),
+     SAMPLING_INTERVAL(34, VAR_INT),
+     SAMPLING_ALGORITHM(35, VAR_BYTE),
+     FLOW_ACTIVE_TIMEOUT(36, VAR_SHORT),
+     FLOW_INACTIVE_TIMEOUT(37, VAR_SHORT),
+     ENGINE_TYPE(38, VAR_BYTE),
+     ENGINE_ID(39, VAR_BYTE),
+     TOTAL_BYTES_EXP(40, VAR_INT_LONG),
+     TOTAL_PKTS_EXP(41, VAR_INT_LONG),
+     TOTAL_FLOWS_EXP(42, VAR_INT_LONG),
+     IPV4_SRC_PREFIX(44, NULL),
+     IPV4_DST_PREFIX(45, NULL),
+     MPLS_TOP_LABEL_TYPE(46, VAR_BYTE),
+     MPLS_TOP_LABEL_IP_ADDR(47, VAR_IPV4_ADDRESS),
+     FLOW_SAMPLER_ID(48, VAR_BYTE),
+     FLOW_SAMPLER_MODE(49, VAR_BYTE),
+     FLOW_SAMPLER_RANDOM_INTERVAL(50, NULL),
+     MIN_TTL(52, NULL),
+     MAX_TTL(53, NULL),
+     IPV4_IDENT(54, NULL),
+     DST_TOS(55, VAR_BYTE),
+     IN_SRC_MAC(56, VAR_MAC),
+     OUT_DST_MAC(57, VAR_MAC),
+     SRC_VLAN(58, VAR_SHORT),
+     DST_VLAN(59, VAR_SHORT),
+     IP_PROTOCOL_VERSION(60, VAR_UNSIGNED_BYTE),
+     DIRECTION(61, VAR_BYTE),
+     IPV6_NEXT_HOP(62, VAR_IPV6_ADDRESS),
+     BPG_IPV6_NEXT_HOP(63, VAR_IPV6_ADDRESS), // RFC 3954: BGP_IPV6_NEXT_HOP; name kept for API compatibility
+     IPV6_OPTION_HEADERS(64, VAR_INT),
+     MPLS_LABEL_1(70, NULL),
+     MPLS_LABEL_2(71, NULL),
+     MPLS_LABEL_3(72, NULL),
+     MPLS_LABEL_4(73, NULL),
+     MPLS_LABEL_5(74, NULL),
+     MPLS_LABEL_6(75, NULL),
+     MPLS_LABEL_7(76, NULL),
+     MPLS_LABEL_8(77, NULL),
+     MPLS_LABEL_9(78, NULL),
+     MPLS_LABEL_10(79, NULL),
+     IN_DST_MAC(80, VAR_MAC),
+     OUT_SRC_MAC(81, VAR_MAC),
+     IF_NAME(82, NULL),
+     IF_DESC(83, NULL),
+     SAMPLER_NAME(84, NULL),
+     IN_PERMANENT_BYTES(85, NULL),
+     IN_PERMANENT_PKTS(86, NULL),
+     FRAGMENT_OFFSET(88, NULL),
+     FORWARDING_STATUS(89, NULL),
+     MPLS_PAL_RD(90, NULL),
+     MPLS_PREFIX_LEN(91, NULL),
+     SRC_TRAFFIC_INDEX(92, NULL),
+     DST_TRAFFIC_INDEX(93, NULL),
+     APPLICATION_DESCRIPTION(94, NULL),
+     APPLICATION_TAG(95, NULL),
+     APPLICATION_NAME(96, NULL),
+     POST_IP_DIFF_SERV_CODE_POINT(98, NULL),
+     REPLICATION_FACTOR(99, NULL),
+     DEPRECATED(100, NULL),
+     LAYER2_PACKET_SECTION_OFFSET(102, NULL),
+     LAYER2_PACKET_SECTION_SIZE(103, NULL),
+     LAYER2_PACKET_SECTION_DATA(104, NULL);
+
+     final int fieldID;
+     final BiFunction<ByteBuffer, Integer, Object> parser;
+
+     // Registry from numeric field type to its constant; filled once by the static block below.
+     private static final Map<Integer, FlowField> FIELDS = new ConcurrentHashMap<>();
+
+     FlowField(int fieldID, BiFunction<ByteBuffer, Integer, Object> parser) {
+         this.fieldID = fieldID;
+         this.parser = parser;
+     }
+
+     static {
+         Arrays.stream(FlowField.values()).forEach(f -> FIELDS.put(f.fieldID, f));
+     }
+
+     /** Returns the flow field registered for {@code fieldId}, or empty if none is defined here. */
+     public static Optional<FlowField> getField(int fieldId) {
+         return Optional.ofNullable(FIELDS.get(fieldId));
+     }
+
+     /** Returns the constant names of all flow fields defined by this enum. */
+     public static Set<String> getAllFlowField() {
+         return FIELDS.values().stream().map(FlowField::name).collect(Collectors.toSet());
+     }
+
+     /** Returns the parser function this field was declared with. */
+     public BiFunction<ByteBuffer, Integer, Object> getParser() {
+         return this.parser;
+     }
+
+ }
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/FlowRecord.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/FlowRecord.java
new file mode 100644
index 0000000..a19a162
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/FlowRecord.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+/**
+ * A Flow Record is a data record that contains values of the Flow
+ * parameters corresponding to a Template Record.
+ */
+public interface FlowRecord {
+ TemplateId getTemplateId();
+}
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/FlowSet.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/FlowSet.java
new file mode 100644
index 0000000..e6c5a50
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/FlowSet.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.onlab.packet.BasePacket;
+import org.onlab.packet.DeserializationException;
+import org.onlab.packet.Deserializer;
+
+/**
+ * FlowSet is a generic term for a collection of Flow Records that have
+ * a similar structure. In an Export Packet, one or more FlowSets
+ * follow the Packet Header.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+ public abstract class FlowSet extends BasePacket {
+
+     // Public size constants; the LENTH spelling is kept because renaming would break callers.
+     public static final int FLOW_SET_HEADER_LENTH = 4;
+     public static final int FIELD_LENTH = 4;
+     public static final int RECORD_HEADER_LENGTH = 4;
+
+     /** Kinds of FlowSet: Template FlowSet, Options Template FlowSet, and Data FlowSet. */
+     public enum Type {
+
+         TEMPLATE_FLOWSET(0, TemplateFlowSet.deserializer()),
+         OPTIONAL_TEMPLATE_FLOWSET(1, OptionalTemplateFlowSet.deserializer()),
+         DATA_FLOWSET(Integer.MAX_VALUE, DataFlowSet.deserializer());
+
+         private final int flowSetId;
+         private final Deserializer deserializer;
+
+         Type(int flowSetId, Deserializer deserializer) {
+             this.flowSetId = flowSetId;
+             this.deserializer = deserializer;
+         }
+
+         // Registry of the types keyed by their flowset id; filled once by the static block.
+         private static final Map<Integer, Type> TYPES_BY_ID = new ConcurrentHashMap<>();
+         static {
+             Arrays.stream(Type.values()).forEach(type -> TYPES_BY_ID.put(type.flowSetId, type));
+         }
+
+         /**
+          * Resolves a flowset id: a registered id maps to its type, every other
+          * non-negative id is treated as a data flowset, and a negative id is rejected.
+          */
+         public static Type getType(int flowSetId) throws DeserializationException {
+             if (flowSetId < 0) {
+                 throw new DeserializationException("Invalid flowset id: " + flowSetId);
+             }
+             return Optional.ofNullable(TYPES_BY_ID.get(flowSetId)).orElse(DATA_FLOWSET);
+         }
+
+         /** Returns the deserializer this flowset type was declared with. */
+         public Deserializer getDecoder() {
+             return this.deserializer;
+         }
+     }
+
+     /** Returns the type of this flowset. */
+     public abstract Type getType();
+
+     /** Returns the id of this flowset. */
+     public abstract int getFlowSetId();
+
+     /** Returns the length of this flowset. */
+     public abstract int getLength();
+
+     @Override
+     public byte[] serialize() {
+         throw new UnsupportedOperationException("Not supported yet.");
+     }
+
+ }
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/FlowTemplateField.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/FlowTemplateField.java
new file mode 100644
index 0000000..b274409
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/FlowTemplateField.java
@@ -0,0 +1,148 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Flow template fields.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+ public final class FlowTemplateField {
+
+     /*
+       0                   1                   2                   3
+       0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+      |         Field Type 1          |        Field Length 1         |
+      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+      |         Field Type 2          |        Field Length 2         |
+      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+     */
+
+     private final FlowField flowField;
+
+     private final int length;
+
+     private FlowTemplateField(Builder builder) {
+         this.flowField = builder.flowField;
+         this.length = builder.length;
+     }
+
+     /**
+      * Returns the flow field that identifies the type of this template entry.
+      *
+      * @return flow field
+      */
+     public FlowField getFlowField() {
+         return flowField;
+     }
+
+     /**
+      * Returns length of the corresponding Field Type, in bytes.
+      *
+      * @return flow value length
+      */
+     public int getLength() {
+         return length;
+     }
+
+     @Override
+     public int hashCode() {
+         int hash = 7;
+         hash = 97 * hash + Objects.hashCode(this.flowField);
+         hash = 97 * hash + this.length;
+         return hash;
+     }
+
+     @Override
+     public boolean equals(Object obj) {
+         if (this == obj) {
+             return true;
+         }
+         if (obj == null || getClass() != obj.getClass()) {
+             return false;
+         }
+         final FlowTemplateField other = (FlowTemplateField) obj;
+         return this.length == other.length && this.flowField == other.flowField;
+     }
+
+     @Override
+     public String toString() {
+         return MoreObjects.toStringHelper(getClass())
+                 .add("flowField", flowField)
+                 .add("length", length)
+                 .toString();
+     }
+
+     /**
+      * Builder for flow template fields.
+      */
+     public static class Builder {
+         private FlowField flowField;
+
+         private int length;
+
+         /**
+          * Sets the flow field from its numeric field type.
+          *
+          * @param fieldId flow field.
+          * @return this class builder.
+          * @throws IllegalArgumentException if no flow field is defined for the id.
+          */
+         public Builder flowField(int fieldId) {
+             this.flowField = FlowField.getField(fieldId).orElseThrow(
+                     () -> new IllegalArgumentException("Unsupported flow field: " + fieldId));
+             return this;
+         }
+
+         /**
+          * Setter for flow template length.
+          *
+          * @param length flow template length.
+          * @return this class builder.
+          */
+         public Builder length(int length) {
+             this.length = length;
+             return this;
+         }
+
+         /**
+          * Checks arguments for flow template field:
+          * the length must be non-zero and the flow field must be set.
+          */
+         private void checkArguments() {
+             checkState(length != 0, "Flow length can not be zero.");
+             checkNotNull(flowField, "Flow field can not be null.");
+         }
+
+         /**
+          * Builds flow template field.
+          *
+          * @return flow template field.
+          * @throws IllegalStateException if the length is zero.
+          * @throws NullPointerException if the flow field was not set.
+          */
+         public FlowTemplateField build() {
+             checkArguments();
+             return new FlowTemplateField(this);
+         }
+     }
+ }
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/NetflowController.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/NetflowController.java
new file mode 100644
index 0000000..91b3b62
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/NetflowController.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Map;
+
+/**
+ * Controls and manages the netflow traffic.
+ * It collects, stores and analyzes NetFlow data, which
+ * can help to understand which applications and protocols
+ * may be consuming the most network bandwidth by tracking processes,
+ * protocols, times of day, and traffic routing.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+ public interface NetflowController {
+
+ /**
+ * Adds a template flowset to the controller.
+ * The template is added if it does not exist yet,
+ * otherwise it replaces the existing template.
+ *
+ * @param templateRecord template flowset record.
+ */
+ void addTemplateFlowSet(DataTemplateRecord templateRecord);
+
+ /**
+ * Updates the data flowset held by the controller.
+ * The new data flowset is written to the store.
+ *
+ * @param dataFlowRecord data flowset record.
+ */
+ void updateDataFlowSet(DataFlowRecord dataFlowRecord);
+
+ /**
+ * Gets a template flowset from the controller.
+ * Fetches from the store the current template matching the template id.
+ *
+ * @param templateId template id.
+ * @return optional of data template record, optional will be empty if template not found.
+ */
+ Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId);
+
+ /**
+ * Gets the data flowset from the controller.
+ * Fetches from the store the current data flowset matching the template id.
+ *
+ * @param templateId template id.
+ * @return list of data flow record, list will be empty if template id not matched.
+ */
+ List<DataFlowRecord> getDataFlowSet(TemplateId templateId);
+
+ /**
+ * Gets all template flowsets from the controller.
+ * Fetches all templates from the store.
+ *
+ * @return set of data template record, set will be empty if templates not found in the store.
+ */
+ Set<DataTemplateRecord> getTemplateFlowSet();
+
+ /**
+ * Gets the data flowsets from the controller.
+ * This overload takes no template id; the result is keyed by template id.
+ *
+ * @return mapping from a template id to data flow record.
+ */
+ Map<TemplateId, List<DataFlowRecord>> getDataFlowSet();
+ }
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/NetflowStore.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/NetflowStore.java
new file mode 100644
index 0000000..5dc5719
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/NetflowStore.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Manages inventory of Netflow template and data flowset to distribute
+ * information.
+ */
+ public interface NetflowStore {
+
+ /**
+ * Gets a template flowset from the store.
+ * Fetches the current template matching the given template id.
+ *
+ * @param templateId template id.
+ * @return optional of data template record, optional will be empty if template not found.
+ */
+ Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId);
+
+ /**
+ * Gets the set of template flowsets from the store.
+ * Fetches all template flowsets.
+ *
+ * @return set of data template record, set will be empty if templates not found in the store.
+ */
+ Set<DataTemplateRecord> getTemplateFlowSet();
+
+ /**
+ * Gets an optional template flowset from the store.
+ * Fetches the current optional template matching the given template id.
+ *
+ * @param templateId template id.
+ * @return optional of optional template flowset, optional will be empty if template not found.
+ */
+ Optional<OptionalTemplateFlowSet> getOptionalTemplateFlowSet(TemplateId templateId);
+
+ /**
+ * Gets the set of optional template flowsets from the store.
+ * Fetches all optional template flowsets.
+ *
+ * @return set of optional template flowsets, set will be empty if templates not found in the store.
+ */
+ Set<OptionalTemplateFlowSet> getOptionalTemplateFlowSet();
+
+ /**
+ * Gets the data flowset from the store.
+ * Fetches the current data flowset matching the given template id.
+ *
+ * @param templateId template id.
+ * @return list of data flow record, list will be empty if template id not matched.
+ */
+ List<DataFlowRecord> getDataFlowSet(TemplateId templateId);
+
+ /**
+ * Gets the data flowsets from the store.
+ * This overload takes no template id; the result is keyed by template id.
+ *
+ * @return mapping from a template id to data flow record.
+ */
+ Map<TemplateId, List<DataFlowRecord>> getDataFlowSet();
+
+ /**
+ * Updates a template flowset in the store.
+ * The template is added if it does not exist yet,
+ * otherwise it replaces the existing template.
+ *
+ * @param templateRecord template flowset record.
+ */
+ void updateTemplateFlowSet(DataTemplateRecord templateRecord);
+
+ /**
+ * Updates an optional template flowset in the store.
+ * The optional template is added if it does not exist yet,
+ * otherwise it replaces the existing optional template.
+ *
+ * @param optionalTemplateFlowSet optional template flowset.
+ */
+ void updateOptionalTemplateFlowSet(OptionalTemplateFlowSet optionalTemplateFlowSet);
+
+ /**
+ * Adds a data flow record to the store.
+ * The record is added as a new data flow record.
+ *
+ * @param dataFlowRecord data flow record.
+ */
+ void addDataFlowSet(DataFlowRecord dataFlowRecord);
+
+ /**
+ * Removes a template flowset from the store.
+ * Removes the template flowset matching the given template id.
+ *
+ * @param templateId template id.
+ */
+ void clearTemplateFlowSet(TemplateId templateId);
+
+ /**
+ * Removes all template flowsets from the store.
+ * No template flowset is left in the store afterwards.
+ */
+ void clearTemplateFlowSet();
+
+ /**
+ * Removes an optional template flowset from the store.
+ * Removes the optional template matching the given template id.
+ *
+ * @param templateId template id.
+ */
+ void clearOptionalTemplateFlowSet(TemplateId templateId);
+
+ /**
+ * Removes all optional template flowsets from the store.
+ * No optional template flowset is left in the store afterwards.
+ */
+ void clearOptionalTemplateFlowSet();
+
+ /**
+ * Removes a data flowset from the store.
+ * Removes the data flowset matching the given template id.
+ *
+ * @param templateId template id.
+ */
+ void clearDataFlowSet(TemplateId templateId);
+
+ /**
+ * Removes all data flowsets from the store.
+ * No data flowset is left in the store afterwards.
+ */
+ void clearDataFlowSet();
+
+ /**
+ * Removes template, optional template and data flowsets from the store.
+ * All flowsets are removed from the store.
+ */
+ void clearAllFlowSet();
+
+ }
\ No newline at end of file
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/NetflowUtils.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/NetflowUtils.java
new file mode 100644
index 0000000..ebadd3d
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/NetflowUtils.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.nio.ByteBuffer;
+import java.util.function.BiFunction;
+import org.onlab.packet.IpAddress;
+
+
+/**
+ * Netflow utility.
+ */
+public final class NetflowUtils {
+ private NetflowUtils() {
+ }
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_BYTE = (bb, len) -> {
+ return bb.get();
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_UNSIGNED_BYTE = (bb, len) -> {
+ return getUnsignedByte(bb);
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_SHORT = (bb, len) -> {
+ return bb.getShort();
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_UNSIGNED_SHORT = (bb, len) -> {
+ return asUnsignedShort(bb);
+ };
+
+ public static short getUnsignedByte(ByteBuffer bb) {
+ return ((short) (bb.get() & 0xFF));
+ }
+
+ public static int asUnsignedShort(ByteBuffer bb) {
+ return bb.getShort() & 0xFFFF;
+ }
+
+ public static long getUnsignedInt(ByteBuffer bb) {
+ return ((long) bb.getInt() & 0xFFFFFFFFL);
+ }
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_INT = (bb, len) -> {
+ return bb.getInt();
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_UNSIGNED_INT = (bb, len) -> {
+ return getUnsignedInt(bb);
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_FLOAT = (bb, len) -> {
+ return bb.getFloat();
+ };
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_LONG = (bb, len) -> {
+ return bb.getLong();
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, Object> NULL = (bb, len) -> {
+ byte[] address = new byte[len];
+ bb.get(address);
+ return null;
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, byte[]> BYTE_ARRAY = (bb, len) -> {
+ byte[] bytes = new byte[len];
+ bb.get(bytes);
+ return bytes;
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_INT_LONG = (bb, len) -> {
+ if (len == 4) {
+ return bb.getInt();
+ }
+ return bb.getLong();
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_SHORT_INT = (bb, len) -> {
+ if (len == 2) {
+ return bb.getShort();
+ }
+ return bb.getInt();
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_IPV4_ADDRESS = (bb, len) -> {
+ byte[] value = getByteArray(bb, len);
+ return IpAddress.valueOf(IpAddress.Version.INET, value).toInetAddress().getHostAddress();
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_IPV6_ADDRESS = (bb, len) -> {
+ byte[] value = getByteArray(bb, len);
+ return IpAddress.valueOf(IpAddress.Version.INET6, value).toInetAddress().getHostAddress();
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_MAC = (bb, len) -> {
+ byte[] mac = new byte[len];
+ bb.get(mac);
+ //return MacAddress.valueOf(mac);;
+ return null;
+ };
+
+ private static byte[] getByteArray(ByteBuffer bb, int length) {
+ byte[] bytes = new byte[length];
+ bb.get(bytes);
+ return bytes;
+ }
+
+}
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/OptionalTemplateFlowSet.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/OptionalTemplateFlowSet.java
new file mode 100644
index 0000000..068a814
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/OptionalTemplateFlowSet.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.util.List;
+
+import org.onlab.packet.Deserializer;
+
+
+/**
+ * The Options Template Record (and its corresponding Options Data
+ * Record) is used to supply information about the NetFlow process
+ * configuration or NetFlow process specific data, rather than supplying
+ * information about IP Flows.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+ public class OptionalTemplateFlowSet extends FlowSet {
+
+     /*
+       0                   1                   2                   3
+       0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+      |        FlowSet ID = 1         |            Length             |
+      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+      |          Template ID          |      Option Scope Length      |
+      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+      |         Option Length         |      Scope 1 Field Type       |
+      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+      |     Scope 1 Field Length      |              ...              |
+      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+      |     Scope N Field Length      |      Option 1 Field Type      |
+      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+      |     Option 1 Field Length     |              ...              |
+      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+      |     Option M Field Length     |            Padding            |
+      +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+     */
+
+     private int flowSetId;
+
+     private int length;
+
+     private TemplateId templateId;
+
+     private List<DataTemplateRecord> records;
+
+     /**
+      * Returns flowset type.
+      *
+      * @return flowset type
+      */
+     @Override
+     public Type getType() {
+         return Type.OPTIONAL_TEMPLATE_FLOWSET;
+     }
+
+     /**
+      * Returns flowset id; a FlowSet ID value of 1 is reserved for the Options Template.
+      *
+      * @return flow set ID
+      */
+     @Override
+     public int getFlowSetId() {
+         return flowSetId;
+     }
+
+     /**
+      * Returns total length of this FlowSet.
+      * Each Options Template FlowSet MAY contain multiple Options
+      * Template Records. Thus, the Length value MUST be used to determine
+      * the position of the next FlowSet record, which could be either
+      * a Template FlowSet or Data FlowSet.
+      *
+      * @return total length of this flowset
+      */
+     @Override
+     public int getLength() {
+         return length;
+     }
+
+     /**
+      * Returns template record's template id.
+      * Template Records is given a unique Template ID.
+      * This uniqueness is local to the Observation
+      * Domain that generated the Template ID. Template IDs 0-255 are
+      * reserved for Template FlowSets, Options FlowSets, and other
+      * reserved FlowSets yet to be created.
+      *
+      * @return template id
+      */
+     public TemplateId getTemplateId() {
+         return templateId;
+     }
+
+     /**
+      * Returns list of optional data template records.
+      * NOTE(review): nothing in this class assigns the list yet, so it is currently null.
+      * @return list of optional data template records
+      */
+     public List<DataTemplateRecord> getRecords() {
+         return records;
+     }
+
+     /**
+      * Deserializer function for data option template flowset.
+      *
+      * @return data deserializer function
+      */
+     public static Deserializer<OptionalTemplateFlowSet> deserializer() {
+         return (data, offset, length) -> {
+             //TODO parse optional template
+             return new OptionalTemplateFlowSet();
+         };
+     }
+
+ }
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/SourceId.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/SourceId.java
new file mode 100644
index 0000000..1b7731f
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/SourceId.java
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.util.Objects;
+import com.google.common.base.MoreObjects;
+
+/**
+ * Identifies the Exporter Observation Domain of a NetFlow export stream.
+ * NetFlow Collectors SHOULD use the combination of the source IP
+ * address and the Source ID field to separate different export
+ * streams originating from the same Exporter.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public class SourceId {
+
+    // Both fields take part in equals/hashCode, so they are kept immutable.
+    private final String sourceIp;
+
+    private final int id;
+
+    /**
+     * Creates a source id from an exporter id and the exporter ip address.
+     *
+     * @param id exporter unique id
+     * @param sourceIp exporter ip address, may be null
+     */
+    public SourceId(int id, String sourceIp) {
+        this.sourceIp = sourceIp;
+        this.id = id;
+    }
+
+    /**
+     * Creates a source id that carries no exporter ip address.
+     * The ip address of such an instance is {@code null}.
+     *
+     * @param id exporter unique id
+     */
+    public SourceId(int id) {
+        this(id, null);
+    }
+
+    /**
+     * Returns exporter ip address.
+     *
+     * @return exporter ip address, or null when none was supplied.
+     */
+    public String getSourceIp() {
+        return sourceIp;
+    }
+
+    /**
+     * Returns exporter unique id.
+     *
+     * @return exporter unique id.
+     */
+    public int getId() {
+        return id;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 23 * hash + Objects.hashCode(this.sourceIp);
+        hash = 23 * hash + this.id;
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final SourceId other = (SourceId) obj;
+        return this.id == other.id && Objects.equals(this.sourceIp, other.sourceIp);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("sourceIp", sourceIp)
+                .add("id", id)
+                .toString();
+    }
+
+}
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/TemplateFlowSet.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/TemplateFlowSet.java
new file mode 100644
index 0000000..6ba1d66
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/TemplateFlowSet.java
@@ -0,0 +1,267 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+import org.onlab.packet.DeserializationException;
+import org.onlab.packet.Deserializer;
+
+/**
+ * One of the essential elements in the NetFlow format is the Template
+ * FlowSet. Templates greatly enhance the flexibility of the Flow
+ * Record format because they allow the NetFlow Collector to process
+ * Flow Records without necessarily knowing the interpretation of all
+ * the data in the Flow Record.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public final class TemplateFlowSet extends FlowSet {
+
+    /*
+        0                   1                   2                   3
+        0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |       FlowSet ID = 0          |          Length               |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |        Template ID 256        |          Field Count          |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |        Field Type 1           |         Field Length 1        |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |        Field Type 2           |         Field Length 2        |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |              ...              |              ...              |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |        Field Type N           |         Field Length N        |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |        Template ID 257        |          Field Count          |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |        Field Type 1           |         Field Length 1        |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |        Field Type 2           |         Field Length 2        |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |              ...              |              ...              |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |        Field Type M           |         Field Length M        |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |              ...              |              ...              |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |        Template ID K          |          Field Count          |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+       |              ...              |              ...              |
+       +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+     */
+
+    private final int flowSetId;
+
+    private final int length;
+
+    private final List<DataTemplateRecord> records;
+
+    private TemplateFlowSet(Builder builder) {
+        this.records = builder.records;
+        this.length = builder.length;
+        this.flowSetId = builder.flowSetId;
+    }
+
+    /**
+     * Return template flow set id.
+     * FlowSet ID value of 0 is reserved for the Template FlowSet.
+     *
+     * @return flow set ID
+     */
+    @Override
+    public int getFlowSetId() {
+        return this.flowSetId;
+    }
+
+    /**
+     * Returns total length of this flowSet.
+     *
+     * @return length of flowset
+     */
+    @Override
+    public int getLength() {
+        return this.length;
+    }
+
+    /**
+     * Returns list of template records.
+     * The list held by this object is returned directly, not a copy.
+     *
+     * @return list of template records
+     */
+    public List<DataTemplateRecord> getRecords() {
+        return records;
+    }
+
+    /**
+     * Returns type of the flowset.
+     *
+     * @return type of the flowset
+     */
+    @Override
+    public Type getType() {
+        return Type.TEMPLATE_FLOWSET;
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 7;
+        hash = 53 * hash + this.flowSetId;
+        hash = 53 * hash + this.length;
+        hash = 53 * hash + Objects.hashCode(this.records);
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final TemplateFlowSet other = (TemplateFlowSet) obj;
+        return this.flowSetId == other.flowSetId
+                && this.length == other.length
+                && Objects.equals(this.records, other.records);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("flowSetId", flowSetId)
+                .add("length", length)
+                .add("records", records)
+                .toString();
+    }
+
+    /**
+     * Data deserializer function for template flow set.
+     * The returned function reads the flowset id and length as unsigned
+     * 16-bit values and then consumes template records for as long as a
+     * complete record is available in the buffer. Trailing bytes that do
+     * not form a complete record are ignored. A buffer shorter than
+     * {@code FlowSet.FIELD_LENTH} is rejected with a
+     * {@link DeserializationException}.
+     *
+     * @return data deserializer function
+     */
+    public static Deserializer<TemplateFlowSet> deserializer() {
+        return (data, offset, length) -> {
+
+            // True when fewer than FIELD_LENTH bytes are left to read.
+            Predicate<ByteBuffer> isTruncated = b -> b.remaining() < FlowSet.FIELD_LENTH;
+
+            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
+            if (isTruncated.test(bb)) {
+                throw new DeserializationException("Invalid buffer size");
+            }
+            // The wire fields are unsigned 16-bit; a plain getShort() would
+            // sign-extend values of 32768 and above into negative ints.
+            Builder builder = new Builder()
+                    .flowSetId(Short.toUnsignedInt(bb.getShort()))
+                    .length(Short.toUnsignedInt(bb.getShort()));
+            while (!isTruncated.test(bb)) {
+                // Peek at the record header to learn the field count, then
+                // rewind so the whole record, header included, is handed to
+                // the record deserializer.
+                // NOTE(review): assumes FIELD_LENTH covers the two 16-bit
+                // header fields (4 bytes) - confirm against FlowSet.
+                bb.mark();
+                bb.getShort();
+                int fieldCount = Short.toUnsignedInt(bb.getShort());
+                bb.reset();
+
+                int recordLength = (fieldCount + 1) * FlowSet.FIELD_LENTH;
+                if (bb.remaining() < recordLength) {
+                    break;
+                }
+                // Allocate only after the bounds check so that a bogus field
+                // count cannot trigger a large allocation.
+                byte[] record = new byte[recordLength];
+                bb.get(record);
+                builder.templateRecord(DataTemplateRecord.deserializer().deserialize(record, 0, recordLength));
+            }
+            return builder.build();
+        };
+    }
+
+    /**
+     * Builder for template flow set.
+     */
+    private static final class Builder {
+
+        private int flowSetId;
+
+        private int length;
+
+        private List<DataTemplateRecord> records = new LinkedList<>();
+
+        /**
+         * Setter for template flow set id.
+         *
+         * @param flowSetId template flow set id.
+         * @return this class builder.
+         */
+        public Builder flowSetId(int flowSetId) {
+            this.flowSetId = flowSetId;
+            return this;
+        }
+
+        /**
+         * Setter for total length of this flowSet.
+         *
+         * @param length total length of this flowSet.
+         * @return this class builder.
+         */
+        public Builder length(int length) {
+            this.length = length;
+            return this;
+        }
+
+        /**
+         * Setter for list of flow records.
+         * The given list is copied, so later calls to
+         * {@link #templateRecord(DataTemplateRecord)} never modify the
+         * caller's list.
+         *
+         * @param records list of flow records.
+         * @return this class builder.
+         */
+        public Builder templateRecords(List<DataTemplateRecord> records) {
+            this.records = new LinkedList<>(records);
+            return this;
+        }
+
+        /**
+         * Appends a single flow record.
+         *
+         * @param record flow record.
+         * @return this class builder.
+         */
+        public Builder templateRecord(DataTemplateRecord record) {
+            this.records.add(record);
+            return this;
+        }
+
+        /**
+         * Builds template flow set.
+         *
+         * @return template flow set.
+         */
+        public TemplateFlowSet build() {
+            return new TemplateFlowSet(this);
+        }
+
+    }
+
+}
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/TemplateId.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/TemplateId.java
new file mode 100644
index 0000000..8ec8d07
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/TemplateId.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import org.onlab.util.Identifier;
+
+/**
+ * Identifier of a NetFlow Template Record.
+ * Each Template Record is given a unique Template ID.
+ * This uniqueness is local to the Observation
+ * Domain that generated the Template ID. Template IDs 0-255 are
+ * reserved for Template FlowSets, Options FlowSets, and other
+ * reserved FlowSets yet to be created. Template IDs of Data
+ * FlowSets are numbered from 256 to 65535.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public class TemplateId extends Identifier<Integer> {
+
+ /**
+ * Creates a template id wrapping the given value.
+ *
+ * @param id template id.
+ */
+ public TemplateId(int id) {
+ super(id);
+ }
+
+ /**
+ * Returns the value of the template id.
+ *
+ * @return the value of the template id.
+ */
+ public int getId() {
+ return identifier;
+ }
+
+ /**
+ * Creates a template id object for the given value.
+ *
+ * @param id template id.
+ * @return a new template id object holding the given value.
+ */
+ public static TemplateId valueOf(int id) {
+ return new TemplateId(id);
+ }
+
+}
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/TemplateRecord.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/TemplateRecord.java
new file mode 100644
index 0000000..744db66
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/TemplateRecord.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import org.onlab.packet.BasePacket;
+
+/**
+ * Defines the structure and interpretation of fields in a Flow Data Record.
+ */
+public abstract class TemplateRecord extends BasePacket implements FlowRecord {
+
+ /**
+ * Serialization of template records is not implemented.
+ *
+ * @return never returns normally
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public byte[] serialize() {
+ throw new UnsupportedOperationException("Not supported yet.");
+ }
+}
diff --git a/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/package-info.java b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/package-info.java
new file mode 100644
index 0000000..4ae967a
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/api/src/main/java/org/onosproject/netflow/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Sample application that assigns and manages netflow collector.
+ */
+package org.onosproject.netflow;
\ No newline at end of file
diff --git a/apps/ipflow-monitor/netflow/app/BUILD b/apps/ipflow-monitor/netflow/app/BUILD
new file mode 100644
index 0000000..0e6c0ee
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/BUILD
@@ -0,0 +1,11 @@
+# Compile-time dependencies: the core, Kryo and CLI dependency sets plus the
+# core serializers, the netflow API bundle and the Netty jars listed below.
+COMPILE_DEPS = CORE_DEPS + KRYO + CLI + [
+ "//core/store/serializers:onos-core-serializers",
+ "//apps/ipflow-monitor/netflow/api:onos-apps-ipflow-monitor-netflow-api",
+ "@io_netty_netty_common//jar",
+ "@io_netty_netty//jar",
+]
+
+# OSGi jar for the app; org.onosproject.netflow.cli is declared as its
+# Karaf command package.
+osgi_jar(
+ karaf_command_packages = ["org.onosproject.netflow.cli"],
+ deps = COMPILE_DEPS,
+)
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java
new file mode 100644
index 0000000..760d40b
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.api.console.Completer;
+import org.apache.karaf.shell.api.console.CommandLine;
+import org.apache.karaf.shell.api.console.Session;
+import org.apache.karaf.shell.support.completers.StringsCompleter;
+
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Command-line completer that proposes the flow field names returned by
+ * {@code FlowField.getAllFlowField()}.
+ */
+@Service
+public class FlowFieldCompleter implements Completer {
+
+    @Override
+    public int complete(Session session, CommandLine commandLine, List<String> candidates) {
+        // Load every flow field name into a strings completer and let it
+        // do the actual matching against the current command line.
+        StringsCompleter completer = new StringsCompleter();
+        Set<String> fieldNames = FlowField.getAllFlowField();
+        SortedSet<String> choices = completer.getStrings();
+        choices.addAll(fieldNames);
+        return completer.complete(session, commandLine, candidates);
+    }
+
+}
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java
new file mode 100644
index 0000000..7a5db0d
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Map;
+import java.util.Collection;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+
+/**
+ * Lists all netflow data flowsets.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-dataflow",
+        description = "Lists all data flowsets received from netflow exporter.")
+public class NetflowDataflowCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "templateid",
+            description = "Data flowset template id",
+            required = false, multiValued = false)
+    int templateID = 0;
+
+    /**
+     * Prints the data flow records of one template when a positive template
+     * id is given, or the records of every template when the argument is
+     * left at its default of zero. A negative id is rejected.
+     */
+    @Override
+    protected void doExecute() {
+        NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+
+        if (templateID < 0) {
+            print("Invalid template ID");
+            return;
+        }
+        if (templateID > 0) {
+            List<DataFlowRecord> dataFlowSet = controller.getDataFlowSet(TemplateId.valueOf(templateID));
+            if (dataFlowSet.isEmpty()) {
+                // This command reports data records, so use the same wording
+                // as the other data-oriented netflow commands.
+                print("Data not found");
+            } else {
+                dataFlowSet.forEach(this::printDataflowSet);
+            }
+            return;
+        }
+        Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+        if (dataFlowSets.isEmpty()) {
+            print("Data not found");
+        } else {
+            dataFlowSets.values().stream().flatMap(Collection::stream).forEach(this::printDataflowSet);
+        }
+    }
+
+    /**
+     * Prints the template id of a data flow record followed by one line
+     * per flow with its field name and value.
+     *
+     * @param dataFlowRecord data flowset record.
+     */
+    private void printDataflowSet(DataFlowRecord dataFlowRecord) {
+        print("Template ID : %d", dataFlowRecord.getTemplateId().getId());
+        // The value is passed to the formatter as-is so that a null value
+        // is printed as "null" instead of raising a NullPointerException.
+        dataFlowRecord.getFlows().forEach(dataflow ->
+                print("Field : %s, Value : %s", dataflow.getField().name(), dataflow.getValue()));
+        print("\n");
+    }
+}
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java
new file mode 100644
index 0000000..c23acdf
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.DataFlowRecord;
+
+/**
+ * Netflow overall summary report.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-summary",
+ description = "Summary of template flowsets and data flowsets.")
+public class NetflowSummaryCommand extends AbstractShellCommand {
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+ Set<DataTemplateRecord> templates = controller.getTemplateFlowSet();
+ if (templates.isEmpty()) {
+ print("Template not found");
+ } else {
+ Set<Integer> templateIds = templates.stream()
+ .map(DataTemplateRecord::getTemplateId)
+ .map(TemplateId::getId)
+ .collect(Collectors.toSet());
+ printTemplateflowSet(templateIds);
+
+ }
+ Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+ if (dataFlowSets.isEmpty()) {
+ print("Data not found");
+ } else {
+ printDataflowSet(dataFlowSets);
+ }
+ }
+
+ /**
+ * Adds template flowset summary in specified row wise.
+ *
+ * @param templateIds template ids
+ */
+ private void printTemplateflowSet(Set<Integer> templateIds) {
+ print("Number of templates : %d, Template IDs : %s",
+ templateIds.size(),
+ templateIds.toString());
+ }
+
+ /**
+ * Adds data flowset summary in specified row wise.
+ *
+ * @param dataflowMap data flow map
+ */
+ private void printDataflowSet(Map<TemplateId, List<DataFlowRecord>> dataflowMap) {
+ dataflowMap.entrySet().forEach(entry -> {
+ print("Template ID : %d, Data flow count : %d",
+ entry.getKey().getId(),
+ entry.getValue().size());
+ });
+ print("\n");
+ }
+}
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java
new file mode 100644
index 0000000..abfbb73
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.Set;
+import java.util.Optional;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataTemplateRecord;
+
+/**
+ * Lists all netflow template flowsets.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-template",
+ description = "Lists all template flowsets received from netflow exporter.")
+public class NetflowTemplateCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "templateID",
+ description = "Netflow template ID",
+ required = false, multiValued = false)
+ int templateID = 0;
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+ if (templateID < 0) {
+ print("Invalid template ID");
+ return;
+ }
+ if (templateID > 0) {
+ Optional<DataTemplateRecord> template = controller.getTemplateFlowSet(TemplateId.valueOf(templateID));
+ if (template.isPresent()) {
+ printTemplate(template.get());
+ } else {
+ print("Default template not found");
+ }
+ } else {
+ Set<DataTemplateRecord> templates = controller.getTemplateFlowSet();
+ if (templates.isEmpty()) {
+ print("Default template not found");
+ } else {
+ templates.stream().forEach(template -> printTemplate(template));
+ }
+ }
+ }
+
+ /**
+ * Adds template flowset details in specified row wise.
+ *
+ * @param template template flow set.
+ */
+ private void printTemplate(DataTemplateRecord template) {
+ print("Template ID : %d, Field Count : %d",
+ template.getTemplateId().getId(),
+ template.getFiledCount());
+ template.getFields().forEach(flowTemplateField -> {
+ print("Field : %s, Length : %d",
+ flowTemplateField.getFlowField().name(),
+ flowTemplateField.getLength());
+ });
+ print("\n");
+ }
+}
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java
new file mode 100644
index 0000000..51cefa1
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Optional;
+import java.util.Map;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Lists all filtered data flowsets.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-traffic-filter",
+        description = "Lists all filtered data flowsets received from netflow exporter.")
+public class NetflowTrafficFilterCommand extends AbstractShellCommand {
+
+    @Argument(index = 0, name = "field", description = "flow field",
+            required = false, multiValued = false)
+    @Completion(FlowFieldCompleter.class)
+    protected String field = null;
+
+    @Argument(index = 1, name = "value", description = "flow value",
+            required = false, multiValued = false)
+    protected String value = null;
+
+    /**
+     * Prints every data flow record that contains a flow whose field equals
+     * the given field name and whose value, rendered with
+     * {@code toString()}, equals the given value. A missing argument or an
+     * unknown field name is reported instead of raising an exception.
+     */
+    @Override
+    protected void doExecute() {
+        NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+        Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+        if (dataFlowSets.isEmpty()) {
+            print("Data not found");
+            return;
+        }
+        // Both arguments are declared optional, so they may be absent.
+        if (field == null || value == null) {
+            print("Flow field and value are required");
+            return;
+        }
+        // Resolve the field name once, not once per record.
+        final FlowField flowField;
+        try {
+            flowField = FlowField.valueOf(field);
+        } catch (IllegalArgumentException e) {
+            print("Invalid flow field : %s", field);
+            return;
+        }
+        dataFlowSets.values()
+                .stream()
+                .flatMap(Collection::stream)
+                // Compare the unwrapped flow value with the requested one;
+                // Optional.toString() would yield "Optional[...]" and never match.
+                .filter(df -> getFieldValue(df.getFlows(), candidate -> candidate == flowField)
+                        .map(Object::toString)
+                        .filter(value::equals)
+                        .isPresent())
+                .forEach(this::printDataflowSet);
+    }
+
+    /**
+     * Get flow field value from collection of flows.
+     * Returns the value of any flow whose field satisfies the given predicate.
+     *
+     * @param flows collection of flows
+     * @param field flow field predicate
+     * @return value of a matching flow, or empty when no flow matches
+     */
+    private Optional<Object> getFieldValue(List<Flow> flows, Predicate<FlowField> field) {
+        return flows.stream()
+                .filter(flow -> field.test(flow.getField()))
+                .map(Flow::getValue)
+                .findAny();
+    }
+
+    /**
+     * Prints the template id of a data flow record followed by one line
+     * per flow with its field name and value.
+     *
+     * @param dataFlowRecord data flowset record.
+     */
+    private void printDataflowSet(DataFlowRecord dataFlowRecord) {
+        print("Template ID : %d", dataFlowRecord.getTemplateId().getId());
+        // The value is passed to the formatter as-is so that a null value
+        // is printed as "null" instead of raising a NullPointerException.
+        dataFlowRecord.getFlows().forEach(dataflow ->
+                print("Field : %s, Value : %s", dataflow.getField().name(), dataflow.getValue()));
+        print("\n");
+    }
+}
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java
new file mode 100644
index 0000000..fec9ba7
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Optional;
+import java.util.Map;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.HashMap;
+import java.util.function.Predicate;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Flow traffic summary report.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-traffic-summary",
+ description = "Summary of data flowset records based on in/out bytes/packets, protocols or applications.")
+public class NetflowTrafficSummaryCommand extends AbstractShellCommand {
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+ Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+ if (dataFlowSets.isEmpty()) {
+ print("Data not found");
+ return;
+ }
+
+ Map<String, List<DataFlowRecord>> ds = dataFlowSets.values()
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.groupingBy(d -> hostWiseFlow(d.getFlows())));
+ printHostTraffic(ds);
+
+ Map<String, Map<String, List<DataFlowRecord>>> protocol = new HashMap<>();
+ ds.entrySet().forEach(entry -> {
+ protocol.put(entry.getKey(), protocolWiseFlow(entry.getValue()));
+ });
+ printPrtocolTraffic(protocol);
+
+ Map<String, Map<String, List<DataFlowRecord>>> application = new HashMap<>();
+ ds.entrySet().forEach(entry -> {
+ application.put(entry.getKey(), applicationWiseFlow(entry.getValue()));
+ });
+ printApplicationTraffic(application);
+ }
+
+ /**
+ * Host wise flow filter.
+ * ipv4 and ipv6 host flow fields will be filtered and flow direction
+ * added from src ip to dst ip
+ *
+ * @param flows collection of flows
+ */
+ private String hostWiseFlow(List<Flow> flows) {
+ Predicate<FlowField> srcIpParser = f -> (f.equals(FlowField.IPV4_SRC_ADDR) ||
+ f.equals(FlowField.IPV6_SRC_ADDR));
+ Predicate<FlowField> dstIpParser = f -> (f.equals(FlowField.IPV4_DST_ADDR) ||
+ f.equals(FlowField.IPV6_DST_ADDR));
+ String srcIp = getFieldValue(flows, srcIpParser).get().toString();
+ String dstIp = getFieldValue(flows, dstIpParser).get().toString();
+ return srcIp + " => " + dstIp;
+ }
+
+ /**
+ * Summary of all ingress bytes.
+ *
+ * @param dataFlowRecords collection of data flowset recored.
+ */
+ private int totalInBytes(List<DataFlowRecord> dataFlowRecords) {
+ return dataFlowRecords.stream()
+ .map(data -> inBytes(data.getFlows()))
+ .collect(Collectors.summingInt(Integer::intValue));
+ }
+
+ /**
+ * Summary of all ingress packets.
+ *
+ * @param dataFlowRecords collection of data flowset recored.
+ */
+ private int totalInPackets(List<DataFlowRecord> dataFlowRecords) {
+ return dataFlowRecords.stream()
+ .map(data -> inPackets(data.getFlows()))
+ .collect(Collectors.summingInt(Integer::intValue));
+ }
+
+ /**
+ * Application protocol wise flow filter.
+ *
+ * @param dataFlowRecords collection of data flowset recored.
+ */
+ private Map<String, List<DataFlowRecord>> applicationWiseFlow(List<DataFlowRecord> dataFlowRecords) {
+ return dataFlowRecords.stream()
+ .collect(Collectors.groupingBy(d -> getFieldValue(d.getFlows(),
+ FlowField.L4_DST_PORT).get().toString()));
+
+ }
+
+ /**
+ * Transport protocol wise flow filter.
+ *
+ * @param dataFlowRecords collection of data flowset recored.
+ */
+ private Map<String, List<DataFlowRecord>> protocolWiseFlow(List<DataFlowRecord> dataFlowRecords) {
+ return dataFlowRecords.stream()
+ .collect(Collectors.groupingBy(d -> getFieldValue(d.getFlows(), FlowField.PROTOCOL).get().toString()));
+
+ }
+
+ /**
+ * Filter ingress bytes from flows and type cast to integer.
+ *
+ * @param flows collection of flows.
+ */
+ private int inBytes(List<Flow> flows) {
+ return Integer.parseInt(getFieldValue(flows, FlowField.IN_BYTES).get().toString());
+ }
+
+ /**
+ * Filter ingress packets from flows and type cast to integer.
+ *
+ * @param flows collection of flows.
+ */
+ private int inPackets(List<Flow> flows) {
+ return Integer.parseInt(getFieldValue(flows, FlowField.IN_PKTS).get().toString());
+ }
+
+ /**
+ * Get flow field value from collection of flows.
+ * get flow field value which is matching to the given flow field.
+ *
+ * @param flows collection of flows
+ * @param field flow field
+ */
+ private Optional<Object> getFieldValue(List<Flow> flows, FlowField field) {
+ return flows.stream()
+ .filter(flow -> flow.getField() == field)
+ .map(Flow::getValue)
+ .findAny();
+ }
+
+ /**
+ * Get flow field value from collection of flows.
+ * get flow field value which is matching to the given flow field predicates.
+ *
+ * @param flows collection of flows
+ * @param field flow field predicates
+ */
+ private Optional<Object> getFieldValue(List<Flow> flows, Predicate<FlowField> field) {
+ return flows.stream()
+ .filter(flow -> field.test(flow.getField()))
+ .map(Flow::getValue)
+ .findAny();
+ }
+
+ /**
+ * Adds host wise traffic summary in specified row wise.
+ *
+ * @param hosts mapping of host and traffic details.
+ */
+ private void printHostTraffic(Map<String, List<DataFlowRecord>> hosts) {
+ print("\nHost wise traffic flow");
+ hosts.entrySet().forEach(entry -> {
+ print("Traffic flow : %s, Total bytes : %d, Total packets : %d",
+ entry.getKey(),
+ totalInBytes(entry.getValue()),
+ totalInPackets(entry.getValue()));
+ });
+ print("\n");
+ }
+
+ /**
+ * Adds protocol wise traffic summary in specified row wise.
+ *
+ * @param protocol mapping of portocol and traffic details.
+ */
+ private void printPrtocolTraffic(Map<String, Map<String, List<DataFlowRecord>>> protocol) {
+ print("Protocol wise traffic flow");
+ protocol.entrySet().forEach(entry -> {
+ entry.getValue().entrySet().forEach(port -> {
+ print("Traffic flow : %s, Protocol : %s, Total bytes : %d, Total packets : %d",
+ entry.getKey(),
+ port.getKey(),
+ totalInBytes(port.getValue()),
+ totalInPackets(port.getValue()));
+ });
+ });
+ print("\n");
+ }
+
+ /**
+ * Adds application wise traffic summary in specified row wise.
+ *
+ * @param applications mapping of application and traffic details.
+ */
+ private void printApplicationTraffic(Map<String, Map<String, List<DataFlowRecord>>> applications) {
+ print("Application wise traffic flow");
+ applications.entrySet().forEach(entry -> {
+ entry.getValue().entrySet().forEach(port -> {
+ print("Traffic flow : %s, Application : %s, Total bytes : %d, Total packets : %d",
+ entry.getKey(),
+ port.getKey(),
+ totalInBytes(port.getValue()),
+ totalInPackets(port.getValue()));
+ });
+ });
+ print("\n");
+ }
+}
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java
new file mode 100644
index 0000000..f7183d6
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * CLI implementation for netflow collector.
+ */
+package org.onosproject.netflow.cli;
\ No newline at end of file
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java
new file mode 100644
index 0000000..8f17232
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netflow.impl;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onosproject.netflow.NetflowController;
+
+/**
+ * The main controller class. Handles all setup and network listeners -
+ * Ownership of netflow message receiver.
+ */
+public class Controller {
+
+ private static final Logger log = LoggerFactory.getLogger(Controller.class);
+
+ private ChannelGroup channelGroup;
+ private Channel serverChannel;
+
+ // Configuration options
+ protected static final short NETFLOW_PORT_NUM = 2055;
+ private final int workerThreads = 16;
+
+ // Start time of the controller
+ private long systemStartTime;
+
+ private ChannelFactory serverExecFactory;
+
+ private static final int BUFFER_SIZE = 5 * 1024;
+
+ private NetflowController controller;
+
+ /**
+ * Constructor to initialize the values.
+ *
+ * @param controller netflow controller instance
+ */
+ public Controller(NetflowController controller) {
+ this.controller = controller;
+ }
+
+ /**
+ * To get system start time.
+ *
+ * @return system start time in milliseconds
+ */
+ public long getSystemStartTime() {
+ return (this.systemStartTime);
+ }
+
+ /**
+ * Initialize timer.
+ */
+ public void init() {
+ this.systemStartTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Gets run time memory.
+ *
+ * @return run time memory
+ */
+ public Map<String, Long> getMemory() {
+ Map<String, Long> m = new HashMap<>();
+ Runtime runtime = Runtime.getRuntime();
+ m.put("total", runtime.totalMemory());
+ m.put("free", runtime.freeMemory());
+ return m;
+ }
+
+ /**
+ * Gets UP time.
+ *
+ * @return UP time
+ */
+ public Long getUptime() {
+ RuntimeMXBean rb = ManagementFactory.getRuntimeMXBean();
+ return rb.getUptime();
+ }
+
+ /**
+ * Netflow collector it will receive message from netflow exporter.
+ */
+ public void run() {
+
+ try {
+
+ final ConnectionlessBootstrap bootstrap = createServerBootStrap();
+
+ bootstrap.setOption("reuseAddress", false);
+ bootstrap.setOption("child.reuseAddress", false);
+ bootstrap.setOption("readBufferSize", BUFFER_SIZE); //15M
+ bootstrap.setOption("receiveBufferSizePredictor",
+ new FixedReceiveBufferSizePredictor(BUFFER_SIZE));
+ bootstrap.setOption("receiveBufferSizePredictorFactory",
+ new FixedReceiveBufferSizePredictorFactory(BUFFER_SIZE));
+ ChannelPipelineFactory pfact = new NetflowPipelineFactory(this.controller);
+
+ bootstrap.setPipelineFactory(pfact);
+ InetSocketAddress inetSocketAddress = new InetSocketAddress(NETFLOW_PORT_NUM);
+ channelGroup = new DefaultChannelGroup();
+ serverChannel = bootstrap.bind(inetSocketAddress);
+ channelGroup.add(serverChannel);
+ log.info("Listening for netflow exporter connection on {}", inetSocketAddress);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * Creates server boot strap.
+ *
+ * @return ServerBootStrap
+ */
+ private ConnectionlessBootstrap createServerBootStrap() {
+
+ if (workerThreads == 0) {
+ serverExecFactory = new NioDatagramChannelFactory(
+ Executors.newFixedThreadPool(2));
+ return new ConnectionlessBootstrap(serverExecFactory);
+ } else {
+ serverExecFactory = new NioDatagramChannelFactory(
+ Executors.newFixedThreadPool(2),
+ workerThreads);
+ return new ConnectionlessBootstrap(serverExecFactory);
+ }
+ }
+
+ /**
+ * Stops the netflow collector.
+ */
+ public void stop() {
+ log.info("Stopped");
+ channelGroup.close();
+ }
+
+ /**
+ * Starts the netflow collector.
+ */
+ public void start() {
+ log.info("Started");
+ this.run();
+ }
+}
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java
new file mode 100644
index 0000000..90d75dd
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.Optional;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.onosproject.netflow.NetflowStore;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+import org.onosproject.netflow.FlowTemplateField;
+import org.onosproject.netflow.SourceId;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataRecord;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.FlowSet;
+import org.onosproject.netflow.DataFlowSet;
+import org.onosproject.netflow.TemplateRecord;
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.OptionalTemplateFlowSet;
+import org.onosproject.netflow.TemplateFlowSet;
+import org.onlab.packet.BasePacket;
+
+/**
+ * Manages inventory of Netflow template and data flowset to distribute
+ * information.
+ */
+@Component(immediate = true, service = NetflowStore.class)
+public class DistributedNetflowStore implements NetflowStore {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private EventuallyConsistentMap<TemplateId, DataTemplateRecord> templateFlowSet;
+    private EventuallyConsistentMap<TemplateId, OptionalTemplateFlowSet> optionalTemplateFlowSet;
+    private EventuallyConsistentMap<TemplateId, List<DataFlowRecord>> dataFlowSet;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Activate
+    protected void activate() {
+        KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.MISC)
+                .register(List.class)
+                .register(LinkedList.class)
+                .register(ArrayList.class)
+                .register(HashSet.class)
+                .register(BasePacket.class)
+                .register(Flow.class)
+                .register(FlowField.class)
+                .register(FlowTemplateField.class)
+                .register(SourceId.class)
+                .register(TemplateId.class)
+                .register(DataRecord.class)
+                .register(DataFlowRecord.class)
+                .register(FlowSet.class)
+                .register(DataFlowSet.class)
+                .register(TemplateRecord.class)
+                .register(DataTemplateRecord.class)
+                .register(NetFlowPacket.class)
+                .register(OptionalTemplateFlowSet.class)
+                .register(TemplateFlowSet.class);
+
+        templateFlowSet = storageService.<TemplateId, DataTemplateRecord>eventuallyConsistentMapBuilder()
+                .withSerializer(serializer)
+                .withName("netflow-templateflowset")
+                .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+                .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+                .withTombstonesDisabled()
+                .build();
+
+        optionalTemplateFlowSet = storageService.<TemplateId, OptionalTemplateFlowSet>eventuallyConsistentMapBuilder()
+                .withSerializer(serializer)
+                .withName("netflow-optionaltemplateflowset")
+                .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+                .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+                .withTombstonesDisabled()
+                .build();
+
+        dataFlowSet = storageService.<TemplateId, List<DataFlowRecord>>eventuallyConsistentMapBuilder()
+                .withSerializer(serializer)
+                .withName("netflow-dataflowset")
+                .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+                .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+                .withTombstonesDisabled()
+                .build();
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactive() {
+        // Tear down the maps built in activate() so nothing of them is left behind on re-activation.
+        templateFlowSet.destroy();
+        optionalTemplateFlowSet.destroy();
+        dataFlowSet.destroy();
+        log.info("Stopped");
+    }
+
+    /**
+     * Get template flowset from store.
+     * it will fetch current template which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return optional of data template record, optional will be empty if template not found.
+     */
+    @Override
+    public Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId) {
+        return Optional.ofNullable(templateFlowSet.get(templateId));
+    }
+
+    /**
+     * Get set of template flowsets from store.
+     * it will fetch all template flowsets from store.
+     *
+     * @return set of data template record, set will be empty if templates not found in the store.
+     */
+    @Override
+    public Set<DataTemplateRecord> getTemplateFlowSet() {
+        return templateFlowSet.values().stream()
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Get optional template flowset from store.
+     * it will fetch current optional template which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return optional of optional template flowset, optional will be empty if template not found.
+     */
+    @Override
+    public Optional<OptionalTemplateFlowSet> getOptionalTemplateFlowSet(TemplateId templateId) {
+        return Optional.ofNullable(optionalTemplateFlowSet.get(templateId));
+    }
+
+    /**
+     * Get set of optional template flowsets from store.
+     * it will fetch all optional template flowsets from store.
+     *
+     * @return set of optional template flowsets, set will be empty if templates not found in the store.
+     */
+    @Override
+    public Set<OptionalTemplateFlowSet> getOptionalTemplateFlowSet() {
+        return ImmutableSet.copyOf(optionalTemplateFlowSet.values());
+    }
+
+    /**
+     * Get data flowset from store.
+     * it will fetch current data flowset which is matching to the template id from store.
+     *
+     * @param templateId template id.
+     * @return list of data flow record, list will be empty if template id not matched.
+     */
+    @Override
+    public List<DataFlowRecord> getDataFlowSet(TemplateId templateId) {
+        List<DataFlowRecord> dataRecord = dataFlowSet.get(templateId);
+        return Objects.isNull(dataRecord) ? Lists.newArrayList() : ImmutableList.copyOf(dataRecord);
+    }
+
+    /**
+     * Get all data flowsets from store.
+     * it will fetch the data flow records of every template id held in the store.
+     *
+     * @return mapping from a template id to data flow record.
+     */
+    @Override
+    public Map<TemplateId, List<DataFlowRecord>> getDataFlowSet() {
+        return dataFlowSet.entrySet().stream().collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
+    }
+
+    /**
+     * Update template flowset to the store.
+     * Add new template to store if it not exist,
+     * otherwise it will replace the existing template.
+     *
+     * @param templateRecord template flowset record.
+     */
+    @Override
+    public void updateTemplateFlowSet(DataTemplateRecord templateRecord) {
+        templateFlowSet.put(templateRecord.getTemplateId(), templateRecord);
+    }
+
+    /**
+     * Update optional template flowset to the store.
+     * Add new optional template to store if it not exist,
+     * otherwise it will replace the existing optional template.
+     *
+     * @param templateFlowSet optional template flowset.
+     */
+    @Override
+    public void updateOptionalTemplateFlowSet(OptionalTemplateFlowSet templateFlowSet) {
+        optionalTemplateFlowSet.put(templateFlowSet.getTemplateId(), templateFlowSet);
+    }
+
+    /**
+     * Add data flow record to the store.
+     * The record is appended to the records already stored for its template id.
+     *
+     * @param dataFlowRecord data flow record.
+     */
+    @Override
+    public void addDataFlowSet(DataFlowRecord dataFlowRecord) {
+        dataFlowSet.compute(dataFlowRecord.getTemplateId(),
+                (id, flowSet) -> {
+                    List<DataFlowRecord> newSet = new ArrayList<>();
+                    if (flowSet != null) {
+                        newSet.addAll(flowSet);
+                    }
+                    newSet.add(dataFlowRecord);
+                    return newSet;
+                });
+    }
+
+    /**
+     * Remove template flowset from store.
+     * it will remove template flowset which is matching to the given template id from store.
+     *
+     * @param templateId template id.
+     */
+    @Override
+    public void clearTemplateFlowSet(TemplateId templateId) {
+        if (templateFlowSet.containsKey(templateId)) {
+            templateFlowSet.remove(templateId);
+        }
+    }
+
+    /**
+     * Remove all template flowset from store.
+     * it will remove all template flowsets from store.
+     */
+    @Override
+    public void clearTemplateFlowSet() {
+        templateFlowSet.clear();
+    }
+
+    /**
+     * Remove optional template flowset from store.
+     * it will remove optional template which is matching to the given template id from store.
+     *
+     * @param templateId template id.
+     */
+    @Override
+    public void clearOptionalTemplateFlowSet(TemplateId templateId) {
+        if (optionalTemplateFlowSet.containsKey(templateId)) {
+            optionalTemplateFlowSet.remove(templateId);
+        }
+    }
+
+    /**
+     * Remove all optional template flowset from store.
+     * it will remove all optional template flowsets from store.
+     */
+    @Override
+    public void clearOptionalTemplateFlowSet() {
+        optionalTemplateFlowSet.clear();
+    }
+
+    /**
+     * Remove data flowset from store.
+     * it will remove dataflowset which is matching to the given template id from store.
+     *
+     * @param templateId template id.
+     */
+    @Override
+    public void clearDataFlowSet(TemplateId templateId) {
+        if (dataFlowSet.containsKey(templateId)) {
+            dataFlowSet.remove(templateId);
+        }
+    }
+
+    /**
+     * Remove all data flowset from store.
+     * it will remove all data flowsets from store.
+     */
+    @Override
+    public void clearDataFlowSet() {
+        dataFlowSet.clear();
+    }
+
+    /**
+     * Remove template, optional template and data flowsets from store.
+     * it will remove all flowsets from store.
+     */
+    @Override
+    public void clearAllFlowSet() {
+        templateFlowSet.clear();
+        optionalTemplateFlowSet.clear();
+        dataFlowSet.clear();
+    }
+
+}
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
new file mode 100644
index 0000000..27eb8ea
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import java.util.Optional;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.SimpleChannelHandler;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.FlowSet;
+import org.onosproject.netflow.TemplateFlowSet;
+import org.onosproject.netflow.DataFlowSet;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.NetflowController;
+
+/**
+ * Channel handler deals with the netfow exporter connection and dispatches messages
+ * from netfow exporter to the appropriate locations.
+ */
+public class NeflowChannelHandler extends SimpleChannelHandler {
+
+ private static final Logger log = LoggerFactory.getLogger(NeflowChannelHandler.class);
+
+ private Channel channel;
+
+ private NetflowController controller;
+
+ /**
+ * Create a new netflow channelHandler instance.
+ *
+ * @param controller netflow controller.
+ */
+ NeflowChannelHandler(NetflowController controller) {
+ this.controller = controller;
+ }
+
+ /**
+ * Netflow channel connect to netflow exporter.
+ *
+ * @param ctx channel handler context
+ * @param event channel state event
+ * @throws Exception on error while connecting channel
+ */
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception {
+ channel = event.getChannel();
+ }
+
+ /**
+ * Netflow channel disconnect to netflow exporter.
+ *
+ * @param ctx channel handler context
+ * @param event channel state event
+ * @throws Exception on error while disconnecting channel
+ */
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent event) throws Exception {
+ channel = event.getChannel();
+ }
+
+ /**
+ * Netflow channel exception to netflow exporter.
+ *
+ * @param ctx channel handler context
+ * @param event channel exception event
+ * @throws Exception on error while parsing exception
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) throws Exception {
+ //TODO exception handler
+ }
+
+ /**
+ * Netflow message receive from netflow exporter.
+ *
+ * @param ctx channel handler context
+ * @param event channel message event
+ * @throws Exception on error while parsing exception
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
+ try {
+ NetFlowPacket netFlowPacket = (NetFlowPacket) event.getMessage();
+ netFlowPacket.getFlowSets()
+ .stream()
+ .filter(n -> n.getType() == FlowSet.Type.TEMPLATE_FLOWSET)
+ .map(t -> (TemplateFlowSet) t)
+ .flatMap(t -> t.getRecords().stream())
+ .forEach(t -> controller.addTemplateFlowSet(t));
+
+ netFlowPacket.getFlowSets()
+ .stream()
+ .filter(n -> n.getType() == FlowSet.Type.DATA_FLOWSET)
+ .map(t -> (DataFlowSet) t)
+ .forEach(data -> {
+ Optional<DataTemplateRecord> template = controller
+ .getTemplateFlowSet(TemplateId.valueOf(data.getFlowSetId()));
+ if (!template.isPresent()) {
+ return;
+ }
+ try {
+ data.dataDeserializer(template.get());
+ data.getDataFlow()
+ .stream()
+ .forEach(dataflow -> controller.updateDataFlowSet(dataflow));
+ } catch (Exception ex) {
+ log.error("Netflow dataflow deserializer exception ", ex);
+ }
+ });
+ log.info("Netflow message received {}", netFlowPacket);
+ } catch (Exception er) {
+ log.error("Netflow message deserializer exception ", er);
+ }
+
+ }
+
+}
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
new file mode 100644
index 0000000..bbe0c0f
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
@@ -0,0 +1,373 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+import com.google.common.base.MoreObjects;
+import java.util.Objects;
+
+import org.onlab.packet.Deserializer;
+import org.onlab.packet.BasePacket;
+import org.onosproject.netflow.FlowSet;
+
+import static com.google.common.base.Preconditions.checkState;
+
+
+/**
+ * A NetFlow export packet consists of a Packet Header followed by one or more
+ * FlowSets. The FlowSets can be any of the possible three types:
+ * Template, Data, or Options Template.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public final class NetFlowPacket extends BasePacket {
+
+    /*
+    +--------+-------------------------------------------+
+    |        | +----------+ +---------+ +----------+     |
+    | Packet | | Template | | Data    | | Options  |     |
+    | Header | | FlowSet  | | FlowSet | | Template | ... |
+    |        | |          | |         | | FlowSet  |     |
+    |        | +----------+ +---------+ +----------+     |
+    +--------+-------------------------------------------+
+
+    The Packet Header format is specified as:
+
+     0                   1                   2                   3
+     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+    |       Version Number          |            Count              |
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+    |                           sysUpTime                           |
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+    |                           UNIX Secs                           |
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+    |                       Sequence Number                         |
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+    |                        Source ID                              |
+    +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+
+    */
+
+
+    private final int version;
+    private final int count;
+    private final long sysUptime;
+    private final long timestamp;
+    private final int flowSequence;
+    private final int sourceId;
+    private final List<FlowSet> flowSets;
+
+    private NetFlowPacket(Builder builder) {
+        this.version = builder.version;
+        this.count = builder.count;
+        this.sysUptime = builder.sysUptime;
+        this.timestamp = builder.timestamp;
+        this.flowSequence = builder.flowSequence;
+        this.sourceId = builder.sourceId;
+        this.flowSets = builder.flowSets;
+    }
+
+    /**
+     * Returns Version of Flow Record format exported in this packet.
+     *
+     * @return version number
+     */
+    public int getVersion() {
+        return version;
+    }
+
+    /**
+     * Returns total number of records in the Export Packet.
+     *
+     * @return total number of records
+     */
+    public int getCount() {
+        return count;
+    }
+
+    /**
+     * Returns time in milliseconds since this device was first booted.
+     *
+     * @return system up time
+     */
+    public long getSysUptime() {
+        return sysUptime;
+    }
+
+    /**
+     * Returns packet timestamp.
+     * Time in seconds since 0000 UTC 1970, at which the Export Packet
+     * leaves the Exporter
+     *
+     * @return packet time stamp
+     */
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    /**
+     * Returns flow packet sequence number.
+     * Incremental sequence counter of all Export Packets sent from
+     * the current Observation Domain by the Exporter. This value
+     * MUST be cumulative, and SHOULD be used by the Collector to
+     * identify whether any Export Packets have been missed.
+     *
+     * @return packet sequence number
+     */
+    public int getFlowSequence() {
+        return flowSequence;
+    }
+
+    /**
+     * Returns a 32-bit value that identifies the Exporter Observation Domain.
+     *
+     * @return exporter source id
+     */
+    public int getSourceId() {
+        return sourceId;
+    }
+
+    /**
+     * Returns list of flowsets; this is the packet's own list, not a copy.
+     *
+     * @return list of flowsets
+     */
+    public List<FlowSet> getFlowSets() {
+        return flowSets;
+    }
+
+    /**
+     * Deserializer function for netflow packets.
+     * Parsing stops at the first flowset whose declared length is invalid or exceeds the remaining bytes.
+     * @return deserializer function
+     */
+    public static Deserializer<NetFlowPacket> deserializer() {
+        return (data, offset, length) -> {
+            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
+            Builder builder = new Builder();
+            // Header fields are unsigned on the wire (RFC 3954, section 5.1); widen without sign extension.
+            builder.version(Short.toUnsignedInt(bb.getShort()))
+                    .count(Short.toUnsignedInt(bb.getShort()))
+                    .sysUptime(Integer.toUnsignedLong(bb.getInt()))
+                    .timestamp(Integer.toUnsignedLong(bb.getInt()))
+                    .flowSequence(bb.getInt())
+                    .sourceId(bb.getInt());
+            // Stop at trailing bytes that cannot hold a complete flowset header.
+            while (bb.remaining() >= FlowSet.FLOW_SET_HEADER_LENTH) {
+                // Peek at the flowset id and length without consuming them; the decoder gets the full flowset.
+                int flowSetId = Short.toUnsignedInt(bb.getShort(bb.position()));
+                int flowSetLength = Short.toUnsignedInt(bb.getShort(bb.position() + Short.BYTES));
+                // A length shorter than the header would never advance the buffer (endless loop);
+                // a length longer than the remaining bytes means the packet is truncated.
+                if (flowSetLength < FlowSet.FLOW_SET_HEADER_LENTH || bb.remaining() < flowSetLength) {
+                    break;
+                }
+
+                byte[] flowSet = new byte[flowSetLength];
+                bb.get(flowSet);
+                builder.flowSet((FlowSet) FlowSet.Type.getType(flowSetId).getDecoder()
+                        .deserialize(flowSet, 0, flowSetLength));
+            }
+            return builder.build();
+        };
+    }
+
+    @Override
+    public byte[] serialize() {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
+    public int hashCode() {
+        int hash = 3; // NOTE(review): sysUptime is excluded here and in equals() - confirm this is intended
+        hash = 59 * hash + this.version;
+        hash = 59 * hash + this.count;
+        hash = 59 * hash + (int) (this.timestamp ^ (this.timestamp >>> 32));
+        hash = 59 * hash + this.flowSequence;
+        hash = 59 * hash + this.sourceId;
+        hash = 59 * hash + Objects.hashCode(this.flowSets);
+        return hash;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null) {
+            return false;
+        }
+        if (getClass() != obj.getClass()) {
+            return false;
+        }
+        final NetFlowPacket other = (NetFlowPacket) obj;
+        if (this.version != other.version) {
+            return false;
+        }
+        if (this.count != other.count) {
+            return false;
+        }
+        if (this.timestamp != other.timestamp) {
+            return false;
+        }
+        if (this.flowSequence != other.flowSequence) {
+            return false;
+        }
+        if (this.sourceId != other.sourceId) {
+            return false;
+        }
+        return Objects.equals(this.flowSets, other.flowSets);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("version", version)
+                .add("count", count)
+                .add("sysUptime", sysUptime)
+                .add("timestamp", timestamp)
+                .add("flowSequence", flowSequence)
+                .add("sourceId", sourceId)
+                .add("flowSets", flowSets)
+                .toString();
+    }
+
+    /**
+     * Builder for netflow packet.
+     */
+    private static final class Builder {
+
+        private int version;
+        private int count;
+        private long sysUptime;
+        private long timestamp;
+        private int flowSequence;
+        private int sourceId;
+        private List<FlowSet> flowSets = new LinkedList<>();
+
+        /**
+         * Setter for version of Flow Record format exported in this packet.
+         *
+         * @param version number.
+         * @return this class builder.
+         */
+        public Builder version(int version) {
+            this.version = version;
+            return this;
+        }
+
+        /**
+         * Setter for total number of records in the Export Packet.
+         *
+         * @param count flow record count.
+         * @return this class builder.
+         */
+        public Builder count(int count) {
+            this.count = count;
+            return this;
+        }
+
+        /**
+         * Setter for time in milliseconds since this device was first booted.
+         *
+         * @param sysUptime system up time.
+         * @return this class builder.
+         */
+        public Builder sysUptime(long sysUptime) {
+            this.sysUptime = sysUptime;
+            return this;
+        }
+
+        /**
+         * Setter for packet timestamp.
+         *
+         * @param timestamp packet timestamp.
+         * @return this class builder.
+         */
+        public Builder timestamp(long timestamp) {
+            this.timestamp = timestamp;
+            return this;
+        }
+
+        /**
+         * Setter for flow sequence.
+         *
+         * @param flowSequence flow sequence.
+         * @return this class builder.
+         */
+        public Builder flowSequence(int flowSequence) {
+            this.flowSequence = flowSequence;
+            return this;
+        }
+
+        /**
+         * Setter for sourceId.
+         *
+         * @param sourceId exporter sourceid.
+         * @return this class builder.
+         */
+        public Builder sourceId(int sourceId) {
+            this.sourceId = sourceId;
+            return this;
+        }
+
+        /**
+         * Setter for list of flowsets; the given list replaces the builder's current list.
+         *
+         * @param flowSets list of flowsets.
+         * @return this class builder.
+         */
+        public Builder flowSets(List<FlowSet> flowSets) {
+            this.flowSets = flowSets;
+            return this;
+        }
+
+        /**
+         * Appends a single flowset to the builder's current list.
+         *
+         * @param flowSet flowset.
+         * @return this class builder.
+         */
+        public Builder flowSet(FlowSet flowSet) {
+            this.flowSets.add(flowSet);
+            return this;
+        }
+
+        /**
+         * Checks arguments for netflow packet; a zero value fails with IllegalStateException.
+         */
+        private void checkArguments() {
+            checkState(version != 0, "Invalid Version.");
+            checkState(count != 0, "Invalid record count.");
+            checkState(sysUptime != 0, "Invalid system up time.");
+            checkState(timestamp != 0, "Invalid flow timestamp.");
+        }
+
+        /**
+         * Builds Netflow packet after validating the mandatory header fields.
+         *
+         * @return Netflowpacket.
+         */
+        public NetFlowPacket build() {
+            checkArguments();
+            return new NetFlowPacket(this);
+        }
+
+    }
+
+}
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java
new file mode 100644
index 0000000..6d33f61
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import org.onosproject.netflow.NetflowController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import org.onosproject.netflow.NetflowStore;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.DataTemplateRecord;
+
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Netflow controller implementation.
+ * On activation it creates and starts a Controller bound to this instance.
+ * Every operation of the NetflowController interface is delegated to the
+ * injected netflow store, which holds the template records and the data
+ * flow records.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+@Component(immediate = true, service = NetflowController.class)
+public class NetflowControllerImpl implements NetflowController {
+
+    private static final Logger log = LoggerFactory.getLogger(NetflowControllerImpl.class);
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected NetflowStore store;
+
+    @Activate
+    public void activate() {
+        // NOTE(review): the Controller reference is not retained, so deactivate() cannot stop it - confirm.
+        new Controller(this).start();
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    /**
+     * Registers a template flowset record.
+     * The record is forwarded to the store's updateTemplateFlowSet operation;
+     * presumably an existing template with the same id is replaced - verify against the store.
+     *
+     * @param templateRecord template flowset record.
+     */
+    @Override
+    public void addTemplateFlowSet(DataTemplateRecord templateRecord) {
+        this.store.updateTemplateFlowSet(templateRecord);
+    }
+
+    /**
+     * Records a data flowset record.
+     * The record is forwarded to the store's addDataFlowSet operation.
+     *
+     * @param dataFlowRecord data flowset record.
+     */
+    @Override
+    public void updateDataFlowSet(DataFlowRecord dataFlowRecord) {
+        this.store.addDataFlowSet(dataFlowRecord);
+    }
+
+    /**
+     * Looks up the template flowset registered under a template id.
+     * The lookup is answered by the store.
+     *
+     * @param templateId template id.
+     * @return optional of data template record, optional will be empty if template not found.
+     */
+    @Override
+    public Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId) {
+        return this.store.getTemplateFlowSet(templateId);
+    }
+
+    /**
+     * Looks up the data flow records recorded for a template id.
+     * The lookup is answered by the store.
+     *
+     * @param templateId template id.
+     * @return list of data flow record, list will be empty if template id not matched.
+     */
+    @Override
+    public List<DataFlowRecord> getDataFlowSet(TemplateId templateId) {
+        return this.store.getDataFlowSet(templateId);
+    }
+
+    /**
+     * Returns every template flowset record.
+     * The result is whatever the store currently reports.
+     *
+     * @return set of data template record, set will be empty if templates not found in the store.
+     */
+    @Override
+    public Set<DataTemplateRecord> getTemplateFlowSet() {
+        return this.store.getTemplateFlowSet();
+    }
+
+    /**
+     * Returns every data flow record, grouped by template id.
+     * The result is whatever the store currently reports.
+     *
+     * @return mapping from a template id to data flow record.
+     */
+    @Override
+    public Map<TemplateId, List<DataFlowRecord>> getDataFlowSet() {
+        return this.store.getDataFlowSet();
+    }
+
+}
\ No newline at end of file
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
new file mode 100644
index 0000000..6807a5c
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decodes a netflow message from a Channel, for use in a netty pipeline.
+ */
+public class NetflowMessageDecoder extends FrameDecoder {
+
+    private static final Logger log = LoggerFactory.getLogger(NetflowMessageDecoder.class);
+
+    // Returns the decoded NetFlowPacket, or null when nothing is readable or the bytes cannot be decoded.
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
+        try {
+            if (buffer.readableBytes() > 0) {
+                byte[] bytes = new byte[buffer.readableBytes()];
+                // Consumes every readable byte; the whole content is handed to the packet deserializer.
+                buffer.readBytes(bytes);
+                ctx.setAttachment(null);
+                return NetFlowPacket.deserializer().deserialize(bytes, 0, bytes.length);
+            }
+        } catch (Exception e) {
+            // The malformed bytes were already consumed by readBytes(). Do not rewind the reader index:
+            // that would put the same undecodable data back in front of the next received message.
+            log.error("Netflow message decode error ", e);
+        }
+        return null;
+    }
+
+}
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
new file mode 100644
index 0000000..9f5ef2e
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.onosproject.netflow.NetflowController;
+
+/**
+ * Factory of the ChannelPipeline used by the server-side netflow message channel.
+ */
+public class NetflowPipelineFactory implements ChannelPipelineFactory {
+
+    private final NetflowController controller;
+
+    /**
+     * Creates a pipeline factory bound to the given controller.
+     * The controller is passed to every channel handler this factory creates.
+     *
+     * @param controller netflow controller.
+     */
+    public NetflowPipelineFactory(NetflowController controller) {
+        this.controller = controller;
+    }
+
+    /**
+     * Builds a pipeline holding a netflow message decoder followed by a netflow channel handler.
+     * A new decoder and a new handler are created on each call.
+     *
+     * @return ChannelPipeline server-side pipe line channel
+     * @throws Exception on while getting pipe line
+     */
+    @Override
+    public ChannelPipeline getPipeline() throws Exception {
+        NeflowChannelHandler channelHandler = new NeflowChannelHandler(controller);
+
+        ChannelPipeline channelPipeline = Channels.pipeline();
+        channelPipeline.addLast("netflowmessagedecoder", new NetflowMessageDecoder());
+        channelPipeline.addLast("ActiveHandler", channelHandler);
+        return channelPipeline;
+    }
+
+}
+
diff --git a/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/package-info.java b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/package-info.java
new file mode 100644
index 0000000..bd4bf71
--- /dev/null
+++ b/apps/ipflow-monitor/netflow/app/src/main/java/org/onosproject/netflow/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Implementation classes for sample application that assigns and manages netflow collector.
+ */
+package org.onosproject.netflow.impl;
\ No newline at end of file
diff --git a/apps/ipflow-monitor/netflow/placeholder b/apps/ipflow-monitor/netflow/placeholder
deleted file mode 100644
index e69de29..0000000
--- a/apps/ipflow-monitor/netflow/placeholder
+++ /dev/null