Adding ability to handle orphaned devices when balancing mastership.

Change-Id: I01dd7a3074475d79504d516fbd3fd32ef18770ce
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
index 1a4fc5f..c744d61 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
@@ -57,6 +57,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -67,6 +68,7 @@
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.collect.Lists.newArrayList;
+import static java.util.concurrent.CompletableFuture.allOf;
 import static org.onlab.metrics.MetricsUtil.startTimer;
 import static org.onlab.metrics.MetricsUtil.stopTimer;
 import static org.onosproject.net.MastershipRole.MASTER;
@@ -76,13 +78,15 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 
-
+/**
+ * Component providing the node-device mastership service.
+ */
 @Component(immediate = true)
 @Service
 public class MastershipManager
-    extends AbstractListenerManager<MastershipEvent, MastershipListener>
-    implements MastershipService, MastershipAdminService, MastershipTermService,
-               MetricsHelper {
+        extends AbstractListenerManager<MastershipEvent, MastershipListener>
+        implements MastershipService, MastershipAdminService, MastershipTermService,
+        MetricsHelper {
 
     private static final String NODE_ID_NULL = "Node ID cannot be null";
     private static final String DEVICE_ID_NULL = "Device ID cannot be null";
@@ -116,7 +120,7 @@
 
     static final boolean DEFAULT_USE_REGION_FOR_BALANCE_ROLES = false;
     @Property(name = "useRegionForBalanceRoles", boolValue = DEFAULT_USE_REGION_FOR_BALANCE_ROLES,
-              label = "Use Regions for balancing roles")
+            label = "Use Regions for balancing roles")
     protected boolean useRegionForBalanceRoles;
 
     private static final boolean DEFAULT_REBALANCE_ROLES_ON_UPGRADE = true;
@@ -181,7 +185,7 @@
         }
 
         return eventFuture.thenAccept(this::post)
-                          .thenApply(v -> null);
+                .thenApply(v -> null);
     }
 
     @Override
@@ -196,8 +200,8 @@
     public CompletableFuture<Void> relinquishMastership(DeviceId deviceId) {
         checkPermission(CLUSTER_WRITE);
         return store.relinquishRole(localNodeId, deviceId)
-                    .thenAccept(this::post)
-                    .thenApply(v -> null);
+                .thenAccept(this::post)
+                .thenApply(v -> null);
     }
 
     @Override
@@ -249,15 +253,20 @@
     public void balanceRoles() {
         List<ControllerNode> nodes = newArrayList(clusterService.getNodes());
         Map<ControllerNode, Set<DeviceId>> controllerDevices = new HashMap<>();
+        Set<DeviceId> orphanedDevices = Sets.newHashSet();
         int deviceCount = 0;
 
-        // Create buckets reflecting current ownership.
+        // Create buckets reflecting current ownership; do this irrespective of
+        // whether the node is active.
         for (ControllerNode node : nodes) {
+            Set<DeviceId> devicesOf = new HashSet<>(getDevicesOf(node.id()));
             if (clusterService.getState(node.id()).isActive()) {
-                Set<DeviceId> devicesOf = new HashSet<>(getDevicesOf(node.id()));
+                log.info("Node {} has {} devices.", node.id(), devicesOf.size());
                 deviceCount += devicesOf.size();
                 controllerDevices.put(node, devicesOf);
-                log.info("Node {} has {} devices.", node.id(), devicesOf.size());
+            } else if (!devicesOf.isEmpty()) {
+                log.warn("Inactive node {} has {} orphaned devices.", node.id(), devicesOf.size());
+                orphanedDevices.addAll(devicesOf);
             }
         }
 
@@ -265,11 +274,16 @@
             return;
         }
 
-        // Now re-balance the buckets until they are roughly even.
-        List<CompletableFuture<Void>> balanceBucketsFutures = balanceControllerNodes(controllerDevices, deviceCount);
+        List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newLinkedList();
 
-        CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
-                balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
+        // First re-balance the buckets until they are roughly even.
+        balanceControllerNodes(controllerDevices, deviceCount, balanceBucketsFutures);
+
+        // Then attempt to distribute any orphaned devices among the buckets.
+        distributeOrphanedDevices(controllerDevices, orphanedDevices, balanceBucketsFutures);
+
+        CompletableFuture<Void> balanceRolesFuture =
+                allOf(balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
 
         Futures.getUnchecked(balanceRolesFuture);
     }
@@ -278,32 +292,51 @@
      * Balances the nodes specified in controllerDevices.
      *
      * @param controllerDevices controller nodes to devices map
-     * @param deviceCount number of devices mastered by controller nodes
-     * @return list of setRole futures for "moved" devices
+     * @param deviceCount       number of devices mastered by controller nodes
+     * @param futures           list of setRole futures for "moved" devices
      */
-    private List<CompletableFuture<Void>> balanceControllerNodes(
-            Map<ControllerNode, Set<DeviceId>> controllerDevices, int deviceCount) {
+    private void balanceControllerNodes(Map<ControllerNode, Set<DeviceId>> controllerDevices,
+                                        int deviceCount,
+                                        List<CompletableFuture<Void>> futures) {
         // Now re-balance the buckets until they are roughly even.
-        List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newLinkedList();
         int rounds = controllerDevices.keySet().size();
         for (int i = 0; i < rounds; i++) {
             // Iterate over the buckets and find the smallest and the largest.
             ControllerNode smallest = findBucket(true, controllerDevices);
             ControllerNode largest = findBucket(false, controllerDevices);
-            balanceBucketsFutures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount));
+            futures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount));
         }
-        return balanceBucketsFutures;
+    }
+
+    /**
+     * Uses the set of orphaned devices to even out the load among the controllers.
+     *
+     * @param controllerDevices controller nodes to devices map
+     * @param orphanedDevices   set of orphaned devices without an active master
+     * @param futures           list of completable futures to track the progress of the balancing operation
+     */
+    private void distributeOrphanedDevices(Map<ControllerNode, Set<DeviceId>> controllerDevices,
+                                           Set<DeviceId> orphanedDevices,
+                                           List<CompletableFuture<Void>> futures) {
+        // Hand the orphans, one at a time, to whichever bucket is currently the smallest;
+        // stop if there are no buckets at all, as no node would be available to take them.
+        while (!orphanedDevices.isEmpty() && !controllerDevices.isEmpty()) {
+            ControllerNode smallest = findBucket(true, controllerDevices);
+            changeMastership(smallest, controllerDevices.get(smallest),
+                             orphanedDevices, 1, futures);
+        }
     }
 
     /**
      * Finds node with the minimum/maximum devices from a list of nodes.
      *
-     * @param min true: minimum, false: maximum
+     * @param min               true: minimum, false: maximum
      * @param controllerDevices controller nodes to devices map
      * @return controller node with minimum/maximum devices
      */
+
     private ControllerNode findBucket(boolean min,
-                                      Map<ControllerNode, Set<DeviceId>>  controllerDevices) {
+                                      Map<ControllerNode, Set<DeviceId>> controllerDevices) {
         int xSize = min ? Integer.MAX_VALUE : -1;
         ControllerNode xNode = null;
         for (ControllerNode node : controllerDevices.keySet()) {
@@ -319,15 +352,15 @@
     /**
      * Balance the node buckets by moving devices from largest to smallest node.
      *
-     * @param smallest node that is master of the smallest number of devices
-     * @param largest node that is master of the largest number of devices
+     * @param smallest          node that is master of the smallest number of devices
+     * @param largest           node that is master of the largest number of devices
      * @param controllerDevices controller nodes to devices map
-     * @param deviceCount number of devices mastered by controller nodes
+     * @param deviceCount       number of devices mastered by controller nodes
      * @return list of setRole futures for "moved" devices
      */
     private CompletableFuture<Void> balanceBuckets(ControllerNode smallest, ControllerNode largest,
-                                Map<ControllerNode, Set<DeviceId>>  controllerDevices,
-                                int deviceCount) {
+                                                   Map<ControllerNode, Set<DeviceId>> controllerDevices,
+                                                   int deviceCount) {
         Collection<DeviceId> minBucket = controllerDevices.get(smallest);
         Collection<DeviceId> maxBucket = controllerDevices.get(largest);
         int bucketCount = controllerDevices.keySet().size();
@@ -340,20 +373,36 @@
         if (delta > 0) {
             log.info("Attempting to move {} nodes from {} to {}...", delta,
                      largest.id(), smallest.id());
-
-            int i = 0;
-            Iterator<DeviceId> it = maxBucket.iterator();
-            while (it.hasNext() && i < delta) {
-                DeviceId deviceId = it.next();
-                log.info("Setting {} as the master for {}", smallest.id(), deviceId);
-                setRoleFutures.add(setRole(smallest.id(), deviceId, MASTER));
-                controllerDevices.get(smallest).add(deviceId);
-                it.remove();
-                i++;
-            }
+            changeMastership(smallest, minBucket, maxBucket, delta, setRoleFutures);
         }
 
-        return CompletableFuture.allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()]));
+        return allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()]));
+    }
+
+    /**
+     * Changes mastership for the specified number of devices in the given source
+     * bucket to the specified node and adds those devices to the given target
+     * bucket. Also adds the futures for tracking the role reassignment progress.
+     *
+     * @param toNode     target controller node
+     * @param toBucket   target bucket
+     * @param fromBucket source bucket; moved devices are removed from it
+     * @param count      maximum number of devices to move
+     * @param futures    futures for tracking operation progress
+     */
+    private void changeMastership(ControllerNode toNode, Collection<DeviceId> toBucket,
+                                  Collection<DeviceId> fromBucket, int count,
+                                  List<CompletableFuture<Void>> futures) {
+        int i = 0;
+        Iterator<DeviceId> it = fromBucket.iterator();
+        while (it.hasNext() && i < count) {
+            DeviceId deviceId = it.next();
+            log.info("Setting {} as the master for {}", toNode.id(), deviceId);
+            futures.add(setRole(toNode.id(), deviceId, MASTER));
+            toBucket.add(deviceId);
+            it.remove();
+            i++;
+        }
     }
 
     /**
@@ -368,7 +417,7 @@
             return false; // no balancing was done using regions.
         }
 
-        // handle nodes belonging to regions
+        // Handle nodes belonging to regions
         Set<ControllerNode> nodesInRegions = Sets.newHashSet();
         for (Region region : regions) {
             Map<ControllerNode, Set<DeviceId>> activeRegionControllers =
@@ -376,20 +425,20 @@
             nodesInRegions.addAll(activeRegionControllers.keySet());
         }
 
-        // handle nodes not belonging to any region
+        // Handle nodes not belonging to any region
         Set<ControllerNode> nodesNotInRegions = Sets.difference(allControllerDevices.keySet(), nodesInRegions);
         if (!nodesNotInRegions.isEmpty()) {
             int deviceCount = 0;
             Map<ControllerNode, Set<DeviceId>> controllerDevicesNotInRegions = new HashMap<>();
-            for (ControllerNode controllerNode: nodesNotInRegions) {
+            for (ControllerNode controllerNode : nodesNotInRegions) {
                 controllerDevicesNotInRegions.put(controllerNode, allControllerDevices.get(controllerNode));
                 deviceCount += allControllerDevices.get(controllerNode).size();
             }
             // Now re-balance the buckets until they are roughly even.
-            List<CompletableFuture<Void>> balanceBucketsFutures =
-                    balanceControllerNodes(controllerDevicesNotInRegions, deviceCount);
+            List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newArrayList();
+            balanceControllerNodes(controllerDevicesNotInRegions, deviceCount, balanceBucketsFutures);
 
-            CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
+            CompletableFuture<Void> balanceRolesFuture = allOf(
                     balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
 
             Futures.getUnchecked(balanceRolesFuture);
@@ -400,14 +449,15 @@
     /**
      * Balances the nodes in specified region.
      *
-     * @param region region in which nodes are to be balanced
+     * @param region               region in which nodes are to be balanced
      * @param allControllerDevices controller nodes to devices map
      * @return controller nodes that were balanced
      */
-    private Map<ControllerNode, Set<DeviceId>> balanceRolesInRegion(Region region,
-         Map<ControllerNode, Set<DeviceId>> allControllerDevices) {
+    private Map<ControllerNode, Set<DeviceId>>
+            balanceRolesInRegion(Region region,
+                                 Map<ControllerNode, Set<DeviceId>> allControllerDevices) {
 
-        // retrieve all devices associated with specified region
+        // Retrieve all devices associated with specified region
         Set<DeviceId> devicesInRegion = regionService.getRegionDevices(region.id());
         log.info("Region {} has {} devices.", region.id(), devicesInRegion.size());
         if (devicesInRegion.isEmpty()) {
@@ -421,24 +471,22 @@
             return new HashMap<>(); // for now just leave devices alone
         }
 
-        // get the region's preferred set of masters
+        // Get the region's preferred set of masters
         Set<DeviceId> devicesInMasters = Sets.newHashSet();
         Map<ControllerNode, Set<DeviceId>> regionalControllerDevices =
                 getRegionsPreferredMasters(region, devicesInMasters, allControllerDevices);
 
         // Now re-balance the buckets until they are roughly even.
-        List<CompletableFuture<Void>> balanceBucketsFutures =
-                balanceControllerNodes(regionalControllerDevices, devicesInMasters.size());
+        List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newArrayList();
+        balanceControllerNodes(regionalControllerDevices, devicesInMasters.size(), balanceBucketsFutures);
 
-        // handle devices that are not currently mastered by the master node set
+        // Handle devices that are not currently mastered by the master node set
         Set<DeviceId> devicesNotMasteredWithControllers = Sets.difference(devicesInRegion, devicesInMasters);
         if (!devicesNotMasteredWithControllers.isEmpty()) {
             // active controllers in master node set are already balanced, just
             // assign device mastership in sequence
             List<ControllerNode> sorted = new ArrayList<>(regionalControllerDevices.keySet());
-            Collections.sort(sorted, (o1, o2) ->
-                    ((Integer) (regionalControllerDevices.get(o1)).size())
-                            .compareTo((Integer) (regionalControllerDevices.get(o2)).size()));
+            Collections.sort(sorted, Comparator.comparingInt(o -> (regionalControllerDevices.get(o)).size()));
             int deviceIndex = 0;
             for (DeviceId deviceId : devicesNotMasteredWithControllers) {
                 ControllerNode cnode = sorted.get(deviceIndex % sorted.size());
@@ -448,12 +496,12 @@
             }
         }
 
-        CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
-                balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
+        CompletableFuture<Void> balanceRolesFuture =
+                allOf(balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
 
         Futures.getUnchecked(balanceRolesFuture);
 
-        // update the map before returning
+        // Update the map before returning
         regionalControllerDevices.forEach((controllerNode, deviceIds) -> {
             regionalControllerDevices.put(controllerNode, new HashSet<>(getDevicesOf(controllerNode.id())));
         });
@@ -465,17 +513,18 @@
      * Get region's preferred set of master nodes - the first master node set that has at
      * least one active node.
      *
-     * @param region region for which preferred set of master nodes is requested
-     * @param devicesInMasters device set to track devices in preferred set of master nodes
+     * @param region               region for which preferred set of master nodes is requested
+     * @param devicesInMasters     device set to track devices in preferred set of master nodes
      * @param allControllerDevices controller nodes to devices map
      * @return region's preferred master nodes (and devices that use them as masters)
      */
-    private Map<ControllerNode, Set<DeviceId>> getRegionsPreferredMasters(Region region,
-            Set<DeviceId> devicesInMasters,
-            Map<ControllerNode, Set<DeviceId>> allControllerDevices) {
+    private Map<ControllerNode, Set<DeviceId>>
+            getRegionsPreferredMasters(Region region,
+                                       Set<DeviceId> devicesInMasters,
+                                       Map<ControllerNode, Set<DeviceId>> allControllerDevices) {
         Map<ControllerNode, Set<DeviceId>> regionalControllerDevices = new HashMap<>();
         int listIndex = 0;
-        for (Set<NodeId> masterSet: region.masters()) {
+        for (Set<NodeId> masterSet : region.masters()) {
             log.info("Region {} masters set {} has {} nodes.",
                      region.id(), listIndex, masterSet.size());
             if (masterSet.isEmpty()) { // nothing on this level
diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
index 0f75445..50fc377 100644
--- a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
@@ -78,6 +78,9 @@
     private static final DeviceId DID1 = DeviceId.deviceId("foo:d1");
     private static final DeviceId DID2 = DeviceId.deviceId("foo:d2");
     private static final DeviceId DID3 = DeviceId.deviceId("foo:d3");
+    private static final DeviceId DID4 = DeviceId.deviceId("foo:d4");
+    private static final DeviceId DID5 = DeviceId.deviceId("foo:d5");
+    private static final DeviceId DID6 = DeviceId.deviceId("foo:d6");
     private static final NodeId NID1 = NodeId.nodeId("n1");
     private static final NodeId NID2 = NodeId.nodeId("n2");
     private static final NodeId NID3 = NodeId.nodeId("n3");
@@ -220,6 +223,36 @@
     }
 
     @Test
+    public void balanceWithOrphans() {
+        // Set up a cluster of three nodes, one of which is inactive
+        testClusterService.put(CNODE1, ControllerNode.State.ACTIVE);
+        testClusterService.put(CNODE2, ControllerNode.State.INACTIVE);
+        testClusterService.put(CNODE3, ControllerNode.State.ACTIVE);
+
+        // Pre-assign some devices to each of the nodes
+        // Leave some devices as orphans assigned to a downed node
+        assignRoles(NID1, ImmutableSet.of(DID1, DID2, DID3, DID4));
+        assignRoles(NID2, ImmutableSet.of(DID5));
+        assignRoles(NID3, ImmutableSet.of(DID6));
+
+        // Trigger load balancing
+        mgr.balanceRoles();
+
+        // Make sure we have a balanced load
+        // Make sure that we no longer have any orphans
+        assertEquals("incorrect balance for node 1", 3, mgr.getDevicesOf(NID1).size());
+        assertEquals("incorrect balance for node 2", 0, mgr.getDevicesOf(NID2).size());
+        assertEquals("incorrect balance for node 3", 3, mgr.getDevicesOf(NID3).size());
+    }
+
+    private void assignRoles(NodeId nid, Set<DeviceId> deviceIds) {
+        Set<DeviceId> all = ImmutableSet.of(DID1, DID2, DID3, DID4, DID5, DID6);
+        for (DeviceId did : all) {
+            mgr.setRole(nid, did, deviceIds.contains(did) ? MASTER : STANDBY);
+        }
+    }
+
+    @Test
     public void balanceWithRegion1() {
         //set up region - 2 sets of masters with 1 node in each
         Set<NodeId> masterSet1 = ImmutableSet.of(NID1);