[SDFAB-977][SDFAB-978] Implement persistent mastership logic in ONOS
Design doc can be found here:
https://docs.google.com/document/d/1UStD1hGCy5YQlFmaWfclq6uYROtMY56jWxUBwfUEsRM
Additionally, introduce a few more changes in the DeviceManager:
- Always reassert the role if the instance has a valid role
- Test reachability through isReachable and probeReachability
Change-Id: I35183c04b65a98bc0332ed256be8333b53d6b68f
(cherry picked from commit b2f636bffd2783266826503755ba5f42dfdb2b75)
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index e50fe37..cd8ab96 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -23,7 +23,9 @@
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipAdminService;
import org.onosproject.mastership.MastershipEvent;
import org.onosproject.mastership.MastershipListener;
import org.onosproject.mastership.MastershipService;
@@ -146,6 +148,9 @@
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipAdminService mastershipAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected MastershipTermService termService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -157,7 +162,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterCommunicationService communicationService;
- private ExecutorService portRequestExecutor;
+ private ExecutorService clusterRequestExecutor;
/**
* List of all registered PortConfigOperator.
*/
@@ -177,6 +182,11 @@
private static final MessageSubject PORT_UPDOWN_SUBJECT =
new MessageSubject("port-updown-req");
+ private static final MessageSubject PROBE_SUBJECT =
+ new MessageSubject("probe-req");
+ private static final long PROBE_TIMEOUT_MILLIS = 5000;
+ private static final int PROBE_ATTEMPTS = 3;
+
private static final Serializer SERIALIZER = Serializer.using(
KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
@@ -229,13 +239,20 @@
}
}, 1, 1, TimeUnit.MINUTES);
- portRequestExecutor = newSingleThreadExecutor();
+ clusterRequestExecutor = newSingleThreadExecutor();
- communicationService.<InternalPortUpDownEvent>addSubscriber(
+ communicationService.addSubscriber(
PORT_UPDOWN_SUBJECT,
SERIALIZER::decode,
this::handlePortRequest,
- portRequestExecutor);
+ clusterRequestExecutor);
+
+ communicationService.addSubscriber(
+ PROBE_SUBJECT,
+ SERIALIZER::decode,
+ this::handleProbeRequest,
+ SERIALIZER::encode,
+ clusterRequestExecutor);
backgroundRoleChecker = newSingleThreadScheduledExecutor(
groupedThreads("onos/device", "manager-role", log));
@@ -258,7 +275,7 @@
mastershipService.removeListener(mastershipListener);
eventDispatcher.removeSink(DeviceEvent.class);
communicationService.removeSubscriber(PORT_UPDOWN_SUBJECT);
- portRequestExecutor.shutdown();
+ clusterRequestExecutor.shutdown();
backgroundRoleChecker.shutdown();
log.info("Stopped");
}
@@ -389,7 +406,7 @@
}
DeviceProvider provider = getProvider(deviceId);
if (provider != null) {
- boolean reachable = provider.isReachable(deviceId);
+ boolean reachable = probeReachability(deviceId);
if (reachable && !isLocallyConnected(deviceId)) {
deviceLocalStatus.put(deviceId, new LocalStatus(true, Instant.now()));
} else if (!reachable && isLocallyConnected(deviceId)) {
@@ -498,42 +515,35 @@
log.trace("Checking device {}. Current role is {}", deviceId, myRole);
if (!isReachable(deviceId)) {
if (myRole != NONE) {
- // can't be master if device is not reachable
- try {
- if (myRole == MASTER) {
- log.info("Local Role {}, Marking unreachable device {} offline", MASTER, deviceId);
- post(store.markOffline(deviceId));
- }
- //relinquish master role and ability to be backup.
- roleToAcknowledge.remove(deviceId);
- mastershipService.relinquishMastership(deviceId).get();
- } catch (InterruptedException e) {
- log.warn("Interrupted while relinquishing role for {}", deviceId);
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- log.error("Exception thrown while relinquishing role for {}", deviceId, e);
+ // Verify if the device is fully disconnected from the cluster
+ if (updateMastershipFor(deviceId) == null
+ && myRole == MASTER && isAvailable(deviceId)) {
+ log.info("Local Role {}, Marking unreachable device {} offline", MASTER, deviceId);
+ post(store.markOffline(deviceId));
}
} else {
- // check if the device has master and is available to the store, if not, mark it offline
- // only the nodes which has mastership role can mark any device offline.
- // This condition should never be hit unless in a device removed phase for NONE mastership roles.
- NodeId master = mastershipService.getMasterFor(deviceId);
- if (master == null && isAvailable(deviceId)) {
- CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
- roleFuture.thenAccept(role -> {
- MastershipTerm term = termService.getMastershipTerm(deviceId);
- if (term != null && localNodeId.equals(term.master())) {
- log.info("Marking unreachable device {} offline", deviceId);
- post(store.markOffline(deviceId));
- } else {
- log.info("Failed marking {} offline. {}", deviceId, role);
- }
- // Stop the timer - just to prevent any race
- roleToAcknowledge.remove(deviceId);
- mastershipService.relinquishMastership(deviceId);
- });
+ /* First request a role, then check whether the device is available in the store.
+ If it is, this node is the master, and the device is fully disconnected from
+ the cluster, mark the device offline. In principle, this condition should
+ never be hit for NONE mastership roles except during a device removal phase. */
+ try {
+ mastershipService.requestRoleFor(deviceId).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Interrupted waiting for Mastership", e);
+ } catch (ExecutionException e) {
+ log.error("Encountered an error waiting for Mastership", e);
+ }
+
+ MastershipTerm term = termService.getMastershipTerm(deviceId);
+ if (updateMastershipFor(deviceId) == null &&
+ term != null && localNodeId.equals(term.master()) &&
+ isAvailable(deviceId)) {
+ log.info("Marking unreachable device {} offline", deviceId);
+ post(store.markOffline(deviceId));
}
}
+ roleToAcknowledge.remove(deviceId);
continue;
}
@@ -542,15 +552,11 @@
post(store.markOnline(deviceId));
}
- if (myRole != NONE) {
- continue;
- }
+ log.info("{} is reachable - reasserting the role", deviceId);
- log.info("{} is reachable but did not have a valid role, reasserting", deviceId);
-
- // isReachable but was not MASTER or STANDBY, get a role and apply
- // Note: NONE triggers request to MastershipService
- reassertRole(deviceId, NONE);
+ /* Device is still reachable. It is useful for some protocols
+ to reassert the role. Note: NONE triggers request to MastershipService */
+ reassertRole(deviceId, myRole);
}
}
@@ -592,14 +598,7 @@
Yield mastership to other instance*/
log.warn("Failed to assert role onto device {}. requested={}, no response",
deviceId, myRole);
- mastershipService.relinquishMastership(deviceId).whenComplete((result, error) -> {
- if (error != null) {
- log.error("Exception while relinquishing mastership", error);
- } else {
- log.info("Successfully relinquished mastership for {}. Requesting new role", deviceId);
- mastershipService.requestRoleFor(deviceId);
- }
- });
+ updateMastershipFor(deviceId);
}
}
@@ -700,55 +699,55 @@
public void deviceDisconnected(DeviceId deviceId) {
checkNotNull(deviceId, DEVICE_ID_NULL);
checkValidity();
+ // Update the local status
deviceLocalStatus.put(deviceId, new LocalStatus(false, Instant.now()));
log.info("Device {} disconnected from this node: {}", deviceId,
clusterService.getLocalNode().id());
- List<PortDescription> descs = store.getPortDescriptions(provider().id(), deviceId)
- .map(desc -> ensurePortEnabledState(desc, false))
- .collect(Collectors.toList());
+ /* If no instance can reach the device, we continue with the disconnection logic.
+ If an instance reports that the device is still reachable, we hand over the
+ mastership to it if we are the current master; otherwise, if we are a backup,
+ we demote ourselves to the bottom of the backups list. */
+ if (updateMastershipFor(deviceId) == null) {
+ log.info("Device {} is fully disconnected from the cluster", deviceId);
+ List<PortDescription> descs = store.getPortDescriptions(provider().id(), deviceId)
+ .map(desc -> ensurePortEnabledState(desc, false))
+ .collect(Collectors.toList());
+ store.updatePorts(this.provider().id(), deviceId, descs);
- store.updatePorts(this.provider().id(), deviceId, descs);
- try {
- if (mastershipService.isLocalMaster(deviceId)) {
- post(store.markOffline(deviceId));
- }
- } catch (IllegalStateException e) {
- log.warn("Failed to mark {} offline", deviceId);
- // only the MASTER should be marking off-line in normal cases,
- // but if I was the last STANDBY connection, etc. and no one else
- // was there to mark the device offline, this instance may need to
- // temporarily request for Master Role and mark offline.
-
- //there are times when this node will correctly have mastership, BUT
- //that isn't reflected in the ClockManager before the device disconnects.
- //we want to let go of the device anyways, so make sure this happens.
-
- // FIXME: Store semantics leaking out as IllegalStateException.
- // Consider revising store API to handle this scenario.
- CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
- roleFuture.whenComplete((role, error) -> {
- MastershipTerm term = termService.getMastershipTerm(deviceId);
- // TODO: Move this type of check inside device clock manager, etc.
- if (term != null && localNodeId.equals(term.master())) {
- log.info("Retry marking {} offline", deviceId);
- post(store.markOffline(deviceId));
- } else {
- log.info("Failed again marking {} offline. {}", deviceId, role);
- }
- });
- } finally {
try {
- //relinquish master role and ability to be backup.
+ if (mastershipService.isLocalMaster(deviceId)) {
+ post(store.markOffline(deviceId));
+ }
+ } catch (IllegalStateException e) {
+ log.warn("Failed to mark {} offline", deviceId);
+ // only the MASTER should be marking off-line in normal cases,
+ // but if I was the last STANDBY connection, etc. and no one else
+ // was there to mark the device offline, this instance may need to
+ // temporarily request for Master Role and mark offline.
+
+ //there are times when this node will correctly have mastership, BUT
+ //that isn't reflected in the ClockManager before the device disconnects.
+ //we want to let go of the device anyways, so make sure this happens.
+
+ // FIXME: Store semantics leaking out as IllegalStateException.
+ // Consider revising store API to handle this scenario.
+ CompletableFuture<MastershipRole> roleFuture = mastershipService.requestRoleFor(deviceId);
+ roleFuture.whenComplete((role, error) -> {
+ MastershipTerm term = termService.getMastershipTerm(deviceId);
+ // TODO: Move this type of check inside device clock manager, etc.
+ if (term != null && localNodeId.equals(term.master())) {
+ log.info("Retry marking {} offline", deviceId);
+ post(store.markOffline(deviceId));
+ } else {
+ log.info("Failed again marking {} offline. {}", deviceId, role);
+ }
+ });
+ } finally {
roleToAcknowledge.remove(deviceId);
- mastershipService.relinquishMastership(deviceId).get();
- } catch (InterruptedException e) {
- log.warn("Interrupted while reliquishing role for {}", deviceId);
- Thread.currentThread().interrupt();
- } catch (ExecutionException e) {
- log.error("Exception thrown while relinquishing role for {}", deviceId, e);
}
}
+
}
@Override
@@ -875,7 +874,7 @@
// something was off with DeviceProvider, maybe check channel too?
log.warn("Failed to assert role onto Device {}", deviceId);
roleToAcknowledge.remove(deviceId);
- mastershipService.relinquishMastership(deviceId);
+ updateMastershipFor(deviceId);
return;
}
@@ -901,7 +900,6 @@
// roleManager got the device to comply, but doesn't agree with
// the store; use the store's view, then try to reassert.
backgroundService.execute(() -> reassertRole(deviceId, expected));
- return;
}
} else {
// we didn't get back what we asked for. Reelect someone else.
@@ -910,17 +908,13 @@
if (requested == MastershipRole.MASTER) {
// Stop the timer
roleToAcknowledge.remove(deviceId);
- mastershipService.relinquishMastership(deviceId);
- // TODO: Shouldn't we be triggering event?
- //final Device device = getDevice(deviceId);
- //post(new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, device));
+ updateMastershipFor(deviceId);
} else if (requested == MastershipRole.STANDBY) {
// For P4RT devices, the response role will be NONE when this node is expected to be STANDBY
// but the stream channel is not opened correctly.
// Calling reassertRole will trigger the mechanism in GeneralDeviceProvider that
// attempts to re-establish the stream channel
backgroundService.execute(() -> reassertRole(deviceId, expected));
- return;
}
}
}
@@ -991,6 +985,16 @@
return true;
}
+ private boolean probeReachability(DeviceId deviceId) {
+ DeviceProvider provider = getProvider(deviceId);
+ if (provider == null) {
+ log.warn("Provider for {} was not found. Cannot probe reachability", deviceId);
+ return false;
+ }
+ return provider.isReachable(deviceId) && Tools.futureGetOrElse(provider.probeReachability(deviceId),
+ PROBE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS, Boolean.FALSE);
+ }
+
/**
* Reassert role for specified device connected to this node.
*
@@ -1030,7 +1034,7 @@
if (!applyRoleAndProbe(did, MASTER)) {
log.warn("Unsuccessful applying role {} to {}", myNextRole, did);
// immediately failed to apply role
- mastershipService.relinquishMastership(did);
+ updateMastershipFor(did);
// FIXME disconnect?
}
break;
@@ -1039,7 +1043,7 @@
if (!applyRoleAndProbe(did, STANDBY)) {
log.warn("Unsuccessful applying role {} to {}", myNextRole, did);
// immediately failed to apply role
- mastershipService.relinquishMastership(did);
+ updateMastershipFor(did);
// FIXME disconnect?
}
break;
@@ -1053,13 +1057,9 @@
}
private void handleMastershipEvent(MastershipEvent event) {
- if (event.type() == MastershipEvent.Type.BACKUPS_CHANGED) {
- // Don't care if backup list changed.
- return;
- }
final DeviceId did = event.subject();
- // myRole suggested by MastershipService
+ // myNextRole suggested by MastershipService event
MastershipRole myNextRole;
if (event.type() == MastershipEvent.Type.SUSPENDED) {
myNextRole = NONE; // FIXME STANDBY OR NONE?
@@ -1080,24 +1080,40 @@
final boolean isReachable = isReachable(did);
if (!isReachable) {
- // device is not connected to this node
+ // device is not connected to this node, nevertheless we should get a role
if (mastershipService.getLocalRole(did) == NONE) {
log.debug("Node was instructed to be {} role for {}, "
+ "but this node cannot reach the device "
- + "and role is already None. Ignoring request.",
+ + "and role is already None. Asking a new role "
+ + "and then apply the disconnection protocol.",
myNextRole, did);
+ try {
+ mastershipService.requestRoleFor(did).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ log.error("Interrupted waiting for Mastership", e);
+ } catch (ExecutionException e) {
+ log.error("Encountered an error waiting for Mastership", e);
+ }
} else if (myNextRole != NONE) {
log.warn("Node was instructed to be {} role for {}, "
- + "but this node cannot reach the device. "
- + "Relinquishing role. ",
+ + "but this node cannot reach the device. "
+ + "Apply the disconnection protocol.",
myNextRole, did);
- roleToAcknowledge.remove(did);
- mastershipService.relinquishMastership(did);
}
+ // Let's put some order in the candidates list
+ roleToAcknowledge.remove(did);
+ updateMastershipFor(did);
return;
}
- // device is connected to this node:
+ /* Device is connected to this node - always reassert the role.
+ Ideally, protocols like OpenFlow would not need to reassert the
+ role because the instances are only identified by the role. However,
+ other protocols like P4RT also require an election id, which may
+ change over time; reasserting the role guarantees that updated
+ election ids are communicated to the devices. It should not
+ cost us a lot as it is equivalent to a probe. */
if (store.getDevice(did) != null) {
reassertRole(did, myNextRole);
} else {
@@ -1375,6 +1391,83 @@
}
/**
+ * Handler for remote probe requests.
+ *
+ * @param deviceId the device to check
+ * @return whether or not the device is reachable
+ */
+ private boolean handleProbeRequest(DeviceId deviceId) {
+ int attempt = 0;
+ // Let's do a number of attempts
+ while (attempt < PROBE_ATTEMPTS) {
+ if (!probeReachability(deviceId)) {
+ return false;
+ }
+ attempt++;
+ }
+ return true;
+ }
+
+ /**
+ * Updates the mastership for this device. If another node is able
+ * to reach the device and this node is the master, the mastership
+ * is handed over to that node, which is still connected to the device.
+ * If this node is a backup, it demotes itself to the bottom
+ * of the candidates list.
+ *
+ * @param deviceId the device for which we have to update the mastership
+ * @return the NodeId of any node that can reach the device, or null if
+ * none of the ONOS instances can reach the device
+ */
+ private NodeId updateMastershipFor(DeviceId deviceId) {
+ Map<NodeId, CompletableFuture<Boolean>> probes = Maps.newHashMap();
+ // Request a probe only if the node is ready
+ for (ControllerNode onosNode : clusterService.getNodes()) {
+ if (!clusterService.getState(onosNode.id()).isReady() || localNodeId.equals(onosNode.id())) {
+ continue;
+ }
+ probes.put(onosNode.id(), communicationService.sendAndReceive(deviceId, PROBE_SUBJECT, SERIALIZER::encode,
+ SERIALIZER::decode, onosNode.id()));
+ }
+
+ // Select a node able to reach the device; every probe is awaited, so the last positive reply wins
+ // FIXME [SDFAB-935] optimize by looking at the MastershipInfo
+ boolean isReachable;
+ NodeId nextMaster = null;
+ // FIXME Should we expose timeout? Understand if there is need to signal to the caller
+ for (Map.Entry<NodeId, CompletableFuture<Boolean>> probe : probes.entrySet()) {
+ isReachable = Tools.futureGetOrElse(probe.getValue(), PROBE_TIMEOUT_MILLIS,
+ TimeUnit.MILLISECONDS, Boolean.FALSE);
+ if (isReachable) {
+ nextMaster = probe.getKey();
+ }
+ }
+
+ // FIXME [SDFAB-935] optimize demote by looking at the MastershipInfo;
+ if (nextMaster != null) {
+ log.info("Device {} is still connected to {}", deviceId, nextMaster);
+ MastershipRole myRole = mastershipService.getLocalRole(deviceId);
+ if (myRole == MASTER) {
+ log.info("Handing over the mastership of {} to next master {}", deviceId, nextMaster);
+ mastershipAdminService.setRole(nextMaster, deviceId, MASTER);
+ /* Do not demote here because setRole can return before the mastership has been passed.
+ The current implementation first promotes nextMaster to the top of the candidate list and
+ then transfers the leadership. We can use the BACKUP events to demote or rely on the
+ periodic checks. */
+ } else if (myRole == STANDBY) {
+ log.info("Demote current instance to the bottom of the candidates list for {}", deviceId);
+ mastershipAdminService.demote(localNodeId, deviceId);
+ } else {
+ log.debug("No valid role for {}", deviceId);
+ }
+ }
+
+ return nextMaster;
+ }
+
+
+
+ /**
* Port Enable/Disable message sent to the device's MASTER node.
*/
private class InternalPortUpDownEvent {
diff --git a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
index 3e1e4d1..c2339f3 100644
--- a/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/device/impl/DeviceManagerTest.java
@@ -361,6 +361,11 @@
return local;
}
+ @Override
+ public ControllerNode.State getState(NodeId nodeId) {
+ return ControllerNode.State.READY;
+ }
+
}
private class TestNetworkConfigService extends NetworkConfigServiceAdapter {