Netflow store and cli implementation
Implemented netflow version 9 of netflow collector store and cli.
Reference : https://datatracker.ietf.org/doc/html/rfc3954
Change-Id: I0b2ec932c28bc3b36042c7a725a41956cc859fcd
diff --git a/apps/netflow/app/BUILD b/apps/netflow/app/BUILD
index 747d61d..c1a4601 100644
--- a/apps/netflow/app/BUILD
+++ b/apps/netflow/app/BUILD
@@ -1,4 +1,4 @@
-COMPILE_DEPS = CORE_DEPS + [
+COMPILE_DEPS = CORE_DEPS + KRYO + CLI + [
"//core/store/serializers:onos-core-serializers",
"//apps/netflow/api:onos-apps-netflow-api",
"@io_netty_netty_common//jar",
@@ -6,5 +6,6 @@
]
osgi_jar(
+ karaf_command_packages = ["org.onosproject.netflow.cli"],
deps = COMPILE_DEPS,
)
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java
new file mode 100644
index 0000000..760d40b
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/FlowFieldCompleter.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.apache.karaf.shell.api.console.Completer;
+import org.apache.karaf.shell.api.console.CommandLine;
+import org.apache.karaf.shell.api.console.Session;
+import org.apache.karaf.shell.support.completers.StringsCompleter;
+
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Flow field completer.
+ */
+@Service
+public class FlowFieldCompleter implements Completer {
+
+ @Override
+ public int complete(Session session, CommandLine commandLine, List<String> candidates) {
+
+ StringsCompleter delegate = new StringsCompleter();
+ Set<String> flowfields = FlowField.getAllFlowField();
+ SortedSet<String> strings = delegate.getStrings();
+ for (String field : flowfields) {
+ strings.add(field);
+ }
+ return delegate.complete(session, commandLine, candidates);
+ }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java
new file mode 100644
index 0000000..7a5db0d
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowDataflowCommand.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Map;
+import java.util.Collection;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+
+/**
+ * Lists all netflow data flowsets.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-dataflow",
+ description = "Lists all data flowsets received from netflow exporter.")
+public class NetflowDataflowCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "templateid",
+ description = "Data flowset template id",
+ required = false, multiValued = false)
+ int templateID = 0;
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+
+ if (templateID < 0) {
+ print("Invalid template ID");
+ return;
+ }
+ if (templateID > 0) {
+ List<DataFlowRecord> dataFlowSet = controller.getDataFlowSet(TemplateId.valueOf(templateID));
+ if (!dataFlowSet.isEmpty()) {
+ dataFlowSet.stream().forEach(data -> printDataflowSet(data));
+ } else {
+ print("Default template not found");
+ }
+ } else {
+ Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+ if (dataFlowSets.isEmpty()) {
+ print("Default template not found");
+ } else {
+ dataFlowSets.values().stream().flatMap(Collection::stream).forEach(data -> printDataflowSet(data));
+ }
+ }
+ }
+
+ /**
+ * Adds data flowset details in specified row wise.
+ *
+ * @param dataFlowRecord data flowset record.
+ */
+ private void printDataflowSet(DataFlowRecord dataFlowRecord) {
+ print("Template ID : %d", dataFlowRecord.getTemplateId().getId());
+ dataFlowRecord.getFlows().forEach(dataflow -> {
+ print("Field : %s, Value : %s", dataflow.getField().name(),
+ dataflow.getValue().toString());
+ });
+ print("\n");
+ }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java
new file mode 100644
index 0000000..c23acdf
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowSummaryCommand.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.DataFlowRecord;
+
+/**
+ * Netflow overall summary report.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-summary",
+ description = "Summary of template flowsets and data flowsets.")
+public class NetflowSummaryCommand extends AbstractShellCommand {
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+ Set<DataTemplateRecord> templates = controller.getTemplateFlowSet();
+ if (templates.isEmpty()) {
+ print("Template not found");
+ } else {
+ Set<Integer> templateIds = templates.stream()
+ .map(DataTemplateRecord::getTemplateId)
+ .map(TemplateId::getId)
+ .collect(Collectors.toSet());
+ printTemplateflowSet(templateIds);
+
+ }
+ Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+ if (dataFlowSets.isEmpty()) {
+ print("Data not found");
+ } else {
+ printDataflowSet(dataFlowSets);
+ }
+ }
+
+ /**
+ * Adds template flowset summary in specified row wise.
+ *
+ * @param templateIds template ids
+ */
+ private void printTemplateflowSet(Set<Integer> templateIds) {
+ print("Number of templates : %d, Template IDs : %s",
+ templateIds.size(),
+ templateIds.toString());
+ }
+
+ /**
+ * Adds data flowset summary in specified row wise.
+ *
+ * @param dataflowMap data flow map
+ */
+ private void printDataflowSet(Map<TemplateId, List<DataFlowRecord>> dataflowMap) {
+ dataflowMap.entrySet().forEach(entry -> {
+ print("Template ID : %d, Data flow count : %d",
+ entry.getKey().getId(),
+ entry.getValue().size());
+ });
+ print("\n");
+ }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java
new file mode 100644
index 0000000..abfbb73
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTemplateCommand.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.Set;
+import java.util.Optional;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataTemplateRecord;
+
+/**
+ * Lists all netflow template flowsets.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-template",
+ description = "Lists all template flowsets received from netflow exporter.")
+public class NetflowTemplateCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "templateID",
+ description = "Netflow template ID",
+ required = false, multiValued = false)
+ int templateID = 0;
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+ if (templateID < 0) {
+ print("Invalid template ID");
+ return;
+ }
+ if (templateID > 0) {
+ Optional<DataTemplateRecord> template = controller.getTemplateFlowSet(TemplateId.valueOf(templateID));
+ if (template.isPresent()) {
+ printTemplate(template.get());
+ } else {
+ print("Default template not found");
+ }
+ } else {
+ Set<DataTemplateRecord> templates = controller.getTemplateFlowSet();
+ if (templates.isEmpty()) {
+ print("Default template not found");
+ } else {
+ templates.stream().forEach(template -> printTemplate(template));
+ }
+ }
+ }
+
+ /**
+ * Adds template flowset details in specified row wise.
+ *
+ * @param template template flow set.
+ */
+ private void printTemplate(DataTemplateRecord template) {
+ print("Template ID : %d, Field Count : %d",
+ template.getTemplateId().getId(),
+ template.getFiledCount());
+ template.getFields().forEach(flowTemplateField -> {
+ print("Field : %s, Length : %d",
+ flowTemplateField.getFlowField().name(),
+ flowTemplateField.getLength());
+ });
+ print("\n");
+ }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java
new file mode 100644
index 0000000..51cefa1
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficFilterCommand.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Optional;
+import java.util.Map;
+import java.util.Collection;
+import java.util.function.Predicate;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Lists all filtered data flowsets.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-traffic-filter",
+ description = "Lists all filtered data flowsets received from netflow exporter.")
+public class NetflowTrafficFilterCommand extends AbstractShellCommand {
+
+ @Argument(index = 0, name = "field", description = "flow field",
+ required = false, multiValued = false)
+ @Completion(FlowFieldCompleter.class)
+ protected String field = null;
+
+ @Argument(index = 1, name = "value", description = "flow value",
+ required = false, multiValued = false)
+ protected String value = null;
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+ Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+ if (dataFlowSets.isEmpty()) {
+ print("Data not found");
+ return;
+ }
+ dataFlowSets.values()
+ .stream()
+ .flatMap(Collection::stream)
+ .filter(df -> {
+ Optional<Object> fieldValue = getFieldValue(df.getFlows(), field);
+ if (fieldValue.isPresent() && fieldValue.toString().equals(value)) {
+ return true;
+ }
+ return false;
+ })
+ .forEach(df -> prinDataflowSet(df));
+ }
+
+ /**
+ * Get flow field value from collection of flows.
+ * get flow field value which is matching to the given flow field.
+ *
+ * @param flows collection of flows
+ * @param field flow field
+ */
+ private Optional<Object> getFieldValue(List<Flow> flows, String field) {
+ FlowField flowField = FlowField.valueOf(field);
+ return flows.stream()
+ .filter(flow -> flow.getField() == flowField)
+ .map(Flow::getValue)
+ .findAny();
+ }
+
+ /**
+ * Get flow field value from collection of flows.
+ * get flow field value which is matching to the given flow field predicates.
+ *
+ * @param flows collection of flows
+ * @param field flow field predicates
+ */
+ private Optional<Object> getFieldValue(List<Flow> flows, Predicate<FlowField> field) {
+ return flows.stream()
+ .filter(flow -> field.test(flow.getField()))
+ .map(Flow::getValue)
+ .findAny();
+ }
+
+
+ /**
+ * Adds data flowset record details in specified row wise.
+ *
+ * @param dataFlowRecord data flowset record.
+ */
+ private void prinDataflowSet(DataFlowRecord dataFlowRecord) {
+ print("Template ID : %d", dataFlowRecord.getTemplateId().getId());
+ dataFlowRecord.getFlows().forEach(dataflow -> {
+ print("Field : %s, Value : %s",
+ dataflow.getField().name(),
+ dataflow.getValue().toString());
+ });
+ print("\n");
+ }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java
new file mode 100644
index 0000000..fec9ba7
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/NetflowTrafficSummaryCommand.java
@@ -0,0 +1,232 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.onosproject.cli.AbstractShellCommand;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import java.util.List;
+import java.util.Optional;
+import java.util.Map;
+import java.util.Collection;
+import java.util.stream.Collectors;
+import java.util.HashMap;
+import java.util.function.Predicate;
+
+import org.onosproject.netflow.NetflowController;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+
+/**
+ * Flow traffic summary report.
+ */
+@Service
+@Command(scope = "onos", name = "netflow-traffic-summary",
+ description = "Summary of data flowset records based on in/out bytes/packets, protocols or applications.")
+public class NetflowTrafficSummaryCommand extends AbstractShellCommand {
+
+ @Override
+ protected void doExecute() {
+ NetflowController controller = AbstractShellCommand.get(NetflowController.class);
+ Map<TemplateId, List<DataFlowRecord>> dataFlowSets = controller.getDataFlowSet();
+ if (dataFlowSets.isEmpty()) {
+ print("Data not found");
+ return;
+ }
+
+ Map<String, List<DataFlowRecord>> ds = dataFlowSets.values()
+ .stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.groupingBy(d -> hostWiseFlow(d.getFlows())));
+ printHostTraffic(ds);
+
+ Map<String, Map<String, List<DataFlowRecord>>> protocol = new HashMap<>();
+ ds.entrySet().forEach(entry -> {
+ protocol.put(entry.getKey(), protocolWiseFlow(entry.getValue()));
+ });
+ printPrtocolTraffic(protocol);
+
+ Map<String, Map<String, List<DataFlowRecord>>> application = new HashMap<>();
+ ds.entrySet().forEach(entry -> {
+ application.put(entry.getKey(), applicationWiseFlow(entry.getValue()));
+ });
+ printApplicationTraffic(application);
+ }
+
+ /**
+ * Host wise flow filter.
+ * ipv4 and ipv6 host flow fields will be filtered and flow direction
+ * added from src ip to dst ip
+ *
+ * @param flows collection of flows
+ */
+ private String hostWiseFlow(List<Flow> flows) {
+ Predicate<FlowField> srcIpParser = f -> (f.equals(FlowField.IPV4_SRC_ADDR) ||
+ f.equals(FlowField.IPV6_SRC_ADDR));
+ Predicate<FlowField> dstIpParser = f -> (f.equals(FlowField.IPV4_DST_ADDR) ||
+ f.equals(FlowField.IPV6_DST_ADDR));
+ String srcIp = getFieldValue(flows, srcIpParser).get().toString();
+ String dstIp = getFieldValue(flows, dstIpParser).get().toString();
+ return srcIp + " => " + dstIp;
+ }
+
+ /**
+ * Summary of all ingress bytes.
+ *
+ * @param dataFlowRecords collection of data flowset recored.
+ */
+ private int totalInBytes(List<DataFlowRecord> dataFlowRecords) {
+ return dataFlowRecords.stream()
+ .map(data -> inBytes(data.getFlows()))
+ .collect(Collectors.summingInt(Integer::intValue));
+ }
+
+ /**
+ * Summary of all ingress packets.
+ *
+ * @param dataFlowRecords collection of data flowset recored.
+ */
+ private int totalInPackets(List<DataFlowRecord> dataFlowRecords) {
+ return dataFlowRecords.stream()
+ .map(data -> inPackets(data.getFlows()))
+ .collect(Collectors.summingInt(Integer::intValue));
+ }
+
+ /**
+ * Application protocol wise flow filter.
+ *
+ * @param dataFlowRecords collection of data flowset recored.
+ */
+ private Map<String, List<DataFlowRecord>> applicationWiseFlow(List<DataFlowRecord> dataFlowRecords) {
+ return dataFlowRecords.stream()
+ .collect(Collectors.groupingBy(d -> getFieldValue(d.getFlows(),
+ FlowField.L4_DST_PORT).get().toString()));
+
+ }
+
+ /**
+ * Transport protocol wise flow filter.
+ *
+ * @param dataFlowRecords collection of data flowset recored.
+ */
+ private Map<String, List<DataFlowRecord>> protocolWiseFlow(List<DataFlowRecord> dataFlowRecords) {
+ return dataFlowRecords.stream()
+ .collect(Collectors.groupingBy(d -> getFieldValue(d.getFlows(), FlowField.PROTOCOL).get().toString()));
+
+ }
+
+ /**
+ * Filter ingress bytes from flows and type cast to integer.
+ *
+ * @param flows collection of flows.
+ */
+ private int inBytes(List<Flow> flows) {
+ return Integer.parseInt(getFieldValue(flows, FlowField.IN_BYTES).get().toString());
+ }
+
+ /**
+ * Filter ingress packets from flows and type cast to integer.
+ *
+ * @param flows collection of flows.
+ */
+ private int inPackets(List<Flow> flows) {
+ return Integer.parseInt(getFieldValue(flows, FlowField.IN_PKTS).get().toString());
+ }
+
+ /**
+ * Get flow field value from collection of flows.
+ * get flow field value which is matching to the given flow field.
+ *
+ * @param flows collection of flows
+ * @param field flow field
+ */
+ private Optional<Object> getFieldValue(List<Flow> flows, FlowField field) {
+ return flows.stream()
+ .filter(flow -> flow.getField() == field)
+ .map(Flow::getValue)
+ .findAny();
+ }
+
+ /**
+ * Get flow field value from collection of flows.
+ * get flow field value which is matching to the given flow field predicates.
+ *
+ * @param flows collection of flows
+ * @param field flow field predicates
+ */
+ private Optional<Object> getFieldValue(List<Flow> flows, Predicate<FlowField> field) {
+ return flows.stream()
+ .filter(flow -> field.test(flow.getField()))
+ .map(Flow::getValue)
+ .findAny();
+ }
+
+ /**
+ * Adds host wise traffic summary in specified row wise.
+ *
+ * @param hosts mapping of host and traffic details.
+ */
+ private void printHostTraffic(Map<String, List<DataFlowRecord>> hosts) {
+ print("\nHost wise traffic flow");
+ hosts.entrySet().forEach(entry -> {
+ print("Traffic flow : %s, Total bytes : %d, Total packets : %d",
+ entry.getKey(),
+ totalInBytes(entry.getValue()),
+ totalInPackets(entry.getValue()));
+ });
+ print("\n");
+ }
+
+ /**
+ * Adds protocol wise traffic summary in specified row wise.
+ *
+ * @param protocol mapping of portocol and traffic details.
+ */
+ private void printPrtocolTraffic(Map<String, Map<String, List<DataFlowRecord>>> protocol) {
+ print("Protocol wise traffic flow");
+ protocol.entrySet().forEach(entry -> {
+ entry.getValue().entrySet().forEach(port -> {
+ print("Traffic flow : %s, Protocol : %s, Total bytes : %d, Total packets : %d",
+ entry.getKey(),
+ port.getKey(),
+ totalInBytes(port.getValue()),
+ totalInPackets(port.getValue()));
+ });
+ });
+ print("\n");
+ }
+
+ /**
+ * Adds application wise traffic summary in specified row wise.
+ *
+ * @param applications mapping of application and traffic details.
+ */
+ private void printApplicationTraffic(Map<String, Map<String, List<DataFlowRecord>>> applications) {
+ print("Application wise traffic flow");
+ applications.entrySet().forEach(entry -> {
+ entry.getValue().entrySet().forEach(port -> {
+ print("Traffic flow : %s, Application : %s, Total bytes : %d, Total packets : %d",
+ entry.getKey(),
+ port.getKey(),
+ totalInBytes(port.getValue()),
+ totalInPackets(port.getValue()));
+ });
+ });
+ print("\n");
+ }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java
new file mode 100644
index 0000000..f7183d6
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/cli/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * CLI implementation for netflow collector.
+ */
+package org.onosproject.netflow.cli;
\ No newline at end of file
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java
new file mode 100644
index 0000000..8f17232
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/Controller.java
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.netflow.impl;
+
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+
+import org.jboss.netty.bootstrap.ConnectionlessBootstrap;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictorFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioDatagramChannelFactory;
+import org.jboss.netty.channel.FixedReceiveBufferSizePredictor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onosproject.netflow.NetflowController;
+
+/**
+ * The main controller class. Handles all setup and network listeners -
+ * Ownership of netflow message receiver.
+ */
+public class Controller {
+
+ private static final Logger log = LoggerFactory.getLogger(Controller.class);
+
+ private ChannelGroup channelGroup;
+ private Channel serverChannel;
+
+ // Configuration options
+ protected static final short NETFLOW_PORT_NUM = 2055;
+ private final int workerThreads = 16;
+
+ // Start time of the controller
+ private long systemStartTime;
+
+ private ChannelFactory serverExecFactory;
+
+ private static final int BUFFER_SIZE = 5 * 1024;
+
+ private NetflowController controller;
+
+ /**
+ * Constructor to initialize the values.
+ *
+ * @param controller netflow controller instance
+ */
+ public Controller(NetflowController controller) {
+ this.controller = controller;
+ }
+
+ /**
+ * To get system start time.
+ *
+ * @return system start time in milliseconds
+ */
+ public long getSystemStartTime() {
+ return (this.systemStartTime);
+ }
+
+ /**
+ * Initialize timer.
+ */
+ public void init() {
+ this.systemStartTime = System.currentTimeMillis();
+ }
+
+ /**
+ * Gets run time memory.
+ *
+ * @return run time memory
+ */
+ public Map<String, Long> getMemory() {
+ Map<String, Long> m = new HashMap<>();
+ Runtime runtime = Runtime.getRuntime();
+ m.put("total", runtime.totalMemory());
+ m.put("free", runtime.freeMemory());
+ return m;
+ }
+
+ /**
+ * Gets UP time.
+ *
+ * @return UP time
+ */
+ public Long getUptime() {
+ RuntimeMXBean rb = ManagementFactory.getRuntimeMXBean();
+ return rb.getUptime();
+ }
+
+ /**
+ * Netflow collector it will receive message from netflow exporter.
+ */
+ public void run() {
+
+ try {
+
+ final ConnectionlessBootstrap bootstrap = createServerBootStrap();
+
+ bootstrap.setOption("reuseAddress", false);
+ bootstrap.setOption("child.reuseAddress", false);
+ bootstrap.setOption("readBufferSize", BUFFER_SIZE); //15M
+ bootstrap.setOption("receiveBufferSizePredictor",
+ new FixedReceiveBufferSizePredictor(BUFFER_SIZE));
+ bootstrap.setOption("receiveBufferSizePredictorFactory",
+ new FixedReceiveBufferSizePredictorFactory(BUFFER_SIZE));
+ ChannelPipelineFactory pfact = new NetflowPipelineFactory(this.controller);
+
+ bootstrap.setPipelineFactory(pfact);
+ InetSocketAddress inetSocketAddress = new InetSocketAddress(NETFLOW_PORT_NUM);
+ channelGroup = new DefaultChannelGroup();
+ serverChannel = bootstrap.bind(inetSocketAddress);
+ channelGroup.add(serverChannel);
+ log.info("Listening for netflow exporter connection on {}", inetSocketAddress);
+ } catch (Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /**
+ * Creates server boot strap.
+ *
+ * @return ServerBootStrap
+ */
+ private ConnectionlessBootstrap createServerBootStrap() {
+
+ if (workerThreads == 0) {
+ serverExecFactory = new NioDatagramChannelFactory(
+ Executors.newFixedThreadPool(2));
+ return new ConnectionlessBootstrap(serverExecFactory);
+ } else {
+ serverExecFactory = new NioDatagramChannelFactory(
+ Executors.newFixedThreadPool(2),
+ workerThreads);
+ return new ConnectionlessBootstrap(serverExecFactory);
+ }
+ }
+
+ /**
+ * Stops the netflow collector.
+ */
+ public void stop() {
+ log.info("Stopped");
+ channelGroup.close();
+ }
+
+ /**
+ * Starts the netflow collector.
+ */
+ public void start() {
+ log.info("Started");
+ this.run();
+ }
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java
deleted file mode 100644
index 8dd434c..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowRecord.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.function.BiPredicate;
-import java.util.function.Function;
-import java.util.function.Predicate;
-
-import com.google.common.base.MoreObjects;
-
-import org.onosproject.netflow.DataRecord;
-import org.onosproject.netflow.TemplateId;
-import org.onosproject.netflow.Flow;
-import org.onosproject.netflow.FlowTemplateField;
-import org.onosproject.netflow.DataDeserializer;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-
-/**
- * A Flow Data Record is a data record that contains values of the Flow.
- * parameters corresponding to a Template Record.
- * Ref: https://www.ietf.org/rfc/rfc3954.txt
- */
-public class DataFlowRecord extends DataRecord {
-
- /*
- 0 1 2 3
- 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Record 1 - Field Value 1 | Record 1 - Field Value 2 |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Record 1 - Field Value 3 | ... |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Record 2 - Field Value 1 | Record 2 - Field Value 2 |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Record 2 - Field Value 3 | ... |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- */
-
- private TemplateId templateId;
-
- private List<Flow> flows;
-
- public DataFlowRecord(Builder builder) {
- this.templateId = builder.templateId;
- this.flows = builder.flows;
- }
-
- /**
- * Returns unique template ID.
- * Template Records is given a unique Template ID.
- * This uniqueness is local to the Observation
- * Domain that generated the Template ID. Template IDs 0-255 are
- * reserved for Template FlowSets, Options FlowSets, and other
- * reserved FlowSets yet to be created.
- *
- * @return list of flowsets
- */
- @Override
- public TemplateId getTemplateId() {
- return this.templateId;
- }
-
- /**
- * Returns type of data flow.
- *
- * @return type of data flow
- */
- public List<Flow> getFlows() {
- return flows;
- }
-
- @Override
- public int hashCode() {
- int hash = 7;
- hash = 29 * hash + Objects.hashCode(this.templateId);
- hash = 29 * hash + Objects.hashCode(this.flows);
- return hash;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final DataFlowRecord other = (DataFlowRecord) obj;
- if (!Objects.equals(this.templateId, other.templateId)) {
- return false;
- }
- return Objects.equals(this.flows, other.flows);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("templateId", templateId)
- .add("flows", flows)
- .toString();
- }
-
- /**
- * Deserializer function for data flow record.
- *
- * @return deserializer function
- */
- public static DataDeserializer<DataFlowRecord> deserializer() {
- return (data, offset, length, template) -> {
- ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
-
- Predicate<FlowTemplateField> isValidTemplate = t
- -> Objects.nonNull(t) && Objects.nonNull(t.getFlowField()) && t.getLength() > 0;
-
- BiPredicate<ByteBuffer, Integer> isValidBuffer = (b, l)
- -> b.hasRemaining() && b.remaining() >= l;
-
- Function<FlowTemplateField, Flow> parser = (f) -> {
-
- if (!isValidTemplate.test(f) && isValidBuffer.test(bb, f.getLength())) {
- throw new IllegalStateException("Invalid data set");
- }
- return new Flow.Builder()
- .field(f.getFlowField())
- .value(f.getFlowField().getParser().apply(bb, f.getLength()))
- .build();
-
- };
- DataTemplateRecord templateRecord = (DataTemplateRecord) template;
- Builder builder = new Builder()
- .templateId(templateRecord.getTemplateId());
- long count = templateRecord.getFields().stream()
- .filter(Objects::nonNull)
- .map(t -> builder.flow(parser.apply(t)))
- .count();
-
- if (count != templateRecord.getFiledCount()) {
- throw new IllegalStateException("Invalid parsing fields");
- }
- return builder.build();
- };
- }
-
- /**
- * Builder for data flow record.
- */
- private static class Builder {
-
- private TemplateId templateId;
-
- private List<Flow> flows = new LinkedList<>();
-
- /**
- * Setter for unique template ID.
- *
- * @param templateId template id.
- * @return this class builder.
- */
- public Builder templateId(TemplateId templateId) {
- this.templateId = templateId;
- return this;
- }
-
- /**
- * Setter for data flow.
- *
- * @param flow data flow.
- * @return this class builder.
- */
- public Builder flow(Flow flow) {
- this.flows.add(flow);
- return this;
- }
-
- /**
- * Setter for list of data flow.
- *
- * @param flows list of data flow.
- * @return this class builder.
- */
- public Builder flows(List<Flow> flows) {
- this.flows = flows;
- return this;
- }
-
- /**
- * Checks arguments for data flow record.
- */
- private void checkArguments() {
- checkNotNull(templateId, "TemplateId cannot be null");
- }
-
- /**
- * Builds data flow record.
- *
- * @return data flow record.
- */
- public DataFlowRecord build() {
- checkArguments();
- return new DataFlowRecord(this);
- }
-
- }
-
-}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java
deleted file mode 100644
index 7183915..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataFlowSet.java
+++ /dev/null
@@ -1,280 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.function.Function;
-
-import com.google.common.base.MoreObjects;
-
-import org.onlab.packet.DeserializationException;
-import org.onlab.packet.Deserializer;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * A Data FlowSet is one or more records, of the same type, that are
- * grouped together in an Export Packet. Each record is either a Flow
- * Data Record or an Options Data Record previously defined by a
- * Template Record or an Options Template Record.
- * Ref: https://www.ietf.org/rfc/rfc3954.txt
- */
-public final class DataFlowSet extends FlowSet {
-
- /*
- 0 1 2 3
- 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | FlowSet ID = Template ID | Length |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Record 1 - Field Value 1 | Record 1 - Field Value 2 |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Record 1 - Field Value 3 | ... |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Record 2 - Field Value 1 | Record 2 - Field Value 2 |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Record 2 - Field Value 3 | ... |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Record 3 - Field Value 1 | ... |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | ... | Padding |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- */
-
- private int flowSetId;
-
- private int length;
-
- private byte[] data;
-
- private DataFlowSet(Builder builder) {
- this.flowSetId = builder.flowSetId;
- this.length = builder.length;
- this.data = builder.data;
- }
-
- private List<DataFlowRecord> dataFlow = new LinkedList<>();
-
- /**
- * Returns flowset id.
- * Each Data FlowSet is associated with a FlowSet ID. The FlowSet
- * ID maps to a (previously generated) Template ID
- *
- * @return flowset id
- */
- @Override
- public int getFlowSetId() {
- return this.flowSetId;
- }
-
- /**
- * Returns length of this FlowSet.
- * Length is the sum of the lengths
- * of the FlowSet ID, Length itself, all Flow Records within this
- * FlowSet, and the padding bytes, if any.
- *
- * @return length of the flowset
- */
- @Override
- public int getLength() {
- return this.length;
- }
-
- /**
- * Returns list of data flow records.
- *
- * @return list of data flow records
- */
- public List<DataFlowRecord> getDataFlow() {
- return dataFlow;
- }
-
- /**
- * Set data flow record.
- *
- * @param dataFlow data flow record
- */
- public void setDataFlow(DataFlowRecord dataFlow) {
- this.dataFlow.add(dataFlow);
- }
-
- /**
- * Returns type of flowset.
- *
- * @return type of flowset
- */
- @Override
- public Type getType() {
- return Type.DATA_FLOWSET;
- }
-
- @Override
- public int hashCode() {
- int hash = 3;
- hash = 79 * hash + this.flowSetId;
- hash = 79 * hash + this.length;
- hash = 79 * hash + Objects.hashCode(this.dataFlow);
- return hash;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final DataFlowSet other = (DataFlowSet) obj;
- if (this.flowSetId != other.flowSetId) {
- return false;
- }
- if (this.length != other.length) {
- return false;
- }
- return Objects.equals(this.dataFlow, other.dataFlow);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("flowSetId", flowSetId)
- .add("length", length)
- .add("data", data)
- .toString();
- }
-
- /**
- * Deserializer function for data flow set.
- *
- * @return deserializer function
- */
- public static Deserializer<DataFlowSet> deserializer() {
- return (data, offset, length) -> {
- Function<ByteBuffer, byte[]> readBytes = b -> {
- if (b.remaining() == b.limit()) {
- return null;
- }
- byte[] bytes = new byte[b.remaining()];
- b.get(bytes);
- return bytes;
- };
- ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
- return new Builder()
- .flowSetId(bb.getShort())
- .length(bb.getShort())
- .data(readBytes.apply(bb))
- .build();
-
- };
- }
-
- /**
- * Data eserializer function for data flow record.
- *
- * @param template data template record
- * @throws DeserializationException if unable to deserialize data
- */
- public void dataDeserializer(DataTemplateRecord template) throws DeserializationException {
- dataFlow = new LinkedList<>();
- ByteBuffer bb = ByteBuffer.wrap(data, 0, data.length);
-
- while (bb.hasRemaining()) {
- if (bb.remaining() < template.getValueLength()) {
- break;
- }
- byte[] dataRecord = new byte[template.getValueLength()];
- bb.get(dataRecord);
- this.setDataFlow(DataFlowRecord.deserializer().deserialize(
- data, 0, template.getValueLength(), template));
- }
-
- }
-
- /**
- * Builder for data flow set.
- */
- private static class Builder {
-
- private int flowSetId;
-
- private int length;
-
- private byte[] data;
-
- /**
- * Setter for flowset id.
- *
- * @param flowSetId flowset id.
- * @return this class builder.
- */
- public Builder flowSetId(int flowSetId) {
- this.flowSetId = flowSetId;
- return this;
- }
-
- /**
- * Setter for length of this FlowSet.
- *
- * @param length length of this FlowSet.
- * @return this class builder.
- */
- public Builder length(int length) {
- this.length = length;
- return this;
- }
-
- /**
- * Setter for flow data.
- *
- * @param data flow data.
- * @return this class builder.
- */
- public Builder data(byte[] data) {
- this.data = data;
- return this;
- }
-
- /**
- * Checks arguments for data flow set.
- */
- private void checkArguments() {
- checkState(flowSetId != 0, "Invalid data flowset id.");
- checkState(length != 0, "Invalid data flowset length.");
- checkNotNull(data, "Data flow set cannot be null");
-
- }
-
- /**
- * Builds data flowset.
- *
- * @return data flowset.
- */
- public DataFlowSet build() {
- checkArguments();
- return new DataFlowSet(this);
- }
-
- }
-
-}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java
deleted file mode 100644
index dc98531..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DataTemplateRecord.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import com.google.common.base.MoreObjects;
-
-import org.onlab.packet.Deserializer;
-import org.onosproject.netflow.TemplateId;
-import org.onosproject.netflow.FlowTemplateField;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Template Record defines the structure and interpretation.
- * of fields in an Options Data Record, including defining the scope
- * within which the Options Data Record is relevant.
- * Ref: https://www.ietf.org/rfc/rfc3954.txt
- */
-public final class DataTemplateRecord extends TemplateRecord {
-
- /*
- 0 1 2 3
- 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Template ID 256 | Field Count |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Field Type 1 | Field Length 1 |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Field Type 2 | Field Length 2 |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- */
-
- private TemplateId templateId;
-
- private int filedCount;
-
- private List<FlowTemplateField> fields;
-
- private DataTemplateRecord(Builder builder) {
- this.templateId = builder.templateId;
- this.filedCount = builder.filedCount;
- this.fields = builder.fields;
- }
-
- /**
- * Returns template record's template id.
- * Template Records is given a unique Template ID.
- * This uniqueness is local to the Observation
- * Domain that generated the Template ID. Template IDs 0-255 are
- * reserved for Template FlowSets, Options FlowSets, and other
- * reserved FlowSets yet to be created.
- *
- * @return list of flowsets
- */
- @Override
- public TemplateId getTemplateId() {
- return templateId;
- }
-
- /**
- * Returns number of fields in this Template Record.
- *
- * @return field count
- */
- public int getFiledCount() {
- return filedCount;
- }
-
- /**
- * Returns list of flow template fields.
- *
- * @return list of flow template fields
- */
- public List<FlowTemplateField> getFields() {
- return fields;
- }
-
- public int getValueLength() {
- Optional.ofNullable(fields)
- .orElseThrow(() -> new IllegalStateException("Invalid fields"));
- return fields.stream()
- .filter(Objects::nonNull)
- .map(FlowTemplateField::getLength)
- .collect(Collectors.summingInt(Integer::intValue));
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("templateId", templateId)
- .add("filedCount", filedCount)
- .add("fields", fields)
- .toString();
- }
-
- /**
- * Data deserializer function for data template record.
- *
- * @return data deserializer function
- */
- public static Deserializer<DataTemplateRecord> deserializer() {
- return (data, offset, length) -> {
- Predicate<ByteBuffer> isValidBuffer = b -> b.remaining() < FlowSet.FIELD_LENTH;
- Function<ByteBuffer, FlowTemplateField> parse = (b)
- -> {
- if (isValidBuffer.test(b)) {
- throw new IllegalStateException("Invalid buffersize");
- }
- return new FlowTemplateField.Builder()
- .flowField(b.getShort())
- .length(b.getShort())
- .build();
- };
-
- Builder builder = new Builder();
- ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
- if (isValidBuffer.test(bb)) {
- throw new IllegalStateException("Invalid buffersize");
- }
- builder.templateId(bb.getShort())
- .filedCount(bb.getShort());
- IntStream.rangeClosed(1, builder.filedCount).forEach(i -> builder.templateField(parse.apply(bb)));
- return builder.build();
- };
- }
-
- /**
- * Builder for data template record.
- */
- private static class Builder {
-
- private TemplateId templateId;
-
- private int filedCount;
-
- private List<FlowTemplateField> fields = new LinkedList<>();
-
- /**
- * Setter for template record's template id.
- *
- * @param templateId template record's template id.
- * @return this class builder.
- */
- public Builder templateId(int templateId) {
- this.templateId = new TemplateId((templateId));
- return this;
- }
-
- /**
- * Setter for number of fields in this Template Record.
- *
- * @param filedCount number of fields in this Template Record.
- * @return this class builder.
- */
- public Builder filedCount(int filedCount) {
- this.filedCount = filedCount;
- return this;
- }
-
- /**
- * Setter for list of flow template fields.
- *
- * @param fields list of flow template fields.
- * @return this class builder.
- */
- public Builder templateFields(List<FlowTemplateField> fields) {
- this.fields = fields;
- return this;
- }
-
- /**
- * Setter for flow template fields.
- *
- * @param field flow template fields.
- * @return this class builder.
- */
- public Builder templateField(FlowTemplateField field) {
- this.fields.add(field);
- return this;
- }
-
- /**
- * Checks arguments for data template record.
- */
- private void checkArguments() {
- checkState(filedCount != 0, "Invalid template filed count.");
- checkNotNull(templateId, "Template Id cannot be null.");
-
- }
-
- /**
- * Builds data template record.
- *
- * @return data template record.
- */
- public DataTemplateRecord build() {
- checkArguments();
- return new DataTemplateRecord(this);
- }
-
- }
-
-}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java
new file mode 100644
index 0000000..90d75dd
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/DistributedNetflowStore.java
@@ -0,0 +1,327 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.Optional;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import org.onosproject.netflow.NetflowStore;
+import org.onosproject.netflow.Flow;
+import org.onosproject.netflow.FlowField;
+import org.onosproject.netflow.FlowTemplateField;
+import org.onosproject.netflow.SourceId;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataRecord;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.FlowSet;
+import org.onosproject.netflow.DataFlowSet;
+import org.onosproject.netflow.TemplateRecord;
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.OptionalTemplateFlowSet;
+import org.onosproject.netflow.TemplateFlowSet;
+import org.onlab.packet.BasePacket;
+
+/**
+ * Manages inventory of Netflow template and data flowset to distribute
+ * information.
+ */
+@Component(immediate = true, service = NetflowStore.class)
+public class DistributedNetflowStore implements NetflowStore {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private EventuallyConsistentMap<TemplateId, DataTemplateRecord> templateFlowSet;
+ private EventuallyConsistentMap<TemplateId, OptionalTemplateFlowSet> optionalTemplateFlowSet;
+ private EventuallyConsistentMap<TemplateId, List<DataFlowRecord>> dataFlowSet;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Activate
+ protected void activate() {
+ KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.MISC)
+ .register(List.class)
+ .register(LinkedList.class)
+ .register(ArrayList.class)
+ .register(HashSet.class)
+ .register(BasePacket.class)
+ .register(Flow.class)
+ .register(FlowField.class)
+ .register(FlowTemplateField.class)
+ .register(SourceId.class)
+ .register(TemplateId.class)
+ .register(DataRecord.class)
+ .register(DataFlowRecord.class)
+ .register(FlowSet.class)
+ .register(DataFlowSet.class)
+ .register(TemplateRecord.class)
+ .register(DataTemplateRecord.class)
+ .register(NetFlowPacket.class)
+ .register(OptionalTemplateFlowSet.class)
+ .register(TemplateFlowSet.class)
+ .register(TemplateRecord.class);
+
+ templateFlowSet = storageService.<TemplateId, DataTemplateRecord>eventuallyConsistentMapBuilder()
+ .withSerializer(serializer)
+ .withName("netflow-templateflowset")
+ .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+ .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+ .withTombstonesDisabled()
+ .build();
+
+ optionalTemplateFlowSet = storageService.<TemplateId, OptionalTemplateFlowSet>eventuallyConsistentMapBuilder()
+ .withSerializer(serializer)
+ .withName("netflow-optionaltemplateflowset")
+ .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+ .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+ .withTombstonesDisabled()
+ .build();
+
+ dataFlowSet = storageService.<TemplateId, List<DataFlowRecord>>eventuallyConsistentMapBuilder()
+ .withSerializer(serializer)
+ .withName("netflow-dataflowset")
+ .withAntiEntropyPeriod(10, TimeUnit.SECONDS)
+ .withTimestampProvider((k, v) -> new org.onosproject.store.service.WallClockTimestamp())
+ .withTombstonesDisabled()
+ .build();
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactive() {
+ log.info("Stopped");
+ }
+
+ /**
+ * Get template flowset from store.
+ * it will fetch current template which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return optional of data template record, optional will be empty if template not found.
+ */
+ @Override
+ public Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId) {
+ return Optional.ofNullable(templateFlowSet.get(templateId));
+ }
+
+ /**
+ * Get set of template flowsets from store.
+ * it will fetch all template flowsets from store.
+ *
+ * @return set of data template record, set will be empty if templates not found in the store.
+ */
+ @Override
+ public Set<DataTemplateRecord> getTemplateFlowSet() {
+ return templateFlowSet.values().stream()
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Get optional template flowset from store.
+ * it will fetch current optional template which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return optional of optional template flowset, optional will be empty if template not found.
+ */
+ @Override
+ public Optional<OptionalTemplateFlowSet> getOptionalTemplateFlowSet(TemplateId templateId) {
+ return Optional.ofNullable(optionalTemplateFlowSet.get(templateId));
+ }
+
+ /**
+ * Get set of optional template flowsets from store.
+ * it will fetch all optional template flowsets from store.
+ *
+ * @return set of optional template flowsets, set will be empty if templates not found in the store.
+ */
+ @Override
+ public Set<OptionalTemplateFlowSet> getOptionalTemplateFlowSet() {
+ return ImmutableSet.copyOf(optionalTemplateFlowSet.values());
+ }
+
+ /**
+ * Get data flowset from store.
+ * it will fetch current data flowset which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return list of data flow record, list will be empty if template id not matched.
+ */
+ @Override
+ public List<DataFlowRecord> getDataFlowSet(TemplateId templateId) {
+ List<DataFlowRecord> dataRecord = dataFlowSet.get(templateId);
+ if (!Objects.isNull(dataRecord)) {
+ return ImmutableList.copyOf(dataRecord);
+ }
+ return Lists.newArrayList();
+ }
+
+ /**
+ * Get data flowset from store.
+ * it will fetch current data flowset which is matching to the template id from store.
+ *
+ * @return mapping from a template id to data flow record.
+ */
+ @Override
+ public Map<TemplateId, List<DataFlowRecord>> getDataFlowSet() {
+ return dataFlowSet.entrySet().stream().collect(Collectors.toMap(x -> x.getKey(), x -> x.getValue()));
+ }
+
+ /**
+ * Update template flowset to the store.
+ * Add new template to store if it not exist,
+ * otherwise it will replace the existing template.
+ *
+ * @param templateRecord template flowset record.
+ */
+ @Override
+ public void updateTemplateFlowSet(DataTemplateRecord templateRecord) {
+ templateFlowSet.put(templateRecord.getTemplateId(), templateRecord);
+ }
+
+ /**
+ * Update optional template flowset to the store.
+ * Add new optional template to store if it not exist,
+ * otherwise it will replace the existing optional template.
+ *
+ * @param templateFlowSet optional template flowset.
+ */
+ @Override
+ public void updateOptionalTemplateFlowSet(OptionalTemplateFlowSet templateFlowSet) {
+ optionalTemplateFlowSet.put(templateFlowSet.getTemplateId(), templateFlowSet);
+ }
+
+ /**
+ * Add data flow record to the store.
+ * Add new data flow record to store
+ *
+ * @param dataFlowRecord data flow record.
+ */
+ @Override
+ public void addDataFlowSet(DataFlowRecord dataFlowRecord) {
+ dataFlowSet.compute(dataFlowRecord.getTemplateId(),
+ (id, flowSet) -> {
+ List<DataFlowRecord> newSet = new ArrayList<>();
+ if (flowSet != null) {
+ newSet.addAll(flowSet);
+ }
+ newSet.add(dataFlowRecord);
+ return newSet;
+ });
+ }
+
+ /**
+ * Remove template flowset from store.
+ * it will remove template flowset which is matching to the given template id from store.
+ *
+ * @param templateId template id.
+ */
+ @Override
+ public void clearTemplateFlowSet(TemplateId templateId) {
+ if (templateFlowSet.containsKey(templateId)) {
+ templateFlowSet.remove(templateId);
+ }
+ }
+
+ /**
+ * Remove all template flowset from store.
+ * it will remove all template flowsets from store.
+ */
+ @Override
+ public void clearTemplateFlowSet() {
+ templateFlowSet.clear();
+ }
+
+ /**
+ * Remove optional template flowset from store.
+ * it will remove optional template which is matching to the given template id from store.
+ *
+ * @param templateId template id.
+ */
+ @Override
+ public void clearOptionalTemplateFlowSet(TemplateId templateId) {
+ if (optionalTemplateFlowSet.containsKey(templateId)) {
+ optionalTemplateFlowSet.remove(templateId);
+ }
+ }
+
+ /**
+ * Remove all optional template flowset from store.
+ * it will remove all optional template flowsets from store.
+ */
+ @Override
+ public void clearOptionalTemplateFlowSet() {
+ optionalTemplateFlowSet.clear();
+ }
+
+ /**
+ * Remove data flowset from store.
+ * it will remove dataflowset which is matching to the given template id from store.
+ *
+ * @param templateId template id.
+ */
+ @Override
+ public void clearDataFlowSet(TemplateId templateId) {
+ if (dataFlowSet.containsKey(templateId)) {
+ dataFlowSet.remove(templateId);
+ }
+ }
+
+ /**
+ * Remove all data flowset from store.
+ * it will remove all data flowsets from store.
+ */
+ @Override
+ public void clearDataFlowSet() {
+ dataFlowSet.clear();
+ }
+
+ /**
+ * Remove template, optional template and data flowsets from store.
+ * it will remove all flowsets from store.
+ */
+ @Override
+ public void clearAllFlowSet() {
+ templateFlowSet.clear();
+ optionalTemplateFlowSet.clear();
+ dataFlowSet.clear();
+ }
+
+}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java
deleted file mode 100644
index 1b292a7..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/FlowSet.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import java.util.Arrays;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.onlab.packet.BasePacket;
-import org.onlab.packet.DeserializationException;
-import org.onlab.packet.Deserializer;
-
-/**
- * FlowSet is a generic term for a collection of Flow Records that have.
- * a similar structure. In an Export Packet, one or more FlowSets
- * follow the Packet Header.
- * Ref: https://www.ietf.org/rfc/rfc3954.txt
- */
-public abstract class FlowSet extends BasePacket {
-
- public static final int FLOW_SET_HEADER_LENTH = 4;
-
- public static final int FIELD_LENTH = 4;
-
- public static final int RECORD_HEADER_LENGTH = 4;
-
- /**
- * FlowSets type
- * FlowSets: Template FlowSet, Options Template FlowSet, and Data FlowSet.
- */
- public enum Type {
-
- TEMPLATE_FLOWSET(0, TemplateFlowSet.deserializer()),
- OPTIONAL_TEMPLATE_FLOWSET(1, OptionalTemplateFlowSet.deserializer()),
- DATA_FLOWSET(Integer.MAX_VALUE, DataFlowSet.deserializer());
-
- private final int flowSetId;
- private final Deserializer deserializer;
-
- Type(int flowSetId, Deserializer deserializer) {
- this.flowSetId = flowSetId;
- this.deserializer = deserializer;
- }
-
- private static Map<Integer, Type> parser = new ConcurrentHashMap<>();
-
- static {
- Arrays.stream(Type.values()).forEach(type -> parser.put(type.flowSetId, type));
- }
-
- public static Type getType(int flowSetId) throws DeserializationException {
- if (flowSetId < 0) {
- throw new DeserializationException("Invalid trap type");
- }
- return Optional.of(flowSetId)
- .filter(id -> parser.containsKey(id))
- .map(id -> parser.get(id))
- .orElse(DATA_FLOWSET);
- }
-
- public Deserializer getDecoder() {
- return this.deserializer;
- }
-
- }
-
- public abstract Type getType();
-
- public abstract int getFlowSetId();
-
- public abstract int getLength();
-
- @Override
- public byte[] serialize() {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-
-}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
index df5ffce..27eb8ea 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NeflowChannelHandler.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.netflow.impl;
+import java.util.Optional;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
@@ -22,18 +23,35 @@
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.SimpleChannelHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.onosproject.netflow.DataTemplateRecord;
+import org.onosproject.netflow.FlowSet;
+import org.onosproject.netflow.TemplateFlowSet;
+import org.onosproject.netflow.DataFlowSet;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.NetflowController;
+
/**
* Channel handler deals with the netfow exporter connection and dispatches messages
* from netfow exporter to the appropriate locations.
*/
public class NeflowChannelHandler extends SimpleChannelHandler {
+ private static final Logger log = LoggerFactory.getLogger(NeflowChannelHandler.class);
+
private Channel channel;
+ private NetflowController controller;
+
/**
* Create a new netflow channelHandler instance.
+ *
+ * @param controller netflow controller.
*/
- NeflowChannelHandler() {
+ NeflowChannelHandler(NetflowController controller) {
+ this.controller = controller;
}
/**
@@ -81,8 +99,39 @@
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
- //TODO Netflow message store to netflow distributed store
- NetFlowPacket packet = (NetFlowPacket) event.getMessage();
+ try {
+ NetFlowPacket netFlowPacket = (NetFlowPacket) event.getMessage();
+ netFlowPacket.getFlowSets()
+ .stream()
+ .filter(n -> n.getType() == FlowSet.Type.TEMPLATE_FLOWSET)
+ .map(t -> (TemplateFlowSet) t)
+ .flatMap(t -> t.getRecords().stream())
+ .forEach(t -> controller.addTemplateFlowSet(t));
+
+ netFlowPacket.getFlowSets()
+ .stream()
+ .filter(n -> n.getType() == FlowSet.Type.DATA_FLOWSET)
+ .map(t -> (DataFlowSet) t)
+ .forEach(data -> {
+ Optional<DataTemplateRecord> template = controller
+ .getTemplateFlowSet(TemplateId.valueOf(data.getFlowSetId()));
+ if (!template.isPresent()) {
+ return;
+ }
+ try {
+ data.dataDeserializer(template.get());
+ data.getDataFlow()
+ .stream()
+ .forEach(dataflow -> controller.updateDataFlowSet(dataflow));
+ } catch (Exception ex) {
+ log.error("Netflow dataflow deserializer exception ", ex);
+ }
+ });
+ log.info("Netflow message received {}", netFlowPacket);
+ } catch (Exception er) {
+ log.error("Netflow message deserializer exception ", er);
+ }
+
}
}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
index be6ae9a..bbe0c0f 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetFlowPacket.java
@@ -23,6 +23,7 @@
import org.onlab.packet.Deserializer;
import org.onlab.packet.BasePacket;
+import org.onosproject.netflow.FlowSet;
import static com.google.common.base.Preconditions.checkState;
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java
new file mode 100644
index 0000000..6d33f61
--- /dev/null
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowControllerImpl.java
@@ -0,0 +1,135 @@
+/*
+ * Copyright 2023-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.netflow.impl;
+
+import org.onosproject.netflow.NetflowController;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import org.onosproject.netflow.NetflowStore;
+import org.onosproject.netflow.TemplateId;
+import org.onosproject.netflow.DataFlowRecord;
+import org.onosproject.netflow.DataTemplateRecord;
+
+import java.util.List;
+import java.util.Set;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * It control and manage the netflow traffic.
+ * it is Collecting, Storing and analyzing NetFlow data
+ * it can help to understand which applications, and protocols
+ * may be consuming the most network bandwidth by tracking processes,
+ * protocols, times of day, and traffic routing.
+ * Ref: https://www.ietf.org/rfc/rfc3954.txt
+ */
+@Component(immediate = true, service = NetflowController.class)
+public class NetflowControllerImpl implements NetflowController {
+
+ private static final Logger log = LoggerFactory.getLogger(NetflowControllerImpl.class);
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected NetflowStore store;
+
+ @Activate
+ public void activate() {
+ Controller ctrl = new Controller(this);
+ ctrl.start();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ /**
+ * Add template flowset to controller.
+ * Add new template to controller if it not exist,
+ * otherwise it will replace the existing template.
+ *
+ * @param templateRecord template flowset record.
+ */
+ @Override
+ public void addTemplateFlowSet(DataTemplateRecord templateRecord) {
+ store.updateTemplateFlowSet(templateRecord);
+ }
+
+ /**
+ * Update data flowset to controller.
+ * it will update new data flowset to store.
+ *
+ * @param dataFlowRecord data flowset record.
+ */
+ @Override
+ public void updateDataFlowSet(DataFlowRecord dataFlowRecord) {
+ store.addDataFlowSet(dataFlowRecord);
+ }
+
+ /**
+ * Get template flowset from controller.
+ * it will fetch current template which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return optional of data template record, optional will be empty if template not found.
+ */
+ @Override
+ public Optional<DataTemplateRecord> getTemplateFlowSet(TemplateId templateId) {
+ return store.getTemplateFlowSet(templateId);
+ }
+
+ /**
+ * Get data flowset from controller.
+ * it will fetch current data flowset which is matching to the template id from store.
+ *
+ * @param templateId template id.
+ * @return list of data flow record, list will be empty if template id not matched.
+ */
+ @Override
+ public List<DataFlowRecord> getDataFlowSet(TemplateId templateId) {
+ return store.getDataFlowSet(templateId);
+ }
+
+ /**
+ * Get all template flowsets from controller.
+ * it will fetch all templates from store.
+ *
+ * @return set of data template record, set will be empty if templates not found in the store.
+ */
+ @Override
+ public Set<DataTemplateRecord> getTemplateFlowSet() {
+ return store.getTemplateFlowSet();
+ }
+
+ /**
+ * Get data flowset from controller.
+ * it will fetch current data flowset which is matching to the template id from store.
+ *
+ * @return mapping from a template id to data flow record.
+ */
+ @Override
+ public Map<TemplateId, List<DataFlowRecord>> getDataFlowSet() {
+ return store.getDataFlowSet();
+ }
+
+}
\ No newline at end of file
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
index 8dad97d..6807a5c 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowMessageDecoder.java
@@ -42,7 +42,7 @@
return NetFlowPacket.deserializer().deserialize(bytes, 0, bytes.length);
}
} catch (Exception e) {
- log.error("Netflow message decode error");
+ log.error("Netflow message decode error ", e);
buffer.resetReaderIndex();
buffer.discardReadBytes();
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
index 178f6e6..9f5ef2e 100644
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
+++ b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/NetflowPipelineFactory.java
@@ -18,18 +18,23 @@
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
-
+import org.onosproject.netflow.NetflowController;
/**
* Creates a ChannelPipeline for a server-side netflow message channel.
*/
public class NetflowPipelineFactory implements ChannelPipelineFactory {
+ private NetflowController controller;
+
/**
* Constructor to initialize the values.
+ *
+ * @param controller netflow controller.
*/
- public NetflowPipelineFactory() {
+ public NetflowPipelineFactory(NetflowController controller) {
super();
+ this.controller = controller;
}
/**
@@ -40,7 +45,7 @@
*/
@Override
public ChannelPipeline getPipeline() throws Exception {
- NeflowChannelHandler handler = new NeflowChannelHandler();
+ NeflowChannelHandler handler = new NeflowChannelHandler(controller);
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("netflowmessagedecoder", new NetflowMessageDecoder());
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java
deleted file mode 100644
index 9a00bb2..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/OptionalTemplateFlowSet.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import java.util.List;
-
-import org.onlab.packet.Deserializer;
-
-
-/**
- * The Options Template Record (and its corresponding Options Data
- * Record) is used to supply information about the NetFlow process
- * configuration or NetFlow process specific data, rather than supplying
- * information about IP Flows.
- * Ref: https://www.ietf.org/rfc/rfc3954.txt
- */
-public class OptionalTemplateFlowSet extends FlowSet {
-
- /*
- 0 1 2 3
- 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | FlowSet ID = 1 | Length |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Template ID | Option Scope Length |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Option Length | Scope 1 Field Type |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Scope 1 Field Length | ... |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Scope N Field Length | Option 1 Field Type |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Option 1 Field Length | ... |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Option M Field Length | Padding |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- */
-
- private int flowSetId;
-
- private int length;
-
- private List<DataTemplateRecord> records;
-
- /**
- * Returns flowset type.
- *
- * @return flowset type
- */
- @Override
- public Type getType() {
- return Type.OPTIONAL_TEMPLATE_FLOWSET;
- }
-
- /**
- * Returns flowset id.
- * FlowSet ID value of 1 is reserved for the Options Template.
- *
- * @return flow set ID
- */
- public int getFlowSetId() {
- return flowSetId;
- }
-
- /**
- * Returns total length of this FlowSet.
- * Each Options Template FlowSet
- * MAY contain multiple Options Template Records. Thus, the
- * Length value MUST be used to determine the position of the next
- * FlowSet record, which could be either a Template FlowSet or
- * Data FlowSet.
- *
- * @return flow set ID
- */
- public int getLength() {
- return length;
- }
-
- /**
- * Returns list of optional data template records.
- *
- * @return list of optional data template records
- */
- public List<DataTemplateRecord> getRecords() {
- return records;
- }
-
- /**
- * Deserializer function for data option template flowset.
- *
- * @return data deserializer function
- */
- public static Deserializer<OptionalTemplateFlowSet> deserializer() {
- return (data, offset, length) -> {
- //TODO parse optional template
- return new OptionalTemplateFlowSet();
- };
- }
-
-}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java
deleted file mode 100644
index b6bc51e..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateFlowSet.java
+++ /dev/null
@@ -1,279 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.function.Predicate;
-import java.util.Objects;
-
-import com.google.common.base.MoreObjects;
-
-import org.onlab.packet.DeserializationException;
-import org.onlab.packet.Deserializer;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * One of the essential elements in the NetFlow format is the Template
- * FlowSet. Templates greatly enhance the flexibility of the Flow
- * Record format because they allow the NetFlow Collector to process
- * Flow Records without necessarily knowing the interpretation of all
- * the data in the Flow Record.
- * Ref: https://www.ietf.org/rfc/rfc3954.txt
- */
-public final class TemplateFlowSet extends FlowSet {
-
- /*
- 0 1 2 3
- 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | FlowSet ID = 0 | Length |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Template ID 256 | Field Count |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Field Type 1 | Field Length 1 |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Field Type 2 | Field Length 2 |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | ... | ... |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Field Type N | Field Length N |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Template ID 257 | Field Count |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Field Type 1 | Field Length 1 |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Field Type 2 | Field Length 2 |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | ... | ... |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Field Type M | Field Length M |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | ... | ... |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | Template ID K | Field Count |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- | ... | ... |
- +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
- */
-
- private int flowSetId;
-
- private int length;
-
- private List<DataTemplateRecord> records;
-
- private TemplateFlowSet(Builder builder) {
- this.records = builder.records;
- this.length = builder.length;
- this.flowSetId = builder.flowSetId;
- }
-
- /**
- * Return template flow set id.
- * FlowSet ID value of 0 is reserved for the Template FlowSet.
- *
- * @return flow set ID
- */
- @Override
- public int getFlowSetId() {
- return this.flowSetId;
- }
-
- /**
- * Returns total length of this flowSet.
- *
- * @return length of flowset
- */
- @Override
- public int getLength() {
- return this.length;
- }
-
- /**
- * Returns list of flow records.
- *
- * @return list of flow records
- */
- public List<DataTemplateRecord> getRecords() {
- return records;
- }
-
- /**
- * Returns type of the flowset.
- *
- * @return type of the flowset
- */
- @Override
- public Type getType() {
- return Type.TEMPLATE_FLOWSET;
- }
-
- @Override
- public int hashCode() {
- int hash = 7;
- hash = 53 * hash + this.flowSetId;
- hash = 53 * hash + this.length;
- hash = 53 * hash + Objects.hashCode(this.records);
- return hash;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null) {
- return false;
- }
- if (getClass() != obj.getClass()) {
- return false;
- }
- final TemplateFlowSet other = (TemplateFlowSet) obj;
- if (this.flowSetId != other.flowSetId) {
- return false;
- }
- if (this.length != other.length) {
- return false;
- }
- return Objects.equals(this.records, other.records);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("flowSetId", flowSetId)
- .add("length", length)
- .add("records", records)
- .toString();
- }
-
- /**
- * Data deserializer function for template flow set.
- *
- * @return data deserializer function
- */
- public static Deserializer<TemplateFlowSet> deserializer() {
- return (data, offset, length) -> {
-
- Predicate<ByteBuffer> isValidBuffer = b -> b.remaining() < FlowSet.FIELD_LENTH;
-
- ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
- if (isValidBuffer.test(bb)) {
- throw new DeserializationException("Invalid buffer size");
- }
- Builder builder = new Builder()
- .flowSetId(bb.getShort())
- .length(bb.getShort());
- while (bb.hasRemaining()) {
- if (isValidBuffer.test(bb)) {
- break;
- }
- int templateId = bb.getShort();
- int fieldCount = bb.getShort();
- int bufferLength = (fieldCount * FlowSet.FIELD_LENTH) + FlowSet.FIELD_LENTH;
- byte[] record = new byte[bufferLength];
- bb.position(bb.position() - FlowSet.FIELD_LENTH);
- if (bb.remaining() < bufferLength) {
- break;
- }
- bb.get(record);
- builder.templateRecord(DataTemplateRecord.deserializer().deserialize(record, 0, bufferLength));
-
- }
- return builder.build();
- };
- }
-
- /**
- * Builder for template flow set.
- */
- private static class Builder {
-
- private int flowSetId;
-
- private int length;
-
- private List<DataTemplateRecord> records = new LinkedList<>();
-
- /**
- * Setter for template flow set id.
- *
- * @param flowSetId template flow set id.
- * @return this class builder.
- */
- public Builder flowSetId(int flowSetId) {
- this.flowSetId = flowSetId;
- return this;
- }
-
- /**
- * Setter for total length of this flowSet.
- *
- * @param length total length of this flowSet.
- * @return this class builder.
- */
- public Builder length(int length) {
- this.length = length;
- return this;
- }
-
- /**
- * Setter for list of flow records.
- *
- * @param records list of flow records.
- * @return this class builder.
- */
- public Builder templateRecords(List<DataTemplateRecord> records) {
- this.records = records;
- return this;
- }
-
- /**
- * Setter for flow records.
- *
- * @param record flow records.
- * @return this class builder.
- */
- public Builder templateRecord(DataTemplateRecord record) {
- this.records.add(record);
- return this;
- }
-
- /**
- * Checks arguments for template flow set.
- */
- private void checkArguments() {
- checkState(flowSetId == 0, "Invalid flow set id.");
- checkState(length == 0, "Invalid flow set length.");
-
- }
-
- /**
- * Builds template flow set.
- *
- * @return template flow set.
- */
- public TemplateFlowSet build() {
- checkArguments();
- return new TemplateFlowSet(this);
- }
-
- }
-
-}
diff --git a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java b/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java
deleted file mode 100644
index db66292..0000000
--- a/apps/netflow/app/src/main/java/org/onosproject/netflow/impl/TemplateRecord.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Copyright 2023-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.netflow.impl;
-
-import org.onlab.packet.BasePacket;
-
-import org.onosproject.netflow.FlowRecord;
-/**
- * A Template Record defines the structure and interpretation of fields.
- * in a Flow Data Record.
- */
-public abstract class TemplateRecord extends BasePacket implements FlowRecord {
-
- @Override
- public byte[] serialize() {
- throw new UnsupportedOperationException("Not supported yet.");
- }
-}