Refactor change: Moved all distributed primitive implementation classes into onos-core-primitives bundle

Change-Id: Icd5dbd4133cb2f21bd403bcd598e6012813e6bfd
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedLeadershipManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedLeadershipManager.java
new file mode 100644
index 0000000..db55a85
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DistributedLeadershipManager.java
@@ -0,0 +1,614 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.MapDifference;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEvent.Type;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.onosproject.cluster.ControllerNode.State.ACTIVE;
+import static org.onosproject.cluster.ControllerNode.State.INACTIVE;
+
+/**
+ * Distributed Lock Manager implemented on top of ConsistentMap.
+ * <p>
+ * This implementation makes use of ClusterService's failure
+ * detection capabilities to detect and purge stale locks.
+ * TODO: Ensure lock safety and liveness.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class DistributedLeadershipManager implements LeadershipService {
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDispatcher;
+
+    private final Logger log = getLogger(getClass());
+
+    private ScheduledExecutorService electionRunner;
+    private ScheduledExecutorService lockExecutor;
+    private ScheduledExecutorService staleLeadershipPurgeExecutor;
+    private ScheduledExecutorService leadershipRefresher;
+
+    // leader for each topic
+    private ConsistentMap<String, NodeId> leaderMap;
+    // list of candidates (includes chosen leader) for each topic
+    private ConsistentMap<String, List<NodeId>> candidateMap;
+
+    private ListenerRegistry<LeadershipEvent, LeadershipEventListener> listenerRegistry;
+
+    // cached copy of leaderMap
+    // Note: Map value, Leadership, does not contain proper candidates info
+    private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
+    // cached copy of candidateMap
+    // Note: Map value, Leadership, does not contain proper leader info
+    private final Map<String, Leadership> candidateBoard = Maps.newConcurrentMap();
+
+    private final ClusterEventListener clusterEventListener = new InternalClusterEventListener();
+
+    private NodeId localNodeId;
+    private Set<String> activeTopics = Sets.newConcurrentHashSet();
+    private Map<String, CompletableFuture<Leadership>> pendingFutures = Maps.newConcurrentMap();
+
+    // The actual delay is randomly chosen from the interval [0, WAIT_BEFORE_RETRY_MILLIS)
+    private static final int WAIT_BEFORE_RETRY_MILLIS = 150;
+    private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
+    private static final int LEADERSHIP_REFRESH_INTERVAL_SEC = 2;
+    private static final int DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC = 2;
+
+    private final AtomicBoolean staleLeadershipPurgeScheduled = new AtomicBoolean(false);
+
+    private static final Serializer SERIALIZER = Serializer.using(KryoNamespaces.API);
+
+    @Activate
+    public void activate() {
+        leaderMap = storageService.<String, NodeId>consistentMapBuilder()
+                .withName("onos-topic-leaders")
+                .withSerializer(SERIALIZER)
+                .withPartitionsDisabled().build();
+        candidateMap = storageService.<String, List<NodeId>>consistentMapBuilder()
+                .withName("onos-topic-candidates")
+                .withSerializer(SERIALIZER)
+                .withPartitionsDisabled().build();
+
+        leaderMap.addListener(event -> {
+            log.debug("Received {}", event);
+            LeadershipEvent.Type leadershipEventType = null;
+            if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
+                leadershipEventType = LeadershipEvent.Type.LEADER_ELECTED;
+            } else if (event.type() == MapEvent.Type.REMOVE) {
+                leadershipEventType = LeadershipEvent.Type.LEADER_BOOTED;
+            }
+            onLeadershipEvent(new LeadershipEvent(
+                    leadershipEventType,
+                    new Leadership(event.key(),
+                            event.value().value(),
+                            event.value().version(),
+                            event.value().creationTime())));
+        });
+
+        candidateMap.addListener(event -> {
+            log.debug("Received {}", event);
+            if (event.type() != MapEvent.Type.INSERT && event.type() != MapEvent.Type.UPDATE) {
+                log.error("Entries must not be removed from candidate map");
+                return;
+            }
+            onLeadershipEvent(new LeadershipEvent(
+                    LeadershipEvent.Type.CANDIDATES_CHANGED,
+                    new Leadership(event.key(),
+                            event.value().value(),
+                            event.value().version(),
+                            event.value().creationTime())));
+        });
+
+        localNodeId = clusterService.getLocalNode().id();
+
+        electionRunner = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/store/leadership", "election-runner"));
+        lockExecutor = Executors.newScheduledThreadPool(
+                4, groupedThreads("onos/store/leadership", "election-thread-%d"));
+        staleLeadershipPurgeExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/store/leadership", "stale-leadership-evictor"));
+        leadershipRefresher = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/store/leadership", "refresh-thread"));
+
+        clusterService.addListener(clusterEventListener);
+
+        electionRunner.scheduleWithFixedDelay(
+                this::electLeaders, 0, DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC, TimeUnit.SECONDS);
+
+        leadershipRefresher.scheduleWithFixedDelay(
+                this::refreshLeaderBoard, 0, LEADERSHIP_REFRESH_INTERVAL_SEC, TimeUnit.SECONDS);
+
+        listenerRegistry = new ListenerRegistry<>();
+        eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        if (clusterService.getNodes().size() > 1) {
+            // FIXME: Determine why this takes ~50 seconds to shutdown on a single node!
+            leaderBoard.forEach((topic, leadership) -> {
+                if (localNodeId.equals(leadership.leader())) {
+                    withdraw(topic);
+                }
+            });
+        }
+
+        clusterService.removeListener(clusterEventListener);
+        eventDispatcher.removeSink(LeadershipEvent.class);
+
+        electionRunner.shutdown();
+        lockExecutor.shutdown();
+        staleLeadershipPurgeExecutor.shutdown();
+        leadershipRefresher.shutdown();
+
+        log.info("Stopped");
+    }
+
+    @Override
+    public Map<String, Leadership> getLeaderBoard() {
+        return ImmutableMap.copyOf(leaderBoard);
+    }
+
+    @Override
+    public Map<String, List<NodeId>> getCandidates() {
+        return Maps.toMap(candidateBoard.keySet(), this::getCandidates);
+    }
+
+    @Override
+    public List<NodeId> getCandidates(String path) {
+        Leadership current = candidateBoard.get(path);
+        return current == null ? ImmutableList.of() : ImmutableList.copyOf(current.candidates());
+    }
+
+    @Override
+    public NodeId getLeader(String path) {
+        Leadership leadership = leaderBoard.get(path);
+        return leadership != null ? leadership.leader() : null;
+    }
+
+    @Override
+    public Leadership getLeadership(String path) {
+        checkArgument(path != null);
+        return leaderBoard.get(path);
+    }
+
+    @Override
+    public Set<String> ownedTopics(NodeId nodeId) {
+        checkArgument(nodeId != null);
+        return leaderBoard.entrySet()
+                    .stream()
+                    .filter(entry -> nodeId.equals(entry.getValue().leader()))
+                    .map(Entry::getKey)
+                    .collect(Collectors.toSet());
+    }
+
+    @Override
+    public CompletableFuture<Leadership> runForLeadership(String path) {
+        log.debug("Running for leadership for topic: {}", path);
+        CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
+        doRunForLeadership(path, resultFuture);
+        return resultFuture;
+    }
+
+    private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
+        try {
+            Versioned<List<NodeId>> candidates = candidateMap.computeIf(path,
+                    currentList -> currentList == null || !currentList.contains(localNodeId),
+                    (topic, currentList) -> {
+                        if (currentList == null) {
+                            return ImmutableList.of(localNodeId);
+                        } else {
+                            List<NodeId> newList = Lists.newLinkedList();
+                            newList.addAll(currentList);
+                            newList.add(localNodeId);
+                            return newList;
+                        }
+                    });
+            log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
+            activeTopics.add(path);
+            Leadership leadership = electLeader(path, candidates.value());
+            if (leadership == null) {
+                pendingFutures.put(path, future);
+            } else {
+                future.complete(leadership);
+            }
+        } catch (ConsistentMapException e) {
+            log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
+            rerunForLeadership(path, future);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> withdraw(String path) {
+        activeTopics.remove(path);
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        doWithdraw(path, resultFuture);
+        return resultFuture;
+    }
+
+
+    private void doWithdraw(String path, CompletableFuture<Void> future) {
+        if (activeTopics.contains(path)) {
+            future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
+        }
+        try {
+            leaderMap.computeIf(path,
+                                localNodeId::equals,
+                                (topic, leader) -> null);
+            candidateMap.computeIf(path,
+                                   candidates -> candidates != null && candidates.contains(localNodeId),
+                                   (topic, candidates) -> candidates.stream()
+                                                                    .filter(nodeId -> !localNodeId.equals(nodeId))
+                                                                    .collect(Collectors.toList()));
+            future.complete(null);
+        } catch (Exception e) {
+            log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
+            retryWithdraw(path, future);
+        }
+    }
+
+    @Override
+    public boolean stepdown(String path) {
+        if (!activeTopics.contains(path) || !Objects.equals(localNodeId, getLeader(path))) {
+            return false;
+        }
+
+        try {
+            return leaderMap.computeIf(path,
+                                       localNodeId::equals,
+                                       (topic, leader) -> null) == null;
+        } catch (Exception e) {
+            log.warn("Error executing stepdown for {}", path, e);
+        }
+        return false;
+    }
+
+    @Override
+    public void addListener(LeadershipEventListener listener) {
+        listenerRegistry.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(LeadershipEventListener listener) {
+        listenerRegistry.removeListener(listener);
+    }
+
+    @Override
+    public boolean makeTopCandidate(String path, NodeId nodeId) {
+        Versioned<List<NodeId>> candidateList = candidateMap.computeIf(path,
+                candidates -> candidates != null &&
+                              candidates.contains(nodeId) &&
+                              !nodeId.equals(Iterables.getFirst(candidates, null)),
+                (topic, candidates) -> {
+                    List<NodeId> updatedCandidates = new ArrayList<>(candidates.size());
+                    updatedCandidates.add(nodeId);
+                    candidates.stream().filter(id -> !nodeId.equals(id)).forEach(updatedCandidates::add);
+                    return updatedCandidates;
+                });
+        List<NodeId> candidates = candidateList != null ? candidateList.value() : Collections.emptyList();
+        return candidates.size() > 0 && nodeId.equals(candidates.get(0));
+    }
+
+    private Leadership electLeader(String path, List<NodeId> candidates) {
+        Leadership currentLeadership = getLeadership(path);
+        if (currentLeadership != null) {
+            return currentLeadership;
+        } else {
+            NodeId topCandidate = candidates
+                        .stream()
+                        .filter(n -> clusterService.getState(n) == ACTIVE)
+                        .findFirst()
+                        .orElse(null);
+            try {
+                Versioned<NodeId> leader = localNodeId.equals(topCandidate)
+                        ? leaderMap.computeIfAbsent(path, p -> localNodeId) : leaderMap.get(path);
+                if (leader != null) {
+                    Leadership newLeadership = new Leadership(path,
+                            leader.value(),
+                            leader.version(),
+                            leader.creationTime());
+                    // Since reads only go through the local copy of leader board, we ought to update it
+                    // first before returning from this method.
+                    // This is to ensure a subsequent read will not read a stale value.
+                    onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership));
+                    return newLeadership;
+                }
+            } catch (Exception e) {
+                log.debug("Failed to elect leader for {}", path, e);
+            }
+        }
+        return null;
+    }
+
+    private void electLeaders() {
+        try {
+            candidateMap.entrySet().forEach(entry -> {
+                String path = entry.getKey();
+                Versioned<List<NodeId>> candidates = entry.getValue();
+                // for active topics, check if this node can become a leader (if it isn't already)
+                if (activeTopics.contains(path)) {
+                    lockExecutor.submit(() -> {
+                        Leadership leadership = electLeader(path, candidates.value());
+                        if (leadership != null) {
+                            CompletableFuture<Leadership> future = pendingFutures.remove(path);
+                            if (future != null) {
+                                future.complete(leadership);
+                            }
+                        }
+                    });
+                }
+                // Raise a CANDIDATES_CHANGED event to force refresh local candidate board
+                // and also to update local listeners.
+                // Don't worry about duplicate events as they will be suppressed.
+                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.CANDIDATES_CHANGED,
+                                                      new Leadership(path,
+                                                                     candidates.value(),
+                                                                     candidates.version(),
+                                                                     candidates.creationTime())));
+            });
+        } catch (Exception e) {
+            log.debug("Failure electing leaders", e);
+        }
+    }
+
+    private void onLeadershipEvent(LeadershipEvent leadershipEvent) {
+        log.trace("Leadership Event: time = {} type = {} event = {}",
+                leadershipEvent.time(), leadershipEvent.type(),
+                leadershipEvent);
+
+        Leadership leadershipUpdate = leadershipEvent.subject();
+        LeadershipEvent.Type eventType = leadershipEvent.type();
+        String topic = leadershipUpdate.topic();
+
+        AtomicBoolean updateAccepted = new AtomicBoolean(false);
+        if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
+            leaderBoard.compute(topic, (k, currentLeadership) -> {
+                if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
+                    updateAccepted.set(true);
+                    return leadershipUpdate;
+                }
+                return currentLeadership;
+            });
+        } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
+            leaderBoard.compute(topic, (k, currentLeadership) -> {
+                if (currentLeadership == null || currentLeadership.epoch() <= leadershipUpdate.epoch()) {
+                    updateAccepted.set(true);
+                    // FIXME: Removing entries from leaderboard is not safe and should be visited.
+                    return null;
+                }
+                return currentLeadership;
+            });
+        } else if (eventType.equals(LeadershipEvent.Type.CANDIDATES_CHANGED)) {
+            candidateBoard.compute(topic, (k, currentInfo) -> {
+                if (currentInfo == null || currentInfo.epoch() < leadershipUpdate.epoch()) {
+                    updateAccepted.set(true);
+                    return leadershipUpdate;
+                }
+                return currentInfo;
+            });
+        } else {
+            throw new IllegalStateException("Unknown event type.");
+        }
+
+        if (updateAccepted.get()) {
+            eventDispatcher.post(leadershipEvent);
+        }
+    }
+
+    private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
+        lockExecutor.schedule(
+                () -> doRunForLeadership(path, future),
+                RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
+                TimeUnit.MILLISECONDS);
+    }
+
+    private void retryWithdraw(String path, CompletableFuture<Void> future) {
+        lockExecutor.schedule(
+                () -> doWithdraw(path, future),
+                RandomUtils.nextInt(WAIT_BEFORE_RETRY_MILLIS),
+                TimeUnit.MILLISECONDS);
+    }
+
+    private void scheduleStaleLeadershipPurge(int afterDelaySec) {
+        if (staleLeadershipPurgeScheduled.compareAndSet(false, true)) {
+            staleLeadershipPurgeExecutor.schedule(
+                    this::purgeStaleLeadership,
+                    afterDelaySec,
+                    TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Purges locks held by inactive nodes and evicts inactive nodes from candidacy.
+     */
+    private void purgeStaleLeadership() {
+        AtomicBoolean rerunPurge = new AtomicBoolean(false);
+        try {
+            staleLeadershipPurgeScheduled.set(false);
+            leaderMap.entrySet()
+                .stream()
+                .filter(e -> clusterService.getState(e.getValue().value()) == INACTIVE)
+                .forEach(entry -> {
+                    String path = entry.getKey();
+                    NodeId nodeId = entry.getValue().value();
+                    try {
+                        leaderMap.computeIf(path, nodeId::equals, (topic, leader) -> null);
+                    } catch (Exception e) {
+                        log.debug("Failed to purge stale lock held by {} for {}", nodeId, path, e);
+                        rerunPurge.set(true);
+                    }
+                });
+
+            candidateMap.entrySet()
+                .forEach(entry -> {
+                    String path = entry.getKey();
+                    Versioned<List<NodeId>> candidates = entry.getValue();
+                    List<NodeId> candidatesList = candidates != null
+                            ? candidates.value() : Collections.emptyList();
+                    List<NodeId> activeCandidatesList =
+                            candidatesList.stream()
+                                          .filter(n -> clusterService.getState(n) == ACTIVE)
+                                          .filter(n -> !localNodeId.equals(n) || activeTopics.contains(path))
+                                          .collect(Collectors.toList());
+                    if (activeCandidatesList.size() < candidatesList.size()) {
+                        Set<NodeId> removedCandidates =
+                                Sets.difference(Sets.newHashSet(candidatesList),
+                                                Sets.newHashSet(activeCandidatesList));
+                        try {
+                            candidateMap.computeIf(path,
+                                        c -> c.stream()
+                                              .filter(n -> clusterService.getState(n) == INACTIVE)
+                                              .count() > 0,
+                                        (topic, c) -> c.stream()
+                                                       .filter(n -> clusterService.getState(n) == ACTIVE)
+                                                       .filter(n -> !localNodeId.equals(n) ||
+                                                                   activeTopics.contains(path))
+                                                       .collect(Collectors.toList()));
+                        } catch (Exception e) {
+                            log.debug("Failed to evict inactive candidates {} from "
+                                    + "candidate list for {}", removedCandidates, path, e);
+                            rerunPurge.set(true);
+                        }
+                    }
+                });
+        } catch (Exception e) {
+            log.debug("Failure purging state leadership.", e);
+            rerunPurge.set(true);
+        }
+
+        if (rerunPurge.get()) {
+            log.debug("Rescheduling stale leadership purge due to errors encountered in previous run");
+            scheduleStaleLeadershipPurge(DELAY_BETWEEN_STALE_LEADERSHIP_PURGE_ATTEMPTS_SEC);
+        }
+    }
+
+    private void refreshLeaderBoard() {
+        try {
+            Map<String, Leadership> newLeaderBoard = Maps.newHashMap();
+            leaderMap.entrySet().forEach(entry -> {
+                String path = entry.getKey();
+                Versioned<NodeId> leader = entry.getValue();
+                Leadership leadership = new Leadership(path,
+                                                       leader.value(),
+                                                       leader.version(),
+                                                       leader.creationTime());
+                newLeaderBoard.put(path, leadership);
+            });
+
+            // first take snapshot of current leader board.
+            Map<String, Leadership> currentLeaderBoard = ImmutableMap.copyOf(leaderBoard);
+
+            MapDifference<String, Leadership> diff = Maps.difference(currentLeaderBoard, newLeaderBoard);
+
+            // evict stale leaders
+            diff.entriesOnlyOnLeft().forEach((path, leadership) -> {
+                log.debug("Evicting {} from leaderboard. It is no longer active leader.", leadership);
+                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, leadership));
+            });
+
+            // add missing leaders
+            diff.entriesOnlyOnRight().forEach((path, leadership) -> {
+                log.debug("Adding {} to leaderboard. It is now the active leader.", leadership);
+                onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership));
+            });
+
+            // add updated leaders
+            diff.entriesDiffering().forEach((path, difference) -> {
+                Leadership current = difference.leftValue();
+                Leadership updated = difference.rightValue();
+                if (current.epoch() < updated.epoch()) {
+                    log.debug("Updated {} in leaderboard.", updated);
+                    onLeadershipEvent(new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, updated));
+                }
+            });
+        } catch (Exception e) {
+            log.debug("Failed to refresh leader board", e);
+        }
+    }
+
+    private class InternalClusterEventListener implements ClusterEventListener {
+
+        @Override
+        public void event(ClusterEvent event) {
+            if (event.type() == Type.INSTANCE_DEACTIVATED || event.type() == Type.INSTANCE_REMOVED) {
+                scheduleStaleLeadershipPurge(0);
+            }
+        }
+    }
+}