[Emu] openTAM: NewAdaptiveFlowStatsCollector Implementation

  - NewAdaptiveFlowStatsCollector.java
   .Bug fix to initialize callCountCalAndShortFlowsTask value
   .Added flowMissingXid variable to identify individual StatsRequest or match all StatsRequest message or not
  - DefaultTypedFlowEntry.java, TypedStoredFlowEntry.java
   .Added javadoc for class
  - OpenFlowRuleProvider.java
   .Line 2: 2014 -> 2015
   .Added adaptiveFlowSampling boolean property with default
   .Added call providerService.pushFlowMetricsWithoutFlowMissing in case of  individual StatsRequest
  - FlowRuleProviderService.java
   .Added pushFlowMetricsWithoutFlowMissing() function
  - FlowRuleManager.java
   .Added pushFlowMetricsWithoutFlowMissing() implementation
  - OpenFlowControllerImpl.java
   .Bug fix to unchange the StatsRequest Xid value in case of StatsReply Flow message type

Change-Id: Id4dc4a164da654af7b6dfb090af7336e748ef118
diff --git a/core/api/src/main/java/org/onosproject/net/flow/DefaultTypedFlowEntry.java b/core/api/src/main/java/org/onosproject/net/flow/DefaultTypedFlowEntry.java
new file mode 100644
index 0000000..afceb14
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/DefaultTypedFlowEntry.java
@@ -0,0 +1,122 @@
+/*

+ * Copyright 2015 Open Networking Laboratory

+ *

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+package org.onosproject.net.flow;

+

+import static com.google.common.base.MoreObjects.toStringHelper;

+

+/**

+ * Default flow entry class with FlowLiveType value, IMMEDIATE_FLOW, SHORT_FLOW, MID_FLOW, LONG_FLOW.

+ */

+public class DefaultTypedFlowEntry extends DefaultFlowEntry

+    implements TypedStoredFlowEntry {

+    private FlowLiveType liveType;

+

+    /**

+     * Creates a typed flow entry from flow rule and its statistics, with default flow live type(IMMEDIATE_FLOW).

+     *

+     * @param rule the flow rule

+     * @param state the flow state

+     * @param life the flow duration since creation

+     * @param packets the flow packets count

+     * @param bytes the flow bytes count

+     *

+     */

+    public DefaultTypedFlowEntry(FlowRule rule, FlowEntryState state,

+                            long life, long packets, long bytes) {

+        super(rule, state, life, packets, bytes);

+        this.liveType = FlowLiveType.IMMEDIATE_FLOW;

+    }

+

+    /**

+     * Creates a typed flow entry from flow rule,  with default flow live type(IMMEDIATE_FLOW).

+     *

+     * @param rule the flow rule

+     *

+     */

+    public DefaultTypedFlowEntry(FlowRule rule) {

+        super(rule);

+        this.liveType = FlowLiveType.IMMEDIATE_FLOW;

+    }

+

+    /**

+     * Creates a typed flow entry from flow entry,  with default flow live type(IMMEDIATE_FLOW).

+     *

+     * @param fe the flow entry

+     *

+     */

+    public DefaultTypedFlowEntry(FlowEntry fe) {

+        super(fe, fe.state(), fe.life(), fe.packets(), fe.bytes());

+        this.liveType = FlowLiveType.IMMEDIATE_FLOW;

+    }

+

+    /**

+     * Creates a typed flow entry from flow rule and flow live type.

+     *

+     * @param rule the flow rule

+     * @param liveType the flow live type

+     *

+     */

+    public DefaultTypedFlowEntry(FlowRule rule, FlowLiveType liveType) {

+        super(rule);

+        this.liveType = liveType;

+    }

+

+    /**

+     * Creates a typed flow entry from flow entry and flow live type.

+     *

+     * @param fe the flow rule

+     * @param liveType the flow live type

+     *

+     */

+    public DefaultTypedFlowEntry(FlowEntry fe,  FlowLiveType liveType) {

+        super(fe, fe.state(), fe.life(), fe.packets(), fe.bytes());

+        this.liveType = liveType;

+    }

+

+    /**

+     * Creates a typed flow entry from flow rule, error code and flow live type.

+     *

+     * @param rule the flow rule

+     * @param errType the flow error type

+     * @param errCode the flow error code

+     * @param liveType the flow live type

+     *

+     */

+    public DefaultTypedFlowEntry(FlowRule rule, int errType, int errCode, FlowLiveType liveType) {

+        super(rule, errType, errCode);

+        this.liveType = liveType;

+    }

+

+    @Override

+    public FlowLiveType flowLiveType() {

+        return this.liveType;

+    }

+

+    @Override

+    public void setFlowLiveType(FlowLiveType liveType) {

+        this.liveType = liveType;

+    }

+

+    @Override

+    public String toString() {

+        return toStringHelper(this)

+                .add("entry", super.toString())

+                .add("type", liveType)

+                .toString();

+    }

+}

+

diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
index 8a36a92..48aa504 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleProviderService.java
@@ -41,6 +41,15 @@
     void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries);
 
     /**
+     * Pushes the collection of flow entries currently applied on the given
+     * device without flowMissing process.
+     *
+     * @param deviceId device identifier
+     * @param flowEntries collection of flow rules
+     */
+    void pushFlowMetricsWithoutFlowMissing(DeviceId deviceId, Iterable<FlowEntry> flowEntries);
+
+    /**
      * Indicates to the core that the requested batch operation has
      * been completed.
      *
diff --git a/core/api/src/main/java/org/onosproject/net/flow/TypedStoredFlowEntry.java b/core/api/src/main/java/org/onosproject/net/flow/TypedStoredFlowEntry.java
new file mode 100644
index 0000000..a93dc07
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/net/flow/TypedStoredFlowEntry.java
@@ -0,0 +1,65 @@
+/*

+ * Copyright 2015 Open Networking Laboratory

+ *

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+package org.onosproject.net.flow;

+

+/**

+ * Represents a flow live type for a given flow entry.

+ */

+public interface TypedStoredFlowEntry extends StoredFlowEntry {

+    enum FlowLiveType {

+        /**

+         * Indicates that this rule has been submitted for addition immediately.

+         * Not necessarily collecting flow stats.

+         */

+        IMMEDIATE_FLOW,

+

+        /**

+         * Indicates that this rule has been submitted for a short time.

+         * Necessarily collecting flow stats every calAndPollInterval.

+         */

+        SHORT_FLOW,

+

+        /**

+         * Indicates that this rule has been submitted for a mid time.

+         * Necessarily collecting flow stats every midPollInterval.

+         */

+        MID_FLOW,

+

+        /**

+         * Indicates that this rule has been submitted for a long time.

+         * Necessarily collecting flow stats every longPollInterval.

+         */

+        LONG_FLOW,

+

+        /**

+         * Indicates that this rule has been submitted for UNKNOWN or ERROR.

+         * Not necessarily collecting flow stats.

+         */

+        UNKNOWN_FLOW

+    }

+

+    /**

+     * Gets the flow live type for this entry.

+     */

+    FlowLiveType flowLiveType();

+

+    /**

+     * Sets the new flow live type for this entry.

+     * @param liveType new flow live type.

+     */

+    void setFlowLiveType(FlowLiveType liveType);

+}

diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index a1d046c..9bbd0aa 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -388,6 +388,16 @@
 
         @Override
         public void pushFlowMetrics(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
+            pushFlowMetricsInternal(deviceId, flowEntries, true);
+        }
+
+        @Override
+        public void pushFlowMetricsWithoutFlowMissing(DeviceId deviceId, Iterable<FlowEntry> flowEntries) {
+            pushFlowMetricsInternal(deviceId, flowEntries, false);
+        }
+
+        private void pushFlowMetricsInternal(DeviceId deviceId, Iterable<FlowEntry> flowEntries,
+                                             boolean useMissingFlow) {
             Map<FlowEntry, FlowEntry> storedRules = Maps.newHashMap();
             store.getFlowEntries(deviceId).forEach(f -> storedRules.put(f, f));
 
@@ -415,17 +425,20 @@
                     continue;
                 }
             }
-            for (FlowEntry rule : storedRules.keySet()) {
-                try {
-                    // there are rules in the store that aren't on the switch
-                    log.debug("Adding rule in store, but not on switch {}", rule);
-                    flowMissing(rule);
-                } catch (Exception e) {
-                    log.debug("Can't add missing flow rule {}", e.getMessage());
-                    continue;
+
+            // DO NOT reinstall
+            if (useMissingFlow) {
+                for (FlowEntry rule : storedRules.keySet()) {
+                    try {
+                        // there are rules in the store that aren't on the switch
+                        log.debug("Adding rule in store, but not on switch {}", rule);
+                        flowMissing(rule);
+                    } catch (Exception e) {
+                        log.debug("Can't add missing flow rule {}", e.getMessage());
+                        continue;
+                    }
                 }
             }
-
         }
 
         @Override
diff --git a/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 99ca6b2..a25b7a8 100644
--- a/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -273,6 +273,7 @@
                         OFFlowStatsReply.Builder rep =
                                 OFFactories.getFactory(msg.getVersion()).buildFlowStatsReply();
                         rep.setEntries(Lists.newLinkedList(flowStats));
+                        rep.setXid(reply.getXid());
                         executorMsgs.submit(new OFMessageHandler(dpid, rep.build()));
                     }
                     break;
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java
new file mode 100644
index 0000000..181ba00
--- /dev/null
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/NewAdaptiveFlowStatsCollector.java
@@ -0,0 +1,897 @@
+/*

+ * Copyright 2015 Open Networking Laboratory

+ *

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *     http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+package org.onosproject.provider.of.flow.impl;

+

+import java.util.HashSet;

+import java.util.List;

+import java.util.Map;

+import java.util.Optional;

+import java.util.Set;

+import java.util.concurrent.TimeUnit;

+import java.util.concurrent.ScheduledFuture;

+import java.util.concurrent.Executors;

+import java.util.concurrent.ScheduledExecutorService;

+

+import com.google.common.base.Objects;

+import com.google.common.collect.ImmutableSet;

+import com.google.common.collect.Maps;

+import com.google.common.collect.Sets;

+

+import org.onosproject.net.flow.DefaultTypedFlowEntry;

+import org.onosproject.net.flow.FlowEntry;

+import org.onosproject.net.flow.FlowId;

+import org.onosproject.net.flow.FlowRule;

+import org.onosproject.net.flow.StoredFlowEntry;

+import org.onosproject.net.flow.TypedStoredFlowEntry;

+import org.onosproject.net.flow.instructions.Instruction;

+import org.onosproject.net.flow.instructions.Instructions;

+import org.onosproject.openflow.controller.OpenFlowSwitch;

+import org.onosproject.openflow.controller.RoleState;

+import org.projectfloodlight.openflow.protocol.OFFlowStatsRequest;

+import org.projectfloodlight.openflow.protocol.match.Match;

+import org.projectfloodlight.openflow.types.OFPort;

+import org.projectfloodlight.openflow.types.TableId;

+import org.slf4j.Logger;

+

+import static com.google.common.base.Preconditions.checkNotNull;

+import static org.onlab.util.Tools.groupedThreads;

+import static org.onosproject.net.flow.TypedStoredFlowEntry.*;

+import static org.slf4j.LoggerFactory.getLogger;

+

+/**

+ * Efficiently and adaptively collects flow statistics for the specified switch.

+ */

+public class NewAdaptiveFlowStatsCollector {

+

+    private final Logger log = getLogger(getClass());

+

+    private final OpenFlowSwitch sw;

+

+    private ScheduledExecutorService adaptiveFlowStatsScheduler =

+            Executors.newScheduledThreadPool(4, groupedThreads("onos/flow", "device-stats-collector-%d"));

+    private ScheduledFuture<?> calAndShortFlowsThread;

+    private ScheduledFuture<?> midFlowsThread;

+    private ScheduledFuture<?> longFlowsThread;

+

+    // Task that calculates all flowEntries' FlowLiveType and collects stats IMMEDIATE flows every calAndPollInterval

+    private CalAndShortFlowsTask calAndShortFlowsTask;

+    // Task that collects stats MID flows every 2*calAndPollInterval

+    private MidFlowsTask midFlowsTask;

+    // Task that collects stats LONG flows every 3*calAndPollInterval

+    private LongFlowsTask longFlowsTask;

+

+    private static final int CAL_AND_POLL_TIMES = 1; // must be always 0

+    private static final int MID_POLL_TIMES = 2;     // variable greater or equal than 1

+    private static final int LONG_POLL_TIMES = 3;    // variable greater or equal than MID_POLL_TIMES

+    //TODO: make ENTIRE_POLL_TIMES configurable with enable or disable

+    // must be variable greater or equal than common multiple of MID_POLL_TIMES and LONG_POLL_TIMES

+    private static final int ENTIRE_POLL_TIMES = 6;

+

+    private static final int DEFAULT_CAL_AND_POLL_FREQUENCY = 5;

+    private static final int MIN_CAL_AND_POLL_FREQUENCY = 2;

+    private static final int MAX_CAL_AND_POLL_FREQUENCY = 60;

+

+    private int calAndPollInterval; // CAL_AND_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;

+    private int midPollInterval; // MID_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;

+    private int longPollInterval; // LONG_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;

+    // only used for checking condition at each task if it collects entire flows from a given switch or not

+    private int entirePollInterval; // ENTIRE_POLL_TIMES * DEFAULT_CAL_AND_POLL_FREQUENCY;

+

+    // Number of call count of each Task,

+    // for undoing collection except only entire flows collecting task in CalAndShortFlowsTask

+    private int callCountCalAndShortFlowsTask = 0; // increased CAL_AND_POLL_TIMES whenever Task is called

+    private int callCountMidFlowsTask = 0;   // increased MID_POLL_TIMES whenever Task is called

+    private int callCountLongFlowsTask = 0;  // increased LONG_POLL_TIMES whenever Task is called

+

+    private InternalDeviceFlowTable deviceFlowTable = new InternalDeviceFlowTable();

+

+    private boolean isFirstTimeStart = true;

+

+    public static final long NO_FLOW_MISSING_XID = (-1);

+    private long flowMissingXid = NO_FLOW_MISSING_XID;

+

+    /**

+     * Creates a new adaptive collector for the given switch and default cal_and_poll frequency.

+     *

+     * @param sw           switch to pull

+     * @param pollInterval cal and immediate poll frequency in seconds

+     */

+    NewAdaptiveFlowStatsCollector(OpenFlowSwitch sw, int pollInterval) {

+        this.sw = sw;

+

+        initMemberVars(pollInterval);

+    }

+

+    // check calAndPollInterval validity and set all pollInterval values and finally initialize each task call count

+    private void initMemberVars(int pollInterval) {

+        if (pollInterval < MIN_CAL_AND_POLL_FREQUENCY) {

+            this.calAndPollInterval = MIN_CAL_AND_POLL_FREQUENCY;

+        } else if (pollInterval >= MAX_CAL_AND_POLL_FREQUENCY) {

+            this.calAndPollInterval = MAX_CAL_AND_POLL_FREQUENCY;

+        } else {

+            this.calAndPollInterval = pollInterval;

+        }

+

+        calAndPollInterval = CAL_AND_POLL_TIMES * calAndPollInterval;

+        midPollInterval = MID_POLL_TIMES * calAndPollInterval;

+        longPollInterval = LONG_POLL_TIMES * calAndPollInterval;

+        entirePollInterval = ENTIRE_POLL_TIMES * calAndPollInterval;

+

+        callCountCalAndShortFlowsTask = 0;

+        callCountMidFlowsTask = 0;

+        callCountLongFlowsTask = 0;

+

+        flowMissingXid = NO_FLOW_MISSING_XID;

+    }

+

+    /**

+     * Adjusts adaptive poll frequency.

+     *

+     * @param pollInterval poll frequency in seconds

+     */

+    synchronized void adjustCalAndPollInterval(int pollInterval) {

+        initMemberVars(pollInterval);

+

+        if (calAndShortFlowsThread != null) {

+            calAndShortFlowsThread.cancel(false);

+        }

+        if (midFlowsThread != null) {

+            midFlowsThread.cancel(false);

+        }

+        if (longFlowsThread != null) {

+            longFlowsThread.cancel(false);

+        }

+

+        calAndShortFlowsTask = new CalAndShortFlowsTask();

+        calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(

+                calAndShortFlowsTask,

+                0,

+                calAndPollInterval,

+                TimeUnit.SECONDS);

+

+        midFlowsTask = new MidFlowsTask();

+        midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(

+                midFlowsTask,

+                0,

+                midPollInterval,

+                TimeUnit.SECONDS);

+

+        longFlowsTask = new LongFlowsTask();

+        longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(

+                longFlowsTask,

+                0,

+                longPollInterval,

+                TimeUnit.SECONDS);

+

+        log.debug("calAndPollInterval=" + calAndPollInterval + "is adjusted");

+    }

+

+    private class CalAndShortFlowsTask implements Runnable {

+        @Override

+        public void run() {

+            if (sw.getRole() == RoleState.MASTER) {

+                log.trace("CalAndShortFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());

+

+                if (isFirstTimeStart) {

+                    // isFirstTimeStart, get entire flow stats from a given switch sw

+                    log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats at first time start for {}",

+                            sw.getStringId());

+                    ofFlowStatsRequestAllSend();

+

+                    callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;

+                    isFirstTimeStart = false;

+                } else  if (callCountCalAndShortFlowsTask == ENTIRE_POLL_TIMES) {

+                    // entire_poll_times, get entire flow stats from a given switch sw

+                    log.trace("CalAndShortFlowsTask Collecting Entire AdaptiveStats for {}", sw.getStringId());

+                    ofFlowStatsRequestAllSend();

+

+                    callCountCalAndShortFlowsTask = CAL_AND_POLL_TIMES;

+                    //TODO: check flows deleted in switch, but exist in controller flow table, then remove them

+                    //

+                } else {

+                    calAndShortFlowsTaskInternal();

+                    callCountCalAndShortFlowsTask += CAL_AND_POLL_TIMES;

+                }

+            }

+        }

+    }

+

+    // send openflow flow stats request message with getting all flow entries to a given switch sw

+    private void ofFlowStatsRequestAllSend() {

+        OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()

+                .setMatch(sw.factory().matchWildcardAll())

+                .setTableId(TableId.ALL)

+                .setOutPort(OFPort.NO_MASK)

+                .build();

+

+        synchronized (this) {

+            // set the request xid to check the reply in OpenFlowRuleProvider

+            // After processing the reply of this request message,

+            // this must be set to NO_FLOW_MISSING_XID(-1) by provider

+            setFlowMissingXid(request.getXid());

+            log.debug("ofFlowStatsRequestAllSend,Request={},for {}", request.toString(), sw.getStringId());

+

+            sw.sendMsg(request);

+        }

+    }

+

+    // send openflow flow stats request message with getting the specific flow entry(fe) to a given switch sw

+    private void ofFlowStatsRequestFlowSend(FlowEntry fe) {

+        // set find match

+        Match match = FlowModBuilder.builder(fe, sw.factory(), Optional.empty()).buildMatch();

+        // set find tableId

+        TableId tableId = TableId.of(fe.tableId());

+        // set output port

+        Instruction ins = fe.treatment().allInstructions().stream()

+                .filter(i -> (i.type() == Instruction.Type.OUTPUT))

+                .findFirst()

+                .orElse(null);

+        OFPort ofPort = OFPort.NO_MASK;

+        if (ins != null) {

+            Instructions.OutputInstruction out = (Instructions.OutputInstruction) ins;

+            ofPort = OFPort.of((int) ((out.port().toLong())));

+        }

+

+        OFFlowStatsRequest request = sw.factory().buildFlowStatsRequest()

+                .setMatch(match)

+                .setTableId(tableId)

+                .setOutPort(ofPort)

+                .build();

+

+        synchronized (this) {

+            if (getFlowMissingXid() != NO_FLOW_MISSING_XID) {

+                log.debug("ofFlowStatsRequestFlowSend: previous FlowStatsRequestAll does not be processed yet,"

+                                + " set no flow missing xid anyway, for {}",

+                        sw.getStringId());

+                setFlowMissingXid(NO_FLOW_MISSING_XID);

+            }

+

+            sw.sendMsg(request);

+        }

+    }

+

+    private void calAndShortFlowsTaskInternal() {

+        deviceFlowTable.checkAndMoveLiveFlowAll();

+

+        deviceFlowTable.getShortFlows().forEach(fe -> {

+            ofFlowStatsRequestFlowSend(fe);

+        });

+    }

+

+    private class MidFlowsTask implements Runnable {

+        @Override

+        public void run() {

+            if (sw.getRole() == RoleState.MASTER) {

+                log.trace("MidFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());

+

+                // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw

+                if (callCountMidFlowsTask == ENTIRE_POLL_TIMES) {

+                    callCountMidFlowsTask = MID_POLL_TIMES;

+                } else {

+                    midFlowsTaskInternal();

+                    callCountMidFlowsTask += MID_POLL_TIMES;

+                }

+            }

+        }

+    }

+

+    private void midFlowsTaskInternal() {

+        deviceFlowTable.getMidFlows().forEach(fe -> {

+            ofFlowStatsRequestFlowSend(fe);

+        });

+    }

+

+    private class LongFlowsTask implements Runnable {

+        @Override

+        public void run() {

+            if (sw.getRole() == RoleState.MASTER) {

+                log.trace("LongFlowsTask Collecting AdaptiveStats for {}", sw.getStringId());

+

+                // skip collecting because CalAndShortFlowsTask collects entire flow stats from a given switch sw

+                if (callCountLongFlowsTask == ENTIRE_POLL_TIMES) {

+                    callCountLongFlowsTask = LONG_POLL_TIMES;

+                } else {

+                    longFlowsTaskInternal();

+                    callCountLongFlowsTask += LONG_POLL_TIMES;

+                }

+            }

+        }

+    }

+

+    private void longFlowsTaskInternal() {

+        deviceFlowTable.getLongFlows().forEach(fe -> {

+            ofFlowStatsRequestFlowSend(fe);

+        });

+    }

+

+    /**

+     * start adaptive flow statistic collection.

+     *

+     */

+    public synchronized void start() {

+        log.debug("Starting AdaptiveStats collection thread for {}", sw.getStringId());

+        callCountCalAndShortFlowsTask = 0;

+        callCountMidFlowsTask = 0;

+        callCountLongFlowsTask = 0;

+

+        isFirstTimeStart = true;

+

+        // Initially start polling quickly. Then drop down to configured value

+        calAndShortFlowsTask = new CalAndShortFlowsTask();

+        calAndShortFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(

+                calAndShortFlowsTask,

+                1,

+                calAndPollInterval,

+                TimeUnit.SECONDS);

+

+        midFlowsTask = new MidFlowsTask();

+        midFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(

+                midFlowsTask,

+                1,

+                midPollInterval,

+                TimeUnit.SECONDS);

+

+        longFlowsTask = new LongFlowsTask();

+        longFlowsThread = adaptiveFlowStatsScheduler.scheduleWithFixedDelay(

+                longFlowsTask,

+                1,

+                longPollInterval,

+                TimeUnit.SECONDS);

+

+        log.info("Started");

+    }

+

+    /**

+     * stop adaptive flow statistic collection.

+     *

+     */

+    public synchronized void stop() {

+        log.debug("Stopping AdaptiveStats collection thread for {}", sw.getStringId());

+        if (calAndShortFlowsThread != null) {

+            calAndShortFlowsThread.cancel(true);

+        }

+        if (midFlowsThread != null) {

+            midFlowsThread.cancel(true);

+        }

+        if (longFlowsThread != null) {

+            longFlowsThread.cancel(true);

+        }

+

+        adaptiveFlowStatsScheduler.shutdownNow();

+

+        isFirstTimeStart = false;

+

+        log.info("Stopped");

+    }

+

+    /**

+     * add typed flow entry from flow rule into the internal flow table.

+     *

+     * @param flowRules the flow rules

+     *

+     */

+    public synchronized void addWithFlowRule(FlowRule... flowRules) {

+        for (FlowRule fr : flowRules) {

+            // First remove old entry unconditionally, if exist

+            deviceFlowTable.remove(fr);

+

+            // add new flow entry, we suppose IMMEDIATE_FLOW

+            TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fr,

+                    FlowLiveType.IMMEDIATE_FLOW);

+            deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);

+        }

+    }

+

+    /**

+     * add or update typed flow entry from flow entry into the internal flow table.

+     *

+     * @param flowEntries the flow entries

+     *

+     */

+    public synchronized void addOrUpdateFlows(FlowEntry... flowEntries) {

+       for (FlowEntry fe : flowEntries) {

+           // check if this new rule is an update to an existing entry

+           TypedStoredFlowEntry stored = deviceFlowTable.getFlowEntry(fe);

+

+           if (stored != null) {

+               // duplicated flow entry is collected!, just skip

+               if (fe.bytes() == stored.bytes() && fe.packets() == stored.packets()

+                       && fe.life() == stored.life()) {

+                   log.debug("addOrUpdateFlows:, FlowId=" + Long.toHexString(fe.id().value())

+                                   + ",is DUPLICATED stats collection, just skip."

+                                   + " AdaptiveStats collection thread for {}",

+                           sw.getStringId());

+

+                   stored.setLastSeen();

+                   continue;

+               } else if (fe.life() < stored.life()) {

+                   // Invalid updates the stats values, i.e., bytes, packets, durations ...

+                   log.debug("addOrUpdateFlows():" +

+                               " Invalid Flow Update! The new life is SMALLER than the previous one, jus skip." +

+                               " new flowId=" + Long.toHexString(fe.id().value()) +

+                               ", old flowId=" + Long.toHexString(stored.id().value()) +

+                               ", new bytes=" + fe.bytes() + ", old bytes=" + stored.bytes() +

+                               ", new life=" + fe.life() + ", old life=" + stored.life() +

+                               ", new lastSeen=" + fe.lastSeen() + ", old lastSeen=" + stored.lastSeen());

+                   // go next

+                   stored.setLastSeen();

+                   continue;

+               }

+

+               // update now

+               stored.setLife(fe.life());

+               stored.setPackets(fe.packets());

+               stored.setBytes(fe.bytes());

+               stored.setLastSeen();

+               if (stored.state() == FlowEntry.FlowEntryState.PENDING_ADD) {

+                   // flow is really RULE_ADDED

+                   stored.setState(FlowEntry.FlowEntryState.ADDED);

+               }

+               // flow is RULE_UPDATED, skip adding and just updating flow live table

+               //deviceFlowTable.calAndSetFlowLiveType(stored);

+               continue;

+           }

+

+           // add new flow entry, we suppose IMMEDIATE_FLOW

+           TypedStoredFlowEntry newFlowEntry = new DefaultTypedFlowEntry(fe,

+                    FlowLiveType.IMMEDIATE_FLOW);

+           deviceFlowTable.addWithCalAndSetFlowLiveType(newFlowEntry);

+        }

+    }

+

+    /**

+     * remove typed flow entry from the internal flow table.

+     *

+     * @param flowRules the flow entries

+     *

+     */

+    public synchronized void removeFlows(FlowRule...  flowRules) {

+        for (FlowRule rule : flowRules) {

+            deviceFlowTable.remove(rule);

+        }

+    }

+

+    // same as removeFlows() function

+    /**

+     * remove typed flow entry from the internal flow table.

+     *

+     * @param flowRules the flow entries

+     *

+     */

+    public void flowRemoved(FlowRule... flowRules) {

+        removeFlows(flowRules);

+    }

+

+    // same as addOrUpdateFlows() function

+    /**

+     * add or update typed flow entry from flow entry into the internal flow table.

+     *

+     * @param flowEntries the flow entry list

+     *

+     */

+    public void pushFlowMetrics(List<FlowEntry> flowEntries) {

+        flowEntries.forEach(fe -> {

+            addOrUpdateFlows(fe);

+        });

+    }

+

+    /**

+     * returns flowMissingXid that indicates the execution of flowMissing process or not(NO_FLOW_MISSING_XID(-1)).

+     *

+     */

+    public long getFlowMissingXid() {

+        return flowMissingXid;

+    }

+

+    /**

+     * set flowMissingXid, namely OFFlowStatsRequest match any ALL message Id.

+     *

+     * @param flowMissingXid the OFFlowStatsRequest message Id

+     *

+     */

+    public void setFlowMissingXid(long flowMissingXid) {

+        this.flowMissingXid = flowMissingXid;

+    }

+

+    private class InternalDeviceFlowTable {

+

+        private final Map<FlowId, Set<TypedStoredFlowEntry>>

+                flowEntries = Maps.newConcurrentMap();

+

+        private final Set<StoredFlowEntry> shortFlows = new HashSet<>();

+        private final Set<StoredFlowEntry> midFlows = new HashSet<>();

+        private final Set<StoredFlowEntry> longFlows = new HashSet<>();

+

+        // Assumed latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply

+        private final long latencyFlowStatsRequestAndReplyMillis = 500;

+

+

+        // Statistics for table operation

+        private long addCount = 0, addWithSetFlowLiveTypeCount = 0;

+        private long removeCount = 0;

+

+        /**

+         * Resets all count values with zero.

+         *

+         */

+        public void resetAllCount() {

+            addCount = 0;

+            addWithSetFlowLiveTypeCount = 0;

+            removeCount = 0;

+        }

+

+        // get set of flow entries for the given flowId

+        private Set<TypedStoredFlowEntry> getFlowEntriesInternal(FlowId flowId) {

+            return flowEntries.computeIfAbsent(flowId, id -> Sets.newCopyOnWriteArraySet());

+        }

+

+        // get flow entry for the given flow rule

+        private TypedStoredFlowEntry getFlowEntryInternal(FlowRule rule) {

+            Set<TypedStoredFlowEntry> flowEntries = getFlowEntriesInternal(rule.id());

+            return flowEntries.stream()

+                    .filter(entry -> Objects.equal(entry, rule))

+                    .findAny()

+                    .orElse(null);

+        }

+

+        // get the flow entries for all flows in flow table

+        private Set<TypedStoredFlowEntry> getFlowEntriesInternal() {

+            Set<TypedStoredFlowEntry> result = Sets.newHashSet();

+

+            flowEntries.values().forEach(result::addAll);

+            return result;

+        }

+

+        /**

+         * Gets the number of flow entry in flow table.

+         *

+         * @return the number of flow entry.

+         *

+         */

+        public long getFlowCount() {

+            return flowEntries.values().stream().mapToLong(Set::size).sum();

+        }

+

+        /**

+         * Gets the number of flow entry in flow table.

+         *

+         * @param rule the flow rule

+         * @return the typed flow entry.

+         *

+         */

+        public TypedStoredFlowEntry getFlowEntry(FlowRule rule) {

+            checkNotNull(rule);

+

+            return getFlowEntryInternal(rule);

+        }

+

+        /**

+         * Gets the all typed flow entries in flow table.

+         *

+         * @return the set of typed flow entry.

+         *

+         */

+        public Set<TypedStoredFlowEntry> getFlowEntries() {

+            return getFlowEntriesInternal();

+        }

+

+        /**

+         * Gets the short typed flow entries in flow table.

+         *

+         * @return the set of typed flow entry.

+         *

+         */

+        public Set<StoredFlowEntry> getShortFlows() {

+            return ImmutableSet.copyOf(shortFlows); //Sets.newHashSet(shortFlows);

+        }

+

+        /**

+         * Gets the mid typed flow entries in flow table.

+         *

+         * @return the set of typed flow entry.

+         *

+         */

+        public Set<StoredFlowEntry> getMidFlows() {

+            return ImmutableSet.copyOf(midFlows); //Sets.newHashSet(midFlows);

+        }

+

+        /**

+         * Gets the long typed flow entries in flow table.

+         *

+         * @return the set of typed flow entry.

+         *

+         */

+        public Set<StoredFlowEntry> getLongFlows() {

+            return ImmutableSet.copyOf(longFlows); //Sets.newHashSet(longFlows);

+        }

+

+        /**

+         * Add typed flow entry into table only.

+         *

+         * @param rule the flow rule

+         *

+         */

+        public synchronized void add(TypedStoredFlowEntry rule) {

+            checkNotNull(rule);

+

+            //rule have to be new DefaultTypedFlowEntry

+            boolean result = getFlowEntriesInternal(rule.id()).add(rule);

+

+            if (result) {

+                addCount++;

+            }

+        }

+

+        /**

+         * Calculates and set the flow live type at the first time,

+         * and then add it into a corresponding typed flow table.

+         *

+         * @param rule the flow rule

+         *

+         */

+        public void calAndSetFlowLiveType(TypedStoredFlowEntry rule) {

+            checkNotNull(rule);

+

+            calAndSetFlowLiveTypeInternal(rule);

+        }

+

+        /**

+         * Add the typed flow entry into table, and calculates and set the flow live type,

+         * and then add it into a corresponding typed flow table.

+         *

+         * @param rule the flow rule

+         *

+         */

+       public synchronized void addWithCalAndSetFlowLiveType(TypedStoredFlowEntry rule) {

+            checkNotNull(rule);

+

+            //rule have to be new DefaultTypedFlowEntry

+            boolean result = getFlowEntriesInternal(rule.id()).add(rule);

+            if (result) {

+                calAndSetFlowLiveTypeInternal(rule);

+                addWithSetFlowLiveTypeCount++;

+            } else {

+                log.debug("addWithCalAndSetFlowLiveType, FlowId=" + Long.toHexString(rule.id().value())

+                                + " ADD Failed, cause it may already exists in table !!!,"

+                                + " AdaptiveStats collection thread for {}",

+                        sw.getStringId());

+            }

+        }

+

+        // In real, calculates and set the flow live type at the first time,

+        // and then add it into a corresponding typed flow table

+        private void calAndSetFlowLiveTypeInternal(TypedStoredFlowEntry rule) {

+            long life = rule.life();

+            FlowLiveType prevFlowLiveType = rule.flowLiveType();

+

+            if (life >= longPollInterval) {

+                rule.setFlowLiveType(FlowLiveType.LONG_FLOW);

+                longFlows.add(rule);

+            } else if (life >= midPollInterval) {

+                rule.setFlowLiveType(FlowLiveType.MID_FLOW);

+                midFlows.add(rule);

+            } else if (life >= calAndPollInterval) {

+                rule.setFlowLiveType(FlowLiveType.SHORT_FLOW);

+                shortFlows.add(rule);

+            } else if (life >= 0) {

+                rule.setFlowLiveType(FlowLiveType.IMMEDIATE_FLOW);

+            } else { // life < 0

+                rule.setFlowLiveType(FlowLiveType.UNKNOWN_FLOW);

+            }

+

+            if (rule.flowLiveType() != prevFlowLiveType) {

+                switch (prevFlowLiveType) {

+                    // delete it from previous flow table

+                    case SHORT_FLOW:

+                        shortFlows.remove(rule);

+                        break;

+                    case MID_FLOW:

+                        midFlows.remove(rule);

+                        break;

+                    case LONG_FLOW:

+                        longFlows.remove(rule);

+                        break;

+                    default:

+                        break;

+                }

+            }

+        }

+

+

+        // check the flow live type based on current time, then set and add it into corresponding table

+        private boolean checkAndMoveLiveFlowInternal(TypedStoredFlowEntry fe, long cTime) {

+            long curTime = (cTime > 0 ? cTime : System.currentTimeMillis());

+            // For latency adjustment(default=500 millisecond) between FlowStatsRequest and Reply

+            long fromLastSeen = ((curTime - fe.lastSeen() + latencyFlowStatsRequestAndReplyMillis) / 1000);

+

+            // fe.life() unit is SECOND!

+            long liveTime = fe.life() + fromLastSeen;

+

+            // check flow timeout

+            if (fe.timeout() > calAndPollInterval && fromLastSeen > fe.timeout()) {

+                if (!fe.isPermanent()) {

+                    log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())

+                                    + ", liveType=" + fe.flowLiveType()

+                                    + ", liveTime=" + liveTime

+                                    + ", life=" + fe.life()

+                                    + ", fromLastSeen=" + fromLastSeen

+                                    + ", timeout=" + fe.timeout()

+                                    + ", isPermanent=" + fe.isPermanent()

+                                    + " AdaptiveStats collection thread for {}",

+                            sw.getStringId());

+                    return false;

+                }

+            }

+

+            switch (fe.flowLiveType()) {

+                case IMMEDIATE_FLOW:

+                    if (liveTime >= longPollInterval) {

+                        fe.setFlowLiveType(FlowLiveType.LONG_FLOW);

+                         longFlows.add(fe);

+                    } else if (liveTime >= midPollInterval) {

+                        fe.setFlowLiveType(FlowLiveType.MID_FLOW);

+                        midFlows.add(fe);

+                    } else if (liveTime >= calAndPollInterval) {

+                        fe.setFlowLiveType(FlowLiveType.SHORT_FLOW);

+                        shortFlows.add(fe);

+                    }

+                    break;

+                case SHORT_FLOW:

+                    if (liveTime >= longPollInterval) {

+                        fe.setFlowLiveType(FlowLiveType.LONG_FLOW);

+                        shortFlows.remove(fe);

+                        longFlows.add(fe);

+                    } else if (liveTime >= midPollInterval) {

+                        fe.setFlowLiveType(FlowLiveType.MID_FLOW);

+                        shortFlows.remove(fe);

+                        midFlows.add(fe);

+                    }

+                    break;

+                case MID_FLOW:

+                    if (liveTime >= longPollInterval) {

+                        fe.setFlowLiveType(FlowLiveType.LONG_FLOW);

+                        midFlows.remove(fe);

+                        longFlows.add(fe);

+                    }

+                    break;

+                case LONG_FLOW:

+                    if (fromLastSeen > entirePollInterval) {

+                        log.trace("checkAndMoveLiveFlowInternal, flow is already removed at switch.");

+                        return false;

+                    }

+                    break;

+                case UNKNOWN_FLOW: // Unknown flow is an internal error flow type, just fall through

+                default :

+                    // Error Unknown Live Type

+                    log.error("checkAndMoveLiveFlowInternal, Unknown Live Type error!"

+                            + "AdaptiveStats collection thread for {}",

+                            sw.getStringId());

+                    return false;

+            }

+

+            log.debug("checkAndMoveLiveFlowInternal, FlowId=" + Long.toHexString(fe.id().value())

+                            + ", state=" + fe.state()

+                            + ", After liveType=" + fe.flowLiveType()

+                            + ", liveTime=" + liveTime

+                            + ", life=" + fe.life()

+                            + ", bytes=" + fe.bytes()

+                            + ", packets=" + fe.packets()

+                            + ", fromLastSeen=" + fromLastSeen

+                            + ", priority=" + fe.priority()

+                            + ", selector=" + fe.selector().criteria()

+                            + ", treatment=" + fe.treatment()

+                            + " AdaptiveStats collection thread for {}",

+                    sw.getStringId());

+

+            return true;

+        }

+

+        /**

+         * Check and move live type for all type flow entries in table at every calAndPollInterval time.

+         *

+         */

+        public void checkAndMoveLiveFlowAll() {

+            Set<TypedStoredFlowEntry> typedFlowEntries = getFlowEntriesInternal();

+

+            long calCurTime = System.currentTimeMillis();

+            typedFlowEntries.forEach(fe -> {

+                if (!checkAndMoveLiveFlowInternal(fe, calCurTime)) {

+                    remove(fe);

+                }

+            });

+

+            // print table counts for debug

+            if (log.isDebugEnabled()) {

+                synchronized (this) {

+                    long totalFlowCount = getFlowCount();

+                    long shortFlowCount = shortFlows.size();

+                    long midFlowCount = midFlows.size();

+                    long longFlowCount = longFlows.size();

+                    long immediateFlowCount = totalFlowCount - shortFlowCount - midFlowCount - longFlowCount;

+                    long calTotalCount = addCount + addWithSetFlowLiveTypeCount - removeCount;

+

+                    log.debug("--------------------------------------------------------------------------- for {}",

+                            sw.getStringId());

+                    log.debug("checkAndMoveLiveFlowAll, Total Flow_Count=" + totalFlowCount

+                            + ", add - remove_Count=" + calTotalCount

+                            + ", IMMEDIATE_FLOW_Count=" + immediateFlowCount

+                            + ", SHORT_FLOW_Count=" + shortFlowCount

+                            + ", MID_FLOW_Count=" + midFlowCount

+                            + ", LONG_FLOW_Count=" + longFlowCount

+                            + ", add_Count=" + addCount

+                            + ", addWithSetFlowLiveType_Count=" + addWithSetFlowLiveTypeCount

+                            + ", remove_Count=" + removeCount

+                            + " AdaptiveStats collection thread for {}", sw.getStringId());

+                    log.debug("--------------------------------------------------------------------------- for {}",

+                            sw.getStringId());

+                    if (totalFlowCount != calTotalCount) {

+                        log.error("checkAndMoveLiveFlowAll, Real total flow count and "

+                                + "calculated total flow count do NOT match, something is wrong internally "

+                                + "or check counter value bound is over!");

+                    }

+                    if (immediateFlowCount < 0) {

+                        log.error("checkAndMoveLiveFlowAll, IMMEDIATE_FLOW count is negative, "

+                                + "something is wrong internally "

+                                + "or check counter value bound is over!");

+                    }

+                }

+            }

+            log.trace("checkAndMoveLiveFlowAll, AdaptiveStats for {}", sw.getStringId());

+        }

+

+        /**

+         * Remove the typed flow entry from table.

+         *

+         * @param rule the flow rule

+         *

+         */

+        public synchronized void remove(FlowRule rule) {

+            checkNotNull(rule);

+

+            TypedStoredFlowEntry removeStore = getFlowEntryInternal(rule);

+            if (removeStore != null) {

+                removeLiveFlowsInternal((TypedStoredFlowEntry) removeStore);

+                boolean result = getFlowEntriesInternal(rule.id()).remove(removeStore);

+

+                if (result) {

+                    removeCount++;

+                }

+            }

+       }

+

+        // Remove the typed flow entry from corresponding table

+        private void removeLiveFlowsInternal(TypedStoredFlowEntry fe) {

+            switch (fe.flowLiveType()) {

+                case IMMEDIATE_FLOW:

+                    // do nothing

+                    break;

+                case SHORT_FLOW:

+                    shortFlows.remove(fe);

+                    break;

+                case MID_FLOW:

+                    midFlows.remove(fe);

+                    break;

+                case LONG_FLOW:

+                    longFlows.remove(fe);

+                    break;

+                default: // error in Flow Live Type

+                    log.error("removeLiveFlowsInternal, Unknown Live Type error!");

+                    break;

+            }

+        }

+    }

+}

diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index de079e0..949c657 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2014 Open Networking Laboratory
+ * Copyright 2015 Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -76,6 +76,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.get;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -99,11 +100,16 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService cfgService;
 
-    private static final int DEFAULT_POLL_FREQUENCY = 10;
+    private static final int DEFAULT_POLL_FREQUENCY = 5;
     @Property(name = "flowPollFrequency", intValue = DEFAULT_POLL_FREQUENCY,
             label = "Frequency (in seconds) for polling flow statistics")
     private int flowPollFrequency = DEFAULT_POLL_FREQUENCY;
 
+    private static final boolean DEFAULT_ADAPTIVE_FLOW_SAMPLING = true;
+    @Property(name = "adaptiveFlowSampling", boolValue = DEFAULT_ADAPTIVE_FLOW_SAMPLING,
+            label = "Adaptive Flow Sampling is on or off")
+    private boolean adaptiveFlowSampling = DEFAULT_ADAPTIVE_FLOW_SAMPLING;
+
     private FlowRuleProviderService providerService;
 
     private final InternalFlowProvider listener = new InternalFlowProvider();
@@ -111,7 +117,10 @@
     private Cache<Long, InternalCacheEntry> pendingBatches;
 
     private final Timer timer = new Timer("onos-openflow-collector");
-    private final Map<Dpid, FlowStatsCollector> collectors = Maps.newHashMap();
+    private final Map<Dpid, FlowStatsCollector> simpleCollectors = Maps.newHashMap();
+
+    // NewAdaptiveFlowStatsCollector Set
+    private final Map<Dpid, NewAdaptiveFlowStatsCollector> afsCollectors = Maps.newHashMap();
 
     /**
      * Creates an OpenFlow host provider.
@@ -128,9 +137,11 @@
         controller.addEventListener(listener);
 
         pendingBatches = createBatchCache();
+
         createCollectors();
 
-        log.info("Started");
+        log.info("Started with flowPollFrequency = {}, adaptiveFlowSampling = {}",
+                flowPollFrequency, adaptiveFlowSampling);
     }
 
     @Deactivate
@@ -161,6 +172,20 @@
         }
 
         log.info("Settings: flowPollFrequency={}", flowPollFrequency);
+
+        boolean newAdaptiveFlowSampling;
+        String s = get(properties, "adaptiveFlowSampling");
+        newAdaptiveFlowSampling = isNullOrEmpty(s) ? adaptiveFlowSampling : Boolean.parseBoolean(s.trim());
+
+        if (newAdaptiveFlowSampling != adaptiveFlowSampling) {
+            // stop previous collector
+            stopCollectors();
+            adaptiveFlowSampling = newAdaptiveFlowSampling;
+            // create new collectors
+            createCollectors();
+        }
+
+        log.info("Settings: adaptiveFlowSampling={}", adaptiveFlowSampling);
     }
 
     private Cache<Long, InternalCacheEntry> createBatchCache() {
@@ -179,19 +204,38 @@
     }
 
     private void createCollector(OpenFlowSwitch sw) {
-        FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
-        fsc.start();
-        collectors.put(new Dpid(sw.getId()), fsc);
+        if (adaptiveFlowSampling) {
+            // NewAdaptiveFlowStatsCollector Constructor
+            NewAdaptiveFlowStatsCollector fsc = new NewAdaptiveFlowStatsCollector(sw, flowPollFrequency);
+            fsc.start();
+            afsCollectors.put(new Dpid(sw.getId()), fsc);
+        } else {
+            FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
+            fsc.start();
+            simpleCollectors.put(new Dpid(sw.getId()), fsc);
+        }
     }
 
     private void stopCollectors() {
-        collectors.values().forEach(FlowStatsCollector::stop);
-        collectors.clear();
+        if (adaptiveFlowSampling) {
+            // NewAdaptiveFlowStatsCollector Destructor
+            afsCollectors.values().forEach(NewAdaptiveFlowStatsCollector::stop);
+            afsCollectors.clear();
+        } else {
+            simpleCollectors.values().forEach(FlowStatsCollector::stop);
+            simpleCollectors.clear();
+        }
     }
 
     private void adjustRate() {
         DefaultLoad.setPollInterval(flowPollFrequency);
-        collectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
+
+        if (adaptiveFlowSampling) {
+            // NewAdaptiveFlowStatsCollector calAndPollInterval
+            afsCollectors.values().forEach(fsc -> fsc.adjustCalAndPollInterval(flowPollFrequency));
+        } else {
+            simpleCollectors.values().forEach(fsc -> fsc.adjustPollInterval(flowPollFrequency));
+        }
     }
 
     @Override
@@ -202,8 +246,9 @@
     }
 
     private void applyRule(FlowRule flowRule) {
-        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
-                                                                   .uri()));
+        Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
+        OpenFlowSwitch sw = controller.getSwitch(dpid);
+
         FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
         if (hasPayload(flowRuleExtPayLoad)) {
             OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
@@ -212,6 +257,11 @@
         }
         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
                                           Optional.empty()).buildFlowAdd());
+
+        if (adaptiveFlowSampling) {
+            // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+            afsCollectors.get(dpid).addWithFlowRule(flowRule);
+        }
     }
 
     @Override
@@ -222,8 +272,9 @@
     }
 
     private void removeRule(FlowRule flowRule) {
-        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(flowRule.deviceId()
-                                                                   .uri()));
+        Dpid dpid = Dpid.dpid(flowRule.deviceId().uri());
+        OpenFlowSwitch sw = controller.getSwitch(dpid);
+
         FlowRuleExtPayLoad flowRuleExtPayLoad = flowRule.payLoad();
         if (hasPayload(flowRuleExtPayLoad)) {
             OFMessage msg = new ThirdPartyMessage(flowRuleExtPayLoad.payLoad());
@@ -232,6 +283,11 @@
         }
         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
                                           Optional.empty()).buildFlowDel());
+
+        if (adaptiveFlowSampling) {
+            // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+            afsCollectors.get(dpid).removeFlows(flowRule);
+        }
     }
 
     @Override
@@ -242,11 +298,12 @@
 
     @Override
     public void executeBatch(FlowRuleBatchOperation batch) {
+        checkNotNull(batch);
 
         pendingBatches.put(batch.id(), new InternalCacheEntry(batch));
 
-        OpenFlowSwitch sw = controller.getSwitch(Dpid.dpid(batch.deviceId()
-                                                                   .uri()));
+        Dpid dpid = Dpid.dpid(batch.deviceId().uri());
+        OpenFlowSwitch sw = controller.getSwitch(dpid);
         OFFlowMod mod;
         for (FlowRuleBatchEntry fbe : batch.getOperations()) {
             // flow is the third party privacy flow
@@ -262,16 +319,32 @@
             switch (fbe.operator()) {
                 case ADD:
                     mod = builder.buildFlowAdd();
+
+                    if (adaptiveFlowSampling) {
+                        // Add TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+                        afsCollectors.get(dpid).addWithFlowRule(fbe.target());
+                    }
                     break;
                 case REMOVE:
                     mod = builder.buildFlowDel();
+
+                    if (adaptiveFlowSampling) {
+                        // Remove TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+                        afsCollectors.get(dpid).removeFlows(fbe.target());
+                    }
                     break;
                 case MODIFY:
                     mod = builder.buildFlowMod();
+
+                    if (adaptiveFlowSampling) {
+                        // Add or Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+                        // afsCollectors.get(dpid).addWithFlowRule(fbe.target()); //check if add is good or not
+                        afsCollectors.get(dpid).addOrUpdateFlows((FlowEntry) fbe.target());
+                    }
                     break;
                 default:
                     log.error("Unsupported batch operation {}; skipping flowmod {}",
-                              fbe.operator(), fbe);
+                            fbe.operator(), fbe);
                     continue;
             }
             sw.sendMsg(mod);
@@ -292,14 +365,24 @@
 
         @Override
         public void switchAdded(Dpid dpid) {
+
+            OpenFlowSwitch sw = controller.getSwitch(dpid);
+
             createCollector(controller.getSwitch(dpid));
         }
 
         @Override
         public void switchRemoved(Dpid dpid) {
-            FlowStatsCollector collector = collectors.remove(dpid);
-            if (collector != null) {
-                collector.stop();
+            if (adaptiveFlowSampling) {
+                NewAdaptiveFlowStatsCollector collector = afsCollectors.remove(dpid);
+                if (collector != null) {
+                    collector.stop();
+                }
+            } else {
+                FlowStatsCollector collector = simpleCollectors.remove(dpid);
+                if (collector != null) {
+                    collector.stop();
+                }
             }
         }
 
@@ -321,6 +404,11 @@
 
                     FlowEntry fr = new FlowEntryBuilder(dpid, removed).build();
                     providerService.flowRemoved(fr);
+
+                    if (adaptiveFlowSampling) {
+                        // Removed TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+                        afsCollectors.get(dpid).flowRemoved(fr);
+                    }
                     break;
                 case STATS_REPLY:
                     if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
@@ -370,11 +458,10 @@
                                               + " tell us which one.");
                         }
                     }
-                    break;
+
                 default:
                     log.debug("Unhandled message type: {}", msg.getType());
             }
-
         }
 
         @Override
@@ -392,7 +479,38 @@
                     .map(entry -> new FlowEntryBuilder(dpid, entry).build())
                     .collect(Collectors.toList());
 
-            providerService.pushFlowMetrics(did, flowEntries);
+            if (adaptiveFlowSampling)  {
+                NewAdaptiveFlowStatsCollector afsc = afsCollectors.get(dpid);
+
+                synchronized (afsc) {
+                    if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
+                        log.debug("OpenFlowRuleProvider:pushFlowMetrics, flowMissingXid={}, "
+                                        + "OFFlowStatsReply Xid={}, for {}",
+                                afsc.getFlowMissingXid(), replies.getXid(), dpid);
+                    }
+
+                    // Check that OFFlowStatsReply Xid is same with the one of OFFlowStatsRequest?
+                    if (afsc.getFlowMissingXid() != NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID) {
+                        if (afsc.getFlowMissingXid() == replies.getXid()) {
+                            // call entire flow stats update with flowMissing synchronization.
+                            // used existing pushFlowMetrics
+                            providerService.pushFlowMetrics(did, flowEntries);
+                        }
+                        // reset flowMissingXid to NO_FLOW_MISSING_XID
+                        afsc.setFlowMissingXid(NewAdaptiveFlowStatsCollector.NO_FLOW_MISSING_XID);
+
+                    } else {
+                        // call individual flow stats update
+                        providerService.pushFlowMetricsWithoutFlowMissing(did, flowEntries);
+                    }
+
+                    // Update TypedFlowEntry to deviceFlowEntries in NewAdaptiveFlowStatsCollector
+                    afsc.pushFlowMetrics(flowEntries);
+                }
+            } else {
+                // call existing entire flow stats update with flowMissing synchronization
+                providerService.pushFlowMetrics(did, flowEntries);
+            }
         }
     }