[SDFAB-642][SDFAB-643] Fix device disconnection when an instance rejoins

Changes in the GeneralDeviceProvider (GDP) make sure the default availability
is always updated through the two-step process defined in the provider, and
prevent the disconnection of devices when an instance rejoins the cluster.

Finally, the patch improves logging inside the GossipDeviceStore and improves
the stability of PiPipeconfWatchdogManager by replacing the Timer and TimerTask
with a scheduled executor and by executing the event listeners inside it.

Change-Id: Ibc6ce711e15e86bde05dbf3b1c37d2a93516fae3
diff --git a/core/api/src/main/java/org/onosproject/net/device/DefaultDeviceDescription.java b/core/api/src/main/java/org/onosproject/net/device/DefaultDeviceDescription.java
index d0b7b2b..271e678 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DefaultDeviceDescription.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DefaultDeviceDescription.java
@@ -217,6 +217,7 @@
                 .add("hw", hwVersion).add("sw", swVersion)
                 .add("serial", serialNumber)
                 .add("chassisId", chassisId)
+                .add("defaultAvailable", defaultAvailable)
                 .add("annotations", annotations())
                 .toString();
     }
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceProviderService.java b/core/api/src/main/java/org/onosproject/net/device/DeviceProviderService.java
index b00fa63..f11f0cc 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceProviderService.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceProviderService.java
@@ -105,4 +105,14 @@
      */
     void updatePortStatistics(DeviceId deviceId, Collection<PortStatistics> portStatistics);
 
+    /**
+     * Returns the description of the specified device.
+     *
+     * @param deviceId device identifier
+     * @return device description or null if not present; this default implementation always returns null
+     */
+    default DeviceDescription getDeviceDescription(DeviceId deviceId) {
+        return null;
+    }
+
 }
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceStore.java b/core/api/src/main/java/org/onosproject/net/device/DeviceStore.java
index bd59838..ed5d33d 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceStore.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceStore.java
@@ -208,6 +208,17 @@
     PortDescription getPortDescription(ProviderId providerId, DeviceId deviceId, PortNumber portNumber);
 
     /**
+     * Returns the description of the specified device as supplied by the given provider.
+     *
+     * @param providerId provider identifier
+     * @param deviceId device identifier
+     * @return device description or null if not present; this default implementation always returns null
+     */
+    default DeviceDescription getDeviceDescription(ProviderId providerId, DeviceId deviceId) {
+        return null;
+    }
+
+    /**
      * Indicates whether the specified device is available/online.
      *
      * @param deviceId device identifier
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 53a312d..cf6265b 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -856,6 +856,14 @@
                                                            deviceId, portStatistics);
             post(event);
         }
+
+        @Override
+        public DeviceDescription getDeviceDescription(DeviceId deviceId) {
+            checkNotNull(deviceId, DEVICE_ID_NULL);
+            checkValidity();
+
+            return store.getDeviceDescription(provider().id(), deviceId);
+        }
     }
 
     // by default allowed, otherwise check flag
diff --git a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
index 6dea30e..be4b329 100644
--- a/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
+++ b/core/net/src/main/java/org/onosproject/net/pi/impl/PiPipeconfWatchdogManager.java
@@ -64,13 +64,15 @@
 import java.util.Dictionary;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 
 import static java.util.Collections.singleton;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.OsgiPropertyConstants.PWM_PROBE_INTERVAL;
 import static org.onosproject.net.OsgiPropertyConstants.PWM_PROBE_INTERVAL_DEFAULT;
@@ -94,8 +96,6 @@
 
     private final Logger log = getLogger(getClass());
 
-    private static final long SECONDS = 1000L;
-
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     private PiPipeconfMappingStore pipeconfMappingStore;
 
@@ -125,8 +125,9 @@
     private final DeviceListener deviceListener = new InternalDeviceListener();
     private final PiPipeconfListener pipeconfListener = new InternalPipeconfListener();
 
-    private Timer timer;
-    private TimerTask task;
+    private ScheduledExecutorService eventExecutor = newSingleThreadScheduledExecutor(
+            groupedThreads("onos/pipeconf-event", "%d", log));
+    private ScheduledFuture<?> poller = null;
 
     private final Striped<Lock> locks = Striped.lock(30);
 
@@ -164,7 +165,6 @@
         // Register component configurable properties.
         componentConfigService.registerProperties(getClass());
         // Start periodic watchdog task.
-        timer = new Timer();
         startProbeTask();
         // Add listeners.
         deviceService.addListener(deviceListener);
@@ -196,7 +196,8 @@
         pipeconfService.removeListener(pipeconfListener);
         deviceService.removeListener(deviceListener);
         stopProbeTask();
-        timer = null;
+        eventExecutor.shutdown();
+        executor.shutdown();
         statusMap = null;
         localStatusMap = null;
         log.info("Stopped");
@@ -327,9 +328,8 @@
     private void startProbeTask() {
         synchronized (this) {
             log.info("Starting pipeline probe thread with {} seconds interval...", probeInterval);
-            task = new InternalTimerTask();
-            timer.scheduleAtFixedRate(task, probeInterval * SECONDS,
-                                      probeInterval * SECONDS);
+            poller = eventExecutor.scheduleAtFixedRate(this::triggerCheckAllDevices, probeInterval,
+                    probeInterval, TimeUnit.SECONDS);
         }
     }
 
@@ -337,8 +337,8 @@
     private void stopProbeTask() {
         synchronized (this) {
             log.info("Stopping pipeline probe thread...");
-            task.cancel();
-            task = null;
+            poller.cancel(false);
+            poller = null;
         }
     }
 
@@ -350,13 +350,6 @@
         }
     }
 
-    private class InternalTimerTask extends TimerTask {
-        @Override
-        public void run() {
-            triggerCheckAllDevices();
-        }
-    }
-
     /**
      * Listener of device events used to update the pipeline status.
      */
@@ -364,48 +357,50 @@
 
         @Override
         public void event(DeviceEvent event) {
-            final Device device = event.subject();
-            switch (event.type()) {
-                case DEVICE_ADDED:
-                case DEVICE_UPDATED:
-                case DEVICE_AVAILABILITY_CHANGED:
-                    if (!deviceService.isAvailable(device.id())) {
+            eventExecutor.execute(() -> {
+                final Device device = event.subject();
+                switch (event.type()) {
+                    case DEVICE_ADDED:
+                    case DEVICE_UPDATED:
+                    case DEVICE_AVAILABILITY_CHANGED:
+                        /*
+                         * The GeneralDeviceProvider marks online/offline devices that
+                         * have/have not ANY pipeline config set. Here we make sure the
+                         * one configured in the pipeconf service is the expected one.
+                         * Clearly, it would be better to let the GDP do this check and
+                         * avoid sending twice the same message to the switch.
+                         */
+                        if (!deviceService.isAvailable(device.id())) {
+                            signalStatusUnknown(device.id());
+                        } else {
+                            filterAndTriggerTasks(singleton(device));
+                        }
+                        break;
+                    case DEVICE_REMOVED:
+                    case DEVICE_SUSPENDED:
                         signalStatusUnknown(device.id());
-                    } else {
-                        // The GeneralDeviceProvider marks online devices that
-                        // have ANY pipeline config set. Here we make sure the
-                        // one configured in the pipeconf service is the
-                        // expected one. Clearly, it would be better to let the
-                        // GDP do this check and avoid sending twice the same
-                        // message to the switch.
-                        filterAndTriggerTasks(singleton(device));
-                    }
-                    break;
-                case DEVICE_REMOVED:
-                case DEVICE_SUSPENDED:
-                    signalStatusUnknown(device.id());
-                    signalStatusUnconfigured(device.id());
-                    break;
-                case PORT_ADDED:
-                case PORT_UPDATED:
-                case PORT_REMOVED:
-                case PORT_STATS_UPDATED:
-                default:
-                    break;
-            }
+                        signalStatusUnconfigured(device.id());
+                        break;
+                    case PORT_ADDED:
+                    case PORT_UPDATED:
+                    case PORT_REMOVED:
+                    case PORT_STATS_UPDATED:
+                    default:
+                        break;
+                }
+            });
         }
     }
 
     private class InternalPipeconfListener implements PiPipeconfListener {
         @Override
         public void event(PiPipeconfEvent event) {
-            pipeconfMappingStore.getDevices(event.subject())
-                    .forEach(PiPipeconfWatchdogManager.this::triggerProbe);
-        }
-
-        @Override
-        public boolean isRelevant(PiPipeconfEvent event) {
-            return Objects.equals(event.type(), PiPipeconfEvent.Type.REGISTERED);
+            eventExecutor.execute(() -> {
+                if (Objects.equals(event.type(), PiPipeconfEvent.Type.REGISTERED)) {
+                    pipeconfMappingStore.getDevices(event.subject())
+                            .forEach(PiPipeconfWatchdogManager.this::triggerProbe);
+                }
+            });
         }
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
index 5760e23..2bd4dfe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/device/impl/GossipDeviceStore.java
@@ -1037,6 +1037,20 @@
     }
 
     @Override
+    public DeviceDescription getDeviceDescription(ProviderId providerId, DeviceId deviceId) {
+        if (devices.containsKey(deviceId)) {
+            Map<ProviderId, DeviceDescriptions> descs = getOrCreateDeviceDescriptionsMap(deviceId);
+            synchronized (descs) {
+                DeviceDescriptions deviceDescriptions = descs.get(providerId);
+                return deviceDescriptions != null ? deviceDescriptions.getDeviceDesc().value() : null;
+            }
+        } else {
+            log.warn("Device {} does not exist in store", deviceId);
+        }
+        return null;
+    }
+
+    @Override
     public boolean isAvailable(DeviceId deviceId) {
         return availableDevices.contains(deviceId);
     }
@@ -1403,7 +1417,13 @@
      * @param advertisement to respond to
      */
     private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
-
+        /*
+         * NOTE that when an instance rejoins the cluster, it will generate
+         * device events and send them to the local apps through the delegate.
+         * This approach might not be the best if the apps are not robust enough
+         * or if there is no proper coordination in the cluster. Also, note that
+         * any ECMap will act in the same way during the bootstrap process.
+         */
         final NodeId sender = advertisement.sender();
 
         Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
@@ -1437,9 +1457,12 @@
                     if (advDevTimestamp == null || lProvDevice.isNewerThan(
                             advDevTimestamp)) {
                         // remote does not have it or outdated, suggest
+                        log.trace("send to {} device update {} for {}", sender, lProvDevice, deviceId);
                         notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
                     } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
                         // local is outdated, request
+                        log.trace("need update {} < {} for device {} from {}", lProvDevice.timestamp(),
+                                advDevTimestamp, deviceId, sender);
                         reqDevices.add(devFragId);
                     }
 
@@ -1456,10 +1479,12 @@
                         if (advPortTimestamp == null || lPort.isNewerThan(
                                 advPortTimestamp)) {
                             // remote does not have it or outdated, suggest
+                            log.trace("send to {} port update {} for {}/{}", sender, lPort, deviceId, num);
                             notifyPeer(sender, new InternalPortStatusEvent(provId, deviceId, lPort));
                         } else if (!lPort.timestamp().equals(advPortTimestamp)) {
                             // local is outdated, request
-                            log.trace("need update {} < {}", lPort.timestamp(), advPortTimestamp);
+                            log.trace("need update {} < {} for port {} from {}", lPort.timestamp(),
+                                    advPortTimestamp, num, sender);
                             reqPorts.add(portFragId);
                         }
 
@@ -1483,12 +1508,15 @@
                 if (localLatest == null || (rOffline != null && rOffline.compareTo(localLatest) > 0)) {
                     // remote offline timestamp suggests that the
                     // device is off-line
+                    log.trace("remote offline timestamp from {} suggests that the device {} is off-line",
+                            sender, deviceId);
                     markOfflineInternal(deviceId, rOffline);
                 }
 
                 Timestamp lOffline = offline.get(deviceId);
                 if (lOffline != null && rOffline == null) {
                     // locally offline, but remote is online, suggest offline
+                    log.trace("suggest to {} that the device {} is off-line", sender, deviceId);
                     notifyPeer(sender, new InternalDeviceStatusChangeEvent(deviceId, lOffline, false));
                 }
 
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 6b19cc8..b892460 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -643,13 +643,35 @@
             providerService.updatePorts(deviceId, ports);
         }
 
-        if (deviceService.getDevice(deviceId) != null
-                && deviceService.isAvailable(deviceId) == available) {
-            // Other nodes might have advertised this device before us.
-            return;
+        DeviceDescription deviceDescription = getDeviceDescription(deviceId, available);
+        DeviceDescription storeDescription = providerService.getDeviceDescription(deviceId);
+        if (deviceService.getDevice(deviceId) != null &&
+                deviceService.isAvailable(deviceId) == available &&
+                storeDescription != null) {
+            /*
+             * FIXME SDFAB-650 rethink the APIs and abstractions around the DeviceStore.
+             * Device registration is a two-step process for the GDP. Initially, the device is
+             * registered with default availability set to false. Later, the checkup task updates the
+             * description with default availability set to true in order to mark it available. Today,
+             * the only API a device provider can use to mark a device online is deviceConnected,
+             * which assumes an update of the device description. The device provider is the only one
+             * able to update the device description, and we have to make sure that the default
+             * availability is flipped to true, as it is used to mark the device online when it is
+             * created or updated. Otherwise, if an ONOS instance fails and restarts, when re-joining
+             * the cluster it will get the device marked as offline and will not be able to update
+             * its status until it becomes the master. This process runs concurrently with the markOnline
+             * done by the background thread in the DeviceManager, and it is the reason why we cannot just
+             * check the device availability but also need to compare the descriptions. We check equality
+             * here because, in general, we may want to upgrade the device description at run time.
+             */
+            DeviceDescription testDeviceDescription = DefaultDeviceDescription.copyReplacingAnnotation(
+                    deviceDescription, storeDescription.annotations());
+            if (testDeviceDescription.equals(storeDescription)) {
+                return;
+            }
         }
-        providerService.deviceConnected(deviceId, getDeviceDescription(
-                deviceId, available));
+
+        providerService.deviceConnected(deviceId, deviceDescription);
     }
 
     private boolean probeAvailability(DeviceHandshaker handshaker) {