1. Fixed a synchronization issue with database update processing and expiry tracking.
2. Fixed a synchronization issue with MapDBLog appendEntries method.
3. DatabaseClient now uses ProtocolClient to interact with Raft cluster.
4. Misc javdoc and logging improvements

Change-Id: I147eb5bf859cf9827df452d62ab415d643a00aa4
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
index c8b16e4..a8941aa 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -1,10 +1,13 @@
 package org.onlab.onos.store.service.impl;
 
+import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -23,18 +26,23 @@
 import org.onlab.onos.store.service.LockService;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimaps;
 
 @Component(immediate = true)
 @Service
 public class DistributedLockManager implements LockService {
 
+    private static final ExecutorService THREAD_POOL =
+            Executors.newCachedThreadPool(namedThreads("lock-manager-%d"));
+
     private final Logger log = getLogger(getClass());
 
     public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
 
-    private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap
-            .create();
+    private final ListMultimap<String, LockRequest> locksToAcquire =
+                Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private ClusterCommunicationService clusterCommunicator;
@@ -56,8 +64,9 @@
 
     @Deactivate
     public void deactivate() {
+        clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
         locksToAcquire.clear();
-        log.info("Started.");
+        log.info("Stopped.");
     }
 
     @Override
@@ -77,11 +86,19 @@
         throw new UnsupportedOperationException();
     }
 
-    protected CompletableFuture<Void> lockIfAvailable(Lock lock,
-            long waitTimeMillis, int leaseDurationMillis) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis,
-                leaseDurationMillis, future));
+    /**
+     * Attempts to acquire the lock as soon as it becomes available.
+     * @param lock lock to acquire.
+     * @param waitTimeMillis maximum time to wait before giving up.
+     * @param leaseDurationMillis the duration for which to acquire the lock initially.
+     * @return Future lease expiration date.
+     */
+    protected CompletableFuture<DateTime> lockIfAvailable(
+            Lock lock,
+            long waitTimeMillis,
+            int leaseDurationMillis) {
+        CompletableFuture<DateTime> future = new CompletableFuture<>();
+        locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
         return future;
     }
 
@@ -90,37 +107,46 @@
         public void handle(ClusterMessage message) {
             TableModificationEvent event = DatabaseStateMachine.SERIALIZER
                     .decode(message.payload());
-            if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
-                return;
+            if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
+                    event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
+                THREAD_POOL.submit(new RetryLockTask(event.key()));
             }
+        }
+    }
 
-            log.info("Received a lock available event for path: {}", event.key());
+    private class RetryLockTask implements Runnable {
 
-            String path = event.key();
+        private final String path;
+
+        public RetryLockTask(String path) {
+            this.path = path;
+        }
+
+        @Override
+        public void run() {
             if (!locksToAcquire.containsKey(path)) {
                 return;
             }
 
-            if (event.type() == TableModificationEvent.Type.ROW_DELETED) {
-                List<LockRequest> existingRequests = locksToAcquire.get(path);
-                if (existingRequests == null) {
-                    return;
-                }
+            List<LockRequest> existingRequests = locksToAcquire.get(path);
+            if (existingRequests == null || existingRequests.isEmpty()) {
+                return;
+            }
+            log.info("Path {} is now available for locking. There are {} outstanding "
+                    + "requests for it.",
+                    path, existingRequests.size());
 
-                synchronized (existingRequests) {
-
-                    Iterator<LockRequest> existingRequestIterator = existingRequests
-                            .iterator();
-                    while (existingRequestIterator.hasNext()) {
-                        LockRequest request = existingRequestIterator.next();
-                        if (request.expirationTime().isAfter(DateTime.now())) {
+            synchronized (existingRequests) {
+                Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
+                while (existingRequestIterator.hasNext()) {
+                    LockRequest request = existingRequestIterator.next();
+                    if (DateTime.now().isAfter(request.requestExpirationTime())) {
+                        // request expired.
+                        existingRequestIterator.remove();
+                    } else {
+                        if (request.lock().tryLock(request.leaseDurationMillis())) {
+                            request.future().complete(DateTime.now().plusMillis(request.leaseDurationMillis()));
                             existingRequestIterator.remove();
-                        } else {
-                            if (request.lock().tryLock(
-                                    request.leaseDurationMillis())) {
-                                request.future().complete(null);
-                                existingRequestIterator.remove();
-                            }
                         }
                     }
                 }
@@ -131,16 +157,15 @@
     private class LockRequest {
 
         private final Lock lock;
-        private final DateTime expirationTime;
+        private final DateTime requestExpirationTime;
         private final int leaseDurationMillis;
-        private final CompletableFuture<Void> future;
+        private final CompletableFuture<DateTime> future;
 
         public LockRequest(Lock lock, long waitTimeMillis,
-                int leaseDurationMillis, CompletableFuture<Void> future) {
+                int leaseDurationMillis, CompletableFuture<DateTime> future) {
 
             this.lock = lock;
-            this.expirationTime = DateTime.now().plusMillis(
-                    (int) waitTimeMillis);
+            this.requestExpirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
             this.leaseDurationMillis = leaseDurationMillis;
             this.future = future;
         }
@@ -149,15 +174,15 @@
             return lock;
         }
 
-        public DateTime expirationTime() {
-            return expirationTime;
+        public DateTime requestExpirationTime() {
+            return requestExpirationTime;
         }
 
         public int leaseDurationMillis() {
             return leaseDurationMillis;
         }
 
-        public CompletableFuture<Void> future() {
+        public CompletableFuture<DateTime> future() {
             return future;
         }
     }