LeadershipService and MastershipService/Store APIs return CompletableFutures so that callers can (optionally) chain together dependent operations
Change-Id: Ia00fcb7d98fbfce897527f67ea9690abf6fe846a
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
index 9b94056..9130f4f 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
@@ -18,6 +18,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
/**
* Service for leader election.
@@ -55,16 +56,18 @@
/**
* Joins the leadership contest.
*
- * @param path topic for which this controller node wishes to be a leader.
+ * @param path topic for which this controller node wishes to be a leader
+ * @return {@code Leadership} future
*/
- void runForLeadership(String path);
+ CompletableFuture<Leadership> runForLeadership(String path);
/**
* Withdraws from a leadership contest.
*
- * @param path topic for which this controller node no longer wishes to be a leader.
+ * @param path topic for which this controller node no longer wishes to be a leader
+ * @return future that is successfully completed when withdraw is done
*/
- void withdraw(String path);
+ CompletableFuture<Void> withdraw(String path);
/**
* If the local nodeId is the leader for specified topic, this method causes it to
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipAdminService.java b/core/api/src/main/java/org/onosproject/mastership/MastershipAdminService.java
index 6c41fb2..a8835fc 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipAdminService.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipAdminService.java
@@ -15,6 +15,8 @@
*/
package org.onosproject.mastership;
+import java.util.concurrent.CompletableFuture;
+
import org.onosproject.cluster.NodeId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
@@ -30,8 +32,9 @@
* @param instance controller instance identifier
* @param deviceId device identifier
* @param role requested role
+ * @return future that is completed when the role is set
*/
- void setRole(NodeId instance, DeviceId deviceId, MastershipRole role);
+ CompletableFuture<Void> setRole(NodeId instance, DeviceId deviceId, MastershipRole role);
/**
* Balances the mastership to be shared as evenly as possibly by all
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipService.java b/core/api/src/main/java/org/onosproject/mastership/MastershipService.java
index f03f6c8..450bcc5 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipService.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipService.java
@@ -16,6 +16,7 @@
package org.onosproject.mastership;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.RoleInfo;
@@ -46,7 +47,7 @@
* @param deviceId the the identifier of the device
* @return the role of this controller instance
*/
- MastershipRole requestRoleFor(DeviceId deviceId);
+ CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId);
/**
* Abandons mastership of the specified device on the local node thus
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java b/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
index 6b64705..81c2d8b 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
@@ -38,7 +38,7 @@
* @param deviceId device identifier
* @return established or newly negotiated mastership role
*/
- MastershipRole requestRole(DeviceId deviceId);
+ CompletableFuture<MastershipRole> requestRole(DeviceId deviceId);
/**
* Returns the role of a device for a specific controller instance.
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
index abc92f5..e1d421d 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
@@ -18,6 +18,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
/**
* Test adapter for leadership service.
@@ -40,13 +41,13 @@
}
@Override
- public void runForLeadership(String path) {
-
+ public CompletableFuture<Leadership> runForLeadership(String path) {
+ return null;
}
@Override
- public void withdraw(String path) {
-
+ public CompletableFuture<Void> withdraw(String path) {
+ return null;
}
@Override
diff --git a/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java
index be1f4b9..aff67cd 100644
--- a/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java
@@ -21,6 +21,7 @@
import org.onosproject.net.MastershipRole;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
/**
* Test adapter for mastership service.
@@ -32,7 +33,7 @@
}
@Override
- public MastershipRole requestRoleFor(DeviceId deviceId) {
+ public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
return null;
}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
index 1f3b360..ee60cc7 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
@@ -17,6 +17,8 @@
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -109,7 +111,7 @@
}
@Override
- public void setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
+ public CompletableFuture<Void> setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
checkNotNull(nodeId, NODE_ID_NULL);
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(role, ROLE_NULL);
@@ -128,14 +130,14 @@
break;
default:
log.info("Unknown role; ignoring");
- return;
+ return CompletableFuture.completedFuture(null);
}
- eventFuture.whenComplete((event, error) -> {
+ return eventFuture.whenComplete((event, error) -> {
if (event != null) {
post(event);
}
- });
+ }).thenApply(v -> null);
}
@Override
@@ -155,14 +157,11 @@
}
@Override
- public MastershipRole requestRoleFor(DeviceId deviceId) {
+ public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
final Context timer = startTimer(requestRoleTimer);
- try {
- return store.requestRole(deviceId);
- } finally {
- stopTimer(timer);
- }
+ return store.requestRole(deviceId).whenComplete((result, error) -> stopTimer(timer));
+
}
@Override
@@ -222,13 +221,18 @@
}
// Now re-balance the buckets until they are roughly even.
+ List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newLinkedList();
int rounds = controllerDevices.keySet().size();
for (int i = 0; i < rounds; i++) {
// Iterate over the buckets and find the smallest and the largest.
ControllerNode smallest = findBucket(true, controllerDevices);
ControllerNode largest = findBucket(false, controllerDevices);
- balanceBuckets(smallest, largest, controllerDevices, deviceCount);
+ balanceBucketsFutures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount));
}
+ CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
+ balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
+
+ Futures.getUnchecked(balanceRolesFuture);
}
private ControllerNode findBucket(boolean min,
@@ -245,7 +249,7 @@
return xNode;
}
- private void balanceBuckets(ControllerNode smallest, ControllerNode largest,
+ private CompletableFuture<Void> balanceBuckets(ControllerNode smallest, ControllerNode largest,
Map<ControllerNode, Set<DeviceId>> controllerDevices,
int deviceCount) {
Collection<DeviceId> minBucket = controllerDevices.get(smallest);
@@ -255,6 +259,8 @@
int delta = (maxBucket.size() - minBucket.size()) / 2;
delta = Math.min(deviceCount / bucketCount, delta);
+ List<CompletableFuture<Void>> setRoleFutures = Lists.newLinkedList();
+
if (delta > 0) {
log.info("Attempting to move {} nodes from {} to {}...", delta,
largest.id(), smallest.id());
@@ -264,12 +270,14 @@
while (it.hasNext() && i < delta) {
DeviceId deviceId = it.next();
log.info("Setting {} as the master for {}", smallest.id(), deviceId);
- setRole(smallest.id(), deviceId, MASTER);
+ setRoleFutures.add(setRole(smallest.id(), deviceId, MASTER));
controllerDevices.get(smallest).add(deviceId);
it.remove();
i++;
}
}
+
+ return CompletableFuture.allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()]));
}
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 661ddca..d12664b 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -16,6 +16,7 @@
package org.onosproject.net.device.impl;
import com.google.common.collect.Lists;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -57,6 +58,7 @@
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -79,7 +81,6 @@
private static final String PORT_NUMBER_NULL = "Port number cannot be null";
private static final String DEVICE_DESCRIPTION_NULL = "Device description cannot be null";
private static final String PORT_DESCRIPTION_NULL = "Port description cannot be null";
- private static final String ROLE_NULL = "Role cannot be null";
private final Logger log = getLogger(getClass());
@@ -89,6 +90,7 @@
private final DeviceStoreDelegate delegate = new InternalStoreDelegate();
private final MastershipListener mastershipListener = new InternalMastershipListener();
+ private NodeId localNodeId;
private ScheduledExecutorService backgroundService;
@@ -113,6 +115,7 @@
@Activate
public void activate() {
backgroundService = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "manager-background"));
+ localNodeId = clusterService.getLocalNode().id();
store.setDelegate(delegate);
eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
@@ -302,12 +305,10 @@
checkValidity();
log.info("Device {} connected", deviceId);
- final NodeId myNodeId = clusterService.getLocalNode().id();
-
// check my Role
mastershipService.requestRoleFor(deviceId);
final MastershipTerm term = termService.getMastershipTerm(deviceId);
- if (term == null || !myNodeId.equals(term.master())) {
+ if (term == null || !localNodeId.equals(term.master())) {
log.info("Role of this node is STANDBY for {}", deviceId);
// TODO: Do we need to explicitly tell the Provider that
// this instance is not the MASTER
@@ -337,7 +338,6 @@
log.info("Device {} disconnected from this node", deviceId);
- DeviceEvent event = null;
List<Port> ports = store.getPorts(deviceId);
List<PortDescription> descs = Lists.newArrayList();
ports.forEach(port ->
@@ -346,7 +346,7 @@
port.portSpeed())));
store.updatePorts(this.provider().id(), deviceId, descs);
try {
- event = store.markOffline(deviceId);
+ post(store.markOffline(deviceId));
} catch (IllegalStateException e) {
log.warn("Failed to mark {} offline", deviceId);
// only the MASTER should be marking off-line in normal cases,
@@ -360,26 +360,21 @@
// FIXME: Store semantics leaking out as IllegalStateException.
// Consider revising store API to handle this scenario.
-
- MastershipRole role = mastershipService.requestRoleFor(deviceId);
- MastershipTerm term = termService.getMastershipTerm(deviceId);
- final NodeId myNodeId = clusterService.getLocalNode().id();
- // TODO: Move this type of check inside device clock manager, etc.
- if (term != null && myNodeId.equals(term.master())) {
- log.info("Retry marking {} offline", deviceId);
- deviceClockProviderService.setMastershipTerm(deviceId, term);
- event = store.markOffline(deviceId);
- } else {
- log.info("Failed again marking {} offline. {}", deviceId, role);
- }
+ CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
+ roleFuture.whenComplete((role, error) -> {
+ MastershipTerm term = termService.getMastershipTerm(deviceId);
+ // TODO: Move this type of check inside device clock manager, etc.
+ if (term != null && localNodeId.equals(term.master())) {
+ log.info("Retry marking {} offline", deviceId);
+ deviceClockProviderService.setMastershipTerm(deviceId, term);
+ post(store.markOffline(deviceId));
+ } else {
+ log.info("Failed again marking {} offline. {}", deviceId, role);
+ }
+ });
} finally {
//relinquish master role and ability to be backup.
mastershipService.relinquishMastership(deviceId);
-
- if (event != null) {
- log.info("Device {} disconnected from cluster", deviceId);
- post(event);
- }
}
}
@@ -531,12 +526,11 @@
private void reassertRole(final DeviceId did,
final MastershipRole nextRole) {
- final NodeId myNodeId = clusterService.getLocalNode().id();
MastershipRole myNextRole = nextRole;
if (myNextRole == NONE) {
mastershipService.requestRoleFor(did);
MastershipTerm term = termService.getMastershipTerm(did);
- if (term != null && myNodeId.equals(term.master())) {
+ if (term != null && localNodeId.equals(term.master())) {
myNextRole = MASTER;
} else {
myNextRole = STANDBY;
@@ -597,21 +591,20 @@
}
final DeviceId did = event.subject();
- final NodeId myNodeId = clusterService.getLocalNode().id();
// myRole suggested by MastershipService
MastershipRole myNextRole;
- if (myNodeId.equals(event.roleInfo().master())) {
+ if (localNodeId.equals(event.roleInfo().master())) {
// confirm latest info
MastershipTerm term = termService.getMastershipTerm(did);
- final boolean iHaveControl = term != null && myNodeId.equals(term.master());
+ final boolean iHaveControl = term != null && localNodeId.equals(term.master());
if (iHaveControl) {
deviceClockProviderService.setMastershipTerm(did, term);
myNextRole = MASTER;
} else {
myNextRole = STANDBY;
}
- } else if (event.roleInfo().backups().contains(myNodeId)) {
+ } else if (event.roleInfo().backups().contains(localNodeId)) {
myNextRole = STANDBY;
} else {
myNextRole = NONE;
diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
index 87b08d6..b5ae31b 100644
--- a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
@@ -34,6 +34,7 @@
import org.onosproject.store.trivial.impl.SimpleMastershipStore;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -77,7 +78,7 @@
public void setRole() {
mgr.setRole(NID_OTHER, DEV_MASTER, MASTER);
assertEquals("wrong local role:", NONE, mgr.getLocalRole(DEV_MASTER));
- assertEquals("wrong obtained role:", STANDBY, mgr.requestRoleFor(DEV_MASTER));
+ assertEquals("wrong obtained role:", STANDBY, Futures.getUnchecked(mgr.requestRoleFor(DEV_MASTER)));
//set to master
mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
@@ -112,8 +113,8 @@
mgr.setRole(NID_OTHER, DEV_OTHER, MASTER);
//local should be master for one but standby for other
- assertEquals("wrong role:", MASTER, mgr.requestRoleFor(DEV_MASTER));
- assertEquals("wrong role:", STANDBY, mgr.requestRoleFor(DEV_OTHER));
+ assertEquals("wrong role:", MASTER, Futures.getUnchecked(mgr.requestRoleFor(DEV_MASTER)));
+ assertEquals("wrong role:", STANDBY, Futures.getUnchecked(mgr.requestRoleFor(DEV_OTHER)));
}
@Test
diff --git a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
index 20d55c4..d6bfda2 100644
--- a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
@@ -19,6 +19,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.junit.After;
import org.junit.Before;
@@ -305,8 +306,8 @@
}
@Override
- public MastershipRole requestRoleFor(DeviceId deviceId) {
- return MastershipRole.MASTER;
+ public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
+ return CompletableFuture.completedFuture(MastershipRole.MASTER);
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index a4f265f..5b73246 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -49,6 +49,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -188,7 +189,7 @@
}
@Override
- public void runForLeadership(String path) {
+ public CompletableFuture<Leadership> runForLeadership(String path) {
checkArgument(path != null);
Topic topic = new Topic(path);
Topic oldTopic = topics.putIfAbsent(path, topic);
@@ -198,16 +199,18 @@
} else {
oldTopic.runForLeadership();
}
+ return CompletableFuture.completedFuture(getLeadership(path));
}
@Override
- public void withdraw(String path) {
+ public CompletableFuture<Void> withdraw(String path) {
checkArgument(path != null);
Topic topic = topics.get(path);
if (topic != null) {
topics.remove(path, topic);
topic.stop();
}
+ return CompletableFuture.completedFuture(null);
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index a013737..434d0e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -37,6 +37,8 @@
import java.util.Objects;
import java.util.Set;
import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -199,8 +201,14 @@
}
@Override
- public void runForLeadership(String path) {
+ public CompletableFuture<Leadership> runForLeadership(String path) {
log.debug("Running for leadership for topic: {}", path);
+ CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
+ doRunForLeadership(path, resultFuture);
+ return resultFuture;
+ }
+
+ private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
try {
Versioned<List<NodeId>> candidates = candidateMap.get(path);
if (candidates != null) {
@@ -216,7 +224,7 @@
newCandidates.version(),
newCandidates.creationTime())));
} else {
- rerunForLeadership(path);
+ rerunForLeadership(path, future);
return;
}
}
@@ -231,28 +239,38 @@
newCandidates.version(),
newCandidates.creationTime())));
} else {
- rerunForLeadership(path);
+ rerunForLeadership(path, future);
return;
}
}
log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
activeTopics.add(path);
- tryLeaderLock(path);
+ tryLeaderLock(path, future);
} catch (ConsistentMapException e) {
log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
- rerunForLeadership(path);
+ rerunForLeadership(path, future);
}
}
@Override
- public void withdraw(String path) {
+ public CompletableFuture<Void> withdraw(String path) {
activeTopics.remove(path);
+ CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+ doWithdraw(path, resultFuture);
+ return resultFuture;
+ }
+
+ private void doWithdraw(String path, CompletableFuture<Void> future) {
+ if (activeTopics.contains(path)) {
+ future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
+ }
try {
Versioned<NodeId> leader = leaderMap.get(path);
if (leader != null && Objects.equals(leader.value(), localNodeId)) {
if (leaderMap.remove(path, leader.version())) {
log.info("Gave up leadership for {}", path);
+ future.complete(null);
publish(new LeadershipEvent(
LeadershipEvent.Type.LEADER_BOOTED,
new Leadership(path,
@@ -267,10 +285,12 @@
? Lists.newArrayList(candidates.value())
: Lists.newArrayList();
if (!candidateList.remove(localNodeId)) {
+ future.complete(null);
return;
}
if (candidateMap.replace(path, candidates.version(), candidateList)) {
Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+ future.complete(null);
publish(new LeadershipEvent(
LeadershipEvent.Type.CANDIDATES_CHANGED,
new Leadership(path,
@@ -279,11 +299,11 @@
newCandidates.creationTime())));
} else {
log.warn("Failed to withdraw from candidates list. Will retry");
- retryWithdraw(path);
+ retryWithdraw(path, future);
}
} catch (Exception e) {
log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
- retryWithdraw(path);
+ retryWithdraw(path, future);
}
}
@@ -304,7 +324,7 @@
localNodeId,
leader.version(),
leader.creationTime())));
- retryLock(path);
+ retryLock(path, new CompletableFuture<>());
return true;
}
}
@@ -350,7 +370,7 @@
return updated;
}
- private void tryLeaderLock(String path) {
+ private void tryLeaderLock(String path, CompletableFuture<Leadership> future) {
if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
return;
}
@@ -362,35 +382,37 @@
.filter(n -> clusterService.getState(n) == ACTIVE)
.collect(Collectors.toList());
if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
- leaderLockAttempt(path, candidates.value());
+ leaderLockAttempt(path, candidates.value(), future);
} else {
- retryLock(path);
+ retryLock(path, future);
}
} else {
throw new IllegalStateException("should not be here");
}
} catch (Exception e) {
log.debug("Failed to fetch candidate information for {}", path, e);
- retryLock(path);
+ retryLock(path, future);
}
}
- private void leaderLockAttempt(String path, List<NodeId> candidates) {
+ private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) {
try {
Versioned<NodeId> currentLeader = leaderMap.get(path);
if (currentLeader != null) {
if (localNodeId.equals(currentLeader.value())) {
log.info("Already has leadership for {}", path);
// FIXME: candidates can get out of sync.
+ Leadership leadership = new Leadership(path,
+ localNodeId,
+ currentLeader.version(),
+ currentLeader.creationTime());
+ future.complete(leadership);
publish(new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
- new Leadership(path,
- localNodeId,
- currentLeader.version(),
- currentLeader.creationTime())));
+ leadership));
} else {
// someone else has leadership. will retry after sometime.
- retryLock(path);
+ retryLock(path, future);
}
} else {
if (leaderMap.putIfAbsent(path, localNodeId) == null) {
@@ -398,20 +420,22 @@
// do a get again to get the version (epoch)
Versioned<NodeId> newLeader = leaderMap.get(path);
// FIXME: candidates can get out of sync
+ Leadership leadership = new Leadership(path,
+ newLeader.value(),
+ newLeader.version(),
+ newLeader.creationTime());
+ future.complete(leadership);
publish(new LeadershipEvent(
LeadershipEvent.Type.LEADER_ELECTED,
- new Leadership(path,
- newLeader.value(),
- newLeader.version(),
- newLeader.creationTime())));
+ leadership));
} else {
// someone beat us to it.
- retryLock(path);
+ retryLock(path, future);
}
}
} catch (Exception e) {
log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
- retryLock(path);
+ retryLock(path, future);
}
}
@@ -463,23 +487,23 @@
}
}
- private void rerunForLeadership(String path) {
+ private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
retryLeaderLockExecutor.schedule(
- () -> runForLeadership(path),
+ () -> doRunForLeadership(path, future),
ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
TimeUnit.SECONDS);
}
- private void retryLock(String path) {
+ private void retryLock(String path, CompletableFuture<Leadership> future) {
retryLeaderLockExecutor.schedule(
- () -> tryLeaderLock(path),
+ () -> tryLeaderLock(path, future),
DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
TimeUnit.SECONDS);
}
- private void retryWithdraw(String path) {
+ private void retryWithdraw(String path, CompletableFuture<Void> future) {
retryLeaderLockExecutor.schedule(
- () -> withdraw(path),
+ () -> doWithdraw(path, future),
DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
TimeUnit.SECONDS);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 9aa376a..5225549 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -163,19 +163,22 @@
}
@Override
- public MastershipRole requestRole(DeviceId deviceId) {
+ public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
checkArgument(deviceId != null, DEVICE_ID_NULL);
String leadershipTopic = createDeviceMastershipTopic(deviceId);
if (connectedDevices.add(deviceId)) {
- leadershipService.runForLeadership(leadershipTopic);
- return MastershipRole.STANDBY;
+ return leadershipService.runForLeadership(leadershipTopic)
+ .thenApply(leadership -> {
+ return Objects.equal(localNodeId, leadership.leader())
+ ? MastershipRole.MASTER : MastershipRole.STANDBY;
+ });
} else {
Leadership leadership = leadershipService.getLeadership(leadershipTopic);
if (leadership != null && leadership.leader().equals(localNodeId)) {
- return MastershipRole.MASTER;
+ return CompletableFuture.completedFuture(MastershipRole.MASTER);
} else {
- return MastershipRole.STANDBY;
+ return CompletableFuture.completedFuture(MastershipRole.STANDBY);
}
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
index d6a857c..b2c5ade 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
@@ -205,7 +205,7 @@
}
@Override
- public MastershipRole requestRole(DeviceId deviceId) {
+ public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
// if no master => become master
// if there already exists a master:
@@ -225,7 +225,7 @@
updateTerm(deviceId);
roleMap.put(deviceId, rv);
- return MASTER;
+ return CompletableFuture.completedFuture(MASTER);
}
final MastershipRole currentRole = rv.getRole(local);
switch (currentRole) {
@@ -239,7 +239,7 @@
roleMap.put(deviceId, rv);
// trigger BACKUPS_CHANGED?
}
- return currentRole;
+ return CompletableFuture.completedFuture(currentRole);
case STANDBY:
// RoleInfo integrity check
modified = rv.reassign(local, NONE, STANDBY);
@@ -250,16 +250,16 @@
roleMap.put(deviceId, rv);
// trigger BACKUPS_CHANGED?
}
- return currentRole;
+ return CompletableFuture.completedFuture(currentRole);
case NONE:
rv.reassign(local, NONE, STANDBY);
roleMap.put(deviceId, rv);
// TODO: notifyDelegate BACKUPS_CHANGED
- return STANDBY;
+ return CompletableFuture.completedFuture(STANDBY);
default:
log.warn("unknown Mastership Role {}", currentRole);
}
- return currentRole;
+ return CompletableFuture.completedFuture(currentRole);
} finally {
roleMap.unlock(deviceId);
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
index c3b529e..9655b2d 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
@@ -35,6 +35,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import static junit.framework.TestCase.assertFalse;
import static org.easymock.EasyMock.anyObject;
@@ -74,8 +75,11 @@
leadershipService.addListener(anyObject(LeadershipEventListener.class));
expectLastCall().andDelegateTo(new TestLeadershipService());
- leadershipService.runForLeadership(anyString());
- expectLastCall().anyTimes();
+ for (int i = 0; i < PartitionManager.NUM_PARTITIONS; i++) {
+ expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
+ .andReturn(CompletableFuture.completedFuture(null))
+ .times(1);
+ }
partitionManager = new PartitionManager()
.withScheduledExecutor(new NullScheduledExecutor());
@@ -92,6 +96,7 @@
* @param numMine number of partitions that should be owned by the local node
*/
private void setUpLeadershipService(int numMine) {
+
Map<String, Leadership> leaderBoard = new HashMap<>();
for (int i = 0; i < numMine; i++) {
@@ -123,7 +128,9 @@
leadershipService.addListener(anyObject(LeadershipEventListener.class));
for (int i = 0; i < PartitionManager.NUM_PARTITIONS; i++) {
- leadershipService.runForLeadership(ELECTION_PREFIX + i);
+ expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
+ .andReturn(CompletableFuture.completedFuture(null))
+ .times(1);
}
replay(leadershipService);
@@ -172,8 +179,9 @@
// We have all the partitions so we'll need to relinquish some
setUpLeadershipService(PartitionManager.NUM_PARTITIONS);
- leadershipService.withdraw(anyString());
- expectLastCall().times(7);
+ expect(leadershipService.withdraw(anyString()))
+ .andReturn(CompletableFuture.completedFuture(null))
+ .times(7);
replay(leadershipService);
diff --git a/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java
index 56879e8..1745687 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java
@@ -145,22 +145,22 @@
//if already MASTER, nothing should happen
testStore.put(DID2, N1, true, false, true);
- assertEquals("wrong role for MASTER:", MASTER, dms.requestRole(DID2));
+ assertEquals("wrong role for MASTER:", MASTER, Futures.getUnchecked(dms.requestRole(DID2)));
//populate maps with DID1, N1 thru NONE case
- assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ assertEquals("wrong role for NONE:", MASTER, Futures.getUnchecked(dms.requestRole(DID1)));
assertTrue("wrong state for store:", !dms.terms.isEmpty());
assertEquals("wrong term",
MastershipTerm.of(N1, 1), dms.getTermFor(DID1));
//CN2 now local. DID2 has N1 as MASTER so N2 is STANDBY
testStore.setCurrent(CN2);
- assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+ assertEquals("wrong role for STANDBY:", STANDBY, Futures.getUnchecked(dms.requestRole(DID2)));
assertEquals("wrong number of entries:", 2, dms.terms.size());
//change term and requestRole() again; should persist
testStore.increment(DID2);
- assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+ assertEquals("wrong role for STANDBY:", STANDBY, Futures.getUnchecked(dms.requestRole(DID2)));
assertEquals("wrong term", MastershipTerm.of(N1, 1), dms.getTermFor(DID2));
}
@@ -168,7 +168,7 @@
public void setMaster() {
//populate maps with DID1, N1 as MASTER thru NONE case
testStore.setCurrent(CN1);
- assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ assertEquals("wrong role for NONE:", MASTER, Futures.getUnchecked(dms.requestRole(DID1)));
assertNull("wrong event:", Futures.getUnchecked(dms.setMaster(N1, DID1)));
//switch over to N2
@@ -189,7 +189,7 @@
public void relinquishRole() {
//populate maps with DID1, N1 as MASTER thru NONE case
testStore.setCurrent(CN1);
- assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ assertEquals("wrong role for NONE:", MASTER, Futures.getUnchecked(dms.requestRole(DID1)));
//no backup, no new MASTER/event
assertNull("wrong event:", Futures.getUnchecked(dms.relinquishRole(N1, DID1)));
@@ -197,7 +197,7 @@
//add backup CN2, get it elected MASTER by relinquishing
testStore.setCurrent(CN2);
- assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
+ assertEquals("wrong role for NONE:", STANDBY, Futures.getUnchecked(dms.requestRole(DID1)));
assertEquals("wrong event:", Type.MASTER_CHANGED, Futures.getUnchecked(dms.relinquishRole(N1, DID1)).type());
assertEquals("wrong master", N2, dms.getMaster(DID1));
@@ -209,9 +209,9 @@
dms.roleMap.get(DID1).nodesOfRole(NONE).size());
//bring nodes back
- assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+ assertEquals("wrong role for NONE:", MASTER, Futures.getUnchecked(dms.requestRole(DID1)));
testStore.setCurrent(CN1);
- assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
+ assertEquals("wrong role for NONE:", STANDBY, Futures.getUnchecked(dms.requestRole(DID1)));
assertEquals("wrong number of backup nodes", 1,
dms.roleMap.get(DID1).nodesOfRole(STANDBY).size());
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
index d00f616..eebe3dd 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
@@ -21,6 +21,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
@@ -76,21 +77,23 @@
}
@Override
- public void runForLeadership(String path) {
+ public CompletableFuture<Leadership> runForLeadership(String path) {
elections.put(path, true);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_ELECTED,
new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
}
+ return CompletableFuture.completedFuture(new Leadership(path, clusterService.getLocalNode().id(), 0, 0));
}
@Override
- public void withdraw(String path) {
+ public CompletableFuture<Void> withdraw(String path) {
elections.remove(path);
for (LeadershipEventListener listener : listeners) {
listener.event(new LeadershipEvent(Type.LEADER_BOOTED,
new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
}
+ return CompletableFuture.completedFuture(null);
}
@Override
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleMastershipStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleMastershipStore.java
index 39bc893..2feb18b 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleMastershipStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleMastershipStore.java
@@ -191,14 +191,14 @@
}
@Override
- public synchronized MastershipRole requestRole(DeviceId deviceId) {
+ public synchronized CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
//query+possible reelection
NodeId node = clusterService.getLocalNode().id();
MastershipRole role = getRole(node, deviceId);
switch (role) {
case MASTER:
- return MastershipRole.MASTER;
+ return CompletableFuture.completedFuture(MastershipRole.MASTER);
case STANDBY:
if (getMaster(deviceId) == null) {
// no master => become master
@@ -208,9 +208,9 @@
removeFromBackups(deviceId, node);
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
getNodes(deviceId)));
- return MastershipRole.MASTER;
+ return CompletableFuture.completedFuture(MastershipRole.MASTER);
}
- return MastershipRole.STANDBY;
+ return CompletableFuture.completedFuture(MastershipRole.STANDBY);
case NONE:
if (getMaster(deviceId) == null) {
// no master => become master
@@ -218,18 +218,18 @@
incrementTerm(deviceId);
notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
getNodes(deviceId)));
- return MastershipRole.MASTER;
+ return CompletableFuture.completedFuture(MastershipRole.MASTER);
}
// add to backup list
if (addToBackup(deviceId, node)) {
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId,
getNodes(deviceId)));
}
- return MastershipRole.STANDBY;
+ return CompletableFuture.completedFuture(MastershipRole.STANDBY);
default:
log.warn("unknown Mastership Role {}", role);
}
- return role;
+ return CompletableFuture.completedFuture(role);
}
// add to backup if not there already, silently ignores null node
diff --git a/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleMastershipStoreTest.java b/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleMastershipStoreTest.java
index 5b44515..2cda65b 100644
--- a/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleMastershipStoreTest.java
+++ b/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleMastershipStoreTest.java
@@ -130,19 +130,19 @@
public void requestRole() {
//NONE - become MASTER
put(DID1, N1, false, false);
- assertEquals("wrong role", MASTER, sms.requestRole(DID1));
+ assertEquals("wrong role", MASTER, Futures.getUnchecked(sms.requestRole(DID1)));
//was STANDBY - become MASTER
put(DID2, N1, false, true);
- assertEquals("wrong role", MASTER, sms.requestRole(DID2));
+ assertEquals("wrong role", MASTER, Futures.getUnchecked(sms.requestRole(DID2)));
//other MASTER - stay STANDBY
put(DID3, N2, true, false);
- assertEquals("wrong role", STANDBY, sms.requestRole(DID3));
+ assertEquals("wrong role", STANDBY, Futures.getUnchecked(sms.requestRole(DID3)));
//local (N1) is MASTER - stay MASTER
put(DID4, N1, true, true);
- assertEquals("wrong role", MASTER, sms.requestRole(DID4));
+ assertEquals("wrong role", MASTER, Futures.getUnchecked(sms.requestRole(DID4)));
}
@Test
diff --git a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java
index 2e05ce5..4395914 100644
--- a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java
+++ b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java
@@ -30,6 +30,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.junit.After;
import org.junit.Before;
@@ -496,8 +497,8 @@
}
@Override
- public MastershipRole requestRoleFor(DeviceId deviceId) {
- return null;
+ public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
+ return CompletableFuture.completedFuture(null);
}
@Override