LeadershipService: Support for a leaderBoard.
Change-Id: I6bc548510400eacabb12482f8fba1b7f2abb0604
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
index 31bc47f..38bb1a3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
@@ -5,6 +5,7 @@
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
+import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
@@ -23,9 +24,16 @@
import org.onlab.onos.cluster.LeadershipEvent;
import org.onlab.onos.cluster.LeadershipEventListener;
import org.onlab.onos.cluster.LeadershipService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.KryoNamespaces;
+import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.onos.store.service.Lock;
import org.onlab.onos.store.service.LockService;
import org.onlab.onos.store.service.impl.DistributedLockManager;
+import org.onlab.util.KryoNamespace;
import org.slf4j.Logger;
import com.google.common.collect.Maps;
@@ -45,32 +53,83 @@
private static final int TERM_DURATION_MS =
DistributedLockManager.DEAD_LOCK_TIMEOUT_MS;
+    // Time to wait before retrying leadership after
+    // an unexpected error.
+ private static final int WAIT_BEFORE_RETRY_MS = 2000;
+
// TODO: Appropriate Thread pool sizing.
private static final ScheduledExecutorService THREAD_POOL =
Executors.newScheduledThreadPool(25, namedThreads("leadership-manager-%d"));
+ private static final MessageSubject LEADERSHIP_UPDATES =
+ new MessageSubject("leadership-contest-updates");
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private LockService lockService;
- private Map<String, Lock> openContests = Maps.newHashMap();
- private Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
+ private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
+
+ private final Map<String, Lock> openContests = Maps.newHashMap();
+ private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
private ControllerNode localNode;
+ private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
+ private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
+
+ public static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .build()
+ .populate(1);
+ }
+ };
+
@Activate
public void activate() {
localNode = clusterService.getLocalNode();
+
+ addListener(peerAdvertiser);
+ addListener(leaderBoardUpdater);
+
+ clusterCommunicator.addSubscriber(
+ LEADERSHIP_UPDATES,
+ new PeerAdvertisementHandler());
+
log.info("Started.");
}
@Deactivate
public void deactivate() {
+ removeListener(peerAdvertiser);
+ removeListener(leaderBoardUpdater);
+
+ clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
+
THREAD_POOL.shutdown();
+
log.info("Stopped.");
}
+
+ @Override
+ public ControllerNode getLeader(String path) {
+ synchronized (leaderBoard) {
+ Leadership leadership = leaderBoard.get(path);
+ if (leadership != null) {
+ return leadership.leader();
+ }
+ }
+ return null;
+ }
+
@Override
public void runForLeadership(String path) {
checkArgument(path != null);
@@ -94,8 +153,7 @@
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(lock.path(), localNode, 0)));
- // FIXME: Should set the correct term information.
+ new Leadership(lock.path(), localNode, lock.epoch())));
}
}
@@ -123,26 +181,31 @@
lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
if (error == null) {
THREAD_POOL.schedule(
- new RelectionTask(lock),
+ new ReelectionTask(lock),
TERM_DURATION_MS / 2,
TimeUnit.MILLISECONDS);
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
- new Leadership(lock.path(), localNode, 0)));
+ new Leadership(lock.path(), localNode, lock.epoch())));
+ return;
} else {
- log.error("Failed to acquire lock for {}", path, error);
- // retry
- tryAcquireLeadership(path);
+                log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
+ try {
+ Thread.sleep(WAIT_BEFORE_RETRY_MS);
+ tryAcquireLeadership(path);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
});
}
- private class RelectionTask implements Runnable {
+ private class ReelectionTask implements Runnable {
private final Lock lock;
- public RelectionTask(Lock lock) {
+ public ReelectionTask(Lock lock) {
this.lock = lock;
}
@@ -152,17 +215,69 @@
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_REELECTED,
- new Leadership(lock.path(), localNode, 0)));
+ new Leadership(lock.path(), localNode, lock.epoch())));
THREAD_POOL.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
} else {
if (openContests.containsKey(lock.path())) {
notifyListeners(
new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
- new Leadership(lock.path(), localNode, 0)));
+ new Leadership(lock.path(), localNode, lock.epoch())));
tryAcquireLeadership(lock.path());
}
}
}
}
+
+ private class PeerAdvertiser implements LeadershipEventListener {
+ @Override
+ public void event(LeadershipEvent event) {
+ // publish events originating on this host.
+ if (event.subject().leader().equals(localNode)) {
+ try {
+ clusterCommunicator.broadcast(
+ new ClusterMessage(
+ localNode.id(),
+ LEADERSHIP_UPDATES,
+ SERIALIZER.encode(event)));
+ } catch (IOException e) {
+ log.error("Failed to broadcast leadership update message", e);
+ }
+ }
+ }
+ }
+
+ private class PeerAdvertisementHandler implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+ LeadershipEvent event = SERIALIZER.decode(message.payload());
+ log.debug("Received {} from {}", event, message.sender());
+ notifyListeners(event);
+ }
+ }
+
+ private class LeaderBoardUpdater implements LeadershipEventListener {
+ @Override
+ public void event(LeadershipEvent event) {
+ Leadership leadershipUpdate = event.subject();
+ synchronized (leaderBoard) {
+ Leadership currentLeadership = leaderBoard.get(leadershipUpdate.topic());
+ switch (event.type()) {
+ case LEADER_ELECTED:
+ case LEADER_REELECTED:
+ if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
+ leaderBoard.put(leadershipUpdate.topic(), leadershipUpdate);
+ }
+ break;
+ case LEADER_BOOTED:
+ if (currentLeadership != null && currentLeadership.epoch() <= leadershipUpdate.epoch()) {
+ leaderBoard.remove(leadershipUpdate.topic());
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
index 74bff8b..81195b4 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -1,8 +1,10 @@
package org.onlab.onos.store.service.impl;
+import static com.google.common.base.Verify.verify;
import static org.slf4j.LoggerFactory.getLogger;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@@ -15,6 +17,7 @@
import org.onlab.onos.store.service.DatabaseException;
import org.onlab.onos.store.service.DatabaseService;
import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.VersionedValue;
import org.slf4j.Logger;
/**
@@ -29,6 +32,7 @@
private final String path;
private DateTime lockExpirationTime;
private AtomicBoolean isLocked = new AtomicBoolean(false);
+ private volatile long epoch = 0;
private byte[] lockId;
public DistributedLock(
@@ -74,6 +78,10 @@
DistributedLockManager.ONOS_LOCK_TABLE_NAME,
path,
lockId)) {
+ VersionedValue vv =
+ databaseService.get(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path);
+ verify(Arrays.equals(vv.value(), lockId));
+ epoch = vv.version();
isLocked.set(true);
lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
return true;
@@ -121,6 +129,11 @@
}
@Override
+ public long epoch() {
+ return epoch;
+ }
+
+ @Override
public void unlock() {
if (!isLocked()) {
return;