LeadershipService: Support for a leaderBoard.

Change-Id: I0dd8267e104466ec65a2c67d23d1c4d923cad266

Change-Id: I6bc548510400eacabb12482f8fba1b7f2abb0604
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java b/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java
index c370ab5..dbf36a5 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java
@@ -11,12 +11,12 @@
 
     private final String topic;
     private final ControllerNode leader;
-    private final long term;
+    private final long epoch;
 
-    public Leadership(String topic, ControllerNode leader, long term) {
+    public Leadership(String topic, ControllerNode leader, long epoch) {
         this.topic = topic;
         this.leader = leader;
-        this.term = term;
+        this.epoch = epoch;
     }
 
     /**
@@ -36,16 +36,16 @@
     }
 
     /**
-     * The term number associated with this leadership.
-     * @return leadership term
+     * The epoch when the leadership was assumed.
+     * @return leadership epoch
      */
-    public long term() {
-        return term;
+    public long epoch() {
+        return epoch;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(topic, leader, term);
+        return Objects.hash(topic, leader, epoch);
     }
 
     @Override
@@ -53,7 +53,7 @@
         return MoreObjects.toStringHelper(this.getClass())
             .add("topic", topic)
             .add("leader", leader)
-            .add("term", term)
+            .add("epoch", epoch)
             .toString();
     }
 }
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
index c9e336b..e70cc1f 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
@@ -17,14 +17,21 @@
 
 /**
  * Service for leader election.
- * Leadership contents are organized around topics. ONOS instance can join the
- * leadership race for a topic or withdraw from a race it has previously joined
- * Once in the race, the instance can get asynchronously notified
- * of leadership election results.
+ * Leadership contests are organized around topics. A instance can join the
+ * leadership race for a topic or withdraw from a race it has previously joined.
+ * Listeners can be added to receive notifications asynchronously for various
+ * leadership contests.
  */
 public interface LeadershipService {
 
     /**
+     * Gets the most recent leader for the topic.
+     * @param path topic
+     * @return node who is the leader, null if so such topic exists.
+     */
+    ControllerNode getLeader(String path);
+
+    /**
      * Joins the leadership contest.
      * @param path topic for which this controller node wishes to be a leader.
      */
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
index cef7752..01682d1 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
@@ -76,6 +76,15 @@
     boolean isLocked();
 
     /**
+     * Returns the epoch for this lock.
+     * If this lock is currently locked i.e. isLocked() returns true, epoch signifies the logical time
+     * when the lock was acquired. The concept of epoch lets one come up with a global ordering for all
+     * lock acquisition events
+     * @return epoch
+     */
+     long epoch();
+
+    /**
      * Releases the lock.
      */
     void unlock();
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
index 31bc47f..38bb1a3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
@@ -5,6 +5,7 @@
 import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.Executors;
@@ -23,9 +24,16 @@
 import org.onlab.onos.cluster.LeadershipEvent;
 import org.onlab.onos.cluster.LeadershipEventListener;
 import org.onlab.onos.cluster.LeadershipService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
 import org.onlab.onos.store.service.Lock;
 import org.onlab.onos.store.service.LockService;
 import org.onlab.onos.store.service.impl.DistributedLockManager;
+import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
 import com.google.common.collect.Maps;
@@ -45,32 +53,83 @@
     private static final int TERM_DURATION_MS =
             DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
 
+    // Time to wait before retrying leadership after
+    // a unexpected error.
+    private static final int WAIT_BEFORE_RETRY_MS = 2000;
+
     // TODO: Appropriate Thread pool sizing.
     private static final ScheduledExecutorService THREAD_POOL =
             Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
 
+    private static final MessageSubject LEADERSHIP_UPDATES =
+            new MessageSubject("leadership-contest-updates");
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private LockService lockService;
 
-    private Map<String, Lock> openContests = Maps.newHashMap();
-    private Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
+    private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
+
+    private final Map<String, Lock> openContests = Maps.newHashMap();
+    private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
     private ControllerNode localNode;
 
+    private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
+    private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
+
+    public static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.API)
+                    .build()
+                    .populate(1);
+        }
+    };
+
     @Activate
     public void activate() {
         localNode = clusterService.getLocalNode();
+
+        addListener(peerAdvertiser);
+        addListener(leaderBoardUpdater);
+
+        clusterCommunicator.addSubscriber(
+                LEADERSHIP_UPDATES,
+                new PeerAdvertisementHandler());
+
         log.info("Started.");
     }
 
     @Deactivate
     public void deactivate() {
+        removeListener(peerAdvertiser);
+        removeListener(leaderBoardUpdater);
+
+        clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
+
         THREAD_POOL.shutdown();
+
         log.info("Stopped.");
     }
 
+
+    @Override
+    public ControllerNode getLeader(String path) {
+        synchronized (leaderBoard) {
+            Leadership leadership = leaderBoard.get(path);
+            if (leadership != null) {
+                return leadership.leader();
+            }
+        }
+        return null;
+    }
+
     @Override
     public void runForLeadership(String path) {
         checkArgument(path != null);
@@ -94,8 +153,7 @@
             notifyListeners(
                     new LeadershipEvent(
                             LeadershipEvent.Type.LEADER_BOOTED,
-                            new Leadership(lock.path(), localNode, 0)));
-                            // FIXME: Should set the correct term information.
+                            new Leadership(lock.path(), localNode, lock.epoch())));
         }
     }
 
@@ -123,26 +181,31 @@
         lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
             if (error == null) {
                 THREAD_POOL.schedule(
-                        new RelectionTask(lock),
+                        new ReelectionTask(lock),
                         TERM_DURATION_MS / 2,
                         TimeUnit.MILLISECONDS);
                 notifyListeners(
                         new LeadershipEvent(
                                 LeadershipEvent.Type.LEADER_ELECTED,
-                                new Leadership(lock.path(), localNode, 0)));
+                                new Leadership(lock.path(), localNode, lock.epoch())));
+                return;
             } else {
-                log.error("Failed to acquire lock for {}", path, error);
-                // retry
-                tryAcquireLeadership(path);
+                log.warn("Failed to acquire lock for {}. Will retry in {} sec", path, WAIT_BEFORE_RETRY_MS, error);
+                try {
+                    Thread.sleep(WAIT_BEFORE_RETRY_MS);
+                    tryAcquireLeadership(path);
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                }
             }
         });
     }
 
-    private class RelectionTask implements Runnable {
+    private class ReelectionTask implements Runnable {
 
         private final Lock lock;
 
-        public RelectionTask(Lock lock) {
+        public ReelectionTask(Lock lock) {
            this.lock = lock;
         }
 
@@ -152,17 +215,69 @@
                 notifyListeners(
                         new LeadershipEvent(
                                 LeadershipEvent.Type.LEADER_REELECTED,
-                                new Leadership(lock.path(), localNode, 0)));
+                                new Leadership(lock.path(), localNode, lock.epoch())));
                 THREAD_POOL.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
             } else {
                 if (openContests.containsKey(lock.path())) {
                     notifyListeners(
                             new LeadershipEvent(
                                     LeadershipEvent.Type.LEADER_BOOTED,
-                                    new Leadership(lock.path(), localNode, 0)));
+                                    new Leadership(lock.path(), localNode, lock.epoch())));
                     tryAcquireLeadership(lock.path());
                 }
             }
         }
     }
+
+    private class PeerAdvertiser implements LeadershipEventListener {
+        @Override
+        public void event(LeadershipEvent event) {
+            // publish events originating on this host.
+            if (event.subject().leader().equals(localNode)) {
+                try {
+                    clusterCommunicator.broadcast(
+                            new ClusterMessage(
+                                    localNode.id(),
+                                    LEADERSHIP_UPDATES,
+                                    SERIALIZER.encode(event)));
+                } catch (IOException e) {
+                    log.error("Failed to broadcast leadership update message", e);
+                }
+            }
+        }
+    }
+
+    private class PeerAdvertisementHandler implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            LeadershipEvent event = SERIALIZER.decode(message.payload());
+            log.debug("Received {} from {}", event, message.sender());
+            notifyListeners(event);
+        }
+    }
+
+    private class LeaderBoardUpdater implements LeadershipEventListener {
+        @Override
+        public void event(LeadershipEvent event) {
+            Leadership leadershipUpdate = event.subject();
+            synchronized (leaderBoard) {
+                Leadership currentLeadership = leaderBoard.get(leadershipUpdate.topic());
+                switch (event.type()) {
+                case LEADER_ELECTED:
+                case LEADER_REELECTED:
+                        if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
+                            leaderBoard.put(leadershipUpdate.topic(), leadershipUpdate);
+                        }
+                    break;
+                case LEADER_BOOTED:
+                    if (currentLeadership != null && currentLeadership.epoch() <= leadershipUpdate.epoch()) {
+                        leaderBoard.remove(leadershipUpdate.topic());
+                    }
+                    break;
+                default:
+                    break;
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
index 74bff8b..81195b4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -1,8 +1,10 @@
 package org.onlab.onos.store.service.impl;
 
+import static com.google.common.base.Verify.verify;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -15,6 +17,7 @@
 import org.onlab.onos.store.service.DatabaseException;
 import org.onlab.onos.store.service.DatabaseService;
 import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.VersionedValue;
 import org.slf4j.Logger;
 
 /**
@@ -29,6 +32,7 @@
     private final String path;
     private DateTime lockExpirationTime;
     private AtomicBoolean isLocked = new AtomicBoolean(false);
+    private volatile long epoch = 0;
     private byte[] lockId;
 
     public DistributedLock(
@@ -74,6 +78,10 @@
                 DistributedLockManager.ONOS_LOCK_TABLE_NAME,
                 path,
                 lockId)) {
+            VersionedValue vv =
+                    databaseService.get(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path);
+            verify(Arrays.equals(vv.value(), lockId));
+            epoch = vv.version();
             isLocked.set(true);
             lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
             return true;
@@ -121,6 +129,11 @@
     }
 
     @Override
+    public long epoch() {
+        return epoch;
+    }
+
+    @Override
     public void unlock() {
         if (!isLocked()) {
             return;
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
index 7043a61..fea74bf 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoNamespaces.java
@@ -25,6 +25,8 @@
 
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.Leadership;
+import org.onlab.onos.cluster.LeadershipEvent;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.cluster.RoleInfo;
 import org.onlab.onos.core.DefaultApplicationId;
@@ -166,6 +168,9 @@
                     Link.Type.class,
                     Link.State.class,
                     Timestamp.class,
+                    Leadership.class,
+                    LeadershipEvent.class,
+                    LeadershipEvent.Type.class,
                     HostId.class,
                     HostDescription.class,
                     DefaultHostDescription.class,