Updated DistributedLeadershipManager to use ConsistentMap notifications

Change-Id: Ice4e9b295f4216fee13144ec631904f34bdf7b2b
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 763deca..439852f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -15,7 +15,6 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterEvent;
 import org.onosproject.cluster.ClusterEvent.Type;
 import org.onosproject.cluster.ClusterEventListener;
@@ -28,10 +27,10 @@
 import org.onosproject.event.ListenerRegistry;
 import org.onosproject.event.EventDeliveryService;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.Versioned;
@@ -46,7 +45,6 @@
 import java.util.List;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -82,11 +80,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected EventDeliveryService eventDispatcher;
 
-    private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
-            new MessageSubject("distributed-leadership-manager-events");
-
     private final Logger log = getLogger(getClass());
-    private ExecutorService messageHandlingExecutor;
     private ScheduledExecutorService electionRunner;
     private ScheduledExecutorService lockExecutor;
     private ScheduledExecutorService staleLeadershipPurgeExecutor;
@@ -104,7 +98,7 @@
     private Set<String> activeTopics = Sets.newConcurrentHashSet();
     private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
 
-    // The actual delay is randomly chosen between the interval [0, WAIT_BEFORE_RETRY_MILLIS)
+    // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS)
     private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
     private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
     private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
@@ -112,8 +106,7 @@
 
     private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
 
-    private static final Serializer SERIALIZER = Serializer.using(
-            new KryoNamespace.Builder().register(KryoNamespaces.API).build());
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
 
     @Activate
     public void activate() {
@@ -126,10 +119,38 @@
                 .withSerializer(SERIALIZER)
                 .withPartitionsDisabled().build();
 
+        leaderMap.addListener(event -> {
+            log.debug("Received {}", event);
+            LeadershipEvent.Type leadershipEventType = null;
+            if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
+                leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED;
+            } else if (event.type() == MapEvent.Type.REMOVE) {
+                leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED;
+            }
+            onLeadershipEvent(new LeadershipEvent(
+                    leadershipEventType,
+                    new Leadership(event.key(),
+                            event.value().value(),
+                            event.value().version(),
+                            event.value().creationTime())));
+        });
+
+        candidateMap.addListener(event -> {
+            log.debug("Received {}", event);
+            if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) {
+                log.error("Entries must not be removed from candidate map");
+                return;
+            }
+            onLeadershipEvent(new LeadershipEvent(
+                    LeadershipEvent.Type.CANDIDATES_CHANGED,
+                    new Leadership(event.key(),
+                            event.value().value(),
+                            event.value().version(),
+                            event.value().creationTime())));
+        });
+
         localNodeId = clusterService.getLocalNode().id();
 
-        messageHandlingExecutor = Executors.newSingleThreadExecutor(
-                groupedThreads("onos/store/leadership", "message-handler"));
         electionRunner = Executors.newSingleThreadScheduledExecutor(
                 groupedThreads("onos/store/leadership", "election-runner"));
         lockExecutor = Executors.newScheduledThreadPool(
@@ -138,11 +159,6 @@
                 groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
         leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
                 groupedThreads("onos/store/leadership", "refresh-thread"));
-        clusterCommunicator.addSubscriber(
-                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
-                SERIALIZER::decode,
-                this::onLeadershipEvent,
-                messageHandlingExecutor);
 
         clusterService.addListener(clusterEventListener);
 
@@ -168,10 +184,8 @@
 
         clusterService.removeListener(clusterEventListener);
         eventDispatcher.removeSink(LeadershipEvent.class);
-        clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
 
         electionRunner.shutdown();
-        messageHandlingExecutor.shutdown();
         lockExecutor.shutdown();
         staleLeadershipPurgeExecutor.shutdown();
         leadershipRefresher.shutdown();
@@ -239,12 +253,6 @@
                             return newList;
                         }
                     });
-            publish(new LeadershipEvent(
-                    LeadershipEvent.Type.CANDIDATES_CHANGED,
-                    new Leadership(path,
-                            candidates.value(),
-                            candidates.version(),
-                            candidates.creationTime())));
             log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
             activeTopics.add(path);
             Leadership leadership = electLeader(path, candidates.value());
@@ -273,41 +281,14 @@
             future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
         }
         try {
-            Versioned<NodeId> leader = leaderMap.get(path);
-            if (leader != null && Objects.equals(leader.value(), localNodeId)) {
-                if (leaderMap.remove(path, leader.version())) {
-                    log.debug("Gave up leadership for {}", path);
-                    future.complete(null);
-                    publish(new LeadershipEvent(
-                            LeadershipEvent.Type.LEADER_BOOTED,
-                            new Leadership(path,
-                                localNodeId,
-                                leader.version(),
-                                leader.creationTime())));
-                }
-            }
-            // else we are not the current leader, can still be a candidate.
-            Versioned<List<NodeId>> candidates = candidateMap.get(path);
-            List<NodeId> candidateList = candidates != null
-                    ? Lists.newArrayList(candidates.value())
-                    : Lists.newArrayList();
-            if (!candidateList.remove(localNodeId)) {
-                future.complete(null);
-                return;
-            }
-            if (candidateMap.replace(path, candidates.version(), candidateList)) {
-                Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
-                future.complete(null);
-                publish(new LeadershipEvent(
-                                LeadershipEvent.Type.CANDIDATES_CHANGED,
-                                new Leadership(path,
-                                    newCandidates.value(),
-                                    newCandidates.version(),
-                                    newCandidates.creationTime())));
-            } else {
-                log.debug("Failed to withdraw from candidates list for {}. Will retry", path);
-                retryWithdraw(path, future);
-            }
+            leaderMap.computeIf(path,
+                                localNodeId::equals,
+                                (topic, leader) -> null);
+            candidateMap.computeIf(path,
+                                   candidates -> candidates != null && candidates.contains(localNodeId),
+                                   (topic, candidates) -> candidates.stream()
+                                                                    .filter(nodeId -> !localNodeId.equals(nodeId))
+                                                                    .collect(Collectors.toList()));
         } catch (Exception e) {
             log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
             retryWithdraw(path, future);
@@ -321,19 +302,9 @@
         }
 
         try {
-            Versioned<NodeId> leader = leaderMap.get(path);
-            if (leader != null && Objects.equals(leader.value(), localNodeId)) {
-                if (leaderMap.remove(path, leader.version())) {
-                    log.debug("Stepped down from leadership for {}", path);
-                    publish(new LeadershipEvent(
-                            LeadershipEvent.Type.LEADER_BOOTED,
-                            new Leadership(path,
-                                localNodeId,
-                                leader.version(),
-                                leader.creationTime())));
-                    return true;
-                }
-            }
+            return leaderMap.computeIf(path,
+                                       localNodeId::equals,
+                                       (topic, leader) -> null) == null;
         } catch (Exception e) {
             log.warn("Error executing stepdown for {}", path, e);
         }
@@ -352,7 +323,7 @@
 
     @Override
     public boolean makeTopCandidate(String path, NodeId nodeId) {
-        Versioned<List<NodeId>> newCandidates = candidateMap.computeIf(path,
+        Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path,
                 candidates -> candidates != null &&
                               candidates.contains(nodeId) &&
                               !nodeId.equals(Iterables.getFirst(candidates, null)),
@@ -362,13 +333,8 @@
                     candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
                     return updatedCandidates;
                 });
-        publish(new LeadershipEvent(
-                    LeadershipEvent.Type.CANDIDATES_CHANGED,
-                    new Leadership(path,
-                        newCandidates.value(),
-                        newCandidates.version(),
-                        newCandidates.creationTime())));
-        return true;
+        List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList();
+        return candidates.size() > 0 && nodeId.equals(candidates.get(0));
     }
 
     private Leadership electLeader(String path, List<NodeId> candidates) {
@@ -389,9 +355,6 @@
                             leader.value(),
                             leader.version(),
                             leader.creationTime());
-                    publish(new LeadershipEvent(
-                            LeadershipEvent.Type.LEADER_ELECTED,
-                            newLeadership));
                     return newLeadership;
                 }
             } catch (Exception e) {
@@ -432,11 +395,6 @@
         }
     }
 
-    private void publish(LeadershipEvent event) {
-        onLeadershipEvent(event);
-        clusterCommunicator.broadcast(event, LEADERSHIP_EVENT_MESSAGE_SUBJECT, SERIALIZER::encode);
-    }
-
     private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
         log.trace("Leadership Event: time = {} type = {} event = {}",
                 leadershipEvent.time(), leadershipEvent.type(),
@@ -517,15 +475,8 @@
                 .forEach(entry -> {
                     String path = entry.getKey();
                     NodeId nodeId = entry.getValue().value();
-                    long epoch = entry.getValue().version();
-                    long creationTime = entry.getValue().creationTime();
                     try {
-                        if (leaderMap.remove(path, epoch)) {
-                            log.debug("Purged stale lock held by {} for {}", nodeId, path);
-                            publish(new LeadershipEvent(
-                                    LeadershipEvent.Type.LEADER_BOOTED,
-                                    new Leadership(path, nodeId, epoch, creationTime)));
-                        }
+                        leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null);
                     } catch (Exception e) {
                         log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
                         rerunPurge.set(true);
@@ -548,21 +499,15 @@
                                 Sets.difference(Sets.newHashSet(candidatesList),
                                                 Sets.newHashSet(activeCandidatesList));
                         try {
-                            if (candidateMap.replace(path, entry.getValue().version(), activeCandidatesList)) {
-                                log.info("Evicted inactive candidates {} from "
-                                        + "candidate list for {}", removedCandidates, path);
-                                Versioned<List<NodeId>> updatedCandidates = candidateMap.get(path);
-                                publish(new LeadershipEvent(
-                                        LeadershipEvent.Type.CANDIDATES_CHANGED,
-                                        new Leadership(path,
-                                                updatedCandidates.value(),
-                                                updatedCandidates.version(),
-                                                updatedCandidates.creationTime())));
-                            } else {
-                                // Conflicting update detected. Rerun purge to make sure
-                                // inactive candidates are evicted.
-                                rerunPurge.set(true);
-                            }
+                            candidateMap.computeIf(path,
+                                        c -> c.stream()
+                                              .filter(n -> clusterService.getState(n) == INACTIVE)
+                                              .count() > 0,
+                                        (topic, c) -> c.stream()
+                                                       .filter(n -> clusterService.getState(n) == ACTIVE)
+                                                       .filter(n -> !localNodeId.equals(n) ||
+                                                                   activeTopics.contains(path))
+                                                       .collect(Collectors.toList()));
                         } catch (Exception e) {
                             log.debug("Failed to evict inactive candidates {} from "
                                     + "candidate list for {}", removedCandidates, path, e);