[ONOS-4460] Relinquish device role when partitioned away from cluster
Change-Id: I578029614cced96a2d4503e4fe3052c927f051ab
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
index 58dffce..f8be97e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedLeadershipStore.java
@@ -15,12 +15,16 @@
*/
package org.onosproject.store.cluster.impl;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.function.Consumer;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -35,6 +39,7 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.event.Change;
import org.onosproject.store.AbstractStore;
+import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.LeaderElector;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
@@ -57,8 +62,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
+ /** Single-threaded executor on which leader elector client status changes are handled. */
+ private ExecutorService statusChangeHandler;
private NodeId localNodeId;
private LeaderElector leaderElector;
+ /**
+ * Leaderships for which the local node is the leader, keyed by topic, as last reported
+ * by the elector's change events. Used to rejoin elections and report disruption when
+ * the elector client's status changes.
+ */
+ private final Map<String, Leadership> localLeaderCache = Maps.newConcurrentMap();
private final Consumer<Change<Leadership>> leadershipChangeListener =
change -> {
@@ -77,22 +84,54 @@
eventType = LeadershipEvent.Type.CANDIDATES_CHANGED;
}
notifyDelegate(new LeadershipEvent(eventType, change.newValue()));
+ // Update local cache of currently held leaderships
+ if (Objects.equals(newValue.leaderNodeId(), localNodeId)) {
+ localLeaderCache.put(newValue.topic(), newValue);
+ } else {
+ localLeaderCache.remove(newValue.topic());
+ }
};
+ /**
+ * Hands each status change of the leader elector client to the single-threaded
+ * status-change executor, so status changes are processed one at a time, in order.
+ */
+ private final Consumer<Status> clientStatusListener = status ->
+ statusChangeHandler.execute(() -> handleStatusChange(status));
+
+ /**
+ * Translates a status change of the leader elector client into leadership events.
+ * <p>
+ * On {@code ACTIVE}, re-runs the election for every topic in the local leader cache,
+ * then posts a {@code SERVICE_RESTORED} event for every leadership known to the elector.
+ * On {@code SUSPENDED}, posts a {@code SERVICE_DISRUPTED} event for every leadership in
+ * the local leader cache. Any other status is ignored.
+ *
+ * @param status new status of the leader elector client
+ */
+ private void handleStatusChange(Status status) {
+ if (status == Status.ACTIVE) {
+ // Service restored: rejoin the elections this node was leading. A failure on one
+ // topic must not stop the remaining topics from rejoining or the restore events
+ // from being posted.
+ localLeaderCache.keySet().forEach(topic -> {
+ try {
+ leaderElector.run(topic, localNodeId);
+ } catch (Exception e) {
+ log.warn("Failed to rejoin leadership election for topic {}", topic, e);
+ }
+ });
+ leaderElector.getLeaderships().forEach((topic, leadership) ->
+ notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_RESTORED, leadership)));
+ } else if (status == Status.SUSPENDED) {
+ // Service suspended: report every leadership this node holds per its local cache.
+ localLeaderCache.forEach((topic, leadership) ->
+ notifyDelegate(new LeadershipEvent(LeadershipEvent.Type.SERVICE_DISRUPTED, leadership)));
+ }
+ // Any other status (INACTIVE) requires no action here.
+ }
+
+
+ /**
+ * Starts the store: creates the status-change executor, obtains the
+ * {@code onos-leadership-elections} leader elector, and registers the leadership
+ * change listener and the client status listener on it.
+ */
@Activate
public void activate() {
+ // Status changes are processed on their own single thread, one at a time.
+ statusChangeHandler = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/dist/cluster/leadership", "status-change-handler", log));
localNodeId = clusterService.getLocalNode().id();
leaderElector = storageService.leaderElectorBuilder()
.withName("onos-leadership-elections")
.build()
.asLeaderElector();
leaderElector.addChangeListener(leadershipChangeListener);
+ leaderElector.addStatusChangeListener(clientStatusListener);
log.info("Started");
}
+ /**
+ * Stops the store: unregisters the leadership change and client status listeners
+ * and shuts down the status-change executor.
+ */
@Deactivate
public void deactivate() {
leaderElector.removeChangeListener(leadershipChangeListener);
+ leaderElector.removeStatusChangeListener(clientStatusListener);
+ // NOTE(review): shutdown() still runs status tasks that were already queued, so
+ // handleStatusChange may execute after deactivation; confirm this is acceptable.
+ statusChangeHandler.shutdown();
log.info("Stopped");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 3e83183..3b87637 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -18,6 +18,7 @@
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
+import static org.onosproject.mastership.MastershipEvent.Type.SUSPENDED;
import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.base.Preconditions.checkArgument;
@@ -319,7 +320,8 @@
private void handleEvent(LeadershipEvent event) {
Leadership leadership = event.subject();
DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
- RoleInfo roleInfo = getNodes(deviceId);
+ RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
+ getNodes(deviceId) : new RoleInfo();
switch (event.type()) {
case LEADER_AND_CANDIDATES_CHANGED:
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
@@ -331,6 +333,12 @@
case CANDIDATES_CHANGED:
notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, roleInfo));
break;
+ case SERVICE_DISRUPTED:
+ notifyDelegate(new MastershipEvent(SUSPENDED, deviceId, roleInfo));
+ break;
+ case SERVICE_RESTORED:
+ // Do nothing, wait for updates from peers
+ break;
default:
return;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
index 9b2f11e..3e45091 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixLeaderElector.java
@@ -26,6 +26,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
+import java.util.function.Function;
import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
@@ -63,6 +64,19 @@
public static final String CHANGE_SUBJECT = "leadershipChangeEvents";
private final LoadingCache<String, CompletableFuture<Leadership>> cache;
+ /**
+ * Maps a Copycat client state to the primitive {@link Status} reported to status
+ * change listeners. Throws {@link IllegalStateException} for an unmapped state.
+ */
+ private final Function<CopycatClient.State, Status> mapper = state -> {
+ switch (state) {
+ case CONNECTED:
+ return Status.ACTIVE;
+ case SUSPENDED:
+ return Status.SUSPENDED;
+ case CLOSED:
+ return Status.INACTIVE;
+ default:
+ throw new IllegalStateException("Unknown state " + state);
+ }
+ };
+
public AtomixLeaderElector(CopycatClient client, Properties properties) {
super(client, properties);
cache = CacheBuilder.newBuilder()
@@ -79,6 +93,7 @@
}
};
addStatusChangeListener(statusListener);
+ client.onStateChange(this::handleStateChange);
}
@Override
@@ -193,4 +208,8 @@
private boolean isListening() {
return !leadershipChangeListeners.isEmpty();
}
+
+ /**
+ * Notifies every registered status change listener of a Copycat client state change.
+ *
+ * @param state new state of the Copycat client
+ * @throws IllegalStateException if {@code state} has no corresponding {@link Status}
+ */
+ private void handleStateChange(CopycatClient.State state) {
+ // Map once; every listener receives the same status.
+ Status status = mapper.apply(state);
+ statusChangeListeners().forEach(listener -> listener.accept(status));
+ }
}