LeadershipService: Add a leader board tracking the current leader of each topic.

Change-Id: I0dd8267e104466ec65a2c67d23d1c4d923cad266

Change-Id: I6bc548510400eacabb12482f8fba1b7f2abb0604
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
index 31bc47f..38bb1a3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
@@ -5,6 +5,7 @@
 import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -23,9 +24,16 @@
 import org.onlab.onos.cluster.LeadershipEvent;
 import org.onlab.onos.cluster.LeadershipEventListener;
 import org.onlab.onos.cluster.LeadershipService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
 import org.onlab.onos.store.service.Lock;
 import org.onlab.onos.store.service.LockService;
 import org.onlab.onos.store.service.impl.DistributedLockManager;
+import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
 import com.google.common.collect.Maps;
@@ -45,32 +53,83 @@
     private static final int TERM_DURATION_MS =
             DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
 
+    // Time to wait, in milliseconds, before retrying to acquire leadership
+    // after an unexpected error.
+    private static final int WAIT_BEFORE_RETRY_MS = 2000;
+
     // TODO: Appropriate Thread pool sizing.
     private static final ScheduledExecutorService THREAD_POOL =
             Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
 
+    private static final MessageSubject LEADERSHIP_UPDATES =
+            new MessageSubject("leadership-contest-updates");
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private LockService lockService;
 
-    private Map<String, Lock> openContests = Maps.newHashMap();
-    private Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
+    private final Map<String, Leadership> leaderBoard = Maps.newHashMap(); // topic -> last known leadership; guarded by synchronized (leaderBoard)
+
+    private final Map<String, Lock> openContests = Maps.newHashMap();
+    private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
     private ControllerNode localNode;
 
+    private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
+    private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
+
+    public static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.API)
+                    .build()
+                    .populate(1);
+        }
+    };
+
     @Activate
     public void activate() {
         localNode = clusterService.getLocalNode();
+
+        addListener(peerAdvertiser);
+        addListener(leaderBoardUpdater);
+
+        clusterCommunicator.addSubscriber(
+                LEADERSHIP_UPDATES,
+                new PeerAdvertisementHandler());
+
         log.info("Started.");
     }
 
     @Deactivate
     public void deactivate() {
+        removeListener(peerAdvertiser);
+        removeListener(leaderBoardUpdater);
+
+        clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
+
         THREAD_POOL.shutdown();
+
         log.info("Stopped.");
     }
 
+    // Returns the leader recorded on the leader board for path, or null if none is recorded.
+    @Override
+    public ControllerNode getLeader(String path) {
+        synchronized (leaderBoard) {
+            Leadership leadership = leaderBoard.get(path);
+            if (leadership != null) {
+                return leadership.leader();
+            }
+        }
+        return null;
+    }
+
     @Override
     public void runForLeadership(String path) {
         checkArgument(path != null);
@@ -94,8 +153,7 @@
             notifyListeners(
                     new LeadershipEvent(
                             LeadershipEvent.Type.LEADER_BOOTED,
-                            new Leadership(lock.path(), localNode, 0)));
-                            // FIXME: Should set the correct term information.
+                            new Leadership(lock.path(), localNode, lock.epoch()))); // term is taken from the lock's epoch
         }
     }
 
@@ -123,26 +181,31 @@
         lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
             if (error == null) {
                 THREAD_POOL.schedule(
-                        new RelectionTask(lock),
+                        new ReelectionTask(lock),
                         TERM_DURATION_MS / 2,
                         TimeUnit.MILLISECONDS);
                 notifyListeners(
                         new LeadershipEvent(
                                 LeadershipEvent.Type.LEADER_ELECTED,
-                                new Leadership(lock.path(), localNode, 0)));
+                                new Leadership(lock.path(), localNode, lock.epoch())));
             } else {
-                log.error("Failed to acquire lock for {}", path, error);
-                // retry
-                tryAcquireLeadership(path);
+                log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
+                // Schedule the retry on THREAD_POOL rather than sleeping here: a sleep
+                // would tie up the thread running this completion callback for the whole
+                // wait, and the retry would then also run on that thread.
+                THREAD_POOL.schedule(
+                        () -> tryAcquireLeadership(path),
+                        WAIT_BEFORE_RETRY_MS,
+                        TimeUnit.MILLISECONDS);
             }
         });
     }
 
-    private class RelectionTask implements Runnable {
+    private class ReelectionTask implements Runnable {
 
         private final Lock lock;
 
-        public RelectionTask(Lock lock) {
+        public ReelectionTask(Lock lock) {
            this.lock = lock;
         }
 
@@ -152,17 +215,85 @@
                 notifyListeners(
                         new LeadershipEvent(
                                 LeadershipEvent.Type.LEADER_REELECTED,
-                                new Leadership(lock.path(), localNode, 0)));
+                                new Leadership(lock.path(), localNode, lock.epoch())));
                 THREAD_POOL.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
             } else {
                 if (openContests.containsKey(lock.path())) {
                     notifyListeners(
                             new LeadershipEvent(
                                     LeadershipEvent.Type.LEADER_BOOTED,
-                                    new Leadership(lock.path(), localNode, 0)));
+                                    new Leadership(lock.path(), localNode, lock.epoch())));
                     tryAcquireLeadership(lock.path());
                 }
             }
         }
     }
+
+    /**
+     * Broadcasts leadership events in which this node is the leader to the
+     * other cluster members. Events about other leaders (including those
+     * received from peers and re-dispatched locally) are not re-broadcast.
+     */
+    private class PeerAdvertiser implements LeadershipEventListener {
+        @Override
+        public void event(LeadershipEvent event) {
+            // publish events originating on this host.
+            if (event.subject().leader().equals(localNode)) {
+                try {
+                    clusterCommunicator.broadcast(
+                            new ClusterMessage(
+                                    localNode.id(),
+                                    LEADERSHIP_UPDATES,
+                                    SERIALIZER.encode(event)));
+                } catch (IOException e) {
+                    log.error("Failed to broadcast leadership update message", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Handles leadership updates broadcast by peers by decoding them and
+     * dispatching them to the local listeners via notifyListeners.
+     */
+    private class PeerAdvertisementHandler implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            LeadershipEvent event = SERIALIZER.decode(message.payload());
+            log.debug("Received {} from {}", event, message.sender());
+            notifyListeners(event);
+        }
+    }
+
+    /**
+     * Keeps the leader board in sync with local and peer leadership events.
+     * Epochs order the updates so that stale or reordered events cannot
+     * overwrite newer state: an election is recorded only if nothing is recorded
+     * yet or its epoch is newer than the recorded one, and a boot clears the
+     * entry only if the recorded epoch is not newer than the booted one.
+     */
+    private class LeaderBoardUpdater implements LeadershipEventListener {
+        @Override
+        public void event(LeadershipEvent event) {
+            Leadership leadershipUpdate = event.subject();
+            synchronized (leaderBoard) {
+                Leadership currentLeadership = leaderBoard.get(leadershipUpdate.topic());
+                switch (event.type()) {
+                case LEADER_ELECTED:
+                case LEADER_REELECTED:
+                    if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
+                        leaderBoard.put(leadershipUpdate.topic(), leadershipUpdate);
+                    }
+                    break;
+                case LEADER_BOOTED:
+                    if (currentLeadership != null && currentLeadership.epoch() <= leadershipUpdate.epoch()) {
+                        leaderBoard.remove(leadershipUpdate.topic());
+                    }
+                    break;
+                default:
+                    break;
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
index 74bff8b..81195b4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -1,8 +1,10 @@
 package org.onlab.onos.store.service.impl;
 
+import static com.google.common.base.Verify.verify;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -15,6 +17,7 @@
 import org.onlab.onos.store.service.DatabaseException;
 import org.onlab.onos.store.service.DatabaseService;
 import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.VersionedValue;
 import org.slf4j.Logger;
 
 /**
@@ -29,6 +32,7 @@
     private final String path;
     private DateTime lockExpirationTime;
     private AtomicBoolean isLocked = new AtomicBoolean(false);
+    private volatile long epoch = 0; // lock-table entry version observed at the last acquisition; 0 before the first
     private byte[] lockId;
 
     public DistributedLock(
@@ -74,6 +78,10 @@
                 DistributedLockManager.ONOS_LOCK_TABLE_NAME,
                 path,
                 lockId)) {
+            VersionedValue vv =
+                    databaseService.get(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path);
+            verify(vv != null && Arrays.equals(vv.value(), lockId), "Lock %s is not held by this instance", path);
+            epoch = vv.version(); // entry version at acquisition becomes the lock epoch
             isLocked.set(true);
             lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
             return true;
@@ -121,6 +129,11 @@
     }
 
     @Override
+    public long epoch() {
+        return epoch; // entry version recorded when the lock was last acquired; 0 before that
+    }
+
+    @Override
     public void unlock() {
         if (!isLocked()) {
             return;