ONOS-1965: Evict inactive nodes from candidates map + Rely on cluster events to trigger stale lock purge

Change-Id: Ib7cfea397f98d6271beb78e3b88041bb84550506
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 434d0e2..5976325 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -13,6 +13,9 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEvent.Type;
+import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.LeadershipEvent;
@@ -32,6 +35,7 @@
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Objects;
@@ -55,7 +59,7 @@
 /**
  * Distributed Lock Manager implemented on top of ConsistentMap.
  * <p>
- * This implementation makes use of cluster manager's failure
+ * This implementation makes use of ClusterService's failure
  * detection capabilities to detect and purge stale locks.
  * TODO: Ensure lock safety and liveness.
  */
@@ -81,27 +85,28 @@
     private final Logger log = getLogger(getClass());
     private ExecutorService messageHandlingExecutor;
     private ScheduledExecutorService retryLeaderLockExecutor;
-    private ScheduledExecutorService deadLockDetectionExecutor;
+    private ScheduledExecutorService staleLeadershipPurgeExecutor;
     private ScheduledExecutorService leadershipStatusBroadcaster;
 
     private ConsistentMap<String, NodeId> leaderMap;
     private ConsistentMap<String, List<NodeId>> candidateMap;
 
-    private ListenerRegistry<LeadershipEvent, LeadershipEventListener>
-        listenerRegistry;
+    private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
     private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
     private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
-    private NodeId localNodeId;
+    private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
 
+    private NodeId localNodeId;
     private Set<String> activeTopics = Sets.newConcurrentHashSet();
 
     private static final int ELECTION_JOIN_ATTEMPT_INTERVAL_SEC = 2;
     private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
-    private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
     private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC = 2;
-
+    private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
     private static final int LEADER_CANDIDATE_POS = 0;
 
+    private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
+
     private static final Serializer SERIALIZER = Serializer.using(
             new KryoNamespace.Builder().register(KryoNamespaces.API).build());
 
@@ -122,8 +127,8 @@
                 groupedThreads("onos/store/leadership", "message-handler"));
         retryLeaderLockExecutor = Executors.newScheduledThreadPool(
                 4, groupedThreads("onos/store/leadership", "election-thread-%d"));
-        deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(
-                groupedThreads("onos/store/leadership", "dead-lock-detector"));
+        staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
         leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
                 groupedThreads("onos/store/leadership", "peer-updater"));
         clusterCommunicator.addSubscriber(
@@ -132,8 +137,8 @@
                 this::onLeadershipEvent,
                 messageHandlingExecutor);
 
-        deadLockDetectionExecutor.scheduleWithFixedDelay(
-                this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
+        clusterService.addListener(clusterEventListener);
+
         leadershipStatusBroadcaster.scheduleWithFixedDelay(
                 this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL_SEC, TimeUnit.SECONDS);
 
@@ -151,12 +156,13 @@
             }
         });
 
+        clusterService.removeListener(clusterEventListener);
         eventDispatcher.removeSink(LeadershipEvent.class);
         clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
 
         messageHandlingExecutor.shutdown();
         retryLeaderLockExecutor.shutdown();
-        deadLockDetectionExecutor.shutdown();
+        staleLeadershipPurgeExecutor.shutdown();
         leadershipStatusBroadcaster.shutdown();
 
         log.info("Stopped");
@@ -508,12 +514,25 @@
                 TimeUnit.SECONDS);
     }
 
-    private void purgeStaleLocks() {
+    private void scheduleStaleLeadershipPurge(int afterDelaySec) {
+        if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
+            staleLeadershipPurgeExecutor.schedule(
+                    this::purgeStaleLeadership,
+                    afterDelaySec,
+                    TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
+     */
+    private void purgeStaleLeadership() {
+        AtomicBoolean rerunPurge = new AtomicBoolean(false);
         try {
+            staleLeadershipPurgeScheduled.set(false);
             leaderMap.entrySet()
                 .stream()
                 .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
-                .filter(e -> activeTopics.contains(e.getKey()))
                 .forEach(entry -> {
                     String path = entry.getKey();
                     NodeId nodeId = entry.getValue().value();
@@ -528,10 +547,52 @@
                         }
                     } catch (Exception e) {
                         log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
+                        rerunPurge.set(true);
+                    }
+                });
+
+            candidateMap.entrySet()
+                .forEach(entry -> {
+                    String path = entry.getKey();
+                    Versioned<List<NodeId>> candidates = entry.getValue();
+                    List<NodeId> candidatesList = candidates != null
+                            ? candidates.value() : Collections.emptyList();
+                    List<NodeId> activeCandidatesList =
+                            candidatesList.stream()
+                                          .filter(n -> clusterService.getState(n) == ACTIVE)
+                                          .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
+                                          .collect(Collectors.toList());
+                    if (activeCandidatesList.size() < candidatesList.size()) {
+                        Set<NodeId> removedCandidates =
+                                Sets.difference(Sets.newHashSet(candidatesList),
+                                                Sets.newHashSet(activeCandidatesList));
+                        try {
+                            if (candidateMap.replace(path, entry.getValue().version(), activeCandidatesList)) {
+                                log.info("Evicted inactive candidates {} from "
+                                        + "candidate list for {}", removedCandidates, path);
+                                Versioned<List<NodeId>> updatedCandidates = candidateMap.get(path);
+                                publish(new LeadershipEvent(
+                                        LeadershipEvent.Type.CANDIDATES_CHANGED,
+                                        new Leadership(path,
+                                                updatedCandidates.value(),
+                                                updatedCandidates.version(),
+                                                updatedCandidates.creationTime())));
+                            }
+                        } catch (Exception e) {
+                            log.warn("Failed to evict inactive candidates {} from "
+                                    + "candidate list for {}", removedCandidates, path, e);
+                            rerunPurge.set(true);
+                        }
                     }
                 });
         } catch (Exception e) {
-            log.debug("Failed cleaning up stale locks", e);
+            log.warn("Failure purging state leadership.", e);
+            rerunPurge.set(true);
+        }
+
+        if (rerunPurge.get()) {
+            log.info("Rescheduling stale leadership purge due to errors encountered in previous run");
+            scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
         }
     }
 
@@ -555,4 +616,14 @@
             log.debug("Failed to send leadership updates", e);
         }
     }
+
+    private class InternalClusterEventListener implements ClusterEventListener {
+
+        @Override
+        public void event(ClusterEvent event) {
+            if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
+                scheduleStaleLeadershipPurge(0);
+            }
+        }
+    }
 }