[SDFAB-821] Implement auto-recovery mechanism for failed mastership changes
With this fix, the ONOS cluster starts a new mastership term if the device
does not acknowledge the mastership role within 10 seconds.
Change-Id: Id0927181e4ba37ed1af52dd9843d4c2563469025
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index cf6265b..e50fe37 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -90,6 +90,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -98,6 +99,7 @@
import static com.google.common.collect.Multimaps.synchronizedListMultimap;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.lang.System.currentTimeMillis;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.MastershipRole.MASTER;
import static org.onosproject.net.MastershipRole.NONE;
@@ -155,7 +157,7 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterCommunicationService communicationService;
- private ExecutorService portReqeustExecutor;
+ private ExecutorService portRequestExecutor;
/**
* List of all registered PortConfigOperator.
*/
@@ -198,6 +200,12 @@
private final Map<DeviceId, LocalStatus> deviceLocalStatus =
Maps.newConcurrentMap();
+ // Time (ms) at which a role was last applied to a device and has not yet been acknowledged
+ private final Map<DeviceId, Long> roleToAcknowledge =
+ Maps.newConcurrentMap();
+ private ScheduledExecutorService backgroundRoleChecker;
+ private static final int ROLE_TIMEOUT_SECONDS = 10;
+
@Activate
public void activate() {
portAnnotationOp = new PortAnnotationOperator(networkConfigService);
@@ -217,17 +225,27 @@
try {
mastershipCheck();
} catch (Exception e) {
- log.error("Exception thrown during integrity check", e);
+ log.error("Exception thrown during mastership integrity check", e);
}
}, 1, 1, TimeUnit.MINUTES);
- portReqeustExecutor = newSingleThreadExecutor();
+ portRequestExecutor = newSingleThreadExecutor();
communicationService.<InternalPortUpDownEvent>addSubscriber(
PORT_UPDOWN_SUBJECT,
SERIALIZER::decode,
this::handlePortRequest,
- portReqeustExecutor);
+ portRequestExecutor);
+
+ backgroundRoleChecker = newSingleThreadScheduledExecutor(
+ groupedThreads("onos/device", "manager-role", log));
+ backgroundRoleChecker.scheduleAtFixedRate(() -> {
+ try {
+ roleCheck();
+ } catch (Exception e) {
+ log.error("Exception thrown while verifying role acknowledgement from all devices", e);
+ }
+ }, 0, 10, TimeUnit.SECONDS);
log.info("Started");
}
@@ -240,7 +258,8 @@
mastershipService.removeListener(mastershipListener);
eventDispatcher.removeSink(DeviceEvent.class);
communicationService.removeSubscriber(PORT_UPDOWN_SUBJECT);
- portReqeustExecutor.shutdown();
+ portRequestExecutor.shutdown();
+ backgroundRoleChecker.shutdown();
log.info("Stopped");
}
@@ -486,6 +505,7 @@
post(store.markOffline(deviceId));
}
//relinquish master role and ability to be backup.
+ roleToAcknowledge.remove(deviceId);
mastershipService.relinquishMastership(deviceId).get();
} catch (InterruptedException e) {
log.warn("Interrupted while relinquishing role for {}", deviceId);
@@ -508,6 +528,8 @@
} else {
log.info("Failed marking {} offline. {}", deviceId, role);
}
+ // Stop the timer - just to prevent any race
+ roleToAcknowledge.remove(deviceId);
mastershipService.relinquishMastership(deviceId);
});
}
@@ -532,6 +554,55 @@
}
}
+ /**
+ * Yields mastership of each device that did not acknowledge the MASTER role before the timeout expired.
+ */
+ private void roleCheck() {
+ log.debug("Checking role");
+ for (Device device : getDevices()) {
+ final DeviceId deviceId = device.id();
+ MastershipRole myRole = mastershipService.getLocalRole(deviceId); // sampled before the timer entry is inspected
+ log.trace("Checking device {}. Current role is {}", deviceId, myRole);
+ final AtomicBoolean exists = new AtomicBoolean(false); // set from inside the lambda when an entry is found
+ final Long ts = roleToAcknowledge.compute(deviceId, (key, value) -> {
+ if (value == null) {
+ return null; // no pending entry for this device; nothing is stored
+ }
+ exists.set(true);
+ if (currentTimeMillis() - value < (ROLE_TIMEOUT_SECONDS * 1000)) {
+ return value; // still inside the timeout window: keep the entry untouched
+ }
+ return null; // timed out: returning null removes the entry from the map
+ });
+ // No entry found: no role is awaiting acknowledgement for this device
+ if (!exists.get()) {
+ log.trace("Role was not applied or it has been acknowledged for device {}", deviceId);
+ continue;
+ }
+ // Entry was kept: the acknowledgement timeout has not expired yet
+ if (ts != null) {
+ log.debug("Timeout expires in {} ms", ((ROLE_TIMEOUT_SECONDS * 1000) - currentTimeMillis() + ts));
+ continue;
+ }
+ if (myRole != MASTER) {
+ log.debug("Timeout is expired but current role is not MASTER ({}), nothing to do", myRole);
+ continue;
+ }
+ /* Device failed to acknowledge the master role we asked for.
+ Yield mastership; a role is requested again only if relinquishing succeeds. */
+ log.warn("Failed to assert role onto device {}. requested={}, no response",
+ deviceId, myRole);
+ mastershipService.relinquishMastership(deviceId).whenComplete((result, error) -> {
+ if (error != null) {
+ log.error("Exception while relinquishing mastership", error);
+ } else {
+ log.info("Successfully relinquished mastership for {}. Requesting new role", deviceId);
+ mastershipService.requestRoleFor(deviceId);
+ }
+ });
+ }
+ }
+
// Personalized device provider service issued to the supplied provider.
private class InternalDeviceProviderService
extends AbstractProviderService<DeviceProvider>
@@ -561,6 +632,8 @@
deviceId, newRole);
return false;
}
+ // Start the timer
+ roleToAcknowledge.put(deviceId, currentTimeMillis());
provider.roleChanged(deviceId, newRole);
// not triggering probe when triggered by provider service event
return true;
@@ -667,6 +740,7 @@
} finally {
try {
//relinquish master role and ability to be backup.
+ roleToAcknowledge.remove(deviceId);
mastershipService.relinquishMastership(deviceId).get();
} catch (InterruptedException e) {
log.warn("Interrupted while reliquishing role for {}", deviceId);
@@ -800,6 +874,7 @@
if (requested == null && response == null) {
// something was off with DeviceProvider, maybe check channel too?
log.warn("Failed to assert role onto Device {}", deviceId);
+ roleToAcknowledge.remove(deviceId);
mastershipService.relinquishMastership(deviceId);
return;
}
@@ -817,7 +892,9 @@
if (Objects.equals(requested, response)) {
if (Objects.equals(requested, expected)) {
- return;
+ // Stop the timer
+ log.info("Role has been acknowledged for device {}", deviceId);
+ roleToAcknowledge.remove(deviceId);
} else {
log.warn("Role mismatch on {}. Set to {}, but store demands {}",
deviceId, response, expected);
@@ -831,6 +908,8 @@
log.warn("Failed to assert role onto device {}. requested={}, response={}",
deviceId, requested, response);
if (requested == MastershipRole.MASTER) {
+ // Stop the timer
+ roleToAcknowledge.remove(deviceId);
mastershipService.relinquishMastership(deviceId);
// TODO: Shouldn't we be triggering event?
//final Device device = getDevice(deviceId);
@@ -900,6 +979,8 @@
log.warn("Provider for {} was not found. Cannot apply role {}", deviceId, newRole);
return false;
}
+ // Start the timer
+ roleToAcknowledge.put(deviceId, currentTimeMillis());
provider.roleChanged(deviceId, newRole);
if (newRole.equals(MastershipRole.MASTER)) {
@@ -1010,6 +1091,7 @@
+ "but this node cannot reach the device. "
+ "Relinquishing role. ",
myNextRole, did);
+ roleToAcknowledge.remove(did);
mastershipService.relinquishMastership(did);
}
return;