Prevents the stop of the stats collection

- Uses executor in place of a timer which can become unable
to schedule futher tasks if the exceptions are not caught
- Flow stats collector will wait at most 5 flows stats interval
for a reply before to request new stats

Change-Id: Ic50efd7b8afa1f182ba32c04aa971d3ef400529e
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
index c8bf707..638c195 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/FlowStatsCollector.java
@@ -25,9 +25,12 @@
 import org.projectfloodlight.openflow.types.TableId;
 import org.slf4j.Logger;
 
-import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.lang.Math.max;
 import static java.lang.Math.min;
@@ -40,7 +43,7 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final int SECONDS = 1000;
+    private static final int MS = 1000;
 
     // Number of ticks which defines the pause window.
     private static final int PAUSE_WINDOW = 2;
@@ -56,27 +59,30 @@
     private static final int MAX_LOAD_RATE = 500;
 
     private final OpenFlowSwitch sw;
-    private Timer timer;
+    private ScheduledExecutorService executorService;
     private TimerTask pauseTask;
+    private ScheduledFuture<?> scheduledPauseTask;
     private TimerTask pollTask;
+    private ScheduledFuture<?> scheduledPollTask;
 
     private SlidingWindowCounter loadCounter;
     // Defines whether the collector is in pause or not for high load
     private final AtomicBoolean paused = new AtomicBoolean();
     // Defines whether the collector is in waiting or not for a previous stats reply
-    private final AtomicBoolean waiting = new AtomicBoolean();
+    private static final int WAITING_ATTEMPTS = 5;
+    private final AtomicInteger waiting = new AtomicInteger(0);
 
     private int pollInterval;
 
     /**
      * Creates a new collector for the given switch and poll frequency.
      *
-     * @param timer        timer to use for scheduling
-     * @param sw           switch to pull
+     * @param executorService executor used for scheduling
+     * @param sw switch to pull
      * @param pollInterval poll frequency in seconds
      */
-    FlowStatsCollector(Timer timer, OpenFlowSwitch sw, int pollInterval) {
-        this.timer = timer;
+    FlowStatsCollector(ScheduledExecutorService executorService, OpenFlowSwitch sw, int pollInterval) {
+        this.executorService = executorService;
         this.sw = checkNotNull(sw, "Null switch");
         this.pollInterval = pollInterval;
     }
@@ -91,11 +97,16 @@
         if (pollTask != null) {
             pollTask.cancel();
         }
+        if (scheduledPollTask != null) {
+            scheduledPollTask.cancel(false);
+        }
         // If we went through start - let's schedule it
         if (loadCounter != null) {
             pollTask = new PollTimerTask();
-            timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
+            scheduledPollTask = executorService.scheduleAtFixedRate(pollTask, pollInterval * MS,
+                                                pollInterval * MS, TimeUnit.MILLISECONDS);
         }
+        waiting.set(0);
     }
 
     /**
@@ -109,7 +120,7 @@
         }
         // Let's reset also waiting, the reply can be discarded/lost
         // during a change of mastership
-        waiting.set(false);
+        waiting.set(0);
     }
 
     /**
@@ -169,7 +180,12 @@
         @Override
         public void run() {
             // Check whether we are still waiting a previous reply
-            if (sw.getRole() == RoleState.MASTER && !waiting.get()) {
+            if (waiting.getAndDecrement() > 0) {
+                log.debug("Skipping stats collection for {} waiting for previous reply", sw.getStringId());
+                return;
+            }
+            // Check whether we are the master of the switch
+            if (sw.getRole() == RoleState.MASTER) {
                 // Check whether the switch is under high load from this master. This is done here in case a large
                 // batch was pushed immediately prior to this task running.
                 if (isHighLoad()) {
@@ -195,7 +211,7 @@
                 sw.sendMsg(request);
                 // Other flow stats will not be asked
                 // if we don't see first the reply of this request
-                waiting.set(true);
+                waiting.set(WAITING_ATTEMPTS);
             }
         }
     }
@@ -205,9 +221,11 @@
         log.debug("Starting Stats collection thread for {}", sw.getStringId());
         loadCounter = new SlidingWindowCounter(HIGH_WINDOW);
         pauseTask = new PauseTimerTask();
-        timer.scheduleAtFixedRate(pauseTask, 1 * SECONDS, 1 * SECONDS);
+        scheduledPauseTask = executorService.scheduleAtFixedRate(pauseTask, 1 * MS,
+                                            1 * MS, TimeUnit.MILLISECONDS);
         pollTask = new PollTimerTask();
-        timer.scheduleAtFixedRate(pollTask, 1 * SECONDS, pollInterval * SECONDS);
+        scheduledPollTask = executorService.scheduleAtFixedRate(pollTask, 1 * MS,
+                                            pollInterval * MS, TimeUnit.MILLISECONDS);
     }
 
     private synchronized void pause() {
@@ -219,6 +237,10 @@
             pollTask.cancel();
             pollTask = null;
         }
+        if (scheduledPollTask != null) {
+            scheduledPollTask.cancel(false);
+            scheduledPollTask = null;
+        }
     }
 
     private synchronized void resume() {
@@ -227,7 +249,8 @@
                   loadCounter.getWindowRate(PAUSE_WINDOW),
                   loadCounter.getWindowRate(HIGH_WINDOW));
         pollTask = new PollTimerTask();
-        timer.scheduleAtFixedRate(pollTask, pollInterval * SECONDS, pollInterval * SECONDS);
+        scheduledPollTask = executorService.scheduleAtFixedRate(pollTask, pollInterval * MS,
+                                            pollInterval * MS, TimeUnit.MILLISECONDS);
     }
 
     public synchronized void stop() {
@@ -235,11 +258,19 @@
             pauseTask.cancel();
             pauseTask = null;
         }
+        if (scheduledPauseTask != null) {
+            scheduledPauseTask.cancel(false);
+            scheduledPauseTask = null;
+        }
         if (pollTask != null) {
             log.debug("Stopping Stats collection thread for {}", sw.getStringId());
             pollTask.cancel();
             pollTask = null;
         }
+        if (scheduledPollTask != null) {
+            scheduledPollTask.cancel(false);
+            scheduledPollTask = null;
+        }
         if (loadCounter != null) {
             loadCounter.destroy();
             loadCounter = null;
@@ -247,7 +278,7 @@
     }
 
     public void received() {
-        waiting.set(false);
+        waiting.set(0);
     }
 
 }
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
index 6b47889..c051c5e 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -92,17 +92,20 @@
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.Timer;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.get;
 import static org.onosproject.provider.of.flow.impl.OsgiPropertyConstants.ADAPTIVE_FLOW_SAMPLING;
 import static org.onosproject.provider.of.flow.impl.OsgiPropertyConstants.ADAPTIVE_FLOW_SAMPLING_DEFAULT;
 import static org.onosproject.provider.of.flow.impl.OsgiPropertyConstants.POLL_FREQUENCY;
 import static org.onosproject.provider.of.flow.impl.OsgiPropertyConstants.POLL_FREQUENCY_DEFAULT;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -146,8 +149,8 @@
 
     private Cache<Long, InternalCacheEntry> pendingBatches;
 
-    private final Timer timer = new Timer("onos-openflow-collector");
-
+    private ScheduledExecutorService executorService = newScheduledThreadPool(1,
+                                   groupedThreads("onos/of", "collector-%d", log));
 
     // Old simple collector set
     private final Map<Dpid, FlowStatsCollector> simpleCollectors = Maps.newConcurrentMap();
@@ -169,6 +172,8 @@
         providerService = providerRegistry.register(this);
         controller.addListener(listener);
         controller.addEventListener(listener);
+        // Evicts the tasks if cancelled
+        ((ScheduledThreadPoolExecutor) executorService).setRemoveOnCancelPolicy(true);
 
         modified(context);
 
@@ -186,6 +191,7 @@
         stopCollectors();
         providerRegistry.unregister(this);
         providerService = null;
+        executorService.shutdown();
 
         log.info("Stopped");
     }
@@ -251,13 +257,13 @@
                 stopCollectorIfNeeded(afsCollectors.put(new Dpid(sw.getId()), fsc));
                 fsc.start();
             } else {
-                FlowStatsCollector fsc = new FlowStatsCollector(timer, sw, flowPollFrequency);
+                FlowStatsCollector fsc = new FlowStatsCollector(executorService, sw, flowPollFrequency);
                 stopCollectorIfNeeded(simpleCollectors.put(new Dpid(sw.getId()), fsc));
                 fsc.start();
             }
         }
         if (sw.features().getCapabilities().contains(OFCapabilities.TABLE_STATS)) {
-            TableStatisticsCollector tsc = new TableStatisticsCollector(timer, sw, flowPollFrequency);
+            TableStatisticsCollector tsc = new TableStatisticsCollector(executorService, sw, flowPollFrequency);
             stopCollectorIfNeeded(tableStatsCollectors.put(new Dpid(sw.getId()), tsc));
             tsc.start();
         }
diff --git a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/TableStatisticsCollector.java b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/TableStatisticsCollector.java
index a3644d2..466c91a 100644
--- a/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/TableStatisticsCollector.java
+++ b/providers/openflow/flow/src/main/java/org/onosproject/provider/of/flow/impl/TableStatisticsCollector.java
@@ -15,14 +15,15 @@
  */
 package org.onosproject.provider.of.flow.impl;
 
-import org.onlab.util.SharedExecutors;
 import org.onosproject.openflow.controller.OpenFlowSwitch;
 import org.onosproject.openflow.controller.RoleState;
 import org.projectfloodlight.openflow.protocol.OFTableStatsRequest;
 import org.slf4j.Logger;
 
-import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -33,23 +34,24 @@
 
     private final Logger log = getLogger(getClass());
 
-    public static final long SECONDS = 1000L;
+    public static final long MS = 1000;
 
     private final OpenFlowSwitch sw;
-    private Timer timer;
+    private ScheduledExecutorService executorService;
     private TimerTask task;
+    private ScheduledFuture<?> scheduledTask;
 
     private int pollInterval;
 
     /**
      * Creates a new table statistics collector for the given switch and poll frequency.
      *
-     * @param timer        timer to use for scheduling
-     * @param sw           switch to pull
+     * @param executorService executor used for scheduling
+     * @param sw switch to pull
      * @param pollInterval poll frequency in seconds
      */
-    TableStatisticsCollector(Timer timer, OpenFlowSwitch sw, int pollInterval) {
-        this.timer = timer;
+    TableStatisticsCollector(ScheduledExecutorService executorService, OpenFlowSwitch sw, int pollInterval) {
+        this.executorService = executorService;
         this.sw = sw;
         this.pollInterval = pollInterval;
     }
@@ -61,9 +63,15 @@
      */
     synchronized void adjustPollInterval(int pollInterval) {
         this.pollInterval = pollInterval;
-        task.cancel();
+        if (task != null) {
+            task.cancel();
+        }
         task = new InternalTimerTask();
-        timer.scheduleAtFixedRate(task, pollInterval * SECONDS, pollInterval * 1000L);
+        if (scheduledTask != null) {
+            scheduledTask.cancel(false);
+        }
+        scheduledTask = executorService.scheduleAtFixedRate(task, pollInterval * MS,
+                                            pollInterval * MS, TimeUnit.MILLISECONDS);
     }
 
     private class InternalTimerTask extends TimerTask {
@@ -82,14 +90,20 @@
         // Initially start polling quickly. Then drop down to configured value
         log.debug("Starting Table Stats collection thread for {}", sw.getStringId());
         task = new InternalTimerTask();
-        SharedExecutors.getTimer().scheduleAtFixedRate(task, 1 * SECONDS,
-                                                       pollInterval * SECONDS);
+        scheduledTask = executorService.scheduleAtFixedRate(task, 1 * MS,
+                                            pollInterval * MS, TimeUnit.MILLISECONDS);
     }
 
     public synchronized void stop() {
         log.debug("Stopping Table Stats collection thread for {}", sw.getStringId());
-        task.cancel();
+        if (task != null) {
+            task.cancel();
+        }
         task = null;
+        if (scheduledTask != null) {
+            scheduledTask.cancel(false);
+        }
+        scheduledTask = null;
     }
 
 }