Initial cut at Leadership Manager

Change-Id: I658c6fca3dc6f686e0f7facc9e7b443679ebae1e

Change-Id: I293906add41ff4310e3584847d806345e0312703

Change-Id: I7fb13a72ba4aef10d7c2262b96e0df64efecfcef
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
new file mode 100644
index 0000000..31bc47f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
@@ -0,0 +1,168 @@
+package org.onlab.onos.store.cluster.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verifyNotNull;
+import static org.onlab.util.Tools.namedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.Leadership;
+import org.onlab.onos.cluster.LeadershipEvent;
+import org.onlab.onos.cluster.LeadershipEventListener;
+import org.onlab.onos.cluster.LeadershipService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.LockService;
+import org.onlab.onos.store.service.impl.DistributedLockManager;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Distributed implementation of LeadershipService that is based on the primitives exposed by
+ * LockService.
+ */
+@Component(immediate = true)
+@Service
+public class LeadershipManager implements LeadershipService {
+
+    private final Logger log = getLogger(getClass());
+
+    // TODO: Remove this dependency
+    private static final int TERM_DURATION_MS =
+            DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
+
+    // TODO: Appropriate Thread pool sizing.
+    private static final ScheduledExecutorService THREAD_POOL =
+            Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private LockService lockService;
+
+    private Map<String, Lock> openContests = Maps.newHashMap();
+    private Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
+    private ControllerNode localNode;
+
+    @Activate
+    public void activate() {
+        localNode = clusterService.getLocalNode();
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        THREAD_POOL.shutdown();
+        log.info("Stopped.");
+    }
+
+    @Override
+    public void runForLeadership(String path) {
+        checkArgument(path != null);
+        if (openContests.containsKey(path)) {
+            log.info("Already in the leadership contest for {}", path);
+            return;
+        } else {
+            Lock lock = lockService.create(path);
+            openContests.put(path, lock);
+            tryAcquireLeadership(path);
+        }
+    }
+
+    @Override
+    public void withdraw(String path) {
+        checkArgument(path != null);
+        Lock lock = openContests.remove(path);
+
+        if (lock != null && lock.isLocked()) {
+            lock.unlock();
+            notifyListeners(
+                    new LeadershipEvent(
+                            LeadershipEvent.Type.LEADER_BOOTED,
+                            new Leadership(lock.path(), localNode, 0)));
+                            // FIXME: Should set the correct term information.
+        }
+    }
+
+    @Override
+    public void addListener(LeadershipEventListener listener) {
+        checkArgument(listener != null);
+        listeners.add(listener);
+    }
+
+    @Override
+    public void removeListener(LeadershipEventListener listener) {
+        checkArgument(listener != null);
+        listeners.remove(listener);
+    }
+
+    private void notifyListeners(LeadershipEvent event) {
+        for (LeadershipEventListener listener : listeners) {
+            listener.event(event);
+        }
+    }
+
+    private void tryAcquireLeadership(String path) {
+        Lock lock = openContests.get(path);
+        verifyNotNull(lock, "Lock should not be null");
+        lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
+            if (error == null) {
+                THREAD_POOL.schedule(
+                        new RelectionTask(lock),
+                        TERM_DURATION_MS / 2,
+                        TimeUnit.MILLISECONDS);
+                notifyListeners(
+                        new LeadershipEvent(
+                                LeadershipEvent.Type.LEADER_ELECTED,
+                                new Leadership(lock.path(), localNode, 0)));
+            } else {
+                log.error("Failed to acquire lock for {}", path, error);
+                // retry
+                tryAcquireLeadership(path);
+            }
+        });
+    }
+
+    private class RelectionTask implements Runnable {
+
+        private final Lock lock;
+
+        public RelectionTask(Lock lock) {
+           this.lock = lock;
+        }
+
+        @Override
+        public void run() {
+            if (lock.extendExpiration(TERM_DURATION_MS)) {
+                notifyListeners(
+                        new LeadershipEvent(
+                                LeadershipEvent.Type.LEADER_REELECTED,
+                                new Leadership(lock.path(), localNode, 0)));
+                THREAD_POOL.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
+            } else {
+                if (openContests.containsKey(lock.path())) {
+                    notifyListeners(
+                            new LeadershipEvent(
+                                    LeadershipEvent.Type.LEADER_BOOTED,
+                                    new Leadership(lock.path(), localNode, 0)));
+                    tryAcquireLeadership(lock.path());
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
index 7575ed9..74bff8b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -6,7 +6,6 @@
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,7 +61,7 @@
     }
 
     @Override
-    public Future<Void> lockAsync(int leaseDurationMillis) {
+    public CompletableFuture<Void> lockAsync(int leaseDurationMillis) {
         if (isLocked() || tryLock(leaseDurationMillis)) {
             return CompletableFuture.<Void>completedFuture(null);
         }