[SDFAB-642][SDFAB-643] Fix device disconnection when an instance rejoins

Changes in the GeneralDeviceProvider (GDP) make sure the default
availability is always updated through the two-step process defined in
the provider, and prevent devices from being disconnected when an
instance rejoins the cluster.

Finally, the patch improves logging inside the GossipDeviceStore and
improves the stability of PiPipeconfWatchdogManager by replacing the
Timer and TimerTask with a scheduled executor and by running the event
listeners inside that executor.

Change-Id: Ibc6ce711e15e86bde05dbf3b1c37d2a93516fae3
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 53a312d..cf6265b 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -856,6 +856,14 @@
                                                            deviceId, portStatistics);
             post(event);
         }
+
+        @Override
+        public DeviceDescription getDeviceDescription(DeviceId deviceId) {
+            checkNotNull(deviceId, DEVICE_ID_NULL);
+            checkValidity();
+
+            return store.getDeviceDescription(provider().id(), deviceId);
+        }
     }
 
     // by default allowed, otherwise check flag
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
index 6dea30e..be4b329 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
@@ -64,13 +64,15 @@
 import java.util.Dictionary;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 
 import static java.util.Collections.singleton;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.OsgiPropertyConstants.PWM_PROBE_INTERVAL;
 import static org.onosproject.net.OsgiPropertyConstants.PWM_PROBE_INTERVAL_DEFAULT;
@@ -94,8 +96,6 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final long SECONDS = 1000L;
-
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     private PiPipeconfMappingStore pipeconfMappingStore;
 
@@ -125,8 +125,9 @@
     private final DeviceListener deviceListener = new InternalDeviceListener();
     private final PiPipeconfListener pipeconfListener = new InternalPipeconfListener();
 
-    private Timer timer;
-    private TimerTask task;
+    private ScheduledExecutorService eventExecutor = newSingleThreadScheduledExecutor(
+            groupedThreads("onos/pipeconf-event", "%d", log));
+    private ScheduledFuture<?> poller = null;
 
     private final Striped<Lock> locks = Striped.lock(30);
 
@@ -164,7 +165,6 @@
         // Register component configurable properties.
         componentConfigService.registerProperties(getClass());
         // Start periodic watchdog task.
-        timer = new Timer();
         startProbeTask();
         // Add listeners.
         deviceService.addListener(deviceListener);
@@ -196,7 +196,8 @@
         pipeconfService.removeListener(pipeconfListener);
         deviceService.removeListener(deviceListener);
         stopProbeTask();
-        timer = null;
+        eventExecutor.shutdown();
+        executor.shutdown();
         statusMap = null;
         localStatusMap = null;
         log.info("Stopped");
@@ -327,9 +328,8 @@
     private void startProbeTask() {
         synchronized (this) {
             log.info("Starting pipeline probe thread with {} seconds interval...", probeInterval);
-            task = new InternalTimerTask();
-            timer.scheduleAtFixedRate(task, probeInterval * SECONDS,
-                                      probeInterval * SECONDS);
+            poller = eventExecutor.scheduleAtFixedRate(this::triggerCheckAllDevices, probeInterval,
+                    probeInterval, TimeUnit.SECONDS);
         }
     }
 
@@ -337,8 +337,8 @@
     private void stopProbeTask() {
         synchronized (this) {
             log.info("Stopping pipeline probe thread...");
-            task.cancel();
-            task = null;
+            poller.cancel(false);
+            poller = null;
         }
     }
 
@@ -350,13 +350,6 @@
         }
     }
 
-    private class InternalTimerTask extends TimerTask {
-        @Override
-        public void run() {
-            triggerCheckAllDevices();
-        }
-    }
-
     /**
      * Listener of device events used to update the pipeline status.
      */
@@ -364,48 +357,50 @@
 
         @Override
         public void event(DeviceEvent event) {
-            final Device device = event.subject();
-            switch (event.type()) {
-                case DEVICE_ADDED:
-                case DEVICE_UPDATED:
-                case DEVICE_AVAILABILITY_CHANGED:
-                    if (!deviceService.isAvailable(device.id())) {
+            eventExecutor.execute(() -> {
+                final Device device = event.subject();
+                switch (event.type()) {
+                    case DEVICE_ADDED:
+                    case DEVICE_UPDATED:
+                    case DEVICE_AVAILABILITY_CHANGED:
+                        /*
+                         * The GeneralDeviceProvider marks devices online/offline based on
+                         * whether ANY pipeline config is set. Here we make sure the one
+                         * configured in the pipeconf service is the expected one.
+                         * Clearly, it would be better to let the GDP do this check and
+                         * avoid sending the same message to the switch twice.
+                         */
+                        if (!deviceService.isAvailable(device.id())) {
+                            signalStatusUnknown(device.id());
+                        } else {
+                            filterAndTriggerTasks(singleton(device));
+                        }
+                        break;
+                    case DEVICE_REMOVED:
+                    case DEVICE_SUSPENDED:
                         signalStatusUnknown(device.id());
-                    } else {
-                        // The GeneralDeviceProvider marks online devices that
-                        // have ANY pipeline config set. Here we make sure the
-                        // one configured in the pipeconf service is the
-                        // expected one. Clearly, it would be better to let the
-                        // GDP do this check and avoid sending twice the same
-                        // message to the switch.
-                        filterAndTriggerTasks(singleton(device));
-                    }
-                    break;
-                case DEVICE_REMOVED:
-                case DEVICE_SUSPENDED:
-                    signalStatusUnknown(device.id());
-                    signalStatusUnconfigured(device.id());
-                    break;
-                case PORT_ADDED:
-                case PORT_UPDATED:
-                case PORT_REMOVED:
-                case PORT_STATS_UPDATED:
-                default:
-                    break;
-            }
+                        signalStatusUnconfigured(device.id());
+                        break;
+                    case PORT_ADDED:
+                    case PORT_UPDATED:
+                    case PORT_REMOVED:
+                    case PORT_STATS_UPDATED:
+                    default:
+                        break;
+                }
+            });
         }
     }
 
     private class InternalPipeconfListener implements PiPipeconfListener {
         @Override
         public void event(PiPipeconfEvent event) {
-            pipeconfMappingStore.getDevices(event.subject())
-                    .forEach(PiPipeconfWatchdogManager.this::triggerProbe);
-        }
-
-        @Override
-        public boolean isRelevant(PiPipeconfEvent event) {
-            return Objects.equals(event.type(), PiPipeconfEvent.Type.REGISTERED);
+            eventExecutor.execute(() -> {
+                if (Objects.equals(event.type(), PiPipeconfEvent.Type.REGISTERED)) {
+                    pipeconfMappingStore.getDevices(event.subject())
+                            .forEach(PiPipeconfWatchdogManager.this::triggerProbe);
+                }
+            });
         }
     }