[ONOS-4460] Relinquish device role when partitioned away from cluster

Change-Id: I578029614cced96a2d4503e4fe3052c927f051ab
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
index 79bd717..23d42e2 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
@@ -41,13 +41,23 @@
          * Signifies that the leader for a topic has changed.
          */
         // TODO: We may not need this. We currently do not support a way for a current leader to step down
-        // while still reamining a candidate
+        // while still remaining a candidate
         LEADER_CHANGED,
 
         /**
          * Signifies a change in the list of candidates for a topic.
          */
-        CANDIDATES_CHANGED
+        CANDIDATES_CHANGED,
+
+        /**
+         * Signifies the Leadership Elector is unavailable.
+         */
+        SERVICE_DISRUPTED,
+
+        /**
+         * Signifies the Leadership Elector is available again.
+         */
+        SERVICE_RESTORED
     }
 
     /**
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java
index 70c54b4..701e690 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java
@@ -24,7 +24,7 @@
 public interface LeadershipStore extends Store<LeadershipEvent, LeadershipStoreDelegate> {
 
     /**
-     * Adds registration for the local instance to be leader for topic.
+     * Adds registration for the local instance to be part of the leadership contest for topic.
      *
      * @param topic leadership topic
      * @return Updated leadership after operation is completed
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java b/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java
index 1d3bbfd..1156787 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java
@@ -44,7 +44,13 @@
          * the change in the backups list is accompanied by a change in
          * master, the event is subsumed by MASTER_CHANGED.
          */
-        BACKUPS_CHANGED
+        BACKUPS_CHANGED,
+
+        /**
+         * Signifies that the underlying storage for the Mastership state
+         * of this device is unavailable.
+         */
+        SUSPENDED
     }
 
     /**
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 21f0f76..8e09e8e 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -729,16 +729,17 @@
     }
 
     private void handleMastershipEvent(MastershipEvent event) {
-        if (event.type() != MastershipEvent.Type.MASTER_CHANGED) {
+        if (event.type() == MastershipEvent.Type.BACKUPS_CHANGED) {
             // Don't care if backup list changed.
             return;
         }
-
         final DeviceId did = event.subject();
 
         // myRole suggested by MastershipService
         MastershipRole myNextRole;
-        if (localNodeId.equals(event.roleInfo().master())) {
+        if (event.type() == MastershipEvent.Type.SUSPENDED) {
+            myNextRole = NONE; // FIXME STANDBY OR NONE?
+        } else if (localNodeId.equals(event.roleInfo().master())) {
             // confirm latest info
             MastershipTerm term = termService.getMastershipTerm(did);
             final boolean iHaveControl = term != null && localNodeId.equals(term.master());
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index 58dffce..f8be97e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -15,12 +15,16 @@
  */
 package org.onosproject.store.cluster.impl;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -35,6 +39,7 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.Change;
 import org.onosproject.store.AbstractStore;
+import org.onosproject.store.service.DistributedPrimitive.Status;
 import org.onosproject.store.service.LeaderElector;
 import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
@@ -57,8 +62,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
+    private ExecutorService statusChangeHandler;
     private NodeId localNodeId;
     private LeaderElector leaderElector;
+    private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
 
     private final Consumer<Change<Leadership>> leadershipChangeListener =
             change -> {
@@ -77,22 +84,54 @@
                     eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
                 }
                 notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
+                // Update local cache of currently held leaderships
+                if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
+                    localLeaderCache.put(newValue.topic(), newValue);
+                } else {
+                    localLeaderCache.remove(newValue.topic());
+                }
             };
 
+    private final Consumer<Status> clientStatusListener = status ->
+            statusChangeHandler.execute(() -> handleStatusChange(status));
+
+    private void handleStatusChange(Status status) {
+        // Notify mastership Service of disconnect and reconnect
+        if (status == Status.ACTIVE) {
+            // Service Restored
+            localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
+            leaderElector.getLeaderships().forEach((topic, leadership) ->
+                    notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_RESTORED, leadership)));
+        } else if (status == Status.SUSPENDED) {
+            // Service Suspended
+            localLeaderCache.forEach((topic, leadership) ->
+                    notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_DISRUPTED, leadership)));
+        } else {
+            // Should be only inactive state
+            return;
+        }
+    }
+
+
     @Activate
     public void activate() {
+        statusChangeHandler = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
         localNodeId = clusterService.getLocalNode().id();
         leaderElector = storageService.leaderElectorBuilder()
                       .withName("onos-leadership-elections")
                       .build()
                       .asLeaderElector();
         leaderElector.addChangeListener(leadershipChangeListener);
+        leaderElector.addStatusChangeListener(clientStatusListener);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         leaderElector.removeChangeListener(leadershipChangeListener);
+        leaderElector.removeStatusChangeListener(clientStatusListener);
+        statusChangeHandler.shutdown();
         log.info("Stopped");
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 3e83183..3b87637 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -18,6 +18,7 @@
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
 import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
+import static org.onosproject.mastership.MastershipEvent.Type.SUSPENDED;
 import static org.slf4j.LoggerFactory.getLogger;
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -319,7 +320,8 @@
         private void handleEvent(LeadershipEvent event) {
             Leadership leadership = event.subject();
             DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
-            RoleInfo roleInfo = getNodes(deviceId);
+            RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
+                    getNodes(deviceId) : new RoleInfo();
             switch (event.type()) {
             case LEADER_AND_CANDIDATES_CHANGED:
                 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
@@ -331,6 +333,12 @@
             case CANDIDATES_CHANGED:
                 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
                 break;
+            case SERVICE_DISRUPTED:
+                notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, roleInfo));
+                break;
+            case SERVICE_RESTORED:
+                // Do nothing, wait for updates from peers
+                break;
             default:
                 return;
             }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 9b2f11e..3e45091 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -26,6 +26,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.NodeId;
@@ -63,6 +64,19 @@
     public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
     private final LoadingCache<String, CompletableFuture<Leadership>> cache;
 
+    Function<CopycatClient.State, Status> mapper = state -> {
+        switch (state) {
+            case CONNECTED:
+                return Status.ACTIVE;
+            case SUSPENDED:
+                return Status.SUSPENDED;
+            case CLOSED:
+                return Status.INACTIVE;
+            default:
+                throw new IllegalStateException("Unknown state " + state);
+        }
+    };
+
     public AtomixLeaderElector(CopycatClient client, Properties properties) {
         super(client, properties);
         cache = CacheBuilder.newBuilder()
@@ -79,6 +93,7 @@
             }
         };
         addStatusChangeListener(statusListener);
+        client.onStateChange(this::handleStateChange);
     }
 
     @Override
@@ -193,4 +208,8 @@
     private boolean isListening() {
         return !leadershipChangeListeners.isEmpty();
     }
+
+    private void handleStateChange(CopycatClient.State state) {
+        statusChangeListeners().forEach(listener -> listener.accept(mapper.apply(state)));
+    }
 }
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
index dca849e..e5423c2 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
@@ -542,22 +542,20 @@
     private class InternalRoleListener implements MastershipListener {
         @Override
         public void event(MastershipEvent event) {
-            if (MastershipEvent.Type.BACKUPS_CHANGED.equals(event.type())) {
+            if (MastershipEvent.Type.MASTER_CHANGED.equals(event.type())) {
                 // only need new master events
-                return;
+                eventExecutor.execute(() -> {
+                    DeviceId deviceId = event.subject();
+                    Device device = deviceService.getDevice(deviceId);
+                    if (device == null) {
+                        log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
+                        return;
+                    }
+                    if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
+                        updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
+                    }
+                });
             }
-
-            eventExecutor.execute(() -> {
-                DeviceId deviceId = event.subject();
-                Device device = deviceService.getDevice(deviceId);
-                if (device == null) {
-                    log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
-                    return;
-                }
-                if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
-                    updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
-                }
-            });
         }
     }