Updated the HazelcastLeadershipService implemenation:
 * Every listener receives all leadership events, even for new topics and
   topics the local instance is not running for a leadership election
 * Now getLeaderBoard() returns all leadership info

Change-Id: Ia11a10ed287d2f8d905dd987beb8052c35be6cf1
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/HazelcastLeadershipService.java
index 8e2a7ee..b3cd370 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/HazelcastLeadershipService.java
@@ -71,7 +71,8 @@
  */
 @Component(immediate = true)
 @Service
-public class HazelcastLeadershipService implements LeadershipService {
+public class HazelcastLeadershipService implements LeadershipService,
+                                        MessageListener<byte[]> {
     private static final Logger log =
         LoggerFactory.getLogger(HazelcastLeadershipService.class);
 
@@ -87,6 +88,7 @@
 
     private static final long LEADERSHIP_PERIODIC_INTERVAL_MS = 5 * 1000; // 5s
     private static final long LEADERSHIP_REMOTE_TIMEOUT_MS = 15 * 1000;  // 15s
+    private static final String TOPIC_HZ_ID = "LeadershipService/AllTopics";
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
@@ -102,18 +104,29 @@
     private final Map<String, Topic> topics = Maps.newConcurrentMap();
     private NodeId localNodeId;
 
+    private ITopic<byte[]> leaderTopic;
+    private String leaderTopicRegistrationId;
+
     @Activate
     protected void activate() {
         localNodeId = clusterService.getLocalNode().id();
         listenerRegistry = new AbstractListenerRegistry<>();
         eventDispatcher.addSink(LeadershipEvent.class, listenerRegistry);
 
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setGlobalOrderingEnabled(true);
+        topicConfig.setName(TOPIC_HZ_ID);
+        storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
+        leaderTopic = storeService.getHazelcastInstance().getTopic(TOPIC_HZ_ID);
+        leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
+
         log.info("Hazelcast Leadership Service started");
     }
 
     @Deactivate
     protected void deactivate() {
         eventDispatcher.removeSink(LeadershipEvent.class);
+        leaderTopic.removeMessageListener(leaderTopicRegistrationId);
 
         for (Topic topic : topics.values()) {
             topic.stop();
@@ -139,6 +152,9 @@
         Topic oldTopic = topics.putIfAbsent(path, topic);
         if (oldTopic == null) {
             topic.start();
+            topic.runForLeadership();
+        } else {
+            oldTopic.runForLeadership();
         }
     }
 
@@ -156,11 +172,6 @@
     public Map<String, Leadership> getLeaderBoard() {
         Map<String, Leadership> result = new HashMap<>();
 
-        //
-        // Get the leaders for the topics.
-        // NOTE: A topic is listed only if this instance is running for
-        // a leadership for that topic.
-        //
         for (Topic topic : topics.values()) {
             Leadership leadership = new Leadership(topic.topicName(),
                                                    topic.leader(),
@@ -180,12 +191,41 @@
         listenerRegistry.removeListener(listener);
     }
 
+    @Override
+    public void onMessage(Message<byte[]> message) {
+        LeadershipEvent leadershipEvent =
+            SERIALIZER.decode(message.getMessageObject());
+
+        log.debug("Leadership Event: time = {} type = {} event = {}",
+                  leadershipEvent.time(), leadershipEvent.type(),
+                  leadershipEvent);
+
+        //
+        // If there is no entry for the topic, then create a new one to
+        // keep track of the leadership, but don't run for leadership itself.
+        //
+        String topicName = leadershipEvent.subject().topic();
+        Topic topic = topics.get(topicName);
+        if (topic == null) {
+            topic = new Topic(topicName);
+            Topic oldTopic = topics.putIfAbsent(topicName, topic);
+            if (oldTopic == null) {
+                topic.start();
+            } else {
+                topic = oldTopic;
+            }
+        }
+        topic.receivedLeadershipEvent(leadershipEvent);
+        eventDispatcher.post(leadershipEvent);
+    }
+
     /**
      * Class for keeping per-topic information.
      */
-    private final class Topic implements MessageListener<byte[]> {
+    private final class Topic {
         private final String topicName;
         private volatile boolean isShutdown = true;
+        private volatile boolean isRunningForLeadership = false;
         private volatile long lastLeadershipUpdateMs = 0;
         private ExecutorService leaderElectionExecutor;
 
@@ -193,8 +233,6 @@
         private Lock leaderLock;
         private Future<?> getLockFuture;
         private Future<?> periodicProcessingFuture;
-        private ITopic<byte[]> leaderTopic;
-        private String leaderTopicRegistrationId;
 
         /**
          * Constructor.
@@ -224,33 +262,14 @@
         }
 
         /**
-         * Starts leadership election for the topic.
+         * Starts operation.
          */
         private void start() {
             isShutdown = false;
-            String lockHzId = "LeadershipService/" + topicName + "/lock";
-            String topicHzId = "LeadershipService/" + topicName + "/topic";
-            leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
-
             String threadPoolName = "leader-election-" + topicName + "-%d";
             leaderElectionExecutor = Executors.newScheduledThreadPool(2,
                                         namedThreads(threadPoolName));
 
-            TopicConfig topicConfig = new TopicConfig();
-            topicConfig.setGlobalOrderingEnabled(true);
-            topicConfig.setName(topicHzId);
-            storeService.getHazelcastInstance().getConfig().addTopicConfig(topicConfig);
-
-            leaderTopic =
-                storeService.getHazelcastInstance().getTopic(topicHzId);
-            leaderTopicRegistrationId = leaderTopic.addMessageListener(this);
-
-            getLockFuture = leaderElectionExecutor.submit(new Runnable() {
-                    @Override
-                    public void run() {
-                        doLeaderElectionThread();
-                    }
-                });
             periodicProcessingFuture =
                 leaderElectionExecutor.submit(new Runnable() {
                     @Override
@@ -261,25 +280,43 @@
         }
 
         /**
+         * Runs for leadership.
+         */
+        private void runForLeadership() {
+            if (isRunningForLeadership) {
+                return;         // Nothing to do: already running
+            }
+            if (isShutdown) {
+                start();
+            }
+            String lockHzId = "LeadershipService/" + topicName + "/lock";
+            leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
+            getLockFuture = leaderElectionExecutor.submit(new Runnable() {
+                    @Override
+                    public void run() {
+                        doLeaderElectionThread();
+                    }
+                });
+        }
+
+        /**
          * Stops leadership election for the topic.
          */
         private void stop() {
             isShutdown = true;
-            leaderTopic.removeMessageListener(leaderTopicRegistrationId);
+            isRunningForLeadership = false;
             // getLockFuture.cancel(true);
             // periodicProcessingFuture.cancel(true);
             leaderElectionExecutor.shutdownNow();
         }
 
-        @Override
-        public void onMessage(Message<byte[]> message) {
-            LeadershipEvent leadershipEvent =
-                SERIALIZER.decode(message.getMessageObject());
+        /**
+         * Received a Leadership Event.
+         *
+         * @param leadershipEvent the received Leadership Event
+         */
+        private void receivedLeadershipEvent(LeadershipEvent leadershipEvent) {
             NodeId eventLeaderId = leadershipEvent.subject().leader();
-
-            log.debug("Leadership Event: time = {} type = {} event = {}",
-                      leadershipEvent.time(), leadershipEvent.type(),
-                      leadershipEvent);
             if (!leadershipEvent.subject().topic().equals(topicName)) {
                 return;         // Not our topic: ignore
             }
@@ -296,23 +333,21 @@
                     // Another leader: if we are also a leader, then give up
                     // leadership and run for re-election.
                     //
-                    if ((leader != null) &&
-                        leader.equals(localNodeId)) {
-                        getLockFuture.cancel(true);
+                    if ((leader != null) && leader.equals(localNodeId)) {
+                        if (getLockFuture != null) {
+                            getLockFuture.cancel(true);
+                        }
                     } else {
                         // Just update the current leader
                         leader = leadershipEvent.subject().leader();
                         lastLeadershipUpdateMs = System.currentTimeMillis();
                     }
-                    eventDispatcher.post(leadershipEvent);
                     break;
                 case LEADER_BOOTED:
                     // Remove the state for the current leader
-                    if ((leader != null) &&
-                        eventLeaderId.equals(leader)) {
+                    if ((leader != null) && eventLeaderId.equals(leader)) {
                         leader = null;
                     }
-                    eventDispatcher.post(leadershipEvent);
                     break;
                 default:
                     break;
@@ -340,7 +375,7 @@
                             leadershipEvent = new LeadershipEvent(
                                 LeadershipEvent.Type.LEADER_REELECTED,
                                 new Leadership(topicName, localNodeId, 0));
-                            // Dispatch to all remote instances
+                            // Dispatch to all instances
                             leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
                         } else {
                             //
@@ -404,7 +439,6 @@
                     leadershipEvent = new LeadershipEvent(
                         LeadershipEvent.Type.LEADER_ELECTED,
                         new Leadership(topicName, localNodeId, 0));
-                    eventDispatcher.post(leadershipEvent);
                     leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
                 }
 
@@ -430,7 +464,6 @@
                     leadershipEvent = new LeadershipEvent(
                                 LeadershipEvent.Type.LEADER_BOOTED,
                                 new Leadership(topicName, localNodeId, 0));
-                    eventDispatcher.post(leadershipEvent);
                     leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
                     leaderLock.unlock();
                 }