[ONOS-4142] Restart failed tasks when using SharedScheduledExecutor

With current SharedScheduledExecutor, all failed tasks are simply
suspended in background. This commit enables tasks that are
executed using SharedScheduledExecutor to have the ability to
be restarted even if the tasks are encountered failures.

Change-Id: Ibe00c7f5920b8ae3fe5a433a6f9ec08684d88f36
diff --git a/providers/openflow/message/pom.xml b/providers/openflow/message/pom.xml
index fe3c262..212d929 100644
--- a/providers/openflow/message/pom.xml
+++ b/providers/openflow/message/pom.xml
@@ -36,5 +36,10 @@
             <artifactId>onos-app-cpman-api</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-misc</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 </project>
diff --git a/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java
index 10b10ba..714340b 100644
--- a/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java
+++ b/providers/openflow/message/src/main/java/org/onosproject/provider/of/message/impl/OpenFlowControlMessageProvider.java
@@ -22,6 +22,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.metrics.MetricsService;
+import org.onlab.util.SharedScheduledExecutorService;
 import org.onlab.util.SharedScheduledExecutors;
 import org.onosproject.cpman.message.ControlMessageProvider;
 import org.onosproject.cpman.message.ControlMessageProviderRegistry;
@@ -40,7 +41,6 @@
 import org.slf4j.Logger;
 
 import java.util.HashMap;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -77,7 +77,7 @@
                     new InternalOutgoingMessageProvider();
 
     private HashMap<Dpid, OpenFlowControlMessageAggregator> aggregators = Maps.newHashMap();
-    private ScheduledExecutorService executor;
+    private SharedScheduledExecutorService executor;
     private static final int AGGR_INIT_DELAY = 1;
     private static final int AGGR_PERIOD = 1;
     private static final TimeUnit AGGR_TIME_UNIT = TimeUnit.MINUTES;
@@ -159,7 +159,7 @@
                     new OpenFlowControlMessageAggregator(metricsService,
                             providerService, deviceId);
             ScheduledFuture result = executor.scheduleAtFixedRate(ofcma,
-                    AGGR_INIT_DELAY, AGGR_PERIOD, AGGR_TIME_UNIT);
+                    AGGR_INIT_DELAY, AGGR_PERIOD, AGGR_TIME_UNIT, true);
             aggregators.put(dpid, ofcma);
             executorResults.put(dpid, result);
         }
diff --git a/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutorService.java b/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutorService.java
index e2b9b6f..79ee96c 100644
--- a/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutorService.java
+++ b/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutorService.java
@@ -34,7 +34,7 @@
 /**
  * A new scheduled executor service that does not eat exception.
  */
-class SharedScheduledExecutorService implements ScheduledExecutorService {
+public class SharedScheduledExecutorService implements ScheduledExecutorService {
 
     private static final String NOT_ALLOWED = "Shutdown of scheduled executor is not allowed";
     private final Logger log = getLogger(getClass());
@@ -62,17 +62,34 @@
     /**
      * Swaps the backing executor with a new one and shuts down the old one.
      *
-     * @param executor new scheduled executor service
+     * @param executorService new scheduled executor service
      */
-    void setBackingExecutor(ScheduledExecutorService executor) {
+    void setBackingExecutor(ScheduledExecutorService executorService) {
         ScheduledExecutorService oldExecutor = this.executor;
-        this.executor = executor;
+        this.executor = executorService;
         oldExecutor.shutdown();
     }
 
+    /**
+     * Creates and executes a one-shot action that becomes enabled
+     * after the given delay.
+     *
+     * @param command the task to execute
+     * @param delay the time from now to delay execution
+     * @param unit the time unit of the delay parameter
+     * @param repeatFlag the flag to denote whether to restart a failed task
+     * @return a ScheduledFuture representing pending completion of
+     *         the task and whose {@code get()} method will return
+     *         {@code null} upon completion
+     */
+    public ScheduledFuture<?> schedule(Runnable command, long delay,
+                                       TimeUnit unit, boolean repeatFlag) {
+        return executor.schedule(wrap(command, repeatFlag), delay, unit);
+    }
+
     @Override
     public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-        return executor.schedule(wrap(command), delay, unit);
+        return schedule(command, delay, unit, false);
     }
 
     @Override
@@ -88,16 +105,76 @@
         }, delay, unit);
     }
 
+    /**
+     * Creates and executes a periodic action that becomes enabled first
+     * after the given initial delay, and subsequently with the given
+     * period; that is executions will commence after
+     * {@code initialDelay} then {@code initialDelay+period}, then
+     * {@code initialDelay + 2 * period}, and so on.
+     * Depends on the repeat flag that the user set, the failed tasks can be
+     * either restarted or terminated. If the repeat flag is set to to true,
+     * ant execution of the task encounters an exception, subsequent executions
+     * are permitted, otherwise, subsequent executions are suppressed.
+     * If any execution of this task takes longer than its period, then
+     * subsequent executions may start late, but will not concurrently execute.
+     *
+     * @param command the task to execute
+     * @param initialDelay the time to delay first execution
+     * @param period the period between successive executions
+     * @param unit the time unit of the initialDelay and period parameters
+     * @param repeatFlag the flag to denote whether to restart a failed task
+     * @return a ScheduledFuture representing pending completion of
+     *         the task, and whose {@code get()} method will throw an
+     *         exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
+                                                  long initialDelay,
+                                                  long period,
+                                                  TimeUnit unit,
+                                                  boolean repeatFlag) {
+        return executor.scheduleAtFixedRate(wrap(command, repeatFlag),
+                initialDelay, period, unit);
+    }
+
     @Override
     public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
                                                   long period, TimeUnit unit) {
-        return executor.scheduleAtFixedRate(wrap(command), initialDelay, period, unit);
+        return scheduleAtFixedRate(command, initialDelay, period, unit, false);
+    }
+
+    /**
+     * Creates and executes a periodic action that becomes enabled first
+     * after the given initial delay, and subsequently with the
+     * given delay between the termination of one execution and the
+     * commencement of the next.
+     * Depends on the repeat flag that the user set, the failed tasks can be
+     * either restarted or terminated. If the repeat flag is set to to true,
+     * ant execution of the task encounters an exception, subsequent executions
+     * are permitted, otherwise, subsequent executions are suppressed.
+     *
+     * @param command the task to execute
+     * @param initialDelay the time to delay first execution
+     * @param delay the delay between the termination of one
+     * execution and the commencement of the next
+     * @param unit the time unit of the initialDelay and delay parameters
+     * @param repeatFlag the flag to denote whether to restart a failed task
+     * @return a ScheduledFuture representing pending completion of
+     *         the task, and whose {@code get()} method will throw an
+     *         exception upon cancellation
+     */
+    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
+                                                     long initialDelay,
+                                                     long delay,
+                                                     TimeUnit unit,
+                                                     boolean repeatFlag) {
+        return executor.scheduleWithFixedDelay(wrap(command, repeatFlag),
+                initialDelay, delay, unit);
     }
 
     @Override
     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
                                                      long delay, TimeUnit unit) {
-        return executor.scheduleWithFixedDelay(wrap(command), initialDelay, delay, unit);
+        return scheduleWithFixedDelay(command, initialDelay, delay, unit, false);
     }
 
     @Override
@@ -171,8 +248,8 @@
         executor.execute(command);
     }
 
-    private Runnable wrap(Runnable command) {
-        return new LoggableRunnable(command);
+    private Runnable wrap(Runnable command, boolean repeatFlag) {
+        return new LoggableRunnable(command, repeatFlag);
     }
 
     /**
@@ -180,19 +257,31 @@
      */
     private class LoggableRunnable implements Runnable {
         private Runnable runnable;
+        private boolean repeatFlag;
 
-        public LoggableRunnable(Runnable runnable) {
+        public LoggableRunnable(Runnable runnable, boolean repeatFlag) {
             super();
             this.runnable = runnable;
+            this.repeatFlag = repeatFlag;
         }
 
         @Override
         public void run() {
+            if (Thread.currentThread().isInterrupted()) {
+                log.info("Task interrupted, quitting");
+                return;
+            }
+
             try {
                 runnable.run();
             } catch (Exception e) {
                 log.error("Uncaught exception on " + runnable.getClass().getSimpleName(), e);
-                throw Throwables.propagate(e);
+
+                // if repeat flag set as false, we simply throw an exception to
+                // terminate this task
+                if (!repeatFlag) {
+                    throw Throwables.propagate(e);
+                }
             }
         }
     }
diff --git a/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutors.java b/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutors.java
index 5a2f85c..5469e62 100644
--- a/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutors.java
+++ b/utils/misc/src/main/java/org/onlab/util/SharedScheduledExecutors.java
@@ -15,8 +15,6 @@
  */
 package org.onlab.util;
 
-import java.util.concurrent.ScheduledExecutorService;
-
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -56,7 +54,7 @@
      *
      * @return shared scheduled single thread executor
      */
-    public static ScheduledExecutorService getSingleThreadExecutor() {
+    public static SharedScheduledExecutorService getSingleThreadExecutor() {
         return singleThreadExecutor;
     }
 
@@ -65,7 +63,7 @@
      *
      * @return shared scheduled executor pool
      */
-    public static ScheduledExecutorService getPoolThreadExecutor() {
+    public static SharedScheduledExecutorService getPoolThreadExecutor() {
         return poolThreadExecutor;
     }