Make LeadershipService and MastershipService/Store APIs return CompletableFutures so that callers can (optionally) chain together dependent operations

Change-Id: Ia00fcb7d98fbfce897527f67ea9690abf6fe846a
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
index 1f3b360..ee60cc7 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
@@ -17,6 +17,8 @@
 
 import com.codahale.metrics.Timer;
 import com.codahale.metrics.Timer.Context;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -109,7 +111,7 @@
     }
 
     @Override
-    public void setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
+    public CompletableFuture<Void> setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
         checkNotNull(nodeId, NODE_ID_NULL);
         checkNotNull(deviceId, DEVICE_ID_NULL);
         checkNotNull(role, ROLE_NULL);
@@ -128,14 +130,14 @@
                 break;
             default:
                 log.info("Unknown role; ignoring");
-                return;
+                return CompletableFuture.completedFuture(null);
         }
 
-        eventFuture.whenComplete((event, error) -> {
+        return eventFuture.whenComplete((event, error) -> {
             if (event != null) {
                 post(event);
             }
-        });
+        }).thenApply(v -> null);
     }
 
     @Override
@@ -155,14 +157,11 @@
     }
 
     @Override
-    public MastershipRole requestRoleFor(DeviceId deviceId) {
+    public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
         checkNotNull(deviceId, DEVICE_ID_NULL);
         final Context timer = startTimer(requestRoleTimer);
-        try {
-            return store.requestRole(deviceId);
-        } finally {
-            stopTimer(timer);
-        }
+        return store.requestRole(deviceId).whenComplete((result, error) -> stopTimer(timer));
+
     }
 
     @Override
@@ -222,13 +221,18 @@
         }
 
         // Now re-balance the buckets until they are roughly even.
+        List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newLinkedList();
         int rounds = controllerDevices.keySet().size();
         for (int i = 0; i < rounds; i++) {
             // Iterate over the buckets and find the smallest and the largest.
             ControllerNode smallest = findBucket(true, controllerDevices);
             ControllerNode largest = findBucket(false, controllerDevices);
-            balanceBuckets(smallest, largest, controllerDevices, deviceCount);
+            balanceBucketsFutures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount));
         }
+        CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
+                balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
+
+        Futures.getUnchecked(balanceRolesFuture);
     }
 
     private ControllerNode findBucket(boolean min,
@@ -245,7 +249,7 @@
         return xNode;
     }
 
-    private void balanceBuckets(ControllerNode smallest, ControllerNode largest,
+    private CompletableFuture<Void> balanceBuckets(ControllerNode smallest, ControllerNode largest,
                                 Map<ControllerNode, Set<DeviceId>>  controllerDevices,
                                 int deviceCount) {
         Collection<DeviceId> minBucket = controllerDevices.get(smallest);
@@ -255,6 +259,8 @@
         int delta = (maxBucket.size() - minBucket.size()) / 2;
         delta = Math.min(deviceCount / bucketCount, delta);
 
+        List<CompletableFuture<Void>> setRoleFutures = Lists.newLinkedList();
+
         if (delta > 0) {
             log.info("Attempting to move {} nodes from {} to {}...", delta,
                      largest.id(), smallest.id());
@@ -264,12 +270,14 @@
             while (it.hasNext() && i < delta) {
                 DeviceId deviceId = it.next();
                 log.info("Setting {} as the master for {}", smallest.id(), deviceId);
-                setRole(smallest.id(), deviceId, MASTER);
+                setRoleFutures.add(setRole(smallest.id(), deviceId, MASTER));
                 controllerDevices.get(smallest).add(deviceId);
                 it.remove();
                 i++;
             }
         }
+
+        return CompletableFuture.allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()]));
     }
 
 
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 661ddca..d12664b 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -16,6 +16,7 @@
 package org.onosproject.net.device.impl;
 
 import com.google.common.collect.Lists;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -57,6 +58,7 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -79,7 +81,6 @@
     private static final String PORT_NUMBER_NULL = "Port number cannot be null";
     private static final String DEVICE_DESCRIPTION_NULL = "Device description cannot be null";
     private static final String PORT_DESCRIPTION_NULL = "Port description cannot be null";
-    private static final String ROLE_NULL = "Role cannot be null";
 
     private final Logger log = getLogger(getClass());
 
@@ -89,6 +90,7 @@
     private final DeviceStoreDelegate delegate = new InternalStoreDelegate();
 
     private final MastershipListener mastershipListener = new InternalMastershipListener();
+    private NodeId localNodeId;
 
     private ScheduledExecutorService backgroundService;
 
@@ -113,6 +115,7 @@
     @Activate
     public void activate() {
         backgroundService = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "manager-background"));
+        localNodeId = clusterService.getLocalNode().id();
 
         store.setDelegate(delegate);
         eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
@@ -302,12 +305,10 @@
             checkValidity();
 
             log.info("Device {} connected", deviceId);
-            final NodeId myNodeId = clusterService.getLocalNode().id();
-
             // check my Role
             mastershipService.requestRoleFor(deviceId);
             final MastershipTerm term = termService.getMastershipTerm(deviceId);
-            if (term == null || !myNodeId.equals(term.master())) {
+            if (term == null || !localNodeId.equals(term.master())) {
                 log.info("Role of this node is STANDBY for {}", deviceId);
                 // TODO: Do we need to explicitly tell the Provider that
                 // this instance is not the MASTER
@@ -337,7 +338,6 @@
 
             log.info("Device {} disconnected from this node", deviceId);
 
-            DeviceEvent event = null;
             List<Port> ports = store.getPorts(deviceId);
             List<PortDescription> descs = Lists.newArrayList();
             ports.forEach(port ->
@@ -346,7 +346,7 @@
                                                      port.portSpeed())));
             store.updatePorts(this.provider().id(), deviceId, descs);
             try {
-                event = store.markOffline(deviceId);
+                post(store.markOffline(deviceId));
             } catch (IllegalStateException e) {
                 log.warn("Failed to mark {} offline", deviceId);
                 // only the MASTER should be marking off-line in normal cases,
@@ -360,26 +360,21 @@
 
                 // FIXME: Store semantics leaking out as IllegalStateException.
                 //  Consider revising store API to handle this scenario.
-
-                MastershipRole role = mastershipService.requestRoleFor(deviceId);
-                MastershipTerm term = termService.getMastershipTerm(deviceId);
-                final NodeId myNodeId = clusterService.getLocalNode().id();
-                // TODO: Move this type of check inside device clock manager, etc.
-                if (term != null && myNodeId.equals(term.master())) {
-                    log.info("Retry marking {} offline", deviceId);
-                    deviceClockProviderService.setMastershipTerm(deviceId, term);
-                    event = store.markOffline(deviceId);
-                } else {
-                    log.info("Failed again marking {} offline. {}", deviceId, role);
-                }
+                CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
+                roleFuture.whenComplete((role, error) -> {
+                    MastershipTerm term = termService.getMastershipTerm(deviceId);
+                    // TODO: Move this type of check inside device clock manager, etc.
+                    if (term != null && localNodeId.equals(term.master())) {
+                        log.info("Retry marking {} offline", deviceId);
+                        deviceClockProviderService.setMastershipTerm(deviceId, term);
+                        post(store.markOffline(deviceId));
+                    } else {
+                        log.info("Failed again marking {} offline. {}", deviceId, role);
+                    }
+                });
             } finally {
                 //relinquish master role and ability to be backup.
                 mastershipService.relinquishMastership(deviceId);
-
-                if (event != null) {
-                    log.info("Device {} disconnected from cluster", deviceId);
-                    post(event);
-                }
             }
         }
 
@@ -531,12 +526,11 @@
     private void reassertRole(final DeviceId did,
                               final MastershipRole nextRole) {
 
-        final NodeId myNodeId = clusterService.getLocalNode().id();
         MastershipRole myNextRole = nextRole;
         if (myNextRole == NONE) {
             mastershipService.requestRoleFor(did);
             MastershipTerm term = termService.getMastershipTerm(did);
-            if (term != null && myNodeId.equals(term.master())) {
+            if (term != null && localNodeId.equals(term.master())) {
                 myNextRole = MASTER;
             } else {
                 myNextRole = STANDBY;
@@ -597,21 +591,20 @@
             }
 
             final DeviceId did = event.subject();
-            final NodeId myNodeId = clusterService.getLocalNode().id();
 
             // myRole suggested by MastershipService
             MastershipRole myNextRole;
-            if (myNodeId.equals(event.roleInfo().master())) {
+            if (localNodeId.equals(event.roleInfo().master())) {
                 // confirm latest info
                 MastershipTerm term = termService.getMastershipTerm(did);
-                final boolean iHaveControl = term != null && myNodeId.equals(term.master());
+                final boolean iHaveControl = term != null && localNodeId.equals(term.master());
                 if (iHaveControl) {
                     deviceClockProviderService.setMastershipTerm(did, term);
                     myNextRole = MASTER;
                 } else {
                     myNextRole = STANDBY;
                 }
-            } else if (event.roleInfo().backups().contains(myNodeId)) {
+            } else if (event.roleInfo().backups().contains(localNodeId)) {
                 myNextRole = STANDBY;
             } else {
                 myNextRole = NONE;
diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
index 87b08d6..b5ae31b 100644
--- a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
@@ -34,6 +34,7 @@
 import org.onosproject.store.trivial.impl.SimpleMastershipStore;
 
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -77,7 +78,7 @@
     public void setRole() {
         mgr.setRole(NID_OTHER, DEV_MASTER, MASTER);
         assertEquals("wrong local role:", NONE, mgr.getLocalRole(DEV_MASTER));
-        assertEquals("wrong obtained role:", STANDBY, mgr.requestRoleFor(DEV_MASTER));
+        assertEquals("wrong obtained role:", STANDBY, Futures.getUnchecked(mgr.requestRoleFor(DEV_MASTER)));
 
         //set to master
         mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
@@ -112,8 +113,8 @@
         mgr.setRole(NID_OTHER, DEV_OTHER, MASTER);
 
         //local should be master for one but standby for other
-        assertEquals("wrong role:", MASTER, mgr.requestRoleFor(DEV_MASTER));
-        assertEquals("wrong role:", STANDBY, mgr.requestRoleFor(DEV_OTHER));
+        assertEquals("wrong role:", MASTER, Futures.getUnchecked(mgr.requestRoleFor(DEV_MASTER)));
+        assertEquals("wrong role:", STANDBY, Futures.getUnchecked(mgr.requestRoleFor(DEV_OTHER)));
     }
 
     @Test
diff --git a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
index 20d55c4..d6bfda2 100644
--- a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
@@ -19,6 +19,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import org.junit.After;
 import org.junit.Before;
@@ -305,8 +306,8 @@
         }
 
         @Override
-        public MastershipRole requestRoleFor(DeviceId deviceId) {
-            return MastershipRole.MASTER;
+        public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
+            return CompletableFuture.completedFuture(MastershipRole.MASTER);
         }
 
         @Override