Fix potential race conditions in HazelcastLeadershipService

Change-Id: Iac232652155830c8e054760ea371ffb5639cf464
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
index ca23482..990f259 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastLeadershipService.java
@@ -166,8 +166,8 @@
         checkArgument(path != null);
         Topic topic = topics.get(path);
         if (topic != null) {
-            topic.stop();
             topics.remove(path, topic);
+            topic.stop();
         }
     }
 
@@ -213,6 +213,7 @@
             topic = new Topic(topicName);
             Topic oldTopic = topics.putIfAbsent(topicName, topic);
             if (oldTopic == null) {
+                // encountered new topic, start periodic processing
                 topic.start();
             } else {
                 topic = oldTopic;
@@ -285,7 +286,11 @@
         /**
          * Starts operation.
          */
-        private void start() {
+        private synchronized void start() {
+            if (!isShutdown) {
+                // already running
+                return;
+            }
             isShutdown = false;
             String threadPoolName = "onos-leader-election-" + topicName + "-%d";
             leaderElectionExecutor = Executors.newScheduledThreadPool(2,
@@ -303,13 +308,14 @@
         /**
          * Runs for leadership.
          */
-        private void runForLeadership() {
+        private synchronized void runForLeadership() {
             if (isRunningForLeadership) {
                 return;         // Nothing to do: already running
             }
             if (isShutdown) {
                 start();
             }
+            isRunningForLeadership = true;
             String lockHzId = "LeadershipService/" + topicName + "/lock";
             String termHzId = "LeadershipService/" + topicName + "/term";
             leaderLock = storeService.getHazelcastInstance().getLock(lockHzId);
@@ -326,7 +332,7 @@
         /**
          * Stops leadership election for the topic.
          */
-        private void stop() {
+        private synchronized void stop() {
             isShutdown = true;
             isRunningForLeadership = false;
             // getLockFuture.cancel(true);
@@ -454,22 +460,22 @@
                     continue;
                 }
 
-                synchronized (this) {
-                    //
-                    // This instance is now the leader
-                    //
-                    log.info("Leader Elected for topic {}", topicName);
-
-                    updateTerm();
-
-                    leader = localNodeId;
-                    leadershipEvent = new LeadershipEvent(
-                        LeadershipEvent.Type.LEADER_ELECTED,
-                        new Leadership(topicName, localNodeId, myLastLeaderTerm));
-                    leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
-                }
-
                 try {
+                    synchronized (this) {
+                        //
+                        // This instance is now the leader
+                        //
+                        log.info("Leader Elected for topic {}", topicName);
+
+                        updateTerm();
+
+                        leader = localNodeId;
+                        leadershipEvent = new LeadershipEvent(
+                                                              LeadershipEvent.Type.LEADER_ELECTED,
+                                                              new Leadership(topicName, localNodeId, myLastLeaderTerm));
+                        leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+                    }
+
                     // Sleep forever until interrupted
                     Thread.sleep(Long.MAX_VALUE);
                 } catch (InterruptedException e) {
@@ -479,23 +485,24 @@
                     //
                     log.debug("Leader Interrupted for topic {}",
                               topicName);
-                }
 
-                synchronized (this) {
-                    // If we reach here, we should release the leadership
-                    log.debug("Leader Lock Released for topic {}", topicName);
-                    if ((leader != null) &&
-                        leader.equals(localNodeId)) {
-                        leader = null;
+                } finally {
+                    synchronized (this) {
+                        // If we reach here, we should release the leadership
+                        log.debug("Leader Lock Released for topic {}", topicName);
+                        if ((leader != null) &&
+                                leader.equals(localNodeId)) {
+                            leader = null;
+                        }
+                        leadershipEvent = new LeadershipEvent(
+                                                              LeadershipEvent.Type.LEADER_BOOTED,
+                                                              new Leadership(topicName, localNodeId, myLastLeaderTerm));
+                        leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
+                        leaderLock.unlock();
                     }
-                    leadershipEvent = new LeadershipEvent(
-                                LeadershipEvent.Type.LEADER_BOOTED,
-                                new Leadership(topicName, localNodeId, myLastLeaderTerm));
-                    leaderTopic.publish(SERIALIZER.encode(leadershipEvent));
-                    leaderLock.unlock();
                 }
             }
-
+            isRunningForLeadership = false;
         }
 
         // Globally guarded by the leadership lock for this term