Adding ability to handle orphaned devices when balancing mastership.
Change-Id: I01dd7a3074475d79504d516fbd3fd32ef18770ce
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
index 1a4fc5f..c744d61 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
@@ -57,6 +57,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -67,6 +68,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Lists.newArrayList;
+import static java.util.concurrent.CompletableFuture.allOf;
import static org.onlab.metrics.MetricsUtil.startTimer;
import static org.onlab.metrics.MetricsUtil.stopTimer;
import static org.onosproject.net.MastershipRole.MASTER;
@@ -76,13 +78,15 @@
import static org.slf4j.LoggerFactory.getLogger;
-
+/**
+ * Component providing the node-device mastership service.
+ */
@Component(immediate = true)
@Service
public class MastershipManager
- extends AbstractListenerManager<MastershipEvent, MastershipListener>
- implements MastershipService, MastershipAdminService, MastershipTermService,
- MetricsHelper {
+ extends AbstractListenerManager<MastershipEvent, MastershipListener>
+ implements MastershipService, MastershipAdminService, MastershipTermService,
+ MetricsHelper {
private static final String NODE_ID_NULL = "Node ID cannot be null";
private static final String DEVICE_ID_NULL = "Device ID cannot be null";
@@ -116,7 +120,7 @@
static final boolean DEFAULT_USE_REGION_FOR_BALANCE_ROLES = false;
@Property(name = "useRegionForBalanceRoles", boolValue = DEFAULT_USE_REGION_FOR_BALANCE_ROLES,
- label = "Use Regions for balancing roles")
+ label = "Use Regions for balancing roles")
protected boolean useRegionForBalanceRoles;
private static final boolean DEFAULT_REBALANCE_ROLES_ON_UPGRADE = true;
@@ -181,7 +185,7 @@
}
return eventFuture.thenAccept(this::post)
- .thenApply(v -> null);
+ .thenApply(v -> null);
}
@Override
@@ -196,8 +200,8 @@
public CompletableFuture<Void> relinquishMastership(DeviceId deviceId) {
checkPermission(CLUSTER_WRITE);
return store.relinquishRole(localNodeId, deviceId)
- .thenAccept(this::post)
- .thenApply(v -> null);
+ .thenAccept(this::post)
+ .thenApply(v -> null);
}
@Override
@@ -249,15 +253,20 @@
public void balanceRoles() {
List<ControllerNode> nodes = newArrayList(clusterService.getNodes());
Map<ControllerNode, Set<DeviceId>> controllerDevices = new HashMap<>();
+ Set<DeviceId> orphanedDevices = Sets.newHashSet();
int deviceCount = 0;
- // Create buckets reflecting current ownership.
+ // Create buckets reflecting current ownership; do this irrespective of
+ // whether the node is active.
for (ControllerNode node : nodes) {
+ Set<DeviceId> devicesOf = new HashSet<>(getDevicesOf(node.id()));
if (clusterService.getState(node.id()).isActive()) {
- Set<DeviceId> devicesOf = new HashSet<>(getDevicesOf(node.id()));
+ log.info("Node {} has {} devices.", node.id(), devicesOf.size());
deviceCount += devicesOf.size();
controllerDevices.put(node, devicesOf);
- log.info("Node {} has {} devices.", node.id(), devicesOf.size());
+ } else if (!devicesOf.isEmpty()) {
+ log.warn("Inactive node {} has {} orphaned devices.", node.id(), devicesOf.size());
+ orphanedDevices.addAll(getDevicesOf(node.id()));
}
}
@@ -265,11 +274,16 @@
return;
}
- // Now re-balance the buckets until they are roughly even.
- List<CompletableFuture<Void>> balanceBucketsFutures = balanceControllerNodes(controllerDevices, deviceCount);
+ List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newLinkedList();
- CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
- balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
+ // First re-balance the buckets until they are roughly even.
+ balanceControllerNodes(controllerDevices, deviceCount, balanceBucketsFutures);
+
+ // Then attempt to distribute any orphaned devices among the buckets.
+ distributeOrphanedDevices(controllerDevices, orphanedDevices, balanceBucketsFutures);
+
+ CompletableFuture<Void> balanceRolesFuture =
+ allOf(balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
Futures.getUnchecked(balanceRolesFuture);
}
@@ -278,32 +292,51 @@
* Balances the nodes specified in controllerDevices.
*
* @param controllerDevices controller nodes to devices map
- * @param deviceCount number of devices mastered by controller nodes
- * @return list of setRole futures for "moved" devices
+ * @param deviceCount number of devices mastered by controller nodes
+ * @param futures list of setRole futures for "moved" devices
*/
- private List<CompletableFuture<Void>> balanceControllerNodes(
- Map<ControllerNode, Set<DeviceId>> controllerDevices, int deviceCount) {
+ private void balanceControllerNodes(Map<ControllerNode, Set<DeviceId>> controllerDevices,
+ int deviceCount,
+ List<CompletableFuture<Void>> futures) {
// Now re-balance the buckets until they are roughly even.
- List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newLinkedList();
int rounds = controllerDevices.keySet().size();
for (int i = 0; i < rounds; i++) {
// Iterate over the buckets and find the smallest and the largest.
ControllerNode smallest = findBucket(true, controllerDevices);
ControllerNode largest = findBucket(false, controllerDevices);
- balanceBucketsFutures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount));
+ futures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount));
}
- return balanceBucketsFutures;
+ }
+
+ /**
+ * Uses the set of orphaned devices to even out the load among the controllers.
+ *
+ * @param controllerDevices controller nodes to devices map
+ * @param orphanedDevices set of orphaned devices without an active master
+ * @param futures list of completable future to track the progress of the balancing operation
+ */
+ private void distributeOrphanedDevices(Map<ControllerNode, Set<DeviceId>> controllerDevices,
+ Set<DeviceId> orphanedDevices,
+ List<CompletableFuture<Void>> futures) {
+ // Now re-distribute the orphaned devices into buckets until they are roughly even.
+ while (!orphanedDevices.isEmpty()) {
+ // Iterate over the buckets and find the smallest bucket.
+ ControllerNode smallest = findBucket(true, controllerDevices);
+ changeMastership(smallest, controllerDevices.get(smallest),
+ orphanedDevices, 1, futures);
+ }
}
/**
* Finds node with the minimum/maximum devices from a list of nodes.
*
- * @param min true: minimum, false: maximum
+ * @param min true: minimum, false: maximum
* @param controllerDevices controller nodes to devices map
* @return controller node with minimum/maximum devices
*/
+
private ControllerNode findBucket(boolean min,
- Map<ControllerNode, Set<DeviceId>> controllerDevices) {
+ Map<ControllerNode, Set<DeviceId>> controllerDevices) {
int xSize = min ? Integer.MAX_VALUE : -1;
ControllerNode xNode = null;
for (ControllerNode node : controllerDevices.keySet()) {
@@ -319,15 +352,15 @@
/**
* Balance the node buckets by moving devices from largest to smallest node.
*
- * @param smallest node that is master of the smallest number of devices
- * @param largest node that is master of the largest number of devices
+ * @param smallest node that is master of the smallest number of devices
+ * @param largest node that is master of the largest number of devices
* @param controllerDevices controller nodes to devices map
- * @param deviceCount number of devices mastered by controller nodes
+ * @param deviceCount number of devices mastered by controller nodes
* @return list of setRole futures for "moved" devices
*/
private CompletableFuture<Void> balanceBuckets(ControllerNode smallest, ControllerNode largest,
- Map<ControllerNode, Set<DeviceId>> controllerDevices,
- int deviceCount) {
+ Map<ControllerNode, Set<DeviceId>> controllerDevices,
+ int deviceCount) {
Collection<DeviceId> minBucket = controllerDevices.get(smallest);
Collection<DeviceId> maxBucket = controllerDevices.get(largest);
int bucketCount = controllerDevices.keySet().size();
@@ -340,20 +373,36 @@
if (delta > 0) {
log.info("Attempting to move {} nodes from {} to {}...", delta,
largest.id(), smallest.id());
-
- int i = 0;
- Iterator<DeviceId> it = maxBucket.iterator();
- while (it.hasNext() && i < delta) {
- DeviceId deviceId = it.next();
- log.info("Setting {} as the master for {}", smallest.id(), deviceId);
- setRoleFutures.add(setRole(smallest.id(), deviceId, MASTER));
- controllerDevices.get(smallest).add(deviceId);
- it.remove();
- i++;
- }
+ changeMastership(smallest, minBucket, maxBucket, delta, setRoleFutures);
}
- return CompletableFuture.allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()]));
+ return allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()]));
+ }
+
+ /**
+ * Changes mastership for the specified number of devices in the given source
+ * bucket to the specified node and ads those devices to the given target
+ * bucket. Also adds the futures for tracking the role reassignment progress.
+ *
+ * @param toNode target controller node
+ * @param toBucket target bucket
+ * @param fromBucket source bucket
+ * @param count number of devices
+ * @param futures futures for tracking operation progress
+ */
+ private void changeMastership(ControllerNode toNode, Collection<DeviceId> toBucket,
+ Collection<DeviceId> fromBucket, int count,
+ List<CompletableFuture<Void>> futures) {
+ int i = 0;
+ Iterator<DeviceId> it = fromBucket.iterator();
+ while (it.hasNext() && i < count) {
+ DeviceId deviceId = it.next();
+ log.info("Setting {} as the master for {}", toNode.id(), deviceId);
+ futures.add(setRole(toNode.id(), deviceId, MASTER));
+ toBucket.add(deviceId);
+ it.remove();
+ i++;
+ }
}
/**
@@ -368,7 +417,7 @@
return false; // no balancing was done using regions.
}
- // handle nodes belonging to regions
+ // Handle nodes belonging to regions
Set<ControllerNode> nodesInRegions = Sets.newHashSet();
for (Region region : regions) {
Map<ControllerNode, Set<DeviceId>> activeRegionControllers =
@@ -376,20 +425,20 @@
nodesInRegions.addAll(activeRegionControllers.keySet());
}
- // handle nodes not belonging to any region
+ // Handle nodes not belonging to any region
Set<ControllerNode> nodesNotInRegions = Sets.difference(allControllerDevices.keySet(), nodesInRegions);
if (!nodesNotInRegions.isEmpty()) {
int deviceCount = 0;
Map<ControllerNode, Set<DeviceId>> controllerDevicesNotInRegions = new HashMap<>();
- for (ControllerNode controllerNode: nodesNotInRegions) {
+ for (ControllerNode controllerNode : nodesNotInRegions) {
controllerDevicesNotInRegions.put(controllerNode, allControllerDevices.get(controllerNode));
deviceCount += allControllerDevices.get(controllerNode).size();
}
// Now re-balance the buckets until they are roughly even.
- List<CompletableFuture<Void>> balanceBucketsFutures =
- balanceControllerNodes(controllerDevicesNotInRegions, deviceCount);
+ List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newArrayList();
+ balanceControllerNodes(controllerDevicesNotInRegions, deviceCount, balanceBucketsFutures);
- CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
+ CompletableFuture<Void> balanceRolesFuture = allOf(
balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
Futures.getUnchecked(balanceRolesFuture);
@@ -400,14 +449,15 @@
/**
* Balances the nodes in specified region.
*
- * @param region region in which nodes are to be balanced
+ * @param region region in which nodes are to be balanced
* @param allControllerDevices controller nodes to devices map
* @return controller nodes that were balanced
*/
- private Map<ControllerNode, Set<DeviceId>> balanceRolesInRegion(Region region,
- Map<ControllerNode, Set<DeviceId>> allControllerDevices) {
+ private Map<ControllerNode, Set<DeviceId>>
+ balanceRolesInRegion(Region region,
+ Map<ControllerNode, Set<DeviceId>> allControllerDevices) {
- // retrieve all devices associated with specified region
+ // Retrieve all devices associated with specified region
Set<DeviceId> devicesInRegion = regionService.getRegionDevices(region.id());
log.info("Region {} has {} devices.", region.id(), devicesInRegion.size());
if (devicesInRegion.isEmpty()) {
@@ -421,24 +471,22 @@
return new HashMap<>(); // for now just leave devices alone
}
- // get the region's preferred set of masters
+ // Get the region's preferred set of masters
Set<DeviceId> devicesInMasters = Sets.newHashSet();
Map<ControllerNode, Set<DeviceId>> regionalControllerDevices =
getRegionsPreferredMasters(region, devicesInMasters, allControllerDevices);
// Now re-balance the buckets until they are roughly even.
- List<CompletableFuture<Void>> balanceBucketsFutures =
- balanceControllerNodes(regionalControllerDevices, devicesInMasters.size());
+ List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newArrayList();
+ balanceControllerNodes(regionalControllerDevices, devicesInMasters.size(), balanceBucketsFutures);
- // handle devices that are not currently mastered by the master node set
+ // Handle devices that are not currently mastered by the master node set
Set<DeviceId> devicesNotMasteredWithControllers = Sets.difference(devicesInRegion, devicesInMasters);
if (!devicesNotMasteredWithControllers.isEmpty()) {
// active controllers in master node set are already balanced, just
// assign device mastership in sequence
List<ControllerNode> sorted = new ArrayList<>(regionalControllerDevices.keySet());
- Collections.sort(sorted, (o1, o2) ->
- ((Integer) (regionalControllerDevices.get(o1)).size())
- .compareTo((Integer) (regionalControllerDevices.get(o2)).size()));
+ Collections.sort(sorted, Comparator.comparingInt(o -> (regionalControllerDevices.get(o)).size()));
int deviceIndex = 0;
for (DeviceId deviceId : devicesNotMasteredWithControllers) {
ControllerNode cnode = sorted.get(deviceIndex % sorted.size());
@@ -448,12 +496,12 @@
}
}
- CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
- balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
+ CompletableFuture<Void> balanceRolesFuture =
+ allOf(balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
Futures.getUnchecked(balanceRolesFuture);
- // update the map before returning
+ // Update the map before returning
regionalControllerDevices.forEach((controllerNode, deviceIds) -> {
regionalControllerDevices.put(controllerNode, new HashSet<>(getDevicesOf(controllerNode.id())));
});
@@ -465,17 +513,18 @@
* Get region's preferred set of master nodes - the first master node set that has at
* least one active node.
*
- * @param region region for which preferred set of master nodes is requested
- * @param devicesInMasters device set to track devices in preferred set of master nodes
+ * @param region region for which preferred set of master nodes is requested
+ * @param devicesInMasters device set to track devices in preferred set of master nodes
* @param allControllerDevices controller nodes to devices map
* @return region's preferred master nodes (and devices that use them as masters)
*/
- private Map<ControllerNode, Set<DeviceId>> getRegionsPreferredMasters(Region region,
- Set<DeviceId> devicesInMasters,
- Map<ControllerNode, Set<DeviceId>> allControllerDevices) {
+ private Map<ControllerNode, Set<DeviceId>>
+ getRegionsPreferredMasters(Region region,
+ Set<DeviceId> devicesInMasters,
+ Map<ControllerNode, Set<DeviceId>> allControllerDevices) {
Map<ControllerNode, Set<DeviceId>> regionalControllerDevices = new HashMap<>();
int listIndex = 0;
- for (Set<NodeId> masterSet: region.masters()) {
+ for (Set<NodeId> masterSet : region.masters()) {
log.info("Region {} masters set {} has {} nodes.",
region.id(), listIndex, masterSet.size());
if (masterSet.isEmpty()) { // nothing on this level
diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
index 0f75445..50fc377 100644
--- a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
@@ -78,6 +78,9 @@
private static final DeviceId DID1 = DeviceId.deviceId("foo:d1");
private static final DeviceId DID2 = DeviceId.deviceId("foo:d2");
private static final DeviceId DID3 = DeviceId.deviceId("foo:d3");
+ private static final DeviceId DID4 = DeviceId.deviceId("foo:d4");
+ private static final DeviceId DID5 = DeviceId.deviceId("foo:d5");
+ private static final DeviceId DID6 = DeviceId.deviceId("foo:d6");
private static final NodeId NID1 = NodeId.nodeId("n1");
private static final NodeId NID2 = NodeId.nodeId("n2");
private static final NodeId NID3 = NodeId.nodeId("n3");
@@ -220,6 +223,36 @@
}
@Test
+ public void balanceWithOrphans() {
+ // Setup cluster of three nodes
+ testClusterService.put(CNODE1, ControllerNode.State.ACTIVE);
+ testClusterService.put(CNODE2, ControllerNode.State.INACTIVE);
+ testClusterService.put(CNODE3, ControllerNode.State.ACTIVE);
+
+ // Pre-assign some devices to each of the node
+ // Leave some devices as orphans assigned to a downed node
+ assignRoles(NID1, ImmutableSet.of(DID1, DID2, DID3, DID4));
+ assignRoles(NID2, ImmutableSet.of(DID5));
+ assignRoles(NID3, ImmutableSet.of(DID6));
+
+ // Trigger load balancing
+ mgr.balanceRoles();
+
+ // Make sure we have a balanced load
+ // Make sure that we no longer have any orphans
+ assertEquals("incorrect balance for node 1", 3, mgr.getDevicesOf(NID1).size());
+ assertEquals("incorrect balance for node 2", 0, mgr.getDevicesOf(NID2).size());
+ assertEquals("incorrect balance for node 3", 3, mgr.getDevicesOf(NID3).size());
+ }
+
+ private void assignRoles(NodeId nid, Set<DeviceId> deviceIds) {
+ Set<DeviceId> all = ImmutableSet.of(DID1, DID2, DID3, DID4, DID5, DID6);
+ for (DeviceId did : all) {
+ mgr.setRole(nid, did, deviceIds.contains(did) ? MASTER : STANDBY);
+ }
+ }
+
+ @Test
public void balanceWithRegion1() {
//set up region - 2 sets of masters with 1 node in each
Set<NodeId> masterSet1 = ImmutableSet.of(NID1);