/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.net.statistic.impl;

import com.google.common.base.MoreObjects;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onosproject.utils.Comparators;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultFlowEntry;
import org.onosproject.net.flow.DefaultTypedFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TypedStoredFlowEntry;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.statistic.DefaultLoad;
import org.onosproject.net.statistic.FlowEntryWithLoad;
import org.onosproject.net.statistic.FlowStatisticService;
import org.onosproject.net.statistic.Load;
import org.onosproject.net.statistic.PollInterval;
import org.onosproject.net.statistic.StatisticStore;
import org.onosproject.net.statistic.SummaryFlowEntryWithLoad;
import org.onosproject.net.statistic.TypedFlowEntryWithLoad;

import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.slf4j.LoggerFactory.getLogger;
import static org.onosproject.security.AppPermission.Type.*;

/**
 * Provides an implementation of the Flow Statistic Service.
 */
@Component(immediate = true)
@Service
public class FlowStatisticManager implements FlowStatisticService {
    // Logger named after the runtime class of this component.
    private final Logger log = getLogger(getClass());

    // Store queried for the current and previous flow statistics of a connect point;
    // also used as the monitor when both samples are read together.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected StatisticStore statisticStore;

    // Device service used to list the ports of a device.
    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
    protected DeviceService deviceService;

    /**
     * Component activation hook; only logs that the component has started.
     */
    @Activate
    public void activate() {
        log.info("Started");
    }

    /**
     * Component deactivation hook; only logs that the component has stopped.
     */
    @Deactivate
    public void deactivate() {
        log.info("Stopped");
    }

    /**
     * Computes the summary flow load of every port of the given device.
     *
     * @param device device whose ports are inspected; may be null
     * @return map from connect point to its summary load, sorted with
     *         {@code Comparators.CONNECT_POINT_COMPARATOR}; empty when the device is null
     */
    @Override
    public Map<ConnectPoint, SummaryFlowEntryWithLoad> loadSummary(Device device) {
        checkPermission(STATISTIC_READ);

        Map<ConnectPoint, SummaryFlowEntryWithLoad> loadByPoint =
                new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

        if (device != null) {
            // work on a private copy of the port list returned by the device service
            List<Port> portSnapshot = new ArrayList<>(deviceService.getPorts(device.id()));
            for (Port port : portSnapshot) {
                ConnectPoint point = new ConnectPoint(device.id(), port.number());
                loadByPoint.put(point, loadSummaryPortInternal(point));
            }
        }

        return loadByPoint;
    }

    /**
     * Computes the summary flow load of a single port.
     *
     * @param device  device owning the port; dereferenced without a null check
     * @param pNumber port number on the device
     * @return summary load of the connect point formed by the device and port
     */
    @Override
    public SummaryFlowEntryWithLoad loadSummary(Device device, PortNumber pNumber) {
        checkPermission(STATISTIC_READ);

        return loadSummaryPortInternal(new ConnectPoint(device.id(), pNumber));
    }

    /**
     * Computes the per-flow load of every port of the given device, filtered by
     * flow live type and instruction type.
     *
     * @param device   device whose ports are inspected; may be null
     * @param liveType flow live type to select, or null for all live types
     * @param instType instruction type to select, or null for all instruction types
     * @return map from connect point to its flow loads, sorted with
     *         {@code Comparators.CONNECT_POINT_COMPARATOR}; empty when the device is null
     */
    @Override
    public Map<ConnectPoint, List<FlowEntryWithLoad>> loadAllByType(Device device,
                                                                  FlowEntry.FlowLiveType liveType,
                                                                  Instruction.Type instType) {
        checkPermission(STATISTIC_READ);

        Map<ConnectPoint, List<FlowEntryWithLoad>> loadsByPoint =
                new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

        if (device != null) {
            // work on a private copy of the port list returned by the device service
            List<Port> portSnapshot = new ArrayList<>(deviceService.getPorts(device.id()));
            for (Port port : portSnapshot) {
                ConnectPoint point = new ConnectPoint(device.id(), port.number());
                loadsByPoint.put(point, loadAllPortInternal(point, liveType, instType));
            }
        }

        return loadsByPoint;
    }

    /**
     * Computes the per-flow load of a single port, filtered by flow live type
     * and instruction type.
     *
     * @param device   device owning the port; dereferenced without a null check
     * @param pNumber  port number on the device
     * @param liveType flow live type to select, or null for all live types
     * @param instType instruction type to select, or null for all instruction types
     * @return flow loads of the connect point formed by the device and port
     */
    @Override
    public List<FlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,
                                               FlowEntry.FlowLiveType liveType,
                                               Instruction.Type instType) {
        checkPermission(STATISTIC_READ);

        return loadAllPortInternal(new ConnectPoint(device.id(), pNumber), liveType, instType);
    }

    /**
     * Computes, for every port of the given device, the first {@code topn} flow
     * loads after sorting, filtered by flow live type and instruction type.
     *
     * @param device   device whose ports are inspected; may be null
     * @param liveType flow live type to select, or null for all live types
     * @param instType instruction type to select, or null for all instruction types
     * @param topn     maximum number of entries kept per port
     * @return map from connect point to its top-n flow loads, sorted with
     *         {@code Comparators.CONNECT_POINT_COMPARATOR}; empty when the device is null
     */
    @Override
    public Map<ConnectPoint, List<FlowEntryWithLoad>> loadTopnByType(Device device,
                                                                   FlowEntry.FlowLiveType liveType,
                                                                   Instruction.Type instType,
                                                                   int topn) {
        checkPermission(STATISTIC_READ);

        Map<ConnectPoint, List<FlowEntryWithLoad>> loadsByPoint =
                new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

        if (device != null) {
            // work on a private copy of the port list returned by the device service
            List<Port> portSnapshot = new ArrayList<>(deviceService.getPorts(device.id()));
            for (Port port : portSnapshot) {
                ConnectPoint point = new ConnectPoint(device.id(), port.number());
                loadsByPoint.put(point, loadTopnPortInternal(point, liveType, instType, topn));
            }
        }

        return loadsByPoint;
    }

    /**
     * Computes the first {@code topn} flow loads of a single port after sorting,
     * filtered by flow live type and instruction type.
     *
     * @param device   device owning the port; dereferenced without a null check
     * @param pNumber  port number on the device
     * @param liveType flow live type to select, or null for all live types
     * @param instType instruction type to select, or null for all instruction types
     * @param topn     maximum number of entries returned
     * @return top-n flow loads of the connect point formed by the device and port
     */
    @Override
    public List<FlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,
                                                FlowEntry.FlowLiveType liveType,
                                                Instruction.Type instType,
                                                int topn) {
        checkPermission(STATISTIC_READ);

        return loadTopnPortInternal(new ConnectPoint(device.id(), pNumber), liveType, instType, topn);
    }

    /**
     * Builds the summary load of one connect point from the statistic store.
     *
     * The current and previous samples are read while holding the monitor of
     * {@code statisticStore}. If either sample is null, a summary carrying a
     * default-constructed {@link DefaultLoad} is returned. Otherwise a total load
     * plus one load per flow live type (immediate, short, mid, long, unknown)
     * is computed from the summed byte counters.
     *
     * @param cp connect point to summarize
     * @return summary of flow loads for the connect point
     */
    private SummaryFlowEntryWithLoad loadSummaryPortInternal(ConnectPoint cp) {
        checkPermission(STATISTIC_READ);

        final TypedStatistics stats;
        synchronized (statisticStore) {
            Set<FlowEntry> currentStats = statisticStore.getCurrentStatistic(cp);
            if (currentStats == null) {
                return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
            }
            Set<FlowEntry> previousStats = statisticStore.getPreviousStatistic(cp);
            if (previousStats == null) {
                return new SummaryFlowEntryWithLoad(cp, new DefaultLoad());
            }
            // snapshot both samples into a local, live-type indexed holder
            stats = new TypedStatistics(currentStats, previousStats);

            // report counters that went backwards between the two samples
            checkLoadValidity(currentStats, previousStats);
        }

        // both samples are non-null from here on (they may still be empty)
        PollInterval intervals = PollInterval.getInstance();

        // We assume that default pollInterval is flowPollFrequency in case adaptiveFlowSampling is true or false
        Load totalLoad = new DefaultLoad(aggregateBytesSet(stats.current()),
                                         aggregateBytesSet(stats.previous()),
                                         intervals.getPollInterval());

        Load immediateLoad = new DefaultLoad(aggregateBytesMap(stats.currentImmediate()),
                                             aggregateBytesMap(stats.previousImmediate()),
                                             intervals.getPollInterval());

        Load shortLoad = new DefaultLoad(aggregateBytesMap(stats.currentShort()),
                                         aggregateBytesMap(stats.previousShort()),
                                         intervals.getPollInterval());

        // mid and long flows are rated against their own poll intervals
        Load midLoad = new DefaultLoad(aggregateBytesMap(stats.currentMid()),
                                       aggregateBytesMap(stats.previousMid()),
                                       intervals.getMidPollInterval());

        Load longLoad = new DefaultLoad(aggregateBytesMap(stats.currentLong()),
                                        aggregateBytesMap(stats.previousLong()),
                                        intervals.getLongPollInterval());

        Load unknownLoad = new DefaultLoad(aggregateBytesMap(stats.currentUnknown()),
                                           aggregateBytesMap(stats.previousUnknown()),
                                           intervals.getPollInterval());

        return new SummaryFlowEntryWithLoad(cp, totalLoad, immediateLoad, shortLoad, midLoad, longLoad, unknownLoad);
    }

    /**
     * Computes the per-flow load of one connect point, filtered by flow live
     * type and instruction type.
     *
     * The current and previous samples are read while holding the monitor of
     * {@code statisticStore}; if either is null an empty list is returned.
     *
     * @param cp       connect point to inspect
     * @param liveType flow live type to select, or null for all live types
     * @param instType instruction type to select, or null for all instruction types
     * @return flow entries with their load; empty when no statistics are available
     */
    private List<FlowEntryWithLoad> loadAllPortInternal(ConnectPoint cp,
                                                             FlowEntry.FlowLiveType liveType,
                                                             Instruction.Type instType) {
        checkPermission(STATISTIC_READ);

        final TypedStatistics stats;
        synchronized (statisticStore) {
            Set<FlowEntry> currentStats = statisticStore.getCurrentStatistic(cp);
            if (currentStats == null) {
                return new ArrayList<>();
            }
            Set<FlowEntry> previousStats = statisticStore.getPreviousStatistic(cp);
            if (previousStats == null) {
                return new ArrayList<>();
            }
            // snapshot both samples into a local, live-type indexed holder
            stats = new TypedStatistics(currentStats, previousStats);

            // report counters that went backwards between the two samples
            checkLoadValidity(currentStats, previousStats);
        }

        // both samples are non-null from here on (they may still be empty)
        final Map<FlowRule, FlowEntry> currentMap;
        final Map<FlowRule, FlowEntry> previousMap;

        if (liveType == null) {
            // null selects every live type
            currentMap = stats.currentAll();
            previousMap = stats.previousAll();
        } else {
            switch (liveType) {
                case IMMEDIATE:
                    currentMap = stats.currentImmediate();
                    previousMap = stats.previousImmediate();
                    break;
                case SHORT:
                    currentMap = stats.currentShort();
                    previousMap = stats.previousShort();
                    break;
                case MID:
                    currentMap = stats.currentMid();
                    previousMap = stats.previousMid();
                    break;
                case LONG:
                    currentMap = stats.currentLong();
                    previousMap = stats.previousLong();
                    break;
                case UNKNOWN:
                    currentMap = stats.currentUnknown();
                    previousMap = stats.previousUnknown();
                    break;
                default:
                    currentMap = new HashMap<>();
                    previousMap = new HashMap<>();
                    break;
            }
        }

        // a null instruction type selects every instruction type
        return typedFlowEntryLoadByInstInternal(cp, currentMap, previousMap, instType == null, instType);
    }

    /**
     * Builds the load of every current flow entry that matches the requested
     * instruction type.
     *
     * For each selected entry the load is derived from its current byte counter,
     * the byte counter of the matching previous entry, and the poll interval of
     * the entry's live type. When no previous entry exists, a fresh
     * {@link DefaultFlowEntry} built from the current entry supplies the previous
     * byte counter.
     *
     * @param cp            connect point the entries belong to
     * @param currentMap    current flow entries keyed by flow rule
     * @param previousMap   previous flow entries keyed by flow rule
     * @param isAllInstType true to select entries regardless of instruction type
     * @param instType      instruction type to select when {@code isAllInstType} is false
     * @return selected flow entries paired with their load
     */
    private List<FlowEntryWithLoad> typedFlowEntryLoadByInstInternal(ConnectPoint cp,
                                                                      Map<FlowRule, FlowEntry> currentMap,
                                                                      Map<FlowRule, FlowEntry> previousMap,
                                                                      boolean isAllInstType,
                                                                      Instruction.Type instType) {
        List<FlowEntryWithLoad> fel = new ArrayList<>();

        for (FlowEntry fe : currentMap.values()) {
            boolean selected = isAllInstType ||
                    fe.treatment().allInstructions().stream().anyMatch(i -> i.type() == instType);
            if (!selected) {
                continue;
            }

            // Only build the placeholder entry when there really is no previous
            // sample, instead of allocating one for every flow entry.
            FlowEntry previous = previousMap.get(fe);
            if (previous == null) {
                previous = new DefaultFlowEntry(fe);
            }

            long currentBytes = fe.bytes();
            long previousBytes = previous.bytes();
            long liveTypePollInterval = getLiveTypePollInterval(fe.liveType());
            Load fLoad = new DefaultLoad(currentBytes, previousBytes, liveTypePollInterval);
            fel.add(new FlowEntryWithLoad(cp, fe, fLoad));
        }

        return fel;
    }

    /**
     * Returns the first {@code topn} flow loads of a connect point after sorting
     * them with {@code Comparators.FLOWENTRY_WITHLOAD_COMPARATOR}.
     *
     * @param cp       connect point to inspect
     * @param liveType flow live type to select, or null for all live types
     * @param instType instruction type to select, or null for all instruction types
     * @param topn     maximum number of entries to keep; a negative value makes
     *                 {@code Stream.limit} throw {@link IllegalArgumentException}
     * @return at most {@code topn} flow loads in comparator order
     */
    private List<FlowEntryWithLoad> loadTopnPortInternal(ConnectPoint cp,
                                                             FlowEntry.FlowLiveType liveType,
                                                             Instruction.Type instType,
                                                             int topn) {
        // NOTE(review): the comparator is expected to order by descending load - confirm in Comparators
        return loadAllPortInternal(cp, liveType, instType).stream()
                .sorted(Comparators.FLOWENTRY_WITHLOAD_COMPARATOR)
                .limit(topn)
                .collect(Collectors.toList());
    }

    /**
     * Sums the byte counters of all flow entries in the set.
     *
     * @param setFE flow entries to aggregate
     * @return total of {@code bytes()} over the set; 0 for an empty set
     */
    private long aggregateBytesSet(Set<FlowEntry> setFE) {
        long totalBytes = 0L;
        for (FlowEntry entry : setFE) {
            totalBytes += entry.bytes();
        }
        return totalBytes;
    }

    /**
     * Sums the byte counters of all flow entries stored as values of the map.
     *
     * @param mapFE flow entries keyed by flow rule
     * @return total of {@code bytes()} over the map values; 0 for an empty map
     */
    private long aggregateBytesMap(Map<FlowRule, FlowEntry> mapFE) {
        long totalBytes = 0L;
        for (FlowEntry entry : mapFE.values()) {
            totalBytes += entry.bytes();
        }
        return totalBytes;
    }

    /**
     * Returns the poll interval associated with a flow live type.
     *
     * @param liveType flow live type; must not be null
     * @return the long interval for LONG, the mid interval for MID, and the base
     *         poll interval for every other live type
     */
    private long getLiveTypePollInterval(FlowEntry.FlowLiveType liveType) {
        PollInterval intervals = PollInterval.getInstance();

        final long interval;
        switch (liveType) {
            case LONG:
                interval = intervals.getLongPollInterval();
                break;
            case MID:
                interval = intervals.getMidPollInterval();
                break;
            default: // SHORT, IMMEDIATE, UNKNOWN
                interval = intervals.getPollInterval();
                break;
        }
        return interval;
    }

    //
    // Deprecated interfaces...
    //
    /**
     * Typed-flow-entry variant of the per-device load query (one of the
     * deprecated interfaces of this class). Converts the live type, delegates to
     * the {@code FlowEntry.FlowLiveType} overload and converts the result back.
     *
     * @param device   device whose ports are inspected; may be null
     * @param liveType typed flow live type to select, or null for all live types
     * @param instType instruction type to select, or null for all instruction types
     * @return map from connect point to its typed flow loads
     */
    @Override
    public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadAllByType(Device device,
                                                                  TypedStoredFlowEntry.FlowLiveType liveType,
                                                                  Instruction.Type instType) {
        return toFlowEntryWithLoadMap(
                loadAllByType(device, toFlowEntryLiveType(liveType), instType));
    }

    /**
     * Typed-flow-entry variant of the per-port load query (one of the deprecated
     * interfaces of this class). Converts the live type, delegates to the
     * {@code FlowEntry.FlowLiveType} overload and converts the result back.
     *
     * @param device   device owning the port
     * @param pNumber  port number on the device
     * @param liveType typed flow live type to select, or null for all live types
     * @param instType instruction type to select, or null for all instruction types
     * @return typed flow loads of the port
     */
    @Override
    public List<TypedFlowEntryWithLoad> loadAllByType(Device device, PortNumber pNumber,
                                               TypedStoredFlowEntry.FlowLiveType liveType,
                                               Instruction.Type instType) {
        return toFlowEntryWithLoad(
                loadAllByType(device, pNumber, toFlowEntryLiveType(liveType), instType));
    }

    /**
     * Typed-flow-entry variant of the per-device top-n load query (one of the
     * deprecated interfaces of this class). Converts the live type, delegates to
     * the {@code FlowEntry.FlowLiveType} overload and converts the result back.
     *
     * @param device   device whose ports are inspected; may be null
     * @param liveType typed flow live type to select, or null for all live types
     * @param instType instruction type to select, or null for all instruction types
     * @param topn     maximum number of entries kept per port
     * @return map from connect point to its top-n typed flow loads
     */
    @Override
    public Map<ConnectPoint, List<TypedFlowEntryWithLoad>> loadTopnByType(Device device,
                                                                   TypedStoredFlowEntry.FlowLiveType liveType,
                                                                   Instruction.Type instType,
                                                                   int topn) {
        return toFlowEntryWithLoadMap(
                loadTopnByType(device, toFlowEntryLiveType(liveType), instType, topn));
    }

    /**
     * Typed-flow-entry variant of the per-port top-n load query (one of the
     * deprecated interfaces of this class). Converts the live type, delegates to
     * the {@code FlowEntry.FlowLiveType} overload and converts the result back.
     *
     * @param device   device owning the port
     * @param pNumber  port number on the device
     * @param liveType typed flow live type to select, or null for all live types
     * @param instType instruction type to select, or null for all instruction types
     * @param topn     maximum number of entries returned
     * @return top-n typed flow loads of the port
     */
    @Override
    public List<TypedFlowEntryWithLoad> loadTopnByType(Device device, PortNumber pNumber,
                                                TypedStoredFlowEntry.FlowLiveType liveType,
                                                Instruction.Type instType,
                                                int topn) {
        return toFlowEntryWithLoad(
                loadTopnByType(device, pNumber, toFlowEntryLiveType(liveType), instType, topn));
    }

    /**
     * Converts a {@code TypedStoredFlowEntry} flow live type to the equivalent
     * {@code FlowEntry} flow live type.
     *
     * @param liveType typed flow live type, may be null
     * @return matching flow entry live type; null for null input, UNKNOWN for
     *         any value without a dedicated mapping
     */
    private FlowEntry.FlowLiveType toFlowEntryLiveType(TypedStoredFlowEntry.FlowLiveType liveType) {
        if (liveType == null) {
            return null;
        }

        final FlowEntry.FlowLiveType converted;
        switch (liveType) {
            case IMMEDIATE_FLOW:
                converted = FlowEntry.FlowLiveType.IMMEDIATE;
                break;
            case SHORT_FLOW:
                converted = FlowEntry.FlowLiveType.SHORT;
                break;
            case MID_FLOW:
                converted = FlowEntry.FlowLiveType.MID;
                break;
            case LONG_FLOW:
                converted = FlowEntry.FlowLiveType.LONG;
                break;
            default:
                converted = FlowEntry.FlowLiveType.UNKNOWN;
                break;
        }
        return converted;
    }

    /**
     * Converts a {@code FlowEntry} flow live type to the equivalent
     * {@code TypedStoredFlowEntry} flow live type.
     *
     * @param liveType flow entry live type, may be null
     * @return matching typed flow live type; null for null input, UNKNOWN_FLOW
     *         for any value without a dedicated mapping
     */
    private TypedStoredFlowEntry.FlowLiveType toTypedStoredFlowEntryLiveType(FlowEntry.FlowLiveType liveType) {
        if (liveType == null) {
            return null;
        }

        final TypedStoredFlowEntry.FlowLiveType converted;
        switch (liveType) {
            case IMMEDIATE:
                converted = TypedStoredFlowEntry.FlowLiveType.IMMEDIATE_FLOW;
                break;
            case SHORT:
                converted = TypedStoredFlowEntry.FlowLiveType.SHORT_FLOW;
                break;
            case MID:
                converted = TypedStoredFlowEntry.FlowLiveType.MID_FLOW;
                break;
            case LONG:
                converted = TypedStoredFlowEntry.FlowLiveType.LONG_FLOW;
                break;
            default:
                converted = TypedStoredFlowEntry.FlowLiveType.UNKNOWN_FLOW;
                break;
        }
        return converted;
    }

    /**
     * Converts every {@code FlowEntryWithLoad} list of the map into a
     * {@code TypedFlowEntryWithLoad} list, keeping the connect point keys.
     *
     * @param loadMap flow loads keyed by connect point
     * @return typed flow loads keyed by connect point, sorted with
     *         {@code Comparators.CONNECT_POINT_COMPARATOR}
     */
    private Map<ConnectPoint, List<TypedFlowEntryWithLoad>> toFlowEntryWithLoadMap(
            Map<ConnectPoint, List<FlowEntryWithLoad>> loadMap) {
        Map<ConnectPoint, List<TypedFlowEntryWithLoad>> typedByPoint =
                new TreeMap<>(Comparators.CONNECT_POINT_COMPARATOR);

        for (Map.Entry<ConnectPoint, List<FlowEntryWithLoad>> entry : loadMap.entrySet()) {
            typedByPoint.put(entry.getKey(), toFlowEntryWithLoad(entry.getValue()));
        }

        return typedByPoint;
    }

    /**
     * Converts a {@code FlowEntryWithLoad} list into a
     * {@code TypedFlowEntryWithLoad} list, preserving element order.
     *
     * Each stored flow entry is wrapped in a {@link DefaultTypedFlowEntry}
     * carrying the converted live type; connect point and load are copied over.
     *
     * @param loadList flow loads to convert
     * @return typed flow loads, one per input element
     */
    private List<TypedFlowEntryWithLoad> toFlowEntryWithLoad(List<FlowEntryWithLoad> loadList) {
        List<TypedFlowEntryWithLoad> typedLoads = new ArrayList<>();

        for (FlowEntryWithLoad entryLoad : loadList) {
            StoredFlowEntry stored = entryLoad.storedFlowEntry();
            TypedStoredFlowEntry typed =
                    new DefaultTypedFlowEntry(stored, toTypedStoredFlowEntryLiveType(stored.liveType()));
            typedLoads.add(new TypedFlowEntryWithLoad(entryLoad.connectPoint(), typed, entryLoad.load()));
        }

        return typedLoads;
    }

    /**
     * Internal data holder for the current and previous flow entry sets of a
     * connect point, additionally indexed by flow live type.
     */
    private static class TypedStatistics {
        private final ImmutableSet<FlowEntry> current;
        private final ImmutableSet<FlowEntry> previous;

        private final Map<FlowRule, FlowEntry> currentAll = new HashMap<>();
        private final Map<FlowRule, FlowEntry> previousAll = new HashMap<>();

        private final Map<FlowRule, FlowEntry> currentImmediate = new HashMap<>();
        private final Map<FlowRule, FlowEntry> previousImmediate = new HashMap<>();

        private final Map<FlowRule, FlowEntry> currentShort = new HashMap<>();
        private final Map<FlowRule, FlowEntry> previousShort = new HashMap<>();

        private final Map<FlowRule, FlowEntry> currentMid = new HashMap<>();
        private final Map<FlowRule, FlowEntry> previousMid = new HashMap<>();

        private final Map<FlowRule, FlowEntry> currentLong = new HashMap<>();
        private final Map<FlowRule, FlowEntry> previousLong = new HashMap<>();

        private final Map<FlowRule, FlowEntry> currentUnknown = new HashMap<>();
        private final Map<FlowRule, FlowEntry> previousUnknown = new HashMap<>();

        /**
         * Copies both sets and sorts every entry into its live-type map.
         *
         * Current entries are filed under their own live type. Previous entries
         * are filed under the live type of the matching current entry, searching
         * from their own live type upwards (IMMEDIATE, SHORT, MID, LONG); when
         * no such current entry exists they are filed as unknown.
         *
         * @param current  current flow entries, must not be null
         * @param previous previous flow entries, must not be null
         */
        public TypedStatistics(Set<FlowEntry> current, Set<FlowEntry> previous) {
            this.current = ImmutableSet.copyOf(checkNotNull(current));
            this.previous = ImmutableSet.copyOf(checkNotNull(previous));

            for (FlowEntry entry : current) {
                currentBucketOf(entry).put(entry, entry);
                currentAll.put(entry, entry);
            }

            // must run after all current entries are filed: the previous bucket
            // depends on where the matching current entry ended up
            for (FlowEntry entry : previous) {
                previousBucketOf(entry).put(entry, entry);
                previousAll.put(entry, entry);
            }
        }

        /**
         * Selects the current map matching the live type of the entry.
         *
         * @param entry current flow entry
         * @return current map for the entry's live type, the unknown map otherwise
         */
        private Map<FlowRule, FlowEntry> currentBucketOf(FlowEntry entry) {
            switch (entry.liveType()) {
                case IMMEDIATE:
                    return currentImmediate;
                case SHORT:
                    return currentShort;
                case MID:
                    return currentMid;
                case LONG:
                    return currentLong;
                default: // unknown
                    return currentUnknown;
            }
        }

        /**
         * Selects the previous map a previous entry belongs to, based on which
         * current map (from the entry's own live type upwards) holds the entry.
         *
         * @param entry previous flow entry
         * @return previous map paired with the first current map containing the
         *         entry, the previous unknown map when none does
         */
        private Map<FlowRule, FlowEntry> previousBucketOf(FlowEntry entry) {
            switch (entry.liveType()) {
                case IMMEDIATE:
                    if (currentImmediate.containsKey(entry)) {
                        return previousImmediate;
                    }
                    // fall through
                case SHORT:
                    if (currentShort.containsKey(entry)) {
                        return previousShort;
                    }
                    // fall through
                case MID:
                    if (currentMid.containsKey(entry)) {
                        return previousMid;
                    }
                    // fall through
                case LONG:
                    if (currentLong.containsKey(entry)) {
                        return previousLong;
                    }
                    // fall through
                default: // unknown
                    return previousUnknown;
            }
        }

        /**
         * Returns flow entries as the current value.
         *
         * @return immutable copy of the current flow entries
         */
        public ImmutableSet<FlowEntry> current() {
            return current;
        }

        /**
         * Returns flow entries as the previous value.
         *
         * @return immutable copy of the previous flow entries
         */
        public ImmutableSet<FlowEntry> previous() {
            return previous;
        }

        /**
         * Returns all current flow entries keyed by flow rule.
         *
         * @return map of all current flow entries
         */
        public Map<FlowRule, FlowEntry> currentAll() {
            return currentAll;
        }

        /**
         * Returns all previous flow entries keyed by flow rule.
         *
         * @return map of all previous flow entries
         */
        public Map<FlowRule, FlowEntry> previousAll() {
            return previousAll;
        }

        /**
         * Returns the current flow entries filed as immediate.
         *
         * @return map of current immediate flow entries
         */
        public Map<FlowRule, FlowEntry> currentImmediate() {
            return currentImmediate;
        }

        /**
         * Returns the previous flow entries filed as immediate.
         *
         * @return map of previous immediate flow entries
         */
        public Map<FlowRule, FlowEntry> previousImmediate() {
            return previousImmediate;
        }

        /**
         * Returns the current flow entries filed as short.
         *
         * @return map of current short flow entries
         */
        public Map<FlowRule, FlowEntry> currentShort() {
            return currentShort;
        }

        /**
         * Returns the previous flow entries filed as short.
         *
         * @return map of previous short flow entries
         */
        public Map<FlowRule, FlowEntry> previousShort() {
            return previousShort;
        }

        /**
         * Returns the current flow entries filed as mid.
         *
         * @return map of current mid flow entries
         */
        public Map<FlowRule, FlowEntry> currentMid() {
            return currentMid;
        }

        /**
         * Returns the previous flow entries filed as mid.
         *
         * @return map of previous mid flow entries
         */
        public Map<FlowRule, FlowEntry> previousMid() {
            return previousMid;
        }

        /**
         * Returns the current flow entries filed as long.
         *
         * @return map of current long flow entries
         */
        public Map<FlowRule, FlowEntry> currentLong() {
            return currentLong;
        }

        /**
         * Returns the previous flow entries filed as long.
         *
         * @return map of previous long flow entries
         */
        public Map<FlowRule, FlowEntry> previousLong() {
            return previousLong;
        }

        /**
         * Returns the current flow entries filed as unknown.
         *
         * @return map of current unknown flow entries
         */
        public Map<FlowRule, FlowEntry> currentUnknown() {
            return currentUnknown;
        }

        /**
         * Returns the previous flow entries filed as unknown.
         *
         * @return map of previous unknown flow entries
         */
        public Map<FlowRule, FlowEntry> previousUnknown() {
            return previousUnknown;
        }

        /**
         * Validates values are not empty.
         *
         * @return true only if both the current and the previous map hold at
         *         least one entry
         */
        public boolean isValid() {
            return !currentAll.isEmpty() && !previousAll.isEmpty();
        }

        @Override
        public int hashCode() {
            return Objects.hash(currentAll, previousAll);
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj instanceof TypedStatistics) {
                final TypedStatistics that = (TypedStatistics) obj;
                return Objects.equals(this.currentAll, that.currentAll) &&
                        Objects.equals(this.previousAll, that.previousAll);
            }
            return false;
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("current", currentAll)
                    .add("previous", previousAll)
                    .toString();
        }
    }

    /**
     * Logs, at debug level, every current flow entry whose byte counter is
     * smaller than the byte counter of the equal entry in the previous set.
     *
     * This check has no effect other than logging, so it returns immediately
     * when debug logging is disabled.
     *
     * @param current  current flow entries
     * @param previous previous flow entries
     */
    private void checkLoadValidity(Set<FlowEntry> current, Set<FlowEntry> previous) {
        // Callers invoke this while holding the statistic store monitor; avoid
        // scanning the sets at all when nothing would be logged.
        if (!log.isDebugEnabled()) {
            return;
        }

        // Index the previous entries once instead of rescanning the whole set for
        // every current entry. This relies on hashCode being consistent with
        // equals for flow entries, as the hash maps in TypedStatistics already do.
        Map<FlowEntry, FlowEntry> previousByEntry = new HashMap<>();
        for (FlowEntry p : previous) {
            previousByEntry.put(p, p);
        }

        for (FlowEntry c : current) {
            FlowEntry f = previousByEntry.get(c);
            if (f != null && c.bytes() < f.bytes()) {
                log.debug("FlowStatisticManager:checkLoadValidity():Error: {} :Previous bytes={}" +
                          " is larger than current bytes={} !!!", c, f.bytes(), c.bytes());
            }
        }
    }

    /**
     * Builds a predicate that accepts a flow entry when at least one instruction
     * of its treatment has the given instruction type.
     *
     * @param instType instruction type to look for
     * @return predicate testing flow entries against the instruction type
     */
    private static Predicate<FlowEntry> hasInstructionType(Instruction.Type instType) {
        return flowEntry -> flowEntry.treatment().allInstructions().stream()
                .anyMatch(instruction -> instruction.type() == instType);
    }

    /**
     * Builds a predicate that accepts a flow entry when its flow live type is
     * identical to the given live type.
     *
     * @param liveType flow live type to compare against
     * @return predicate testing flow entries against the live type
     */
    private static Predicate<FlowEntry> hasLiveType(FlowEntry.FlowLiveType liveType) {
        return entry -> liveType == entry.liveType();
    }
}
