Fix files with windows line endings + Add checkstyle rule to catch this issue
Change-Id: Ic1905f2121c5c2ab66259f7f531c1e36fe58e9d4
diff --git a/core/api/src/main/java/org/onosproject/net/flow/DefaultTypedFlowEntry.java b/core/api/src/main/java/org/onosproject/net/flow/DefaultTypedFlowEntry.java
index afceb14..43a32de 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/DefaultTypedFlowEntry.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/DefaultTypedFlowEntry.java
@@ -1,122 +1,122 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.net.flow;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-
-/**
- * Default flow entry class with FlowLiveType value, IMMEDIATE_FLOW, SHORT_FLOW, MID_FLOW, LONG_FLOW.
- */
-public class DefaultTypedFlowEntry extends DefaultFlowEntry
- implements TypedStoredFlowEntry {
- private FlowLiveType liveType;
-
- /**
- * Creates a typed flow entry from flow rule and its statistics, with default flow live type(IMMEDIATE_FLOW).
- *
- * @param rule the flow rule
- * @param state the flow state
- * @param life the flow duration since creation
- * @param packets the flow packets count
- * @param bytes the flow bytes count
- *
- */
- public DefaultTypedFlowEntry(FlowRule rule, FlowEntryState state,
- long life, long packets, long bytes) {
- super(rule, state, life, packets, bytes);
- this.liveType = FlowLiveType.IMMEDIATE_FLOW;
- }
-
- /**
- * Creates a typed flow entry from flow rule, with default flow live type(IMMEDIATE_FLOW).
- *
- * @param rule the flow rule
- *
- */
- public DefaultTypedFlowEntry(FlowRule rule) {
- super(rule);
- this.liveType = FlowLiveType.IMMEDIATE_FLOW;
- }
-
- /**
- * Creates a typed flow entry from flow entry, with default flow live type(IMMEDIATE_FLOW).
- *
- * @param fe the flow entry
- *
- */
- public DefaultTypedFlowEntry(FlowEntry fe) {
- super(fe, fe.state(), fe.life(), fe.packets(), fe.bytes());
- this.liveType = FlowLiveType.IMMEDIATE_FLOW;
- }
-
- /**
- * Creates a typed flow entry from flow rule and flow live type.
- *
- * @param rule the flow rule
- * @param liveType the flow live type
- *
- */
- public DefaultTypedFlowEntry(FlowRule rule, FlowLiveType liveType) {
- super(rule);
- this.liveType = liveType;
- }
-
- /**
- * Creates a typed flow entry from flow entry and flow live type.
- *
- * @param fe the flow rule
- * @param liveType the flow live type
- *
- */
- public DefaultTypedFlowEntry(FlowEntry fe, FlowLiveType liveType) {
- super(fe, fe.state(), fe.life(), fe.packets(), fe.bytes());
- this.liveType = liveType;
- }
-
- /**
- * Creates a typed flow entry from flow rule, error code and flow live type.
- *
- * @param rule the flow rule
- * @param errType the flow error type
- * @param errCode the flow error code
- * @param liveType the flow live type
- *
- */
- public DefaultTypedFlowEntry(FlowRule rule, int errType, int errCode, FlowLiveType liveType) {
- super(rule, errType, errCode);
- this.liveType = liveType;
- }
-
- @Override
- public FlowLiveType flowLiveType() {
- return this.liveType;
- }
-
- @Override
- public void setFlowLiveType(FlowLiveType liveType) {
- this.liveType = liveType;
- }
-
- @Override
- public String toString() {
- return toStringHelper(this)
- .add("entry", super.toString())
- .add("type", liveType)
- .toString();
- }
-}
-
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.flow;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Default flow entry class with FlowLiveType value, IMMEDIATE_FLOW, SHORT_FLOW, MID_FLOW, LONG_FLOW.
+ */
+public class DefaultTypedFlowEntry extends DefaultFlowEntry
+ implements TypedStoredFlowEntry {
+ private FlowLiveType liveType;
+
+ /**
+ * Creates a typed flow entry from flow rule and its statistics, with default flow live type(IMMEDIATE_FLOW).
+ *
+ * @param rule the flow rule
+ * @param state the flow state
+ * @param life the flow duration since creation
+ * @param packets the flow packets count
+ * @param bytes the flow bytes count
+ *
+ */
+ public DefaultTypedFlowEntry(FlowRule rule, FlowEntryState state,
+ long life, long packets, long bytes) {
+ super(rule, state, life, packets, bytes);
+ this.liveType = FlowLiveType.IMMEDIATE_FLOW;
+ }
+
+ /**
+ * Creates a typed flow entry from flow rule, with default flow live type(IMMEDIATE_FLOW).
+ *
+ * @param rule the flow rule
+ *
+ */
+ public DefaultTypedFlowEntry(FlowRule rule) {
+ super(rule);
+ this.liveType = FlowLiveType.IMMEDIATE_FLOW;
+ }
+
+ /**
+ * Creates a typed flow entry from flow entry, with default flow live type(IMMEDIATE_FLOW).
+ *
+ * @param fe the flow entry
+ *
+ */
+ public DefaultTypedFlowEntry(FlowEntry fe) {
+ super(fe, fe.state(), fe.life(), fe.packets(), fe.bytes());
+ this.liveType = FlowLiveType.IMMEDIATE_FLOW;
+ }
+
+ /**
+ * Creates a typed flow entry from flow rule and flow live type.
+ *
+ * @param rule the flow rule
+ * @param liveType the flow live type
+ *
+ */
+ public DefaultTypedFlowEntry(FlowRule rule, FlowLiveType liveType) {
+ super(rule);
+ this.liveType = liveType;
+ }
+
+ /**
+ * Creates a typed flow entry from flow entry and flow live type.
+ *
+ * @param fe the flow rule
+ * @param liveType the flow live type
+ *
+ */
+ public DefaultTypedFlowEntry(FlowEntry fe, FlowLiveType liveType) {
+ super(fe, fe.state(), fe.life(), fe.packets(), fe.bytes());
+ this.liveType = liveType;
+ }
+
+ /**
+ * Creates a typed flow entry from flow rule, error code and flow live type.
+ *
+ * @param rule the flow rule
+ * @param errType the flow error type
+ * @param errCode the flow error code
+ * @param liveType the flow live type
+ *
+ */
+ public DefaultTypedFlowEntry(FlowRule rule, int errType, int errCode, FlowLiveType liveType) {
+ super(rule, errType, errCode);
+ this.liveType = liveType;
+ }
+
+ @Override
+ public FlowLiveType flowLiveType() {
+ return this.liveType;
+ }
+
+ @Override
+ public void setFlowLiveType(FlowLiveType liveType) {
+ this.liveType = liveType;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("entry", super.toString())
+ .add("type", liveType)
+ .toString();
+ }
+}
+
diff --git a/core/api/src/main/java/org/onosproject/net/flow/TypedStoredFlowEntry.java b/core/api/src/main/java/org/onosproject/net/flow/TypedStoredFlowEntry.java
index 965fd1f..8cdfae7 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/TypedStoredFlowEntry.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/TypedStoredFlowEntry.java
@@ -1,67 +1,67 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.net.flow;
-
-/**
- * Represents a flow live type for a given flow entry.
- */
-public interface TypedStoredFlowEntry extends StoredFlowEntry {
- enum FlowLiveType {
- /**
- * Indicates that this rule has been submitted for addition immediately.
- * Not necessarily collecting flow stats.
- */
- IMMEDIATE_FLOW,
-
- /**
- * Indicates that this rule has been submitted for a short time.
- * Necessarily collecting flow stats every calAndPollInterval.
- */
- SHORT_FLOW,
-
- /**
- * Indicates that this rule has been submitted for a mid time.
- * Necessarily collecting flow stats every midPollInterval.
- */
- MID_FLOW,
-
- /**
- * Indicates that this rule has been submitted for a long time.
- * Necessarily collecting flow stats every longPollInterval.
- */
- LONG_FLOW,
-
- /**
- * Indicates that this rule has been submitted for UNKNOWN or ERROR.
- * Not necessarily collecting flow stats.
- */
- UNKNOWN_FLOW
- }
-
- /**
- * Gets the flow live type for this entry.
- *
- * @return flow live type
- */
- FlowLiveType flowLiveType();
-
- /**
- * Sets the new flow live type for this entry.
- * @param liveType new flow live type.
- */
- void setFlowLiveType(FlowLiveType liveType);
-}
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.flow;
+
+/**
+ * Represents a flow live type for a given flow entry.
+ */
+public interface TypedStoredFlowEntry extends StoredFlowEntry {
+ enum FlowLiveType {
+ /**
+ * Indicates that this rule has been submitted for addition immediately.
+ * Not necessarily collecting flow stats.
+ */
+ IMMEDIATE_FLOW,
+
+ /**
+ * Indicates that this rule has been submitted for a short time.
+ * Necessarily collecting flow stats every calAndPollInterval.
+ */
+ SHORT_FLOW,
+
+ /**
+ * Indicates that this rule has been submitted for a mid time.
+ * Necessarily collecting flow stats every midPollInterval.
+ */
+ MID_FLOW,
+
+ /**
+ * Indicates that this rule has been submitted for a long time.
+ * Necessarily collecting flow stats every longPollInterval.
+ */
+ LONG_FLOW,
+
+ /**
+ * Indicates that this rule has been submitted for UNKNOWN or ERROR.
+ * Not necessarily collecting flow stats.
+ */
+ UNKNOWN_FLOW
+ }
+
+ /**
+ * Gets the flow live type for this entry.
+ *
+ * @return flow live type
+ */
+ FlowLiveType flowLiveType();
+
+ /**
+ * Sets the new flow live type for this entry.
+ * @param liveType new flow live type.
+ */
+ void setFlowLiveType(FlowLiveType liveType);
+}
diff --git a/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticService.java b/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticService.java
index 5216839..544364d 100644
--- a/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticService.java
+++ b/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticService.java
@@ -1,106 +1,106 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.net.statistic;
-
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.Device;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.flow.TypedStoredFlowEntry;
-import org.onosproject.net.flow.instructions.Instruction;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Service for obtaining individual flow statistic information about device and link in the system.
- * Basic statistics are obtained from the StatisticService
- */
-public interface FlowStatisticService {
-
- /**
- * Obtain the summary load list for the device with the given link.
- *
- * @param device the Device to query.
- * @return map of summary flow entry load
- */
- Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device);
-
- /**
- * Obtain the summary load for the device with the given link or port.
- *
- * @param device the Device to query.
- * @param pNumber the port number to query.
- * @return summary flow entry load
- */
- SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber);
-
- /**
- * Obtain the set of the flow type and load list for the device with the given link.
- *
- * @param device the Device to query.
- * @param liveType the FlowLiveType to filter, null means no filtering .
- * @param instType the InstructionType to filter, null means no filtering.
- * @return map of flow entry load
- */
- Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,
- TypedStoredFlowEntry.FlowLiveType liveType,
- Instruction.Type instType);
-
- /**
- * Obtain the flow type and load list for the device with the given link or port.
- *
- * @param device the Device to query.
- * @param pNumber the port number of the Device to query
- * @param liveType the FlowLiveType to filter, null means no filtering .
- * @param instType the InstructionType to filter, null means no filtering.
- * @return list of flow entry load
- */
- List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,
- TypedStoredFlowEntry.FlowLiveType liveType,
- Instruction.Type instType);
-
- /**
- * Obtain the set of the flow type and load topn list for the device with the given link.
- *
- * @param device the Device to query.
- * @param liveType the FlowLiveType to filter, null means no filtering .
- * @param instType the InstructionType to filter, null means no filtering.
- * @param topn the top number to filter, null means no filtering.
- * @return map of flow entry load
- */
- Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,
- TypedStoredFlowEntry.FlowLiveType liveType,
- Instruction.Type instType,
- int topn);
-
- /**
- * Obtain the flow type and load topn list for the device with the given link or port.
- *
- * @param device the Device to query.
- * @param pNumber the port number of the Device to query
- * @param liveType the FlowLiveType to filter, null means no filtering .
- * @param instType the InstructionType to filter, null means no filtering.
- * @param topn topn //FIXME what?
- * @return list of flow entry load
- */
- List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,
- TypedStoredFlowEntry.FlowLiveType liveType,
- Instruction.Type instType,
- int topn);
-}
-
-
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.statistic;
+
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.TypedStoredFlowEntry;
+import org.onosproject.net.flow.instructions.Instruction;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Service for obtaining individual flow statistic information about device and link in the system.
+ * Basic statistics are obtained from the StatisticService
+ */
+public interface FlowStatisticService {
+
+ /**
+ * Obtain the summary load list for the device with the given link.
+ *
+ * @param device the Device to query.
+ * @return map of summary flow entry load
+ */
+ Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device);
+
+ /**
+ * Obtain the summary load for the device with the given link or port.
+ *
+ * @param device the Device to query.
+ * @param pNumber the port number to query.
+ * @return summary flow entry load
+ */
+ SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber);
+
+ /**
+ * Obtain the set of the flow type and load list for the device with the given link.
+ *
+ * @param device the Device to query.
+ * @param liveType the FlowLiveType to filter, null means no filtering .
+ * @param instType the InstructionType to filter, null means no filtering.
+ * @return map of flow entry load
+ */
+ Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,
+ TypedStoredFlowEntry.FlowLiveType liveType,
+ Instruction.Type instType);
+
+ /**
+ * Obtain the flow type and load list for the device with the given link or port.
+ *
+ * @param device the Device to query.
+ * @param pNumber the port number of the Device to query
+ * @param liveType the FlowLiveType to filter, null means no filtering .
+ * @param instType the InstructionType to filter, null means no filtering.
+ * @return list of flow entry load
+ */
+ List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,
+ TypedStoredFlowEntry.FlowLiveType liveType,
+ Instruction.Type instType);
+
+ /**
+ * Obtain the set of the flow type and load topn list for the device with the given link.
+ *
+ * @param device the Device to query.
+ * @param liveType the FlowLiveType to filter, null means no filtering .
+ * @param instType the InstructionType to filter, null means no filtering.
+ * @param topn the top number to filter, null means no filtering.
+ * @return map of flow entry load
+ */
+ Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,
+ TypedStoredFlowEntry.FlowLiveType liveType,
+ Instruction.Type instType,
+ int topn);
+
+ /**
+ * Obtain the flow type and load topn list for the device with the given link or port.
+ *
+ * @param device the Device to query.
+ * @param pNumber the port number of the Device to query
+ * @param liveType the FlowLiveType to filter, null means no filtering .
+ * @param instType the InstructionType to filter, null means no filtering.
+ * @param topn topn //FIXME what?
+ * @return list of flow entry load
+ */
+ List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,
+ TypedStoredFlowEntry.FlowLiveType liveType,
+ Instruction.Type instType,
+ int topn);
+}
+
+
diff --git a/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticStore.java b/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticStore.java
index 3c2aa89..a27e167 100644
--- a/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticStore.java
+++ b/core/api/src/main/java/org/onosproject/net/statistic/FlowStatisticStore.java
@@ -1,65 +1,65 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.net.statistic;
-
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.flow.FlowEntry;
-import org.onosproject.net.flow.FlowRule;
-
-import java.util.Set;
-
-/**
- * Flow Store to house the computed statistics.
- */
-public interface FlowStatisticStore {
- /**
- * Remove entries associated with this rule.
- *
- * @param rule {@link org.onosproject.net.flow.FlowRule}
- */
- void removeFlowStatistic(FlowRule rule);
-
- /**
- * Adds a flow stats observation for a flow rule. The previous flow will be removed.
- *
- * @param rule a {@link org.onosproject.net.flow.FlowEntry}
- */
- void addFlowStatistic(FlowEntry rule);
-
- /**
- * Updates a stats observation for a flow rule. The old flow stats will be moved to previous stats.
- *
- * @param rule a {@link org.onosproject.net.flow.FlowEntry}
- */
- void updateFlowStatistic(FlowEntry rule);
-
- /**
- * Fetches the current observed flow stats values.
- *
- * @param connectPoint the port to fetch information for
- * @return set of current flow rules
- */
- Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint);
-
- /**
- * Fetches the current observed flow stats values.
- *
- * @param connectPoint the port to fetch information for
- * @return set of current values
- */
- Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint);
-}
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.statistic;
+
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+
+import java.util.Set;
+
+/**
+ * Flow Store to house the computed statistics.
+ */
+public interface FlowStatisticStore {
+ /**
+ * Remove entries associated with this rule.
+ *
+ * @param rule {@link org.onosproject.net.flow.FlowRule}
+ */
+ void removeFlowStatistic(FlowRule rule);
+
+ /**
+ * Adds a flow stats observation for a flow rule. The previous flow will be removed.
+ *
+ * @param rule a {@link org.onosproject.net.flow.FlowEntry}
+ */
+ void addFlowStatistic(FlowEntry rule);
+
+ /**
+ * Updates a stats observation for a flow rule. The old flow stats will be moved to previous stats.
+ *
+ * @param rule a {@link org.onosproject.net.flow.FlowEntry}
+ */
+ void updateFlowStatistic(FlowEntry rule);
+
+ /**
+ * Fetches the current observed flow stats values.
+ *
+ * @param connectPoint the port to fetch information for
+ * @return set of current flow rules
+ */
+ Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint);
+
+ /**
+ * Fetches the current observed flow stats values.
+ *
+ * @param connectPoint the port to fetch information for
+ * @return set of current values
+ */
+ Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint);
+}
diff --git a/core/api/src/main/java/org/onosproject/net/statistic/SummaryFlowEntryWithLoad.java b/core/api/src/main/java/org/onosproject/net/statistic/SummaryFlowEntryWithLoad.java
index 1ec427c..e98774c 100644
--- a/core/api/src/main/java/org/onosproject/net/statistic/SummaryFlowEntryWithLoad.java
+++ b/core/api/src/main/java/org/onosproject/net/statistic/SummaryFlowEntryWithLoad.java
@@ -1,157 +1,157 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.net.statistic;
-
-import org.onosproject.net.ConnectPoint;
-
-/**
- * Summary Load classified by flow live type.
- */
-public class SummaryFlowEntryWithLoad {
- private ConnectPoint cp;
- private Load totalLoad;
- private Load immediateLoad;
- private Load shortLoad;
- private Load midLoad;
- private Load longLoad;
- private Load unknownLoad;
-
- /**
- * Creates a new summary flow entry having load for the given connect point and total load.
- *
- * @param cp connect point
- * @param totalLoad total load
- */
- public SummaryFlowEntryWithLoad(ConnectPoint cp, Load totalLoad) {
- this.cp = cp;
- this.totalLoad = totalLoad;
- this.immediateLoad = new DefaultLoad();
- this.shortLoad = new DefaultLoad();
- this.midLoad = new DefaultLoad();
- this.longLoad = new DefaultLoad();
- this.unknownLoad = new DefaultLoad();
- }
-
- /**
- * Creates a new summary flow entry having load for the given connect point
- * and total, immediate, short, mid, and long load.
- *
- * @param cp connect point
- * @param totalLoad total load
- * @param immediateLoad immediate load
- * @param shortLoad short load
- * @param midLoad mid load
- * @param longLoad long load
- */
- public SummaryFlowEntryWithLoad(ConnectPoint cp,
- Load totalLoad, Load immediateLoad, Load shortLoad, Load midLoad, Load longLoad) {
- this.cp = cp;
- this.totalLoad = totalLoad;
- this.immediateLoad = immediateLoad;
- this.shortLoad = shortLoad;
- this.midLoad = midLoad;
- this.longLoad = longLoad;
- this.unknownLoad = new DefaultLoad();
- }
-
- /**
- * Creates a new summary flow entry having load for the given connect point
- * and total, immediate, short, mid, long, and unknown load.
- *
- * @param cp connect point
- * @param totalLoad total load
- * @param immediateLoad immediate load
- * @param shortLoad short load
- * @param midLoad mid load
- * @param longLoad long load
- * @param unknownLoad long load
- */
- public SummaryFlowEntryWithLoad(ConnectPoint cp,
- Load totalLoad, Load immediateLoad,
- Load shortLoad, Load midLoad, Load longLoad, Load unknownLoad) {
- this.cp = cp;
- this.totalLoad = totalLoad;
- this.immediateLoad = immediateLoad;
- this.shortLoad = shortLoad;
- this.midLoad = midLoad;
- this.longLoad = longLoad;
- this.unknownLoad = unknownLoad;
- }
-
- /**
- * Returns connect point.
- *
- * @return connect point
- */
- public ConnectPoint connectPoint() {
- return cp;
- }
-
- /**
- * Returns total load of connect point.
- *
- * @return total load
- */
- public Load totalLoad() {
- return totalLoad;
- }
-
- /**
- * Returns immediate load of connect point.
- *
- * @return immediate load
- */
- public Load immediateLoad() {
- return immediateLoad;
- }
-
- /**
- * Returns short load of connect point.
- *
- * @return short load
- */
- public Load shortLoad() {
- return shortLoad;
- }
-
- /**
- * Returns mid load of connect point.
- *
- * @return mid load
- */
- public Load midLoad() {
- return midLoad;
- }
-
- /**
- * Returns long load of connect point.
- *
- * @return long load
- */
- public Load longLoad() {
- return longLoad;
- }
-
- /**
- * Returns unknown load of connect point.
- *
- * @return unknown load
- */
- public Load unknownLoad() {
- return unknownLoad;
- }
-}
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.statistic;
+
+import org.onosproject.net.ConnectPoint;
+
+/**
+ * Summary Load classified by flow live type.
+ */
+public class SummaryFlowEntryWithLoad {
+ private ConnectPoint cp;
+ private Load totalLoad;
+ private Load immediateLoad;
+ private Load shortLoad;
+ private Load midLoad;
+ private Load longLoad;
+ private Load unknownLoad;
+
+ /**
+ * Creates a new summary flow entry having load for the given connect point and total load.
+ *
+ * @param cp connect point
+ * @param totalLoad total load
+ */
+ public SummaryFlowEntryWithLoad(ConnectPoint cp, Load totalLoad) {
+ this.cp = cp;
+ this.totalLoad = totalLoad;
+ this.immediateLoad = new DefaultLoad();
+ this.shortLoad = new DefaultLoad();
+ this.midLoad = new DefaultLoad();
+ this.longLoad = new DefaultLoad();
+ this.unknownLoad = new DefaultLoad();
+ }
+
+ /**
+ * Creates a new summary flow entry having load for the given connect point
+ * and total, immediate, short, mid, and long load.
+ *
+ * @param cp connect point
+ * @param totalLoad total load
+ * @param immediateLoad immediate load
+ * @param shortLoad short load
+ * @param midLoad mid load
+ * @param longLoad long load
+ */
+ public SummaryFlowEntryWithLoad(ConnectPoint cp,
+ Load totalLoad, Load immediateLoad, Load shortLoad, Load midLoad, Load longLoad) {
+ this.cp = cp;
+ this.totalLoad = totalLoad;
+ this.immediateLoad = immediateLoad;
+ this.shortLoad = shortLoad;
+ this.midLoad = midLoad;
+ this.longLoad = longLoad;
+ this.unknownLoad = new DefaultLoad();
+ }
+
+ /**
+ * Creates a new summary flow entry having load for the given connect point
+ * and total, immediate, short, mid, long, and unknown load.
+ *
+ * @param cp connect point
+ * @param totalLoad total load
+ * @param immediateLoad immediate load
+ * @param shortLoad short load
+ * @param midLoad mid load
+ * @param longLoad long load
+ * @param unknownLoad long load
+ */
+ public SummaryFlowEntryWithLoad(ConnectPoint cp,
+ Load totalLoad, Load immediateLoad,
+ Load shortLoad, Load midLoad, Load longLoad, Load unknownLoad) {
+ this.cp = cp;
+ this.totalLoad = totalLoad;
+ this.immediateLoad = immediateLoad;
+ this.shortLoad = shortLoad;
+ this.midLoad = midLoad;
+ this.longLoad = longLoad;
+ this.unknownLoad = unknownLoad;
+ }
+
+ /**
+ * Returns connect point.
+ *
+ * @return connect point
+ */
+ public ConnectPoint connectPoint() {
+ return cp;
+ }
+
+ /**
+ * Returns total load of connect point.
+ *
+ * @return total load
+ */
+ public Load totalLoad() {
+ return totalLoad;
+ }
+
+ /**
+ * Returns immediate load of connect point.
+ *
+ * @return immediate load
+ */
+ public Load immediateLoad() {
+ return immediateLoad;
+ }
+
+ /**
+ * Returns short load of connect point.
+ *
+ * @return short load
+ */
+ public Load shortLoad() {
+ return shortLoad;
+ }
+
+ /**
+ * Returns mid load of connect point.
+ *
+ * @return mid load
+ */
+ public Load midLoad() {
+ return midLoad;
+ }
+
+ /**
+ * Returns long load of connect point.
+ *
+ * @return long load
+ */
+ public Load longLoad() {
+ return longLoad;
+ }
+
+ /**
+ * Returns unknown load of connect point.
+ *
+ * @return unknown load
+ */
+ public Load unknownLoad() {
+ return unknownLoad;
+ }
+}
diff --git a/core/api/src/main/java/org/onosproject/net/statistic/TypedFlowEntryWithLoad.java b/core/api/src/main/java/org/onosproject/net/statistic/TypedFlowEntryWithLoad.java
index a4cbd7d..9d23ade 100644
--- a/core/api/src/main/java/org/onosproject/net/statistic/TypedFlowEntryWithLoad.java
+++ b/core/api/src/main/java/org/onosproject/net/statistic/TypedFlowEntryWithLoad.java
@@ -1,171 +1,171 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.net.statistic;
-
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.flow.FlowEntry;
-import org.onosproject.net.flow.TypedStoredFlowEntry;
-import org.onosproject.net.flow.DefaultTypedFlowEntry;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Load of flow entry of flow live type.
- */
-public class TypedFlowEntryWithLoad {
- private ConnectPoint cp;
- private TypedStoredFlowEntry tfe;
- private Load load;
-
- //TODO: make this variables class, and share with NewAdaptivceFlowStatsCollector class
- private static final int CAL_AND_POLL_INTERVAL = 5; // means SHORT_POLL_INTERVAL
- private static final int MID_POLL_INTERVAL = 10;
- private static final int LONG_POLL_INTERVAL = 15;
-
- /**
- * Creates a new typed flow entry with load.
- *
- * @param cp connect point
- * @param tfe typed flow entry
- * @param load load
- */
- public TypedFlowEntryWithLoad(ConnectPoint cp, TypedStoredFlowEntry tfe, Load load) {
- this.cp = cp;
- this.tfe = tfe;
- this.load = load;
- }
-
- /**
- * Creates a new typed flow entry with load.
- *
- * @param cp connect point
- * @param tfe typed flow entry
- */
- public TypedFlowEntryWithLoad(ConnectPoint cp, TypedStoredFlowEntry tfe) {
- this.cp = cp;
- this.tfe = tfe;
- this.load = new DefaultLoad(tfe.bytes(), 0, typedPollInterval(tfe));
- }
-
- /**
- * Creates a new typed flow entry with load.
- *
- * @param cp connect point
- * @param fe flow entry
- */
- public TypedFlowEntryWithLoad(ConnectPoint cp, FlowEntry fe) {
- this.cp = cp;
- this.tfe = newTypedStoredFlowEntry(fe);
- this.load = new DefaultLoad(fe.bytes(), 0, typedPollInterval(this.tfe));
- }
-
- public ConnectPoint connectPoint() {
- return cp;
- }
- public TypedStoredFlowEntry typedStoredFlowEntry() {
- return tfe;
- }
- public Load load() {
- return load;
- }
- public void setLoad(Load load) {
- this.load = load;
- }
-
- /**
- * Returns short polling interval.
- *
- * @return short poll interval
- */
- public static int shortPollInterval() {
- return CAL_AND_POLL_INTERVAL;
- }
-
- /**
- * Returns mid polling interval.
- *
- * @return mid poll interval
- */
- public static int midPollInterval() {
- return MID_POLL_INTERVAL;
- }
-
- /**
- * Returns long polling interval.
- *
- * @return long poll interval
- */
- public static int longPollInterval() {
- return LONG_POLL_INTERVAL;
- }
-
- /**
- * Returns average polling interval.
- *
- * @return average poll interval
- */
- public static int avgPollInterval() {
- return (CAL_AND_POLL_INTERVAL + MID_POLL_INTERVAL + LONG_POLL_INTERVAL) / 3;
- }
-
- /**
- * Returns current typed flow entry's polling interval.
- *
- * @param tfe typed flow entry
- * @return typed poll interval
- */
- public static long typedPollInterval(TypedStoredFlowEntry tfe) {
- checkNotNull(tfe, "TypedStoredFlowEntry cannot be null");
-
- switch (tfe.flowLiveType()) {
- case LONG_FLOW:
- return LONG_POLL_INTERVAL;
- case MID_FLOW:
- return MID_POLL_INTERVAL;
- case SHORT_FLOW:
- case IMMEDIATE_FLOW:
- default:
- return CAL_AND_POLL_INTERVAL;
- }
- }
-
- /**
- * Creates a new typed flow entry with the given flow entry fe.
- *
- * @param fe flow entry
- * @return new typed flow entry
- */
- public static TypedStoredFlowEntry newTypedStoredFlowEntry(FlowEntry fe) {
- if (fe == null) {
- return null;
- }
-
- long life = fe.life();
-
- if (life >= LONG_POLL_INTERVAL) {
- return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.LONG_FLOW);
- } else if (life >= MID_POLL_INTERVAL) {
- return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.MID_FLOW);
- } else if (life >= CAL_AND_POLL_INTERVAL) {
- return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW);
- } else if (life >= 0) {
- return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW);
- } else { // life < 0
- return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW);
- }
- }
-}
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.statistic;
+
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.TypedStoredFlowEntry;
+import org.onosproject.net.flow.DefaultTypedFlowEntry;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Load of flow entry of flow live type.
+ */
+public class TypedFlowEntryWithLoad {
+ private ConnectPoint cp;
+ private TypedStoredFlowEntry tfe;
+ private Load load;
+
+ //TODO: make this variables class, and share with NewAdaptivceFlowStatsCollector class
+ private static final int CAL_AND_POLL_INTERVAL = 5; // means SHORT_POLL_INTERVAL
+ private static final int MID_POLL_INTERVAL = 10;
+ private static final int LONG_POLL_INTERVAL = 15;
+
+ /**
+ * Creates a new typed flow entry with load.
+ *
+ * @param cp connect point
+ * @param tfe typed flow entry
+ * @param load load
+ */
+ public TypedFlowEntryWithLoad(ConnectPoint cp, TypedStoredFlowEntry tfe, Load load) {
+ this.cp = cp;
+ this.tfe = tfe;
+ this.load = load;
+ }
+
+ /**
+ * Creates a new typed flow entry with load.
+ *
+ * @param cp connect point
+ * @param tfe typed flow entry
+ */
+ public TypedFlowEntryWithLoad(ConnectPoint cp, TypedStoredFlowEntry tfe) {
+ this.cp = cp;
+ this.tfe = tfe;
+ this.load = new DefaultLoad(tfe.bytes(), 0, typedPollInterval(tfe));
+ }
+
+ /**
+ * Creates a new typed flow entry with load.
+ *
+ * @param cp connect point
+ * @param fe flow entry
+ */
+ public TypedFlowEntryWithLoad(ConnectPoint cp, FlowEntry fe) {
+ this.cp = cp;
+ this.tfe = newTypedStoredFlowEntry(fe);
+ this.load = new DefaultLoad(fe.bytes(), 0, typedPollInterval(this.tfe));
+ }
+
+ public ConnectPoint connectPoint() {
+ return cp;
+ }
+ public TypedStoredFlowEntry typedStoredFlowEntry() {
+ return tfe;
+ }
+ public Load load() {
+ return load;
+ }
+ public void setLoad(Load load) {
+ this.load = load;
+ }
+
+ /**
+ * Returns short polling interval.
+ *
+ * @return short poll interval
+ */
+ public static int shortPollInterval() {
+ return CAL_AND_POLL_INTERVAL;
+ }
+
+ /**
+ * Returns mid polling interval.
+ *
+ * @return mid poll interval
+ */
+ public static int midPollInterval() {
+ return MID_POLL_INTERVAL;
+ }
+
+ /**
+ * Returns long polling interval.
+ *
+ * @return long poll interval
+ */
+ public static int longPollInterval() {
+ return LONG_POLL_INTERVAL;
+ }
+
+ /**
+ * Returns average polling interval.
+ *
+ * @return average poll interval
+ */
+ public static int avgPollInterval() {
+ return (CAL_AND_POLL_INTERVAL + MID_POLL_INTERVAL + LONG_POLL_INTERVAL) / 3;
+ }
+
+ /**
+ * Returns current typed flow entry's polling interval.
+ *
+ * @param tfe typed flow entry
+ * @return typed poll interval
+ */
+ public static long typedPollInterval(TypedStoredFlowEntry tfe) {
+ checkNotNull(tfe, "TypedStoredFlowEntry cannot be null");
+
+ switch (tfe.flowLiveType()) {
+ case LONG_FLOW:
+ return LONG_POLL_INTERVAL;
+ case MID_FLOW:
+ return MID_POLL_INTERVAL;
+ case SHORT_FLOW:
+ case IMMEDIATE_FLOW:
+ default:
+ return CAL_AND_POLL_INTERVAL;
+ }
+ }
+
+ /**
+ * Creates a new typed flow entry with the given flow entry fe.
+ *
+ * @param fe flow entry
+ * @return new typed flow entry
+ */
+ public static TypedStoredFlowEntry newTypedStoredFlowEntry(FlowEntry fe) {
+ if (fe == null) {
+ return null;
+ }
+
+ long life = fe.life();
+
+ if (life >= LONG_POLL_INTERVAL) {
+ return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.LONG_FLOW);
+ } else if (life >= MID_POLL_INTERVAL) {
+ return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.MID_FLOW);
+ } else if (life >= CAL_AND_POLL_INTERVAL) {
+ return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW);
+ } else if (life >= 0) {
+ return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW);
+ } else { // life < 0
+ return new DefaultTypedFlowEntry(fe, TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW);
+ }
+ }
+}
diff --git a/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java b/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java
index f18c56d..75e8c00 100644
--- a/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java
+++ b/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java
@@ -1,634 +1,634 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.net.statistic.impl;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableSet;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onosproject.cli.Comparators;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.Device;
-import org.onosproject.net.Port;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flow.DefaultTypedFlowEntry;
-import org.onosproject.net.flow.FlowEntry;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.FlowRuleEvent;
-import org.onosproject.net.flow.FlowRuleListener;
-import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.flow.TypedStoredFlowEntry;
-import org.onosproject.net.flow.instructions.Instruction;
-import org.onosproject.net.statistic.DefaultLoad;
-import org.onosproject.net.statistic.FlowStatisticService;
-import org.onosproject.net.statistic.Load;
-import org.onosproject.net.statistic.FlowStatisticStore;
-import org.onosproject.net.statistic.SummaryFlowEntryWithLoad;
-import org.onosproject.net.statistic.TypedFlowEntryWithLoad;
-
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.stream.Collectors;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.security.AppGuard.checkPermission;
-import static org.slf4j.LoggerFactory.getLogger;
-import static org.onosproject.security.AppPermission.Type.*;
-
-/**
- * Provides an implementation of the Flow Statistic Service.
- */
-@Component(immediate = true, enabled = true)
-@Service
-public class FlowStatisticManager implements FlowStatisticService {
- private final Logger log = getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected FlowRuleService flowRuleService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected FlowStatisticStore flowStatisticStore;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected DeviceService deviceService;
-
- private final InternalFlowRuleStatsListener frListener = new InternalFlowRuleStatsListener();
-
- @Activate
- public void activate() {
- flowRuleService.addListener(frListener);
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- flowRuleService.removeListener(frListener);
- log.info("Stopped");
- }
-
- @Override
- public Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device) {
- checkPermission(STATISTIC_READ);
-
- Map<ConnectPoint, SummaryFlowEntryWithLoad> summaryLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
-
- if (device == null) {
- return summaryLoad;
- }
-
- List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
-
- for (Port port : ports) {
- ConnectPoint cp = new ConnectPoint(device.id(), port.number());
- SummaryFlowEntryWithLoad sfe = loadSummaryPortInternal(cp);
- summaryLoad.put(cp, sfe);
- }
-
- return summaryLoad;
- }
-
- @Override
- public SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber) {
- checkPermission(STATISTIC_READ);
-
- ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
- return loadSummaryPortInternal(cp);
- }
-
- @Override
- public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,
- TypedStoredFlowEntry.FlowLiveType liveType,
- Instruction.Type instType) {
- checkPermission(STATISTIC_READ);
-
- Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
-
- if (device == null) {
- return allLoad;
- }
-
- List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
-
- for (Port port : ports) {
- ConnectPoint cp = new ConnectPoint(device.id(), port.number());
- List<TypedFlowEntryWithLoad> tfel = loadAllPortInternal(cp, liveType, instType);
- allLoad.put(cp, tfel);
- }
-
- return allLoad;
- }
-
- @Override
- public List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,
- TypedStoredFlowEntry.FlowLiveType liveType,
- Instruction.Type instType) {
- checkPermission(STATISTIC_READ);
-
- ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
- return loadAllPortInternal(cp, liveType, instType);
- }
-
- @Override
- public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,
- TypedStoredFlowEntry.FlowLiveType liveType,
- Instruction.Type instType,
- int topn) {
- checkPermission(STATISTIC_READ);
-
- Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
-
- if (device == null) {
- return allLoad;
- }
-
- List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
-
- for (Port port : ports) {
- ConnectPoint cp = new ConnectPoint(device.id(), port.number());
- List<TypedFlowEntryWithLoad> tfel = loadTopnPortInternal(cp, liveType, instType, topn);
- allLoad.put(cp, tfel);
- }
-
- return allLoad;
- }
-
- @Override
- public List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,
- TypedStoredFlowEntry.FlowLiveType liveType,
- Instruction.Type instType,
- int topn) {
- checkPermission(STATISTIC_READ);
-
- ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
- return loadTopnPortInternal(cp, liveType, instType, topn);
- }
-
- private SummaryFlowEntryWithLoad loadSummaryPortInternal(ConnectPoint cp) {
- checkPermission(STATISTIC_READ);
-
- Set<FlowEntry> currentStats;
- Set<FlowEntry> previousStats;
-
- TypedStatistics typedStatistics;
- synchronized (flowStatisticStore) {
- currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);
- if (currentStats == null) {
- return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
- }
- previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);
- if (previousStats == null) {
- return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
- }
- // copy to local flow entry
- typedStatistics = new TypedStatistics(currentStats, previousStats);
-
- // Check for validity of this stats data
- checkLoadValidity(currentStats, previousStats);
- }
-
- // current and previous set is not empty!
- Set<FlowEntry> currentSet = typedStatistics.current();
- Set<FlowEntry> previousSet = typedStatistics.previous();
- Load totalLoad = new DefaultLoad(aggregateBytesSet(currentSet), aggregateBytesSet(previousSet),
- TypedFlowEntryWithLoad.avgPollInterval());
-
- Map<FlowRule, TypedStoredFlowEntry> currentMap;
- Map<FlowRule, TypedStoredFlowEntry> previousMap;
-
- currentMap = typedStatistics.currentImmediate();
- previousMap = typedStatistics.previousImmediate();
- Load immediateLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
- TypedFlowEntryWithLoad.shortPollInterval());
-
- currentMap = typedStatistics.currentShort();
- previousMap = typedStatistics.previousShort();
- Load shortLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
- TypedFlowEntryWithLoad.shortPollInterval());
-
- currentMap = typedStatistics.currentMid();
- previousMap = typedStatistics.previousMid();
- Load midLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
- TypedFlowEntryWithLoad.midPollInterval());
-
- currentMap = typedStatistics.currentLong();
- previousMap = typedStatistics.previousLong();
- Load longLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
- TypedFlowEntryWithLoad.longPollInterval());
-
- currentMap = typedStatistics.currentUnknown();
- previousMap = typedStatistics.previousUnknown();
- Load unknownLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
- TypedFlowEntryWithLoad.avgPollInterval());
-
- return new SummaryFlowEntryWithLoad(cp, totalLoad, immediateLoad, shortLoad, midLoad, longLoad, unknownLoad);
- }
-
- private List<TypedFlowEntryWithLoad> loadAllPortInternal(ConnectPoint cp,
- TypedStoredFlowEntry.FlowLiveType liveType,
- Instruction.Type instType) {
- checkPermission(STATISTIC_READ);
-
- List<TypedFlowEntryWithLoad> retTfel = new ArrayList<>();
-
- Set<FlowEntry> currentStats;
- Set<FlowEntry> previousStats;
-
- TypedStatistics typedStatistics;
- synchronized (flowStatisticStore) {
- currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);
- if (currentStats == null) {
- return retTfel;
- }
- previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);
- if (previousStats == null) {
- return retTfel;
- }
- // copy to local flow entry set
- typedStatistics = new TypedStatistics(currentStats, previousStats);
-
- // Check for validity of this stats data
- checkLoadValidity(currentStats, previousStats);
- }
-
- // current and previous set is not empty!
- boolean isAllLiveType = (liveType == null ? true : false); // null is all live type
- boolean isAllInstType = (instType == null ? true : false); // null is all inst type
-
- Map<FlowRule, TypedStoredFlowEntry> currentMap;
- Map<FlowRule, TypedStoredFlowEntry> previousMap;
-
- if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW) {
- currentMap = typedStatistics.currentImmediate();
- previousMap = typedStatistics.previousImmediate();
-
- List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
- isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());
- if (fel.size() > 0) {
- retTfel.addAll(fel);
- }
- }
-
- if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW) {
- currentMap = typedStatistics.currentShort();
- previousMap = typedStatistics.previousShort();
-
- List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
- isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());
- if (fel.size() > 0) {
- retTfel.addAll(fel);
- }
- }
-
- if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.MID_FLOW) {
- currentMap = typedStatistics.currentMid();
- previousMap = typedStatistics.previousMid();
-
- List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
- isAllInstType, instType, TypedFlowEntryWithLoad.midPollInterval());
- if (fel.size() > 0) {
- retTfel.addAll(fel);
- }
- }
-
- if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.LONG_FLOW) {
- currentMap = typedStatistics.currentLong();
- previousMap = typedStatistics.previousLong();
-
- List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
- isAllInstType, instType, TypedFlowEntryWithLoad.longPollInterval());
- if (fel.size() > 0) {
- retTfel.addAll(fel);
- }
- }
-
- if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW) {
- currentMap = typedStatistics.currentUnknown();
- previousMap = typedStatistics.previousUnknown();
-
- List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
- isAllInstType, instType, TypedFlowEntryWithLoad.avgPollInterval());
- if (fel.size() > 0) {
- retTfel.addAll(fel);
- }
- }
-
- return retTfel;
- }
-
- private List<TypedFlowEntryWithLoad> typedFlowEntryLoadByInstInternal(ConnectPoint cp,
- Map<FlowRule, TypedStoredFlowEntry> currentMap,
- Map<FlowRule, TypedStoredFlowEntry> previousMap,
- boolean isAllInstType,
- Instruction.Type instType,
- int liveTypePollInterval) {
- List<TypedFlowEntryWithLoad> fel = new ArrayList<>();
-
- for (TypedStoredFlowEntry tfe : currentMap.values()) {
- if (isAllInstType ||
- tfe.treatment().allInstructions().stream().
- filter(i -> i.type() == instType).
- findAny().isPresent()) {
- long currentBytes = tfe.bytes();
- long previousBytes = previousMap.getOrDefault(tfe, new DefaultTypedFlowEntry((FlowRule) tfe)).bytes();
- Load fLoad = new DefaultLoad(currentBytes, previousBytes, liveTypePollInterval);
- fel.add(new TypedFlowEntryWithLoad(cp, tfe, fLoad));
- }
- }
-
- return fel;
- }
-
- private List<TypedFlowEntryWithLoad> loadTopnPortInternal(ConnectPoint cp,
- TypedStoredFlowEntry.FlowLiveType liveType,
- Instruction.Type instType,
- int topn) {
- List<TypedFlowEntryWithLoad> fel = loadAllPortInternal(cp, liveType, instType);
-
- // Sort with descending order of load
- List<TypedFlowEntryWithLoad> tfel =
- fel.stream().sorted(Comparators.TYPEFLOWENTRY_WITHLOAD_COMPARATOR).
- limit(topn).collect(Collectors.toList());
-
- return tfel;
- }
-
- private long aggregateBytesSet(Set<FlowEntry> setFE) {
- return setFE.stream().mapToLong(FlowEntry::bytes).sum();
- }
-
- private long aggregateBytesMap(Map<FlowRule, TypedStoredFlowEntry> mapFE) {
- return mapFE.values().stream().mapToLong(FlowEntry::bytes).sum();
- }
-
- /**
- * Internal data class holding two set of typed flow entries.
- */
- private static class TypedStatistics {
- private final ImmutableSet<FlowEntry> currentAll;
- private final ImmutableSet<FlowEntry> previousAll;
-
- private final Map<FlowRule, TypedStoredFlowEntry> currentImmediate = new HashMap<>();
- private final Map<FlowRule, TypedStoredFlowEntry> previousImmediate = new HashMap<>();
-
- private final Map<FlowRule, TypedStoredFlowEntry> currentShort = new HashMap<>();
- private final Map<FlowRule, TypedStoredFlowEntry> previousShort = new HashMap<>();
-
- private final Map<FlowRule, TypedStoredFlowEntry> currentMid = new HashMap<>();
- private final Map<FlowRule, TypedStoredFlowEntry> previousMid = new HashMap<>();
-
- private final Map<FlowRule, TypedStoredFlowEntry> currentLong = new HashMap<>();
- private final Map<FlowRule, TypedStoredFlowEntry> previousLong = new HashMap<>();
-
- private final Map<FlowRule, TypedStoredFlowEntry> currentUnknown = new HashMap<>();
- private final Map<FlowRule, TypedStoredFlowEntry> previousUnknown = new HashMap<>();
-
- public TypedStatistics(Set<FlowEntry> current, Set<FlowEntry> previous) {
- this.currentAll = ImmutableSet.copyOf(checkNotNull(current));
- this.previousAll = ImmutableSet.copyOf(checkNotNull(previous));
-
- currentAll.forEach(fe -> {
- TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);
-
- switch (tfe.flowLiveType()) {
- case IMMEDIATE_FLOW:
- currentImmediate.put(fe, tfe);
- break;
- case SHORT_FLOW:
- currentShort.put(fe, tfe);
- break;
- case MID_FLOW:
- currentMid.put(fe, tfe);
- break;
- case LONG_FLOW:
- currentLong.put(fe, tfe);
- break;
- default:
- currentUnknown.put(fe, tfe);
- break;
- }
- });
-
- previousAll.forEach(fe -> {
- TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);
-
- switch (tfe.flowLiveType()) {
- case IMMEDIATE_FLOW:
- if (currentImmediate.containsKey(fe)) {
- previousImmediate.put(fe, tfe);
- } else if (currentShort.containsKey(fe)) {
- previousShort.put(fe, tfe);
- } else if (currentMid.containsKey(fe)) {
- previousMid.put(fe, tfe);
- } else if (currentLong.containsKey(fe)) {
- previousLong.put(fe, tfe);
- } else {
- previousUnknown.put(fe, tfe);
- }
- break;
- case SHORT_FLOW:
- if (currentShort.containsKey(fe)) {
- previousShort.put(fe, tfe);
- } else if (currentMid.containsKey(fe)) {
- previousMid.put(fe, tfe);
- } else if (currentLong.containsKey(fe)) {
- previousLong.put(fe, tfe);
- } else {
- previousUnknown.put(fe, tfe);
- }
- break;
- case MID_FLOW:
- if (currentMid.containsKey(fe)) {
- previousMid.put(fe, tfe);
- } else if (currentLong.containsKey(fe)) {
- previousLong.put(fe, tfe);
- } else {
- previousUnknown.put(fe, tfe);
- }
- break;
- case LONG_FLOW:
- if (currentLong.containsKey(fe)) {
- previousLong.put(fe, tfe);
- } else {
- previousUnknown.put(fe, tfe);
- }
- break;
- default:
- previousUnknown.put(fe, tfe);
- break;
- }
- });
- }
-
- /**
- * Returns flow entries as the current value.
- *
- * @return flow entries as the current value
- */
- public ImmutableSet<FlowEntry> current() {
- return currentAll;
- }
-
- /**
- * Returns flow entries as the previous value.
- *
- * @return flow entries as the previous value
- */
- public ImmutableSet<FlowEntry> previous() {
- return previousAll;
- }
-
- public Map<FlowRule, TypedStoredFlowEntry> currentImmediate() {
- return currentImmediate;
- }
- public Map<FlowRule, TypedStoredFlowEntry> previousImmediate() {
- return previousImmediate;
- }
- public Map<FlowRule, TypedStoredFlowEntry> currentShort() {
- return currentShort;
- }
- public Map<FlowRule, TypedStoredFlowEntry> previousShort() {
- return previousShort;
- }
- public Map<FlowRule, TypedStoredFlowEntry> currentMid() {
- return currentMid;
- }
- public Map<FlowRule, TypedStoredFlowEntry> previousMid() {
- return previousMid;
- }
- public Map<FlowRule, TypedStoredFlowEntry> currentLong() {
- return currentLong;
- }
- public Map<FlowRule, TypedStoredFlowEntry> previousLong() {
- return previousLong;
- }
- public Map<FlowRule, TypedStoredFlowEntry> currentUnknown() {
- return currentUnknown;
- }
- public Map<FlowRule, TypedStoredFlowEntry> previousUnknown() {
- return previousUnknown;
- }
-
- /**
- * Validates values are not empty.
- *
- * @return false if either of the sets is empty. Otherwise, true.
- */
- public boolean isValid() {
- return !(currentAll.isEmpty() || previousAll.isEmpty());
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(currentAll, previousAll);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (!(obj instanceof TypedStatistics)) {
- return false;
- }
- final TypedStatistics other = (TypedStatistics) obj;
- return Objects.equals(this.currentAll, other.currentAll) &&
- Objects.equals(this.previousAll, other.previousAll);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("current", currentAll)
- .add("previous", previousAll)
- .toString();
- }
- }
-
- private void checkLoadValidity(Set<FlowEntry> current, Set<FlowEntry> previous) {
- current.stream().forEach(c -> {
- FlowEntry f = previous.stream().filter(p -> c.equals(p)).
- findAny().orElse(null);
- if (f != null && c.bytes() < f.bytes()) {
- log.debug("FlowStatisticManager:checkLoadValidity():" +
- "Error: " + c + " :Previous bytes=" + f.bytes() +
- " is larger than current bytes=" + c.bytes() + " !!!");
- }
- });
-
- }
-
- /**
- * Creates a predicate that checks the instruction type of a flow entry is the same as
- * the specified instruction type.
- *
- * @param instType instruction type to be checked
- * @return predicate
- */
- private static Predicate<FlowEntry> hasInstructionType(Instruction.Type instType) {
- return new Predicate<FlowEntry>() {
- @Override
- public boolean apply(FlowEntry flowEntry) {
- List<Instruction> allInstructions = flowEntry.treatment().allInstructions();
-
- return allInstructions.stream().filter(i -> i.type() == instType).findAny().isPresent();
- }
- };
- }
-
- /**
- * Internal flow rule event listener for FlowStatisticManager.
- */
- private class InternalFlowRuleStatsListener implements FlowRuleListener {
-
- @Override
- public void event(FlowRuleEvent event) {
- FlowRule rule = event.subject();
- switch (event.type()) {
- case RULE_ADDED:
- if (rule instanceof FlowEntry) {
- flowStatisticStore.addFlowStatistic((FlowEntry) rule);
- }
- break;
- case RULE_UPDATED:
- flowStatisticStore.updateFlowStatistic((FlowEntry) rule);
- break;
- case RULE_ADD_REQUESTED:
- break;
- case RULE_REMOVE_REQUESTED:
- break;
- case RULE_REMOVED:
- flowStatisticStore.removeFlowStatistic(rule);
- break;
- default:
- log.warn("Unknown flow rule event {}", event);
- }
- }
- }
-}
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.statistic.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cli.Comparators;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTypedFlowEntry;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.TypedStoredFlowEntry;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.statistic.DefaultLoad;
+import org.onosproject.net.statistic.FlowStatisticService;
+import org.onosproject.net.statistic.Load;
+import org.onosproject.net.statistic.FlowStatisticStore;
+import org.onosproject.net.statistic.SummaryFlowEntryWithLoad;
+import org.onosproject.net.statistic.TypedFlowEntryWithLoad;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.onosproject.security.AppPermission.Type.*;
+
+/**
+ * Provides an implementation of the Flow Statistic Service.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class FlowStatisticManager implements FlowStatisticService {
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowRuleService flowRuleService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected FlowStatisticStore flowStatisticStore;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ private final InternalFlowRuleStatsListener frListener = new InternalFlowRuleStatsListener();
+
+ @Activate
+ public void activate() {
+ flowRuleService.addListener(frListener);
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ flowRuleService.removeListener(frListener);
+ log.info("Stopped");
+ }
+
+ @Override
+ public Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device) {
+ checkPermission(STATISTIC_READ);
+
+ Map<ConnectPoint, SummaryFlowEntryWithLoad> summaryLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
+
+ if (device == null) {
+ return summaryLoad;
+ }
+
+ List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
+
+ for (Port port : ports) {
+ ConnectPoint cp = new ConnectPoint(device.id(), port.number());
+ SummaryFlowEntryWithLoad sfe = loadSummaryPortInternal(cp);
+ summaryLoad.put(cp, sfe);
+ }
+
+ return summaryLoad;
+ }
+
+ @Override
+ public SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber) {
+ checkPermission(STATISTIC_READ);
+
+ ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
+ return loadSummaryPortInternal(cp);
+ }
+
+ @Override
+ public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,
+ TypedStoredFlowEntry.FlowLiveType liveType,
+ Instruction.Type instType) {
+ checkPermission(STATISTIC_READ);
+
+ Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
+
+ if (device == null) {
+ return allLoad;
+ }
+
+ List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
+
+ for (Port port : ports) {
+ ConnectPoint cp = new ConnectPoint(device.id(), port.number());
+ List<TypedFlowEntryWithLoad> tfel = loadAllPortInternal(cp, liveType, instType);
+ allLoad.put(cp, tfel);
+ }
+
+ return allLoad;
+ }
+
+ @Override
+ public List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,
+ TypedStoredFlowEntry.FlowLiveType liveType,
+ Instruction.Type instType) {
+ checkPermission(STATISTIC_READ);
+
+ ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
+ return loadAllPortInternal(cp, liveType, instType);
+ }
+
+ @Override
+ public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,
+ TypedStoredFlowEntry.FlowLiveType liveType,
+ Instruction.Type instType,
+ int topn) {
+ checkPermission(STATISTIC_READ);
+
+ Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
+
+ if (device == null) {
+ return allLoad;
+ }
+
+ List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
+
+ for (Port port : ports) {
+ ConnectPoint cp = new ConnectPoint(device.id(), port.number());
+ List<TypedFlowEntryWithLoad> tfel = loadTopnPortInternal(cp, liveType, instType, topn);
+ allLoad.put(cp, tfel);
+ }
+
+ return allLoad;
+ }
+
+ @Override
+ public List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,
+ TypedStoredFlowEntry.FlowLiveType liveType,
+ Instruction.Type instType,
+ int topn) {
+ checkPermission(STATISTIC_READ);
+
+ ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
+ return loadTopnPortInternal(cp, liveType, instType, topn);
+ }
+
+ private SummaryFlowEntryWithLoad loadSummaryPortInternal(ConnectPoint cp) {
+ checkPermission(STATISTIC_READ);
+
+ Set<FlowEntry> currentStats;
+ Set<FlowEntry> previousStats;
+
+ TypedStatistics typedStatistics;
+ synchronized (flowStatisticStore) {
+ currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);
+ if (currentStats == null) {
+ return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
+ }
+ previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);
+ if (previousStats == null) {
+ return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
+ }
+ // copy to local flow entry
+ typedStatistics = new TypedStatistics(currentStats, previousStats);
+
+ // Check for validity of this stats data
+ checkLoadValidity(currentStats, previousStats);
+ }
+
+ // current and previous set is not empty!
+ Set<FlowEntry> currentSet = typedStatistics.current();
+ Set<FlowEntry> previousSet = typedStatistics.previous();
+ Load totalLoad = new DefaultLoad(aggregateBytesSet(currentSet), aggregateBytesSet(previousSet),
+ TypedFlowEntryWithLoad.avgPollInterval());
+
+ Map<FlowRule, TypedStoredFlowEntry> currentMap;
+ Map<FlowRule, TypedStoredFlowEntry> previousMap;
+
+ currentMap = typedStatistics.currentImmediate();
+ previousMap = typedStatistics.previousImmediate();
+ Load immediateLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
+ TypedFlowEntryWithLoad.shortPollInterval());
+
+ currentMap = typedStatistics.currentShort();
+ previousMap = typedStatistics.previousShort();
+ Load shortLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
+ TypedFlowEntryWithLoad.shortPollInterval());
+
+ currentMap = typedStatistics.currentMid();
+ previousMap = typedStatistics.previousMid();
+ Load midLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
+ TypedFlowEntryWithLoad.midPollInterval());
+
+ currentMap = typedStatistics.currentLong();
+ previousMap = typedStatistics.previousLong();
+ Load longLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
+ TypedFlowEntryWithLoad.longPollInterval());
+
+ currentMap = typedStatistics.currentUnknown();
+ previousMap = typedStatistics.previousUnknown();
+ Load unknownLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
+ TypedFlowEntryWithLoad.avgPollInterval());
+
+ return new SummaryFlowEntryWithLoad(cp, totalLoad, immediateLoad, shortLoad, midLoad, longLoad, unknownLoad);
+ }
+
+ private List<TypedFlowEntryWithLoad> loadAllPortInternal(ConnectPoint cp,
+ TypedStoredFlowEntry.FlowLiveType liveType,
+ Instruction.Type instType) {
+ checkPermission(STATISTIC_READ);
+
+ List<TypedFlowEntryWithLoad> retTfel = new ArrayList<>();
+
+ Set<FlowEntry> currentStats;
+ Set<FlowEntry> previousStats;
+
+ TypedStatistics typedStatistics;
+ synchronized (flowStatisticStore) {
+ currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);
+ if (currentStats == null) {
+ return retTfel;
+ }
+ previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);
+ if (previousStats == null) {
+ return retTfel;
+ }
+ // copy to local flow entry set
+ typedStatistics = new TypedStatistics(currentStats, previousStats);
+
+ // Check for validity of this stats data
+ checkLoadValidity(currentStats, previousStats);
+ }
+
+ // current and previous set is not empty!
+ boolean isAllLiveType = (liveType == null ? true : false); // null is all live type
+ boolean isAllInstType = (instType == null ? true : false); // null is all inst type
+
+ Map<FlowRule, TypedStoredFlowEntry> currentMap;
+ Map<FlowRule, TypedStoredFlowEntry> previousMap;
+
+ if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW) {
+ currentMap = typedStatistics.currentImmediate();
+ previousMap = typedStatistics.previousImmediate();
+
+ List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
+ isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());
+ if (fel.size() > 0) {
+ retTfel.addAll(fel);
+ }
+ }
+
+ if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW) {
+ currentMap = typedStatistics.currentShort();
+ previousMap = typedStatistics.previousShort();
+
+ List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
+ isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());
+ if (fel.size() > 0) {
+ retTfel.addAll(fel);
+ }
+ }
+
+ if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.MID_FLOW) {
+ currentMap = typedStatistics.currentMid();
+ previousMap = typedStatistics.previousMid();
+
+ List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
+ isAllInstType, instType, TypedFlowEntryWithLoad.midPollInterval());
+ if (fel.size() > 0) {
+ retTfel.addAll(fel);
+ }
+ }
+
+ if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.LONG_FLOW) {
+ currentMap = typedStatistics.currentLong();
+ previousMap = typedStatistics.previousLong();
+
+ List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
+ isAllInstType, instType, TypedFlowEntryWithLoad.longPollInterval());
+ if (fel.size() > 0) {
+ retTfel.addAll(fel);
+ }
+ }
+
+ if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW) {
+ currentMap = typedStatistics.currentUnknown();
+ previousMap = typedStatistics.previousUnknown();
+
+ List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
+ isAllInstType, instType, TypedFlowEntryWithLoad.avgPollInterval());
+ if (fel.size() > 0) {
+ retTfel.addAll(fel);
+ }
+ }
+
+ return retTfel;
+ }
+
+ private List<TypedFlowEntryWithLoad> typedFlowEntryLoadByInstInternal(ConnectPoint cp,
+ Map<FlowRule, TypedStoredFlowEntry> currentMap,
+ Map<FlowRule, TypedStoredFlowEntry> previousMap,
+ boolean isAllInstType,
+ Instruction.Type instType,
+ int liveTypePollInterval) {
+ List<TypedFlowEntryWithLoad> fel = new ArrayList<>();
+
+ for (TypedStoredFlowEntry tfe : currentMap.values()) {
+ if (isAllInstType ||
+ tfe.treatment().allInstructions().stream().
+ filter(i -> i.type() == instType).
+ findAny().isPresent()) {
+ long currentBytes = tfe.bytes();
+ long previousBytes = previousMap.getOrDefault(tfe, new DefaultTypedFlowEntry((FlowRule) tfe)).bytes();
+ Load fLoad = new DefaultLoad(currentBytes, previousBytes, liveTypePollInterval);
+ fel.add(new TypedFlowEntryWithLoad(cp, tfe, fLoad));
+ }
+ }
+
+ return fel;
+ }
+
+ private List<TypedFlowEntryWithLoad> loadTopnPortInternal(ConnectPoint cp,
+ TypedStoredFlowEntry.FlowLiveType liveType,
+ Instruction.Type instType,
+ int topn) {
+ List<TypedFlowEntryWithLoad> fel = loadAllPortInternal(cp, liveType, instType);
+
+ // Sort with descending order of load
+ List<TypedFlowEntryWithLoad> tfel =
+ fel.stream().sorted(Comparators.TYPEFLOWENTRY_WITHLOAD_COMPARATOR).
+ limit(topn).collect(Collectors.toList());
+
+ return tfel;
+ }
+
+ private long aggregateBytesSet(Set<FlowEntry> setFE) {
+ return setFE.stream().mapToLong(FlowEntry::bytes).sum();
+ }
+
+ private long aggregateBytesMap(Map<FlowRule, TypedStoredFlowEntry> mapFE) {
+ return mapFE.values().stream().mapToLong(FlowEntry::bytes).sum();
+ }
+
+ /**
+ * Internal data class holding two set of typed flow entries.
+ */
+ private static class TypedStatistics {
+ private final ImmutableSet<FlowEntry> currentAll;
+ private final ImmutableSet<FlowEntry> previousAll;
+
+ private final Map<FlowRule, TypedStoredFlowEntry> currentImmediate = new HashMap<>();
+ private final Map<FlowRule, TypedStoredFlowEntry> previousImmediate = new HashMap<>();
+
+ private final Map<FlowRule, TypedStoredFlowEntry> currentShort = new HashMap<>();
+ private final Map<FlowRule, TypedStoredFlowEntry> previousShort = new HashMap<>();
+
+ private final Map<FlowRule, TypedStoredFlowEntry> currentMid = new HashMap<>();
+ private final Map<FlowRule, TypedStoredFlowEntry> previousMid = new HashMap<>();
+
+ private final Map<FlowRule, TypedStoredFlowEntry> currentLong = new HashMap<>();
+ private final Map<FlowRule, TypedStoredFlowEntry> previousLong = new HashMap<>();
+
+ private final Map<FlowRule, TypedStoredFlowEntry> currentUnknown = new HashMap<>();
+ private final Map<FlowRule, TypedStoredFlowEntry> previousUnknown = new HashMap<>();
+
+ public TypedStatistics(Set<FlowEntry> current, Set<FlowEntry> previous) {
+ this.currentAll = ImmutableSet.copyOf(checkNotNull(current));
+ this.previousAll = ImmutableSet.copyOf(checkNotNull(previous));
+
+ currentAll.forEach(fe -> {
+ TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);
+
+ switch (tfe.flowLiveType()) {
+ case IMMEDIATE_FLOW:
+ currentImmediate.put(fe, tfe);
+ break;
+ case SHORT_FLOW:
+ currentShort.put(fe, tfe);
+ break;
+ case MID_FLOW:
+ currentMid.put(fe, tfe);
+ break;
+ case LONG_FLOW:
+ currentLong.put(fe, tfe);
+ break;
+ default:
+ currentUnknown.put(fe, tfe);
+ break;
+ }
+ });
+
+ previousAll.forEach(fe -> {
+ TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);
+
+ switch (tfe.flowLiveType()) {
+ case IMMEDIATE_FLOW:
+ if (currentImmediate.containsKey(fe)) {
+ previousImmediate.put(fe, tfe);
+ } else if (currentShort.containsKey(fe)) {
+ previousShort.put(fe, tfe);
+ } else if (currentMid.containsKey(fe)) {
+ previousMid.put(fe, tfe);
+ } else if (currentLong.containsKey(fe)) {
+ previousLong.put(fe, tfe);
+ } else {
+ previousUnknown.put(fe, tfe);
+ }
+ break;
+ case SHORT_FLOW:
+ if (currentShort.containsKey(fe)) {
+ previousShort.put(fe, tfe);
+ } else if (currentMid.containsKey(fe)) {
+ previousMid.put(fe, tfe);
+ } else if (currentLong.containsKey(fe)) {
+ previousLong.put(fe, tfe);
+ } else {
+ previousUnknown.put(fe, tfe);
+ }
+ break;
+ case MID_FLOW:
+ if (currentMid.containsKey(fe)) {
+ previousMid.put(fe, tfe);
+ } else if (currentLong.containsKey(fe)) {
+ previousLong.put(fe, tfe);
+ } else {
+ previousUnknown.put(fe, tfe);
+ }
+ break;
+ case LONG_FLOW:
+ if (currentLong.containsKey(fe)) {
+ previousLong.put(fe, tfe);
+ } else {
+ previousUnknown.put(fe, tfe);
+ }
+ break;
+ default:
+ previousUnknown.put(fe, tfe);
+ break;
+ }
+ });
+ }
+
+ /**
+ * Returns flow entries as the current value.
+ *
+ * @return flow entries as the current value
+ */
+ public ImmutableSet<FlowEntry> current() {
+ return currentAll;
+ }
+
+ /**
+ * Returns flow entries as the previous value.
+ *
+ * @return flow entries as the previous value
+ */
+ public ImmutableSet<FlowEntry> previous() {
+ return previousAll;
+ }
+
+ public Map<FlowRule, TypedStoredFlowEntry> currentImmediate() {
+ return currentImmediate;
+ }
+ public Map<FlowRule, TypedStoredFlowEntry> previousImmediate() {
+ return previousImmediate;
+ }
+ public Map<FlowRule, TypedStoredFlowEntry> currentShort() {
+ return currentShort;
+ }
+ public Map<FlowRule, TypedStoredFlowEntry> previousShort() {
+ return previousShort;
+ }
+ public Map<FlowRule, TypedStoredFlowEntry> currentMid() {
+ return currentMid;
+ }
+ public Map<FlowRule, TypedStoredFlowEntry> previousMid() {
+ return previousMid;
+ }
+ public Map<FlowRule, TypedStoredFlowEntry> currentLong() {
+ return currentLong;
+ }
+ public Map<FlowRule, TypedStoredFlowEntry> previousLong() {
+ return previousLong;
+ }
+ public Map<FlowRule, TypedStoredFlowEntry> currentUnknown() {
+ return currentUnknown;
+ }
+ public Map<FlowRule, TypedStoredFlowEntry> previousUnknown() {
+ return previousUnknown;
+ }
+
+ /**
+ * Validates values are not empty.
+ *
+ * @return false if either of the sets is empty. Otherwise, true.
+ */
+ public boolean isValid() {
+ return !(currentAll.isEmpty() || previousAll.isEmpty());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(currentAll, previousAll);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof TypedStatistics)) {
+ return false;
+ }
+ final TypedStatistics other = (TypedStatistics) obj;
+ return Objects.equals(this.currentAll, other.currentAll) &&
+ Objects.equals(this.previousAll, other.previousAll);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("current", currentAll)
+ .add("previous", previousAll)
+ .toString();
+ }
+ }
+
+ private void checkLoadValidity(Set<FlowEntry> current, Set<FlowEntry> previous) {
+ current.stream().forEach(c -> {
+ FlowEntry f = previous.stream().filter(p -> c.equals(p)).
+ findAny().orElse(null);
+ if (f != null && c.bytes() < f.bytes()) {
+ log.debug("FlowStatisticManager:checkLoadValidity():" +
+ "Error: " + c + " :Previous bytes=" + f.bytes() +
+ " is larger than current bytes=" + c.bytes() + " !!!");
+ }
+ });
+
+ }
+
+ /**
+ * Creates a predicate that checks the instruction type of a flow entry is the same as
+ * the specified instruction type.
+ *
+ * @param instType instruction type to be checked
+ * @return predicate
+ */
+ private static Predicate<FlowEntry> hasInstructionType(Instruction.Type instType) {
+ return new Predicate<FlowEntry>() {
+ @Override
+ public boolean apply(FlowEntry flowEntry) {
+ List<Instruction> allInstructions = flowEntry.treatment().allInstructions();
+
+ return allInstructions.stream().filter(i -> i.type() == instType).findAny().isPresent();
+ }
+ };
+ }
+
+ /**
+ * Internal flow rule event listener for FlowStatisticManager.
+ */
+ private class InternalFlowRuleStatsListener implements FlowRuleListener {
+
+ @Override
+ public void event(FlowRuleEvent event) {
+ FlowRule rule = event.subject();
+ switch (event.type()) {
+ case RULE_ADDED:
+ if (rule instanceof FlowEntry) {
+ flowStatisticStore.addFlowStatistic((FlowEntry) rule);
+ }
+ break;
+ case RULE_UPDATED:
+ flowStatisticStore.updateFlowStatistic((FlowEntry) rule);
+ break;
+ case RULE_ADD_REQUESTED:
+ break;
+ case RULE_REMOVE_REQUESTED:
+ break;
+ case RULE_REMOVED:
+ flowStatisticStore.removeFlowStatistic(rule);
+ break;
+ default:
+ log.warn("Unknown flow rule event {}", event);
+ }
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
index 0cd4a83..cc14eba 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/statistic/impl/DistributedFlowStatisticStore.java
@@ -1,289 +1,289 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.statistic.impl;
-
-import com.google.common.base.Objects;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.mastership.MastershipService;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.flow.FlowEntry;
-import org.onosproject.net.flow.FlowRule;
-import org.onosproject.net.flow.instructions.Instruction;
-import org.onosproject.net.flow.instructions.Instructions;
-import org.onosproject.net.statistic.FlowStatisticStore;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.slf4j.Logger;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
-import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Maintains flow statistics using RPC calls to collect stats from remote instances
- * on demand.
- */
-@Component(immediate = true)
-@Service
-public class DistributedFlowStatisticStore implements FlowStatisticStore {
- private final Logger log = getLogger(getClass());
-
- // TODO: Make configurable.
- private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MastershipService mastershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterCommunicationService clusterCommunicator;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClusterService clusterService;
-
- private Map<ConnectPoint, Set<FlowEntry>> previous =
- new ConcurrentHashMap<>();
-
- private Map<ConnectPoint, Set<FlowEntry>> current =
- new ConcurrentHashMap<>();
-
- protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
- @Override
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
- // register this store specific classes here
- .build();
- }
- };
-
- private NodeId local;
- private ExecutorService messageHandlingExecutor;
-
- private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
-
- @Activate
- public void activate() {
- local = clusterService.getLocalNode().id();
-
- messageHandlingExecutor = Executors.newFixedThreadPool(
- MESSAGE_HANDLER_THREAD_POOL_SIZE,
- groupedThreads("onos/store/statistic", "message-handlers"));
-
- clusterCommunicator.addSubscriber(
- GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
- messageHandlingExecutor);
-
- clusterCommunicator.addSubscriber(
- GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
- messageHandlingExecutor);
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- clusterCommunicator.removeSubscriber(GET_PREVIOUS);
- clusterCommunicator.removeSubscriber(GET_CURRENT);
- messageHandlingExecutor.shutdown();
- log.info("Stopped");
- }
-
- @Override
- public synchronized void removeFlowStatistic(FlowRule rule) {
- ConnectPoint cp = buildConnectPoint(rule);
- if (cp == null) {
- return;
- }
-
- // remove this rule if present from current map
- current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
-
- // remove this on if present from previous map
- previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
- }
-
- @Override
- public synchronized void addFlowStatistic(FlowEntry rule) {
- ConnectPoint cp = buildConnectPoint(rule);
- if (cp == null) {
- return;
- }
-
- // create one if absent and add this rule
- current.putIfAbsent(cp, new HashSet<>());
- current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
-
- // remove previous one if present
- previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
- }
-
- public synchronized void updateFlowStatistic(FlowEntry rule) {
- ConnectPoint cp = buildConnectPoint(rule);
- if (cp == null) {
- return;
- }
-
- Set<FlowEntry> curr = current.get(cp);
- if (curr == null) {
- addFlowStatistic(rule);
- } else {
- Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
- findAny();
- if (f.isPresent() && rule.bytes() < f.get().bytes()) {
- log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
- " Invalid Flow Update! Will be removed!!" +
- " curr flowId=" + Long.toHexString(rule.id().value()) +
- ", prev flowId=" + Long.toHexString(f.get().id().value()) +
- ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
- ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
- ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
- // something is wrong! invalid flow entry, so delete it
- removeFlowStatistic(rule);
- return;
- }
- Set<FlowEntry> prev = previous.get(cp);
- if (prev == null) {
- prev = new HashSet<>();
- previous.put(cp, prev);
- }
-
- // previous one is exist
- if (f.isPresent()) {
- // remove old one and add new one
- prev.remove(rule);
- if (!prev.add(f.get())) {
- log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
- " flowId={}, add failed into previous.",
- Long.toHexString(rule.id().value()));
- }
- }
-
- // remove old one and add new one
- curr.remove(rule);
- if (!curr.add(rule)) {
- log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
- " flowId={}, add failed into current.",
- Long.toHexString(rule.id().value()));
- }
- }
- }
-
- @Override
- public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
- final DeviceId deviceId = connectPoint.deviceId();
-
- NodeId master = mastershipService.getMasterFor(deviceId);
- if (master == null) {
- log.warn("No master for {}", deviceId);
- return Collections.emptySet();
- }
-
- if (Objects.equal(local, master)) {
- return getCurrentStatisticInternal(connectPoint);
- } else {
- return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
- connectPoint,
- GET_CURRENT,
- SERIALIZER::encode,
- SERIALIZER::decode,
- master),
- STATISTIC_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- Collections.emptySet());
- }
- }
-
- private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
- return current.get(connectPoint);
- }
-
- @Override
- public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
- final DeviceId deviceId = connectPoint.deviceId();
-
- NodeId master = mastershipService.getMasterFor(deviceId);
- if (master == null) {
- log.warn("No master for {}", deviceId);
- return Collections.emptySet();
- }
-
- if (Objects.equal(local, master)) {
- return getPreviousStatisticInternal(connectPoint);
- } else {
- return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
- connectPoint,
- GET_PREVIOUS,
- SERIALIZER::encode,
- SERIALIZER::decode,
- master),
- STATISTIC_STORE_TIMEOUT_MILLIS,
- TimeUnit.MILLISECONDS,
- Collections.emptySet());
- }
- }
-
- private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
- return previous.get(connectPoint);
- }
-
- private ConnectPoint buildConnectPoint(FlowRule rule) {
- PortNumber port = getOutput(rule);
-
- if (port == null) {
- return null;
- }
- ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
- return cp;
- }
-
- private PortNumber getOutput(FlowRule rule) {
- for (Instruction i : rule.treatment().allInstructions()) {
- if (i.type() == Instruction.Type.OUTPUT) {
- Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
- return out.port();
- }
- if (i.type() == Instruction.Type.DROP) {
- return PortNumber.P0;
- }
- }
- return null;
- }
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.statistic.impl;
+
+import com.google.common.base.Objects;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.statistic.FlowStatisticStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_CURRENT;
+import static org.onosproject.store.statistic.impl.StatisticStoreMessageSubjects.GET_PREVIOUS;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Maintains flow statistics using RPC calls to collect stats from remote instances
+ * on demand.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedFlowStatisticStore implements FlowStatisticStore {
+ private final Logger log = getLogger(getClass());
+
+ // TODO: Make configurable.
+ private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 4;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ private Map<ConnectPoint, Set<FlowEntry>> previous =
+ new ConcurrentHashMap<>();
+
+ private Map<ConnectPoint, Set<FlowEntry>> current =
+ new ConcurrentHashMap<>();
+
+ protected static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
+ // register this store specific classes here
+ .build();
+ }
+ };
+
+ private NodeId local;
+ private ExecutorService messageHandlingExecutor;
+
+ private static final long STATISTIC_STORE_TIMEOUT_MILLIS = 3000;
+
+ @Activate
+ public void activate() {
+ local = clusterService.getLocalNode().id();
+
+ messageHandlingExecutor = Executors.newFixedThreadPool(
+ MESSAGE_HANDLER_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/statistic", "message-handlers"));
+
+ clusterCommunicator.addSubscriber(
+ GET_CURRENT, SERIALIZER::decode, this::getCurrentStatisticInternal, SERIALIZER::encode,
+ messageHandlingExecutor);
+
+ clusterCommunicator.addSubscriber(
+ GET_CURRENT, SERIALIZER::decode, this::getPreviousStatisticInternal, SERIALIZER::encode,
+ messageHandlingExecutor);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ clusterCommunicator.removeSubscriber(GET_PREVIOUS);
+ clusterCommunicator.removeSubscriber(GET_CURRENT);
+ messageHandlingExecutor.shutdown();
+ log.info("Stopped");
+ }
+
+ @Override
+ public synchronized void removeFlowStatistic(FlowRule rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+
+ // remove this rule if present from current map
+ current.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+
+ // remove this on if present from previous map
+ previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+ }
+
+ @Override
+ public synchronized void addFlowStatistic(FlowEntry rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+
+ // create one if absent and add this rule
+ current.putIfAbsent(cp, new HashSet<>());
+ current.computeIfPresent(cp, (c, e) -> { e.add(rule); return e; });
+
+ // remove previous one if present
+ previous.computeIfPresent(cp, (c, e) -> { e.remove(rule); return e; });
+ }
+
+ public synchronized void updateFlowStatistic(FlowEntry rule) {
+ ConnectPoint cp = buildConnectPoint(rule);
+ if (cp == null) {
+ return;
+ }
+
+ Set<FlowEntry> curr = current.get(cp);
+ if (curr == null) {
+ addFlowStatistic(rule);
+ } else {
+ Optional<FlowEntry> f = curr.stream().filter(c -> rule.equals(c)).
+ findAny();
+ if (f.isPresent() && rule.bytes() < f.get().bytes()) {
+ log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
+ " Invalid Flow Update! Will be removed!!" +
+ " curr flowId=" + Long.toHexString(rule.id().value()) +
+ ", prev flowId=" + Long.toHexString(f.get().id().value()) +
+ ", curr bytes=" + rule.bytes() + ", prev bytes=" + f.get().bytes() +
+ ", curr life=" + rule.life() + ", prev life=" + f.get().life() +
+ ", curr lastSeen=" + rule.lastSeen() + ", prev lastSeen=" + f.get().lastSeen());
+ // something is wrong! invalid flow entry, so delete it
+ removeFlowStatistic(rule);
+ return;
+ }
+ Set<FlowEntry> prev = previous.get(cp);
+ if (prev == null) {
+ prev = new HashSet<>();
+ previous.put(cp, prev);
+ }
+
+ // previous one is exist
+ if (f.isPresent()) {
+ // remove old one and add new one
+ prev.remove(rule);
+ if (!prev.add(f.get())) {
+ log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
+ " flowId={}, add failed into previous.",
+ Long.toHexString(rule.id().value()));
+ }
+ }
+
+ // remove old one and add new one
+ curr.remove(rule);
+ if (!curr.add(rule)) {
+ log.debug("DistributedFlowStatisticStore:updateFlowStatistic():" +
+ " flowId={}, add failed into current.",
+ Long.toHexString(rule.id().value()));
+ }
+ }
+ }
+
+ @Override
+ public Set<FlowEntry> getCurrentFlowStatistic(ConnectPoint connectPoint) {
+ final DeviceId deviceId = connectPoint.deviceId();
+
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (master == null) {
+ log.warn("No master for {}", deviceId);
+ return Collections.emptySet();
+ }
+
+ if (Objects.equal(local, master)) {
+ return getCurrentStatisticInternal(connectPoint);
+ } else {
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ connectPoint,
+ GET_CURRENT,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ master),
+ STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptySet());
+ }
+ }
+
+ private synchronized Set<FlowEntry> getCurrentStatisticInternal(ConnectPoint connectPoint) {
+ return current.get(connectPoint);
+ }
+
+ @Override
+ public Set<FlowEntry> getPreviousFlowStatistic(ConnectPoint connectPoint) {
+ final DeviceId deviceId = connectPoint.deviceId();
+
+ NodeId master = mastershipService.getMasterFor(deviceId);
+ if (master == null) {
+ log.warn("No master for {}", deviceId);
+ return Collections.emptySet();
+ }
+
+ if (Objects.equal(local, master)) {
+ return getPreviousStatisticInternal(connectPoint);
+ } else {
+ return Tools.futureGetOrElse(clusterCommunicator.sendAndReceive(
+ connectPoint,
+ GET_PREVIOUS,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ master),
+ STATISTIC_STORE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS,
+ Collections.emptySet());
+ }
+ }
+
+ private synchronized Set<FlowEntry> getPreviousStatisticInternal(ConnectPoint connectPoint) {
+ return previous.get(connectPoint);
+ }
+
+ private ConnectPoint buildConnectPoint(FlowRule rule) {
+ PortNumber port = getOutput(rule);
+
+ if (port == null) {
+ return null;
+ }
+ ConnectPoint cp = new ConnectPoint(rule.deviceId(), port);
+ return cp;
+ }
+
+ private PortNumber getOutput(FlowRule rule) {
+ for (Instruction i : rule.treatment().allInstructions()) {
+ if (i.type() == Instruction.Type.OUTPUT) {
+ Instructions.OutputInstruction out = (Instructions.OutputInstruction) i;
+ return out.port();
+ }
+ if (i.type() == Instruction.Type.DROP) {
+ return PortNumber.P0;
+ }
+ }
+ return null;
+ }
}
\ No newline at end of file