[SDFAB-954] Non-leader instance can mark UNKNOWN the pipeline.

The probe task is not performed in atomic way and between the
initial mastership check and the actual probe the execution
can be blocked many times and the mastership can change. Recheck
the mastership after pipeline probe returns.

We are seeing an issue when a network partition occurs: watchdog
is stuck for 60s before returning and will mark the device offline.
However, in the meanwhile the mastership has been passed to another
instance which is already connected and has already marked the device
online. An harmless side effect of this change is that when we return
from the pipeline config we might be no longer the master and this
will delay in the worst case the markonline of the device for 15s
(next reconcile interval)

Additionally, this patch simplifies the Manager by removing the
executor lock and by using only one worker per device. This change
prevents also the exhaustion of all workers than can easily happen
if there is a network partition that prevents the probe to return
immediately.

Change-Id: I3429cd0598c95589e50f35139f6087f83ceb60f2
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
index be4b329..3c85802 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
@@ -18,8 +18,8 @@
 
 import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.Striped;
 import org.onlab.util.KryoNamespace;
+import org.onlab.util.PredictableExecutor;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.event.AbstractListenerManager;
@@ -64,12 +64,9 @@
 import java.util.Dictionary;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
 
 import static java.util.Collections.singleton;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
@@ -119,8 +116,10 @@
      */
     private int probeInterval = PWM_PROBE_INTERVAL_DEFAULT;
 
-    protected ExecutorService executor = Executors.newFixedThreadPool(
-            30, groupedThreads("onos/pipeconf-watchdog", "%d", log));
+    // Setting to 0 will leverage available processors
+    private static final int DEFAULT_THREADS = 0;
+    protected PredictableExecutor watchdogWorkers = new PredictableExecutor(DEFAULT_THREADS,
+            groupedThreads("onos/pipeconf-watchdog", "%d", log));
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
     private final PiPipeconfListener pipeconfListener = new InternalPipeconfListener();
@@ -129,8 +128,6 @@
             groupedThreads("onos/pipeconf-event", "%d", log));
     private ScheduledFuture<?> poller = null;
 
-    private final Striped<Lock> locks = Striped.lock(30);
-
     private EventuallyConsistentMap<DeviceId, PipelineStatus> statusMap;
     private Map<DeviceId, PipelineStatus> localStatusMap;
 
@@ -197,7 +194,7 @@
         deviceService.removeListener(deviceListener);
         stopProbeTask();
         eventExecutor.shutdown();
-        executor.shutdown();
+        watchdogWorkers.shutdown();
         statusMap = null;
         localStatusMap = null;
         log.info("Stopped");
@@ -222,40 +219,55 @@
     }
 
     private void filterAndTriggerTasks(Iterable<Device> devices) {
-        devices.forEach(device -> {
-            if (!isLocalMaster(device)) {
-                return;
+        devices.forEach(device -> watchdogWorkers.execute(() -> probeTask(device), device.id().hashCode()));
+    }
+
+    private void probeTask(Device device) {
+        if (!isLocalMaster(device)) {
+            return;
+        }
+
+        final PiPipeconfId pipeconfId = pipeconfMappingStore.getPipeconfId(device.id());
+        if (pipeconfId == null || !device.is(PiPipelineProgrammable.class)) {
+            return;
+        }
+
+        if (pipeconfService.getPipeconf(pipeconfId).isEmpty()) {
+            log.warn("Pipeconf {} is not registered, skipping probe for {}",
+                    pipeconfId, device.id());
+            return;
+        }
+
+        final PiPipeconf pipeconf = pipeconfService.getPipeconf(pipeconfId).get();
+
+        if (!device.is(DeviceHandshaker.class)) {
+            log.error("Missing DeviceHandshaker behavior for {}", device.id());
+            return;
+        }
+
+        final boolean success = doSetPipeconfIfRequired(device, pipeconf);
+        // The probe task is not performed in atomic way and between the
+        // initial mastership check and the actual probe the execution
+        // can be blocked many times and the mastership can change. Recheck
+        // the mastership after pipeline probe returns.
+        if (isLocalMaster(device)) {
+            // An harmless side effect of the check above is that when we return
+            // from the set pipeline config we might be no longer the master and this
+            // will delay in the worst case the mark online of the device for 15s
+            // (next reconcile interval)
+            if (success) {
+                signalStatusReady(device.id());
+                signalStatusConfigured(device.id());
+            } else {
+                // When a network partition occurs watchdog is stuck for LONG_TIMEOUT
+                // before returning and will mark the device offline. However, in the
+                // meanwhile the mastership has been passed to another instance which is
+                // already connected and has already marked the device online.
+                signalStatusUnknown(device.id());
             }
-
-            final PiPipeconfId pipeconfId = pipeconfMappingStore.getPipeconfId(device.id());
-            if (pipeconfId == null || !device.is(PiPipelineProgrammable.class)) {
-                return;
-            }
-
-            if (!pipeconfService.getPipeconf(pipeconfId).isPresent()) {
-                log.warn("Pipeconf {} is not registered, skipping probe for {}",
-                         pipeconfId, device.id());
-                return;
-            }
-
-            final PiPipeconf pipeconf = pipeconfService.getPipeconf(pipeconfId).get();
-
-            if (!device.is(DeviceHandshaker.class)) {
-                log.error("Missing DeviceHandshaker behavior for {}", device.id());
-                return;
-            }
-
-            // Trigger task with per-device lock.
-            executor.execute(withLock(() -> {
-                final boolean success = doSetPipeconfIfRequired(device, pipeconf);
-                if (success) {
-                    signalStatusReady(device.id());
-                    signalStatusConfigured(device.id());
-                } else {
-                    signalStatusUnknown(device.id());
-                }
-            }, device.id()));
-        });
+        } else {
+            log.warn("No longer the master for {} aborting probe task", device.id());
+        }
     }
 
     /**
@@ -284,18 +296,6 @@
         return Futures.getUnchecked(pipelineProg.setPipeconf(pipeconf));
     }
 
-    private Runnable withLock(Runnable task, Object object) {
-        return () -> {
-            final Lock lock = locks.get(object);
-            lock.lock();
-            try {
-                task.run();
-            } finally {
-                lock.unlock();
-            }
-        };
-    }
-
     private void signalStatusUnknown(DeviceId deviceId) {
         statusMap.remove(deviceId);
     }
@@ -363,13 +363,11 @@
                     case DEVICE_ADDED:
                     case DEVICE_UPDATED:
                     case DEVICE_AVAILABILITY_CHANGED:
-                        /*
-                         * The GeneralDeviceProvider marks online/offline devices that
-                         * have/have not ANY pipeline config set. Here we make sure the
-                         * one configured in the pipeconf service is the expected one.
-                         * Clearly, it would be better to let the GDP do this check and
-                         * avoid sending twice the same message to the switch.
-                         */
+                        // The GeneralDeviceProvider marks online/offline devices that
+                        // have/have not ANY pipeline config set. Here we make sure the
+                        // one configured in the pipeconf service is the expected one.
+                        // Clearly, it would be better to let the GDP do this check and
+                        // avoid sending twice the same message to the switch.
                         if (!deviceService.isAvailable(device.id())) {
                             signalStatusUnknown(device.id());
                         } else {