LeadershipService and MastershipService/Store APIs now return CompletableFutures so that callers can (optionally) chain dependent operations
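
The new return type lets a caller attach a dependent step instead of
blocking on the role request. A minimal sketch of the intended call
pattern (mastershipService, deviceId and the log call are illustrative
placeholders, matching the names used in DeviceManager below):

    CompletableFuture<MastershipRole> roleFuture =
            mastershipService.requestRoleFor(deviceId);
    // Run the follow-up only once the role is resolved, without blocking.
    roleFuture.thenAccept(role -> {
        if (role == MastershipRole.MASTER) {
            log.info("Now MASTER for {}", deviceId);
        }
    });
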
Change-Id: Ia00fcb7d98fbfce897527f67ea9690abf6fe846a
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 661ddca..d12664b 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -16,6 +16,7 @@
package org.onosproject.net.device.impl;
import com.google.common.collect.Lists;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -57,6 +58,7 @@
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -79,7 +81,6 @@
private static final String PORT_NUMBER_NULL = "Port number cannot be null";
private static final String DEVICE_DESCRIPTION_NULL = "Device description cannot be null";
private static final String PORT_DESCRIPTION_NULL = "Port description cannot be null";
- private static final String ROLE_NULL = "Role cannot be null";
private final Logger log = getLogger(getClass());
@@ -89,6 +90,7 @@
private final DeviceStoreDelegate delegate = new InternalStoreDelegate();
private final MastershipListener mastershipListener = new InternalMastershipListener();
+ private NodeId localNodeId;
private ScheduledExecutorService backgroundService;
@@ -113,6 +115,7 @@
@Activate
public void activate() {
backgroundService = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "manager-background"));
+ localNodeId = clusterService.getLocalNode().id();
store.setDelegate(delegate);
eventDispatcher.addSink(DeviceEvent.class, listenerRegistry);
@@ -302,12 +305,10 @@
checkValidity();
log.info("Device {} connected", deviceId);
- final NodeId myNodeId = clusterService.getLocalNode().id();
-
// check my Role
mastershipService.requestRoleFor(deviceId);
final MastershipTerm term = termService.getMastershipTerm(deviceId);
- if (term == null || !myNodeId.equals(term.master())) {
+ if (term == null || !localNodeId.equals(term.master())) {
log.info("Role of this node is STANDBY for {}", deviceId);
// TODO: Do we need to explicitly tell the Provider that
// this instance is not the MASTER
@@ -337,7 +338,6 @@
log.info("Device {} disconnected from this node", deviceId);
- DeviceEvent event = null;
List<Port> ports = store.getPorts(deviceId);
List<PortDescription> descs = Lists.newArrayList();
ports.forEach(port ->
@@ -346,7 +346,7 @@
port.portSpeed())));
store.updatePorts(this.provider().id(), deviceId, descs);
try {
- event = store.markOffline(deviceId);
+ post(store.markOffline(deviceId));
} catch (IllegalStateException e) {
log.warn("Failed to mark {} offline", deviceId);
// only the MASTER should be marking off-line in normal cases,
@@ -360,26 +360,21 @@
// FIXME: Store semantics leaking out as IllegalStateException.
// Consider revising store API to handle this scenario.
-
- MastershipRole role = mastershipService.requestRoleFor(deviceId);
- MastershipTerm term = termService.getMastershipTerm(deviceId);
- final NodeId myNodeId = clusterService.getLocalNode().id();
- // TODO: Move this type of check inside device clock manager, etc.
- if (term != null && myNodeId.equals(term.master())) {
- log.info("Retry marking {} offline", deviceId);
- deviceClockProviderService.setMastershipTerm(deviceId, term);
- event = store.markOffline(deviceId);
- } else {
- log.info("Failed again marking {} offline. {}", deviceId, role);
- }
+ CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
+ roleFuture.whenComplete((role, error) -> {
+ MastershipTerm term = termService.getMastershipTerm(deviceId);
+ // TODO: Move this type of check inside device clock manager, etc.
+ if (term != null && localNodeId.equals(term.master())) {
+ log.info("Retry marking {} offline", deviceId);
+ deviceClockProviderService.setMastershipTerm(deviceId, term);
+ post(store.markOffline(deviceId));
+ } else {
+ log.info("Failed again marking {} offline. {}", deviceId, role);
+ }
+ });
} finally {
//relinquish master role and ability to be backup.
mastershipService.relinquishMastership(deviceId);
-
- if (event != null) {
- log.info("Device {} disconnected from cluster", deviceId);
- post(event);
- }
}
}
@@ -531,12 +526,11 @@
private void reassertRole(final DeviceId did,
final MastershipRole nextRole) {
- final NodeId myNodeId = clusterService.getLocalNode().id();
MastershipRole myNextRole = nextRole;
if (myNextRole == NONE) {
mastershipService.requestRoleFor(did);
MastershipTerm term = termService.getMastershipTerm(did);
- if (term != null && myNodeId.equals(term.master())) {
+ if (term != null && localNodeId.equals(term.master())) {
myNextRole = MASTER;
} else {
myNextRole = STANDBY;
@@ -597,21 +591,20 @@
}
final DeviceId did = event.subject();
- final NodeId myNodeId = clusterService.getLocalNode().id();
// myRole suggested by MastershipService
MastershipRole myNextRole;
- if (myNodeId.equals(event.roleInfo().master())) {
+ if (localNodeId.equals(event.roleInfo().master())) {
// confirm latest info
MastershipTerm term = termService.getMastershipTerm(did);
- final boolean iHaveControl = term != null && myNodeId.equals(term.master());
+ final boolean iHaveControl = term != null && localNodeId.equals(term.master());
if (iHaveControl) {
deviceClockProviderService.setMastershipTerm(did, term);
myNextRole = MASTER;
} else {
myNextRole = STANDBY;
}
- } else if (event.roleInfo().backups().contains(myNodeId)) {
+ } else if (event.roleInfo().backups().contains(localNodeId)) {
myNextRole = STANDBY;
} else {
myNextRole = NONE;
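
Note on the markOffline retry hunk above: the whenComplete callback
receives both the resolved role and any failure, but the error argument
is not inspected before retrying. A possible hardening (a sketch only,
not part of this change) would check it first:

    mastershipService.requestRoleFor(deviceId).whenComplete((role, error) -> {
        if (error != null) {
            // the role request itself failed; log and skip the retry
            log.warn("Role request for {} failed", deviceId, error);
            return;
        }
        // ... proceed with the mastership-term check as in the hunk above ...
    });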