[ONOS-4460] Relinquish device role when partitioned away from cluster

Change-Id: I578029614cced96a2d4503e4fe3052c927f051ab
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index 58dffce..f8be97e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -15,12 +15,16 @@
  */
 package org.onosproject.store.cluster.impl;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.Consumer;
 
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -35,6 +39,7 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.event.Change;
 import org.onosproject.store.AbstractStore;
+import org.onosproject.store.service.DistributedPrimitive.Status;
 import org.onosproject.store.service.LeaderElector;
 import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
@@ -57,8 +62,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
+    private ExecutorService statusChangeHandler;
     private NodeId localNodeId;
     private LeaderElector leaderElector;
+    private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
 
     private final Consumer<Change<Leadership>> leadershipChangeListener =
             change -> {
@@ -77,22 +84,54 @@
                     eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
                 }
                 notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
+                // Update local cache of currently held leaderships
+                if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
+                    localLeaderCache.put(newValue.topic(), newValue);
+                } else {
+                    localLeaderCache.remove(newValue.topic());
+                }
             };
 
+    private final Consumer<Status> clientStatusListener = status ->
+            statusChangeHandler.execute(() -> handleStatusChange(status));
+
+    private void handleStatusChange(Status status) {
+        // Notify mastership Service of disconnect and reconnect
+        if (status == Status.ACTIVE) {
+            // Service Restored
+            localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
+            leaderElector.getLeaderships().forEach((topic, leadership) ->
+                    notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_RESTORED, leadership)));
+        } else if (status == Status.SUSPENDED) {
+            // Service Suspended
+            localLeaderCache.forEach((topic, leadership) ->
+                    notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_DISRUPTED, leadership)));
+        } else {
+            // Should be only inactive state
+            return;
+        }
+    }
+
+
     @Activate
     public void activate() {
+        statusChangeHandler = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
         localNodeId = clusterService.getLocalNode().id();
         leaderElector = storageService.leaderElectorBuilder()
                       .withName("onos-leadership-elections")
                       .build()
                       .asLeaderElector();
         leaderElector.addChangeListener(leadershipChangeListener);
+        leaderElector.addStatusChangeListener(clientStatusListener);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         leaderElector.removeChangeListener(leadershipChangeListener);
+        leaderElector.removeStatusChangeListener(clientStatusListener);
+        statusChangeHandler.shutdown();
         log.info("Stopped");
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 3e83183..3b87637 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -18,6 +18,7 @@
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
 import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
+import static org.onosproject.mastership.MastershipEvent.Type.SUSPENDED;
 import static org.slf4j.LoggerFactory.getLogger;
 import static com.google.common.base.Preconditions.checkArgument;
 
@@ -319,7 +320,8 @@
         private void handleEvent(LeadershipEvent event) {
             Leadership leadership = event.subject();
             DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
-            RoleInfo roleInfo = getNodes(deviceId);
+            RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
+                    getNodes(deviceId) : new RoleInfo();
             switch (event.type()) {
             case LEADER_AND_CANDIDATES_CHANGED:
                 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
@@ -331,6 +333,12 @@
             case CANDIDATES_CHANGED:
                 notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
                 break;
+            case SERVICE_DISRUPTED:
+                notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, roleInfo));
+                break;
+            case SERVICE_RESTORED:
+                // Do nothing, wait for updates from peers
+                break;
             default:
                 return;
             }