LeadershipService and MastershipService/Store APIs now return CompletableFutures so that callers can (optionally) chain dependent operations.

Change-Id: Ia00fcb7d98fbfce897527f67ea9690abf6fe846a
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 661ddca..d12664b 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -16,6 +16,7 @@
 package org.onosproject.net.device.impl;
 
 import com.google.common.collect.Lists;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -57,6 +58,7 @@
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
@@ -79,7 +81,6 @@
     private static final String PORT_NUMBER_NULL = "Port number cannot be null";
     private static final String DEVICE_DESCRIPTION_NULL = "Device description cannot be null";
     private static final String PORT_DESCRIPTION_NULL = "Port description cannot be null";
-    private static final String ROLE_NULL = "Role cannot be null";
 
     private final Logger log = getLogger(getClass());
 
@@ -89,6 +90,7 @@
     private final DeviceStoreDelegate delegate = new InternalStoreDelegate();
 
     private final MastershipListener mastershipListener = new InternalMastershipListener();
+    private NodeId localNodeId;
 
     private ScheduledExecutorService backgroundService;
 
@@ -113,6 +115,7 @@
     @Activate
     public void activate() {
         backgroundService = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "manager-background"));
+        localNodeId = clusterService.getLocalNode().id();
 
         store.setDelegate(delegate);
         eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
@@ -302,12 +305,10 @@
             checkValidity();
 
             log.info("Device {} connected", deviceId);
-            final NodeId myNodeId = clusterService.getLocalNode().id();
-
             // check my Role
             mastershipService.requestRoleFor(deviceId);
             final MastershipTerm term = termService.getMastershipTerm(deviceId);
-            if (term == null || !myNodeId.equals(term.master())) {
+            if (term == null || !localNodeId.equals(term.master())) {
                 log.info("Role of this node is STANDBY for {}", deviceId);
                 // TODO: Do we need to explicitly tell the Provider that
                 // this instance is not the MASTER
@@ -337,7 +338,6 @@
 
             log.info("Device {} disconnected from this node", deviceId);
 
-            DeviceEvent event = null;
             List<Port> ports = store.getPorts(deviceId);
             List<PortDescription> descs = Lists.newArrayList();
             ports.forEach(port ->
@@ -346,7 +346,7 @@
                                                      port.portSpeed())));
             store.updatePorts(this.provider().id(), deviceId, descs);
             try {
-                event = store.markOffline(deviceId);
+                post(store.markOffline(deviceId));
             } catch (IllegalStateException e) {
                 log.warn("Failed to mark {} offline", deviceId);
                 // only the MASTER should be marking off-line in normal cases,
@@ -360,26 +360,21 @@
 
                 // FIXME: Store semantics leaking out as IllegalStateException.
                 //  Consider revising store API to handle this scenario.
-
-                MastershipRole role = mastershipService.requestRoleFor(deviceId);
-                MastershipTerm term = termService.getMastershipTerm(deviceId);
-                final NodeId myNodeId = clusterService.getLocalNode().id();
-                // TODO: Move this type of check inside device clock manager, etc.
-                if (term != null && myNodeId.equals(term.master())) {
-                    log.info("Retry marking {} offline", deviceId);
-                    deviceClockProviderService.setMastershipTerm(deviceId, term);
-                    event = store.markOffline(deviceId);
-                } else {
-                    log.info("Failed again marking {} offline. {}", deviceId, role);
-                }
+                CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
+                roleFuture.whenComplete((role, error) -> {
+                    MastershipTerm term = termService.getMastershipTerm(deviceId);
+                    // TODO: Move this type of check inside device clock manager, etc.
+                    if (term != null && localNodeId.equals(term.master())) {
+                        log.info("Retry marking {} offline", deviceId);
+                        deviceClockProviderService.setMastershipTerm(deviceId, term);
+                        post(store.markOffline(deviceId));
+                    } else {
+                        log.info("Failed again marking {} offline. {}", deviceId, role);
+                    }
+                });
             } finally {
                 //relinquish master role and ability to be backup.
                 mastershipService.relinquishMastership(deviceId);
-
-                if (event != null) {
-                    log.info("Device {} disconnected from cluster", deviceId);
-                    post(event);
-                }
             }
         }
 
@@ -531,12 +526,11 @@
     private void reassertRole(final DeviceId did,
                               final MastershipRole nextRole) {
 
-        final NodeId myNodeId = clusterService.getLocalNode().id();
         MastershipRole myNextRole = nextRole;
         if (myNextRole == NONE) {
             mastershipService.requestRoleFor(did);
             MastershipTerm term = termService.getMastershipTerm(did);
-            if (term != null && myNodeId.equals(term.master())) {
+            if (term != null && localNodeId.equals(term.master())) {
                 myNextRole = MASTER;
             } else {
                 myNextRole = STANDBY;
@@ -597,21 +591,20 @@
             }
 
             final DeviceId did = event.subject();
-            final NodeId myNodeId = clusterService.getLocalNode().id();
 
             // myRole suggested by MastershipService
             MastershipRole myNextRole;
-            if (myNodeId.equals(event.roleInfo().master())) {
+            if (localNodeId.equals(event.roleInfo().master())) {
                 // confirm latest info
                 MastershipTerm term = termService.getMastershipTerm(did);
-                final boolean iHaveControl = term != null && myNodeId.equals(term.master());
+                final boolean iHaveControl = term != null && localNodeId.equals(term.master());
                 if (iHaveControl) {
                     deviceClockProviderService.setMastershipTerm(did, term);
                     myNextRole = MASTER;
                 } else {
                     myNextRole = STANDBY;
                 }
-            } else if (event.roleInfo().backups().contains(myNodeId)) {
+            } else if (event.roleInfo().backups().contains(localNodeId)) {
                 myNextRole = STANDBY;
             } else {
                 myNextRole = NONE;