Initial cut at Leadership Manager
Change-Id: I658c6fca3dc6f686e0f7facc9e7b443679ebae1e
Change-Id: I293906add41ff4310e3584847d806345e0312703
Change-Id: I7fb13a72ba4aef10d7c2262b96e0df64efecfcef
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java b/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java
new file mode 100644
index 0000000..c370ab5
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/cluster/Leadership.java
@@ -0,0 +1,85 @@
+package org.onlab.onos.cluster;
+
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Abstract leadership concept.
+ * <p>
+ * Binds a topic to the node leading it and to the term of that leadership.
+ */
+public class Leadership {
+
+    private final String topic;
+    private final ControllerNode leader;
+    private final long term;
+
+    /**
+     * Creates a leadership descriptor.
+     *
+     * @param topic  topic for which the leadership applies
+     * @param leader node that holds the leadership
+     * @param term   term number of the leadership
+     */
+    public Leadership(String topic, ControllerNode leader, long term) {
+        this.topic = topic;
+        this.leader = leader;
+        this.term = term;
+    }
+
+    /**
+     * The topic for which this leadership applies.
+     * @return leadership topic.
+     */
+    public String topic() {
+        return topic;
+    }
+
+    /**
+     * The leader for this topic.
+     * @return leader node.
+     */
+    public ControllerNode leader() {
+        return leader;
+    }
+
+    /**
+     * The term number associated with this leadership.
+     * @return leadership term
+     */
+    public long term() {
+        return term;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(topic, leader, term);
+    }
+
+    /**
+     * Value equality over topic, leader and term, consistent with {@link #hashCode()}.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        Leadership other = (Leadership) obj;
+        return term == other.term &&
+                Objects.equals(topic, other.topic) &&
+                Objects.equals(leader, other.leader);
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this.getClass())
+            .add("topic", topic)
+            .add("leader", leader)
+            .add("term", term)
+            .toString();
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEvent.java b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEvent.java
index c285eba..246f0fc 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEvent.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEvent.java
@@ -15,44 +15,90 @@
*/
package org.onlab.onos.cluster;
+import java.util.Objects;
+
import org.onlab.onos.event.AbstractEvent;
+import com.google.common.base.MoreObjects;
+
/**
* Describes leadership-related event.
*/
-public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, ControllerNode> {
+public class LeadershipEvent extends AbstractEvent<LeadershipEvent.Type, Leadership> {
/**
* Type of leadership-related events.
*/
public enum Type {
/**
- * Signifies that the leader has changed. The event subject is the
+         * Signifies that the leader has been elected. The event subject is the leadership of the
* new leader.
*/
- LEADER_CHANGED
+        LEADER_ELECTED,
+
+        /**
+         * Signifies that the leader has been re-elected. The event subject is the
+         * leadership of the re-elected leader.
+         */
+        LEADER_REELECTED,
+
+        /**
+         * Signifies that the leader has been booted and lost leadership. The event subject is the
+         * leadership of the former leader.
+         */
+        LEADER_BOOTED
}
/**
* Creates an event of a given type and for the specified instance and the
* current time.
*
- * @param type leadership event type
- * @param instance cluster device subject
+     * @param type       leadership event type
+     * @param leadership event subject
*/
- public LeadershipEvent(Type type, ControllerNode instance) {
- super(type, instance);
+    public LeadershipEvent(Type type, Leadership leadership) {
+        super(type, leadership);
}
/**
- * Creates an event of a given type and for the specified device and time.
+     * Creates an event of a given type and for the specified subject and time.
*
- * @param type device event type
- * @param instance event device subject
+     * @param type       leadership event type
+     * @param leadership event subject
* @param time occurrence time
*/
- public LeadershipEvent(Type type, ControllerNode instance, long time) {
- super(type, instance, time);
+    public LeadershipEvent(Type type, Leadership leadership, long time) {
+        super(type, leadership, time);
}
+    @Override
+    public int hashCode() {
+        return Objects.hash(type(), subject(), time());
+    }
+
+    /**
+     * Value equality over type, subject and time, consistent with {@link #hashCode()}.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        LeadershipEvent other = (LeadershipEvent) obj;
+        return Objects.equals(type(), other.type()) &&
+                Objects.equals(subject(), other.subject()) &&
+                Objects.equals(time(), other.time());
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this.getClass())
+            .add("type", type())
+            .add("subject", subject())
+            .add("time", time())
+            .toString();
+    }
}
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEventListener.java b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEventListener.java
index 5de5596..8dbd162 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEventListener.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipEventListener.java
@@ -21,4 +21,4 @@
* Entity capable of receiving device leadership-related events.
*/
public interface LeadershipEventListener extends EventListener<LeadershipEvent> {
-}
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
index 240fb46..c9e336b 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/LeadershipService.java
@@ -16,29 +16,35 @@
package org.onlab.onos.cluster;
/**
- * Service for obtaining information about the leader election.
+ * Service for leader election.
+ * Leadership contests are organized around topics. An ONOS instance can join the
+ * leadership race for a topic or withdraw from a race it has previously joined.
+ * Once in the race, the instance can get asynchronously notified
+ * of leadership election results.
*/
public interface LeadershipService {
/**
- * Returns the current leader controller node.
- *
- * @return current leader controller node
+ * Joins the leadership contest.
+ * @param path topic for which this controller node wishes to be a leader.
*/
- ControllerNode getLeader();
+ void runForLeadership(String path);
/**
- * Adds the specified leadership event listener.
- *
- * @param listener the leadership listener
+ * Withdraws from a leadership contest.
+ * @param path topic for which this controller node no longer wishes to be a leader.
+ */
+ void withdraw(String path);
+
+ /**
+ * Registers an event listener to be notified of leadership events.
+ * @param listener listener that will be asynchronously notified of leadership events.
*/
void addListener(LeadershipEventListener listener);
/**
- * Removes the specified leadership event listener.
- *
- * @param listener the leadership listener
+ * Unregisters an event listener for leadership events.
+ * @param listener listener to be removed.
*/
void removeListener(LeadershipEventListener listener);
-
-}
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
index 2f5f112..cef7752 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
@@ -1,6 +1,6 @@
package org.onlab.onos.store.service;
-import java.util.concurrent.Future;
+import java.util.concurrent.CompletableFuture;
/**
* A lock is a tool for controlling access to a shared resource by multiple processes.
@@ -43,7 +43,7 @@
* will be reserved before it becomes available for others.
* @return Future that can be used for blocking until lock is acquired.
*/
- Future<Void> lockAsync(int leaseDurationMillis);
+ CompletableFuture<Void> lockAsync(int leaseDurationMillis);
/**
* Acquires the lock only if it is free at the time of invocation.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
new file mode 100644
index 0000000..31bc47f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
@@ -0,0 +1,228 @@
+package org.onlab.onos.store.cluster.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Verify.verifyNotNull;
+import static org.onlab.util.Tools.namedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.Leadership;
+import org.onlab.onos.cluster.LeadershipEvent;
+import org.onlab.onos.cluster.LeadershipEventListener;
+import org.onlab.onos.cluster.LeadershipService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.LockService;
+import org.onlab.onos.store.service.impl.DistributedLockManager;
+import org.slf4j.Logger;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Distributed implementation of LeadershipService that is based on the primitives exposed by
+ * LockService.
+ * <p>
+ * Every contested topic is backed by one {@link Lock}: acquiring the lock is reported as
+ * an election, and the lease is extended every half term while the node stays in the contest.
+ */
+@Component(immediate = true)
+@Service
+public class LeadershipManager implements LeadershipService {
+
+    private final Logger log = getLogger(getClass());
+
+    // TODO: Remove this dependency
+    private static final int TERM_DURATION_MS =
+            DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
+
+    // Pause before retrying a failed acquisition, so a persistent failure cannot spin.
+    private static final int RETRY_DELAY_MS = TERM_DURATION_MS / 2;
+
+    // TODO: Appropriate Thread pool sizing.
+    private static final int THREAD_POOL_SIZE = 25;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private LockService lockService;
+
+    // Shared between caller threads, lock-completion callbacks and the scheduler threads.
+    private final Map<String, Lock> openContests = Maps.newConcurrentMap();
+    private final Set<LeadershipEventListener> listeners = Sets.newConcurrentHashSet();
+
+    // Created on every activation: an executor that was shut down rejects new tasks.
+    private ScheduledExecutorService threadPool;
+    private ControllerNode localNode;
+
+    @Activate
+    public void activate() {
+        localNode = clusterService.getLocalNode();
+        threadPool = Executors.newScheduledThreadPool(
+                THREAD_POOL_SIZE, namedThreads("leadership-manager-%d"));
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        threadPool.shutdown();
+        log.info("Stopped.");
+    }
+
+    @Override
+    public void runForLeadership(String path) {
+        checkArgument(path != null);
+        if (openContests.containsKey(path)) {
+            log.info("Already in the leadership contest for {}", path);
+            return;
+        }
+        Lock lock = verifyNotNull(lockService.create(path), "Lock should not be null");
+        // putIfAbsent closes the window between the check above and the registration.
+        if (openContests.putIfAbsent(path, lock) != null) {
+            log.info("Already in the leadership contest for {}", path);
+            return;
+        }
+        tryAcquireLeadership(path);
+    }
+
+    @Override
+    public void withdraw(String path) {
+        checkArgument(path != null);
+        Lock lock = openContests.remove(path);
+
+        if (lock != null && lock.isLocked()) {
+            lock.unlock();
+            notifyListeners(
+                    new LeadershipEvent(
+                            LeadershipEvent.Type.LEADER_BOOTED,
+                            new Leadership(lock.path(), localNode, 0)));
+            // FIXME: Should set the correct term information.
+        }
+    }
+
+    @Override
+    public void addListener(LeadershipEventListener listener) {
+        checkArgument(listener != null);
+        listeners.add(listener);
+    }
+
+    @Override
+    public void removeListener(LeadershipEventListener listener) {
+        checkArgument(listener != null);
+        listeners.remove(listener);
+    }
+
+    /**
+     * Delivers the event to every registered listener.
+     * A listener that throws is logged and skipped, so it cannot interrupt the election cycle.
+     *
+     * @param event event to deliver
+     */
+    private void notifyListeners(LeadershipEvent event) {
+        for (LeadershipEventListener listener : listeners) {
+            try {
+                listener.event(event);
+            } catch (Exception e) {
+                log.warn("Listener {} failed to process {}", listener, event, e);
+            }
+        }
+    }
+
+    /**
+     * Tells whether the given lock is still the one registered for its topic.
+     *
+     * @param lock lock to check
+     * @return true if the open contests still map the lock's path to this very lock
+     */
+    private boolean isContested(Lock lock) {
+        return openContests.get(lock.path()) == lock;
+    }
+
+    /**
+     * Asynchronously tries to lock the topic; on success announces the election and starts
+     * the periodic lease renewal. Does nothing if the topic is no longer contested.
+     *
+     * @param path topic to acquire leadership for
+     */
+    private void tryAcquireLeadership(String path) {
+        Lock lock = openContests.get(path);
+        if (lock == null) {
+            // Withdrawn since this attempt was requested.
+            return;
+        }
+        lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
+            if (!isContested(lock)) {
+                // Withdrawn while the acquisition was pending: hand the lock back.
+                if (error == null && lock.isLocked()) {
+                    lock.unlock();
+                }
+                return;
+            }
+            if (error == null) {
+                threadPool.schedule(
+                        new ReelectionTask(lock),
+                        TERM_DURATION_MS / 2,
+                        TimeUnit.MILLISECONDS);
+                notifyListeners(
+                        new LeadershipEvent(
+                                LeadershipEvent.Type.LEADER_ELECTED,
+                                new Leadership(lock.path(), localNode, 0)));
+            } else {
+                log.error("Failed to acquire lock for {}", path, error);
+                // Retry later on the pool; retrying from inside the callback could recurse
+                // without bound when the failure is immediate.
+                threadPool.schedule(
+                        () -> tryAcquireLeadership(path),
+                        RETRY_DELAY_MS,
+                        TimeUnit.MILLISECONDS);
+            }
+        });
+    }
+
+    /**
+     * Periodic task that extends the lease of a held lock, or re-enters the contest when
+     * the extension fails.
+     */
+    private class ReelectionTask implements Runnable {
+
+        private final Lock lock;
+
+        public ReelectionTask(Lock lock) {
+            this.lock = lock;
+        }
+
+        @Override
+        public void run() {
+            if (!isContested(lock)) {
+                // Withdrawn, or replaced by a newer contest for the same topic.
+                return;
+            }
+            if (lock.extendExpiration(TERM_DURATION_MS)) {
+                threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
+                notifyListeners(
+                        new LeadershipEvent(
+                                LeadershipEvent.Type.LEADER_REELECTED,
+                                new Leadership(lock.path(), localNode, 0)));
+            } else if (isContested(lock)) {
+                notifyListeners(
+                        new LeadershipEvent(
+                                LeadershipEvent.Type.LEADER_BOOTED,
+                                new Leadership(lock.path(), localNode, 0)));
+                tryAcquireLeadership(lock.path());
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
index 7575ed9..74bff8b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -6,7 +6,6 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -62,7 +61,7 @@
}
@Override
- public Future<Void> lockAsync(int leaseDurationMillis) {
+ public CompletableFuture<Void> lockAsync(int leaseDurationMillis) {
if (isLocked() || tryLock(leaseDurationMillis)) {
return CompletableFuture.<Void>completedFuture(null);
}