LeadershipService and MastershipService/Store APIs return CompletableFutures so that callers can (optionally) chain together dependent operations

Change-Id: Ia00fcb7d98fbfce897527f67ea9690abf6fe846a
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
index 9b94056..9130f4f 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
@@ -18,6 +18,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Service for leader election.
@@ -55,16 +56,18 @@
     /**
      * Joins the leadership contest.
      *
-     * @param path topic for which this controller node wishes to be a leader.
+     * @param path topic for which this controller node wishes to be a leader
+     * @return {@code Leadership} future
      */
-    void runForLeadership(String path);
+    CompletableFuture<Leadership> runForLeadership(String path);
 
     /**
      * Withdraws from a leadership contest.
      *
-     * @param path topic for which this controller node no longer wishes to be a leader.
+     * @param path topic for which this controller node no longer wishes to be a leader
+     * @return future that is successfully completed when withdraw is done
      */
-    void withdraw(String path);
+    CompletableFuture<Void> withdraw(String path);
 
     /**
      * If the local nodeId is the leader for specified topic, this method causes it to
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipAdminService.java b/core/api/src/main/java/org/onosproject/mastership/MastershipAdminService.java
index 6c41fb2..a8835fc 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipAdminService.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipAdminService.java
@@ -15,6 +15,8 @@
  */
 package org.onosproject.mastership;
 
+import java.util.concurrent.CompletableFuture;
+
 import org.onosproject.cluster.NodeId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
@@ -30,8 +32,9 @@
      * @param instance controller instance identifier
      * @param deviceId device identifier
      * @param role     requested role
+     * @return future that is completed when the role is set
      */
-    void setRole(NodeId instance, DeviceId deviceId, MastershipRole role);
+    CompletableFuture<Void> setRole(NodeId instance, DeviceId deviceId, MastershipRole role);
 
     /**
      * Balances the mastership to be shared as evenly as possibly by all
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipService.java b/core/api/src/main/java/org/onosproject/mastership/MastershipService.java
index f03f6c8..450bcc5 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipService.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipService.java
@@ -16,6 +16,7 @@
 package org.onosproject.mastership;
 
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.RoleInfo;
@@ -46,7 +47,7 @@
      * @param deviceId the the identifier of the device
      * @return the role of this controller instance
      */
-    MastershipRole requestRoleFor(DeviceId deviceId);
+    CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId);
 
     /**
      * Abandons mastership of the specified device on the local node thus
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java b/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
index 6b64705..81c2d8b 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipStore.java
@@ -38,7 +38,7 @@
      * @param deviceId device identifier
      * @return established or newly negotiated mastership role
      */
-    MastershipRole requestRole(DeviceId deviceId);
+    CompletableFuture<MastershipRole> requestRole(DeviceId deviceId);
 
     /**
      * Returns the role of a device for a specific controller instance.
diff --git a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
index abc92f5..e1d421d 100644
--- a/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/cluster/LeadershipServiceAdapter.java
@@ -18,6 +18,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Test adapter for leadership service.
@@ -40,13 +41,13 @@
     }
 
     @Override
-    public void runForLeadership(String path) {
-
+    public CompletableFuture<Leadership> runForLeadership(String path) {
+        return null;
     }
 
     @Override
-    public void withdraw(String path) {
-
+    public CompletableFuture<Void> withdraw(String path) {
+        return null;
     }
 
     @Override
diff --git a/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java b/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java
index be1f4b9..aff67cd 100644
--- a/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/mastership/MastershipServiceAdapter.java
@@ -21,6 +21,7 @@
 import org.onosproject.net.MastershipRole;
 
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Test adapter for mastership service.
@@ -32,7 +33,7 @@
     }
 
     @Override
-    public MastershipRole requestRoleFor(DeviceId deviceId) {
+    public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
         return null;
     }
 
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
index 1f3b360..ee60cc7 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/MastershipManager.java
@@ -17,6 +17,8 @@
 
 import com.codahale.metrics.Timer;
 import com.codahale.metrics.Timer.Context;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -109,7 +111,7 @@
     }
 
     @Override
-    public void setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
+    public CompletableFuture<Void> setRole(NodeId nodeId, DeviceId deviceId, MastershipRole role) {
         checkNotNull(nodeId, NODE_ID_NULL);
         checkNotNull(deviceId, DEVICE_ID_NULL);
         checkNotNull(role, ROLE_NULL);
@@ -128,14 +130,14 @@
                 break;
             default:
                 log.info("Unknown role; ignoring");
-                return;
+                return CompletableFuture.completedFuture(null);
         }
 
-        eventFuture.whenComplete((event, error) -> {
+        return eventFuture.whenComplete((event, error) -> {
             if (event != null) {
                 post(event);
             }
-        });
+        }).thenApply(v -> null);
     }
 
     @Override
@@ -155,14 +157,11 @@
     }
 
     @Override
-    public MastershipRole requestRoleFor(DeviceId deviceId) {
+    public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
         checkNotNull(deviceId, DEVICE_ID_NULL);
         final Context timer = startTimer(requestRoleTimer);
-        try {
-            return store.requestRole(deviceId);
-        } finally {
-            stopTimer(timer);
-        }
+        return store.requestRole(deviceId).whenComplete((result, error) -> stopTimer(timer));
+
     }
 
     @Override
@@ -222,13 +221,18 @@
         }
 
         // Now re-balance the buckets until they are roughly even.
+        List<CompletableFuture<Void>> balanceBucketsFutures = Lists.newLinkedList();
         int rounds = controllerDevices.keySet().size();
         for (int i = 0; i < rounds; i++) {
             // Iterate over the buckets and find the smallest and the largest.
             ControllerNode smallest = findBucket(true, controllerDevices);
             ControllerNode largest = findBucket(false, controllerDevices);
-            balanceBuckets(smallest, largest, controllerDevices, deviceCount);
+            balanceBucketsFutures.add(balanceBuckets(smallest, largest, controllerDevices, deviceCount));
         }
+        CompletableFuture<Void> balanceRolesFuture = CompletableFuture.allOf(
+                balanceBucketsFutures.toArray(new CompletableFuture[balanceBucketsFutures.size()]));
+
+        Futures.getUnchecked(balanceRolesFuture);
     }
 
     private ControllerNode findBucket(boolean min,
@@ -245,7 +249,7 @@
         return xNode;
     }
 
-    private void balanceBuckets(ControllerNode smallest, ControllerNode largest,
+    private CompletableFuture<Void> balanceBuckets(ControllerNode smallest, ControllerNode largest,
                                 Map<ControllerNode, Set<DeviceId>>  controllerDevices,
                                 int deviceCount) {
         Collection<DeviceId> minBucket = controllerDevices.get(smallest);
@@ -255,6 +259,8 @@
         int delta = (maxBucket.size() - minBucket.size()) / 2;
         delta = Math.min(deviceCount / bucketCount, delta);
 
+        List<CompletableFuture<Void>> setRoleFutures = Lists.newLinkedList();
+
         if (delta > 0) {
             log.info("Attempting to move {} nodes from {} to {}...", delta,
                      largest.id(), smallest.id());
@@ -264,12 +270,14 @@
             while (it.hasNext() && i < delta) {
                 DeviceId deviceId = it.next();
                 log.info("Setting {} as the master for {}", smallest.id(), deviceId);
-                setRole(smallest.id(), deviceId, MASTER);
+                setRoleFutures.add(setRole(smallest.id(), deviceId, MASTER));
                 controllerDevices.get(smallest).add(deviceId);
                 it.remove();
                 i++;
             }
         }
+
+        return CompletableFuture.allOf(setRoleFutures.toArray(new CompletableFuture[setRoleFutures.size()]));
     }
 
 
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 661ddca..d12664b 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -16,6 +16,7 @@
 package org.onosproject.net.device.impl;
 
 import com.google.common.collect.Lists;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -57,6 +58,7 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -79,7 +81,6 @@
     private static final String PORT_NUMBER_NULL = "Port number cannot be null";
     private static final String DEVICE_DESCRIPTION_NULL = "Device description cannot be null";
     private static final String PORT_DESCRIPTION_NULL = "Port description cannot be null";
-    private static final String ROLE_NULL = "Role cannot be null";
 
     private final Logger log = getLogger(getClass());
 
@@ -89,6 +90,7 @@
     private final DeviceStoreDelegate delegate = new InternalStoreDelegate();
 
     private final MastershipListener mastershipListener = new InternalMastershipListener();
+    private NodeId localNodeId;
 
     private ScheduledExecutorService backgroundService;
 
@@ -113,6 +115,7 @@
     @Activate
     public void activate() {
         backgroundService = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "manager-background"));
+        localNodeId = clusterService.getLocalNode().id();
 
         store.setDelegate(delegate);
         eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
@@ -302,12 +305,10 @@
             checkValidity();
 
             log.info("Device {} connected", deviceId);
-            final NodeId myNodeId = clusterService.getLocalNode().id();
-
             // check my Role
             mastershipService.requestRoleFor(deviceId);
             final MastershipTerm term = termService.getMastershipTerm(deviceId);
-            if (term == null || !myNodeId.equals(term.master())) {
+            if (term == null || !localNodeId.equals(term.master())) {
                 log.info("Role of this node is STANDBY for {}", deviceId);
                 // TODO: Do we need to explicitly tell the Provider that
                 // this instance is not the MASTER
@@ -337,7 +338,6 @@
 
             log.info("Device {} disconnected from this node", deviceId);
 
-            DeviceEvent event = null;
             List<Port> ports = store.getPorts(deviceId);
             List<PortDescription> descs = Lists.newArrayList();
             ports.forEach(port ->
@@ -346,7 +346,7 @@
                                                      port.portSpeed())));
             store.updatePorts(this.provider().id(), deviceId, descs);
             try {
-                event = store.markOffline(deviceId);
+                post(store.markOffline(deviceId));
             } catch (IllegalStateException e) {
                 log.warn("Failed to mark {} offline", deviceId);
                 // only the MASTER should be marking off-line in normal cases,
@@ -360,26 +360,21 @@
 
                 // FIXME: Store semantics leaking out as IllegalStateException.
                 //  Consider revising store API to handle this scenario.
-
-                MastershipRole role = mastershipService.requestRoleFor(deviceId);
-                MastershipTerm term = termService.getMastershipTerm(deviceId);
-                final NodeId myNodeId = clusterService.getLocalNode().id();
-                // TODO: Move this type of check inside device clock manager, etc.
-                if (term != null && myNodeId.equals(term.master())) {
-                    log.info("Retry marking {} offline", deviceId);
-                    deviceClockProviderService.setMastershipTerm(deviceId, term);
-                    event = store.markOffline(deviceId);
-                } else {
-                    log.info("Failed again marking {} offline. {}", deviceId, role);
-                }
+                CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
+                roleFuture.whenComplete((role, error) -> {
+                    MastershipTerm term = termService.getMastershipTerm(deviceId);
+                    // TODO: Move this type of check inside device clock manager, etc.
+                    if (term != null && localNodeId.equals(term.master())) {
+                        log.info("Retry marking {} offline", deviceId);
+                        deviceClockProviderService.setMastershipTerm(deviceId, term);
+                        post(store.markOffline(deviceId));
+                    } else {
+                        log.info("Failed again marking {} offline. {}", deviceId, role);
+                    }
+                });
             } finally {
                 //relinquish master role and ability to be backup.
                 mastershipService.relinquishMastership(deviceId);
-
-                if (event != null) {
-                    log.info("Device {} disconnected from cluster", deviceId);
-                    post(event);
-                }
             }
         }
 
@@ -531,12 +526,11 @@
     private void reassertRole(final DeviceId did,
                               final MastershipRole nextRole) {
 
-        final NodeId myNodeId = clusterService.getLocalNode().id();
         MastershipRole myNextRole = nextRole;
         if (myNextRole == NONE) {
             mastershipService.requestRoleFor(did);
             MastershipTerm term = termService.getMastershipTerm(did);
-            if (term != null && myNodeId.equals(term.master())) {
+            if (term != null && localNodeId.equals(term.master())) {
                 myNextRole = MASTER;
             } else {
                 myNextRole = STANDBY;
@@ -597,21 +591,20 @@
             }
 
             final DeviceId did = event.subject();
-            final NodeId myNodeId = clusterService.getLocalNode().id();
 
             // myRole suggested by MastershipService
             MastershipRole myNextRole;
-            if (myNodeId.equals(event.roleInfo().master())) {
+            if (localNodeId.equals(event.roleInfo().master())) {
                 // confirm latest info
                 MastershipTerm term = termService.getMastershipTerm(did);
-                final boolean iHaveControl = term != null && myNodeId.equals(term.master());
+                final boolean iHaveControl = term != null && localNodeId.equals(term.master());
                 if (iHaveControl) {
                     deviceClockProviderService.setMastershipTerm(did, term);
                     myNextRole = MASTER;
                 } else {
                     myNextRole = STANDBY;
                 }
-            } else if (event.roleInfo().backups().contains(myNodeId)) {
+            } else if (event.roleInfo().backups().contains(localNodeId)) {
                 myNextRole = STANDBY;
             } else {
                 myNextRole = NONE;
diff --git a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
index 87b08d6..b5ae31b 100644
--- a/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/cluster/impl/MastershipManagerTest.java
@@ -34,6 +34,7 @@
 import org.onosproject.store.trivial.impl.SimpleMastershipStore;
 
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
@@ -77,7 +78,7 @@
     public void setRole() {
         mgr.setRole(NID_OTHER, DEV_MASTER, MASTER);
         assertEquals("wrong local role:", NONE, mgr.getLocalRole(DEV_MASTER));
-        assertEquals("wrong obtained role:", STANDBY, mgr.requestRoleFor(DEV_MASTER));
+        assertEquals("wrong obtained role:", STANDBY, Futures.getUnchecked(mgr.requestRoleFor(DEV_MASTER)));
 
         //set to master
         mgr.setRole(NID_LOCAL, DEV_MASTER, MASTER);
@@ -112,8 +113,8 @@
         mgr.setRole(NID_OTHER, DEV_OTHER, MASTER);
 
         //local should be master for one but standby for other
-        assertEquals("wrong role:", MASTER, mgr.requestRoleFor(DEV_MASTER));
-        assertEquals("wrong role:", STANDBY, mgr.requestRoleFor(DEV_OTHER));
+        assertEquals("wrong role:", MASTER, Futures.getUnchecked(mgr.requestRoleFor(DEV_MASTER)));
+        assertEquals("wrong role:", STANDBY, Futures.getUnchecked(mgr.requestRoleFor(DEV_OTHER)));
     }
 
     @Test
diff --git a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
index 20d55c4..d6bfda2 100644
--- a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
@@ -19,6 +19,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import org.junit.After;
 import org.junit.Before;
@@ -305,8 +306,8 @@
         }
 
         @Override
-        public MastershipRole requestRoleFor(DeviceId deviceId) {
-            return MastershipRole.MASTER;
+        public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
+            return CompletableFuture.completedFuture(MastershipRole.MASTER);
         }
 
         @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index a4f265f..5b73246 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -49,6 +49,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -188,7 +189,7 @@
     }
 
     @Override
-    public void runForLeadership(String path) {
+    public CompletableFuture<Leadership> runForLeadership(String path) {
         checkArgument(path != null);
         Topic topic = new Topic(path);
         Topic oldTopic = topics.putIfAbsent(path, topic);
@@ -198,16 +199,18 @@
         } else {
             oldTopic.runForLeadership();
         }
+        return CompletableFuture.completedFuture(getLeadership(path));
     }
 
     @Override
-    public void withdraw(String path) {
+    public CompletableFuture<Void> withdraw(String path) {
         checkArgument(path != null);
         Topic topic = topics.get(path);
         if (topic != null) {
             topics.remove(path, topic);
             topic.stop();
         }
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index a013737..434d0e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -37,6 +37,8 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -199,8 +201,14 @@
     }
 
     @Override
-    public void runForLeadership(String path) {
+    public CompletableFuture<Leadership> runForLeadership(String path) {
         log.debug("Running for leadership for topic: {}", path);
+        CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
+        doRunForLeadership(path, resultFuture);
+        return resultFuture;
+    }
+
+    private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
         try {
             Versioned<List<NodeId>> candidates = candidateMap.get(path);
             if (candidates != null) {
@@ -216,7 +224,7 @@
                                     newCandidates.version(),
                                     newCandidates.creationTime())));
                     } else {
-                        rerunForLeadership(path);
+                        rerunForLeadership(path, future);
                         return;
                     }
                 }
@@ -231,28 +239,38 @@
                                 newCandidates.version(),
                                 newCandidates.creationTime())));
                 } else {
-                    rerunForLeadership(path);
+                    rerunForLeadership(path, future);
                     return;
                 }
             }
             log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
             activeTopics.add(path);
-            tryLeaderLock(path);
+            tryLeaderLock(path, future);
         } catch (ConsistentMapException e) {
             log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
-            rerunForLeadership(path);
+            rerunForLeadership(path, future);
         }
     }
 
     @Override
-    public void withdraw(String path) {
+    public CompletableFuture<Void> withdraw(String path) {
         activeTopics.remove(path);
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        doWithdraw(path, resultFuture);
+        return resultFuture;
+    }
 
+
+    private void doWithdraw(String path, CompletableFuture<Void> future) {
+        if (activeTopics.contains(path)) {
+            future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
+        }
         try {
             Versioned<NodeId> leader = leaderMap.get(path);
             if (leader != null && Objects.equals(leader.value(), localNodeId)) {
                 if (leaderMap.remove(path, leader.version())) {
                     log.info("Gave up leadership for {}", path);
+                    future.complete(null);
                     publish(new LeadershipEvent(
                             LeadershipEvent.Type.LEADER_BOOTED,
                             new Leadership(path,
@@ -267,10 +285,12 @@
                     ? Lists.newArrayList(candidates.value())
                     : Lists.newArrayList();
             if (!candidateList.remove(localNodeId)) {
+                future.complete(null);
                 return;
             }
             if (candidateMap.replace(path, candidates.version(), candidateList)) {
                 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+                future.complete(null);
                 publish(new LeadershipEvent(
                                 LeadershipEvent.Type.CANDIDATES_CHANGED,
                                 new Leadership(path,
@@ -279,11 +299,11 @@
                                     newCandidates.creationTime())));
             } else {
                 log.warn("Failed to withdraw from candidates list. Will retry");
-                retryWithdraw(path);
+                retryWithdraw(path, future);
             }
         } catch (Exception e) {
             log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
-            retryWithdraw(path);
+            retryWithdraw(path, future);
         }
     }
 
@@ -304,7 +324,7 @@
                                 localNodeId,
                                 leader.version(),
                                 leader.creationTime())));
-                    retryLock(path);
+                    retryLock(path, new CompletableFuture<>());
                     return true;
                 }
             }
@@ -350,7 +370,7 @@
         return updated;
     }
 
-    private void tryLeaderLock(String path) {
+    private void tryLeaderLock(String path, CompletableFuture<Leadership> future) {
         if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
             return;
         }
@@ -362,35 +382,37 @@
                                   .filter(n -> clusterService.getState(n) == ACTIVE)
                                   .collect(Collectors.toList());
                 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
-                    leaderLockAttempt(path, candidates.value());
+                    leaderLockAttempt(path, candidates.value(), future);
                 } else {
-                    retryLock(path);
+                    retryLock(path, future);
                 }
             } else {
                 throw new IllegalStateException("should not be here");
             }
         } catch (Exception e) {
             log.debug("Failed to fetch candidate information for {}", path, e);
-            retryLock(path);
+            retryLock(path, future);
         }
     }
 
-    private void leaderLockAttempt(String path, List<NodeId> candidates) {
+    private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) {
         try {
             Versioned<NodeId> currentLeader = leaderMap.get(path);
             if (currentLeader != null) {
                 if (localNodeId.equals(currentLeader.value())) {
                     log.info("Already has leadership for {}", path);
                     // FIXME: candidates can get out of sync.
+                    Leadership leadership = new Leadership(path,
+                            localNodeId,
+                            currentLeader.version(),
+                            currentLeader.creationTime());
+                    future.complete(leadership);
                     publish(new LeadershipEvent(
                             LeadershipEvent.Type.LEADER_ELECTED,
-                            new Leadership(path,
-                                localNodeId,
-                                currentLeader.version(),
-                                currentLeader.creationTime())));
+                            leadership));
                 } else {
                     // someone else has leadership. will retry after sometime.
-                    retryLock(path);
+                    retryLock(path, future);
                 }
             } else {
                 if (leaderMap.putIfAbsent(path, localNodeId) == null) {
@@ -398,20 +420,22 @@
                     // do a get again to get the version (epoch)
                     Versioned<NodeId> newLeader = leaderMap.get(path);
                     // FIXME: candidates can get out of sync
+                    Leadership leadership = new Leadership(path,
+                            newLeader.value(),
+                            newLeader.version(),
+                            newLeader.creationTime());
+                    future.complete(leadership);
                     publish(new LeadershipEvent(
                             LeadershipEvent.Type.LEADER_ELECTED,
-                            new Leadership(path,
-                                newLeader.value(),
-                                newLeader.version(),
-                                newLeader.creationTime())));
+                            leadership));
                 } else {
                     // someone beat us to it.
-                    retryLock(path);
+                    retryLock(path, future);
                 }
             }
         } catch (Exception e) {
             log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
-            retryLock(path);
+            retryLock(path, future);
         }
     }
 
@@ -463,23 +487,23 @@
         }
     }
 
-    private void rerunForLeadership(String path) {
+    private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
         retryLeaderLockExecutor.schedule(
-                () -> runForLeadership(path),
+                () -> doRunForLeadership(path, future),
                 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
                 TimeUnit.SECONDS);
     }
 
-    private void retryLock(String path) {
+    private void retryLock(String path, CompletableFuture<Leadership> future) {
         retryLeaderLockExecutor.schedule(
-                () -> tryLeaderLock(path),
+                () -> tryLeaderLock(path, future),
                 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
                 TimeUnit.SECONDS);
     }
 
-    private void retryWithdraw(String path) {
+    private void retryWithdraw(String path, CompletableFuture<Void> future) {
         retryLeaderLockExecutor.schedule(
-                () -> withdraw(path),
+                () -> doWithdraw(path, future),
                 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
                 TimeUnit.SECONDS);
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 9aa376a..5225549 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -163,19 +163,22 @@
     }
 
     @Override
-    public MastershipRole requestRole(DeviceId deviceId) {
+    public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
         if (connectedDevices.add(deviceId)) {
-            leadershipService.runForLeadership(leadershipTopic);
-            return MastershipRole.STANDBY;
+            return leadershipService.runForLeadership(leadershipTopic)
+                                    .thenApply(leadership -> {
+                                        return Objects.equal(localNodeId, leadership.leader())
+                                                ? MastershipRole.MASTER : MastershipRole.STANDBY;
+                                    });
         } else {
             Leadership leadership = leadershipService.getLeadership(leadershipTopic);
             if (leadership != null && leadership.leader().equals(localNodeId)) {
-                return MastershipRole.MASTER;
+                return CompletableFuture.completedFuture(MastershipRole.MASTER);
             } else {
-                return MastershipRole.STANDBY;
+                return CompletableFuture.completedFuture(MastershipRole.STANDBY);
             }
         }
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
index d6a857c..b2c5ade 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
@@ -205,7 +205,7 @@
     }
 
     @Override
-    public MastershipRole requestRole(DeviceId deviceId) {
+    public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
 
         // if no master => become master
         // if there already exists a master:
@@ -225,7 +225,7 @@
 
                 updateTerm(deviceId);
                 roleMap.put(deviceId, rv);
-                return MASTER;
+                return CompletableFuture.completedFuture(MASTER);
             }
             final MastershipRole currentRole = rv.getRole(local);
             switch (currentRole) {
@@ -239,7 +239,7 @@
                         roleMap.put(deviceId, rv);
                         // trigger BACKUPS_CHANGED?
                     }
-                    return currentRole;
+                    return CompletableFuture.completedFuture(currentRole);
                 case STANDBY:
                     // RoleInfo integrity check
                     modified = rv.reassign(local, NONE, STANDBY);
@@ -250,16 +250,16 @@
                         roleMap.put(deviceId, rv);
                         // trigger BACKUPS_CHANGED?
                     }
-                    return currentRole;
+                    return CompletableFuture.completedFuture(currentRole);
                 case NONE:
                     rv.reassign(local, NONE, STANDBY);
                     roleMap.put(deviceId, rv);
                     // TODO: notifyDelegate BACKUPS_CHANGED
-                    return STANDBY;
+                    return CompletableFuture.completedFuture(STANDBY);
                 default:
                     log.warn("unknown Mastership Role {}", currentRole);
             }
-            return currentRole;
+            return CompletableFuture.completedFuture(currentRole);
         } finally {
             roleMap.unlock(deviceId);
         }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
index c3b529e..9655b2d 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/intent/impl/PartitionManagerTest.java
@@ -35,6 +35,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import static junit.framework.TestCase.assertFalse;
 import static org.easymock.EasyMock.anyObject;
@@ -74,8 +75,11 @@
 
         leadershipService.addListener(anyObject(LeadershipEventListener.class));
         expectLastCall().andDelegateTo(new TestLeadershipService());
-        leadershipService.runForLeadership(anyString());
-        expectLastCall().anyTimes();
+        for (int i = 0; i < PartitionManager.NUM_PARTITIONS; i++) {
+            expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
+                .andReturn(CompletableFuture.completedFuture(null))
+                .times(1);
+        }
 
         partitionManager = new PartitionManager()
                 .withScheduledExecutor(new NullScheduledExecutor());
@@ -92,6 +96,7 @@
      * @param numMine number of partitions that should be owned by the local node
      */
     private void setUpLeadershipService(int numMine) {
+
         Map<String, Leadership> leaderBoard = new HashMap<>();
 
         for (int i = 0; i < numMine; i++) {
@@ -123,7 +128,9 @@
         leadershipService.addListener(anyObject(LeadershipEventListener.class));
 
         for (int i = 0; i < PartitionManager.NUM_PARTITIONS; i++) {
-            leadershipService.runForLeadership(ELECTION_PREFIX + i);
+            expect(leadershipService.runForLeadership(ELECTION_PREFIX + i))
+                .andReturn(CompletableFuture.completedFuture(null))
+                .times(1);
         }
 
         replay(leadershipService);
@@ -172,8 +179,9 @@
         // We have all the partitions so we'll need to relinquish some
         setUpLeadershipService(PartitionManager.NUM_PARTITIONS);
 
-        leadershipService.withdraw(anyString());
-        expectLastCall().times(7);
+        expect(leadershipService.withdraw(anyString()))
+                                .andReturn(CompletableFuture.completedFuture(null))
+                                .times(7);
 
         replay(leadershipService);
 
diff --git a/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java
index 56879e8..1745687 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/mastership/impl/DistributedMastershipStoreTest.java
@@ -145,22 +145,22 @@
 
         //if already MASTER, nothing should happen
         testStore.put(DID2, N1, true, false, true);
-        assertEquals("wrong role for MASTER:", MASTER, dms.requestRole(DID2));
+        assertEquals("wrong role for MASTER:", MASTER, Futures.getUnchecked(dms.requestRole(DID2)));
 
         //populate maps with DID1, N1 thru NONE case
-        assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+        assertEquals("wrong role for NONE:", MASTER, Futures.getUnchecked(dms.requestRole(DID1)));
         assertTrue("wrong state for store:", !dms.terms.isEmpty());
         assertEquals("wrong term",
                 MastershipTerm.of(N1, 1), dms.getTermFor(DID1));
 
         //CN2 now local. DID2 has N1 as MASTER so N2 is STANDBY
         testStore.setCurrent(CN2);
-        assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+        assertEquals("wrong role for STANDBY:", STANDBY, Futures.getUnchecked(dms.requestRole(DID2)));
         assertEquals("wrong number of entries:", 2, dms.terms.size());
 
         //change term and requestRole() again; should persist
         testStore.increment(DID2);
-        assertEquals("wrong role for STANDBY:", STANDBY, dms.requestRole(DID2));
+        assertEquals("wrong role for STANDBY:", STANDBY, Futures.getUnchecked(dms.requestRole(DID2)));
         assertEquals("wrong term", MastershipTerm.of(N1, 1), dms.getTermFor(DID2));
     }
 
@@ -168,7 +168,7 @@
     public void setMaster() {
         //populate maps with DID1, N1 as MASTER thru NONE case
         testStore.setCurrent(CN1);
-        assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+        assertEquals("wrong role for NONE:", MASTER, Futures.getUnchecked(dms.requestRole(DID1)));
         assertNull("wrong event:", Futures.getUnchecked(dms.setMaster(N1, DID1)));
 
         //switch over to N2
@@ -189,7 +189,7 @@
     public void relinquishRole() {
         //populate maps with DID1, N1 as MASTER thru NONE case
         testStore.setCurrent(CN1);
-        assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+        assertEquals("wrong role for NONE:", MASTER, Futures.getUnchecked(dms.requestRole(DID1)));
         //no backup, no new MASTER/event
         assertNull("wrong event:", Futures.getUnchecked(dms.relinquishRole(N1, DID1)));
 
@@ -197,7 +197,7 @@
 
         //add backup CN2, get it elected MASTER by relinquishing
         testStore.setCurrent(CN2);
-        assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
+        assertEquals("wrong role for NONE:", STANDBY, Futures.getUnchecked(dms.requestRole(DID1)));
         assertEquals("wrong event:", Type.MASTER_CHANGED, Futures.getUnchecked(dms.relinquishRole(N1, DID1)).type());
         assertEquals("wrong master", N2, dms.getMaster(DID1));
 
@@ -209,9 +209,9 @@
                 dms.roleMap.get(DID1).nodesOfRole(NONE).size());
 
         //bring nodes back
-        assertEquals("wrong role for NONE:", MASTER, dms.requestRole(DID1));
+        assertEquals("wrong role for NONE:", MASTER, Futures.getUnchecked(dms.requestRole(DID1)));
         testStore.setCurrent(CN1);
-        assertEquals("wrong role for NONE:", STANDBY, dms.requestRole(DID1));
+        assertEquals("wrong role for NONE:", STANDBY, Futures.getUnchecked(dms.requestRole(DID1)));
         assertEquals("wrong number of backup nodes", 1,
                 dms.roleMap.get(DID1).nodesOfRole(STANDBY).size());
 
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
index d00f616..eebe3dd 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.stream.Collectors;
@@ -76,21 +77,23 @@
     }
 
     @Override
-    public void runForLeadership(String path) {
+    public CompletableFuture<Leadership> runForLeadership(String path) {
         elections.put(path, true);
         for (LeadershipEventListener listener : listeners) {
             listener.event(new LeadershipEvent(Type.LEADER_ELECTED,
                     new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
         }
+        return CompletableFuture.completedFuture(new Leadership(path, clusterService.getLocalNode().id(), 0, 0));
     }
 
     @Override
-    public void withdraw(String path) {
+    public CompletableFuture<Void> withdraw(String path) {
         elections.remove(path);
         for (LeadershipEventListener listener : listeners) {
             listener.event(new LeadershipEvent(Type.LEADER_BOOTED,
                     new Leadership(path, clusterService.getLocalNode().id(), 0, 0)));
         }
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleMastershipStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleMastershipStore.java
index 39bc893..2feb18b 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleMastershipStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleMastershipStore.java
@@ -191,14 +191,14 @@
     }
 
     @Override
-    public synchronized MastershipRole requestRole(DeviceId deviceId) {
+    public synchronized CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
         //query+possible reelection
         NodeId node = clusterService.getLocalNode().id();
         MastershipRole role = getRole(node, deviceId);
 
         switch (role) {
             case MASTER:
-                return MastershipRole.MASTER;
+                return CompletableFuture.completedFuture(MastershipRole.MASTER);
             case STANDBY:
                 if (getMaster(deviceId) == null) {
                     // no master => become master
@@ -208,9 +208,9 @@
                     removeFromBackups(deviceId, node);
                     notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
                                                        getNodes(deviceId)));
-                    return MastershipRole.MASTER;
+                    return CompletableFuture.completedFuture(MastershipRole.MASTER);
                 }
-                return MastershipRole.STANDBY;
+                return CompletableFuture.completedFuture(MastershipRole.STANDBY);
             case NONE:
                 if (getMaster(deviceId) == null) {
                     // no master => become master
@@ -218,18 +218,18 @@
                     incrementTerm(deviceId);
                     notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId,
                                                        getNodes(deviceId)));
-                    return MastershipRole.MASTER;
+                    return CompletableFuture.completedFuture(MastershipRole.MASTER);
                 }
                 // add to backup list
                 if (addToBackup(deviceId, node)) {
                     notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId,
                                                        getNodes(deviceId)));
                 }
-                return MastershipRole.STANDBY;
+                return CompletableFuture.completedFuture(MastershipRole.STANDBY);
             default:
                 log.warn("unknown Mastership Role {}", role);
         }
-        return role;
+        return CompletableFuture.completedFuture(role);
     }
 
     // add to backup if not there already, silently ignores null node
diff --git a/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleMastershipStoreTest.java b/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleMastershipStoreTest.java
index 5b44515..2cda65b 100644
--- a/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleMastershipStoreTest.java
+++ b/core/store/trivial/src/test/java/org/onosproject/store/trivial/impl/SimpleMastershipStoreTest.java
@@ -130,19 +130,19 @@
     public void requestRole() {
         //NONE - become MASTER
         put(DID1, N1, false, false);
-        assertEquals("wrong role", MASTER, sms.requestRole(DID1));
+        assertEquals("wrong role", MASTER, Futures.getUnchecked(sms.requestRole(DID1)));
 
         //was STANDBY - become MASTER
         put(DID2, N1, false, true);
-        assertEquals("wrong role", MASTER, sms.requestRole(DID2));
+        assertEquals("wrong role", MASTER, Futures.getUnchecked(sms.requestRole(DID2)));
 
         //other MASTER - stay STANDBY
         put(DID3, N2, true, false);
-        assertEquals("wrong role", STANDBY, sms.requestRole(DID3));
+        assertEquals("wrong role", STANDBY, Futures.getUnchecked(sms.requestRole(DID3)));
 
         //local (N1) is MASTER - stay MASTER
         put(DID4, N1, true, true);
-        assertEquals("wrong role", MASTER, sms.requestRole(DID4));
+        assertEquals("wrong role", MASTER, Futures.getUnchecked(sms.requestRole(DID4)));
     }
 
     @Test
diff --git a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java
index 2e05ce5..4395914 100644
--- a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java
+++ b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java
@@ -30,6 +30,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 
 import org.junit.After;
 import org.junit.Before;
@@ -496,8 +497,8 @@
         }
 
         @Override
-        public MastershipRole requestRoleFor(DeviceId deviceId) {
-            return null;
+        public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
+            return CompletableFuture.completedFuture(null);
         }
 
         @Override