Make leadership election more robust to failures.
    - Catch exceptions thrown by lock extension calls.
    - Dealing with potential race conditions between joining and withdrawing from a race.

Change-Id: I429045b33f5972c459d5ed031fe8593438813e8d
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
index b4091b2..49fdb8d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/LeadershipManager.java
@@ -1,7 +1,6 @@
 package org.onlab.onos.store.cluster.impl;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Verify.verifyNotNull;
 import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -75,7 +74,7 @@
 
     private final Map<String, Leadership> leaderBoard = Maps.newHashMap();
 
-    private final Map<String, Lock> openContests = Maps.newHashMap();
+    private final Map<String, Lock> openContests = Maps.newConcurrentMap();
     private final Set<LeadershipEventListener> listeners = Sets.newIdentityHashSet();
     private ControllerNode localNode;
 
@@ -133,19 +132,21 @@
     @Override
     public void runForLeadership(String path) {
         checkArgument(path != null);
+
         if (openContests.containsKey(path)) {
             log.info("Already in the leadership contest for {}", path);
             return;
         } else {
             Lock lock = lockService.create(path);
             openContests.put(path, lock);
-            tryAcquireLeadership(path);
+            threadPool.schedule(new TryLeadership(lock), 0, TimeUnit.MILLISECONDS);
         }
     }
 
     @Override
     public void withdraw(String path) {
         checkArgument(path != null);
+
         Lock lock = openContests.remove(path);
 
         if (lock != null && lock.isLocked()) {
@@ -171,13 +172,20 @@
 
     private void notifyListeners(LeadershipEvent event) {
         for (LeadershipEventListener listener : listeners) {
-            listener.event(event);
+            try {
+                listener.event(event);
+            } catch (Exception e) {
+                log.error("Notifying listener failed with exception.", e);
+            }
         }
     }
 
     private void tryAcquireLeadership(String path) {
         Lock lock = openContests.get(path);
-        verifyNotNull(lock, "Lock should not be null");
+        if (lock == null) {
+            // withdrew from race.
+            return;
+        }
         lock.lockAsync(TERM_DURATION_MS).whenComplete((response, error) -> {
             if (error == null) {
                 threadPool.schedule(
@@ -190,13 +198,8 @@
                                 new Leadership(lock.path(), localNode, lock.epoch())));
                 return;
             } else {
-                log.warn("Failed to acquire lock for {}. Will retry in {} sec", path, WAIT_BEFORE_RETRY_MS, error);
-                try {
-                    Thread.sleep(WAIT_BEFORE_RETRY_MS);
-                    tryAcquireLeadership(path);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                }
+                log.warn("Failed to acquire lock for {}. Will retry in {} ms", path, WAIT_BEFORE_RETRY_MS, error);
+                threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
             }
         });
     }
@@ -211,24 +214,52 @@
 
         @Override
         public void run() {
-            if (lock.extendExpiration(TERM_DURATION_MS)) {
+            if (!openContests.containsKey(lock.path())) {
+                log.debug("Node withdrew from leadership race for {}. Cancelling reelection task.", lock.path());
+                return;
+            }
+
+            boolean lockExtended = false;
+            try {
+                lockExtended = lock.extendExpiration(TERM_DURATION_MS);
+            } catch (Exception e) {
+                log.warn("Attempt to extend lock failed with an exception.", e);
+            }
+
+            if (lockExtended) {
                 notifyListeners(
                         new LeadershipEvent(
                                 LeadershipEvent.Type.LEADER_REELECTED,
                                 new Leadership(lock.path(), localNode, lock.epoch())));
                 threadPool.schedule(this, TERM_DURATION_MS / 2, TimeUnit.MILLISECONDS);
             } else {
+                // Check if this node already withdrew from the contest, in which case
+                // we don't need to notify here.
                 if (openContests.containsKey(lock.path())) {
                     notifyListeners(
                             new LeadershipEvent(
                                     LeadershipEvent.Type.LEADER_BOOTED,
                                     new Leadership(lock.path(), localNode, lock.epoch())));
-                    tryAcquireLeadership(lock.path());
+                    // Retry leadership after a brief wait.
+                    threadPool.schedule(new TryLeadership(lock), WAIT_BEFORE_RETRY_MS, TimeUnit.MILLISECONDS);
                 }
             }
         }
     }
 
+    private class TryLeadership implements Runnable {
+        private final Lock lock;
+
+        public TryLeadership(Lock lock) {
+           this.lock = lock;
+        }
+
+        @Override
+        public void run() {
+            tryAcquireLeadership(lock.path());
+        }
+    }
+
     private class PeerAdvertiser implements LeadershipEventListener {
         @Override
         public void event(LeadershipEvent event) {