[ONOS-6371] (vnet) consistent mastership manager
Consistent mastership store enables to share a mastership
information among distriritubed instances.
Change-Id: I2be33a8c43fcbcf220eab7a1279a8c810582c102
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
new file mode 100644
index 0000000..7c1c01c
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
@@ -0,0 +1,472 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.store.virtual.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipAdminService;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.RoleInfo;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipStoreDelegate;
+import org.onosproject.mastership.MastershipTerm;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.StoreSerializer;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
+import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
+import static org.onosproject.mastership.MastershipEvent.Type.SUSPENDED;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Component(immediate = true, enabled = false)
+@Service
+public class ConsistentVirtualDeviceMastershipStore
+ extends AbstractVirtualStore<MastershipEvent, MastershipStoreDelegate>
+ implements VirtualNetworkMastershipStore {
+
+ private final Logger log = getLogger(getClass());
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LeadershipAdminService leadershipAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ private NodeId localNodeId;
+
+ private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
+ new MessageSubject("virtual-mastership-store-device-role-relinquish");
+
+ private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
+ Pattern.compile("vnet:(.*),device:(.*)");
+
+ private ExecutorService eventHandler;
+ private ExecutorService messageHandlingExecutor;
+ private ScheduledExecutorService transferExecutor;
+ private final LeadershipEventListener leadershipEventListener =
+ new InternalDeviceMastershipEventListener();
+
+ private static final String NODE_ID_NULL = "Node ID cannot be null";
+ private static final String NETWORK_ID_NULL = "Network ID cannot be null";
+ private static final String DEVICE_ID_NULL = "Device ID cannot be null";
+ private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
+
+ public static final StoreSerializer SERIALIZER = StoreSerializer.using(
+ KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(MastershipRole.class)
+ .register(MastershipEvent.class)
+ .register(MastershipEvent.Type.class)
+ .register(VirtualDeviceId.class)
+ .build("VirtualMastershipStore"));
+
+ @Activate
+ public void activate() {
+ eventHandler = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/virtual/mastership", "event-handler", log));
+
+ messageHandlingExecutor =
+ Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/virtual/mastership", "message-handler", log));
+ transferExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ groupedThreads("onos/store/virtual/mastership", "mastership-transfer-executor", log));
+ clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
+ SERIALIZER::decode,
+ this::relinquishLocalRole,
+ SERIALIZER::encode,
+ messageHandlingExecutor);
+ localNodeId = clusterService.getLocalNode().id();
+ leadershipService.addListener(leadershipEventListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
+ leadershipService.removeListener(leadershipEventListener);
+ messageHandlingExecutor.shutdown();
+ transferExecutor.shutdown();
+ eventHandler.shutdown();
+ log.info("Stopped");
+ }
+
+ @Override
+ public CompletableFuture<MastershipRole> requestRole(NetworkId networkId,
+ DeviceId deviceId) {
+ checkArgument(networkId != null, NETWORK_ID_NULL);
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+ String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
+ Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
+ return CompletableFuture
+ .completedFuture(localNodeId.equals(leadership.leaderNodeId()) ?
+ MastershipRole.MASTER : MastershipRole.STANDBY);
+ }
+
+ @Override
+ public MastershipRole getRole(NetworkId networkId, NodeId nodeId, DeviceId deviceId) {
+ checkArgument(networkId != null, NETWORK_ID_NULL);
+ checkArgument(nodeId != null, NODE_ID_NULL);
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+ String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
+ Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+ NodeId leader = leadership == null ? null : leadership.leaderNodeId();
+ List<NodeId> candidates = leadership == null ?
+ ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+ return Objects.equal(nodeId, leader) ?
+ MastershipRole.MASTER : candidates.contains(nodeId) ?
+ MastershipRole.STANDBY : MastershipRole.NONE;
+ }
+
+ @Override
+ public NodeId getMaster(NetworkId networkId, DeviceId deviceId) {
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+ return leadershipService.getLeader(createDeviceMastershipTopic(networkId, deviceId));
+ }
+
+ @Override
+ public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
+ checkArgument(networkId != null, NETWORK_ID_NULL);
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+ Map<NodeId, MastershipRole> roles = Maps.newHashMap();
+ clusterService.getNodes()
+ .forEach((node) -> roles.put(node.id(),
+ getRole(networkId, node.id(), deviceId)));
+
+ NodeId master = null;
+ final List<NodeId> standbys = Lists.newLinkedList();
+
+ List<NodeId> candidates = leadershipService
+ .getCandidates(createDeviceMastershipTopic(networkId, deviceId));
+
+ for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
+ if (entry.getValue() == MastershipRole.MASTER) {
+ master = entry.getKey();
+ } else if (entry.getValue() == MastershipRole.STANDBY) {
+ standbys.add(entry.getKey());
+ }
+ }
+
+ List<NodeId> sortedStandbyList = candidates.stream()
+ .filter(standbys::contains).collect(Collectors.toList());
+
+ return new RoleInfo(master, sortedStandbyList);
+ }
+
+ @Override
+ public Set<DeviceId> getDevices(NetworkId networkId, NodeId nodeId) {
+ checkArgument(networkId != null, NETWORK_ID_NULL);
+ checkArgument(nodeId != null, NODE_ID_NULL);
+
+ // FIXME This result contains REMOVED device.
+ // MastershipService cannot listen to DeviceEvent to GC removed topic,
+ // since DeviceManager depend on it.
+ // Reference count, etc. at LeadershipService layer?
+ return leadershipService
+ .ownedTopics(nodeId)
+ .stream()
+ .filter(this::isVirtualMastershipTopic)
+ .map(this::extractDeviceIdFromTopic)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public CompletableFuture<MastershipEvent> setMaster(NetworkId networkId,
+ NodeId nodeId, DeviceId deviceId) {
+ checkArgument(networkId != null, NETWORK_ID_NULL);
+ checkArgument(nodeId != null, NODE_ID_NULL);
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+ String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
+ if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
+ transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
+ WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
+ }
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public MastershipTerm getTermFor(NetworkId networkId, DeviceId deviceId) {
+ checkArgument(networkId != null, NETWORK_ID_NULL);
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+ String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
+ Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+ return leadership != null && leadership.leaderNodeId() != null ?
+ MastershipTerm.of(leadership.leaderNodeId(),
+ leadership.leader().term()) : null;
+ }
+
+ @Override
+ public CompletableFuture<MastershipEvent> setStandby(NetworkId networkId,
+ NodeId nodeId,
+ DeviceId deviceId) {
+ checkArgument(networkId != null, NETWORK_ID_NULL);
+ checkArgument(nodeId != null, NODE_ID_NULL);
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+ NodeId currentMaster = getMaster(networkId, deviceId);
+ if (!nodeId.equals(currentMaster)) {
+ return CompletableFuture.completedFuture(null);
+ }
+
+ String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
+ List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
+
+ NodeId newMaster = candidates.stream()
+ .filter(candidate -> !Objects.equal(nodeId, candidate))
+ .findFirst()
+ .orElse(null);
+ log.info("Transitioning to role {} for {}. Next master: {}",
+ newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE,
+ deviceId, newMaster);
+
+ if (newMaster != null) {
+ return setMaster(networkId, newMaster, deviceId);
+ }
+ return relinquishRole(networkId, nodeId, deviceId);
+ }
+
+ @Override
+ public CompletableFuture<MastershipEvent> relinquishRole(NetworkId networkId,
+ NodeId nodeId,
+ DeviceId deviceId) {
+ checkArgument(networkId != null, NETWORK_ID_NULL);
+ checkArgument(nodeId != null, NODE_ID_NULL);
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+ if (nodeId.equals(localNodeId)) {
+ return relinquishLocalRoleByNetwork(networkId, deviceId);
+ }
+
+ log.debug("Forwarding request to relinquish "
+ + "role for vnet {} device {} to {}", deviceId, nodeId);
+ return clusterCommunicator.sendAndReceive(
+ new VirtualDeviceId(networkId, deviceId),
+ ROLE_RELINQUISH_SUBJECT,
+ SERIALIZER::encode,
+ SERIALIZER::decode,
+ nodeId);
+ }
+
+ private CompletableFuture<MastershipEvent> relinquishLocalRoleByNetwork(NetworkId networkId,
+ DeviceId deviceId) {
+ checkArgument(networkId != null, NETWORK_ID_NULL);
+ checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+ String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
+ if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
+ MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
+ leadershipService.withdraw(leadershipTopic);
+ return CompletableFuture.completedFuture(new MastershipEvent(eventType,
+ deviceId,
+ getNodes(networkId, deviceId)));
+ }
+
+ private CompletableFuture<MastershipEvent>
+ relinquishLocalRole(VirtualDeviceId virtualDeviceId) {
+ return relinquishLocalRoleByNetwork(virtualDeviceId.networkId,
+ virtualDeviceId.deviceId);
+ }
+
+ @Override
+ public void relinquishAllRole(NetworkId networkId, NodeId nodeId) {
+ // Noop. LeadershipService already takes care of detecting and purging stale locks.
+ }
+
+ private class InternalDeviceMastershipEventListener
+ implements LeadershipEventListener {
+
+ @Override
+ public boolean isRelevant(LeadershipEvent event) {
+ Leadership leadership = event.subject();
+ return isVirtualMastershipTopic(leadership.topic());
+ }
+
+ @Override
+ public void event(LeadershipEvent event) {
+ eventHandler.execute(() -> handleEvent(event));
+ }
+
+ private void handleEvent(LeadershipEvent event) {
+ Leadership leadership = event.subject();
+
+ NetworkId networkId = extractNetworkIdFromTopic(leadership.topic());
+ DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
+
+ RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
+ getNodes(networkId, deviceId) : new RoleInfo();
+
+ switch (event.type()) {
+ case LEADER_AND_CANDIDATES_CHANGED:
+ notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
+ deviceId, roleInfo));
+ notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
+ deviceId, roleInfo));
+ break;
+ case LEADER_CHANGED:
+ notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
+ deviceId, roleInfo));
+ break;
+ case CANDIDATES_CHANGED:
+ notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
+ deviceId, roleInfo));
+ break;
+ case SERVICE_DISRUPTED:
+ notifyDelegate(networkId, new MastershipEvent(SUSPENDED,
+ deviceId, roleInfo));
+ break;
+ case SERVICE_RESTORED:
+ // Do nothing, wait for updates from peers
+ break;
+ default:
+ }
+ }
+ }
+
+ private String createDeviceMastershipTopic(NetworkId networkId, DeviceId deviceId) {
+ return String.format("vnet:%s,device:%s", networkId.toString(), deviceId.toString());
+ }
+
+ /**
+ * Returns the virtual network identifier extracted from the topic.
+ *
+ * @param topic topic to extract virtual network identifier
+ * @return an extracted virtual network identifier
+ * @throws IllegalArgumentException the topic not match with the pattern
+ * used for virtual network mastership store
+ */
+ private NetworkId extractNetworkIdFromTopic(String topic) {
+ Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
+ if (m.matches()) {
+ return NetworkId.networkId(Long.getLong(m.group(1)));
+ } else {
+ throw new IllegalArgumentException("Invalid virtual mastership topic: "
+ + topic);
+ }
+ }
+
+ /**
+ * Returns the device identifier extracted from the topic.
+ *
+ * @param topic topic to extract device identifier
+ * @return an extracted virtual device identifier
+ * @throws IllegalArgumentException the topic not match with the pattern
+ * used for virtual network mastership store
+ */
+ private DeviceId extractDeviceIdFromTopic(String topic) {
+ Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
+ if (m.matches()) {
+ return DeviceId.deviceId(m.group(2));
+ } else {
+ throw new IllegalArgumentException("Invalid virtual mastership topic: "
+ + topic);
+ }
+ }
+
+ /**
+ * Returns whether the topic is matched with virtual mastership store topic.
+ *
+ * @param topic topic to match
+ * @return True when the topic matched with virtual network mastership store
+ */
+ private boolean isVirtualMastershipTopic(String topic) {
+ Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
+ return m.matches();
+ }
+
+ /**
+ * A wrapper class used for the communication service.
+ */
+ private class VirtualDeviceId {
+ NetworkId networkId;
+ DeviceId deviceId;
+
+ public VirtualDeviceId(NetworkId networkId, DeviceId deviceId) {
+ this.networkId = networkId;
+ this.deviceId = deviceId;
+ }
+
+ public int hashCode() {
+ return Objects.hashCode(networkId, deviceId);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof VirtualDeviceId) {
+ final VirtualDeviceId that = (VirtualDeviceId) obj;
+ return this.getClass() == that.getClass() &&
+ Objects.equal(this.networkId, that.networkId) &&
+ Objects.equal(this.deviceId, that.deviceId);
+ }
+ return false;
+ }
+ }
+}