Added a distributed leadership manager implementation on top of consistent map.

Change-Id: I3f3c6114df72e3ab033ba39c8608ac4ae11e5272
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
index a5b6656..83eea18 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
@@ -53,7 +53,7 @@
     private final DatabaseProxy<String, byte[]> proxy;
     private final Serializer serializer;
 
-    private static final int OPERATION_TIMEOUT_MILLIS = 1000;
+    private static final int OPERATION_TIMEOUT_MILLIS = 5000;
     private static final String ERROR_NULL_KEY = "Key cannot be null";
     private static final String ERROR_NULL_VALUE = "Null values are not allowed";
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 354068b..09cc645 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -17,11 +17,13 @@
 package org.onosproject.store.consistent.impl;
 
 import com.google.common.collect.Sets;
+
 import net.kuujo.copycat.cluster.ClusterConfig;
 import net.kuujo.copycat.cluster.Member;
 import net.kuujo.copycat.log.FileLog;
 import net.kuujo.copycat.netty.NettyTcpProtocol;
 import net.kuujo.copycat.protocol.Consistency;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -44,6 +46,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.slf4j.LoggerFactory.getLogger;
@@ -60,6 +64,7 @@
     public static final int COPYCAT_TCP_PORT = 7238; //  7238 = RAFT
     private static final String CONFIG_DIR = "../config";
     private static final String PARTITION_DEFINITION_FILE = "tablets.json";
+    private static final int DATABASE_STARTUP_TIMEOUT_SEC = 60;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
@@ -131,13 +136,23 @@
 
         partitionedDatabase = PartitionedDatabaseManager.create("onos-store", clusterConfig, databaseConfig);
 
+        CountDownLatch latch = new CountDownLatch(1);
         partitionedDatabase.open().whenComplete((db, error) -> {
             if (error != null) {
                 log.warn("Failed to open database.", error);
             } else {
+                latch.countDown();
                 log.info("Successfully opened database.");
             }
         });
+        try {
+            if (!latch.await(DATABASE_STARTUP_TIMEOUT_SEC, TimeUnit.SECONDS)) {
+                log.warn("Timeed out watiing for database to initialize.");
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            log.warn("Failed to complete database initialization.");
+        }
         log.info("Started");
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
new file mode 100644
index 0000000..69ea1f7
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -0,0 +1,371 @@
+package org.onosproject.store.consistent.impl;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipEvent;
+import org.onosproject.cluster.LeadershipEventListener;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.event.AbstractListenerRegistry;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * Distributed Lock Manager implemented on top of ConsistentMap.
+ * <p>
+ * This implementation makes use of cluster manager's failure
+ * detection capabilities to detect and purge stale locks.
+ * TODO: Ensure lock safety and liveness.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class DistributedLeadershipManager implements LeadershipService {
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected EventDeliveryService eventDispatcher;
+
+    private static final MessageSubject LEADERSHIP_EVENT_MESSAGE_SUBJECT =
+            new MessageSubject("distributed-leadership-manager-events");
+
+    private final Logger log = getLogger(getClass());
+    private ExecutorService messageHandlingExecutor;
+    private ScheduledExecutorService retryLeaderLockExecutor;
+    private ScheduledExecutorService deadLockDetectionExecutor;
+    private ScheduledExecutorService leadershipStatusBroadcaster;
+
+    private ConsistentMap<String, NodeId> lockMap;
+    private AbstractListenerRegistry<LeadershipEvent, LeadershipEventListener>
+        listenerRegistry;
+    private final Map<String, Leadership> leaderBoard = Maps.newConcurrentMap();
+    private NodeId localNodeId;
+
+    private Set<String> activeTopics = Sets.newConcurrentHashSet();
+
+    private static final int DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC = 2;
+    private static final int DEADLOCK_DETECTION_INTERVAL_SEC = 2;
+    private static final int LEADERSHIP_STATUS_UPDATE_INTERVAL = 2;
+
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .build()
+                .populate(1);
+        }
+    };
+
+    @Activate
+    public void activate() {
+        lockMap = storageService.createConsistentMap("onos-leader-locks", new Serializer() {
+            KryoNamespace kryo = new KryoNamespace.Builder()
+                        .register(KryoNamespaces.API).build();
+
+            @Override
+            public <T> byte[] encode(T object) {
+                return kryo.serialize(object);
+            }
+
+            @Override
+            public <T> T decode(byte[] bytes) {
+                return kryo.deserialize(bytes);
+            }
+        });
+
+        localNodeId = clusterService.getLocalNode().id();
+
+        messageHandlingExecutor = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/store/leadership", "message-handler"));
+        retryLeaderLockExecutor = Executors.newScheduledThreadPool(
+                4, groupedThreads("onos/store/leadership", "election-thread-%d"));
+        deadLockDetectionExecutor = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/store/leadership", "dead-lock-detector"));
+        leadershipStatusBroadcaster = Executors.newSingleThreadScheduledExecutor(
+                groupedThreads("onos/store/leadership", "peer-updater"));
+        clusterCommunicator.addSubscriber(
+                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                new InternalLeadershipEventListener(),
+                messageHandlingExecutor);
+
+        deadLockDetectionExecutor.scheduleWithFixedDelay(
+                this::purgeStaleLocks, 0, DEADLOCK_DETECTION_INTERVAL_SEC, TimeUnit.SECONDS);
+        leadershipStatusBroadcaster.scheduleWithFixedDelay(
+                this::sendLeadershipStatus, 0, LEADERSHIP_STATUS_UPDATE_INTERVAL, TimeUnit.SECONDS);
+
+        listenerRegistry = new AbstractListenerRegistry<>();
+        eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
+
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        leaderBoard.forEach((topic, leadership) -> {
+            if (localNodeId.equals(leadership.leader())) {
+                withdraw(topic);
+            }
+        });
+
+        eventDispatcher.removeSink(LeadershipEvent.class);
+        clusterCommunicator.removeSubscriber(LEADERSHIP_EVENT_MESSAGE_SUBJECT);
+
+        messageHandlingExecutor.shutdown();
+        retryLeaderLockExecutor.shutdown();
+        deadLockDetectionExecutor.shutdown();
+        leadershipStatusBroadcaster.shutdown();
+
+        log.info("Stopped.");
+    }
+
+    @Override
+    public Map<String, Leadership> getLeaderBoard() {
+        return ImmutableMap.copyOf(leaderBoard);
+    }
+
+    @Override
+    public NodeId getLeader(String path) {
+        Leadership leadership = leaderBoard.get(path);
+        return leadership != null ? leadership.leader() : null;
+    }
+
+    @Override
+    public void runForLeadership(String path) {
+        log.info("Running for leadership for topic: {}", path);
+        activeTopics.add(path);
+        tryLeaderLock(path);
+    }
+
+    @Override
+    public void withdraw(String path) {
+        activeTopics.remove(path);
+        try {
+            if (lockMap.remove(path, localNodeId)) {
+                log.info("Sucessfully gave up leadership for {}", path);
+            }
+            // else we are not the current owner.
+        } catch (Exception e) {
+            log.warn("Failed to verify (and clear) any lock this node might be holding for {}", path, e);
+        }
+    }
+
+    @Override
+    public void addListener(LeadershipEventListener listener) {
+        listenerRegistry.addListener(listener);
+    }
+
+    @Override
+    public void removeListener(LeadershipEventListener listener) {
+        listenerRegistry.removeListener(listener);
+    }
+
+    private void tryLeaderLock(String path) {
+        if (!activeTopics.contains(path)) {
+            return;
+        }
+        try {
+            Versioned<NodeId> currentLeader = lockMap.get(path);
+            if (currentLeader != null) {
+                if (localNodeId.equals(currentLeader.value())) {
+                    log.info("Already has leadership for {}", path);
+                    notifyNewLeader(path, localNodeId, currentLeader.version());
+                } else {
+                    // someone else has leadership. will retry after sometime.
+                    retry(path);
+                }
+            } else {
+                if (lockMap.putIfAbsent(path, localNodeId) == null) {
+                    log.info("Assumed leadership for {}", path);
+                    // do a get again to get the version (epoch)
+                    Versioned<NodeId> newLeader = lockMap.get(path);
+                    notifyNewLeader(path, localNodeId, newLeader.version());
+                } else {
+                    // someone beat us to it.
+                    retry(path);
+                }
+            }
+        } catch (Exception e) {
+            log.warn("Attempt to acquire leadership lock for topic {} failed", path, e);
+            retry(path);
+        }
+    }
+
+    private void notifyNewLeader(String path, NodeId leader, long epoch) {
+        Leadership newLeadership = new Leadership(path, leader, epoch);
+        boolean updatedLeader = false;
+        synchronized (leaderBoard) {
+            Leadership currentLeader = leaderBoard.get(path);
+            if (currentLeader == null || currentLeader.epoch() < epoch) {
+                leaderBoard.put(path, newLeadership);
+                updatedLeader = true;
+            }
+        }
+
+        if (updatedLeader) {
+            LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, newLeadership);
+            eventDispatcher.post(event);
+            clusterCommunicator.broadcast(
+                    new ClusterMessage(
+                            clusterService.getLocalNode().id(),
+                            LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                            SERIALIZER.encode(event)));
+        }
+    }
+
+    private void notifyRemovedLeader(String path, NodeId leader, long epoch) {
+        Leadership oldLeadership = new Leadership(path, leader, epoch);
+        boolean updatedLeader = false;
+        synchronized (leaderBoard) {
+            Leadership currentLeader = leaderBoard.get(path);
+            if (currentLeader != null && currentLeader.epoch() == oldLeadership.epoch()) {
+                leaderBoard.remove(path);
+                updatedLeader = true;
+            }
+        }
+
+        if (updatedLeader) {
+            LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_BOOTED, oldLeadership);
+            eventDispatcher.post(event);
+            clusterCommunicator.broadcast(
+                    new ClusterMessage(
+                            clusterService.getLocalNode().id(),
+                            LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                            SERIALIZER.encode(event)));
+        }
+    }
+
+    private class InternalLeadershipEventListener implements ClusterMessageHandler {
+
+        @Override
+        public void handle(ClusterMessage message) {
+            LeadershipEvent leadershipEvent =
+                    SERIALIZER.decode(message.payload());
+
+            log.trace("Leadership Event: time = {} type = {} event = {}",
+                    leadershipEvent.time(), leadershipEvent.type(),
+                    leadershipEvent);
+
+            Leadership leadershipUpdate = leadershipEvent.subject();
+            LeadershipEvent.Type eventType = leadershipEvent.type();
+            String topic = leadershipUpdate.topic();
+
+            boolean updateAccepted = false;
+
+            synchronized (leaderBoard) {
+                Leadership currentLeadership = leaderBoard.get(topic);
+                if (eventType.equals(LeadershipEvent.Type.LEADER_ELECTED)) {
+                    if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
+                        leaderBoard.put(topic, leadershipUpdate);
+                        updateAccepted = true;
+                    }
+                } else if (eventType.equals(LeadershipEvent.Type.LEADER_BOOTED)) {
+                    if (currentLeadership != null && currentLeadership.epoch() == leadershipUpdate.epoch()) {
+                        leaderBoard.remove(topic);
+                        updateAccepted = true;
+                    }
+                } else {
+                    throw new IllegalStateException("Unknown event type.");
+                }
+                if (updateAccepted) {
+                    eventDispatcher.post(leadershipEvent);
+                }
+            }
+        }
+    }
+
+    private void retry(String path) {
+        retryLeaderLockExecutor.schedule(
+                () -> tryLeaderLock(path),
+                DELAY_BETWEEN_LEADER_LOCK_ATTEMPTS_SEC,
+                TimeUnit.SECONDS);
+    }
+
+    private void purgeStaleLocks() {
+        try {
+            Set<Entry<String, Versioned<NodeId>>> entries = lockMap.entrySet();
+            entries.forEach(entry -> {
+                String path = entry.getKey();
+                NodeId nodeId = entry.getValue().value();
+                long epoch = entry.getValue().version();
+                if (clusterService.getState(nodeId) == ControllerNode.State.INACTIVE) {
+                    log.info("Lock for {} is held by {} which is currently inactive", path, nodeId);
+                    try {
+                        if (lockMap.remove(path, epoch)) {
+                            log.info("Successfully purged stale lock held by {} for {}", nodeId, path);
+                            notifyRemovedLeader(path, nodeId, epoch);
+                        }
+                    } catch (Exception e) {
+                        log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
+                    }
+                }
+                if (localNodeId.equals(nodeId) && !activeTopics.contains(path)) {
+                    log.info("Lock for {} is held by {} when it not running for leadership.", path, nodeId);
+                    try {
+                        if (lockMap.remove(path, epoch)) {
+                            log.info("Successfully purged stale lock held by {} for {}", nodeId, path);
+                            notifyRemovedLeader(path, nodeId, epoch);
+                        }
+                    } catch (Exception e) {
+                        log.warn("Failed to purge stale lock held by {} for {}", nodeId, path, e);
+                    }
+                }
+            });
+        } catch (Exception e) {
+            log.warn("Failed cleaning up stale locks", e);
+        }
+    }
+
+    private void sendLeadershipStatus() {
+        leaderBoard.forEach((path, leadership) -> {
+            if (leadership.leader().equals(localNodeId)) {
+                LeadershipEvent event = new LeadershipEvent(LeadershipEvent.Type.LEADER_ELECTED, leadership);
+                clusterCommunicator.broadcast(
+                        new ClusterMessage(
+                                clusterService.getLocalNode().id(),
+                                LEADERSHIP_EVENT_MESSAGE_SUBJECT,
+                                SERIALIZER.encode(event)));
+            }
+        });
+    }
+}
\ No newline at end of file