WIP: device mastership store based on leadership service.

Change-Id: I6347718f46b6600f93974825816fb537e39abb44
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
new file mode 100644
index 0000000..25870b7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -0,0 +1,400 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.mastership.impl;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.mastership.MastershipEvent.Type.BACKUPS_CHANGED;
+import static org.onosproject.mastership.MastershipEvent.Type.MASTER_CHANGED;
+import static org.slf4j.LoggerFactory.getLogger;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.RoleInfo;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipStore;
+import org.onosproject.mastership.MastershipStoreDelegate;
+import org.onosproject.mastership.MastershipTerm;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.MastershipRole;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.serializers.StoreSerializer;
+import org.slf4j.Logger;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Implementation of the MastershipStore on top of Leadership Service.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class ConsistentDeviceMastershipStore
+    extends AbstractStore<MastershipEvent, MastershipStoreDelegate>
+    implements MastershipStore {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    private NodeId localNodeId;
+    private final Set<DeviceId> connectedDevices = Sets.newHashSet();
+
+    private static final MessageSubject ROLE_QUERY_SUBJECT =
+            new MessageSubject("mastership-store-device-role-query");
+    private static final MessageSubject ROLE_RELINQUISH_SUBJECT =
+            new MessageSubject("mastership-store-device-role-relinquish");
+
+    private static final Pattern DEVICE_MASTERSHIP_TOPIC_PATTERN =
+            Pattern.compile("/devices/(.*)/mastership");
+
+    private static final long PEER_REQUEST_TIMEOUT_MS = 5000;
+    private ExecutorService messageHandlingExecutor;
+    private final LeadershipEventListener leadershipEventListener =
+            new InternalDeviceMastershipEventListener();
+
+    private static final String NODE_ID_NULL = "Node ID cannot be null";
+    private static final String DEVICE_ID_NULL = "Device ID cannot be null";;
+
+    public static final StoreSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.API)
+                    .register(MastershipRole.class)
+                    .register(MastershipEvent.class)
+                    .build();
+        }
+    };
+
+    @Activate
+    public void activate() {
+        messageHandlingExecutor =
+                Executors.newSingleThreadExecutor(groupedThreads("onos/store/device/mastership", "message-handler"));
+        clusterCommunicator.addSubscriber(ROLE_QUERY_SUBJECT,
+                new RoleQueryHandler(),
+                messageHandlingExecutor);
+        clusterCommunicator.addSubscriber(ROLE_RELINQUISH_SUBJECT,
+               new RoleRelinquishHandler(),
+               messageHandlingExecutor);
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.addListener(leadershipEventListener);
+
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        clusterCommunicator.removeSubscriber(ROLE_QUERY_SUBJECT);
+        clusterCommunicator.removeSubscriber(ROLE_RELINQUISH_SUBJECT);
+        messageHandlingExecutor.shutdown();
+        leadershipService.removeListener(leadershipEventListener);
+
+        log.info("Stoppped.");
+    }
+
+    @Override
+    public MastershipRole requestRole(DeviceId deviceId) {
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        if (connectedDevices.add(deviceId)) {
+            leadershipService.runForLeadership(leadershipTopic);
+            return MastershipRole.STANDBY;
+        } else {
+            Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+            if (leadership != null && leadership.leader().equals(localNodeId)) {
+                return MastershipRole.MASTER;
+            } else {
+                return MastershipRole.STANDBY;
+            }
+        }
+    }
+
+    @Override
+    public MastershipRole getRole(NodeId nodeId, DeviceId deviceId) {
+        checkArgument(nodeId != null, NODE_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+        if (leadership != null && nodeId.equals(leadership.leader())) {
+            return MastershipRole.MASTER;
+        }
+
+        if (localNodeId.equals(nodeId)) {
+            if (connectedDevices.contains(deviceId)) {
+                return MastershipRole.STANDBY;
+            } else {
+                return MastershipRole.NONE;
+            }
+        } else {
+            try {
+                MastershipRole role = complete(clusterCommunicator.sendAndReceive(
+                                                        new ClusterMessage(
+                                                                    localNodeId,
+                                                                    ROLE_QUERY_SUBJECT,
+                                                                    SERIALIZER.encode(deviceId)),
+                                                        nodeId));
+                return role == null ? MastershipRole.NONE : role;
+            } catch (IOException e) {
+                log.warn("Failed to query {} for {}'s role. Defaulting to NONE", nodeId, deviceId, e);
+                return MastershipRole.NONE;
+            }
+        }
+    }
+
+    @Override
+    public NodeId getMaster(DeviceId deviceId) {
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+        return leadership != null ? leadership.leader() : null;
+    }
+
+    @Override
+    public RoleInfo getNodes(DeviceId deviceId) {
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        Map<NodeId, MastershipRole> roles = Maps.newHashMap();
+        clusterService
+            .getNodes()
+            .stream()
+            .parallel()
+            .forEach((node) -> roles.put(node.id(), getRole(node.id(), deviceId)));
+
+        NodeId master = null;
+        final List<NodeId> standbys = Lists.newLinkedList();
+
+        for (Map.Entry<NodeId, MastershipRole> entry : roles.entrySet()) {
+            if (entry.getValue() == MastershipRole.MASTER) {
+                master = entry.getKey();
+            } else if (entry.getValue() == MastershipRole.STANDBY) {
+                standbys.add(entry.getKey());
+            }
+        }
+
+        return new RoleInfo(master, standbys);
+    }
+
+    @Override
+    public Set<DeviceId> getDevices(NodeId nodeId) {
+        checkArgument(nodeId != null, NODE_ID_NULL);
+
+        return leadershipService
+                .ownedTopics(nodeId)
+                .stream()
+                .filter(this::isDeviceMastershipTopic)
+                .map(this::extractDeviceIdFromTopic)
+                .collect(Collectors.toSet());
+    }
+
+    @Override
+    public MastershipEvent setMaster(NodeId nodeId, DeviceId deviceId) {
+        checkArgument(nodeId != null, NODE_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
+    }
+
+    @Override
+    public MastershipTerm getTermFor(DeviceId deviceId) {
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        Leadership leadership = leadershipService.getLeadership(leadershipTopic);
+        return leadership != null ? MastershipTerm.of(leadership.leader(), leadership.epoch()) : null;
+    }
+
+    @Override
+    public MastershipEvent setStandby(NodeId nodeId, DeviceId deviceId) {
+        checkArgument(nodeId != null, NODE_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        throw new UnsupportedOperationException("This operation is not supported in " + this.getClass().getName());
+    }
+
+    @Override
+    public MastershipEvent relinquishRole(NodeId nodeId, DeviceId deviceId) {
+        checkArgument(nodeId != null, NODE_ID_NULL);
+        checkArgument(deviceId != null, DEVICE_ID_NULL);
+
+        if (!nodeId.equals(localNodeId)) {
+            log.debug("Forwarding request to relinquish "
+                    + "role for device {} to {}", deviceId, nodeId);
+            try {
+                return complete(clusterCommunicator.sendAndReceive(
+                                                        new ClusterMessage(
+                                                                    localNodeId,
+                                                                    ROLE_RELINQUISH_SUBJECT,
+                                                                    SERIALIZER.encode(deviceId)),
+                                                        nodeId));
+            } catch (IOException e) {
+                log.warn("Failed to send a request to relinquish role for {} to {}", deviceId, nodeId, e);
+                return null;
+            }
+        }
+
+        // Check if this node is can be managed by this node.
+        if (!connectedDevices.contains(deviceId)) {
+            return null;
+        }
+
+        String leadershipTopic = createDeviceMastershipTopic(deviceId);
+        Leadership currentLeadership = leadershipService.getLeadership(leadershipTopic);
+
+        MastershipEvent.Type eventType = null;
+        if (currentLeadership != null && currentLeadership.leader().equals(localNodeId)) {
+            eventType = MastershipEvent.Type.MASTER_CHANGED;
+        } else {
+            eventType = MastershipEvent.Type.BACKUPS_CHANGED;
+        }
+
+        connectedDevices.remove(deviceId);
+        leadershipService.withdraw(leadershipTopic);
+
+        return new MastershipEvent(eventType, deviceId, getNodes(deviceId));
+    }
+
+    private class RoleQueryHandler implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            DeviceId deviceId = SERIALIZER.decode(message.payload());
+            try {
+                message.respond(SERIALIZER.encode(getRole(localNodeId, deviceId)));
+            } catch (IOException e) {
+                log.error("Failed to responsd to role query", e);
+            }
+        }
+    }
+
+
+    @Override
+    public void relinquishAllRole(NodeId nodeId) {
+        // Noop. LeadershipService already takes care of detecting and purging deadlocks.
+    }
+
+    private class RoleRelinquishHandler implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            DeviceId deviceId = SERIALIZER.decode(message.payload());
+            try {
+                message.respond(SERIALIZER.encode(relinquishRole(localNodeId, deviceId)));
+            } catch (IOException e) {
+                log.error("Failed to relinquish role.", e);
+            }
+        }
+    }
+
+    private class InternalDeviceMastershipEventListener implements LeadershipEventListener {
+        @Override
+        public void event(LeadershipEvent event) {
+            Leadership leadership = event.subject();
+            if (!isDeviceMastershipTopic(leadership.topic())) {
+                return;
+            }
+            NodeId nodeId = leadership.leader();
+            DeviceId deviceId = extractDeviceIdFromTopic(leadership.topic());
+            if (Objects.equal(nodeId, localNodeId) && connectedDevices.contains(deviceId)) {
+                switch (event.type()) {
+                case LEADER_ELECTED:
+                    notifyDelegate(new MastershipEvent(MASTER_CHANGED, deviceId, getNodes(deviceId)));
+                    break;
+                case LEADER_REELECTED:
+                    // There is no concept of leader re-election in the new distributed leadership manager.
+                    throw new IllegalStateException("Unexpected event type");
+                case LEADER_BOOTED:
+                    notifyDelegate(new MastershipEvent(BACKUPS_CHANGED, deviceId, getNodes(deviceId)));
+                    break;
+                default:
+                    return;
+                }
+            }
+        }
+    }
+
+    private String createDeviceMastershipTopic(DeviceId deviceId) {
+        return "/devices/" + deviceId.toString() + "/mastership";
+    }
+
+    private DeviceId extractDeviceIdFromTopic(String topic) {
+        Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
+        if (m.matches()) {
+            return DeviceId.deviceId(m.group(1));
+        } else {
+            throw new IllegalArgumentException("Invalid device mastership topic: " + topic);
+        }
+    }
+
+    private boolean isDeviceMastershipTopic(String topic) {
+        Matcher m = DEVICE_MASTERSHIP_TOPIC_PATTERN.matcher(topic);
+        return m.matches();
+    }
+
+    private <T> T complete(Future<byte[]> future) {
+        try {
+            return SERIALIZER.decode(future.get(PEER_REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.error("Interrupted while waiting for operation to complete.", e);
+            return null;
+        } catch (TimeoutException | ExecutionException e) {
+            log.error("Failed remote operation", e);
+            return null;
+        }
+    }
+}
\ No newline at end of file