[SDFAB-821] Implement auto-recovery mechanism for failed mastership changes

With this fix, the ONOS cluster starts a new mastership term if the device
does not acknowledge the requested mastership within 10 seconds.

Change-Id: Id0927181e4ba37ed1af52dd9843d4c2563469025
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index cf6265b..e50fe37 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -90,6 +90,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -98,6 +99,7 @@
 import static com.google.common.collect.Multimaps.synchronizedListMultimap;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.lang.System.currentTimeMillis;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.MastershipRole.MASTER;
 import static org.onosproject.net.MastershipRole.NONE;
@@ -155,7 +157,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ClusterCommunicationService communicationService;
 
-    private ExecutorService portReqeustExecutor;
+    private ExecutorService portRequestExecutor;
     /**
      * List of all registered PortConfigOperator.
      */
@@ -198,6 +200,12 @@
     private final Map<DeviceId, LocalStatus> deviceLocalStatus =
             Maps.newConcurrentMap();
 
+    // Pending role requests: deviceId -> time (ms) at which the role was applied to the device
+    private final Map<DeviceId, Long> roleToAcknowledge =
+            Maps.newConcurrentMap();
+    private ScheduledExecutorService backgroundRoleChecker;
+    private static final int ROLE_TIMEOUT_SECONDS = 10;
+
     @Activate
     public void activate() {
         portAnnotationOp = new PortAnnotationOperator(networkConfigService);
@@ -217,17 +225,27 @@
             try {
                 mastershipCheck();
             } catch (Exception e) {
-                log.error("Exception thrown during integrity check", e);
+                log.error("Exception thrown during mastership integrity check", e);
             }
         }, 1, 1, TimeUnit.MINUTES);
 
-        portReqeustExecutor = newSingleThreadExecutor();
+        portRequestExecutor = newSingleThreadExecutor();
 
         communicationService.<InternalPortUpDownEvent>addSubscriber(
                 PORT_UPDOWN_SUBJECT,
                 SERIALIZER::decode,
                 this::handlePortRequest,
-                portReqeustExecutor);
+                portRequestExecutor);
+
+        backgroundRoleChecker = newSingleThreadScheduledExecutor(
+                groupedThreads("onos/device", "manager-role", log));
+        backgroundRoleChecker.scheduleAtFixedRate(() -> {
+            try {
+                roleCheck();
+            } catch (Exception e) {
+                log.error("Exception thrown while verifying role acknowledgement from all devices", e);
+            }
+        }, 0, 10, TimeUnit.SECONDS);
 
         log.info("Started");
     }
@@ -240,7 +258,8 @@
         mastershipService.removeListener(mastershipListener);
         eventDispatcher.removeSink(DeviceEvent.class);
         communicationService.removeSubscriber(PORT_UPDOWN_SUBJECT);
-        portReqeustExecutor.shutdown();
+        portRequestExecutor.shutdown();
+        backgroundRoleChecker.shutdown();
         log.info("Stopped");
     }
 
@@ -486,6 +505,7 @@
                             post(store.markOffline(deviceId));
                         }
                         //relinquish master role and ability to be backup.
+                        roleToAcknowledge.remove(deviceId);
                         mastershipService.relinquishMastership(deviceId).get();
                     } catch (InterruptedException e) {
                         log.warn("Interrupted while relinquishing role for {}", deviceId);
@@ -508,6 +528,8 @@
                             } else {
                                 log.info("Failed marking {} offline. {}", deviceId, role);
                             }
+                            // Stop the timer - just to prevent any race
+                            roleToAcknowledge.remove(deviceId);
                             mastershipService.relinquishMastership(deviceId);
                         });
                     }
@@ -532,6 +554,55 @@
         }
     }
 
+    /**
+     * Detects devices whose role request has timed out unacknowledged and, if locally MASTER, yields mastership.
+     */
+    private void roleCheck() {
+        log.debug("Checking role");
+        for (Device device : getDevices()) {
+            final DeviceId deviceId = device.id();
+            MastershipRole myRole = mastershipService.getLocalRole(deviceId);
+            log.trace("Checking device {}. Current role is {}", deviceId, myRole);
+            final AtomicBoolean exists = new AtomicBoolean(false);
+            final Long ts = roleToAcknowledge.compute(deviceId, (key, value) -> {
+                if (value == null) {
+                    return null; // nothing pending: the key stays absent
+                }
+                exists.set(true); // an entry was pending when this check ran
+                if (currentTimeMillis() - value < (ROLE_TIMEOUT_SECONDS * 1000)) {
+                    return value; // still within the timeout: keep the entry
+                }
+                return null; // timed out: a null result removes the entry
+            });
+            // No pending entry: the role was never applied, or the entry was already cleared
+            if (!exists.get()) {
+                log.trace("Role was not applied or it has been acknowledged for device {}", deviceId);
+                continue;
+            }
+            // Entry still present: the acknowledgement timeout has not elapsed yet
+            if (ts != null) {
+                log.debug("Timeout expires in {} ms", ((ROLE_TIMEOUT_SECONDS * 1000) - currentTimeMillis() + ts));
+                continue;
+            }
+            if (myRole != MASTER) {
+                log.debug("Timeout is expired but current role is not MASTER ({}), nothing to do", myRole);
+                continue;
+            }
+            /* The device failed to acknowledge the MASTER role we asked for.
+               Yield mastership, then request a role again once that completes. */
+            log.warn("Failed to assert role onto device {}. requested={}, no response",
+                    deviceId, myRole);
+            mastershipService.relinquishMastership(deviceId).whenComplete((result, error) -> {
+                if (error != null) {
+                    log.error("Exception while relinquishing mastership", error);
+                } else {
+                    log.info("Successfully relinquished mastership for {}. Requesting new role", deviceId);
+                    mastershipService.requestRoleFor(deviceId);
+                }
+            });
+        }
+    }
+
     // Personalized device provider service issued to the supplied provider.
     private class InternalDeviceProviderService
             extends AbstractProviderService<DeviceProvider>
@@ -561,6 +632,8 @@
                          deviceId, newRole);
                 return false;
             }
+            // Start the timer
+            roleToAcknowledge.put(deviceId, currentTimeMillis());
             provider.roleChanged(deviceId, newRole);
             // not triggering probe when triggered by provider service event
             return true;
@@ -667,6 +740,7 @@
             } finally {
                 try {
                     //relinquish master role and ability to be backup.
+                    roleToAcknowledge.remove(deviceId);
                     mastershipService.relinquishMastership(deviceId).get();
                 } catch (InterruptedException e) {
                     log.warn("Interrupted while reliquishing role for {}", deviceId);
@@ -800,6 +874,7 @@
             if (requested == null && response == null) {
                 // something was off with DeviceProvider, maybe check channel too?
                 log.warn("Failed to assert role onto Device {}", deviceId);
+                roleToAcknowledge.remove(deviceId);
                 mastershipService.relinquishMastership(deviceId);
                 return;
             }
@@ -817,7 +892,9 @@
 
             if (Objects.equals(requested, response)) {
                 if (Objects.equals(requested, expected)) {
-                    return;
+                    // Stop the timer
+                    log.info("Role has been acknowledged for device {}", deviceId);
+                    roleToAcknowledge.remove(deviceId);
                 } else {
                     log.warn("Role mismatch on {}. Set to {}, but store demands {}",
                              deviceId, response, expected);
@@ -831,6 +908,8 @@
                 log.warn("Failed to assert role onto device {}. requested={}, response={}",
                          deviceId, requested, response);
                 if (requested == MastershipRole.MASTER) {
+                    // Stop the timer
+                    roleToAcknowledge.remove(deviceId);
                     mastershipService.relinquishMastership(deviceId);
                     // TODO: Shouldn't we be triggering event?
                     //final Device device = getDevice(deviceId);
@@ -900,6 +979,8 @@
             log.warn("Provider for {} was not found. Cannot apply role {}", deviceId, newRole);
             return false;
         }
+        // Start the timer
+        roleToAcknowledge.put(deviceId, currentTimeMillis());
         provider.roleChanged(deviceId, newRole);
 
         if (newRole.equals(MastershipRole.MASTER)) {
@@ -1010,6 +1091,7 @@
                                  + "but this node cannot reach the device.  "
                                  + "Relinquishing role.  ",
                          myNextRole, did);
+                roleToAcknowledge.remove(did);
                 mastershipService.relinquishMastership(did);
             }
             return;