Initial cut at Leadership Manager

Change-Id: I658c6fca3dc6f686e0f7facc9e7b443679ebae1e

Change-Id: I293906add41ff4310e3584847d806345e0312703

Change-Id: I7fb13a72ba4aef10d7c2262b96e0df64efecfcef
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java b/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java
new file mode 100644
index 0000000..c370ab5
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java
@@ -0,0 +1,59 @@
+package org.onlab.onos.cluster;
+
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Abstract leadership concept.
+ */
+public class Leadership {
+
+    private final String topic;
+    private final ControllerNode leader;
+    private final long term;
+
+    public Leadership(String topic, ControllerNode leader, long term) {
+        this.topic = topic;
+        this.leader = leader;
+        this.term = term;
+    }
+
+    /**
+     * The topic for which this leadership applies.
+     * @return leadership topic.
+     */
+    public String topic() {
+        return topic;
+    }
+
+    /**
+     * The leader for this topic.
+     * @return leader node.
+     */
+    public ControllerNode leader() {
+        return leader;
+    }
+
+    /**
+     * The term number associated with this leadership.
+     * @return leadership term
+     */
+    public long term() {
+        return term;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topic, leader, term);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this.getClass())
+            .add("topic", topic)
+            .add("leader", leader)
+            .add("term", term)
+            .toString();
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEvent.java b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEvent.java
index c285eba..246f0fc 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEvent.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEvent.java
@@ -15,44 +15,73 @@
  */
 package org.onlab.onos.cluster;
 
+import java.util.Objects;
+
 import org.onlab.onos.event.AbstractEvent;
 
+import com.google.common.base.MoreObjects;
+
 /**
  * Describes leadership-related event.
  */
-public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, ControllerNode> {
+public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, Leadership> {
 
     /**
      * Type of leadership-related events.
      */
     public enum Type {
         /**
-         * Signifies that the leader has changed. The event subject is the
+         * Signifies that the leader has been elected. The event subject is the
          * new leader.
          */
-        LEADER_CHANGED
+        LEADER_ELECTED,
+
+        /**
+         * Signifies that the leader has been re-elected. The event subject is the
+         * leader.
+         */
+        LEADER_REELECTED,
+
+        /**
+         * Signifies that the leader has been booted and lost leadership. The event subject is the
+         * former leader.
+         */
+        LEADER_BOOTED
     }
 
     /**
      * Creates an event of a given type and for the specified instance and the
      * current time.
      *
-     * @param type     leadership event type
-     * @param instance cluster device subject
+     * @param type       leadership event type
+     * @param leadership event subject
      */
-    public LeadershipEvent(Type type, ControllerNode instance) {
-        super(type, instance);
+    public LeadershipEvent(Type type, Leadership leadership) {
+        super(type, leadership);
     }
 
     /**
-     * Creates an event of a given type and for the specified device and time.
+     * Creates an event of a given type and for the specified subject and time.
      *
-     * @param type     device event type
-     * @param instance event device subject
+     * @param type     leadership event type
+     * @param leadership event subject
      * @param time     occurrence time
      */
-    public LeadershipEvent(Type type, ControllerNode instance, long time) {
-        super(type, instance, time);
+    public LeadershipEvent(Type type, Leadership leadership, long time) {
+        super(type, leadership, time);
     }
 
+    @Override
+    public int hashCode() {
+        return Objects.hash(type(), subject(), time());
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this.getClass())
+            .add("type", type())
+            .add("subject", subject())
+            .add("time", time())
+            .toString();
+    }
 }
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEventListener.java b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEventListener.java
index 5de5596..8dbd162 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEventListener.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEventListener.java
@@ -21,4 +21,4 @@
  * Entity capable of receiving device leadership-related events.
  */
 public interface LeadershipEventListener  extends EventListener<LeadershipEvent> {
-}
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
index 240fb46..c9e336b 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
@@ -16,29 +16,35 @@
 package org.onlab.onos.cluster;
 
 /**
- * Service for obtaining information about the leader election.
+ * Service for leader election.
+ * Leadership contents are organized around topics. ONOS instance can join the
+ * leadership race for a topic or withdraw from a race it has previously joined
+ * Once in the race, the instance can get asynchronously notified
+ * of leadership election results.
  */
 public interface LeadershipService {
 
     /**
-     * Returns the current leader controller node.
-     *
-     * @return current leader controller node
+     * Joins the leadership contest.
+     * @param path topic for which this controller node wishes to be a leader.
      */
-    ControllerNode getLeader();
+    void runForLeadership(String path);
 
     /**
-     * Adds the specified leadership event listener.
-     *
-     * @param listener the leadership listener
+     * Withdraws from a leadership contest.
+     * @param path topic for which this controller node no longer wishes to be a leader.
+     */
+    void withdraw(String path);
+
+    /**
+     * Registers a event listener to be notified of leadership events.
+     * @param listener listener that will asynchronously notified of leadership events.
      */
     void addListener(LeadershipEventListener listener);
 
     /**
-     * Removes the specified leadership event listener.
-     *
-     * @param listener the leadership listener
+     * Unregisters a event listener for leadership events.
+     * @param listener listener to be removed.
      */
     void removeListener(LeadershipEventListener listener);
-
-}
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
index 2f5f112..cef7752 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
@@ -1,6 +1,6 @@
 package org.onlab.onos.store.service;
 
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * A lock is a tool for controlling access to a shared resource by multiple processes.
@@ -43,7 +43,7 @@
      * will be reserved before it becomes available for others.
      * @return Future that can be used for blocking until lock is acquired.
      */
-    Future<Void> lockAsync(int leaseDurationMillis);
+    CompletableFuture<Void> lockAsync(int leaseDurationMillis);
 
     /**
      * Acquires the lock only if it is free at the time of invocation.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
new file mode 100644
index 0000000..31bc47f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
@@ -0,0 +1,168 @@
+package org.onlab.onos.store.cluster.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verifyNotNull;
+import static org.onlab.util.Tools.namedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.Leadership;
+import org.onlab.onos.cluster.LeadershipEvent;
+import org.onlab.onos.cluster.LeadershipEventListener;
+import org.onlab.onos.cluster.LeadershipService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.LockService;
+import org.onlab.onos.store.service.impl.DistributedLockManager;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Distributed implementation of LeadershipService that is based on the primitives exposed by
+ * LockService.
+ */
+@Component(immediate = true)
+@Service
+public class LeadershipManager implements LeadershipService {
+
+    private final Logger log = getLogger(getClass());
+
+    // TODO: Remove this dependency
+    private static final int TERM_DURATION_MS =
+            DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
+
+    // TODO: Appropriate Thread pool sizing.
+    private static final ScheduledExecutorService THREAD_POOL =
+            Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private LockService lockService;
+
+    private Map<String, Lock> openContests = Maps.newHashMap();
+    private Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
+    private ControllerNode localNode;
+
+    @Activate
+    public void activate() {
+        localNode = clusterService.getLocalNode();
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        THREAD_POOL.shutdown();
+        log.info("Stopped.");
+    }
+
+    @Override
+    public void runForLeadership(String path) {
+        checkArgument(path != null);
+        if (openContests.containsKey(path)) {
+            log.info("Already in the leadership contest for {}", path);
+            return;
+        } else {
+            Lock lock = lockService.create(path);
+            openContests.put(path, lock);
+            tryAcquireLeadership(path);
+        }
+    }
+
+    @Override
+    public void withdraw(String path) {
+        checkArgument(path != null);
+        Lock lock = openContests.remove(path);
+
+        if (lock != null && lock.isLocked()) {
+            lock.unlock();
+            notifyListeners(
+                    new LeadershipEvent(
+                            LeadershipEvent.Type.LEADER_BOOTED,
+                            new Leadership(lock.path(), localNode, 0)));
+                            // FIXME: Should set the correct term information.
+        }
+    }
+
+    @Override
+    public void addListener(LeadershipEventListener listener) {
+        checkArgument(listener != null);
+        listeners.add(listener);
+    }
+
+    @Override
+    public void removeListener(LeadershipEventListener listener) {
+        checkArgument(listener != null);
+        listeners.remove(listener);
+    }
+
+    private void notifyListeners(LeadershipEvent event) {
+        for (LeadershipEventListener listener : listeners) {
+            listener.event(event);
+        }
+    }
+
+    private void tryAcquireLeadership(String path) {
+        Lock lock = openContests.get(path);
+        verifyNotNull(lock, "Lock should not be null");
+        lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
+            if (error == null) {
+                THREAD_POOL.schedule(
+                        new RelectionTask(lock),
+                        TERM_DURATION_MS / 2,
+                        TimeUnit.MILLISECONDS);
+                notifyListeners(
+                        new LeadershipEvent(
+                                LeadershipEvent.Type.LEADER_ELECTED,
+                                new Leadership(lock.path(), localNode, 0)));
+            } else {
+                log.error("Failed to acquire lock for {}", path, error);
+                // retry
+                tryAcquireLeadership(path);
+            }
+        });
+    }
+
+    private class RelectionTask implements Runnable {
+
+        private final Lock lock;
+
+        public RelectionTask(Lock lock) {
+           this.lock = lock;
+        }
+
+        @Override
+        public void run() {
+            if (lock.extendExpiration(TERM_DURATION_MS)) {
+                notifyListeners(
+                        new LeadershipEvent(
+                                LeadershipEvent.Type.LEADER_REELECTED,
+                                new Leadership(lock.path(), localNode, 0)));
+                THREAD_POOL.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
+            } else {
+                if (openContests.containsKey(lock.path())) {
+                    notifyListeners(
+                            new LeadershipEvent(
+                                    LeadershipEvent.Type.LEADER_BOOTED,
+                                    new Leadership(lock.path(), localNode, 0)));
+                    tryAcquireLeadership(lock.path());
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
index 7575ed9..74bff8b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -6,7 +6,6 @@
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,7 +61,7 @@
     }
 
     @Override
-    public Future<Void> lockAsync(int leaseDurationMillis) {
+    public CompletableFuture<Void> lockAsync(int leaseDurationMillis) {
         if (isLocked() || tryLock(leaseDurationMillis)) {
             return CompletableFuture.<Void>completedFuture(null);
         }