LeadershipService and MastershipService/Store APIs now return CompletableFutures so that callers can (optionally) chain together dependent operations.
Change-Id: Ia00fcb7d98fbfce897527f67ea9690abf6fe846a
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index a4f265f..5b73246 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -188,7 +189,7 @@
}
@Override
- public void runForLeadership(String path) {
+ public CompletableFuture<Leadership> runForLeadership(String path) {
checkArgument(path != null);
Topic topic = new Topic(path);
Topic oldTopic = topics.putIfAbsent(path, topic);
@@ -198,16 +199,18 @@
} else {
oldTopic.runForLeadership();
}
+ return CompletableFuture.completedFuture(getLeadership(path));
}
@Override
- public void withdraw(String path) {
+ public CompletableFuture<Void> withdraw(String path) {
checkArgument(path != null);
Topic topic = topics.get(path);
if (topic != null) {
topics.remove(path, topic);
topic.stop();
}
+ return CompletableFuture.completedFuture(null);
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index a013737..434d0e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -37,6 +37,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -199,8 +201,14 @@
}
@Override
- public void runForLeadership(String path) {
+ public CompletableFuture<Leadership> runForLeadership(String path) {
log.debug("Running for leadership for topic: {}", path);
+ CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
+ doRunForLeadership(path, resultFuture);
+ return resultFuture;
+ }
+
+ private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
try {
Versioned<List<NodeId>> candidates = candidateMap.get(path);
if (candidates != null) {
@@ -216,7 +224,7 @@
newCandidates.version(),
newCandidates.creationTime())));
} else {
- rerunForLeadership(path);
+ rerunForLeadership(path, future);
return;
}
}
@@ -231,28 +239,38 @@
newCandidates.version(),
newCandidates.creationTime())));
} else {
- rerunForLeadership(path);
+ rerunForLeadership(path, future);
return;
}
}
log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
activeTopics.add(path);
- tryLeaderLock(path);
+ tryLeaderLock(path, future);
} catch (ConsistentMapException e) {
log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
- rerunForLeadership(path);
+ rerunForLeadership(path, future);
}
}
@Override
- public void withdraw(String path) {
+ public CompletableFuture<Void> withdraw(String path) {
activeTopics.remove(path);
+ CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+ doWithdraw(path, resultFuture);
+ return resultFuture;
+ }
+
+ private void doWithdraw(String path, CompletableFuture<Void> future) {
+ if (activeTopics.contains(path)) {
+ future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
+ }
try {
Versioned<NodeId> leader = leaderMap.get(path);
if (leader != null && Objects.equals(leader.value(), localNodeId)) {
if (leaderMap.remove(path, leader.version())) {
log.info("Gave up leadership for {}", path);
+ future.complete(null);
publish(new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(path,
@@ -267,10 +285,12 @@
? Lists.newArrayList(candidates.value())
: Lists.newArrayList();
if (!candidateList.remove(localNodeId)) {
+ future.complete(null);
return;
}
if (candidateMap.replace(path, candidates.version(), candidateList)) {
Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+ future.complete(null);
publish(new LeadershipEvent(
LeadershipEvent.Type.CANDIDATES_CHANGED,
new Leadership(path,
@@ -279,11 +299,11 @@
newCandidates.creationTime())));
} else {
log.warn("Failed to withdraw from candidates list. Will retry");
- retryWithdraw(path);
+ retryWithdraw(path, future);
}
} catch (Exception e) {
log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
- retryWithdraw(path);
+ retryWithdraw(path, future);
}
}
@@ -304,7 +324,7 @@
localNodeId,
leader.version(),
leader.creationTime())));
- retryLock(path);
+ retryLock(path, new CompletableFuture<>());
return true;
}
}
@@ -350,7 +370,7 @@
return updated;
}
- private void tryLeaderLock(String path) {
+ private void tryLeaderLock(String path, CompletableFuture<Leadership> future) {
if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
return;
}
@@ -362,35 +382,37 @@
.filter(n -> clusterService.getState(n) == ACTIVE)
.collect(Collectors.toList());
if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
- leaderLockAttempt(path, candidates.value());
+ leaderLockAttempt(path, candidates.value(), future);
} else {
- retryLock(path);
+ retryLock(path, future);
}
} else {
throw new IllegalStateException("should not be here");
}
} catch (Exception e) {
log.debug("Failed to fetch candidate information for {}", path, e);
- retryLock(path);
+ retryLock(path, future);
}
}
- private void leaderLockAttempt(String path, List<NodeId> candidates) {
+ private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) {
try {
Versioned<NodeId> currentLeader = leaderMap.get(path);
if (currentLeader != null) {
if (localNodeId.equals(currentLeader.value())) {
log.info("Already has leadership for {}", path);
// FIXME: candidates can get out of sync.
+ Leadership leadership = new Leadership(path,
+ localNodeId,
+ currentLeader.version(),
+ currentLeader.creationTime());
+ future.complete(leadership);
publish(new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
- new Leadership(path,
- localNodeId,
- currentLeader.version(),
- currentLeader.creationTime())));
+ leadership));
} else {
// someone else has leadership. will retry after sometime.
- retryLock(path);
+ retryLock(path, future);
}
} else {
if (leaderMap.putIfAbsent(path, localNodeId) == null) {
@@ -398,20 +420,22 @@
// do a get again to get the version (epoch)
Versioned<NodeId> newLeader = leaderMap.get(path);
// FIXME: candidates can get out of sync
+ Leadership leadership = new Leadership(path,
+ newLeader.value(),
+ newLeader.version(),
+ newLeader.creationTime());
+ future.complete(leadership);
publish(new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
- new Leadership(path,
- newLeader.value(),
- newLeader.version(),
- newLeader.creationTime())));
+ leadership));
} else {
// someone beat us to it.
- retryLock(path);
+ retryLock(path, future);
}
}
} catch (Exception e) {
log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
- retryLock(path);
+ retryLock(path, future);
}
}
@@ -463,23 +487,23 @@
}
}
- private void rerunForLeadership(String path) {
+ private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
retryLeaderLockExecutor.schedule(
- () -> runForLeadership(path),
+ () -> doRunForLeadership(path, future),
ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
TimeUnit.SECONDS);
}
- private void retryLock(String path) {
+ private void retryLock(String path, CompletableFuture<Leadership> future) {
retryLeaderLockExecutor.schedule(
- () -> tryLeaderLock(path),
+ () -> tryLeaderLock(path, future),
DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
TimeUnit.SECONDS);
}
- private void retryWithdraw(String path) {
+ private void retryWithdraw(String path, CompletableFuture<Void> future) {
retryLeaderLockExecutor.schedule(
- () -> withdraw(path),
+ () -> doWithdraw(path, future),
DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
TimeUnit.SECONDS);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 9aa376a..5225549 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -163,19 +163,22 @@
}
@Override
- public MastershipRole requestRole(DeviceId deviceId) {
+ public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
String leadershipTopic = createDeviceMastershipTopic(deviceId);
if (connectedDevices.add(deviceId)) {
- leadershipService.runForLeadership(leadershipTopic);
- return MastershipRole.STANDBY;
+ return leadershipService.runForLeadership(leadershipTopic)
+ .thenApply(leadership -> {
+ return Objects.equal(localNodeId, leadership.leader())
+ ? MastershipRole.MASTER : MastershipRole.STANDBY;
+ });
} else {
Leadership leadership = leadershipService.getLeadership(leadershipTopic);
if (leadership != null && leadership.leader().equals(localNodeId)) {
- return MastershipRole.MASTER;
+ return CompletableFuture.completedFuture(MastershipRole.MASTER);
} else {
- return MastershipRole.STANDBY;
+ return CompletableFuture.completedFuture(MastershipRole.STANDBY);
}
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
index d6a857c..b2c5ade 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
@@ -205,7 +205,7 @@
}
@Override
- public MastershipRole requestRole(DeviceId deviceId) {
+ public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
// if no master => become master
// if there already exists a master:
@@ -225,7 +225,7 @@
updateTerm(deviceId);
roleMap.put(deviceId, rv);
- return MASTER;
+ return CompletableFuture.completedFuture(MASTER);
}
final MastershipRole currentRole = rv.getRole(local);
switch (currentRole) {
@@ -239,7 +239,7 @@
roleMap.put(deviceId, rv);
// trigger BACKUPS_CHANGED?
}
- return currentRole;
+ return CompletableFuture.completedFuture(currentRole);
case STANDBY:
// RoleInfo integrity check
modified = rv.reassign(local, NONE, STANDBY);
@@ -250,16 +250,16 @@
roleMap.put(deviceId, rv);
// trigger BACKUPS_CHANGED?
}
- return currentRole;
+ return CompletableFuture.completedFuture(currentRole);
case NONE:
rv.reassign(local, NONE, STANDBY);
roleMap.put(deviceId, rv);
// TODO: notifyDelegate BACKUPS_CHANGED
- return STANDBY;
+ return CompletableFuture.completedFuture(STANDBY);
default:
log.warn("unknown Mastership Role {}", currentRole);
}
- return currentRole;
+ return CompletableFuture.completedFuture(currentRole);
} finally {
roleMap.unlock(deviceId);
}