Modify OpenFlow provider to avoid stats collection when switch is under high load
* Track load via sliding window
* Pause stats collection when load is significantly above average
* Resume stats collection when load returns to normal
* Pause stats collection when reply is in-flight

Change-Id: I3159b4f806a6405ca6be494534497348716cc921
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index 8730378..5f2713b5 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -293,6 +293,34 @@
         tableStatsCollectors.values().forEach(tsc -> tsc.adjustPollInterval(flowPollFrequency));
     }
 
+    private void resetEvents(Dpid dpid) {
+        SwitchDataCollector collector;
+        if (adaptiveFlowSampling) {
+            collector = afsCollectors.get(dpid);
+        } else {
+            collector = simpleCollectors.get(dpid);
+        }
+        if (collector != null) {
+            collector.resetEvents();
+        }
+    }
+
+    private void recordEvent(Dpid dpid) {
+        recordEvents(dpid, 1);
+    }
+
+    private void recordEvents(Dpid dpid, int events) {
+        SwitchDataCollector collector;
+        if (adaptiveFlowSampling) {
+            collector = afsCollectors.get(dpid);
+        } else {
+            collector = simpleCollectors.get(dpid);
+        }
+        if (collector != null) {
+            collector.recordEvents(events);
+        }
+    }
+
     @Override
     public void applyFlowRule(FlowRule... flowRules) {
         for (FlowRule flowRule : flowRules) {
@@ -310,6 +338,8 @@
 
         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
                 Optional.empty(), Optional.of(driverService)).buildFlowAdd());
+
+        recordEvent(dpid);
     }
 
     @Override
@@ -329,6 +359,8 @@
 
         sw.sendMsg(FlowModBuilder.builder(flowRule, sw.factory(),
                                           Optional.empty(), Optional.of(driverService)).buildFlowDel());
+
+        recordEvent(dpid);
     }
 
     @Override
@@ -371,6 +403,8 @@
         OFBarrierRequest.Builder builder = sw.factory().buildBarrierRequest()
                 .setXid(batch.id());
         sw.sendMsg(builder.build());
+
+        recordEvents(dpid, batch.getOperations().size());
     }
 
     private class InternalFlowProvider
@@ -416,6 +450,16 @@
                     break;
                 case STATS_REPLY:
                     if (((OFStatsReply) msg).getStatsType() == OFStatsType.FLOW) {
+                        // Let's unblock first the collector
+                        SwitchDataCollector collector;
+                        if (adaptiveFlowSampling) {
+                            collector = afsCollectors.get(dpid);
+                        } else {
+                            collector = simpleCollectors.get(dpid);
+                        }
+                        if (collector != null) {
+                            collector.received();
+                        }
                         pushFlowMetrics(dpid, (OFFlowStatsReply) msg, getDriver(deviceId));
                     } else if (((OFStatsReply) msg).getStatsType() == OFStatsType.TABLE) {
                         pushTableStatistics(dpid, (OFTableStatsReply) msg);
@@ -584,7 +628,9 @@
         @Override
         public void receivedRoleReply(Dpid dpid, RoleState requested,
                                       RoleState response) {
-            // Do nothing here for now.
+            if (response == RoleState.MASTER) {
+                resetEvents(dpid);
+            }
         }
 
         private DriverHandler getDriver(DeviceId devId) {
@@ -757,4 +803,4 @@
         }
     }
 
-}
\ No newline at end of file
+}