Added couple of methods to LeadershipService.

Change-Id: I259b1a282a51af9425e941a720336f89d66f1097
diff --git a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
index 75cf90f..65ec687 100644
--- a/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/LeadershipService.java
@@ -16,6 +16,7 @@
 package org.onosproject.cluster;
 
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Service for leader election.
@@ -27,13 +28,27 @@
 public interface LeadershipService {
 
     /**
-     * Gets the most recent leader for the topic.
+     * Returns the current leader for the topic.
      * @param path topic
      * @return nodeId of the leader, null if so such topic exists.
      */
     NodeId getLeader(String path);
 
     /**
+     * Returns the current leadership info for the topic.
+     * @param path topic
+     * @return leadership info or null if so such topic exists.
+     */
+    Leadership getLeadership(String path);
+
+    /**
+     * Returns the set of topics owned by the specified node.
+     * @param nodeId node Id.
+     * @return set of topics for which this node is the current leader.
+     */
+    Set<String> ownedTopics(NodeId nodeId);
+
+    /**
      * Joins the leadership contest.
      * @param path topic for which this controller node wishes to be a leader.
      */
@@ -45,6 +60,10 @@
      */
     void withdraw(String path);
 
+    /**
+     * Returns the current leader board.
+     * @return mapping from topic to leadership info.
+     */
     Map<String, Leadership> getLeaderBoard();
 
     /**
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index a276367..ef651cb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -46,10 +46,12 @@
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.onlab.util.Tools.groupedThreads;
@@ -162,6 +164,28 @@
     }
 
     @Override
+    public Leadership getLeadership(String path) {
+        checkArgument(path != null);
+        Topic topic = topics.get(path);
+        if (topic != null) {
+            return new Leadership(topic.topicName(),
+                    topic.leader(),
+                    topic.term());
+        }
+        return null;
+    }
+
+    @Override
+    public Set<String> ownedTopics(NodeId nodeId) {
+        checkArgument(nodeId != null);
+        return topics.values()
+                .stream()
+                .filter(topic -> nodeId.equals(topic.leader()))
+                .map(topic -> topic.topicName)
+                .collect(Collectors.toSet());
+    }
+
+    @Override
     public void runForLeadership(String path) {
         checkArgument(path != null);
         Topic topic = new Topic(path);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
deleted file mode 100644
index 2e414d4..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/LeadershipManager.java
+++ /dev/null
@@ -1,336 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.cluster.impl;
-
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Leadership;
-import org.onosproject.cluster.LeadershipEvent;
-import org.onosproject.cluster.LeadershipEventListener;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-import org.onosproject.store.cluster.messaging.ClusterMessage;
-import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
-import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.service.Lock;
-import org.onosproject.store.service.LockService;
-import org.slf4j.Logger;
-
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Distributed implementation of LeadershipService that is based on the primitives exposed by
- * LockService.
- */
-@Component(enabled = false)
-@Service
-public class LeadershipManager implements LeadershipService {
-
-    private final Logger log = getLogger(getClass());
-
-    private static final int TERM_DURATION_MS = 2000;
-
-    // Time to wait before retrying leadership after
-    // a unexpected error.
-    private static final int WAIT_BEFORE_RETRY_MS = 2000;
-
-    // TODO: Make Thread pool size configurable.
-    private final ScheduledExecutorService threadPool =
-            Executors.newScheduledThreadPool(25, groupedThreads("onos/leadership", "manager-%d"));
-
-    private static final MessageSubject LEADERSHIP_UPDATES =
-            new MessageSubject("leadership-contest-updates");
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterService clusterService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterCommunicationService clusterCommunicator;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private LockService lockService;
-
-    private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
-
-    private final Map<String, Lock> openContests = Maps.newConcurrentMap();
-    private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
-    private NodeId localNodeId;
-
-    private final LeadershipEventListener peerAdvertiser = new PeerAdvertiser();
-    private final LeadershipEventListener leaderBoardUpdater = new LeaderBoardUpdater();
-
-    private ExecutorService messageHandlingExecutor;
-
-    public static final KryoSerializer SERIALIZER = new KryoSerializer() {
-        @Override
-        protected void setupKryoPool() {
-            serializerPool = KryoNamespace.newBuilder()
-                    .register(KryoNamespaces.API)
-                    .build()
-                    .populate(1);
-        }
-    };
-
-    @Activate
-    public void activate() {
-        localNodeId = clusterService.getLocalNode().id();
-
-        addListener(peerAdvertiser);
-        addListener(leaderBoardUpdater);
-
-        messageHandlingExecutor = Executors.newSingleThreadExecutor(
-                        groupedThreads("onos/store/leadership",
-                        "peer-advertisement-handler"));
-
-        clusterCommunicator.addSubscriber(
-                LEADERSHIP_UPDATES,
-                new PeerAdvertisementHandler(),
-                messageHandlingExecutor);
-
-        log.info("Started.");
-    }
-
-    @Deactivate
-    public void deactivate() {
-        removeListener(peerAdvertiser);
-        removeListener(leaderBoardUpdater);
-
-        clusterCommunicator.removeSubscriber(LEADERSHIP_UPDATES);
-
-        messageHandlingExecutor.shutdown();
-        threadPool.shutdown();
-
-        log.info("Stopped.");
-    }
-
-
-    @Override
-    public NodeId getLeader(String path) {
-        synchronized (leaderBoard) {
-            Leadership leadership = leaderBoard.get(path);
-            if (leadership != null) {
-                return leadership.leader();
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public void runForLeadership(String path) {
-        checkArgument(path != null);
-
-        if (openContests.containsKey(path)) {
-            log.info("Already in the leadership contest for {}", path);
-            return;
-        } else {
-            Lock lock = lockService.create(path);
-            openContests.put(path, lock);
-            threadPool.schedule(new TryLeadership(lock), 0, TimeUnit.MILLISECONDS);
-        }
-    }
-
-    @Override
-    public void withdraw(String path) {
-        checkArgument(path != null);
-
-        Lock lock = openContests.remove(path);
-
-        if (lock != null && lock.isLocked()) {
-            lock.unlock();
-            notifyListeners(
-                    new LeadershipEvent(
-                            LeadershipEvent.Type.LEADER_BOOTED,
-                            new Leadership(lock.path(), localNodeId, lock.epoch())));
-        }
-    }
-
-    @Override
-    public Map<String, Leadership> getLeaderBoard() {
-        return ImmutableMap.copyOf(leaderBoard);
-    }
-
-    @Override
-    public void addListener(LeadershipEventListener listener) {
-        checkArgument(listener != null);
-        listeners.add(listener);
-    }
-
-    @Override
-    public void removeListener(LeadershipEventListener listener) {
-        checkArgument(listener != null);
-        listeners.remove(listener);
-    }
-
-    private void notifyListeners(LeadershipEvent event) {
-        for (LeadershipEventListener listener : listeners) {
-            try {
-                listener.event(event);
-            } catch (Exception e) {
-                log.error("Notifying listener failed with exception.", e);
-            }
-        }
-    }
-
-    private void tryAcquireLeadership(String path) {
-        Lock lock = openContests.get(path);
-        if (lock == null) {
-            // withdrew from race.
-            return;
-        }
-        lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
-            if (error == null) {
-                threadPool.schedule(
-                        new ReelectionTask(lock),
-                        TERM_DURATION_MS / 2,
-                        TimeUnit.MILLISECONDS);
-                notifyListeners(
-                        new LeadershipEvent(
-                                LeadershipEvent.Type.LEADER_ELECTED,
-                                new Leadership(lock.path(), localNodeId, lock.epoch())));
-                return;
-            } else {
-                log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
-                threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
-            }
-        });
-    }
-
-    private class ReelectionTask implements Runnable {
-
-        private final Lock lock;
-
-        public ReelectionTask(Lock lock) {
-           this.lock = lock;
-        }
-
-        @Override
-        public void run() {
-            if (!openContests.containsKey(lock.path())) {
-                log.debug("Node withdrew from leadership race for {}. Cancelling reelection task.", lock.path());
-                return;
-            }
-
-            boolean lockExtended = false;
-            try {
-                lockExtended = lock.extendExpiration(TERM_DURATION_MS);
-            } catch (Exception e) {
-                log.warn("Attempt to extend lock failed with an exception.", e);
-            }
-
-            if (lockExtended) {
-                notifyListeners(
-                        new LeadershipEvent(
-                                LeadershipEvent.Type.LEADER_REELECTED,
-                                new Leadership(lock.path(), localNodeId, lock.epoch())));
-                threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
-            } else {
-                // Check if this node already withdrew from the contest, in which case
-                // we don't need to notify here.
-                if (openContests.containsKey(lock.path())) {
-                    notifyListeners(
-                            new LeadershipEvent(
-                                    LeadershipEvent.Type.LEADER_BOOTED,
-                                    new Leadership(lock.path(), localNodeId, lock.epoch())));
-                    // Retry leadership after a brief wait.
-                    threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
-                }
-            }
-        }
-    }
-
-    private class TryLeadership implements Runnable {
-        private final Lock lock;
-
-        public TryLeadership(Lock lock) {
-           this.lock = lock;
-        }
-
-        @Override
-        public void run() {
-            tryAcquireLeadership(lock.path());
-        }
-    }
-
-    private class PeerAdvertiser implements LeadershipEventListener {
-        @Override
-        public void event(LeadershipEvent event) {
-            // publish events originating on this host.
-            if (event.subject().leader().equals(localNodeId)) {
-                clusterCommunicator.broadcast(
-                        new ClusterMessage(
-                                localNodeId,
-                                LEADERSHIP_UPDATES,
-                                SERIALIZER.encode(event)));
-            }
-        }
-    }
-
-    private class PeerAdvertisementHandler implements ClusterMessageHandler {
-        @Override
-        public void handle(ClusterMessage message) {
-            LeadershipEvent event = SERIALIZER.decode(message.payload());
-            log.trace("Received {} from {}", event, message.sender());
-            notifyListeners(event);
-        }
-    }
-
-    private class LeaderBoardUpdater implements LeadershipEventListener {
-        @Override
-        public void event(LeadershipEvent event) {
-            Leadership leadershipUpdate = event.subject();
-            synchronized (leaderBoard) {
-                Leadership currentLeadership = leaderBoard.get(leadershipUpdate.topic());
-                switch (event.type()) {
-                case LEADER_ELECTED:
-                case LEADER_REELECTED:
-                        if (currentLeadership == null || currentLeadership.epoch() < leadershipUpdate.epoch()) {
-                            leaderBoard.put(leadershipUpdate.topic(), leadershipUpdate);
-                        }
-                    break;
-                case LEADER_BOOTED:
-                    if (currentLeadership != null && currentLeadership.epoch() <= leadershipUpdate.epoch()) {
-                        leaderBoard.remove(leadershipUpdate.topic());
-                    }
-                    break;
-                default:
-                    break;
-                }
-            }
-        }
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 69ea1f7..787b372 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -2,6 +2,7 @@
 
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
+import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.Map;
 import java.util.Map.Entry;
@@ -10,6 +11,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -171,6 +173,22 @@
     }
 
     @Override
+    public Leadership getLeadership(String path) {
+        checkArgument(path != null);
+        return leaderBoard.get(path);
+    }
+
+    @Override
+    public Set<String> ownedTopics(NodeId nodeId) {
+        checkArgument(nodeId != null);
+        return leaderBoard.entrySet()
+                    .stream()
+                    .filter(entry -> nodeId.equals(entry.getValue().leader()))
+                    .map(Entry::getKey)
+                    .collect(Collectors.toSet());
+    }
+
+    @Override
     public void runForLeadership(String path) {
         log.info("Running for leadership for topic: {}", path);
         activeTopics.add(path);
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
index 74349b3..d61772e 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleLeadershipManager.java
@@ -15,10 +15,14 @@
  */
 package org.onosproject.store.trivial.impl;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.stream.Collectors;
 
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Reference;
@@ -55,6 +59,22 @@
     }
 
     @Override
+    public Leadership getLeadership(String path) {
+        checkArgument(path != null);
+        return elections.get(path) ? new Leadership(path, clusterService.getLocalNode().id(), 0) : null;
+    }
+
+    @Override
+    public Set<String> ownedTopics(NodeId nodeId) {
+        checkArgument(nodeId != null);
+        return elections.entrySet()
+                .stream()
+                .filter(Entry::getValue)
+                .map(Entry::getKey)
+                .collect(Collectors.toSet());
+    }
+
+    @Override
     public void runForLeadership(String path) {
         elections.put(path, true);
         for (LeadershipEventListener listener : listeners) {
@@ -88,5 +108,4 @@
     public void removeListener(LeadershipEventListener listener) {
         listeners.remove(listener);
     }
-
 }