LeadershipService and MastershipService/Store APIs return CompletableFutures so that callers can (optionally) chain together dependent operations
Change-Id: Ia00fcb7d98fbfce897527f67ea9690abf6fe846a
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
index 1f3b360..ee60cc7 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
@@ -17,6 +17,8 @@
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -109,7 +111,7 @@
}
@Override
- public void setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
+ public CompletableFuture<Void> setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
checkNotNull(nodeId, NODE_ID_NULL);
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(role, ROLE_NULL);
@@ -128,14 +130,14 @@
break;
default:
log.info("Unknown role; ignoring");
- return;
+ return CompletableFuture.completedFuture(null);
}
- eventFuture.whenComplete((event, error) -> {
+ return eventFuture.whenComplete((event, error) -> {
if (event != null) {
post(event);
}
- });
+ }).thenApply(v -> null);
}
@Override
@@ -155,14 +157,11 @@
}
@Override
- public MastershipRole requestRoleFor(DeviceId deviceId) {
+ public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
final Context timer = startTimer(requestRoleTimer);
- try {
- return store.requestRole(deviceId);
- } finally {
- stopTimer(timer);
- }
+ return store.requestRole(deviceId).whenComplete((result, error) -> stopTimer(timer));
+
}
@Override
@@ -222,13 +221,18 @@
}
// Now re-balance the buckets until they are roughly even.
+ List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newLinkedList();
int rounds = controllerDevices.keySet().size();
for (int i = 0; i < rounds; i++) {
// Iterate over the buckets and find the smallest and the largest.
ControllerNode smallest = findBucket(true, controllerDevices);
ControllerNode largest = findBucket(false, controllerDevices);
- balanceBuckets(smallest, largest, controllerDevices, deviceCount);
+ balanceBucketsFutures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount));
}
+ CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
+ balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
+
+ Futures.getUnchecked(balanceRolesFuture);
}
private ControllerNode findBucket(boolean min,
@@ -245,7 +249,7 @@
return xNode;
}
- private void balanceBuckets(ControllerNode smallest, ControllerNode largest,
+ private CompletableFuture<Void> balanceBuckets(ControllerNode smallest, ControllerNode largest,
Map<ControllerNode, Set<DeviceId>> controllerDevices,
int deviceCount) {
Collection<DeviceId> minBucket = controllerDevices.get(smallest);
@@ -255,6 +259,8 @@
int delta = (maxBucket.size() - minBucket.size()) / 2;
delta = Math.min(deviceCount / bucketCount, delta);
+ List<CompletableFuture<Void>> setRoleFutures = Lists.newLinkedList();
+
if (delta > 0) {
log.info("Attempting to move {} nodes from {} to {}...", delta,
largest.id(), smallest.id());
@@ -264,12 +270,14 @@
while (it.hasNext() && i < delta) {
DeviceId deviceId = it.next();
log.info("Setting {} as the master for {}", smallest.id(), deviceId);
- setRole(smallest.id(), deviceId, MASTER);
+ setRoleFutures.add(setRole(smallest.id(), deviceId, MASTER));
controllerDevices.get(smallest).add(deviceId);
it.remove();
i++;
}
}
+
+ return CompletableFuture.allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()]));
}
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 661ddca..d12664b 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -16,6 +16,7 @@
package org.onosproject.net.device.impl;
import com.google.common.collect.Lists;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -57,6 +58,7 @@
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -79,7 +81,6 @@
private static final String PORT_NUMBER_NULL = "Port number cannot be null";
private static final String DEVICE_DESCRIPTION_NULL = "Device description cannot be null";
private static final String PORT_DESCRIPTION_NULL = "Port description cannot be null";
- private static final String ROLE_NULL = "Role cannot be null";
private final Logger log = getLogger(getClass());
@@ -89,6 +90,7 @@
private final DeviceStoreDelegate delegate = new InternalStoreDelegate();
private final MastershipListener mastershipListener = new InternalMastershipListener();
+ private NodeId localNodeId;
private ScheduledExecutorService backgroundService;
@@ -113,6 +115,7 @@
@Activate
public void activate() {
backgroundService = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "manager-background"));
+ localNodeId = clusterService.getLocalNode().id();
store.setDelegate(delegate);
eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
@@ -302,12 +305,10 @@
checkValidity();
log.info("Device {} connected", deviceId);
- final NodeId myNodeId = clusterService.getLocalNode().id();
-
// check my Role
mastershipService.requestRoleFor(deviceId);
final MastershipTerm term = termService.getMastershipTerm(deviceId);
- if (term == null || !myNodeId.equals(term.master())) {
+ if (term == null || !localNodeId.equals(term.master())) {
log.info("Role of this node is STANDBY for {}", deviceId);
// TODO: Do we need to explicitly tell the Provider that
// this instance is not the MASTER
@@ -337,7 +338,6 @@
log.info("Device {} disconnected from this node", deviceId);
- DeviceEvent event = null;
List<Port> ports = store.getPorts(deviceId);
List<PortDescription> descs = Lists.newArrayList();
ports.forEach(port ->
@@ -346,7 +346,7 @@
port.portSpeed())));
store.updatePorts(this.provider().id(), deviceId, descs);
try {
- event = store.markOffline(deviceId);
+ post(store.markOffline(deviceId));
} catch (IllegalStateException e) {
log.warn("Failed to mark {} offline", deviceId);
// only the MASTER should be marking off-line in normal cases,
@@ -360,26 +360,21 @@
// FIXME: Store semantics leaking out as IllegalStateException.
// Consider revising store API to handle this scenario.
-
- MastershipRole role = mastershipService.requestRoleFor(deviceId);
- MastershipTerm term = termService.getMastershipTerm(deviceId);
- final NodeId myNodeId = clusterService.getLocalNode().id();
- // TODO: Move this type of check inside device clock manager, etc.
- if (term != null && myNodeId.equals(term.master())) {
- log.info("Retry marking {} offline", deviceId);
- deviceClockProviderService.setMastershipTerm(deviceId, term);
- event = store.markOffline(deviceId);
- } else {
- log.info("Failed again marking {} offline. {}", deviceId, role);
- }
+ CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
+ roleFuture.whenComplete((role, error) -> {
+ MastershipTerm term = termService.getMastershipTerm(deviceId);
+ // TODO: Move this type of check inside device clock manager, etc.
+ if (term != null && localNodeId.equals(term.master())) {
+ log.info("Retry marking {} offline", deviceId);
+ deviceClockProviderService.setMastershipTerm(deviceId, term);
+ post(store.markOffline(deviceId));
+ } else {
+ log.info("Failed again marking {} offline. {}", deviceId, role);
+ }
+ });
} finally {
//relinquish master role and ability to be backup.
mastershipService.relinquishMastership(deviceId);
-
- if (event != null) {
- log.info("Device {} disconnected from cluster", deviceId);
- post(event);
- }
}
}
@@ -531,12 +526,11 @@
private void reassertRole(final DeviceId did,
final MastershipRole nextRole) {
- final NodeId myNodeId = clusterService.getLocalNode().id();
MastershipRole myNextRole = nextRole;
if (myNextRole == NONE) {
mastershipService.requestRoleFor(did);
MastershipTerm term = termService.getMastershipTerm(did);
- if (term != null && myNodeId.equals(term.master())) {
+ if (term != null && localNodeId.equals(term.master())) {
myNextRole = MASTER;
} else {
myNextRole = STANDBY;
@@ -597,21 +591,20 @@
}
final DeviceId did = event.subject();
- final NodeId myNodeId = clusterService.getLocalNode().id();
// myRole suggested by MastershipService
MastershipRole myNextRole;
- if (myNodeId.equals(event.roleInfo().master())) {
+ if (localNodeId.equals(event.roleInfo().master())) {
// confirm latest info
MastershipTerm term = termService.getMastershipTerm(did);
- final boolean iHaveControl = term != null && myNodeId.equals(term.master());
+ final boolean iHaveControl = term != null && localNodeId.equals(term.master());
if (iHaveControl) {
deviceClockProviderService.setMastershipTerm(did, term);
myNextRole = MASTER;
} else {
myNextRole = STANDBY;
}
- } else if (event.roleInfo().backups().contains(myNodeId)) {
+ } else if (event.roleInfo().backups().contains(localNodeId)) {
myNextRole = STANDBY;
} else {
myNextRole = NONE;
diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
index 87b08d6..b5ae31b 100644
--- a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
@@ -34,6 +34,7 @@
import org.onosproject.store.trivial.impl.SimpleMastershipStore;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@@ -77,7 +78,7 @@
public void setRole() {
mgr.setRole(NID_OTHER, DEV_MASTER, MASTER);
assertEquals("wrong local role:", NONE, mgr.getLocalRole(DEV_MASTER));
- assertEquals("wrong obtained role:", STANDBY, mgr.requestRoleFor(DEV_MASTER));
+ assertEquals("wrong obtained role:", STANDBY, Futures.getUnchecked(mgr.requestRoleFor(DEV_MASTER)));
//set to master
mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
@@ -112,8 +113,8 @@
mgr.setRole(NID_OTHER, DEV_OTHER, MASTER);
//local should be master for one but standby for other
- assertEquals("wrong role:", MASTER, mgr.requestRoleFor(DEV_MASTER));
- assertEquals("wrong role:", STANDBY, mgr.requestRoleFor(DEV_OTHER));
+ assertEquals("wrong role:", MASTER, Futures.getUnchecked(mgr.requestRoleFor(DEV_MASTER)));
+ assertEquals("wrong role:", STANDBY, Futures.getUnchecked(mgr.requestRoleFor(DEV_OTHER)));
}
@Test
diff --git a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
index 20d55c4..d6bfda2 100644
--- a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
@@ -19,6 +19,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.junit.After;
import org.junit.Before;
@@ -305,8 +306,8 @@
}
@Override
- public MastershipRole requestRoleFor(DeviceId deviceId) {
- return MastershipRole.MASTER;
+ public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
+ return CompletableFuture.completedFuture(MastershipRole.MASTER);
}
@Override