[ONOS-4460] Relinquish device role when partitioned away from cluster
Change-Id: I578029614cced96a2d4503e4fe3052c927f051ab
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
index 79bd717..23d42e2 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipEvent.java
@@ -41,13 +41,23 @@
* Signifies that the leader for a topic has changed.
*/
// TODO: We may not need this. We currently do not support a way for a current leader to step down
- // while still reamining a candidate
+ // while still remaining a candidate
LEADER_CHANGED,
/**
* Signifies a change in the list of candidates for a topic.
*/
- CANDIDATES_CHANGED
+ CANDIDATES_CHANGED,
+
+ /**
+ * Signifies the Leadership Elector is unavailable.
+ */
+ SERVICE_DISRUPTED,
+
+ /**
+ * Signifies the Leadership Elector is available again.
+ */
+ SERVICE_RESTORED
}
/**
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java
index 70c54b4..701e690 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipStore.java
@@ -24,7 +24,7 @@
public interface LeadershipStore extends Store<LeadershipEvent, LeadershipStoreDelegate> {
/**
- * Adds registration for the local instance to be leader for topic.
+ * Adds registration for the local instance to be part of the leadership contest for topic.
*
* @param topic leadership topic
* @return Updated leadership after operation is completed
diff --git a/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java b/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java
index 1d3bbfd..1156787 100644
--- a/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java
+++ b/core/api/src/main/java/org/onosproject/mastership/MastershipEvent.java
@@ -44,7 +44,13 @@
* the change in the backups list is accompanied by a change in
* master, the event is subsumed by MASTER_CHANGED.
*/
- BACKUPS_CHANGED
+ BACKUPS_CHANGED,
+
+ /**
+ * Signifies that the underlying storage for the Mastership state
+ * of this device is unavailable.
+ */
+ SUSPENDED
}
/**
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 21f0f76..8e09e8e 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -729,16 +729,17 @@
}
private void handleMastershipEvent(MastershipEvent event) {
- if (event.type() != MastershipEvent.Type.MASTER_CHANGED) {
+ if (event.type() == MastershipEvent.Type.BACKUPS_CHANGED) {
// Don't care if backup list changed.
return;
}
-
final DeviceId did = event.subject();
// myRole suggested by MastershipService
MastershipRole myNextRole;
- if (localNodeId.equals(event.roleInfo().master())) {
+ if (event.type() == MastershipEvent.Type.SUSPENDED) {
+ myNextRole = NONE; // FIXME STANDBY OR NONE?
+ } else if (localNodeId.equals(event.roleInfo().master())) {
// confirm latest info
MastershipTerm term = termService.getMastershipTerm(did);
final boolean iHaveControl = term != null && localNodeId.equals(term.master());
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index 58dffce..f8be97e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -15,12 +15,16 @@
*/
package org.onosproject.store.cluster.impl;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.function.Consumer;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -35,6 +39,7 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.AbstractStore;
+import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.LeaderElector;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
@@ -57,8 +62,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
+ private ExecutorService statusChangeHandler;
private NodeId localNodeId;
private LeaderElector leaderElector;
+ private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
private final Consumer<Change<Leadership>> leadershipChangeListener =
change -> {
@@ -77,22 +84,54 @@
eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
}
notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
+ // Update local cache of currently held leaderships
+ if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
+ localLeaderCache.put(newValue.topic(), newValue);
+ } else {
+ localLeaderCache.remove(newValue.topic());
+ }
};
+ private final Consumer<Status> clientStatusListener = status ->
+ statusChangeHandler.execute(() -> handleStatusChange(status));
+
+ private void handleStatusChange(Status status) {
+ // Notify mastership Service of disconnect and reconnect
+ if (status == Status.ACTIVE) {
+ // Service Restored
+ localLeaderCache.forEach((topic, leadership) -> leaderElector.run(topic, localNodeId));
+ leaderElector.getLeaderships().forEach((topic, leadership) ->
+ notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_RESTORED, leadership)));
+ } else if (status == Status.SUSPENDED) {
+ // Service Suspended
+ localLeaderCache.forEach((topic, leadership) ->
+ notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_DISRUPTED, leadership)));
+ } else {
+ // Should be only inactive state
+ return;
+ }
+ }
+
+
@Activate
public void activate() {
+ statusChangeHandler = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
localNodeId = clusterService.getLocalNode().id();
leaderElector = storageService.leaderElectorBuilder()
.withName("onos-leadership-elections")
.build()
.asLeaderElector();
leaderElector.addChangeListener(leadershipChangeListener);
+ leaderElector.addStatusChangeListener(clientStatusListener);
log.info("Started");
}
@Deactivate
public void deactivate() {
leaderElector.removeChangeListener(leadershipChangeListener);
+ leaderElector.removeStatusChangeListener(clientStatusListener);
+ statusChangeHandler.shutdown();
log.info("Stopped");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 3e83183..3b87637 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -18,6 +18,7 @@
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
+import static org.onosproject.mastership.MastershipEvent.Type.SUSPENDED;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.base.Preconditions.checkArgument;
@@ -319,7 +320,8 @@
private void handleEvent(LeadershipEvent event) {
Leadership leadership = event.subject();
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
- RoleInfo roleInfo = getNodes(deviceId);
+ RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
+ getNodes(deviceId) : new RoleInfo();
switch (event.type()) {
case LEADER_AND_CANDIDATES_CHANGED:
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
@@ -331,6 +333,12 @@
case CANDIDATES_CHANGED:
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
break;
+ case SERVICE_DISRUPTED:
+ notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, roleInfo));
+ break;
+ case SERVICE_RESTORED:
+ // Do nothing, wait for updates from peers
+ break;
default:
return;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 9b2f11e..3e45091 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
+import java.util.function.Function;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
@@ -63,6 +64,19 @@
public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
private final LoadingCache<String, CompletableFuture<Leadership>> cache;
+ Function<CopycatClient.State, Status> mapper = state -> {
+ switch (state) {
+ case CONNECTED:
+ return Status.ACTIVE;
+ case SUSPENDED:
+ return Status.SUSPENDED;
+ case CLOSED:
+ return Status.INACTIVE;
+ default:
+ throw new IllegalStateException("Unknown state " + state);
+ }
+ };
+
public AtomixLeaderElector(CopycatClient client, Properties properties) {
super(client, properties);
cache = CacheBuilder.newBuilder()
@@ -79,6 +93,7 @@
}
};
addStatusChangeListener(statusListener);
+ client.onStateChange(this::handleStateChange);
}
@Override
@@ -193,4 +208,8 @@
private boolean isListening() {
return !leadershipChangeListeners.isEmpty();
}
+
+ private void handleStateChange(CopycatClient.State state) {
+ statusChangeListeners().forEach(listener -> listener.accept(mapper.apply(state)));
+ }
}
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
index dca849e..e5423c2 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LldpLinkProvider.java
@@ -542,22 +542,20 @@
private class InternalRoleListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
- if (MastershipEvent.Type.BACKUPS_CHANGED.equals(event.type())) {
+ if (MastershipEvent.Type.MASTER_CHANGED.equals(event.type())) {
// only need new master events
- return;
+ eventExecutor.execute(() -> {
+ DeviceId deviceId = event.subject();
+ Device device = deviceService.getDevice(deviceId);
+ if (device == null) {
+ log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
+ return;
+ }
+ if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
+ updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
+ }
+ });
}
-
- eventExecutor.execute(() -> {
- DeviceId deviceId = event.subject();
- Device device = deviceService.getDevice(deviceId);
- if (device == null) {
- log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
- return;
- }
- if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
- updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
- }
- });
}
}