LeadershipService and MastershipService/Store APIs return CompletableFutures so that callers can (optionally) chain together dependent operations

Change-Id: Ia00fcb7d98fbfce897527f67ea9690abf6fe846a
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index a4f265f..5b73246 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -49,6 +49,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -188,7 +189,7 @@
     }
 
     @Override
-    public void runForLeadership(String path) {
+    public CompletableFuture<Leadership> runForLeadership(String path) {
         checkArgument(path != null);
         Topic topic = new Topic(path);
         Topic oldTopic = topics.putIfAbsent(path, topic);
@@ -198,16 +199,18 @@
         } else {
             oldTopic.runForLeadership();
         }
+        return CompletableFuture.completedFuture(getLeadership(path));
     }
 
     @Override
-    public void withdraw(String path) {
+    public CompletableFuture<Void> withdraw(String path) {
         checkArgument(path != null);
         Topic topic = topics.get(path);
         if (topic != null) {
             topics.remove(path, topic);
             topic.stop();
         }
+        return CompletableFuture.completedFuture(null);
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index a013737..434d0e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -37,6 +37,8 @@
 import java.util.Objects;
 import java.util.Set;
 import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -199,8 +201,14 @@
     }
 
     @Override
-    public void runForLeadership(String path) {
+    public CompletableFuture<Leadership> runForLeadership(String path) {
         log.debug("Running for leadership for topic: {}", path);
+        CompletableFuture<Leadership> resultFuture = new CompletableFuture<>();
+        doRunForLeadership(path, resultFuture);
+        return resultFuture;
+    }
+
+    private void doRunForLeadership(String path, CompletableFuture<Leadership> future) {
         try {
             Versioned<List<NodeId>> candidates = candidateMap.get(path);
             if (candidates != null) {
@@ -216,7 +224,7 @@
                                     newCandidates.version(),
                                     newCandidates.creationTime())));
                     } else {
-                        rerunForLeadership(path);
+                        rerunForLeadership(path, future);
                         return;
                     }
                 }
@@ -231,28 +239,38 @@
                                 newCandidates.version(),
                                 newCandidates.creationTime())));
                 } else {
-                    rerunForLeadership(path);
+                    rerunForLeadership(path, future);
                     return;
                 }
             }
             log.debug("In the leadership race for topic {} with candidates {}", path, candidates);
             activeTopics.add(path);
-            tryLeaderLock(path);
+            tryLeaderLock(path, future);
         } catch (ConsistentMapException e) {
             log.debug("Failed to enter topic leader race for {}. Retrying.", path, e);
-            rerunForLeadership(path);
+            rerunForLeadership(path, future);
         }
     }
 
     @Override
-    public void withdraw(String path) {
+    public CompletableFuture<Void> withdraw(String path) {
         activeTopics.remove(path);
+        CompletableFuture<Void> resultFuture = new CompletableFuture<>();
+        doWithdraw(path, resultFuture);
+        return resultFuture;
+    }
 
+
+    private void doWithdraw(String path, CompletableFuture<Void> future) {
+        if (activeTopics.contains(path)) {
+            future.completeExceptionally(new CancellationException(String.format("%s is now a active topic", path)));
+        }
         try {
             Versioned<NodeId> leader = leaderMap.get(path);
             if (leader != null && Objects.equals(leader.value(), localNodeId)) {
                 if (leaderMap.remove(path, leader.version())) {
                     log.info("Gave up leadership for {}", path);
+                    future.complete(null);
                     publish(new LeadershipEvent(
                             LeadershipEvent.Type.LEADER_BOOTED,
                             new Leadership(path,
@@ -267,10 +285,12 @@
                     ? Lists.newArrayList(candidates.value())
                     : Lists.newArrayList();
             if (!candidateList.remove(localNodeId)) {
+                future.complete(null);
                 return;
             }
             if (candidateMap.replace(path, candidates.version(), candidateList)) {
                 Versioned<List<NodeId>> newCandidates = candidateMap.get(path);
+                future.complete(null);
                 publish(new LeadershipEvent(
                                 LeadershipEvent.Type.CANDIDATES_CHANGED,
                                 new Leadership(path,
@@ -279,11 +299,11 @@
                                     newCandidates.creationTime())));
             } else {
                 log.warn("Failed to withdraw from candidates list. Will retry");
-                retryWithdraw(path);
+                retryWithdraw(path, future);
             }
         } catch (Exception e) {
             log.debug("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
-            retryWithdraw(path);
+            retryWithdraw(path, future);
         }
     }
 
@@ -304,7 +324,7 @@
                                 localNodeId,
                                 leader.version(),
                                 leader.creationTime())));
-                    retryLock(path);
+                    retryLock(path, new CompletableFuture<>());
                     return true;
                 }
             }
@@ -350,7 +370,7 @@
         return updated;
     }
 
-    private void tryLeaderLock(String path) {
+    private void tryLeaderLock(String path, CompletableFuture<Leadership> future) {
         if (!activeTopics.contains(path) || Objects.equals(localNodeId, getLeader(path))) {
             return;
         }
@@ -362,35 +382,37 @@
                                   .filter(n -> clusterService.getState(n) == ACTIVE)
                                   .collect(Collectors.toList());
                 if (localNodeId.equals(activeNodes.get(LEADER_CANDIDATE_POS))) {
-                    leaderLockAttempt(path, candidates.value());
+                    leaderLockAttempt(path, candidates.value(), future);
                 } else {
-                    retryLock(path);
+                    retryLock(path, future);
                 }
             } else {
                 throw new IllegalStateException("should not be here");
             }
         } catch (Exception e) {
             log.debug("Failed to fetch candidate information for {}", path, e);
-            retryLock(path);
+            retryLock(path, future);
         }
     }
 
-    private void leaderLockAttempt(String path, List<NodeId> candidates) {
+    private void leaderLockAttempt(String path, List<NodeId> candidates, CompletableFuture<Leadership> future) {
         try {
             Versioned<NodeId> currentLeader = leaderMap.get(path);
             if (currentLeader != null) {
                 if (localNodeId.equals(currentLeader.value())) {
                     log.info("Already has leadership for {}", path);
                     // FIXME: candidates can get out of sync.
+                    Leadership leadership = new Leadership(path,
+                            localNodeId,
+                            currentLeader.version(),
+                            currentLeader.creationTime());
+                    future.complete(leadership);
                     publish(new LeadershipEvent(
                             LeadershipEvent.Type.LEADER_ELECTED,
-                            new Leadership(path,
-                                localNodeId,
-                                currentLeader.version(),
-                                currentLeader.creationTime())));
+                            leadership));
                 } else {
                     // someone else has leadership. will retry after sometime.
-                    retryLock(path);
+                    retryLock(path, future);
                 }
             } else {
                 if (leaderMap.putIfAbsent(path, localNodeId) == null) {
@@ -398,20 +420,22 @@
                     // do a get again to get the version (epoch)
                     Versioned<NodeId> newLeader = leaderMap.get(path);
                     // FIXME: candidates can get out of sync
+                    Leadership leadership = new Leadership(path,
+                            newLeader.value(),
+                            newLeader.version(),
+                            newLeader.creationTime());
+                    future.complete(leadership);
                     publish(new LeadershipEvent(
                             LeadershipEvent.Type.LEADER_ELECTED,
-                            new Leadership(path,
-                                newLeader.value(),
-                                newLeader.version(),
-                                newLeader.creationTime())));
+                            leadership));
                 } else {
                     // someone beat us to it.
-                    retryLock(path);
+                    retryLock(path, future);
                 }
             }
         } catch (Exception e) {
             log.debug("Attempt to acquire leadership lock for topic {} failed", path, e);
-            retryLock(path);
+            retryLock(path, future);
         }
     }
 
@@ -463,23 +487,23 @@
         }
     }
 
-    private void rerunForLeadership(String path) {
+    private void rerunForLeadership(String path, CompletableFuture<Leadership> future) {
         retryLeaderLockExecutor.schedule(
-                () -> runForLeadership(path),
+                () -> doRunForLeadership(path, future),
                 ELECTION_JOIN_ATTEMPT_INTERVAL_SEC,
                 TimeUnit.SECONDS);
     }
 
-    private void retryLock(String path) {
+    private void retryLock(String path, CompletableFuture<Leadership> future) {
         retryLeaderLockExecutor.schedule(
-                () -> tryLeaderLock(path),
+                () -> tryLeaderLock(path, future),
                 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
                 TimeUnit.SECONDS);
     }
 
-    private void retryWithdraw(String path) {
+    private void retryWithdraw(String path, CompletableFuture<Void> future) {
         retryLeaderLockExecutor.schedule(
-                () -> withdraw(path),
+                () -> doWithdraw(path, future),
                 DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
                 TimeUnit.SECONDS);
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 9aa376a..5225549 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -163,19 +163,22 @@
     }
 
     @Override
-    public MastershipRole requestRole(DeviceId deviceId) {
+    public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
         checkArgument(deviceId != null, DEVICE_ID_NULL);
 
         String leadershipTopic = createDeviceMastershipTopic(deviceId);
         if (connectedDevices.add(deviceId)) {
-            leadershipService.runForLeadership(leadershipTopic);
-            return MastershipRole.STANDBY;
+            return leadershipService.runForLeadership(leadershipTopic)
+                                    .thenApply(leadership -> {
+                                        return Objects.equal(localNodeId, leadership.leader())
+                                                ? MastershipRole.MASTER : MastershipRole.STANDBY;
+                                    });
         } else {
             Leadership leadership = leadershipService.getLeadership(leadershipTopic);
             if (leadership != null && leadership.leader().equals(localNodeId)) {
-                return MastershipRole.MASTER;
+                return CompletableFuture.completedFuture(MastershipRole.MASTER);
             } else {
-                return MastershipRole.STANDBY;
+                return CompletableFuture.completedFuture(MastershipRole.STANDBY);
             }
         }
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
index d6a857c..b2c5ade 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/DistributedMastershipStore.java
@@ -205,7 +205,7 @@
     }
 
     @Override
-    public MastershipRole requestRole(DeviceId deviceId) {
+    public CompletableFuture<MastershipRole> requestRole(DeviceId deviceId) {
 
         // if no master => become master
         // if there already exists a master:
@@ -225,7 +225,7 @@
 
                 updateTerm(deviceId);
                 roleMap.put(deviceId, rv);
-                return MASTER;
+                return CompletableFuture.completedFuture(MASTER);
             }
             final MastershipRole currentRole = rv.getRole(local);
             switch (currentRole) {
@@ -239,7 +239,7 @@
                         roleMap.put(deviceId, rv);
                         // trigger BACKUPS_CHANGED?
                     }
-                    return currentRole;
+                    return CompletableFuture.completedFuture(currentRole);
                 case STANDBY:
                     // RoleInfo integrity check
                     modified = rv.reassign(local, NONE, STANDBY);
@@ -250,16 +250,16 @@
                         roleMap.put(deviceId, rv);
                         // trigger BACKUPS_CHANGED?
                     }
-                    return currentRole;
+                    return CompletableFuture.completedFuture(currentRole);
                 case NONE:
                     rv.reassign(local, NONE, STANDBY);
                     roleMap.put(deviceId, rv);
                     // TODO: notifyDelegate BACKUPS_CHANGED
-                    return STANDBY;
+                    return CompletableFuture.completedFuture(STANDBY);
                 default:
                     log.warn("unknown Mastership Role {}", currentRole);
             }
-            return currentRole;
+            return CompletableFuture.completedFuture(currentRole);
         } finally {
             roleMap.unlock(deviceId);
         }