[SDFAB-977][SDFAB-978] Implement persistent mastership logic in ONOS

Design doc can be found here:
https://docs.google.com/document/d/1UStD1hGCy5YQlFmaWfclq6uYROtMY56jWxUBwfUEsRM

Additionally, introduce few more changes in the DeviceManager:
- Always reassert the role if the instance has a valid role
- Test reachability through isReachable and probeReachability

Change-Id: I35183c04b65a98bc0332ed256be8333b53d6b68f
(cherry picked from commit b2f636bffd2783266826503755ba5f42dfdb2b75)
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index e50fe37..cd8ab96 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -23,7 +23,9 @@
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipAdminService;
 import org.onosproject.mastership.MastershipEvent;
 import org.onosproject.mastership.MastershipListener;
 import org.onosproject.mastership.MastershipService;
@@ -146,6 +148,9 @@
     protected MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipAdminService mastershipAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected MastershipTermService termService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -157,7 +162,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ClusterCommunicationService communicationService;
 
-    private ExecutorService portRequestExecutor;
+    private ExecutorService clusterRequestExecutor;
     /**
      * List of all registered PortConfigOperator.
      */
@@ -177,6 +182,11 @@
     private static final MessageSubject PORT_UPDOWN_SUBJECT =
             new MessageSubject("port-updown-req");
 
+    private static final MessageSubject PROBE_SUBJECT =
+            new MessageSubject("probe-req");
+    private static final long PROBE_TIMEOUT_MILLIS = 5000;
+    private static final int PROBE_ATTEMPTS = 3;
+
     private static final Serializer SERIALIZER = Serializer.using(
             KryoNamespace.newBuilder()
                     .register(KryoNamespaces.API)
@@ -229,13 +239,20 @@
             }
         }, 1, 1, TimeUnit.MINUTES);
 
-        portRequestExecutor = newSingleThreadExecutor();
+        clusterRequestExecutor = newSingleThreadExecutor();
 
-        communicationService.<InternalPortUpDownEvent>addSubscriber(
+        communicationService.addSubscriber(
                 PORT_UPDOWN_SUBJECT,
                 SERIALIZER::decode,
                 this::handlePortRequest,
-                portRequestExecutor);
+                clusterRequestExecutor);
+
+        communicationService.addSubscriber(
+                PROBE_SUBJECT,
+                SERIALIZER::decode,
+                this::handleProbeRequest,
+                SERIALIZER::encode,
+                clusterRequestExecutor);
 
         backgroundRoleChecker = newSingleThreadScheduledExecutor(
                 groupedThreads("onos/device", "manager-role", log));
@@ -258,7 +275,7 @@
         mastershipService.removeListener(mastershipListener);
         eventDispatcher.removeSink(DeviceEvent.class);
         communicationService.removeSubscriber(PORT_UPDOWN_SUBJECT);
-        portRequestExecutor.shutdown();
+        clusterRequestExecutor.shutdown();
         backgroundRoleChecker.shutdown();
         log.info("Stopped");
     }
@@ -389,7 +406,7 @@
         }
         DeviceProvider provider = getProvider(deviceId);
         if (provider != null) {
-            boolean reachable = provider.isReachable(deviceId);
+            boolean reachable = probeReachability(deviceId);
             if (reachable && !isLocallyConnected(deviceId)) {
                 deviceLocalStatus.put(deviceId, new LocalStatus(true, Instant.now()));
             } else if (!reachable && isLocallyConnected(deviceId)) {
@@ -498,42 +515,35 @@
             log.trace("Checking device {}. Current role is {}", deviceId, myRole);
             if (!isReachable(deviceId)) {
                 if (myRole != NONE) {
-                    // can't be master if device is not reachable
-                    try {
-                        if (myRole == MASTER) {
-                            log.info("Local Role {}, Marking unreachable device {} offline", MASTER, deviceId);
-                            post(store.markOffline(deviceId));
-                        }
-                        //relinquish master role and ability to be backup.
-                        roleToAcknowledge.remove(deviceId);
-                        mastershipService.relinquishMastership(deviceId).get();
-                    } catch (InterruptedException e) {
-                        log.warn("Interrupted while relinquishing role for {}", deviceId);
-                        Thread.currentThread().interrupt();
-                    } catch (ExecutionException e) {
-                        log.error("Exception thrown while relinquishing role for {}", deviceId, e);
+                    // Verify if the device is fully disconnected from the cluster
+                    if (updateMastershipFor(deviceId) == null
+                            && myRole == MASTER && isAvailable(deviceId)) {
+                        log.info("Local Role {}, Marking unreachable device {} offline", MASTER, deviceId);
+                        post(store.markOffline(deviceId));
                     }
                 } else {
-                    // check if the device has master and is available to the store, if not, mark it offline
-                    // only the nodes which has mastership role can mark any device offline.
-                    // This condition should never be hit unless in a device removed phase for NONE mastership roles.
-                    NodeId master = mastershipService.getMasterFor(deviceId);
-                    if (master == null && isAvailable(deviceId)) {
-                        CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
-                        roleFuture.thenAccept(role -> {
-                            MastershipTerm term = termService.getMastershipTerm(deviceId);
-                            if (term != null && localNodeId.equals(term.master())) {
-                                log.info("Marking unreachable device {} offline", deviceId);
-                                post(store.markOffline(deviceId));
-                            } else {
-                                log.info("Failed marking {} offline. {}", deviceId, role);
-                            }
-                            // Stop the timer - just to prevent any race
-                            roleToAcknowledge.remove(deviceId);
-                            mastershipService.relinquishMastership(deviceId);
-                        });
+                    /* Firstly get a role and then check if the device is available in the store.
+                       if it is, if this node is the master and the device is fully disconnected
+                       from the cluster mark the device offline. In principle, this condition should
+                       never be hit unless in a device removed phase for NONE mastership roles. */
+                    try {
+                        mastershipService.requestRoleFor(deviceId).get();
+                    } catch (InterruptedException e) {
+                        Thread.currentThread().interrupt();
+                        log.error("Interrupted waiting for Mastership", e);
+                    } catch (ExecutionException e) {
+                        log.error("Encountered an error waiting for Mastership", e);
+                    }
+
+                    MastershipTerm term = termService.getMastershipTerm(deviceId);
+                    if (updateMastershipFor(deviceId) == null &&
+                            term != null && localNodeId.equals(term.master()) &&
+                            isAvailable(deviceId)) {
+                        log.info("Marking unreachable device {} offline", deviceId);
+                        post(store.markOffline(deviceId));
                     }
                 }
+                roleToAcknowledge.remove(deviceId);
                 continue;
             }
 
@@ -542,15 +552,11 @@
                 post(store.markOnline(deviceId));
             }
 
-            if (myRole != NONE) {
-                continue;
-            }
+            log.info("{} is reachable - reasserting the role", deviceId);
 
-            log.info("{} is reachable but did not have a valid role, reasserting", deviceId);
-
-            // isReachable but was not MASTER or STANDBY, get a role and apply
-            // Note: NONE triggers request to MastershipService
-            reassertRole(deviceId, NONE);
+            /* Device is still reachable. It is useful for some protocols
+               to reassert the role. Note: NONE triggers request to MastershipService */
+            reassertRole(deviceId, myRole);
         }
     }
 
@@ -592,14 +598,7 @@
                Yield mastership to other instance*/
             log.warn("Failed to assert role onto device {}. requested={}, no response",
                     deviceId, myRole);
-            mastershipService.relinquishMastership(deviceId).whenComplete((result, error) -> {
-                if (error != null) {
-                    log.error("Exception while relinquishing mastership", error);
-                } else {
-                    log.info("Successfully relinquished mastership for {}. Requesting new role", deviceId);
-                    mastershipService.requestRoleFor(deviceId);
-                }
-            });
+            updateMastershipFor(deviceId);
         }
     }
 
@@ -700,55 +699,55 @@
         public void deviceDisconnected(DeviceId deviceId) {
             checkNotNull(deviceId, DEVICE_ID_NULL);
             checkValidity();
+            // Update the local status
             deviceLocalStatus.put(deviceId, new LocalStatus(false, Instant.now()));
             log.info("Device {} disconnected from this node: {}", deviceId,
                      clusterService.getLocalNode().id());
 
-            List<PortDescription> descs = store.getPortDescriptions(provider().id(), deviceId)
-                    .map(desc -> ensurePortEnabledState(desc, false))
-                    .collect(Collectors.toList());
+            /* If none can reach the device, we will continue with the disconnection logic.
+               If there is one instance that reported device is still reachable, we hand over
+               the mastership to it if we are the current master, otherwise if we are a backup
+               we demote ourselves to the bottom of the backups list */
+            if (updateMastershipFor(deviceId) == null) {
+                log.info("Device {} is fully disconnected from the cluster", deviceId);
+                List<PortDescription> descs = store.getPortDescriptions(provider().id(), deviceId)
+                        .map(desc -> ensurePortEnabledState(desc, false))
+                        .collect(Collectors.toList());
+                store.updatePorts(this.provider().id(), deviceId, descs);
 
-            store.updatePorts(this.provider().id(), deviceId, descs);
-            try {
-                if (mastershipService.isLocalMaster(deviceId)) {
-                    post(store.markOffline(deviceId));
-                }
-            } catch (IllegalStateException e) {
-                log.warn("Failed to mark {} offline", deviceId);
-                // only the MASTER should be marking off-line in normal cases,
-                // but if I was the last STANDBY connection, etc. and no one else
-                // was there to mark the device offline, this instance may need to
-                // temporarily request for Master Role and mark offline.
-
-                //there are times when this node will correctly have mastership, BUT
-                //that isn't reflected in the ClockManager before the device disconnects.
-                //we want to let go of the device anyways, so make sure this happens.
-
-                // FIXME: Store semantics leaking out as IllegalStateException.
-                //  Consider revising store API to handle this scenario.
-                CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
-                roleFuture.whenComplete((role, error) -> {
-                    MastershipTerm term = termService.getMastershipTerm(deviceId);
-                    // TODO: Move this type of check inside device clock manager, etc.
-                    if (term != null && localNodeId.equals(term.master())) {
-                        log.info("Retry marking {} offline", deviceId);
-                        post(store.markOffline(deviceId));
-                    } else {
-                        log.info("Failed again marking {} offline. {}", deviceId, role);
-                    }
-                });
-            } finally {
                 try {
-                    //relinquish master role and ability to be backup.
+                    if (mastershipService.isLocalMaster(deviceId)) {
+                        post(store.markOffline(deviceId));
+                    }
+                } catch (IllegalStateException e) {
+                    log.warn("Failed to mark {} offline", deviceId);
+                    // only the MASTER should be marking off-line in normal cases,
+                    // but if I was the last STANDBY connection, etc. and no one else
+                    // was there to mark the device offline, this instance may need to
+                    // temporarily request for Master Role and mark offline.
+
+                    //there are times when this node will correctly have mastership, BUT
+                    //that isn't reflected in the ClockManager before the device disconnects.
+                    //we want to let go of the device anyways, so make sure this happens.
+
+                    // FIXME: Store semantics leaking out as IllegalStateException.
+                    //  Consider revising store API to handle this scenario.
+                    CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
+                    roleFuture.whenComplete((role, error) -> {
+                        MastershipTerm term = termService.getMastershipTerm(deviceId);
+                        // TODO: Move this type of check inside device clock manager, etc.
+                        if (term != null && localNodeId.equals(term.master())) {
+                            log.info("Retry marking {} offline", deviceId);
+                            post(store.markOffline(deviceId));
+                        } else {
+                            log.info("Failed again marking {} offline. {}", deviceId, role);
+                        }
+                    });
+                } finally {
                     roleToAcknowledge.remove(deviceId);
-                    mastershipService.relinquishMastership(deviceId).get();
-                } catch (InterruptedException e) {
-                    log.warn("Interrupted while reliquishing role for {}", deviceId);
-                    Thread.currentThread().interrupt();
-                } catch (ExecutionException e) {
-                    log.error("Exception thrown while relinquishing role for {}", deviceId, e);
                 }
             }
+
         }
 
         @Override
@@ -875,7 +874,7 @@
                 // something was off with DeviceProvider, maybe check channel too?
                 log.warn("Failed to assert role onto Device {}", deviceId);
                 roleToAcknowledge.remove(deviceId);
-                mastershipService.relinquishMastership(deviceId);
+                updateMastershipFor(deviceId);
                 return;
             }
 
@@ -901,7 +900,6 @@
                     // roleManager got the device to comply, but doesn't agree with
                     // the store; use the store's view, then try to reassert.
                     backgroundService.execute(() -> reassertRole(deviceId, expected));
-                    return;
                 }
             } else {
                 // we didn't get back what we asked for. Reelect someone else.
@@ -910,17 +908,13 @@
                 if (requested == MastershipRole.MASTER) {
                     // Stop the timer
                     roleToAcknowledge.remove(deviceId);
-                    mastershipService.relinquishMastership(deviceId);
-                    // TODO: Shouldn't we be triggering event?
-                    //final Device device = getDevice(deviceId);
-                    //post(new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device));
+                    updateMastershipFor(deviceId);
                 } else if (requested ==  MastershipRole.STANDBY) {
                     // For P4RT devices, the response role will be NONE when this node is expected to be STANDBY
                     // but the stream channel is not opened correctly.
                     // Calling reassertRole will trigger the mechanism in GeneralDeviceProvider that
                     // attempts to re-establish the stream channel
                     backgroundService.execute(() -> reassertRole(deviceId, expected));
-                    return;
                 }
             }
         }
@@ -991,6 +985,16 @@
         return true;
     }
 
+    private boolean probeReachability(DeviceId deviceId) {
+        DeviceProvider provider = getProvider(deviceId);
+        if (provider == null) {
+            log.warn("Provider for {} was not found. Cannot probe reachability", deviceId);
+            return false;
+        }
+        return provider.isReachable(deviceId) && Tools.futureGetOrElse(provider.probeReachability(deviceId),
+                PROBE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, Boolean.FALSE);
+    }
+
     /**
      * Reassert role for specified device connected to this node.
      *
@@ -1030,7 +1034,7 @@
                 if (!applyRoleAndProbe(did, MASTER)) {
                     log.warn("Unsuccessful applying role {} to {}", myNextRole, did);
                     // immediately failed to apply role
-                    mastershipService.relinquishMastership(did);
+                    updateMastershipFor(did);
                     // FIXME disconnect?
                 }
                 break;
@@ -1039,7 +1043,7 @@
                 if (!applyRoleAndProbe(did, STANDBY)) {
                     log.warn("Unsuccessful applying role {} to {}", myNextRole, did);
                     // immediately failed to apply role
-                    mastershipService.relinquishMastership(did);
+                    updateMastershipFor(did);
                     // FIXME disconnect?
                 }
                 break;
@@ -1053,13 +1057,9 @@
     }
 
     private void handleMastershipEvent(MastershipEvent event) {
-        if (event.type() == MastershipEvent.Type.BACKUPS_CHANGED) {
-            // Don't care if backup list changed.
-            return;
-        }
         final DeviceId did = event.subject();
 
-        // myRole suggested by MastershipService
+        // myNextRole suggested by MastershipService event
         MastershipRole myNextRole;
         if (event.type() == MastershipEvent.Type.SUSPENDED) {
             myNextRole = NONE; // FIXME STANDBY OR NONE?
@@ -1080,24 +1080,40 @@
 
         final boolean isReachable = isReachable(did);
         if (!isReachable) {
-            // device is not connected to this node
+            // device is not connected to this node, nevertheless we should get a role
             if (mastershipService.getLocalRole(did) == NONE) {
                 log.debug("Node was instructed to be {} role for {}, "
                                   + "but this node cannot reach the device "
-                                  + "and role is already None. Ignoring request.",
+                                  + "and role is already None. Asking a new role "
+                                  + "and then apply the disconnection protocol.",
                           myNextRole, did);
+                try {
+                    mastershipService.requestRoleFor(did).get();
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    log.error("Interrupted waiting for Mastership", e);
+                } catch (ExecutionException e) {
+                    log.error("Encountered an error waiting for Mastership", e);
+                }
             } else if (myNextRole != NONE) {
                 log.warn("Node was instructed to be {} role for {}, "
-                                 + "but this node cannot reach the device.  "
-                                 + "Relinquishing role.  ",
+                                 + "but this node cannot reach the device. "
+                                 + "Apply the disconnection protocol.",
                          myNextRole, did);
-                roleToAcknowledge.remove(did);
-                mastershipService.relinquishMastership(did);
             }
+            // Let's put some order in the candidates list
+            roleToAcknowledge.remove(did);
+            updateMastershipFor(did);
             return;
         }
 
-        // device is connected to this node:
+        /* Device is connected to this node - always reassert the role.
+           Ideally, protocols like OpenFlow would not need to reassert the
+           role because the instances are only identified by the role. However,
+           other protocols like P4RT require to provide also an election id
+           which maybe different over time, by reasserting the role will guarantee
+           that updated election ids are communicated to the devices. It should not
+           cost us a lot as it is equivalent to a probe.*/
         if (store.getDevice(did) != null) {
             reassertRole(did, myNextRole);
         } else {
@@ -1375,6 +1391,83 @@
     }
 
     /**
+     * Handler for remote probe requests.
+     *
+     * @param deviceId the device to check
+     * @return whether or not the device is reachable
+     */
+    private boolean handleProbeRequest(DeviceId deviceId) {
+        int attempt = 0;
+        // Let's do a number of attempts
+        while (attempt < PROBE_ATTEMPTS) {
+            if (!probeReachability(deviceId)) {
+                return false;
+            }
+            attempt++;
+        }
+        return true;
+    }
+
+    /**
+     * Update the mastership for this device. If there is a node able
+     * to reach the device and this node is the master move the
+     * mastership to the next node still connected to this device.
+     * If the current node is a backup, it demotes itself to the bottom
+     * of the candidates list
+     *
+     * @param deviceId the device for which we have to update the mastership
+     * @return the NodeId of any node that can reach the device, or null if
+     * none of the ONOS instances can reach the device
+     */
+    private NodeId updateMastershipFor(DeviceId deviceId) {
+        Map<NodeId, CompletableFuture<Boolean>> probes = Maps.newHashMap();
+        // Request a probe only if the node is ready
+        for (ControllerNode onosNode : clusterService.getNodes()) {
+            if (!clusterService.getState(onosNode.id()).isReady() || localNodeId.equals(onosNode.id())) {
+                continue;
+            }
+            probes.put(onosNode.id(), communicationService.sendAndReceive(deviceId, PROBE_SUBJECT, SERIALIZER::encode,
+                    SERIALIZER::decode, onosNode.id()));
+        }
+
+        // Returns the first node able to reach the device
+        // FIXME [SDFAB-935] optimize by looking at the MastershipInfo
+        boolean isReachable;
+        NodeId nextMaster = null;
+        // FIXME Should we expose timeout? Understand if there is need to signal to the caller
+        for (Map.Entry<NodeId, CompletableFuture<Boolean>> probe : probes.entrySet()) {
+            isReachable = Tools.futureGetOrElse(probe.getValue(), PROBE_TIMEOUT_MILLIS,
+                    TimeUnit.MILLISECONDS, Boolean.FALSE);
+            if (isReachable) {
+                nextMaster = probe.getKey();
+            }
+        }
+
+        // FIXME [SDFAB-935] optimize demote by looking at the MastershipInfo;
+        if (nextMaster != null) {
+            log.info("Device {} is still connected to {}", deviceId, nextMaster);
+            MastershipRole myRole = mastershipService.getLocalRole(deviceId);
+            if (myRole == MASTER) {
+                log.info("Handing over the mastership of {} to next master {}", deviceId, nextMaster);
+                mastershipAdminService.setRole(nextMaster, deviceId, MASTER);
+                /* Do not demote here because setRole can return before the mastership has been passed.
+                   Current implementation promotes first the nextMaster as top of candidate list and then
+                   transfer the leadership. We can use the BACKUP events to do demote or leverage periodic
+                   checks.*/
+            } else if (myRole == STANDBY) {
+                log.info("Demote current instance to the bottom of the candidates list for {}", deviceId);
+                mastershipAdminService.demote(localNodeId, deviceId);
+            } else {
+                log.debug("No valid role for {}", deviceId);
+            }
+        }
+
+        return nextMaster;
+    }
+
+
+
+    /**
      * Port Enable/Disable message sent to the device's MASTER node.
      */
     private class InternalPortUpDownEvent {
diff --git a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
index 3e1e4d1..c2339f3 100644
--- a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
@@ -361,6 +361,11 @@
             return local;
         }
 
+        @Override
+        public ControllerNode.State getState(NodeId nodeId) {
+            return ControllerNode.State.READY;
+        }
+
     }
 
     private class TestNetworkConfigService extends NetworkConfigServiceAdapter {