Fix files with windows line endings + Add checkstyle rule to catch this issue

Change-Id: Ic1905f2121c5c2ab66259f7f531c1e36fe58e9d4
diff --git a/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java b/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java
index f18c56d..75e8c00 100644
--- a/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java
+++ b/core/net/src/main/java/org/onosproject/net/statistic/impl/FlowStatisticManager.java
@@ -1,634 +1,634 @@
-/*

- * Copyright 2015 Open Networking Laboratory

- *

- * Licensed under the Apache License, Version 2.0 (the "License");

- * you may not use this file except in compliance with the License.

- * You may obtain a copy of the License at

- *

- *     http://www.apache.org/licenses/LICENSE-2.0

- *

- * Unless required by applicable law or agreed to in writing, software

- * distributed under the License is distributed on an "AS IS" BASIS,

- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

- * See the License for the specific language governing permissions and

- * limitations under the License.

- */

-

-package org.onosproject.net.statistic.impl;

-

-import com.google.common.base.MoreObjects;

-import com.google.common.base.Predicate;

-import com.google.common.collect.ImmutableSet;

-import org.apache.felix.scr.annotations.Activate;

-import org.apache.felix.scr.annotations.Component;

-import org.apache.felix.scr.annotations.Deactivate;

-import org.apache.felix.scr.annotations.Reference;

-import org.apache.felix.scr.annotations.ReferenceCardinality;

-import org.apache.felix.scr.annotations.Service;

-import org.onosproject.cli.Comparators;

-import org.onosproject.net.ConnectPoint;

-import org.onosproject.net.Device;

-import org.onosproject.net.Port;

-import org.onosproject.net.PortNumber;

-import org.onosproject.net.device.DeviceService;

-import org.onosproject.net.flow.DefaultTypedFlowEntry;

-import org.onosproject.net.flow.FlowEntry;

-import org.onosproject.net.flow.FlowRule;

-import org.onosproject.net.flow.FlowRuleEvent;

-import org.onosproject.net.flow.FlowRuleListener;

-import org.onosproject.net.flow.FlowRuleService;

-import org.onosproject.net.flow.TypedStoredFlowEntry;

-import org.onosproject.net.flow.instructions.Instruction;

-import org.onosproject.net.statistic.DefaultLoad;

-import org.onosproject.net.statistic.FlowStatisticService;

-import org.onosproject.net.statistic.Load;

-import org.onosproject.net.statistic.FlowStatisticStore;

-import org.onosproject.net.statistic.SummaryFlowEntryWithLoad;

-import org.onosproject.net.statistic.TypedFlowEntryWithLoad;

-

-import org.slf4j.Logger;

-

-import java.util.ArrayList;

-import java.util.HashMap;

-import java.util.List;

-import java.util.Map;

-import java.util.Objects;

-import java.util.Set;

-import java.util.TreeMap;

-import java.util.stream.Collectors;

-

-import static com.google.common.base.Preconditions.checkNotNull;

-import static org.onosproject.security.AppGuard.checkPermission;

-import static org.slf4j.LoggerFactory.getLogger;

-import static org.onosproject.security.AppPermission.Type.*;

-

-/**

- * Provides an implementation of the Flow Statistic Service.

- */

-@Component(immediate = true, enabled = true)

-@Service

-public class FlowStatisticManager implements FlowStatisticService {

-    private final Logger log = getLogger(getClass());

-

-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

-    protected FlowRuleService flowRuleService;

-

-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

-    protected FlowStatisticStore flowStatisticStore;

-

-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)

-    protected DeviceService deviceService;

-

-    private final InternalFlowRuleStatsListener frListener = new InternalFlowRuleStatsListener();

-

-    @Activate

-    public void activate() {

-        flowRuleService.addListener(frListener);

-        log.info("Started");

-    }

-

-    @Deactivate

-    public void deactivate() {

-        flowRuleService.removeListener(frListener);

-        log.info("Stopped");

-    }

-

-    @Override

-    public Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device) {

-        checkPermission(STATISTIC_READ);

-

-        Map<ConnectPoint, SummaryFlowEntryWithLoad> summaryLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

-

-        if (device == null) {

-            return summaryLoad;

-        }

-

-        List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));

-

-        for (Port port : ports) {

-            ConnectPoint cp = new ConnectPoint(device.id(), port.number());

-            SummaryFlowEntryWithLoad sfe = loadSummaryPortInternal(cp);

-            summaryLoad.put(cp, sfe);

-        }

-

-        return summaryLoad;

-    }

-

-    @Override

-    public SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber) {

-        checkPermission(STATISTIC_READ);

-

-        ConnectPoint cp = new ConnectPoint(device.id(), pNumber);

-        return loadSummaryPortInternal(cp);

-    }

-

-    @Override

-    public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,

-                                                                  TypedStoredFlowEntry.FlowLiveType liveType,

-                                                                  Instruction.Type instType) {

-        checkPermission(STATISTIC_READ);

-

-        Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

-

-        if (device == null) {

-            return allLoad;

-        }

-

-        List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));

-

-        for (Port port : ports) {

-            ConnectPoint cp = new ConnectPoint(device.id(), port.number());

-            List<TypedFlowEntryWithLoad> tfel = loadAllPortInternal(cp, liveType, instType);

-            allLoad.put(cp, tfel);

-        }

-

-        return allLoad;

-    }

-

-    @Override

-    public List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,

-                                               TypedStoredFlowEntry.FlowLiveType liveType,

-                                               Instruction.Type instType) {

-        checkPermission(STATISTIC_READ);

-

-        ConnectPoint cp = new ConnectPoint(device.id(), pNumber);

-        return loadAllPortInternal(cp, liveType, instType);

-    }

-

-    @Override

-    public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,

-                                                                   TypedStoredFlowEntry.FlowLiveType liveType,

-                                                                   Instruction.Type instType,

-                                                                   int topn) {

-        checkPermission(STATISTIC_READ);

-

-        Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

-

-        if (device == null) {

-            return allLoad;

-        }

-

-        List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));

-

-        for (Port port : ports) {

-            ConnectPoint cp = new ConnectPoint(device.id(), port.number());

-            List<TypedFlowEntryWithLoad> tfel = loadTopnPortInternal(cp, liveType, instType, topn);

-            allLoad.put(cp, tfel);

-        }

-

-        return allLoad;

-    }

-

-    @Override

-    public List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,

-                                                TypedStoredFlowEntry.FlowLiveType liveType,

-                                                Instruction.Type instType,

-                                                int topn) {

-        checkPermission(STATISTIC_READ);

-

-        ConnectPoint cp = new ConnectPoint(device.id(), pNumber);

-        return loadTopnPortInternal(cp, liveType, instType, topn);

-    }

-

-    private SummaryFlowEntryWithLoad loadSummaryPortInternal(ConnectPoint cp) {

-        checkPermission(STATISTIC_READ);

-

-        Set<FlowEntry> currentStats;

-        Set<FlowEntry> previousStats;

-

-        TypedStatistics typedStatistics;

-        synchronized (flowStatisticStore) {

-             currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);

-            if (currentStats == null) {

-                return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());

-            }

-            previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);

-            if (previousStats == null) {

-                return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());

-            }

-            // copy to local flow entry

-            typedStatistics = new TypedStatistics(currentStats, previousStats);

-

-            // Check for validity of this stats data

-            checkLoadValidity(currentStats, previousStats);

-        }

-

-        // current and previous set is not empty!

-        Set<FlowEntry> currentSet = typedStatistics.current();

-        Set<FlowEntry> previousSet = typedStatistics.previous();

-        Load totalLoad = new DefaultLoad(aggregateBytesSet(currentSet), aggregateBytesSet(previousSet),

-                TypedFlowEntryWithLoad.avgPollInterval());

-

-        Map<FlowRule, TypedStoredFlowEntry> currentMap;

-        Map<FlowRule, TypedStoredFlowEntry> previousMap;

-

-        currentMap = typedStatistics.currentImmediate();

-        previousMap = typedStatistics.previousImmediate();

-        Load immediateLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

-                TypedFlowEntryWithLoad.shortPollInterval());

-

-        currentMap = typedStatistics.currentShort();

-        previousMap = typedStatistics.previousShort();

-        Load shortLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

-                TypedFlowEntryWithLoad.shortPollInterval());

-

-        currentMap = typedStatistics.currentMid();

-        previousMap = typedStatistics.previousMid();

-        Load midLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

-                TypedFlowEntryWithLoad.midPollInterval());

-

-        currentMap = typedStatistics.currentLong();

-        previousMap = typedStatistics.previousLong();

-        Load longLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

-                TypedFlowEntryWithLoad.longPollInterval());

-

-        currentMap = typedStatistics.currentUnknown();

-        previousMap = typedStatistics.previousUnknown();

-        Load unknownLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),

-                TypedFlowEntryWithLoad.avgPollInterval());

-

-        return new SummaryFlowEntryWithLoad(cp, totalLoad, immediateLoad, shortLoad, midLoad, longLoad, unknownLoad);

-    }

-

-    private List<TypedFlowEntryWithLoad> loadAllPortInternal(ConnectPoint cp,

-                                                             TypedStoredFlowEntry.FlowLiveType liveType,

-                                                             Instruction.Type instType) {

-        checkPermission(STATISTIC_READ);

-

-        List<TypedFlowEntryWithLoad> retTfel = new ArrayList<>();

-

-        Set<FlowEntry> currentStats;

-        Set<FlowEntry> previousStats;

-

-        TypedStatistics typedStatistics;

-        synchronized (flowStatisticStore) {

-            currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);

-            if (currentStats == null) {

-                return retTfel;

-            }

-            previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);

-            if (previousStats == null) {

-                return retTfel;

-            }

-            // copy to local flow entry set

-            typedStatistics = new TypedStatistics(currentStats, previousStats);

-

-            // Check for validity of this stats data

-            checkLoadValidity(currentStats, previousStats);

-        }

-

-        // current and previous set is not empty!

-        boolean isAllLiveType = (liveType == null ? true : false); // null is all live type

-        boolean isAllInstType = (instType == null ? true : false); // null is all inst type

-

-        Map<FlowRule, TypedStoredFlowEntry> currentMap;

-        Map<FlowRule, TypedStoredFlowEntry> previousMap;

-

-        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW) {

-            currentMap = typedStatistics.currentImmediate();

-            previousMap = typedStatistics.previousImmediate();

-

-            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

-                    isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());

-            if (fel.size() > 0) {

-                retTfel.addAll(fel);

-            }

-        }

-

-        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW) {

-            currentMap = typedStatistics.currentShort();

-            previousMap = typedStatistics.previousShort();

-

-            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

-                    isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());

-            if (fel.size() > 0) {

-                retTfel.addAll(fel);

-            }

-        }

-

-        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.MID_FLOW) {

-            currentMap = typedStatistics.currentMid();

-            previousMap = typedStatistics.previousMid();

-

-            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

-                    isAllInstType, instType, TypedFlowEntryWithLoad.midPollInterval());

-            if (fel.size() > 0) {

-                retTfel.addAll(fel);

-            }

-        }

-

-        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.LONG_FLOW) {

-            currentMap = typedStatistics.currentLong();

-            previousMap = typedStatistics.previousLong();

-

-            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

-                    isAllInstType, instType, TypedFlowEntryWithLoad.longPollInterval());

-            if (fel.size() > 0) {

-                retTfel.addAll(fel);

-            }

-        }

-

-        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW) {

-            currentMap = typedStatistics.currentUnknown();

-            previousMap = typedStatistics.previousUnknown();

-

-            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,

-                    isAllInstType, instType, TypedFlowEntryWithLoad.avgPollInterval());

-            if (fel.size() > 0) {

-                retTfel.addAll(fel);

-            }

-        }

-

-        return retTfel;

-    }

-

-    private List<TypedFlowEntryWithLoad> typedFlowEntryLoadByInstInternal(ConnectPoint cp,

-                                                                      Map<FlowRule, TypedStoredFlowEntry> currentMap,

-                                                                      Map<FlowRule, TypedStoredFlowEntry> previousMap,

-                                                                      boolean isAllInstType,

-                                                                      Instruction.Type instType,

-                                                                      int liveTypePollInterval) {

-        List<TypedFlowEntryWithLoad> fel = new ArrayList<>();

-

-        for (TypedStoredFlowEntry tfe : currentMap.values()) {

-            if (isAllInstType ||

-                    tfe.treatment().allInstructions().stream().

-                            filter(i -> i.type() == instType).

-                            findAny().isPresent()) {

-                long currentBytes = tfe.bytes();

-                long previousBytes = previousMap.getOrDefault(tfe, new DefaultTypedFlowEntry((FlowRule) tfe)).bytes();

-                Load fLoad = new DefaultLoad(currentBytes, previousBytes, liveTypePollInterval);

-                fel.add(new TypedFlowEntryWithLoad(cp, tfe, fLoad));

-            }

-        }

-

-        return fel;

-    }

-

-    private List<TypedFlowEntryWithLoad> loadTopnPortInternal(ConnectPoint cp,

-                                                             TypedStoredFlowEntry.FlowLiveType liveType,

-                                                             Instruction.Type instType,

-                                                             int topn) {

-        List<TypedFlowEntryWithLoad> fel = loadAllPortInternal(cp, liveType, instType);

-

-        // Sort with descending order of load

-        List<TypedFlowEntryWithLoad> tfel =

-                fel.stream().sorted(Comparators.TYPEFLOWENTRY_WITHLOAD_COMPARATOR).

-                        limit(topn).collect(Collectors.toList());

-

-        return tfel;

-    }

-

-    private long aggregateBytesSet(Set<FlowEntry> setFE) {

-        return setFE.stream().mapToLong(FlowEntry::bytes).sum();

-    }

-

-    private long aggregateBytesMap(Map<FlowRule, TypedStoredFlowEntry> mapFE) {

-        return mapFE.values().stream().mapToLong(FlowEntry::bytes).sum();

-    }

-

-    /**

-     * Internal data class holding two set of typed flow entries.

-     */

-    private static class TypedStatistics {

-        private final ImmutableSet<FlowEntry> currentAll;

-        private final ImmutableSet<FlowEntry> previousAll;

-

-        private final Map<FlowRule, TypedStoredFlowEntry> currentImmediate = new HashMap<>();

-        private final Map<FlowRule, TypedStoredFlowEntry> previousImmediate = new HashMap<>();

-

-        private final Map<FlowRule, TypedStoredFlowEntry> currentShort = new HashMap<>();

-        private final Map<FlowRule, TypedStoredFlowEntry> previousShort = new HashMap<>();

-

-        private final Map<FlowRule, TypedStoredFlowEntry> currentMid = new HashMap<>();

-        private final Map<FlowRule, TypedStoredFlowEntry> previousMid = new HashMap<>();

-

-        private final Map<FlowRule, TypedStoredFlowEntry> currentLong = new HashMap<>();

-        private final Map<FlowRule, TypedStoredFlowEntry> previousLong = new HashMap<>();

-

-        private final Map<FlowRule, TypedStoredFlowEntry> currentUnknown = new HashMap<>();

-        private final Map<FlowRule, TypedStoredFlowEntry> previousUnknown = new HashMap<>();

-

-        public TypedStatistics(Set<FlowEntry> current, Set<FlowEntry> previous) {

-            this.currentAll = ImmutableSet.copyOf(checkNotNull(current));

-            this.previousAll = ImmutableSet.copyOf(checkNotNull(previous));

-

-            currentAll.forEach(fe -> {

-                TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);

-

-                switch (tfe.flowLiveType()) {

-                    case IMMEDIATE_FLOW:

-                        currentImmediate.put(fe, tfe);

-                        break;

-                    case SHORT_FLOW:

-                        currentShort.put(fe, tfe);

-                        break;

-                    case MID_FLOW:

-                        currentMid.put(fe, tfe);

-                        break;

-                    case LONG_FLOW:

-                        currentLong.put(fe, tfe);

-                        break;

-                    default:

-                        currentUnknown.put(fe, tfe);

-                        break;

-                }

-            });

-

-            previousAll.forEach(fe -> {

-                TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);

-

-                switch (tfe.flowLiveType()) {

-                    case IMMEDIATE_FLOW:

-                        if (currentImmediate.containsKey(fe)) {

-                            previousImmediate.put(fe, tfe);

-                        } else if (currentShort.containsKey(fe)) {

-                            previousShort.put(fe, tfe);

-                        } else if (currentMid.containsKey(fe)) {

-                            previousMid.put(fe, tfe);

-                        } else if (currentLong.containsKey(fe)) {

-                            previousLong.put(fe, tfe);

-                        } else {

-                            previousUnknown.put(fe, tfe);

-                        }

-                        break;

-                    case SHORT_FLOW:

-                        if (currentShort.containsKey(fe)) {

-                            previousShort.put(fe, tfe);

-                        } else if (currentMid.containsKey(fe)) {

-                            previousMid.put(fe, tfe);

-                        } else if (currentLong.containsKey(fe)) {

-                            previousLong.put(fe, tfe);

-                        } else {

-                            previousUnknown.put(fe, tfe);

-                        }

-                        break;

-                    case MID_FLOW:

-                        if (currentMid.containsKey(fe)) {

-                            previousMid.put(fe, tfe);

-                        } else if (currentLong.containsKey(fe)) {

-                            previousLong.put(fe, tfe);

-                        } else {

-                            previousUnknown.put(fe, tfe);

-                        }

-                        break;

-                    case LONG_FLOW:

-                        if (currentLong.containsKey(fe)) {

-                            previousLong.put(fe, tfe);

-                        } else {

-                            previousUnknown.put(fe, tfe);

-                        }

-                        break;

-                    default:

-                        previousUnknown.put(fe, tfe);

-                        break;

-                }

-            });

-        }

-

-        /**

-         * Returns flow entries as the current value.

-         *

-         * @return flow entries as the current value

-         */

-        public ImmutableSet<FlowEntry> current() {

-            return currentAll;

-        }

-

-        /**

-         * Returns flow entries as the previous value.

-         *

-         * @return flow entries as the previous value

-         */

-        public ImmutableSet<FlowEntry> previous() {

-            return previousAll;

-        }

-

-        public Map<FlowRule, TypedStoredFlowEntry> currentImmediate() {

-            return currentImmediate;

-        }

-        public Map<FlowRule, TypedStoredFlowEntry> previousImmediate() {

-            return previousImmediate;

-        }

-        public Map<FlowRule, TypedStoredFlowEntry> currentShort() {

-            return currentShort;

-        }

-        public Map<FlowRule, TypedStoredFlowEntry> previousShort() {

-            return previousShort;

-        }

-        public Map<FlowRule, TypedStoredFlowEntry> currentMid() {

-            return currentMid;

-        }

-        public Map<FlowRule, TypedStoredFlowEntry> previousMid() {

-            return previousMid;

-        }

-        public Map<FlowRule, TypedStoredFlowEntry> currentLong() {

-            return currentLong;

-        }

-        public Map<FlowRule, TypedStoredFlowEntry> previousLong() {

-            return previousLong;

-        }

-        public Map<FlowRule, TypedStoredFlowEntry> currentUnknown() {

-            return currentUnknown;

-        }

-        public Map<FlowRule, TypedStoredFlowEntry> previousUnknown() {

-            return previousUnknown;

-        }

-

-        /**

-         * Validates values are not empty.

-         *

-         * @return false if either of the sets is empty. Otherwise, true.

-         */

-        public boolean isValid() {

-            return !(currentAll.isEmpty() || previousAll.isEmpty());

-        }

-

-        @Override

-        public int hashCode() {

-            return Objects.hash(currentAll, previousAll);

-        }

-

-        @Override

-        public boolean equals(Object obj) {

-            if (this == obj) {

-                return true;

-            }

-            if (!(obj instanceof TypedStatistics)) {

-                return false;

-            }

-            final TypedStatistics other = (TypedStatistics) obj;

-            return Objects.equals(this.currentAll, other.currentAll) &&

-                    Objects.equals(this.previousAll, other.previousAll);

-        }

-

-        @Override

-        public String toString() {

-            return MoreObjects.toStringHelper(this)

-                    .add("current", currentAll)

-                    .add("previous", previousAll)

-                    .toString();

-        }

-    }

-

-    private void checkLoadValidity(Set<FlowEntry> current, Set<FlowEntry> previous) {

-        current.stream().forEach(c -> {

-            FlowEntry f = previous.stream().filter(p -> c.equals(p)).

-                    findAny().orElse(null);

-            if (f != null && c.bytes() < f.bytes()) {

-                log.debug("FlowStatisticManager:checkLoadValidity():" +

-                        "Error: " + c + " :Previous bytes=" + f.bytes() +

-                        " is larger than current bytes=" + c.bytes() + " !!!");

-            }

-        });

-

-    }

-

-    /**

-     * Creates a predicate that checks the instruction type of a flow entry is the same as

-     * the specified instruction type.

-     *

-     * @param instType instruction type to be checked

-     * @return predicate

-     */

-    private static Predicate<FlowEntry> hasInstructionType(Instruction.Type instType) {

-        return new Predicate<FlowEntry>() {

-            @Override

-            public boolean apply(FlowEntry flowEntry) {

-                List<Instruction> allInstructions = flowEntry.treatment().allInstructions();

-

-                return allInstructions.stream().filter(i -> i.type() == instType).findAny().isPresent();

-            }

-        };

-    }

-

-    /**

-     * Internal flow rule event listener for FlowStatisticManager.

-     */

-    private class InternalFlowRuleStatsListener implements FlowRuleListener {

-

-        @Override

-        public void event(FlowRuleEvent event) {

-            FlowRule rule = event.subject();

-            switch (event.type()) {

-                case RULE_ADDED:

-                    if (rule instanceof FlowEntry) {

-                        flowStatisticStore.addFlowStatistic((FlowEntry) rule);

-                    }

-                    break;

-                case RULE_UPDATED:

-                    flowStatisticStore.updateFlowStatistic((FlowEntry) rule);

-                    break;

-                case RULE_ADD_REQUESTED:

-                    break;

-                case RULE_REMOVE_REQUESTED:

-                    break;

-                case RULE_REMOVED:

-                    flowStatisticStore.removeFlowStatistic(rule);

-                    break;

-                default:

-                    log.warn("Unknown flow rule event {}", event);

-            }

-        }

-    }

-}

+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.net.statistic.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cli.Comparators;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.DefaultTypedFlowEntry;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleListener;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.TypedStoredFlowEntry;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.statistic.DefaultLoad;
+import org.onosproject.net.statistic.FlowStatisticService;
+import org.onosproject.net.statistic.Load;
+import org.onosproject.net.statistic.FlowStatisticStore;
+import org.onosproject.net.statistic.SummaryFlowEntryWithLoad;
+import org.onosproject.net.statistic.TypedFlowEntryWithLoad;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.security.AppGuard.checkPermission;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.onosproject.security.AppPermission.Type.*;
+
+/**
+ * Provides an implementation of the Flow Statistic Service.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class FlowStatisticManager implements FlowStatisticService {
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected FlowRuleService flowRuleService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected FlowStatisticStore flowStatisticStore;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    private final InternalFlowRuleStatsListener frListener = new InternalFlowRuleStatsListener();
+
+    @Activate
+    public void activate() {
+        flowRuleService.addListener(frListener);
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        flowRuleService.removeListener(frListener);
+        log.info("Stopped");
+    }
+
+    @Override
+    public Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device) {
+        checkPermission(STATISTIC_READ);
+
+        Map<ConnectPoint, SummaryFlowEntryWithLoad> summaryLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
+
+        if (device == null) {
+            return summaryLoad;
+        }
+
+        List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
+
+        for (Port port : ports) {
+            ConnectPoint cp = new ConnectPoint(device.id(), port.number());
+            SummaryFlowEntryWithLoad sfe = loadSummaryPortInternal(cp);
+            summaryLoad.put(cp, sfe);
+        }
+
+        return summaryLoad;
+    }
+
+    @Override
+    public SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber) {
+        checkPermission(STATISTIC_READ);
+
+        ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
+        return loadSummaryPortInternal(cp);
+    }
+
+    @Override
+    public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,
+                                                                  TypedStoredFlowEntry.FlowLiveType liveType,
+                                                                  Instruction.Type instType) {
+        checkPermission(STATISTIC_READ);
+
+        Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
+
+        if (device == null) {
+            return allLoad;
+        }
+
+        List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
+
+        for (Port port : ports) {
+            ConnectPoint cp = new ConnectPoint(device.id(), port.number());
+            List<TypedFlowEntryWithLoad> tfel = loadAllPortInternal(cp, liveType, instType);
+            allLoad.put(cp, tfel);
+        }
+
+        return allLoad;
+    }
+
+    @Override
+    public List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,
+                                               TypedStoredFlowEntry.FlowLiveType liveType,
+                                               Instruction.Type instType) {
+        checkPermission(STATISTIC_READ);
+
+        ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
+        return loadAllPortInternal(cp, liveType, instType);
+    }
+
+    @Override
+    public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,
+                                                                   TypedStoredFlowEntry.FlowLiveType liveType,
+                                                                   Instruction.Type instType,
+                                                                   int topn) {
+        checkPermission(STATISTIC_READ);
+
+        Map<ConnectPoint, List<TypedFlowEntryWithLoad>> allLoad = new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);
+
+        if (device == null) {
+            return allLoad;
+        }
+
+        List<Port> ports = new ArrayList<>(deviceService.getPorts(device.id()));
+
+        for (Port port : ports) {
+            ConnectPoint cp = new ConnectPoint(device.id(), port.number());
+            List<TypedFlowEntryWithLoad> tfel = loadTopnPortInternal(cp, liveType, instType, topn);
+            allLoad.put(cp, tfel);
+        }
+
+        return allLoad;
+    }
+
+    @Override
+    public List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,
+                                                TypedStoredFlowEntry.FlowLiveType liveType,
+                                                Instruction.Type instType,
+                                                int topn) {
+        checkPermission(STATISTIC_READ);
+
+        ConnectPoint cp = new ConnectPoint(device.id(), pNumber);
+        return loadTopnPortInternal(cp, liveType, instType, topn);
+    }
+
+    private SummaryFlowEntryWithLoad loadSummaryPortInternal(ConnectPoint cp) {
+        checkPermission(STATISTIC_READ);
+
+        Set<FlowEntry> currentStats;
+        Set<FlowEntry> previousStats;
+
+        TypedStatistics typedStatistics;
+        synchronized (flowStatisticStore) {
+             currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);
+            if (currentStats == null) {
+                return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
+            }
+            previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);
+            if (previousStats == null) {
+                return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
+            }
+            // copy to local flow entry
+            typedStatistics = new TypedStatistics(currentStats, previousStats);
+
+            // Check for validity of this stats data
+            checkLoadValidity(currentStats, previousStats);
+        }
+
+        // current and previous set is not empty!
+        Set<FlowEntry> currentSet = typedStatistics.current();
+        Set<FlowEntry> previousSet = typedStatistics.previous();
+        Load totalLoad = new DefaultLoad(aggregateBytesSet(currentSet), aggregateBytesSet(previousSet),
+                TypedFlowEntryWithLoad.avgPollInterval());
+
+        Map<FlowRule, TypedStoredFlowEntry> currentMap;
+        Map<FlowRule, TypedStoredFlowEntry> previousMap;
+
+        currentMap = typedStatistics.currentImmediate();
+        previousMap = typedStatistics.previousImmediate();
+        Load immediateLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
+                TypedFlowEntryWithLoad.shortPollInterval());
+
+        currentMap = typedStatistics.currentShort();
+        previousMap = typedStatistics.previousShort();
+        Load shortLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
+                TypedFlowEntryWithLoad.shortPollInterval());
+
+        currentMap = typedStatistics.currentMid();
+        previousMap = typedStatistics.previousMid();
+        Load midLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
+                TypedFlowEntryWithLoad.midPollInterval());
+
+        currentMap = typedStatistics.currentLong();
+        previousMap = typedStatistics.previousLong();
+        Load longLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
+                TypedFlowEntryWithLoad.longPollInterval());
+
+        currentMap = typedStatistics.currentUnknown();
+        previousMap = typedStatistics.previousUnknown();
+        Load unknownLoad = new DefaultLoad(aggregateBytesMap(currentMap), aggregateBytesMap(previousMap),
+                TypedFlowEntryWithLoad.avgPollInterval());
+
+        return new SummaryFlowEntryWithLoad(cp, totalLoad, immediateLoad, shortLoad, midLoad, longLoad, unknownLoad);
+    }
+
+    private List<TypedFlowEntryWithLoad> loadAllPortInternal(ConnectPoint cp,
+                                                             TypedStoredFlowEntry.FlowLiveType liveType,
+                                                             Instruction.Type instType) {
+        checkPermission(STATISTIC_READ);
+
+        List<TypedFlowEntryWithLoad> retTfel = new ArrayList<>();
+
+        Set<FlowEntry> currentStats;
+        Set<FlowEntry> previousStats;
+
+        TypedStatistics typedStatistics;
+        synchronized (flowStatisticStore) {
+            currentStats = flowStatisticStore.getCurrentFlowStatistic(cp);
+            if (currentStats == null) {
+                return retTfel;
+            }
+            previousStats = flowStatisticStore.getPreviousFlowStatistic(cp);
+            if (previousStats == null) {
+                return retTfel;
+            }
+            // copy to local flow entry set
+            typedStatistics = new TypedStatistics(currentStats, previousStats);
+
+            // Check for validity of this stats data
+            checkLoadValidity(currentStats, previousStats);
+        }
+
+        // current and previous set is not empty!
+        boolean isAllLiveType = (liveType == null ? true : false); // null is all live type
+        boolean isAllInstType = (instType == null ? true : false); // null is all inst type
+
+        Map<FlowRule, TypedStoredFlowEntry> currentMap;
+        Map<FlowRule, TypedStoredFlowEntry> previousMap;
+
+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW) {
+            currentMap = typedStatistics.currentImmediate();
+            previousMap = typedStatistics.previousImmediate();
+
+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
+                    isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());
+            if (fel.size() > 0) {
+                retTfel.addAll(fel);
+            }
+        }
+
+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW) {
+            currentMap = typedStatistics.currentShort();
+            previousMap = typedStatistics.previousShort();
+
+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
+                    isAllInstType, instType, TypedFlowEntryWithLoad.shortPollInterval());
+            if (fel.size() > 0) {
+                retTfel.addAll(fel);
+            }
+        }
+
+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.MID_FLOW) {
+            currentMap = typedStatistics.currentMid();
+            previousMap = typedStatistics.previousMid();
+
+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
+                    isAllInstType, instType, TypedFlowEntryWithLoad.midPollInterval());
+            if (fel.size() > 0) {
+                retTfel.addAll(fel);
+            }
+        }
+
+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.LONG_FLOW) {
+            currentMap = typedStatistics.currentLong();
+            previousMap = typedStatistics.previousLong();
+
+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
+                    isAllInstType, instType, TypedFlowEntryWithLoad.longPollInterval());
+            if (fel.size() > 0) {
+                retTfel.addAll(fel);
+            }
+        }
+
+        if (isAllLiveType || liveType == TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW) {
+            currentMap = typedStatistics.currentUnknown();
+            previousMap = typedStatistics.previousUnknown();
+
+            List<TypedFlowEntryWithLoad> fel = typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap,
+                    isAllInstType, instType, TypedFlowEntryWithLoad.avgPollInterval());
+            if (fel.size() > 0) {
+                retTfel.addAll(fel);
+            }
+        }
+
+        return retTfel;
+    }
+
+    private List<TypedFlowEntryWithLoad> typedFlowEntryLoadByInstInternal(ConnectPoint cp,
+                                                                      Map<FlowRule, TypedStoredFlowEntry> currentMap,
+                                                                      Map<FlowRule, TypedStoredFlowEntry> previousMap,
+                                                                      boolean isAllInstType,
+                                                                      Instruction.Type instType,
+                                                                      int liveTypePollInterval) {
+        List<TypedFlowEntryWithLoad> fel = new ArrayList<>();
+
+        for (TypedStoredFlowEntry tfe : currentMap.values()) {
+            if (isAllInstType ||
+                    tfe.treatment().allInstructions().stream().
+                            filter(i -> i.type() == instType).
+                            findAny().isPresent()) {
+                long currentBytes = tfe.bytes();
+                long previousBytes = previousMap.getOrDefault(tfe, new DefaultTypedFlowEntry((FlowRule) tfe)).bytes();
+                Load fLoad = new DefaultLoad(currentBytes, previousBytes, liveTypePollInterval);
+                fel.add(new TypedFlowEntryWithLoad(cp, tfe, fLoad));
+            }
+        }
+
+        return fel;
+    }
+
+    private List<TypedFlowEntryWithLoad> loadTopnPortInternal(ConnectPoint cp,
+                                                             TypedStoredFlowEntry.FlowLiveType liveType,
+                                                             Instruction.Type instType,
+                                                             int topn) {
+        List<TypedFlowEntryWithLoad> fel = loadAllPortInternal(cp, liveType, instType);
+
+        // Sort with descending order of load
+        List<TypedFlowEntryWithLoad> tfel =
+                fel.stream().sorted(Comparators.TYPEFLOWENTRY_WITHLOAD_COMPARATOR).
+                        limit(topn).collect(Collectors.toList());
+
+        return tfel;
+    }
+
+    private long aggregateBytesSet(Set<FlowEntry> setFE) {
+        return setFE.stream().mapToLong(FlowEntry::bytes).sum();
+    }
+
+    private long aggregateBytesMap(Map<FlowRule, TypedStoredFlowEntry> mapFE) {
+        return mapFE.values().stream().mapToLong(FlowEntry::bytes).sum();
+    }
+
+    /**
+     * Internal data class holding two set of typed flow entries.
+     */
+    private static class TypedStatistics {
+        private final ImmutableSet<FlowEntry> currentAll;
+        private final ImmutableSet<FlowEntry> previousAll;
+
+        private final Map<FlowRule, TypedStoredFlowEntry> currentImmediate = new HashMap<>();
+        private final Map<FlowRule, TypedStoredFlowEntry> previousImmediate = new HashMap<>();
+
+        private final Map<FlowRule, TypedStoredFlowEntry> currentShort = new HashMap<>();
+        private final Map<FlowRule, TypedStoredFlowEntry> previousShort = new HashMap<>();
+
+        private final Map<FlowRule, TypedStoredFlowEntry> currentMid = new HashMap<>();
+        private final Map<FlowRule, TypedStoredFlowEntry> previousMid = new HashMap<>();
+
+        private final Map<FlowRule, TypedStoredFlowEntry> currentLong = new HashMap<>();
+        private final Map<FlowRule, TypedStoredFlowEntry> previousLong = new HashMap<>();
+
+        private final Map<FlowRule, TypedStoredFlowEntry> currentUnknown = new HashMap<>();
+        private final Map<FlowRule, TypedStoredFlowEntry> previousUnknown = new HashMap<>();
+
+        public TypedStatistics(Set<FlowEntry> current, Set<FlowEntry> previous) {
+            this.currentAll = ImmutableSet.copyOf(checkNotNull(current));
+            this.previousAll = ImmutableSet.copyOf(checkNotNull(previous));
+
+            currentAll.forEach(fe -> {
+                TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);
+
+                switch (tfe.flowLiveType()) {
+                    case IMMEDIATE_FLOW:
+                        currentImmediate.put(fe, tfe);
+                        break;
+                    case SHORT_FLOW:
+                        currentShort.put(fe, tfe);
+                        break;
+                    case MID_FLOW:
+                        currentMid.put(fe, tfe);
+                        break;
+                    case LONG_FLOW:
+                        currentLong.put(fe, tfe);
+                        break;
+                    default:
+                        currentUnknown.put(fe, tfe);
+                        break;
+                }
+            });
+
+            previousAll.forEach(fe -> {
+                TypedStoredFlowEntry tfe = TypedFlowEntryWithLoad.newTypedStoredFlowEntry(fe);
+
+                switch (tfe.flowLiveType()) {
+                    case IMMEDIATE_FLOW:
+                        if (currentImmediate.containsKey(fe)) {
+                            previousImmediate.put(fe, tfe);
+                        } else if (currentShort.containsKey(fe)) {
+                            previousShort.put(fe, tfe);
+                        } else if (currentMid.containsKey(fe)) {
+                            previousMid.put(fe, tfe);
+                        } else if (currentLong.containsKey(fe)) {
+                            previousLong.put(fe, tfe);
+                        } else {
+                            previousUnknown.put(fe, tfe);
+                        }
+                        break;
+                    case SHORT_FLOW:
+                        if (currentShort.containsKey(fe)) {
+                            previousShort.put(fe, tfe);
+                        } else if (currentMid.containsKey(fe)) {
+                            previousMid.put(fe, tfe);
+                        } else if (currentLong.containsKey(fe)) {
+                            previousLong.put(fe, tfe);
+                        } else {
+                            previousUnknown.put(fe, tfe);
+                        }
+                        break;
+                    case MID_FLOW:
+                        if (currentMid.containsKey(fe)) {
+                            previousMid.put(fe, tfe);
+                        } else if (currentLong.containsKey(fe)) {
+                            previousLong.put(fe, tfe);
+                        } else {
+                            previousUnknown.put(fe, tfe);
+                        }
+                        break;
+                    case LONG_FLOW:
+                        if (currentLong.containsKey(fe)) {
+                            previousLong.put(fe, tfe);
+                        } else {
+                            previousUnknown.put(fe, tfe);
+                        }
+                        break;
+                    default:
+                        previousUnknown.put(fe, tfe);
+                        break;
+                }
+            });
+        }
+
+        /**
+         * Returns flow entries as the current value.
+         *
+         * @return flow entries as the current value
+         */
+        public ImmutableSet<FlowEntry> current() {
+            return currentAll;
+        }
+
+        /**
+         * Returns flow entries as the previous value.
+         *
+         * @return flow entries as the previous value
+         */
+        public ImmutableSet<FlowEntry> previous() {
+            return previousAll;
+        }
+
+        public Map<FlowRule, TypedStoredFlowEntry> currentImmediate() {
+            return currentImmediate;
+        }
+        public Map<FlowRule, TypedStoredFlowEntry> previousImmediate() {
+            return previousImmediate;
+        }
+        public Map<FlowRule, TypedStoredFlowEntry> currentShort() {
+            return currentShort;
+        }
+        public Map<FlowRule, TypedStoredFlowEntry> previousShort() {
+            return previousShort;
+        }
+        public Map<FlowRule, TypedStoredFlowEntry> currentMid() {
+            return currentMid;
+        }
+        public Map<FlowRule, TypedStoredFlowEntry> previousMid() {
+            return previousMid;
+        }
+        public Map<FlowRule, TypedStoredFlowEntry> currentLong() {
+            return currentLong;
+        }
+        public Map<FlowRule, TypedStoredFlowEntry> previousLong() {
+            return previousLong;
+        }
+        public Map<FlowRule, TypedStoredFlowEntry> currentUnknown() {
+            return currentUnknown;
+        }
+        public Map<FlowRule, TypedStoredFlowEntry> previousUnknown() {
+            return previousUnknown;
+        }
+
+        /**
+         * Validates values are not empty.
+         *
+         * @return false if either of the sets is empty. Otherwise, true.
+         */
+        public boolean isValid() {
+            return !(currentAll.isEmpty() || previousAll.isEmpty());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(currentAll, previousAll);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof TypedStatistics)) {
+                return false;
+            }
+            final TypedStatistics other = (TypedStatistics) obj;
+            return Objects.equals(this.currentAll, other.currentAll) &&
+                    Objects.equals(this.previousAll, other.previousAll);
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(this)
+                    .add("current", currentAll)
+                    .add("previous", previousAll)
+                    .toString();
+        }
+    }
+
+    private void checkLoadValidity(Set<FlowEntry> current, Set<FlowEntry> previous) {
+        current.stream().forEach(c -> {
+            FlowEntry f = previous.stream().filter(p -> c.equals(p)).
+                    findAny().orElse(null);
+            if (f != null && c.bytes() < f.bytes()) {
+                log.debug("FlowStatisticManager:checkLoadValidity():" +
+                        "Error: " + c + " :Previous bytes=" + f.bytes() +
+                        " is larger than current bytes=" + c.bytes() + " !!!");
+            }
+        });
+
+    }
+
+    /**
+     * Creates a predicate that checks the instruction type of a flow entry is the same as
+     * the specified instruction type.
+     *
+     * @param instType instruction type to be checked
+     * @return predicate
+     */
+    private static Predicate<FlowEntry> hasInstructionType(Instruction.Type instType) {
+        return new Predicate<FlowEntry>() {
+            @Override
+            public boolean apply(FlowEntry flowEntry) {
+                List<Instruction> allInstructions = flowEntry.treatment().allInstructions();
+
+                return allInstructions.stream().filter(i -> i.type() == instType).findAny().isPresent();
+            }
+        };
+    }
+
+    /**
+     * Internal flow rule event listener for FlowStatisticManager.
+     */
+    private class InternalFlowRuleStatsListener implements FlowRuleListener {
+
+        @Override
+        public void event(FlowRuleEvent event) {
+            FlowRule rule = event.subject();
+            switch (event.type()) {
+                case RULE_ADDED:
+                    if (rule instanceof FlowEntry) {
+                        flowStatisticStore.addFlowStatistic((FlowEntry) rule);
+                    }
+                    break;
+                case RULE_UPDATED:
+                    flowStatisticStore.updateFlowStatistic((FlowEntry) rule);
+                    break;
+                case RULE_ADD_REQUESTED:
+                    break;
+                case RULE_REMOVE_REQUESTED:
+                    break;
+                case RULE_REMOVED:
+                    flowStatisticStore.removeFlowStatistic(rule);
+                    break;
+                default:
+                    log.warn("Unknown flow rule event {}", event);
+            }
+        }
+    }
+}