Netflow store and cli implementation
Implemented netflow version 9 of netflow collector store and cli.
Reference : https://datatracker.ietf.org/doc/html/rfc3954
Change-Id: I0b2ec932c28bc3b36042c7a725a41956cc859fcd
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/DataFlowRecord.java
similarity index 95%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/DataFlowRecord.java
index 8dd434c..7f29f43 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/DataFlowRecord.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
import java.nio.ByteBuffer;
import java.util.LinkedList;
@@ -25,12 +25,6 @@
import com.google.common.base.MoreObjects;
-import org.onosproject.netflow.DataRecord;
-import org.onosproject.netflow.TemplateId;
-import org.onosproject.netflow.Flow;
-import org.onosproject.netflow.FlowTemplateField;
-import org.onosproject.netflow.DataDeserializer;
-
import static com.google.common.base.Preconditions.checkNotNull;
@@ -118,7 +112,7 @@
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("templateId", templateId)
- .add("flows", flows)
+ .add("\n\nflows", flows)
.toString();
}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/DataFlowSet.java
similarity index 97%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/DataFlowSet.java
index 7183915..b569387 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/DataFlowSet.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
import java.nio.ByteBuffer;
import java.util.LinkedList;
@@ -161,6 +161,7 @@
.add("flowSetId", flowSetId)
.add("length", length)
.add("data", data)
+ .add("dataFlow", dataFlow)
.toString();
}
@@ -206,7 +207,7 @@
byte[] dataRecord = new byte[template.getValueLength()];
bb.get(dataRecord);
this.setDataFlow(DataFlowRecord.deserializer().deserialize(
- data, 0, template.getValueLength(), template));
+ dataRecord, 0, template.getValueLength(), template));
}
}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/DataTemplateRecord.java
similarity index 88%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/DataTemplateRecord.java
index dc98531..4230a10 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/DataTemplateRecord.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
import java.nio.ByteBuffer;
import java.util.LinkedList;
@@ -28,8 +28,6 @@
import com.google.common.base.MoreObjects;
import org.onlab.packet.Deserializer;
-import org.onosproject.netflow.TemplateId;
-import org.onosproject.netflow.FlowTemplateField;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
@@ -117,6 +115,37 @@
.toString();
}
+
+ @Override
+ public int hashCode() {
+ int hash = 3;
+ hash = 79 * hash + Objects.hashCode(this.templateId);
+ hash = 79 * hash + this.filedCount;
+ hash = 79 * hash + Objects.hashCode(this.fields);
+ return hash;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ final DataTemplateRecord other = (DataTemplateRecord) obj;
+ if (this.filedCount != other.filedCount) {
+ return false;
+ }
+ if (!Objects.equals(this.templateId, other.templateId)) {
+ return false;
+ }
+ return Objects.equals(this.fields, other.fields);
+ }
+
/**
* Data deserializer function for data template record.
*
diff --git a/apps/netflow/api/src/main/java/org/onosproject/netflow/FlowField.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/FlowField.java
index 207903c..fcaa120 100644
--- a/apps/netflow/api/src/main/java/org/onosproject/netflow/FlowField.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/FlowField.java
@@ -18,18 +18,24 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Map;
+import java.util.Set;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;
+import java.util.stream.Collectors;
import static org.onosproject.netflow.NetflowUtils.VAR_INT_LONG;
import static org.onosproject.netflow.NetflowUtils.VAR_BYTE;
import static org.onosproject.netflow.NetflowUtils.NULL;
import static org.onosproject.netflow.NetflowUtils.VAR_SHORT;
-import static org.onosproject.netflow.NetflowUtils.VAR_IP_ADDRESS;
+import static org.onosproject.netflow.NetflowUtils.VAR_IPV4_ADDRESS;
+import static org.onosproject.netflow.NetflowUtils.VAR_IPV6_ADDRESS;
import static org.onosproject.netflow.NetflowUtils.VAR_SHORT_INT;
import static org.onosproject.netflow.NetflowUtils.VAR_INT;
import static org.onosproject.netflow.NetflowUtils.VAR_MAC;
+import static org.onosproject.netflow.NetflowUtils.VAR_UNSIGNED_BYTE;
+import static org.onosproject.netflow.NetflowUtils.VAR_UNSIGNED_INT;
+import static org.onosproject.netflow.NetflowUtils.VAR_UNSIGNED_SHORT;;
/**
* The flow fields are a selection of Packet Header
@@ -44,34 +50,34 @@
* https://www.ietf.org/rfc/rfc3954.txt
* Section :- Field Type Definitions.
*/
- IN_BYTES(1, VAR_INT_LONG),
- IN_PKTS(2, VAR_INT_LONG),
+ IN_BYTES(1, VAR_UNSIGNED_INT),
+ IN_PKTS(2, VAR_UNSIGNED_INT),
FLOWS(3, VAR_INT_LONG),
PROTOCOL(4, VAR_BYTE),
SRC_TOS(5, VAR_BYTE),
TCP_FLAGS(6, VAR_BYTE),
- L4_SRC_PORT(7, VAR_SHORT),
- IPV4_SRC_ADDR(8, VAR_IP_ADDRESS),
+ L4_SRC_PORT(7, VAR_UNSIGNED_SHORT),
+ IPV4_SRC_ADDR(8, VAR_IPV4_ADDRESS),
SRC_MASK(9, VAR_BYTE),
- INPUT_SNMP(10, VAR_SHORT_INT),
- L4_DST_PORT(11, VAR_SHORT),
- IPV4_DST_ADDR(12, VAR_IP_ADDRESS),
+ INPUT_SNMP(10, VAR_UNSIGNED_SHORT),
+ L4_DST_PORT(11, VAR_UNSIGNED_SHORT),
+ IPV4_DST_ADDR(12, VAR_IPV4_ADDRESS),
DST_MASK(13, VAR_BYTE),
- OUTPUT_SNMP(14, VAR_SHORT_INT),
- IPV4_NEXT_HOP(15, VAR_IP_ADDRESS),
+ OUTPUT_SNMP(14, VAR_UNSIGNED_SHORT),
+ IPV4_NEXT_HOP(15, VAR_IPV4_ADDRESS),
SRC_AS(16, VAR_SHORT_INT),
DST_AST(17, VAR_SHORT_INT),
- BGP_IPV4_NEXT_HOP(18, VAR_IP_ADDRESS),
+ BGP_IPV4_NEXT_HOP(18, VAR_IPV4_ADDRESS),
MUL_DST_PKTS(19, VAR_INT_LONG),
MUL_DST_BYTES(20, VAR_INT_LONG),
- LAST_SWITCHED(21, VAR_INT),
- FIRST_SWITCHED(22, VAR_INT),
+ LAST_SWITCHED(21, VAR_UNSIGNED_INT),
+ FIRST_SWITCHED(22, VAR_UNSIGNED_INT),
OUT_BYTES(23, VAR_INT_LONG),
OUT_PKTS(24, VAR_INT_LONG),
MIN_PKT_LNGTH(25, NULL),
MAX_PKT_LNGTH(26, NULL),
- IPV6_SRC_ADDR(27, VAR_IP_ADDRESS),
- IPV6_DST_ADDR(28, VAR_IP_ADDRESS),
+ IPV6_SRC_ADDR(27, VAR_IPV6_ADDRESS),
+ IPV6_DST_ADDR(28, VAR_IPV6_ADDRESS),
IPV6_SRC_MASK(29, VAR_BYTE),
IPV6_DST_MASK(30, VAR_BYTE),
IPV6_FLOW_LABEL(31, NULL),
@@ -89,7 +95,7 @@
IPV4_SRC_PREFIX(44, NULL),
IPV4_DST_PREFIX(45, NULL),
MPLS_TOP_LABEL_TYPE(46, VAR_BYTE),
- MPLS_TOP_LABEL_IP_ADDR(47, VAR_IP_ADDRESS),
+ MPLS_TOP_LABEL_IP_ADDR(47, VAR_IPV4_ADDRESS),
FLOW_SAMPLER_ID(48, VAR_BYTE),
FLOW_SAMPLER_MODE(49, VAR_BYTE),
FLOW_SAMPLER_RANDOM_INTERVAL(50, NULL),
@@ -101,10 +107,10 @@
OUT_DST_MAC(57, VAR_MAC),
SRC_VLAN(58, VAR_SHORT),
DST_VLAN(59, VAR_SHORT),
- IP_PROTOCOL_VERSION(60, VAR_BYTE),
+ IP_PROTOCOL_VERSION(60, VAR_UNSIGNED_BYTE),
DIRECTION(61, VAR_BYTE),
- IPV6_NEXT_HOP(62, VAR_IP_ADDRESS),
- BPG_IPV6_NEXT_HOP(63, VAR_IP_ADDRESS),
+ IPV6_NEXT_HOP(62, VAR_IPV6_ADDRESS),
+ BPG_IPV6_NEXT_HOP(63, VAR_IPV6_ADDRESS),
IPV6_OPTION_HEADERS(64, VAR_INT),
MPLS_LABEL_1(70, NULL),
MPLS_LABEL_2(71, NULL),
@@ -139,6 +145,7 @@
LAYER2_PACKET_SECTION_SIZE(103, NULL),
LAYER2_PACKET_SECTION_DATA(104, NULL);
+
final int fieldID;
final BiFunction<ByteBuffer, Integer, Object> parser;
private static Map<Integer, FlowField> fields = new ConcurrentHashMap<>();
@@ -158,6 +165,12 @@
.map(id -> fields.get(id));
}
+ public static Set<String> getAllFlowField() {
+ return fields.values().stream()
+ .map(f -> f.name())
+ .collect(Collectors.toSet());
+ }
+
public BiFunction<ByteBuffer, Integer, Object> getParser() {
return this.parser;
}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/FlowSet.java
similarity index 98%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/FlowSet.java
index 1b292a7..e6c5a50 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/FlowSet.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
import java.util.Arrays;
import java.util.Map;
diff --git a/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowController.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowController.java
new file mode 100644
index 0000000..91b3b62
--- /dev/null
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowController.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.Map;
+
+/**
+ * It control and manage the netflow traffic.
+ * it is Collecting, Storing and analyzing NetFlow data
+ * it can help to understand which applications, and protocols
+ * may be consuming the most network bandwidth by tracking processes,
+ * protocols, times of day, and traffic routing.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+public interface NetflowController {
+
+ /**
+ * Add template flowset to controller.
+ * Add new template to controller if it not exist,
+ * otherwise it will replace the existing template.
+ *
+ * @param templateRecord template flowset record.
+ */
+ void addTemplateFlowSet(DataTemplateRecord templateRecord);
+
+ /**
+ * Update data flowset to controller.
+ * it will update new data flowset to store.
+ *
+ * @param dataFlowRecord data flowset record.
+ */
+ void updateDataFlowSet(DataFlowRecord dataFlowRecord);
+
+ /**
+ * Get template flowset from controller.
+ * it will fetch current template which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return optional of data template record, optional will be empty if template not found.
+ */
+ Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId);
+
+ /**
+ * Get data flowset from controller.
+ * it will fetch current data flowset which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return list of data flow record, list will be empty if template id not matched.
+ */
+ List<DataFlowRecord> getDataFlowSet(TemplateId templateId);
+
+ /**
+ * Get all template flowsets from controller.
+ * it will fetch all templates from store.
+ *
+ * @return set of data template record, set will be empty if templates not found in the store.
+ */
+ Set<DataTemplateRecord> getTemplateFlowSet();
+
+ /**
+ * Get data flowset from controller.
+ * it will fetch current data flowset which is matching to the template id from store.
+ *
+ * @return mapping from a template id to data flow record.
+ */
+ Map<TemplateId, List<DataFlowRecord>> getDataFlowSet();
+}
diff --git a/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowStore.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowStore.java
new file mode 100644
index 0000000..5dc5719
--- /dev/null
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowStore.java
@@ -0,0 +1,154 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Manages inventory of Netflow template and data flowset to distribute
+ * information.
+ */
+public interface NetflowStore {
+
+ /**
+ * Get template flowset from store.
+ * it will fetch current template which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return optional of data template record, optional will be empty if template not found.
+ */
+ Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId);
+
+ /**
+ * Get set of template flowsets from store.
+ * it will fetch all template flowsets from store.
+ *
+ * @return set of data template record, set will be empty if templates not found in the store.
+ */
+ Set<DataTemplateRecord> getTemplateFlowSet();
+
+ /**
+ * Get optional template flowset from store.
+ * it will fetch current optional template which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return optional of optional template flowset, optional will be empty if template not found.
+ */
+ Optional<OptionalTemplateFlowSet> getOptionalTemplateFlowSet(TemplateId templateId);
+
+ /**
+ * Get set of optional template flowsets from store.
+ * it will fetch all optional template flowsets from store.
+ *
+ * @return set of optional template flowsets, set will be empty if templates not found in the store.
+ */
+ Set<OptionalTemplateFlowSet> getOptionalTemplateFlowSet();
+
+ /**
+ * Get data flowset from store.
+ * it will fetch current data flowset which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return list of data flow record, list will be empty if template id not matched.
+ */
+ List<DataFlowRecord> getDataFlowSet(TemplateId templateId);
+
+ /**
+ * Get data flowset from store.
+ * it will fetch current data flowset which is matching to the template id from store.
+ *
+ * @return mapping from a template id to data flow record.
+ */
+ Map<TemplateId, List<DataFlowRecord>> getDataFlowSet();
+
+ /**
+ * Update template flowset to the store.
+ * Add new template to store if it not exist,
+ * otherwise it will replace the existing template.
+ *
+ * @param templateRecord template flowset record.
+ */
+ void updateTemplateFlowSet(DataTemplateRecord templateRecord);
+
+ /**
+ * Update optional template flowset to the store.
+ * Add new optional template to store if it not exist,
+ * otherwise it will replace the existing optional template.
+ *
+ * @param optionalTemplateFlowSet optional template flowset.
+ */
+ void updateOptionalTemplateFlowSet(OptionalTemplateFlowSet optionalTemplateFlowSet);
+
+ /**
+ * Add data flow record to the store.
+ * Add new data flow record to store
+ *
+ * @param dataFlowRecord data flow record.
+ */
+ void addDataFlowSet(DataFlowRecord dataFlowRecord);
+
+ /**
+ * Remove template flowset from store.
+ * it will remove template flowset which is matching to the given template id from store.
+ *
+ * @param templateId template id.
+ */
+ void clearTemplateFlowSet(TemplateId templateId);
+
+ /**
+ * Remove all template flowset from store.
+ * it will remove all template flowsets from store.
+ */
+ void clearTemplateFlowSet();
+
+ /**
+ * Remove optional template flowset from store.
+ * it will remove optional template which is matching to the given template id from store.
+ *
+ * @param templateId template id.
+ */
+ void clearOptionalTemplateFlowSet(TemplateId templateId);
+
+ /**
+ * Remove all optional template flowset from store.
+ * it will remove all optional template flowsets from store.
+ */
+ void clearOptionalTemplateFlowSet();
+
+ /**
+ * Remove data flowset from store.
+ * it will remove dataflowset which is matching to the given template id from store.
+ *
+ * @param templateId template id.
+ */
+ void clearDataFlowSet(TemplateId templateId);
+
+ /**
+ * Remove all data flowset from store.
+ * it will remove all data flowsets from store.
+ */
+ void clearDataFlowSet();
+
+ /**
+ * Remove template, optional template and data flowsets from store.
+ * it will remove all flowsets from store.
+ */
+ void clearAllFlowSet();
+
+}
\ No newline at end of file
diff --git a/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowUtils.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowUtils.java
index 7e75dc7..ebadd3d 100644
--- a/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowUtils.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/NetflowUtils.java
@@ -15,10 +15,9 @@
*/
package org.onosproject.netflow;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.function.BiFunction;
+import org.onlab.packet.IpAddress;
/**
@@ -32,13 +31,38 @@
return bb.get();
};
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_UNSIGNED_BYTE = (bb, len) -> {
+ return getUnsignedByte(bb);
+ };
+
public static final BiFunction<ByteBuffer, Integer, Object> VAR_SHORT = (bb, len) -> {
return bb.getShort();
};
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_UNSIGNED_SHORT = (bb, len) -> {
+ return asUnsignedShort(bb);
+ };
+
+ public static short getUnsignedByte(ByteBuffer bb) {
+ return ((short) (bb.get() & 0xFF));
+ }
+
+ public static int asUnsignedShort(ByteBuffer bb) {
+ return bb.getShort() & 0xFFFF;
+ }
+
+ public static long getUnsignedInt(ByteBuffer bb) {
+ return ((long) bb.getInt() & 0xFFFFFFFFL);
+ }
+
public static final BiFunction<ByteBuffer, Integer, Object> VAR_INT = (bb, len) -> {
return bb.getInt();
};
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_UNSIGNED_INT = (bb, len) -> {
+ return getUnsignedInt(bb);
+ };
+
public static final BiFunction<ByteBuffer, Integer, Object> VAR_FLOAT = (bb, len) -> {
return bb.getFloat();
};
@@ -52,6 +76,12 @@
return null;
};
+ public static final BiFunction<ByteBuffer, Integer, byte[]> BYTE_ARRAY = (bb, len) -> {
+ byte[] bytes = new byte[len];
+ bb.get(bytes);
+ return bytes;
+ };
+
public static final BiFunction<ByteBuffer, Integer, Object> VAR_INT_LONG = (bb, len) -> {
if (len == 4) {
return bb.getInt();
@@ -66,8 +96,14 @@
return bb.getInt();
};
- public static final BiFunction<ByteBuffer, Integer, Object> VAR_IP_ADDRESS = (bb, len) -> {
- return toInetAddress(bb, len);
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_IPV4_ADDRESS = (bb, len) -> {
+ byte[] value = getByteArray(bb, len);
+ return IpAddress.valueOf(IpAddress.Version.INET, value).toInetAddress().getHostAddress();
+ };
+
+ public static final BiFunction<ByteBuffer, Integer, Object> VAR_IPV6_ADDRESS = (bb, len) -> {
+ byte[] value = getByteArray(bb, len);
+ return IpAddress.valueOf(IpAddress.Version.INET6, value).toInetAddress().getHostAddress();
};
public static final BiFunction<ByteBuffer, Integer, Object> VAR_MAC = (bb, len) -> {
@@ -77,18 +113,10 @@
return null;
};
- public static InetAddress toInetAddress(ByteBuffer bb, int length) {
- byte[] address = new byte[length];
- bb.get(address);
-
- InetAddress ipAddress = null;
- try {
- ipAddress = InetAddress.getByAddress(address);
- } catch (UnknownHostException e) {
- throw new IllegalArgumentException("Invalid host buffer");
- }
-
- return ipAddress;
+ private static byte[] getByteArray(ByteBuffer bb, int length) {
+ byte[] bytes = new byte[length];
+ bb.get(bytes);
+ return bytes;
}
}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/OptionalTemplateFlowSet.java
similarity index 87%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/OptionalTemplateFlowSet.java
index 9a00bb2..068a814 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/OptionalTemplateFlowSet.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
import java.util.List;
@@ -53,6 +53,8 @@
private int length;
+ private TemplateId templateId;
+
private List<DataTemplateRecord> records;
/**
@@ -90,6 +92,20 @@
}
/**
+ * Returns template record's template id.
+ * Template Records is given a unique Template ID.
+ * This uniqueness is local to the Observation
+ * Domain that generated the Template ID. Template IDs 0-255 are
+ * reserved for Template FlowSets, Options FlowSets, and other
+ * reserved FlowSets yet to be created.
+ *
+ * @return list of flowsets
+ */
+ public TemplateId getTemplateId() {
+ return templateId;
+ }
+
+ /**
* Returns list of optional data template records.
*
* @return list of optional data template records
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateFlowSet.java
similarity index 95%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateFlowSet.java
index b6bc51e..6ba1d66 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateFlowSet.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
import java.nio.ByteBuffer;
import java.util.LinkedList;
@@ -26,8 +26,6 @@
import org.onlab.packet.DeserializationException;
import org.onlab.packet.Deserializer;
-import static com.google.common.base.Preconditions.checkState;
-
/**
* One of the essential elements in the NetFlow format is the Template
* FlowSet. Templates greatly enhance the flexibility of the Flow
@@ -256,21 +254,11 @@
}
/**
- * Checks arguments for template flow set.
- */
- private void checkArguments() {
- checkState(flowSetId == 0, "Invalid flow set id.");
- checkState(length == 0, "Invalid flow set length.");
-
- }
-
- /**
* Builds template flow set.
*
* @return template flow set.
*/
public TemplateFlowSet build() {
- checkArguments();
return new TemplateFlowSet(this);
}
diff --git a/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateId.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateId.java
index bf6f4a2..8ec8d07 100644
--- a/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateId.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateId.java
@@ -15,7 +15,7 @@
*/
package org.onosproject.netflow;
-import com.google.common.base.MoreObjects;
+import org.onlab.util.Identifier;
/**
* Template Records is given a unique Template ID.
@@ -26,23 +26,34 @@
* FlowSets are numbered from 256 to 65535.
* Ref: https://www.ietf.org/rfc/rfc3954.txt
*/
-public class TemplateId {
+public class TemplateId extends Identifier<Integer> {
- private int id;
-
+ /**
+ * Default constructor.
+ *
+ * @param id template id.
+ */
public TemplateId(int id) {
- this.id = id;
+ super(id);
}
+ /**
+ * Get the value of the template id.
+ *
+ * @return the value of the template id.
+ */
public int getId() {
- return id;
+ return identifier;
}
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("id", id)
- .toString();
+ /**
+ * Get the value of the templateid object.
+ *
+ * @param id template id.
+ * @return the value of the templateid object.
+ */
+ public static TemplateId valueOf(int id) {
+ return new TemplateId(id);
}
}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java b/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateRecord.java
similarity index 91%
rename from apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java
rename to apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateRecord.java
index db66292..744db66 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java
+++ b/apps/netflow/api/src/main/java/org/onosproject/netflow/TemplateRecord.java
@@ -13,11 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.netflow.impl;
+package org.onosproject.netflow;
import org.onlab.packet.BasePacket;
-import org.onosproject.netflow.FlowRecord;
/**
* A Template Record defines the structure and interpretation of fields.
* in a Flow Data Record.
diff --git a/apps/netflow/app/BUILD b/apps/netflow/app/BUILD
index 747d61d..c1a4601 100644
--- a/apps/netflow/app/BUILD
+++ b/apps/netflow/app/BUILD
@@ -1,4 +1,4 @@
-COMPILE_DEPS = CORE_DEPS + [
+COMPILE_DEPS = CORE_DEPS + KRYO + CLI + [
"//core/store/serializers:onos-core-serializers",
"//apps/netflow/api:onos-apps-netflow-api",
"@io_netty_netty_common//jar",
@@ -6,5 +6,6 @@
]
osgi_jar(
+ karaf_command_packages = ["org.onosproject.netflow.cli"],
deps = COMPILE_DEPS,
)
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java
new file mode 100644
index 0000000..760d40b
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.api.console.Completer;
+import org.apache.karaf.shell.api.console.CommandLine;
+import org.apache.karaf.shell.api.console.Session;
+import org.apache.karaf.shell.support.completers.StringsCompleter;
+
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Flow field completer.
+ */
+@Service
+public class FlowFieldCompleter implements Completer {
+
+ @Override
+ public int complete(Session session, CommandLine commandLine, List<String> candidates) {
+
+ StringsCompleter delegate = new StringsCompleter();
+ Set<String> flowfields = FlowField.getAllFlowField();
+ SortedSet<String> strings = delegate.getStrings();
+ for (String field : flowfields) {
+ strings.add(field);
+ }
+ return delegate.complete(session, commandLine, candidates);
+ }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java
new file mode 100644
index 0000000..7a5db0d
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Map;
+import java.util.Collection;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+
+/**
+ * Lists all netflow data flowsets.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-dataflow",
+ description = "Lists all data flowsets received from netflow exporter.")
+public class NetflowDataflowCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "templateid",
+ description = "Data flowset template id",
+ required = false, multiValued = false)
+ int templateID = 0;
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+
+ if (templateID < 0) {
+ print("Invalid template ID");
+ return;
+ }
+ if (templateID > 0) {
+ List<DataFlowRecord> dataFlowSet = controller.getDataFlowSet(TemplateId.valueOf(templateID));
+ if (!dataFlowSet.isEmpty()) {
+ dataFlowSet.stream().forEach(data -> printDataflowSet(data));
+ } else {
+ print("Default template not found");
+ }
+ } else {
+ Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+ if (dataFlowSets.isEmpty()) {
+ print("Default template not found");
+ } else {
+ dataFlowSets.values().stream().flatMap(Collection::stream).forEach(data -> printDataflowSet(data));
+ }
+ }
+ }
+
+ /**
+ * Adds data flowset details in specified row wise.
+ *
+ * @param dataFlowRecord data flowset record.
+ */
+ private void printDataflowSet(DataFlowRecord dataFlowRecord) {
+ print("Template ID : %d", dataFlowRecord.getTemplateId().getId());
+ dataFlowRecord.getFlows().forEach(dataflow -> {
+ print("Field : %s, Value : %s", dataflow.getField().name(),
+ dataflow.getValue().toString());
+ });
+ print("\n");
+ }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java
new file mode 100644
index 0000000..c23acdf
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.DataFlowRecord;
+
+/**
+ * Netflow overall summary report.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-summary",
+ description = "Summary of template flowsets and data flowsets.")
+public class NetflowSummaryCommand extends AbstractShellCommand {
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+ Set<DataTemplateRecord> templates = controller.getTemplateFlowSet();
+ if (templates.isEmpty()) {
+ print("Template not found");
+ } else {
+ Set<Integer> templateIds = templates.stream()
+ .map(DataTemplateRecord::getTemplateId)
+ .map(TemplateId::getId)
+ .collect(Collectors.toSet());
+ printTemplateflowSet(templateIds);
+
+ }
+ Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+ if (dataFlowSets.isEmpty()) {
+ print("Data not found");
+ } else {
+ printDataflowSet(dataFlowSets);
+ }
+ }
+
+ /**
+ * Adds template flowset summary in specified row wise.
+ *
+ * @param templateIds template ids
+ */
+ private void printTemplateflowSet(Set<Integer> templateIds) {
+ print("Number of templates : %d, Template IDs : %s",
+ templateIds.size(),
+ templateIds.toString());
+ }
+
+ /**
+ * Adds data flowset summary in specified row wise.
+ *
+ * @param dataflowMap data flow map
+ */
+ private void printDataflowSet(Map<TemplateId, List<DataFlowRecord>> dataflowMap) {
+ dataflowMap.entrySet().forEach(entry -> {
+ print("Template ID : %d, Data flow count : %d",
+ entry.getKey().getId(),
+ entry.getValue().size());
+ });
+ print("\n");
+ }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java
new file mode 100644
index 0000000..abfbb73
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.Set;
+import java.util.Optional;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataTemplateRecord;
+
+/**
+ * Lists all netflow template flowsets.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-template",
+ description = "Lists all template flowsets received from netflow exporter.")
+public class NetflowTemplateCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "templateID",
+ description = "Netflow template ID",
+ required = false, multiValued = false)
+ int templateID = 0;
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+ if (templateID < 0) {
+ print("Invalid template ID");
+ return;
+ }
+ if (templateID > 0) {
+ Optional<DataTemplateRecord> template = controller.getTemplateFlowSet(TemplateId.valueOf(templateID));
+ if (template.isPresent()) {
+ printTemplate(template.get());
+ } else {
+ print("Default template not found");
+ }
+ } else {
+ Set<DataTemplateRecord> templates = controller.getTemplateFlowSet();
+ if (templates.isEmpty()) {
+ print("Default template not found");
+ } else {
+ templates.stream().forEach(template -> printTemplate(template));
+ }
+ }
+ }
+
+ /**
+ * Adds template flowset details in specified row wise.
+ *
+ * @param template template flow set.
+ */
+ private void printTemplate(DataTemplateRecord template) {
+ print("Template ID : %d, Field Count : %d",
+ template.getTemplateId().getId(),
+ template.getFiledCount());
+ template.getFields().forEach(flowTemplateField -> {
+ print("Field : %s, Length : %d",
+ flowTemplateField.getFlowField().name(),
+ flowTemplateField.getLength());
+ });
+ print("\n");
+ }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java
new file mode 100644
index 0000000..51cefa1
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Optional;
+import java.util.Map;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Lists all filtered data flowsets.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-traffic-filter",
+ description = "Lists all filtered data flowsets received from netflow exporter.")
+public class NetflowTrafficFilterCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "field", description = "flow field",
+ required = false, multiValued = false)
+ @Completion(FlowFieldCompleter.class)
+ protected String field = null;
+
+ @Argument(index = 1, name = "value", description = "flow value",
+ required = false, multiValued = false)
+ protected String value = null;
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+ Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+ if (dataFlowSets.isEmpty()) {
+ print("Data not found");
+ return;
+ }
+ dataFlowSets.values()
+ .stream()
+ .flatMap(Collection::stream)
+ .filter(df -> {
+ Optional<Object> fieldValue = getFieldValue(df.getFlows(), field);
+ if (fieldValue.isPresent() && fieldValue.toString().equals(value)) {
+ return true;
+ }
+ return false;
+ })
+ .forEach(df -> prinDataflowSet(df));
+ }
+
+ /**
+ * Get flow field value from collection of flows.
+ * get flow field value which is matching to the given flow field.
+ *
+ * @param flows collection of flows
+ * @param field flow field
+ */
+ private Optional<Object> getFieldValue(List<Flow> flows, String field) {
+ FlowField flowField = FlowField.valueOf(field);
+ return flows.stream()
+ .filter(flow -> flow.getField() == flowField)
+ .map(Flow::getValue)
+ .findAny();
+ }
+
+ /**
+ * Get flow field value from collection of flows.
+ * get flow field value which is matching to the given flow field predicates.
+ *
+ * @param flows collection of flows
+ * @param field flow field predicates
+ */
+ private Optional<Object> getFieldValue(List<Flow> flows, Predicate<FlowField> field) {
+ return flows.stream()
+ .filter(flow -> field.test(flow.getField()))
+ .map(Flow::getValue)
+ .findAny();
+ }
+
+
+ /**
+ * Adds data flowset record details in specified row wise.
+ *
+ * @param dataFlowRecord data flowset record.
+ */
+ private void prinDataflowSet(DataFlowRecord dataFlowRecord) {
+ print("Template ID : %d", dataFlowRecord.getTemplateId().getId());
+ dataFlowRecord.getFlows().forEach(dataflow -> {
+ print("Field : %s, Value : %s",
+ dataflow.getField().name(),
+ dataflow.getValue().toString());
+ });
+ print("\n");
+ }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java
new file mode 100644
index 0000000..fec9ba7
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Optional;
+import java.util.Map;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.HashMap;
+import java.util.function.Predicate;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Flow traffic summary report.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-traffic-summary",
+ description = "Summary of data flowset records based on in/out bytes/packets, protocols or applications.")
+public class NetflowTrafficSummaryCommand extends AbstractShellCommand {
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+ Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+ if (dataFlowSets.isEmpty()) {
+ print("Data not found");
+ return;
+ }
+
+ Map<String, List<DataFlowRecord>> ds = dataFlowSets.values()
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.groupingBy(d -> hostWiseFlow(d.getFlows())));
+ printHostTraffic(ds);
+
+ Map<String, Map<String, List<DataFlowRecord>>> protocol = new HashMap<>();
+ ds.entrySet().forEach(entry -> {
+ protocol.put(entry.getKey(), protocolWiseFlow(entry.getValue()));
+ });
+ printPrtocolTraffic(protocol);
+
+ Map<String, Map<String, List<DataFlowRecord>>> application = new HashMap<>();
+ ds.entrySet().forEach(entry -> {
+ application.put(entry.getKey(), applicationWiseFlow(entry.getValue()));
+ });
+ printApplicationTraffic(application);
+ }
+
+ /**
+ * Host wise flow filter.
+ * ipv4 and ipv6 host flow fields will be filtered and flow direction
+ * added from src ip to dst ip
+ *
+ * @param flows collection of flows
+ */
+ private String hostWiseFlow(List<Flow> flows) {
+ Predicate<FlowField> srcIpParser = f -> (f.equals(FlowField.IPV4_SRC_ADDR) ||
+ f.equals(FlowField.IPV6_SRC_ADDR));
+ Predicate<FlowField> dstIpParser = f -> (f.equals(FlowField.IPV4_DST_ADDR) ||
+ f.equals(FlowField.IPV6_DST_ADDR));
+ String srcIp = getFieldValue(flows, srcIpParser).get().toString();
+ String dstIp = getFieldValue(flows, dstIpParser).get().toString();
+ return srcIp + " => " + dstIp;
+ }
+
+ /**
+ * Summary of all ingress bytes.
+ *
+ * @param dataFlowRecords collection of data flowset recored.
+ */
+ private int totalInBytes(List<DataFlowRecord> dataFlowRecords) {
+ return dataFlowRecords.stream()
+ .map(data -> inBytes(data.getFlows()))
+ .collect(Collectors.summingInt(Integer::intValue));
+ }
+
+ /**
+ * Summary of all ingress packets.
+ *
+ * @param dataFlowRecords collection of data flowset recored.
+ */
+ private int totalInPackets(List<DataFlowRecord> dataFlowRecords) {
+ return dataFlowRecords.stream()
+ .map(data -> inPackets(data.getFlows()))
+ .collect(Collectors.summingInt(Integer::intValue));
+ }
+
+ /**
+ * Application protocol wise flow filter.
+ *
+ * @param dataFlowRecords collection of data flowset recored.
+ */
+ private Map<String, List<DataFlowRecord>> applicationWiseFlow(List<DataFlowRecord> dataFlowRecords) {
+ return dataFlowRecords.stream()
+ .collect(Collectors.groupingBy(d -> getFieldValue(d.getFlows(),
+ FlowField.L4_DST_PORT).get().toString()));
+
+ }
+
+ /**
+ * Transport protocol wise flow filter.
+ *
+ * @param dataFlowRecords collection of data flowset recored.
+ */
+ private Map<String, List<DataFlowRecord>> protocolWiseFlow(List<DataFlowRecord> dataFlowRecords) {
+ return dataFlowRecords.stream()
+ .collect(Collectors.groupingBy(d -> getFieldValue(d.getFlows(), FlowField.PROTOCOL).get().toString()));
+
+ }
+
+ /**
+ * Filter ingress bytes from flows and type cast to integer.
+ *
+ * @param flows collection of flows.
+ */
+ private int inBytes(List<Flow> flows) {
+ return Integer.parseInt(getFieldValue(flows, FlowField.IN_BYTES).get().toString());
+ }
+
+ /**
+ * Filter ingress packets from flows and type cast to integer.
+ *
+ * @param flows collection of flows.
+ */
+ private int inPackets(List<Flow> flows) {
+ return Integer.parseInt(getFieldValue(flows, FlowField.IN_PKTS).get().toString());
+ }
+
+ /**
+ * Get flow field value from collection of flows.
+ * get flow field value which is matching to the given flow field.
+ *
+ * @param flows collection of flows
+ * @param field flow field
+ */
+ private Optional<Object> getFieldValue(List<Flow> flows, FlowField field) {
+ return flows.stream()
+ .filter(flow -> flow.getField() == field)
+ .map(Flow::getValue)
+ .findAny();
+ }
+
+ /**
+ * Get flow field value from collection of flows.
+ * get flow field value which is matching to the given flow field predicates.
+ *
+ * @param flows collection of flows
+ * @param field flow field predicates
+ */
+ private Optional<Object> getFieldValue(List<Flow> flows, Predicate<FlowField> field) {
+ return flows.stream()
+ .filter(flow -> field.test(flow.getField()))
+ .map(Flow::getValue)
+ .findAny();
+ }
+
+ /**
+ * Adds host wise traffic summary in specified row wise.
+ *
+ * @param hosts mapping of host and traffic details.
+ */
+ private void printHostTraffic(Map<String, List<DataFlowRecord>> hosts) {
+ print("\nHost wise traffic flow");
+ hosts.entrySet().forEach(entry -> {
+ print("Traffic flow : %s, Total bytes : %d, Total packets : %d",
+ entry.getKey(),
+ totalInBytes(entry.getValue()),
+ totalInPackets(entry.getValue()));
+ });
+ print("\n");
+ }
+
+ /**
+ * Adds protocol wise traffic summary in specified row wise.
+ *
+ * @param protocol mapping of portocol and traffic details.
+ */
+ private void printPrtocolTraffic(Map<String, Map<String, List<DataFlowRecord>>> protocol) {
+ print("Protocol wise traffic flow");
+ protocol.entrySet().forEach(entry -> {
+ entry.getValue().entrySet().forEach(port -> {
+ print("Traffic flow : %s, Protocol : %s, Total bytes : %d, Total packets : %d",
+ entry.getKey(),
+ port.getKey(),
+ totalInBytes(port.getValue()),
+ totalInPackets(port.getValue()));
+ });
+ });
+ print("\n");
+ }
+
+ /**
+ * Adds application wise traffic summary in specified row wise.
+ *
+ * @param applications mapping of application and traffic details.
+ */
+ private void printApplicationTraffic(Map<String, Map<String, List<DataFlowRecord>>> applications) {
+ print("Application wise traffic flow");
+ applications.entrySet().forEach(entry -> {
+ entry.getValue().entrySet().forEach(port -> {
+ print("Traffic flow : %s, Application : %s, Total bytes : %d, Total packets : %d",
+ entry.getKey(),
+ port.getKey(),
+ totalInBytes(port.getValue()),
+ totalInPackets(port.getValue()));
+ });
+ });
+ print("\n");
+ }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java
new file mode 100644
index 0000000..f7183d6
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * CLI implementation for netflow collector.
+ */
+package org.onosproject.netflow.cli;
\ No newline at end of file
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java
new file mode 100644
index 0000000..8f17232
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netflow.impl;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onosproject.netflow.NetflowController;
+
+/**
+ * The main controller class. Handles all setup and network listeners -
+ * Ownership of netflow message receiver.
+ */
+public class Controller {
+
+ private static final Logger log = LoggerFactory.getLogger(Controller.class);
+
+ private ChannelGroup channelGroup;
+ private Channel serverChannel;
+
+ // Configuration options
+ protected static final short NETFLOW_PORT_NUM = 2055;
+ private final int workerThreads = 16;
+
+ // Start time of the controller
+ private long systemStartTime;
+
+ private ChannelFactory serverExecFactory;
+
+ private static final int BUFFER_SIZE = 5 * 1024;
+
+ private NetflowController controller;
+
+ /**
+ * Constructor to initialize the values.
+ *
+ * @param controller netflow controller instance
+ */
+ public Controller(NetflowController controller) {
+ this.controller = controller;
+ }
+
+ /**
+ * To get system start time.
+ *
+ * @return system start time in milliseconds
+ */
+ public long getSystemStartTime() {
+ return (this.systemStartTime);
+ }
+
+ /**
+ * Initialize timer.
+ */
+ public void init() {
+ this.systemStartTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Gets run time memory.
+ *
+ * @return run time memory
+ */
+ public Map<String, Long> getMemory() {
+ Map<String, Long> m = new HashMap<>();
+ Runtime runtime = Runtime.getRuntime();
+ m.put("total", runtime.totalMemory());
+ m.put("free", runtime.freeMemory());
+ return m;
+ }
+
+ /**
+ * Gets UP time.
+ *
+ * @return UP time
+ */
+ public Long getUptime() {
+ RuntimeMXBean rb = ManagementFactory.getRuntimeMXBean();
+ return rb.getUptime();
+ }
+
+ /**
+ * Netflow collector it will receive message from netflow exporter.
+ */
+ public void run() {
+
+ try {
+
+ final ConnectionlessBootstrap bootstrap = createServerBootStrap();
+
+ bootstrap.setOption("reuseAddress", false);
+ bootstrap.setOption("child.reuseAddress", false);
+ bootstrap.setOption("readBufferSize", BUFFER_SIZE); //15M
+ bootstrap.setOption("receiveBufferSizePredictor",
+ new FixedReceiveBufferSizePredictor(BUFFER_SIZE));
+ bootstrap.setOption("receiveBufferSizePredictorFactory",
+ new FixedReceiveBufferSizePredictorFactory(BUFFER_SIZE));
+ ChannelPipelineFactory pfact = new NetflowPipelineFactory(this.controller);
+
+ bootstrap.setPipelineFactory(pfact);
+ InetSocketAddress inetSocketAddress = new InetSocketAddress(NETFLOW_PORT_NUM);
+ channelGroup = new DefaultChannelGroup();
+ serverChannel = bootstrap.bind(inetSocketAddress);
+ channelGroup.add(serverChannel);
+ log.info("Listening for netflow exporter connection on {}", inetSocketAddress);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * Creates server boot strap.
+ *
+ * @return ServerBootStrap
+ */
+ private ConnectionlessBootstrap createServerBootStrap() {
+
+ if (workerThreads == 0) {
+ serverExecFactory = new NioDatagramChannelFactory(
+ Executors.newFixedThreadPool(2));
+ return new ConnectionlessBootstrap(serverExecFactory);
+ } else {
+ serverExecFactory = new NioDatagramChannelFactory(
+ Executors.newFixedThreadPool(2),
+ workerThreads);
+ return new ConnectionlessBootstrap(serverExecFactory);
+ }
+ }
+
+ /**
+ * Stops the netflow collector.
+ */
+ public void stop() {
+ log.info("Stopped");
+ channelGroup.close();
+ }
+
+ /**
+ * Starts the netflow collector.
+ */
+ public void start() {
+ log.info("Started");
+ this.run();
+ }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java
new file mode 100644
index 0000000..90d75dd
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.Optional;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.onosproject.netflow.NetflowStore;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+import org.onosproject.netflow.FlowTemplateField;
+import org.onosproject.netflow.SourceId;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataRecord;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.FlowSet;
+import org.onosproject.netflow.DataFlowSet;
+import org.onosproject.netflow.TemplateRecord;
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.OptionalTemplateFlowSet;
+import org.onosproject.netflow.TemplateFlowSet;
+import org.onlab.packet.BasePacket;
+
+/**
+ * Manages inventory of Netflow template and data flowset to distribute
+ * information.
+ */
+@Component(immediate = true, service = NetflowStore.class)
+public class DistributedNetflowStore implements NetflowStore {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private EventuallyConsistentMap<TemplateId, DataTemplateRecord> templateFlowSet;
+ private EventuallyConsistentMap<TemplateId, OptionalTemplateFlowSet> optionalTemplateFlowSet;
+ private EventuallyConsistentMap<TemplateId, List<DataFlowRecord>> dataFlowSet;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Activate
+ protected void activate() {
+ KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.MISC)
+ .register(List.class)
+ .register(LinkedList.class)
+ .register(ArrayList.class)
+ .register(HashSet.class)
+ .register(BasePacket.class)
+ .register(Flow.class)
+ .register(FlowField.class)
+ .register(FlowTemplateField.class)
+ .register(SourceId.class)
+ .register(TemplateId.class)
+ .register(DataRecord.class)
+ .register(DataFlowRecord.class)
+ .register(FlowSet.class)
+ .register(DataFlowSet.class)
+ .register(TemplateRecord.class)
+ .register(DataTemplateRecord.class)
+ .register(NetFlowPacket.class)
+ .register(OptionalTemplateFlowSet.class)
+ .register(TemplateFlowSet.class)
+ .register(TemplateRecord.class);
+
+ templateFlowSet = storageService.<TemplateId, DataTemplateRecord>eventuallyConsistentMapBuilder()
+ .withSerializer(serializer)
+ .withName("netflow-templateflowset")
+ .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+ .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+ .withTombstonesDisabled()
+ .build();
+
+ optionalTemplateFlowSet = storageService.<TemplateId, OptionalTemplateFlowSet>eventuallyConsistentMapBuilder()
+ .withSerializer(serializer)
+ .withName("netflow-optionaltemplateflowset")
+ .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+ .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+ .withTombstonesDisabled()
+ .build();
+
+ dataFlowSet = storageService.<TemplateId, List<DataFlowRecord>>eventuallyConsistentMapBuilder()
+ .withSerializer(serializer)
+ .withName("netflow-dataflowset")
+ .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+ .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+ .withTombstonesDisabled()
+ .build();
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactive() {
+ log.info("Stopped");
+ }
+
+ /**
+ * Get template flowset from store.
+ * it will fetch current template which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return optional of data template record, optional will be empty if template not found.
+ */
+ @Override
+ public Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId) {
+ return Optional.ofNullable(templateFlowSet.get(templateId));
+ }
+
+ /**
+ * Get set of template flowsets from store.
+ * it will fetch all template flowsets from store.
+ *
+ * @return set of data template record, set will be empty if templates not found in the store.
+ */
+ @Override
+ public Set<DataTemplateRecord> getTemplateFlowSet() {
+ return templateFlowSet.values().stream()
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Get optional template flowset from store.
+ * it will fetch current optional template which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return optional of optional template flowset, optional will be empty if template not found.
+ */
+ @Override
+ public Optional<OptionalTemplateFlowSet> getOptionalTemplateFlowSet(TemplateId templateId) {
+ return Optional.ofNullable(optionalTemplateFlowSet.get(templateId));
+ }
+
+ /**
+ * Get set of optional template flowsets from store.
+ * it will fetch all optional template flowsets from store.
+ *
+ * @return set of optional template flowsets, set will be empty if templates not found in the store.
+ */
+ @Override
+ public Set<OptionalTemplateFlowSet> getOptionalTemplateFlowSet() {
+ return ImmutableSet.copyOf(optionalTemplateFlowSet.values());
+ }
+
+ /**
+ * Get data flowset from store.
+ * it will fetch current data flowset which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return list of data flow record, list will be empty if template id not matched.
+ */
+ @Override
+ public List<DataFlowRecord> getDataFlowSet(TemplateId templateId) {
+ List<DataFlowRecord> dataRecord = dataFlowSet.get(templateId);
+ if (!Objects.isNull(dataRecord)) {
+ return ImmutableList.copyOf(dataRecord);
+ }
+ return Lists.newArrayList();
+ }
+
+ /**
+ * Get data flowset from store.
+ * it will fetch current data flowset which is matching to the template id from store.
+ *
+ * @return mapping from a template id to data flow record.
+ */
+ @Override
+ public Map<TemplateId, List<DataFlowRecord>> getDataFlowSet() {
+ return dataFlowSet.entrySet().stream().collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
+ }
+
+ /**
+ * Update template flowset to the store.
+ * Add new template to store if it not exist,
+ * otherwise it will replace the existing template.
+ *
+ * @param templateRecord template flowset record.
+ */
+ @Override
+ public void updateTemplateFlowSet(DataTemplateRecord templateRecord) {
+ templateFlowSet.put(templateRecord.getTemplateId(), templateRecord);
+ }
+
+ /**
+ * Update optional template flowset to the store.
+ * Add new optional template to store if it not exist,
+ * otherwise it will replace the existing optional template.
+ *
+ * @param templateFlowSet optional template flowset.
+ */
+ @Override
+ public void updateOptionalTemplateFlowSet(OptionalTemplateFlowSet templateFlowSet) {
+ optionalTemplateFlowSet.put(templateFlowSet.getTemplateId(), templateFlowSet);
+ }
+
+ /**
+ * Add data flow record to the store.
+ * Add new data flow record to store
+ *
+ * @param dataFlowRecord data flow record.
+ */
+ @Override
+ public void addDataFlowSet(DataFlowRecord dataFlowRecord) {
+ dataFlowSet.compute(dataFlowRecord.getTemplateId(),
+ (id, flowSet) -> {
+ List<DataFlowRecord> newSet = new ArrayList<>();
+ if (flowSet != null) {
+ newSet.addAll(flowSet);
+ }
+ newSet.add(dataFlowRecord);
+ return newSet;
+ });
+ }
+
+ /**
+ * Remove template flowset from store.
+ * it will remove template flowset which is matching to the given template id from store.
+ *
+ * @param templateId template id.
+ */
+ @Override
+ public void clearTemplateFlowSet(TemplateId templateId) {
+ if (templateFlowSet.containsKey(templateId)) {
+ templateFlowSet.remove(templateId);
+ }
+ }
+
+ /**
+ * Remove all template flowset from store.
+ * it will remove all template flowsets from store.
+ */
+ @Override
+ public void clearTemplateFlowSet() {
+ templateFlowSet.clear();
+ }
+
+ /**
+ * Remove optional template flowset from store.
+ * it will remove optional template which is matching to the given template id from store.
+ *
+ * @param templateId template id.
+ */
+ @Override
+ public void clearOptionalTemplateFlowSet(TemplateId templateId) {
+ if (optionalTemplateFlowSet.containsKey(templateId)) {
+ optionalTemplateFlowSet.remove(templateId);
+ }
+ }
+
+ /**
+ * Remove all optional template flowset from store.
+ * it will remove all optional template flowsets from store.
+ */
+ @Override
+ public void clearOptionalTemplateFlowSet() {
+ optionalTemplateFlowSet.clear();
+ }
+
+ /**
+ * Remove data flowset from store.
+ * it will remove dataflowset which is matching to the given template id from store.
+ *
+ * @param templateId template id.
+ */
+ @Override
+ public void clearDataFlowSet(TemplateId templateId) {
+ if (dataFlowSet.containsKey(templateId)) {
+ dataFlowSet.remove(templateId);
+ }
+ }
+
+ /**
+ * Remove all data flowset from store.
+ * it will remove all data flowsets from store.
+ */
+ @Override
+ public void clearDataFlowSet() {
+ dataFlowSet.clear();
+ }
+
+ /**
+ * Remove template, optional template and data flowsets from store.
+ * it will remove all flowsets from store.
+ */
+ @Override
+ public void clearAllFlowSet() {
+ templateFlowSet.clear();
+ optionalTemplateFlowSet.clear();
+ dataFlowSet.clear();
+ }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
index df5ffce..27eb8ea 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.netflow.impl;
+import java.util.Optional;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
@@ -22,18 +23,35 @@
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.SimpleChannelHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.FlowSet;
+import org.onosproject.netflow.TemplateFlowSet;
+import org.onosproject.netflow.DataFlowSet;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.NetflowController;
+
/**
* Channel handler deals with the netfow exporter connection and dispatches messages
* from netfow exporter to the appropriate locations.
*/
public class NeflowChannelHandler extends SimpleChannelHandler {
+ private static final Logger log = LoggerFactory.getLogger(NeflowChannelHandler.class);
+
private Channel channel;
+ private NetflowController controller;
+
/**
* Create a new netflow channelHandler instance.
+ *
+ * @param controller netflow controller.
*/
- NeflowChannelHandler() {
+ NeflowChannelHandler(NetflowController controller) {
+ this.controller = controller;
}
/**
@@ -81,8 +99,39 @@
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
- //TODO Netflow message store to netflow distributed store
- NetFlowPacket packet = (NetFlowPacket) event.getMessage();
+ try {
+ NetFlowPacket netFlowPacket = (NetFlowPacket) event.getMessage();
+ netFlowPacket.getFlowSets()
+ .stream()
+ .filter(n -> n.getType() == FlowSet.Type.TEMPLATE_FLOWSET)
+ .map(t -> (TemplateFlowSet) t)
+ .flatMap(t -> t.getRecords().stream())
+ .forEach(t -> controller.addTemplateFlowSet(t));
+
+ netFlowPacket.getFlowSets()
+ .stream()
+ .filter(n -> n.getType() == FlowSet.Type.DATA_FLOWSET)
+ .map(t -> (DataFlowSet) t)
+ .forEach(data -> {
+ Optional<DataTemplateRecord> template = controller
+ .getTemplateFlowSet(TemplateId.valueOf(data.getFlowSetId()));
+ if (!template.isPresent()) {
+ return;
+ }
+ try {
+ data.dataDeserializer(template.get());
+ data.getDataFlow()
+ .stream()
+ .forEach(dataflow -> controller.updateDataFlowSet(dataflow));
+ } catch (Exception ex) {
+ log.error("Netflow dataflow deserializer exception ", ex);
+ }
+ });
+ log.info("Netflow message received {}", netFlowPacket);
+ } catch (Exception er) {
+ log.error("Netflow message deserializer exception ", er);
+ }
+
}
}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
index be6ae9a..bbe0c0f 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
@@ -23,6 +23,7 @@
import org.onlab.packet.Deserializer;
import org.onlab.packet.BasePacket;
+import org.onosproject.netflow.FlowSet;
import static com.google.common.base.Preconditions.checkState;
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java
new file mode 100644
index 0000000..6d33f61
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import org.onosproject.netflow.NetflowController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import org.onosproject.netflow.NetflowStore;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.DataTemplateRecord;
+
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * It control and manage the netflow traffic.
+ * it is Collecting, Storing and analyzing NetFlow data
+ * it can help to understand which applications, and protocols
+ * may be consuming the most network bandwidth by tracking processes,
+ * protocols, times of day, and traffic routing.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+@Component(immediate = true, service = NetflowController.class)
+public class NetflowControllerImpl implements NetflowController {
+
+ private static final Logger log = LoggerFactory.getLogger(NetflowControllerImpl.class);
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected NetflowStore store;
+
+ @Activate
+ public void activate() {
+ Controller ctrl = new Controller(this);
+ ctrl.start();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ /**
+ * Add template flowset to controller.
+ * Add new template to controller if it not exist,
+ * otherwise it will replace the existing template.
+ *
+ * @param templateRecord template flowset record.
+ */
+ @Override
+ public void addTemplateFlowSet(DataTemplateRecord templateRecord) {
+ store.updateTemplateFlowSet(templateRecord);
+ }
+
+ /**
+ * Update data flowset to controller.
+ * it will update new data flowset to store.
+ *
+ * @param dataFlowRecord data flowset record.
+ */
+ @Override
+ public void updateDataFlowSet(DataFlowRecord dataFlowRecord) {
+ store.addDataFlowSet(dataFlowRecord);
+ }
+
+ /**
+ * Get template flowset from controller.
+ * it will fetch current template which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return optional of data template record, optional will be empty if template not found.
+ */
+ @Override
+ public Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId) {
+ return store.getTemplateFlowSet(templateId);
+ }
+
+ /**
+ * Get data flowset from controller.
+ * it will fetch current data flowset which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return list of data flow record, list will be empty if template id not matched.
+ */
+ @Override
+ public List<DataFlowRecord> getDataFlowSet(TemplateId templateId) {
+ return store.getDataFlowSet(templateId);
+ }
+
+ /**
+ * Get all template flowsets from controller.
+ * it will fetch all templates from store.
+ *
+ * @return set of data template record, set will be empty if templates not found in the store.
+ */
+ @Override
+ public Set<DataTemplateRecord> getTemplateFlowSet() {
+ return store.getTemplateFlowSet();
+ }
+
+ /**
+ * Get data flowset from controller.
+ * it will fetch current data flowset which is matching to the template id from store.
+ *
+ * @return mapping from a template id to data flow record.
+ */
+ @Override
+ public Map<TemplateId, List<DataFlowRecord>> getDataFlowSet() {
+ return store.getDataFlowSet();
+ }
+
+}
\ No newline at end of file
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
index 8dad97d..6807a5c 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
@@ -42,7 +42,7 @@
return NetFlowPacket.deserializer().deserialize(bytes, 0, bytes.length);
}
} catch (Exception e) {
- log.error("Netflow message decode error");
+ log.error("Netflow message decode error ", e);
buffer.resetReaderIndex();
buffer.discardReadBytes();
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
index 178f6e6..9f5ef2e 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
@@ -18,18 +18,23 @@
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
-
+import org.onosproject.netflow.NetflowController;
/**
* Creates a ChannelPipeline for a server-side netflow message channel.
*/
public class NetflowPipelineFactory implements ChannelPipelineFactory {
+ private NetflowController controller;
+
/**
* Constructor to initialize the values.
+ *
+ * @param controller netflow controller.
*/
- public NetflowPipelineFactory() {
+ public NetflowPipelineFactory(NetflowController controller) {
super();
+ this.controller = controller;
}
/**
@@ -40,7 +45,7 @@
*/
@Override
public ChannelPipeline getPipeline() throws Exception {
- NeflowChannelHandler handler = new NeflowChannelHandler();
+ NeflowChannelHandler handler = new NeflowChannelHandler(controller);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("netflowmessagedecoder", new NetflowMessageDecoder());