LeadershipService and MastershipService/Store APIs now return CompletableFutures so that callers can (optionally) chain together dependent operations
Change-Id: Ia00fcb7d98fbfce897527f67ea9690abf6fe846a
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
index 1f3b360..ee60cc7 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
@@ -17,6 +17,8 @@
import com.codahale.metrics.Timer;
import com.codahale.metrics.Timer.Context;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -109,7 +111,7 @@
}
@Override
- public void setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
+ public CompletableFuture<Void> setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
checkNotNull(nodeId, NODE_ID_NULL);
checkNotNull(deviceId, DEVICE_ID_NULL);
checkNotNull(role, ROLE_NULL);
@@ -128,14 +130,14 @@
break;
default:
log.info("Unknown role; ignoring");
- return;
+ return CompletableFuture.completedFuture(null);
}
- eventFuture.whenComplete((event, error) -> {
+ return eventFuture.whenComplete((event, error) -> {
if (event != null) {
post(event);
}
- });
+ }).thenApply(v -> null);
}
@Override
@@ -155,14 +157,11 @@
}
@Override
- public MastershipRole requestRoleFor(DeviceId deviceId) {
+ public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
final Context timer = startTimer(requestRoleTimer);
- try {
- return store.requestRole(deviceId);
- } finally {
- stopTimer(timer);
- }
+ return store.requestRole(deviceId).whenComplete((result, error) -> stopTimer(timer));
+
}
@Override
@@ -222,13 +221,18 @@
}
// Now re-balance the buckets until they are roughly even.
+ List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newLinkedList();
int rounds = controllerDevices.keySet().size();
for (int i = 0; i < rounds; i++) {
// Iterate over the buckets and find the smallest and the largest.
ControllerNode smallest = findBucket(true, controllerDevices);
ControllerNode largest = findBucket(false, controllerDevices);
- balanceBuckets(smallest, largest, controllerDevices, deviceCount);
+ balanceBucketsFutures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount));
}
+ CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
+ balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
+
+ Futures.getUnchecked(balanceRolesFuture);
}
private ControllerNode findBucket(boolean min,
@@ -245,7 +249,7 @@
return xNode;
}
- private void balanceBuckets(ControllerNode smallest, ControllerNode largest,
+ private CompletableFuture<Void> balanceBuckets(ControllerNode smallest, ControllerNode largest,
Map<ControllerNode, Set<DeviceId>> controllerDevices,
int deviceCount) {
Collection<DeviceId> minBucket = controllerDevices.get(smallest);
@@ -255,6 +259,8 @@
int delta = (maxBucket.size() - minBucket.size()) / 2;
delta = Math.min(deviceCount / bucketCount, delta);
+ List<CompletableFuture<Void>> setRoleFutures = Lists.newLinkedList();
+
if (delta > 0) {
log.info("Attempting to move {} nodes from {} to {}...", delta,
largest.id(), smallest.id());
@@ -264,12 +270,14 @@
while (it.hasNext() && i < delta) {
DeviceId deviceId = it.next();
log.info("Setting {} as the master for {}", smallest.id(), deviceId);
- setRole(smallest.id(), deviceId, MASTER);
+ setRoleFutures.add(setRole(smallest.id(), deviceId, MASTER));
controllerDevices.get(smallest).add(deviceId);
it.remove();
i++;
}
}
+
+ return CompletableFuture.allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()]));
}