/*
 * Copyright 2015-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.onosproject.provider.of.flow.impl;

import com.google.common.base.Objects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.onosproject.net.driver.DriverService;
import org.onosproject.net.flow.DefaultTypedFlowEntry;
import org.onosproject.net.flow.FlowEntry;
import org.onosproject.net.flow.FlowId;
import org.onosproject.net.flow.FlowRule;
import org.onosproject.net.flow.StoredFlowEntry;
import org.onosproject.net.flow.TypedStoredFlowEntry;
import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.openflow.controller.OpenFlowSwitch;
import org.onosproject.openflow.controller.RoleState;
import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;
import org.projectfloodlight.openflow.protocol.match.Match;
import org.projectfloodlight.openflow.types.OFPort;
import org.projectfloodlight.openflow.types.TableId;
import org.slf4j.Logger;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkNotNull;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.flow.TypedStoredFlowEntry.FlowLiveType;
import static org.slf4j.LoggerFactory.getLogger;

/**
 * Efficiently and adaptively collects flow statistics for the specified switch.
 */
public class NewAdaptiveFlowStatsCollector implements SwitchDataCollector {
    private final Logger log = getLogger(getClass());

    private static final String CHECK_AND_MOVE_LOG =
            "checkAndMoveLiveFlowInternal: flowId={}, state={}, afterLiveType={}"
                    + ", liveTime={}, life={}, bytes={}, packets={}, fromLastSeen={}"
                    + ", priority={}, selector={}, treatment={} dpid={}";

    private static final String CHECK_AND_MOVE_COUNT_LOG =
            "checkAndMoveLiveFlowAll: Total Flow_Count={}, add-remove_Count={}"
                    + ", IMMEDIATE_FLOW_Count={}, SHORT_FLOW_Count={}"
                    + ", MID_FLOW_Count={}, LONG_FLOW_Count={}, add_Count={}"
                    + ", addWithSetFlowLiveType_Count={}, remove_Count={}, dpid={}";

    private static final String ADD_INVALID_LOG =
            "addOrUpdateFlows: invalid flow update! The new life is SMALLER than the previous one"
                    + ", new flowId={}, old flowId={}, new bytes={}, old bytes={}"
                    + ", new life={}, old life={}, new lastSeen={}, old lastSeen={}";

    private final DriverService driverService;
    private final OpenFlowSwitch sw;

    private ScheduledExecutorService adaptiveFlowStatsScheduler =
            Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d", log));
    private ScheduledFuture<?> calAndShortFlowsThread;
    private ScheduledFuture<?> midFlowsThread;
    private ScheduledFuture<?> longFlowsThread;

    // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval
    private CalAndShortFlowsTask calAndShortFlowsTask;
    // Task that collects stats MID flows every 2*calAndPollInterval
    private MidFlowsTask midFlowsTask;
    // Task that collects stats LONG flows every 3*calAndPollInterval
    private LongFlowsTask longFlowsTask;

    private static final int CAL_AND_POLL_TIMES = 1; // must be always 0
    private static final int MID_POLL_TIMES = 2;     // variable greater or equal than 1
    private static final int LONG_POLL_TIMES = 3;    // variable greater or equal than MID_POLL_TIMES
    //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable
    // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES
    private static final int ENTIRE_POLL_TIMES = 6;

    private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;
    private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;
    private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;

    private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
    private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
    private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;
    // only used for checking condition at each task if it collects entire flows from a given switch or not
    private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;

    // Number of call count of each Task,
    // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask
    private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called
    private int callCountMidFlowsTask = 0;   // increased MID_POLL_TIMES whenever Task is called
    private int callCountLongFlowsTask = 0;  // increased LONG_POLL_TIMES whenever Task is called

    private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();

    private boolean isFirstTimeStart = true;

    public static final long NO_FLOW_MISSING_XID = (-1);
    private long flowMissingXid = NO_FLOW_MISSING_XID;

    /**
     * Creates a new adaptive collector for the given switch and default cal_and_poll frequency.
     *
     * @param driverService driver service reference
     * @param sw            switch to pull
     * @param pollInterval  cal and immediate poll frequency in seconds
     */
    NewAdaptiveFlowStatsCollector(DriverService driverService, OpenFlowSwitch sw, int pollInterval) {
        this.driverService = driverService;
        this.sw = sw;
        initMemberVars(pollInterval);
    }

    // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count
    private void initMemberVars(int pollInterval) {
        if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {
            this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;
        } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {
            this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;
        } else {
            this.calAndPollInterval = pollInterval;
        }

        calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;
        midPollInterval = MID_POLL_TIMES * calAndPollInterval;
        longPollInterval = LONG_POLL_TIMES * calAndPollInterval;
        entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;

        callCountCalAndShortFlowsTask = 0;
        callCountMidFlowsTask = 0;
        callCountLongFlowsTask = 0;

        flowMissingXid = NO_FLOW_MISSING_XID;
    }

    /**
     * Adjusts adaptive poll frequency.
     *
     * @param pollInterval poll frequency in seconds
     */
    synchronized void adjustCalAndPollInterval(int pollInterval) {
        initMemberVars(pollInterval);

        if (calAndShortFlowsThread != null) {
            calAndShortFlowsThread.cancel(false);
        }
        if (midFlowsThread != null) {
            midFlowsThread.cancel(false);
        }
        if (longFlowsThread != null) {
            longFlowsThread.cancel(false);
        }

        calAndShortFlowsTask = new CalAndShortFlowsTask();
        calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
                calAndShortFlowsTask,
                0,
                calAndPollInterval,
                TimeUnit.SECONDS);

        midFlowsTask = new MidFlowsTask();
        midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
                midFlowsTask,
                0,
                midPollInterval,
                TimeUnit.SECONDS);

        longFlowsTask = new LongFlowsTask();
        longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
                longFlowsTask,
                0,
                longPollInterval,
                TimeUnit.SECONDS);

        log.debug("calAndPollInterval={} is adjusted", calAndPollInterval);
    }

    private class CalAndShortFlowsTask implements Runnable {
        @Override
        public void run() {
            if (sw.getRole() == RoleState.MASTER) {
                log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());

                if (isFirstTimeStart) {
                    // isFirstTimeStart, get entire flow stats from a given switch sw
                    log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",
                            sw.getStringId());
                    ofFlowStatsRequestAllSend();

                    callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
                    isFirstTimeStart = false;
                } else  if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {
                    // entire_poll_times, get entire flow stats from a given switch sw
                    log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());
                    ofFlowStatsRequestAllSend();

                    callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;
                    //TODO: check flows deleted in switch, but exist in controller flow table, then remove them
                    //
                } else {
                    calAndShortFlowsTaskInternal();
                    callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;
                }
            }
        }
    }

    // send openflow flow stats request message with getting all flow entries to a given switch sw
    private void ofFlowStatsRequestAllSend() {
        OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
                .setMatch(sw.factory().matchWildcardAll())
                .setTableId(TableId.ALL)
                .setOutPort(OFPort.NO_MASK)
                .build();

        synchronized (this) {
            // set the request xid to check the reply in OpenFlowRuleProvider
            // After processing the reply of this request message,
            // this must be set to NO_FLOW_MISSING_XID(-1) by provider
            setFlowMissingXid(request.getXid());
            log.debug("ofFlowStatsRequestAllSend: request={}, dpid={}",
                    request.toString(), sw.getStringId());

            sw.sendMsg(request);
        }
    }

    // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw
    private void ofFlowStatsRequestFlowSend(FlowEntry fe) {
        // set find match
        Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty(),
                Optional.of(driverService)).buildMatch();
        // set find tableId
        TableId tableId = TableId.of(fe.tableId());
        // set output port
        Instruction ins = fe.treatment().allInstructions().stream()
                .filter(i -> (i.type() == Instruction.Type.OUTPUT))
                .findFirst()
                .orElse(null);
        OFPort ofPort = OFPort.NO_MASK;
        if (ins != null) {
            Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;
            ofPort = OFPort.of((int) ((out.port().toLong())));
        }

        OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()
                .setMatch(match)
                .setTableId(tableId)
                .setOutPort(ofPort)
                .build();

        synchronized (this) {
            if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {
                log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"
                                + " set no flow missing xid anyway, for {}",
                        sw.getStringId());
                setFlowMissingXid(NO_FLOW_MISSING_XID);
            }

            sw.sendMsg(request);
        }
    }

    private void calAndShortFlowsTaskInternal() {
        deviceFlowTable.checkAndMoveLiveFlowAll();

        deviceFlowTable.getShortFlows().forEach(fe -> {
            ofFlowStatsRequestFlowSend(fe);
        });
    }

    private class MidFlowsTask implements Runnable {
        @Override
        public void run() {
            if (sw.getRole() == RoleState.MASTER) {
                log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());

                // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
                if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {
                    callCountMidFlowsTask = MID_POLL_TIMES;
                } else {
                    midFlowsTaskInternal();
                    callCountMidFlowsTask += MID_POLL_TIMES;
                }
            }
        }
    }

    private void midFlowsTaskInternal() {
        deviceFlowTable.getMidFlows().forEach(fe -> {
            ofFlowStatsRequestFlowSend(fe);
        });
    }

    private class LongFlowsTask implements Runnable {
        @Override
        public void run() {
            if (sw.getRole() == RoleState.MASTER) {
                log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());

                // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw
                if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {
                    callCountLongFlowsTask = LONG_POLL_TIMES;
                } else {
                    longFlowsTaskInternal();
                    callCountLongFlowsTask += LONG_POLL_TIMES;
                }
            }
        }
    }

    private void longFlowsTaskInternal() {
        deviceFlowTable.getLongFlows().forEach(fe -> {
            ofFlowStatsRequestFlowSend(fe);
        });
    }

    /**
     * Starts adaptive flow statistic collection.
     */
    public synchronized void start() {
        log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());
        callCountCalAndShortFlowsTask = 0;
        callCountMidFlowsTask = 0;
        callCountLongFlowsTask = 0;

        isFirstTimeStart = true;

        // Initially start polling quickly. Then drop down to configured value
        calAndShortFlowsTask = new CalAndShortFlowsTask();
        calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
                calAndShortFlowsTask,
                1,
                calAndPollInterval,
                TimeUnit.SECONDS);

        midFlowsTask = new MidFlowsTask();
        midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
                midFlowsTask,
                1,
                midPollInterval,
                TimeUnit.SECONDS);

        longFlowsTask = new LongFlowsTask();
        longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(
                longFlowsTask,
                1,
                longPollInterval,
                TimeUnit.SECONDS);

        log.info("Started");
    }

    /**
     * Stops adaptive flow statistic collection.
     */
    public synchronized void stop() {
        log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());
        if (calAndShortFlowsThread != null) {
            calAndShortFlowsThread.cancel(true);
        }
        if (midFlowsThread != null) {
            midFlowsThread.cancel(true);
        }
        if (longFlowsThread != null) {
            longFlowsThread.cancel(true);
        }

        adaptiveFlowStatsScheduler.shutdownNow();

        isFirstTimeStart = false;

        log.info("Stopped");
    }

    /**
     * Adds typed flow entry from flow rule into the internal flow table.
     *
     * @param flowRules the flow rules
     */
    public synchronized void addWithFlowRule(FlowRule... flowRules) {
        for (FlowRule fr : flowRules) {
            // First remove old entry unconditionally, if exist
            deviceFlowTable.remove(fr);

            // add new flow entry, we suppose IMMEDIATE_FLOW
            TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,
                    FlowLiveType.IMMEDIATE_FLOW);
            deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
        }
    }

    /**
     * Adds or updates typed flow entry from flow entry into the internal flow table.
     *
     * @param flowEntries the flow entries
     */
    public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {
        for (FlowEntry fe : flowEntries) {
            // check if this new rule is an update to an existing entry
            TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);

            if (stored != null) {
                // duplicated flow entry is collected!, just skip
                if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()
                        && fe.life() == stored.life()) {
                    if (log.isTraceEnabled()) {
                        log.trace("addOrUpdateFlows({}): flowId={},is DUPLICATED stats collection, just skip.",
                                sw.getStringId(), fe.id());
                    }

                    //FIXME modification of "stored" flow entry outside of store
                    stored.setLastSeen();
                    continue;
                } else if (fe.life() < stored.life()) {
                    // Invalid updates the stats values, i.e., bytes, packets, durations ...
                    if (log.isDebugEnabled()) {
                        log.debug(ADD_INVALID_LOG, fe.id(), stored.id(), fe.bytes(),
                                stored.bytes(), fe.life(), stored.life(),
                                fe.lastSeen(), stored.lastSeen());
                    }
                    // go next
                    //FIXME modification of "stored" flow entry outside of store
                    stored.setLastSeen();
                    continue;
                }

                // update now
                //FIXME modification of "stored" flow entry outside of store
                stored.setLife(fe.life(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);
                stored.setPackets(fe.packets());
                stored.setBytes(fe.bytes());
                stored.setLastSeen();
                if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {
                    // flow is really RULE_ADDED
                    stored.setState(FlowEntry.FlowEntryState.ADDED);
                }
                // flow is RULE_UPDATED, skip adding and just updating flow live table
                //deviceFlowTable.calAndSetFlowLiveType(stored);
                continue;
            }

            // add new flow entry, we suppose IMMEDIATE_FLOW
            TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,
                    FlowLiveType.IMMEDIATE_FLOW);
            deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);
        }
    }

    /**
     * Removes typed flow entry from the internal flow table.
     *
     * @param flowRules the flow entries
     */
    public synchronized void removeFlows(FlowRule...  flowRules) {
        for (FlowRule rule : flowRules) {
            deviceFlowTable.remove(rule);
        }
    }

    // same as removeFlows() function
    /**
     * Removes typed flow entry from the internal flow table.
     *
     * @param flowRules the flow entries
     */
    public void flowRemoved(FlowRule... flowRules) {
        removeFlows(flowRules);
    }

    // same as addOrUpdateFlows() function
    /**
     * Adds or updates typed flow entry from flow entry into the internal flow table.
     *
     * @param flowEntries the flow entry list
     */
    public void pushFlowMetrics(List<FlowEntry> flowEntries) {
        flowEntries.forEach(this::addOrUpdateFlows);
    }

    /**
     * Returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).
     *
     * @return xid of missing flow
     */
    public long getFlowMissingXid() {
        return flowMissingXid;
    }

    /**
     * Sets flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.
     *
     * @param flowMissingXid the OFFlowStatsRequest message Id
     */
    public void setFlowMissingXid(long flowMissingXid) {
        this.flowMissingXid = flowMissingXid;
    }

    private class InternalDeviceFlowTable {

        private final Map<FlowId, Set<TypedStoredFlowEntry>>
                flowEntries = Maps.newConcurrentMap();

        private final Set<StoredFlowEntry> shortFlows = new HashSet<>();
        private final Set<StoredFlowEntry> midFlows = new HashSet<>();
        private final Set<StoredFlowEntry> longFlows = new HashSet<>();

        // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
        private final long latencyFlowStatsRequestAndReplyMillis = 500;


        // Statistics for table operation
        private long addCount = 0, addWithSetFlowLiveTypeCount = 0;
        private long removeCount = 0;

        /**
         * Resets all count values to zero.
         */
        public void resetAllCount() {
            addCount = 0;
            addWithSetFlowLiveTypeCount = 0;
            removeCount = 0;
        }

        // get set of flow entries for the given flowId
        private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {
            return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());
        }

        // get flow entry for the given flow rule
        private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {
            Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());
            return flowEntries.stream()
                    .filter(entry -> Objects.equal(entry, rule))
                    .findAny()
                    .orElse(null);
        }

        // get the flow entries for all flows in flow table
        private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {
            Set<TypedStoredFlowEntry> result = Sets.newHashSet();

            flowEntries.values().forEach(result::addAll);
            return result;
        }

        /**
         * Gets the number of flow entry in flow table.
         *
         * @return the number of flow entry
         */
        public long getFlowCount() {
            return flowEntries.values().stream().mapToLong(Set::size).sum();
        }

        /**
         * Gets the number of flow entry in flow table.
         *
         * @param rule the flow rule
         * @return the typed flow entry
         */
        public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {
            checkNotNull(rule);

            return getFlowEntryInternal(rule);
        }

        /**
         * Gets the all typed flow entries in flow table.
         *
         * @return the set of typed flow entry
         */
        public Set<TypedStoredFlowEntry> getFlowEntries() {
            return getFlowEntriesInternal();
        }

        /**
         * Gets the short typed flow entries in flow table.
         *
         * @return the set of typed flow entry
         */
        public Set<StoredFlowEntry> getShortFlows() {
            return ImmutableSet.copyOf(shortFlows);
        }

        /**
         * Gets the mid typed flow entries in flow table.
         *
         * @return the set of typed flow entry
         */
        public Set<StoredFlowEntry> getMidFlows() {
            return ImmutableSet.copyOf(midFlows);
        }

        /**
         * Gets the long typed flow entries in flow table.
         *
         * @return the set of typed flow entry
         */
        public Set<StoredFlowEntry> getLongFlows() {
            return ImmutableSet.copyOf(longFlows);
        }

        /**
         * Add typed flow entry into table only.
         *
         * @param rule the flow rule
         */
        public synchronized void add(TypedStoredFlowEntry rule) {
            checkNotNull(rule);

            //rule have to be new DefaultTypedFlowEntry
            boolean result = getFlowEntriesInternal(rule.id()).add(rule);

            if (result) {
                addCount++;
            }
        }

        /**
         * Calculates and sets the flow live type at the first time,
         * and then add it into a corresponding typed flow table.
         *
         * @param rule the flow rule
         */
        public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {
            checkNotNull(rule);

            calAndSetFlowLiveTypeInternal(rule);
        }

        /**
         * Adds the typed flow entry into table, and calculates and set the flow live type,
         * and then add it into a corresponding typed flow table.
         *
         * @param rule the flow rule
         */
        public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {
            checkNotNull(rule);

            //rule have to be new DefaultTypedFlowEntry
            boolean result = getFlowEntriesInternal(rule.id()).add(rule);
            if (result) {
                calAndSetFlowLiveTypeInternal(rule);
                addWithSetFlowLiveTypeCount++;
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("FlowId {} ADD failed, it may already exist in table - {}",
                            rule.id(), sw.getStringId());
                }
            }
        }

        // In real, calculates and set the flow live type at the first time,
        // and then add it into a corresponding typed flow table
        private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {
            long life = rule.life();
            FlowLiveType prevFlowLiveType = rule.flowLiveType();

            if (life >= longPollInterval) {
                rule.setFlowLiveType(FlowLiveType.LONG_FLOW);
                longFlows.add(rule);
            } else if (life >= midPollInterval) {
                rule.setFlowLiveType(FlowLiveType.MID_FLOW);
                midFlows.add(rule);
            } else if (life >= calAndPollInterval) {
                rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);
                shortFlows.add(rule);
            } else if (life >= 0) {
                rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);
            } else { // life < 0
                rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);
            }

            if (rule.flowLiveType() != prevFlowLiveType) {
                switch (prevFlowLiveType) {
                // delete it from previous flow table
                case SHORT_FLOW:
                    shortFlows.remove(rule);
                    break;
                case MID_FLOW:
                    midFlows.remove(rule);
                    break;
                case LONG_FLOW:
                    longFlows.remove(rule);
                    break;
                default:
                    break;
                }
            }
        }


        // check the flow live type based on current time, then set and add it into corresponding table
        private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {
            long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());
            // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply
            long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);
            // fe.life() unit is SECOND!
            long liveTime = fe.life() + fromLastSeen;


            switch (fe.flowLiveType()) {
            case IMMEDIATE_FLOW:
                if (liveTime >= longPollInterval) {
                    fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
                    longFlows.add(fe);
                } else if (liveTime >= midPollInterval) {
                    fe.setFlowLiveType(FlowLiveType.MID_FLOW);
                    midFlows.add(fe);
                } else if (liveTime >= calAndPollInterval) {
                    fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);
                    shortFlows.add(fe);
                }
                break;
            case SHORT_FLOW:
                if (liveTime >= longPollInterval) {
                    fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
                    shortFlows.remove(fe);
                    longFlows.add(fe);
                } else if (liveTime >= midPollInterval) {
                    fe.setFlowLiveType(FlowLiveType.MID_FLOW);
                    shortFlows.remove(fe);
                    midFlows.add(fe);
                }
                break;
            case MID_FLOW:
                if (liveTime >= longPollInterval) {
                    fe.setFlowLiveType(FlowLiveType.LONG_FLOW);
                    midFlows.remove(fe);
                    longFlows.add(fe);
                }
                break;
            case LONG_FLOW:
                if (fromLastSeen > entirePollInterval) {
                    log.trace("checkAndMoveLiveFlowInternal: flow is already removed at switch.");
                    return false;
                }
                break;
            case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through
            default:
                log.error("Unknown live type error for {}", sw.getStringId());
                return false;
            }

            if (log.isTraceEnabled()) {
                log.trace(CHECK_AND_MOVE_LOG, fe.id(), fe.state(), fe.flowLiveType(),
                        liveTime, fe.life(), fe.bytes(), fe.packets(), fromLastSeen,
                        fe.priority(), fe.selector().criteria(), fe.treatment(),
                        sw.getStringId());
            }

            return true;
        }

        /**
         * Checks and moves live type for all type flow entries in table at every calAndPollInterval time.
         */
        public void checkAndMoveLiveFlowAll() {
            Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();

            long calCurTime = System.currentTimeMillis();
            typedFlowEntries.forEach(fe -> {
                if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {
                    remove(fe);
                }
            });

            // print table counts for debug
            if (log.isTraceEnabled()) {
                synchronized (this) {
                    long totalFlowCount = getFlowCount();
                    long shortFlowCount = shortFlows.size();
                    long midFlowCount = midFlows.size();
                    long longFlowCount = longFlows.size();
                    long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;
                    long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;

                    log.trace(CHECK_AND_MOVE_COUNT_LOG, totalFlowCount, calTotalCount,
                            immediateFlowCount, shortFlowCount, midFlowCount, longFlowCount,
                            addCount, addWithSetFlowLiveTypeCount, removeCount, sw.getStringId());

                    if (totalFlowCount != calTotalCount) {
                        log.error("Real total flow count and calculated total flow count do NOT match");
                    }
                    if (immediateFlowCount < 0) {
                        log.error("Immediate flow count is negative");
                    }
                }
            }
            log.trace("checkAndMoveLiveFlowAll: adaptiveStats for {}", sw.getStringId());
        }

        /**
         * Removes the typed flow entry from table.
         *
         * @param rule the flow rule
         */
        public synchronized void remove(FlowRule rule) {
            checkNotNull(rule);

            TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);
            if (removeStore != null) {
                removeLiveFlowsInternal(removeStore);
                boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);

                if (result) {
                    removeCount++;
                }
            }
        }

        // Remove the typed flow entry from corresponding table
        private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {
            switch (fe.flowLiveType()) {
            case IMMEDIATE_FLOW:
                // do nothing
                break;
            case SHORT_FLOW:
                shortFlows.remove(fe);
                break;
            case MID_FLOW:
                midFlows.remove(fe);
                break;
            case LONG_FLOW:
                longFlows.remove(fe);
                break;
            default: // error in Flow Live Type
                log.error("removeLiveFlowsInternal: unknown live type error");
                break;
            }
        }
    }
}
