[Emu] openTAM: FlowStatisticManager, DistributedFlowStatisticStore, get-flow-stats CLI Implementation and NewAdaptiveFlowStatsCollector update and typo

 - GetFlowStatistics.java
   .Fixed function name typo: immediateLoad()
 - SummaryFlowEntryWithLoad.java
   .Added javadoc
 - TypedFlowEntryWithLoad.java
   .Added javadoc,
   .and replace checknotnull and throw NullPointerException in typedPollInterval() at line 104

Change-Id: I23d2eaf234d0affeb5f927275148d9165c66c774
diff --git a/core/net/pom.xml b/core/net/pom.xml
index 9ea0007..c5d3126 100644
--- a/core/net/pom.xml
+++ b/core/net/pom.xml
@@ -52,6 +52,20 @@
 
         <dependency>
             <groupId>org.onosproject</groupId>
+            <version>${project.version}</version>
+            <artifactId>onos-cli</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-cli</artifactId>
+            <version>${project.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
             <artifactId>onos-core-common</artifactId>
             <version>${project.version}</version>
             <classifier>tests</classifier>
diff --git a/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java b/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java
new file mode 100644
index 0000000..6515ef3
--- /dev/null
+++ b/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java
@@ -0,0 +1,634 @@
+/*

+ * Copyright 2015 Open Networking Laboratory

+ *

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+package org.onosproject.net.statistic.impl;

+

+import com.google.common.base.MoreObjects;

+import com.google.common.base.Predicate;

+import com.google.common.collect.ImmutableSet;

+import org.apache.felix.scr.annotations.Activate;

+import org.apache.felix.scr.annotations.Component;

+import org.apache.felix.scr.annotations.Deactivate;

+import org.apache.felix.scr.annotations.Reference;

+import org.apache.felix.scr.annotations.ReferenceCardinality;

+import org.apache.felix.scr.annotations.Service;

+import org.onosproject.cli.Comparators;

+import org.onosproject.net.ConnectPoint;

+import org.onosproject.net.Device;

+import org.onosproject.net.Port;

+import org.onosproject.net.PortNumber;

+import org.onosproject.net.device.DeviceService;

+import org.onosproject.net.flow.DefaultTypedFlowEntry;

+import org.onosproject.net.flow.FlowEntry;

+import org.onosproject.net.flow.FlowRule;

+import org.onosproject.net.flow.FlowRuleEvent;

+import org.onosproject.net.flow.FlowRuleListener;

+import org.onosproject.net.flow.FlowRuleService;

+import org.onosproject.net.flow.TypedStoredFlowEntry;

+import org.onosproject.net.flow.instructions.Instruction;

+import org.onosproject.net.statistic.DefaultLoad;

+import org.onosproject.net.statistic.FlowStatisticService;

+import org.onosproject.net.statistic.Load;

+import org.onosproject.net.statistic.FlowStatisticStore;

+import org.onosproject.net.statistic.SummaryFlowEntryWithLoad;

+import org.onosproject.net.statistic.TypedFlowEntryWithLoad;

+

+import org.slf4j.Logger;

+

+import java.util.ArrayList;

+import java.util.HashMap;

+import java.util.List;

+import java.util.Map;

+import java.util.Objects;

+import java.util.Set;

+import java.util.TreeMap;

+import java.util.stream.Collectors;

+

+import static com.google.common.base.Preconditions.checkNotNull;

+import static org.onosproject.security.AppGuard.checkPermission;

+import static org.slf4j.LoggerFactory.getLogger;

+import static org.onosproject.security.AppPermission.Type.*;

+

+/**

+ * Provides an implementation of the Flow Statistic Service.

+ */

+@Component(immediate = true, enabled = true)

+@Service

+public class FlowStatisticManager implements FlowStatisticService {

+    private final Logger log = getLogger(getClass());

+

+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

+    protected FlowRuleService flowRuleService;

+

+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

+    protected FlowStatisticStore flowStatisticStore;

+

+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

+    protected DeviceService deviceService;

+

+    private final InternalFlowRuleStatsListener frListener = new InternalFlowRuleStatsListener();

+

+    @Activate

+    public void activate() {

+        flowRuleService.addListener(frListener);

+        log.info("Started");

+    }

+

+    @Deactivate

+    public void deactivate() {

+        flowRuleService.removeListener(frListener);

+        log.info("Stopped");

+    }

+

+    @Override

+    public Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device) {

+        checkPermission(STATISTIC_READ);

+

+        Map<ConnectPoint, SummaryFlowEntryWithLoad> summaryLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

+

+        if (device == null) {

+            return summaryLoad;

+        }

+

+        List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));

+

+        for (Port port : ports) {

+            ConnectPoint cp = new ConnectPoint(device.id(), port.number());

+            SummaryFlowEntryWithLoad sfe = loadSummaryPortInternal(cp);

+            summaryLoad.put(cp, sfe);

+        }

+

+        return summaryLoad;

+    }

+

+    @Override

+    public SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber) {

+        checkPermission(STATISTIC_READ);

+

+        ConnectPoint cp = new ConnectPoint(device.id(), pNumber);

+        return loadSummaryPortInternal(cp);

+    }

+

+    @Override

+    public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,

+                                                                  TypedStoredFlowEntry.FlowLiveType liveType,

+                                                                  Instruction.Type instType) {

+        checkPermission(STATISTIC_READ);

+

+        Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

+

+        if (device == null) {

+            return allLoad;

+        }

+

+        List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));

+

+        for (Port port : ports) {

+            ConnectPoint cp = new ConnectPoint(device.id(), port.number());

+            List<TypedFlowEntryWithLoad> tfel = loadAllPortInternal(cp, liveType, instType);

+            allLoad.put(cp, tfel);

+        }

+

+        return allLoad;

+    }

+

+    @Override

+    public List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,

+                                               TypedStoredFlowEntry.FlowLiveType liveType,

+                                               Instruction.Type instType) {

+        checkPermission(STATISTIC_READ);

+

+        ConnectPoint cp = new ConnectPoint(device.id(), pNumber);

+        return loadAllPortInternal(cp, liveType, instType);

+    }

+

+    @Override

+    public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,

+                                                                   TypedStoredFlowEntry.FlowLiveType liveType,

+                                                                   Instruction.Type instType,

+                                                                   int topn) {

+        checkPermission(STATISTIC_READ);

+

+        Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

+

+        if (device == null) {

+            return allLoad;

+        }

+

+        List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));

+

+        for (Port port : ports) {

+            ConnectPoint cp = new ConnectPoint(device.id(), port.number());

+            List<TypedFlowEntryWithLoad> tfel = loadTopnPortInternal(cp, liveType, instType, topn);

+            allLoad.put(cp, tfel);

+        }

+

+        return allLoad;

+    }

+

+    @Override

+    public List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,

+                                                TypedStoredFlowEntry.FlowLiveType liveType,

+                                                Instruction.Type instType,

+                                                int topn) {

+        checkPermission(STATISTIC_READ);

+

+        ConnectPoint cp = new ConnectPoint(device.id(), pNumber);

+        return loadTopnPortInternal(cp, liveType, instType, topn);

+    }

+

+    private SummaryFlowEntryWithLoad loadSummaryPortInternal(ConnectPoint cp) {

+        checkPermission(STATISTIC_READ);

+

+        Set<FlowEntry> currentStats;

+        Set<FlowEntry> previousStats;

+

+        TypedStatistics typedStatistics;

+        synchronized (flowStatisticStore) {

+             currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);

+            if (currentStats == null) {

+                return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());

+            }

+            previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);

+            if (previousStats == null) {

+                return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());

+            }

+            // copy to local flow entry

+            typedStatistics = new TypedStatistics(currentStats, previousStats);

+

+            // Check for validity of this stats data

+            checkLoadValidity(currentStats, previousStats);

+        }

+

+        // current and previous set is not empty!

+        Set<FlowEntry> currentSet = typedStatistics.current();

+        Set<FlowEntry> previousSet = typedStatistics.previous();

+        Load totalLoad = new DefaultLoad(aggregateBytesSet(currentSet), aggregateBytesSet(previousSet),

+                TypedFlowEntryWithLoad.avgPollInterval());

+

+        Map<FlowRule, TypedStoredFlowEntry> currentMap;

+        Map<FlowRule, TypedStoredFlowEntry> previousMap;

+

+        currentMap = typedStatistics.currentImmediate();

+        previousMap = typedStatistics.previousImmediate();

+        Load immediateLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

+                TypedFlowEntryWithLoad.shortPollInterval());

+

+        currentMap = typedStatistics.currentShort();

+        previousMap = typedStatistics.previousShort();

+        Load shortLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

+                TypedFlowEntryWithLoad.shortPollInterval());

+

+        currentMap = typedStatistics.currentMid();

+        previousMap = typedStatistics.previousMid();

+        Load midLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

+                TypedFlowEntryWithLoad.midPollInterval());

+

+        currentMap = typedStatistics.currentLong();

+        previousMap = typedStatistics.previousLong();

+        Load longLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

+                TypedFlowEntryWithLoad.longPollInterval());

+

+        currentMap = typedStatistics.currentUnknown();

+        previousMap = typedStatistics.previousUnknown();

+        Load unknownLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

+                TypedFlowEntryWithLoad.avgPollInterval());

+

+        return new SummaryFlowEntryWithLoad(cp, totalLoad, immediateLoad, shortLoad, midLoad, longLoad, unknownLoad);

+    }

+

+    private List<TypedFlowEntryWithLoad> loadAllPortInternal(ConnectPoint cp,

+                                                             TypedStoredFlowEntry.FlowLiveType liveType,

+                                                             Instruction.Type instType) {

+        checkPermission(STATISTIC_READ);

+

+        List<TypedFlowEntryWithLoad> retTFEL = new ArrayList<>();

+

+        Set<FlowEntry> currentStats;

+        Set<FlowEntry> previousStats;

+

+        TypedStatistics typedStatistics;

+        synchronized (flowStatisticStore) {

+            currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);

+            if (currentStats == null) {

+                return retTFEL;

+            }

+            previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);

+            if (previousStats == null) {

+                return retTFEL;

+            }

+            // copy to local flow entry set

+            typedStatistics = new TypedStatistics(currentStats, previousStats);

+

+            // Check for validity of this stats data

+            checkLoadValidity(currentStats, previousStats);

+        }

+

+        // current and previous set is not empty!

+        boolean isAllLiveType = (liveType == null ? true : false); // null is all live type

+        boolean isAllInstType = (instType == null ? true : false); // null is all inst type

+

+        Map<FlowRule, TypedStoredFlowEntry> currentMap;

+        Map<FlowRule, TypedStoredFlowEntry> previousMap;

+

+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW) {

+            currentMap = typedStatistics.currentImmediate();

+            previousMap = typedStatistics.previousImmediate();

+

+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

+                    isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());

+            if (fel.size() > 0) {

+                retTFEL.addAll(fel);

+            }

+        }

+

+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW) {

+            currentMap = typedStatistics.currentShort();

+            previousMap = typedStatistics.previousShort();

+

+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

+                    isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());

+            if (fel.size() > 0) {

+                retTFEL.addAll(fel);

+            }

+        }

+

+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.MID_FLOW) {

+            currentMap = typedStatistics.currentMid();

+            previousMap = typedStatistics.previousMid();

+

+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

+                    isAllInstType, instType, TypedFlowEntryWithLoad.midPollInterval());

+            if (fel.size() > 0) {

+                retTFEL.addAll(fel);

+            }

+        }

+

+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.LONG_FLOW) {

+            currentMap = typedStatistics.currentLong();

+            previousMap = typedStatistics.previousLong();

+

+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

+                    isAllInstType, instType, TypedFlowEntryWithLoad.longPollInterval());

+            if (fel.size() > 0) {

+                retTFEL.addAll(fel);

+            }

+        }

+

+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW) {

+            currentMap = typedStatistics.currentUnknown();

+            previousMap = typedStatistics.previousUnknown();

+

+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

+                    isAllInstType, instType, TypedFlowEntryWithLoad.avgPollInterval());

+            if (fel.size() > 0) {

+                retTFEL.addAll(fel);

+            }

+        }

+

+        return retTFEL;

+    }

+

+    private List<TypedFlowEntryWithLoad> typedFlowEntryLoadByInstInternal(ConnectPoint cp,

+                                                                      Map<FlowRule, TypedStoredFlowEntry> currentMap,

+                                                                      Map<FlowRule, TypedStoredFlowEntry> previousMap,

+                                                                      boolean isAllInstType,

+                                                                      Instruction.Type instType,

+                                                                      int liveTypePollInterval) {

+        List<TypedFlowEntryWithLoad> fel = new ArrayList<>();

+

+        for (TypedStoredFlowEntry tfe : currentMap.values()) {

+            if (isAllInstType ||

+                    tfe.treatment().allInstructions().stream().

+                            filter(i -> i.type() == instType).

+                            findAny().isPresent()) {

+                long currentBytes = tfe.bytes();

+                long previousBytes = previousMap.getOrDefault(tfe, new DefaultTypedFlowEntry((FlowRule) tfe)).bytes();

+                Load fLoad = new DefaultLoad(currentBytes, previousBytes, liveTypePollInterval);

+                fel.add(new TypedFlowEntryWithLoad(cp, tfe, fLoad));

+            }

+        }

+

+        return fel;

+    }

+

+    private List<TypedFlowEntryWithLoad> loadTopnPortInternal(ConnectPoint cp,

+                                                             TypedStoredFlowEntry.FlowLiveType liveType,

+                                                             Instruction.Type instType,

+                                                             int topn) {

+        List<TypedFlowEntryWithLoad> fel = loadAllPortInternal(cp, liveType, instType);

+

+        // Sort with descending order of load

+        List<TypedFlowEntryWithLoad> tfel =

+                fel.stream().sorted(Comparators.TYPEFLOWENTRY_WITHLOAD_COMPARATOR).

+                        limit(topn).collect(Collectors.toList());

+

+        return tfel;

+    }

+

+    private long aggregateBytesSet(Set<FlowEntry> setFE) {

+        return setFE.stream().mapToLong(FlowEntry::bytes).sum();

+    }

+

+    private long aggregateBytesMap(Map<FlowRule, TypedStoredFlowEntry> mapFE) {

+        return mapFE.values().stream().mapToLong(FlowEntry::bytes).sum();

+    }

+

+    /**

+     * Internal data class holding two set of typed flow entries.

+     */

+    private static class TypedStatistics {

+        private final ImmutableSet<FlowEntry> currentAll;

+        private final ImmutableSet<FlowEntry> previousAll;

+

+        private final Map<FlowRule, TypedStoredFlowEntry> currentImmediate = new HashMap<>();

+        private final Map<FlowRule, TypedStoredFlowEntry> previousImmediate = new HashMap<>();

+

+        private final Map<FlowRule, TypedStoredFlowEntry> currentShort = new HashMap<>();

+        private final Map<FlowRule, TypedStoredFlowEntry> previousShort = new HashMap<>();

+

+        private final Map<FlowRule, TypedStoredFlowEntry> currentMid = new HashMap<>();

+        private final Map<FlowRule, TypedStoredFlowEntry> previousMid = new HashMap<>();

+

+        private final Map<FlowRule, TypedStoredFlowEntry> currentLong = new HashMap<>();

+        private final Map<FlowRule, TypedStoredFlowEntry> previousLong = new HashMap<>();

+

+        private final Map<FlowRule, TypedStoredFlowEntry> currentUnknown = new HashMap<>();

+        private final Map<FlowRule, TypedStoredFlowEntry> previousUnknown = new HashMap<>();

+

+        public TypedStatistics(Set<FlowEntry> current, Set<FlowEntry> previous) {

+            this.currentAll = ImmutableSet.copyOf(checkNotNull(current));

+            this.previousAll = ImmutableSet.copyOf(checkNotNull(previous));

+

+            currentAll.forEach(fe -> {

+                TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);

+

+                switch (tfe.flowLiveType()) {

+                    case IMMEDIATE_FLOW:

+                        currentImmediate.put(fe, tfe);

+                        break;

+                    case SHORT_FLOW:

+                        currentShort.put(fe, tfe);

+                        break;

+                    case MID_FLOW:

+                        currentMid.put(fe, tfe);

+                        break;

+                    case LONG_FLOW:

+                        currentLong.put(fe, tfe);

+                        break;

+                    default:

+                        currentUnknown.put(fe, tfe);

+                        break;

+                }

+            });

+

+            previousAll.forEach(fe -> {

+                TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);

+

+                switch (tfe.flowLiveType()) {

+                    case IMMEDIATE_FLOW:

+                        if (currentImmediate.containsKey(fe)) {

+                            previousImmediate.put(fe, tfe);

+                        } else if (currentShort.containsKey(fe)) {

+                            previousShort.put(fe, tfe);

+                        } else if (currentMid.containsKey(fe)) {

+                            previousMid.put(fe, tfe);

+                        } else if (currentLong.containsKey(fe)) {

+                            previousLong.put(fe, tfe);

+                        } else {

+                            previousUnknown.put(fe, tfe);

+                        }

+                        break;

+                    case SHORT_FLOW:

+                        if (currentShort.containsKey(fe)) {

+                            previousShort.put(fe, tfe);

+                        } else if (currentMid.containsKey(fe)) {

+                            previousMid.put(fe, tfe);

+                        } else if (currentLong.containsKey(fe)) {

+                            previousLong.put(fe, tfe);

+                        } else {

+                            previousUnknown.put(fe, tfe);

+                        }

+                        break;

+                    case MID_FLOW:

+                        if (currentMid.containsKey(fe)) {

+                            previousMid.put(fe, tfe);

+                        } else if (currentLong.containsKey(fe)) {

+                            previousLong.put(fe, tfe);

+                        } else {

+                            previousUnknown.put(fe, tfe);

+                        }

+                        break;

+                    case LONG_FLOW:

+                        if (currentLong.containsKey(fe)) {

+                            previousLong.put(fe, tfe);

+                        } else {

+                            previousUnknown.put(fe, tfe);

+                        }

+                        break;

+                    default:

+                        previousUnknown.put(fe, tfe);

+                        break;

+                }

+            });

+        }

+

+        /**

+         * Returns flow entries as the current value.

+         *

+         * @return flow entries as the current value

+         */

+        public ImmutableSet<FlowEntry> current() {

+            return currentAll;

+        }

+

+        /**

+         * Returns flow entries as the previous value.

+         *

+         * @return flow entries as the previous value

+         */

+        public ImmutableSet<FlowEntry> previous() {

+            return previousAll;

+        }

+

+        public Map<FlowRule, TypedStoredFlowEntry> currentImmediate() {

+            return currentImmediate;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> previousImmediate() {

+            return previousImmediate;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> currentShort() {

+            return currentShort;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> previousShort() {

+            return previousShort;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> currentMid() {

+            return currentMid;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> previousMid() {

+            return previousMid;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> currentLong() {

+            return currentLong;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> previousLong() {

+            return previousLong;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> currentUnknown() {

+            return currentUnknown;

+        }

+        public Map<FlowRule, TypedStoredFlowEntry> previousUnknown() {

+            return previousUnknown;

+        }

+

+        /**

+         * Validates values are not empty.

+         *

+         * @return false if either of the sets is empty. Otherwise, true.

+         */

+        public boolean isValid() {

+            return !(currentAll.isEmpty() || previousAll.isEmpty());

+        }

+

+        @Override

+        public int hashCode() {

+            return Objects.hash(currentAll, previousAll);

+        }

+

+        @Override

+        public boolean equals(Object obj) {

+            if (this == obj) {

+                return true;

+            }

+            if (!(obj instanceof TypedStatistics)) {

+                return false;

+            }

+            final TypedStatistics other = (TypedStatistics) obj;

+            return Objects.equals(this.currentAll, other.currentAll) &&

+                    Objects.equals(this.previousAll, other.previousAll);

+        }

+

+        @Override

+        public String toString() {

+            return MoreObjects.toStringHelper(this)

+                    .add("current", currentAll)

+                    .add("previous", previousAll)

+                    .toString();

+        }

+    }

+

+    private void checkLoadValidity(Set<FlowEntry> current, Set<FlowEntry> previous) {

+        current.stream().forEach(c -> {

+            FlowEntry f = previous.stream().filter(p -> c.equals(p)).

+                    findAny().orElse(null);

+            if (f != null && c.bytes() < f.bytes()) {

+                log.debug("FlowStatisticManager:checkLoadValidity():" +

+                        "Error: " + c + " :Previous bytes=" + f.bytes() +

+                        " is larger than current bytes=" + c.bytes() + " !!!");

+            }

+        });

+

+    }

+

+    /**

+     * Creates a predicate that checks the instruction type of a flow entry is the same as

+     * the specified instruction type.

+     *

+     * @param instType instruction type to be checked

+     * @return predicate

+     */

+    private static Predicate<FlowEntry> hasInstructionType(Instruction.Type instType) {

+        return new Predicate<FlowEntry>() {

+            @Override

+            public boolean apply(FlowEntry flowEntry) {

+                List<Instruction> allInstructions = flowEntry.treatment().allInstructions();

+

+                return allInstructions.stream().filter(i -> i.type() == instType).findAny().isPresent();

+            }

+        };

+    }

+

+    /**

+     * Internal flow rule event listener for FlowStatisticManager.

+     */

+    private class InternalFlowRuleStatsListener implements FlowRuleListener {

+

+        @Override

+        public void event(FlowRuleEvent event) {

+            FlowRule rule = event.subject();

+            switch (event.type()) {

+                case RULE_ADDED:

+                    if (rule instanceof FlowEntry) {

+                        flowStatisticStore.addFlowStatistic((FlowEntry) rule);

+                    }

+                    break;

+                case RULE_UPDATED:

+                    flowStatisticStore.updateFlowStatistic((FlowEntry) rule);

+                    break;

+                case RULE_ADD_REQUESTED:

+                    break;

+                case RULE_REMOVE_REQUESTED:

+                    break;

+                case RULE_REMOVED:

+                    flowStatisticStore.removeFlowStatistic(rule);

+                    break;

+                default:

+                    log.warn("Unknown flow rule event {}", event);

+            }

+        }

+    }

+}