[ONOS-6371] (vnet) consistent mastership manager

Consistent mastership store enables to share a mastership
information among distriritubed instances.

Change-Id: I2be33a8c43fcbcf220eab7a1279a8c810582c102
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
new file mode 100644
index 0000000..7c1c01c
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/virtual/impl/ConsistentVirtualDeviceMastershipStore.java
@@ -0,0 +1,472 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.incubator.store.virtual.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipAdminService;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.RoleInfo;
+import org.onosproject.incubator.net.virtual.NetworkId;
+import org.onosproject.incubator.net.virtual.VirtualNetworkMastershipStore;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipStoreDelegate;
+import org.onosproject.mastership.MastershipTerm;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.StoreSerializer;
+import org.slf4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
+import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
+import static org.onosproject.mastership.MastershipEvent.Type.SUSPENDED;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import static com.google.common.base.Preconditions.checkArgument;
+
+@Component(immediate = true, enabled = false)
+@Service
+public class ConsistentVirtualDeviceMastershipStore
+        extends AbstractVirtualStore<MastershipEvent, MastershipStoreDelegate>
+        implements VirtualNetworkMastershipStore {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipAdminService leadershipAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    private NodeId localNodeId;
+
+    private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
+            new MessageSubject("virtual-mastership-store-device-role-relinquish");
+
+    private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
+            Pattern.compile("vnet:(.*),device:(.*)");
+
+    private ExecutorService eventHandler;
+    private ExecutorService messageHandlingExecutor;
+    private ScheduledExecutorService transferExecutor;
+    private final LeadershipEventListener leadershipEventListener =
+            new InternalDeviceMastershipEventListener();
+
+    private static final String NODE_ID_NULL = "Node ID cannot be null";
+    private static final String NETWORK_ID_NULL = "Network ID cannot be null";
+    private static final String DEVICE_ID_NULL = "Device ID cannot be null";
+    private static final int WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS = 3000;
+
+    public static final StoreSerializer SERIALIZER = StoreSerializer.using(
+            KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.API)
+                    .register(MastershipRole.class)
+                    .register(MastershipEvent.class)
+                    .register(MastershipEvent.Type.class)
+                    .register(VirtualDeviceId.class)
+                    .build("VirtualMastershipStore"));
+
+    @Activate
+    public void activate() {
+        eventHandler = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/store/virtual/mastership", "event-handler", log));
+
+        messageHandlingExecutor =
+                Executors.newSingleThreadExecutor(
+                        groupedThreads("onos/store/virtual/mastership", "message-handler", log));
+        transferExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        groupedThreads("onos/store/virtual/mastership", "mastership-transfer-executor", log));
+        clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
+                                          SERIALIZER::decode,
+                                          this::relinquishLocalRole,
+                                          SERIALIZER::encode,
+                                          messageHandlingExecutor);
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.addListener(leadershipEventListener);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
+        leadershipService.removeListener(leadershipEventListener);
+        messageHandlingExecutor.shutdown();
+        transferExecutor.shutdown();
+        eventHandler.shutdown();
+        log.info("Stopped");
+    }
+
+    @Override
+    public CompletableFuture<MastershipRole> requestRole(NetworkId networkId,
+                                                         DeviceId deviceId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
+        Leadership leadership = leadershipService.runForLeadership(leadershipTopic);
+        return CompletableFuture
+                .completedFuture(localNodeId.equals(leadership.leaderNodeId()) ?
+                                         MastershipRole.MASTER : MastershipRole.STANDBY);
+    }
+
+    @Override
+    public MastershipRole getRole(NetworkId networkId, NodeId nodeId, DeviceId deviceId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(nodeId != null, NODE_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
+        Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+        NodeId leader = leadership == null ? null : leadership.leaderNodeId();
+        List<NodeId> candidates = leadership == null ?
+                ImmutableList.of() : ImmutableList.copyOf(leadership.candidates());
+        return Objects.equal(nodeId, leader) ?
+                MastershipRole.MASTER : candidates.contains(nodeId) ?
+                MastershipRole.STANDBY : MastershipRole.NONE;
+    }
+
+    @Override
+    public NodeId getMaster(NetworkId networkId, DeviceId deviceId) {
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        return leadershipService.getLeader(createDeviceMastershipTopic(networkId, deviceId));
+    }
+
+    @Override
+    public RoleInfo getNodes(NetworkId networkId, DeviceId deviceId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        Map<NodeId, MastershipRole> roles = Maps.newHashMap();
+        clusterService.getNodes()
+                .forEach((node) -> roles.put(node.id(),
+                                             getRole(networkId, node.id(), deviceId)));
+
+        NodeId master = null;
+        final List<NodeId> standbys = Lists.newLinkedList();
+
+        List<NodeId> candidates = leadershipService
+                .getCandidates(createDeviceMastershipTopic(networkId, deviceId));
+
+        for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
+            if (entry.getValue() == MastershipRole.MASTER) {
+                master = entry.getKey();
+            } else if (entry.getValue() == MastershipRole.STANDBY) {
+                standbys.add(entry.getKey());
+            }
+        }
+
+        List<NodeId> sortedStandbyList = candidates.stream()
+                .filter(standbys::contains).collect(Collectors.toList());
+
+        return new RoleInfo(master, sortedStandbyList);
+    }
+
+    @Override
+    public Set<DeviceId> getDevices(NetworkId networkId, NodeId nodeId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(nodeId != null, NODE_ID_NULL);
+
+        // FIXME This result contains REMOVED device.
+        // MastershipService cannot listen to DeviceEvent to GC removed topic,
+        // since DeviceManager depend on it.
+        // Reference count, etc. at LeadershipService layer?
+        return leadershipService
+                .ownedTopics(nodeId)
+                .stream()
+                .filter(this::isVirtualMastershipTopic)
+                .map(this::extractDeviceIdFromTopic)
+                .collect(Collectors.toSet());
+    }
+
+    @Override
+    public CompletableFuture<MastershipEvent> setMaster(NetworkId networkId,
+                                                        NodeId nodeId, DeviceId deviceId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(nodeId != null, NODE_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
+        if (leadershipAdminService.promoteToTopOfCandidateList(leadershipTopic, nodeId)) {
+            transferExecutor.schedule(() -> leadershipAdminService.transferLeadership(leadershipTopic, nodeId),
+                                      WAIT_BEFORE_MASTERSHIP_HANDOFF_MILLIS, TimeUnit.MILLISECONDS);
+        }
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public MastershipTerm getTermFor(NetworkId networkId, DeviceId deviceId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
+        Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+        return leadership != null && leadership.leaderNodeId() != null ?
+                MastershipTerm.of(leadership.leaderNodeId(),
+                                  leadership.leader().term()) : null;
+    }
+
+    @Override
+    public CompletableFuture<MastershipEvent> setStandby(NetworkId networkId,
+                                                         NodeId nodeId,
+                                                         DeviceId deviceId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(nodeId != null, NODE_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        NodeId currentMaster = getMaster(networkId, deviceId);
+        if (!nodeId.equals(currentMaster)) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
+        List<NodeId> candidates = leadershipService.getCandidates(leadershipTopic);
+
+        NodeId newMaster = candidates.stream()
+                .filter(candidate -> !Objects.equal(nodeId, candidate))
+                .findFirst()
+                .orElse(null);
+        log.info("Transitioning to role {} for {}. Next master: {}",
+                 newMaster != null ? MastershipRole.STANDBY : MastershipRole.NONE,
+                 deviceId, newMaster);
+
+        if (newMaster != null) {
+            return setMaster(networkId, newMaster, deviceId);
+        }
+        return relinquishRole(networkId, nodeId, deviceId);
+    }
+
+    @Override
+    public CompletableFuture<MastershipEvent> relinquishRole(NetworkId networkId,
+                                                             NodeId nodeId,
+                                                             DeviceId deviceId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(nodeId != null, NODE_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        if (nodeId.equals(localNodeId)) {
+            return relinquishLocalRoleByNetwork(networkId, deviceId);
+        }
+
+        log.debug("Forwarding request to relinquish "
+                          + "role for vnet {} device {} to {}", deviceId, nodeId);
+        return clusterCommunicator.sendAndReceive(
+                new VirtualDeviceId(networkId, deviceId),
+                ROLE_RELINQUISH_SUBJECT,
+                SERIALIZER::encode,
+                SERIALIZER::decode,
+                nodeId);
+    }
+
+    private CompletableFuture<MastershipEvent> relinquishLocalRoleByNetwork(NetworkId networkId,
+                                                                   DeviceId deviceId) {
+        checkArgument(networkId != null, NETWORK_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        String leadershipTopic = createDeviceMastershipTopic(networkId, deviceId);
+        if (!leadershipService.getCandidates(leadershipTopic).contains(localNodeId)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        MastershipEvent.Type eventType = localNodeId.equals(leadershipService.getLeader(leadershipTopic)) ?
+                MastershipEvent.Type.MASTER_CHANGED : MastershipEvent.Type.BACKUPS_CHANGED;
+        leadershipService.withdraw(leadershipTopic);
+        return CompletableFuture.completedFuture(new MastershipEvent(eventType,
+                                                                     deviceId,
+                                                                     getNodes(networkId, deviceId)));
+    }
+
+    private CompletableFuture<MastershipEvent>
+    relinquishLocalRole(VirtualDeviceId virtualDeviceId) {
+        return relinquishLocalRoleByNetwork(virtualDeviceId.networkId,
+                                            virtualDeviceId.deviceId);
+    }
+
+    @Override
+    public void relinquishAllRole(NetworkId networkId, NodeId nodeId) {
+        // Noop. LeadershipService already takes care of detecting and purging stale locks.
+    }
+
+    private class InternalDeviceMastershipEventListener
+            implements LeadershipEventListener {
+
+        @Override
+        public boolean isRelevant(LeadershipEvent event) {
+            Leadership leadership = event.subject();
+            return isVirtualMastershipTopic(leadership.topic());
+        }
+
+        @Override
+        public void event(LeadershipEvent event) {
+            eventHandler.execute(() -> handleEvent(event));
+        }
+
+        private void handleEvent(LeadershipEvent event) {
+            Leadership leadership = event.subject();
+
+            NetworkId networkId = extractNetworkIdFromTopic(leadership.topic());
+            DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
+
+            RoleInfo roleInfo = event.type() != LeadershipEvent.Type.SERVICE_DISRUPTED ?
+                    getNodes(networkId, deviceId) : new RoleInfo();
+
+            switch (event.type()) {
+                case LEADER_AND_CANDIDATES_CHANGED:
+                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
+                                                                  deviceId, roleInfo));
+                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
+                                                                  deviceId, roleInfo));
+                    break;
+                case LEADER_CHANGED:
+                    notifyDelegate(networkId, new MastershipEvent(MASTER_CHANGED,
+                                                                  deviceId, roleInfo));
+                    break;
+                case CANDIDATES_CHANGED:
+                    notifyDelegate(networkId, new MastershipEvent(BACKUPS_CHANGED,
+                                                                  deviceId, roleInfo));
+                    break;
+                case SERVICE_DISRUPTED:
+                    notifyDelegate(networkId, new MastershipEvent(SUSPENDED,
+                                                                  deviceId, roleInfo));
+                    break;
+                case SERVICE_RESTORED:
+                    // Do nothing, wait for updates from peers
+                    break;
+                default:
+            }
+        }
+    }
+
+    private String createDeviceMastershipTopic(NetworkId networkId, DeviceId deviceId) {
+        return String.format("vnet:%s,device:%s", networkId.toString(), deviceId.toString());
+    }
+
+    /**
+     * Returns the virtual network identifier extracted from the topic.
+     *
+     * @param topic topic to extract virtual network identifier
+     * @return an extracted virtual network identifier
+     * @throws IllegalArgumentException the topic not match with the pattern
+     * used for virtual network mastership store
+     */
+    private NetworkId extractNetworkIdFromTopic(String topic) {
+        Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
+        if (m.matches()) {
+            return NetworkId.networkId(Long.getLong(m.group(1)));
+        } else {
+            throw new IllegalArgumentException("Invalid virtual mastership topic: "
+                                                       + topic);
+        }
+    }
+
+    /**
+     * Returns the device identifier extracted from the topic.
+     *
+     * @param topic topic to extract device identifier
+     * @return an extracted virtual device identifier
+     * @throws IllegalArgumentException the topic not match with the pattern
+     * used for virtual network mastership store
+     */
+    private DeviceId extractDeviceIdFromTopic(String topic) {
+        Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
+        if (m.matches()) {
+            return DeviceId.deviceId(m.group(2));
+        } else {
+            throw new IllegalArgumentException("Invalid virtual mastership topic: "
+                                                       + topic);
+        }
+    }
+
+    /**
+     * Returns whether the topic is matched with virtual mastership store topic.
+     *
+     * @param topic topic to match
+     * @return True when the topic matched with virtual network mastership store
+     */
+    private boolean isVirtualMastershipTopic(String topic) {
+        Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
+        return m.matches();
+    }
+
+    /**
+     * A wrapper class used for the communication service.
+     */
+    private class VirtualDeviceId {
+        NetworkId networkId;
+        DeviceId deviceId;
+
+        public VirtualDeviceId(NetworkId networkId, DeviceId deviceId) {
+            this.networkId = networkId;
+            this.deviceId = deviceId;
+        }
+
+        public int hashCode() {
+            return Objects.hashCode(networkId, deviceId);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (obj instanceof VirtualDeviceId) {
+                final VirtualDeviceId that = (VirtualDeviceId) obj;
+                return this.getClass() == that.getClass() &&
+                        Objects.equal(this.networkId, that.networkId) &&
+                        Objects.equal(this.deviceId, that.deviceId);
+            }
+            return false;
+        }
+    }
+}