Prevents the stop of the stats collection

- Uses executor in place of a timer which can become unable
to schedule futher tasks if the exceptions are not caught
- Flow stats collector will wait at most 5 flows stats interval
for a reply before to request new stats

Change-Id: Ic50efd7b8afa1f182ba32c04aa971d3ef400529e
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
index c8bf707..638c195 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
@@ -25,9 +25,12 @@
 import org.projectfloodlight.openflow.types.TableId;
 import org.slf4j.Logger;
 
-import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
@@ -40,7 +43,7 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final int SECONDS = 1000;
+    private static final int MS = 1000;
 
     // Number of ticks which defines the pause window.
     private static final int PAUSE_WINDOW = 2;
@@ -56,27 +59,30 @@
     private static final int MAX_LOAD_RATE = 500;
 
     private final OpenFlowSwitch sw;
-    private Timer timer;
+    private ScheduledExecutorService executorService;
     private TimerTask pauseTask;
+    private ScheduledFuture<?> scheduledPauseTask;
     private TimerTask pollTask;
+    private ScheduledFuture<?> scheduledPollTask;
 
     private SlidingWindowCounter loadCounter;
     // Defines whether the collector is in pause or not for high load
     private final AtomicBoolean paused = new AtomicBoolean();
     // Defines whether the collector is in waiting or not for a previous stats reply
-    private final AtomicBoolean waiting = new AtomicBoolean();
+    private static final int WAITING_ATTEMPTS = 5;
+    private final AtomicInteger waiting = new AtomicInteger(0);
 
     private int pollInterval;
 
     /**
      * Creates a new collector for the given switch and poll frequency.
      *
-     * @param timer        timer to use for scheduling
-     * @param sw           switch to pull
+     * @param executorService executor used for scheduling
+     * @param sw switch to pull
      * @param pollInterval poll frequency in seconds
      */
-    FlowStatsCollector(Timer timer, OpenFlowSwitch sw, int pollInterval) {
-        this.timer = timer;
+    FlowStatsCollector(ScheduledExecutorService executorService, OpenFlowSwitch sw, int pollInterval) {
+        this.executorService = executorService;
         this.sw = checkNotNull(sw, "Null switch");
         this.pollInterval = pollInterval;
     }
@@ -91,11 +97,16 @@
         if (pollTask != null) {
             pollTask.cancel();
         }
+        if (scheduledPollTask != null) {
+            scheduledPollTask.cancel(false);
+        }
         // If we went through start - let's schedule it
         if (loadCounter != null) {
             pollTask = new PollTimerTask();
-            timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
+            scheduledPollTask = executorService.scheduleAtFixedRate(pollTask, pollInterval * MS,
+                                                pollInterval * MS, TimeUnit.MILLISECONDS);
         }
+        waiting.set(0);
     }
 
     /**
@@ -109,7 +120,7 @@
         }
         // Let's reset also waiting, the reply can be discarded/lost
         // during a change of mastership
-        waiting.set(false);
+        waiting.set(0);
     }
 
     /**
@@ -169,7 +180,12 @@
         @Override
         public void run() {
             // Check whether we are still waiting a previous reply
-            if (sw.getRole() == RoleState.MASTER && !waiting.get()) {
+            if (waiting.getAndDecrement() > 0) {
+                log.debug("Skipping stats collection for {} waiting for previous reply", sw.getStringId());
+                return;
+            }
+            // Check whether we are the master of the switch
+            if (sw.getRole() == RoleState.MASTER) {
                 // Check whether the switch is under high load from this master. This is done here in case a large
                 // batch was pushed immediately prior to this task running.
                 if (isHighLoad()) {
@@ -195,7 +211,7 @@
                 sw.sendMsg(request);
                 // Other flow stats will not be asked
                 // if we don't see first the reply of this request
-                waiting.set(true);
+                waiting.set(WAITING_ATTEMPTS);
             }
         }
     }
@@ -205,9 +221,11 @@
         log.debug("Starting Stats collection thread for {}", sw.getStringId());
         loadCounter = new SlidingWindowCounter(HIGH_WINDOW);
         pauseTask = new PauseTimerTask();
-        timer.scheduleAtFixedRate(pauseTask, 1 * SECONDS, 1 * SECONDS);
+        scheduledPauseTask = executorService.scheduleAtFixedRate(pauseTask, 1 * MS,
+                                            1 * MS, TimeUnit.MILLISECONDS);
         pollTask = new PollTimerTask();
-        timer.scheduleAtFixedRate(pollTask, 1 * SECONDS, pollInterval * SECONDS);
+        scheduledPollTask = executorService.scheduleAtFixedRate(pollTask, 1 * MS,
+                                            pollInterval * MS, TimeUnit.MILLISECONDS);
     }
 
     private synchronized void pause() {
@@ -219,6 +237,10 @@
             pollTask.cancel();
             pollTask = null;
         }
+        if (scheduledPollTask != null) {
+            scheduledPollTask.cancel(false);
+            scheduledPollTask = null;
+        }
     }
 
     private synchronized void resume() {
@@ -227,7 +249,8 @@
                   loadCounter.getWindowRate(PAUSE_WINDOW),
                   loadCounter.getWindowRate(HIGH_WINDOW));
         pollTask = new PollTimerTask();
-        timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
+        scheduledPollTask = executorService.scheduleAtFixedRate(pollTask, pollInterval * MS,
+                                            pollInterval * MS, TimeUnit.MILLISECONDS);
     }
 
     public synchronized void stop() {
@@ -235,11 +258,19 @@
             pauseTask.cancel();
             pauseTask = null;
         }
+        if (scheduledPauseTask != null) {
+            scheduledPauseTask.cancel(false);
+            scheduledPauseTask = null;
+        }
         if (pollTask != null) {
             log.debug("Stopping Stats collection thread for {}", sw.getStringId());
             pollTask.cancel();
             pollTask = null;
         }
+        if (scheduledPollTask != null) {
+            scheduledPollTask.cancel(false);
+            scheduledPollTask = null;
+        }
         if (loadCounter != null) {
             loadCounter.destroy();
             loadCounter = null;
@@ -247,7 +278,7 @@
     }
 
     public void received() {
-        waiting.set(false);
+        waiting.set(0);
     }
 
 }