1. Fixed a synchronization issue with database update processing and expiry tracking.
2. Fixed a synchronization issue with MapDBLog appendEntries method.
3. DatabaseClient now uses ProtocolClient to interact with Raft cluster.
4. Misc javdoc and logging improvements

Change-Id: I147eb5bf859cf9827df452d62ab415d643a00aa4
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseException.java b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseException.java
index bbc2daf..1468d1b 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseException.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseException.java
@@ -19,4 +19,18 @@
 
     public DatabaseException() {
     };
+
+    public static class Timeout extends DatabaseException {
+        public Timeout(String message, Throwable t) {
+            super(message, t);
+        }
+
+        public Timeout(String message) {
+            super(message);
+        }
+
+        public Timeout(Throwable t) {
+            super(t);
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 72e4b95..876f6cc 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -1,21 +1,24 @@
 package org.onlab.onos.store.service.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
-import java.util.Collections;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import net.kuujo.copycat.Copycat;
+import net.kuujo.copycat.cluster.Member;
+import net.kuujo.copycat.cluster.TcpMember;
 import net.kuujo.copycat.event.EventHandler;
 import net.kuujo.copycat.event.LeaderElectEvent;
+import net.kuujo.copycat.protocol.SubmitRequest;
+import net.kuujo.copycat.protocol.SubmitResponse;
+import net.kuujo.copycat.spi.protocol.ProtocolClient;
 
 import org.onlab.onos.store.service.BatchReadRequest;
 import org.onlab.onos.store.service.BatchWriteRequest;
@@ -28,7 +31,7 @@
 /**
  * Client for interacting with the Copycat Raft cluster.
  */
-public class DatabaseClient {
+public class DatabaseClient implements EventHandler<LeaderElectEvent> {
 
     private static final int RETRIES = 5;
 
@@ -36,137 +39,101 @@
 
     private final Logger log = getLogger(getClass());
 
-    private final Copycat copycat;
+    private final DatabaseProtocolService protocol;
+    private volatile ProtocolClient copycat = null;
+    private volatile Member currentLeader = null;
 
-    public DatabaseClient(Copycat copycat) {
-        this.copycat = checkNotNull(copycat);
+    public DatabaseClient(DatabaseProtocolService protocol) {
+        this.protocol = protocol;
+    }
+
+    @Override
+    public void handle(LeaderElectEvent event) {
+        Member newLeader = event.leader();
+        if (newLeader != null && !newLeader.equals(currentLeader)) {
+            currentLeader = newLeader;
+            if (copycat != null) {
+                copycat.close();
+            }
+            copycat = protocol.createClient((TcpMember) currentLeader);
+            copycat.connect();
+        }
+    }
+
+    private String nextRequestId() {
+        return UUID.randomUUID().toString();
     }
 
     public void waitForLeader() {
-        if (copycat.leader() != null) {
+        if (currentLeader != null) {
             return;
         }
 
         log.info("No leader in cluster, waiting for election.");
-        final CountDownLatch latch = new CountDownLatch(1);
-        final EventHandler<LeaderElectEvent> leaderLsnr = new EventHandler<LeaderElectEvent>() {
 
-            @Override
-            public void handle(LeaderElectEvent event) {
-                log.info("Leader chosen: {}", event);
-                latch.countDown();
-            }
-        };
-
-        copycat.event(LeaderElectEvent.class).registerHandler(leaderLsnr);
         try {
-            while (copycat.leader() == null) {
-                latch.await(200, TimeUnit.MILLISECONDS);
+            while (currentLeader == null) {
+                Thread.sleep(200);
             }
-            log.info("Leader appeared: {}", copycat.leader());
+            log.info("Leader appeared: {}", currentLeader);
             return;
         } catch (InterruptedException e) {
             log.error("Interrupted while waiting for Leader", e);
             Thread.currentThread().interrupt();
-        } finally {
-            copycat.event(LeaderElectEvent.class).unregisterHandler(leaderLsnr);
+        }
+    }
+
+    private <T> T submit(String operationName, Object... args) {
+        waitForLeader();
+        if (currentLeader == null) {
+            throw new DatabaseException("Raft cluster does not have a leader.");
+        }
+
+        SubmitRequest request =
+                new SubmitRequest(nextRequestId(), operationName, Arrays.asList(args));
+
+        CompletableFuture<SubmitResponse> submitResponse = copycat.submit(request);
+
+        log.debug("Sent {} to {}", request, currentLeader);
+
+        try {
+            return (T) submitResponse.get(TIMEOUT_MS, TimeUnit.MILLISECONDS).result();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new DatabaseException(e);
+        } catch (TimeoutException e) {
+            throw new DatabaseException.Timeout(e);
         }
     }
 
     public boolean createTable(String tableName) {
-        waitForLeader();
-        CompletableFuture<Boolean> future = copycat.submit("createTable", tableName);
-        try {
-            return future.get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        }
+        return submit("createTable", tableName);
     }
 
     public boolean createTable(String tableName, int ttlMillis) {
-        waitForLeader();
-        CompletableFuture<Boolean> future = copycat.submit("createTableWithExpiration", tableName);
-        try {
-            return future.get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        }
+        return submit("createTable", tableName, ttlMillis);
     }
 
     public void dropTable(String tableName) {
-        waitForLeader();
-        CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
-        try {
-            future.get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        }
+        submit("dropTable", tableName);
     }
 
     public void dropAllTables() {
-        waitForLeader();
-        CompletableFuture<Void> future = copycat.submit("dropAllTables");
-        try {
-            future.get();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        }
+        submit("dropAllTables");
     }
 
     public Set<String> listTables() {
-        waitForLeader();
-        try {
-            for (int i = 0; i < RETRIES; ++i) {
-                CompletableFuture<Set<String>> future = copycat.submit("listTables");
-                try {
-                    return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
-                } catch (TimeoutException e) {
-                    log.debug("Timed out retrying {}", i);
-                    future.cancel(true);
-                    waitForLeader();
-                }
-            }
-            // TODO: proper timeout handling
-            log.error("Timed out");
-            return Collections.emptySet();
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        }
+        return submit("listTables");
     }
 
     public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
-        waitForLeader();
-        CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
-        try {
-            return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        } catch (TimeoutException e) {
-            throw new DatabaseException(e);
-        }
+        return submit("read", batchRequest);
     }
 
     public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
-        waitForLeader();
-        CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
-        try {
-            return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        } catch (TimeoutException e) {
-            throw new DatabaseException(e);
-        }
+        return submit("write", batchRequest);
     }
 
     public Map<String, VersionedValue> getAll(String tableName) {
-        waitForLeader();
-        CompletableFuture<Map<String, VersionedValue>> future = copycat.submit("getAll", tableName);
-        try {
-            return future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
-        } catch (InterruptedException | ExecutionException e) {
-            throw new DatabaseException(e);
-        } catch (TimeoutException e) {
-            throw new DatabaseException(e);
-        }
+        return submit("getAll", tableName);
     }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
index 62cf584..2fba52e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
@@ -16,10 +16,14 @@
 
 package org.onlab.onos.store.service.impl;
 
+import static org.onlab.util.Tools.namedThreads;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -40,6 +44,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.MoreObjects;
+
 /**
  * Plugs into the database update stream and track the TTL of entries added to
  * the database. For tables with pre-configured finite TTL, this class has
@@ -48,6 +54,9 @@
 public class DatabaseEntryExpirationTracker implements
         DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
 
+    private static final ExecutorService THREAD_POOL =
+            Executors.newCachedThreadPool(namedThreads("database-stale-entry-expirer-%d"));
+
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final DatabaseService databaseService;
@@ -74,7 +83,7 @@
 
     @Override
     public void tableModified(TableModificationEvent event) {
-        log.debug("Received a table modification event {}", event);
+        log.debug("{}: Received {}", localNode.id(), event);
 
         if (!tableEntryExpirationMap.containsKey(event.tableName())) {
             return;
@@ -89,8 +98,8 @@
             map.remove(row, eventVersion);
             if (isLocalMemberLeader.get()) {
                 try {
-                    // FIXME: The broadcast message should be sent to self.
-                    clusterCommunicator.broadcast(new ClusterMessage(
+                    log.debug("Broadcasting {} to the entire cluster", event);
+                    clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
                             localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
                             DatabaseStateMachine.SERIALIZER.encode(event)));
                 } catch (IOException e) {
@@ -119,8 +128,6 @@
             tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
                     .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
                     .expirationListener(expirationObserver)
-                    // TODO: make the expiration policy configurable.
-                    // Do we need to support expiration based on last access time?
                     .expirationPolicy(ExpirationPolicy.CREATED).build());
         }
     }
@@ -135,6 +142,23 @@
             ExpirationListener<DatabaseRow, Long> {
         @Override
         public void expired(DatabaseRow row, Long version) {
+            THREAD_POOL.submit(new ExpirationTask(row, version));
+        }
+    }
+
+    private class ExpirationTask implements Runnable {
+
+        private final DatabaseRow row;
+        private final Long version;
+
+        public ExpirationTask(DatabaseRow row, Long version) {
+            this.row = row;
+            this.version = version;
+        }
+
+        @Override
+        public void run() {
+            log.debug("Received an expiration event for {}, version: {}", row, version);
             Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(row.tableName);
             try {
                 if (isLocalMemberLeader.get()) {
@@ -142,7 +166,7 @@
                             row.key, version)) {
                         log.info("Entry in database was updated right before its expiration.");
                     } else {
-                        log.info("Successfully expired old entry with key ({}) from table ({})",
+                        log.debug("Successfully expired old entry with key ({}) from table ({})",
                                 row.key, row.tableName);
                     }
                 } else {
@@ -164,6 +188,9 @@
     @Override
     public void handle(LeaderElectEvent event) {
         isLocalMemberLeader.set(localMember.equals(event.leader()));
+        if (isLocalMemberLeader.get()) {
+            log.info("{} is now the leader of Raft cluster", localNode.id());
+        }
     }
 
     /**
@@ -180,6 +207,14 @@
         }
 
         @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                .add("tableName", tableName)
+                .add("key", key)
+                .toString();
+        }
+
+        @Override
         public boolean equals(Object obj) {
             if (this == obj) {
                 return true;
@@ -204,6 +239,7 @@
         if (!tableEntryExpirationMap.isEmpty()) {
             return;
         }
+        log.debug("Received a snapshot installed notification");
         for (String tableName : state.getTableNames()) {
 
             TableMetadata metadata = state.getTableMetadata(tableName);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 645de00..65b0544 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -173,13 +173,16 @@
         Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
                                         ClusterMessagingProtocol.SERIALIZER);
 
+        client = new DatabaseClient(copycatMessagingProtocol);
+
         copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
 
+        copycat.event(LeaderElectEvent.class).registerHandler(client);
         copycat.event(LeaderElectEvent.class).registerHandler(expirationTracker);
 
         copycat.start().get();
 
-        client = new DatabaseClient(copycat);
+        client = new DatabaseClient(copycatMessagingProtocol);
         client.waitForLeader();
 
         log.info("Started.");
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 5958bb7..f6ea217 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -1,5 +1,6 @@
 package org.onlab.onos.store.service.impl;
 
+import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.ByteArrayInputStream;
@@ -9,6 +10,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
@@ -49,6 +52,9 @@
 
     private final Logger log = getLogger(getClass());
 
+    private final ExecutorService updatesExecutor =
+            Executors.newSingleThreadExecutor(namedThreads("database-statemachine-updates"));
+
     // message subject for database update notifications.
     public static final MessageSubject DATABASE_UPDATE_EVENTS =
             new MessageSubject("database-update-events");
@@ -88,8 +94,7 @@
     }
 
     @Command
-    public boolean createTableWithExpiration(String tableName) {
-        int ttlMillis = 10000;
+    public boolean createTable(String tableName, Integer ttlMillis) {
         TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
         return createTable(metadata);
     }
@@ -100,18 +105,32 @@
             return false;
         }
         state.createTable(metadata);
-        for (DatabaseUpdateEventListener listener : listeners) {
-            listener.tableCreated(metadata);
-        }
+
+        updatesExecutor.submit(new Runnable() {
+            @Override
+            public void run() {
+                for (DatabaseUpdateEventListener listener : listeners) {
+                    listener.tableCreated(metadata);
+                }
+            }
+        });
+
         return true;
     }
 
     @Command
     public boolean dropTable(String tableName) {
         if (state.removeTable(tableName)) {
-            for (DatabaseUpdateEventListener listener : listeners) {
-                listener.tableDeleted(tableName);
-            }
+
+            updatesExecutor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    for (DatabaseUpdateEventListener listener : listeners) {
+                        listener.tableDeleted(tableName);
+                    }
+                }
+            });
+
             return true;
         }
         return false;
@@ -121,11 +140,18 @@
     public boolean dropAllTables() {
         Set<String> tableNames = state.getTableNames();
         state.removeAllTables();
-        for (DatabaseUpdateEventListener listener : listeners) {
-            for (String tableName : tableNames) {
-                listener.tableDeleted(tableName);
+
+        updatesExecutor.submit(new Runnable() {
+            @Override
+            public void run() {
+                for (DatabaseUpdateEventListener listener : listeners) {
+                    for (String tableName : tableNames) {
+                        listener.tableDeleted(tableName);
+                    }
+                }
             }
-        }
+        });
+
         return true;
     }
 
@@ -273,12 +299,18 @@
         }
 
         // notify listeners of table mod events.
-        for (DatabaseUpdateEventListener listener : listeners) {
-            for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
-                log.trace("Publishing table modification event: {}", tableModificationEvent);
-                listener.tableModified(tableModificationEvent);
+
+        updatesExecutor.submit(new Runnable() {
+            @Override
+            public void run() {
+                for (DatabaseUpdateEventListener listener : listeners) {
+                    for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
+                        log.trace("Publishing table modification event: {}", tableModificationEvent);
+                        listener.tableModified(tableModificationEvent);
+                    }
+                }
             }
-        }
+        });
 
         return results;
     }
@@ -397,10 +429,15 @@
                 this.state = SERIALIZER.decode(data);
             }
 
-            // FIXME: synchronize.
-            for (DatabaseUpdateEventListener listener : listeners) {
-                listener.snapshotInstalled(state);
-            }
+            updatesExecutor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    for (DatabaseUpdateEventListener listener : listeners) {
+                        listener.snapshotInstalled(state);
+                    }
+                }
+            });
+
         } catch (Exception e) {
             log.error("Failed to install from snapshot", e);
             throw new SnapshotException(e);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
index cef53a5..a3fbc0d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -1,5 +1,7 @@
 package org.onlab.onos.store.service.impl;
 
+import static org.slf4j.LoggerFactory.getLogger;
+
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -11,12 +13,17 @@
 import org.onlab.onos.cluster.ClusterService;
 import org.onlab.onos.store.service.DatabaseService;
 import org.onlab.onos.store.service.Lock;
+import org.slf4j.Logger;
 
 /**
  * A distributed lock implementation.
  */
 public class DistributedLock implements Lock {
 
+    private final Logger log = getLogger(getClass());
+
+    private static final long MAX_WAIT_TIME_MS = 100000000L;
+
     private final DistributedLockManager lockManager;
     private final DatabaseService databaseService;
     private final String path;
@@ -44,54 +51,60 @@
 
     @Override
     public void lock(int leaseDurationMillis) {
-
         if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
             // Nothing to do.
             // Current expiration time is beyond what is requested.
             return;
         } else {
-            tryLock(Long.MAX_VALUE, leaseDurationMillis);
+            tryLock(MAX_WAIT_TIME_MS, leaseDurationMillis);
         }
     }
 
     @Override
     public boolean tryLock(int leaseDurationMillis) {
-        return databaseService.putIfAbsent(
+        if (databaseService.putIfAbsent(
                 DistributedLockManager.ONOS_LOCK_TABLE_NAME,
                 path,
-                lockId);
+                lockId)) {
+            isLocked.set(true);
+            lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
+            return true;
+        }
+        return false;
     }
 
     @Override
     public boolean tryLock(
             long waitTimeMillis,
             int leaseDurationMillis) {
-        if (!tryLock(leaseDurationMillis)) {
-            CompletableFuture<Void> future =
-                    lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
-            try {
-                future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
-            } catch (ExecutionException | InterruptedException e) {
-                // TODO: ExecutionException could indicate something
-                // wrong with the backing database.
-                // Throw an exception?
-                return false;
-            } catch (TimeoutException e) {
-                return false;
-            }
+        if (tryLock(leaseDurationMillis)) {
+            return true;
         }
-        isLocked.set(true);
-        lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
-        return true;
+
+        CompletableFuture<DateTime> future =
+                lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
+        try {
+            lockExpirationTime = future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
+            return true;
+        } catch (ExecutionException | InterruptedException e) {
+            log.error("Encountered an exception trying to acquire lock for " + path, e);
+            // TODO: ExecutionException could indicate something
+            // wrong with the backing database.
+            // Throw an exception?
+            return false;
+        } catch (TimeoutException e) {
+            log.debug("Timed out waiting to acquire lock for {}", path);
+            return false;
+        }
     }
 
     @Override
     public boolean isLocked() {
         if (isLocked.get()) {
             // We rely on local information to check
-            // if the expired.
+            // if the lock expired.
             // This should should make this call
-            // light weight, which still retaining the same
+            // light weight, while still retaining the
             // safety guarantees.
             if (DateTime.now().isAfter(lockExpirationTime)) {
                 isLocked.set(false);
@@ -108,17 +121,30 @@
         if (!isLocked()) {
             return;
         } else {
-            isLocked.set(false);
-            databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
+            if (databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId)) {
+                isLocked.set(false);
+            }
         }
     }
 
     @Override
     public boolean extendExpiration(int leaseDurationMillis) {
-        if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
+        if (!isLocked()) {
+            log.warn("Ignoring request to extend expiration for lock {}."
+                    + " ExtendExpiration must be called for locks that are already acquired.", path);
+        }
+
+        if (databaseService.putIfValueMatches(
+                DistributedLockManager.ONOS_LOCK_TABLE_NAME,
+                path,
+                lockId,
+                lockId)) {
+            lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
+            log.debug("Succeeded in extending lock {} expiration time to {}", lockExpirationTime);
             return true;
         } else {
-            return tryLock(leaseDurationMillis);
+            log.info("Failed to extend expiration for {}", path);
+            return false;
         }
     }
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
index c8b16e4..a8941aa 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -1,10 +1,13 @@
 package org.onlab.onos.store.service.impl;
 
+import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -23,18 +26,23 @@
 import org.onlab.onos.store.service.LockService;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.LinkedListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Multimaps;
 
 @Component(immediate = true)
 @Service
 public class DistributedLockManager implements LockService {
 
+    private static final ExecutorService THREAD_POOL =
+            Executors.newCachedThreadPool(namedThreads("lock-manager-%d"));
+
     private final Logger log = getLogger(getClass());
 
     public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
 
-    private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap
-            .create();
+    private final ListMultimap<String, LockRequest> locksToAcquire =
+                Multimaps.synchronizedListMultimap(LinkedListMultimap.<String, LockRequest>create());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private ClusterCommunicationService clusterCommunicator;
@@ -56,8 +64,9 @@
 
     @Deactivate
     public void deactivate() {
+        clusterCommunicator.removeSubscriber(DatabaseStateMachine.DATABASE_UPDATE_EVENTS);
         locksToAcquire.clear();
-        log.info("Started.");
+        log.info("Stopped.");
     }
 
     @Override
@@ -77,11 +86,19 @@
         throw new UnsupportedOperationException();
     }
 
-    protected CompletableFuture<Void> lockIfAvailable(Lock lock,
-            long waitTimeMillis, int leaseDurationMillis) {
-        CompletableFuture<Void> future = new CompletableFuture<>();
-        locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis,
-                leaseDurationMillis, future));
+    /**
+     * Attempts to acquire the lock as soon as it becomes available.
+     * @param lock lock to acquire.
+     * @param waitTimeMillis maximum time to wait before giving up.
+     * @param leaseDurationMillis the duration for which to acquire the lock initially.
+     * @return Future lease expiration date.
+     */
+    protected CompletableFuture<DateTime> lockIfAvailable(
+            Lock lock,
+            long waitTimeMillis,
+            int leaseDurationMillis) {
+        CompletableFuture<DateTime> future = new CompletableFuture<>();
+        locksToAcquire.put(lock.path(), new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future));
         return future;
     }
 
@@ -90,37 +107,46 @@
         public void handle(ClusterMessage message) {
             TableModificationEvent event = DatabaseStateMachine.SERIALIZER
                     .decode(message.payload());
-            if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)) {
-                return;
+            if (event.tableName().equals(ONOS_LOCK_TABLE_NAME) &&
+                    event.type().equals(TableModificationEvent.Type.ROW_DELETED)) {
+                THREAD_POOL.submit(new RetryLockTask(event.key()));
             }
+        }
+    }
 
-            log.info("Received a lock available event for path: {}", event.key());
+    private class RetryLockTask implements Runnable {
 
-            String path = event.key();
+        private final String path;
+
+        public RetryLockTask(String path) {
+            this.path = path;
+        }
+
+        @Override
+        public void run() {
             if (!locksToAcquire.containsKey(path)) {
                 return;
             }
 
-            if (event.type() == TableModificationEvent.Type.ROW_DELETED) {
-                List<LockRequest> existingRequests = locksToAcquire.get(path);
-                if (existingRequests == null) {
-                    return;
-                }
+            List<LockRequest> existingRequests = locksToAcquire.get(path);
+            if (existingRequests == null || existingRequests.isEmpty()) {
+                return;
+            }
+            log.info("Path {} is now available for locking. There are {} outstanding "
+                    + "requests for it.",
+                    path, existingRequests.size());
 
-                synchronized (existingRequests) {
-
-                    Iterator<LockRequest> existingRequestIterator = existingRequests
-                            .iterator();
-                    while (existingRequestIterator.hasNext()) {
-                        LockRequest request = existingRequestIterator.next();
-                        if (request.expirationTime().isAfter(DateTime.now())) {
+            synchronized (existingRequests) {
+                Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
+                while (existingRequestIterator.hasNext()) {
+                    LockRequest request = existingRequestIterator.next();
+                    if (DateTime.now().isAfter(request.requestExpirationTime())) {
+                        // request expired.
+                        existingRequestIterator.remove();
+                    } else {
+                        if (request.lock().tryLock(request.leaseDurationMillis())) {
+                            request.future().complete(DateTime.now().plusMillis(request.leaseDurationMillis()));
                             existingRequestIterator.remove();
-                        } else {
-                            if (request.lock().tryLock(
-                                    request.leaseDurationMillis())) {
-                                request.future().complete(null);
-                                existingRequestIterator.remove();
-                            }
                         }
                     }
                 }
@@ -131,16 +157,15 @@
     private class LockRequest {
 
         private final Lock lock;
-        private final DateTime expirationTime;
+        private final DateTime requestExpirationTime;
         private final int leaseDurationMillis;
-        private final CompletableFuture<Void> future;
+        private final CompletableFuture<DateTime> future;
 
         public LockRequest(Lock lock, long waitTimeMillis,
-                int leaseDurationMillis, CompletableFuture<Void> future) {
+                int leaseDurationMillis, CompletableFuture<DateTime> future) {
 
             this.lock = lock;
-            this.expirationTime = DateTime.now().plusMillis(
-                    (int) waitTimeMillis);
+            this.requestExpirationTime = DateTime.now().plusMillis((int) waitTimeMillis);
             this.leaseDurationMillis = leaseDurationMillis;
             this.future = future;
         }
@@ -149,15 +174,15 @@
             return lock;
         }
 
-        public DateTime expirationTime() {
-            return expirationTime;
+        public DateTime requestExpirationTime() {
+            return requestExpirationTime;
         }
 
         public int leaseDurationMillis() {
             return leaseDurationMillis;
         }
 
-        public CompletableFuture<Void> future() {
+        public CompletableFuture<DateTime> future() {
             return future;
         }
     }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
index aa0dd7c..9ca5494 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/MapDBLog.java
@@ -86,7 +86,7 @@
     }
 
     @Override
-    public List<Long> appendEntries(List<Entry> entries) {
+    public synchronized List<Long> appendEntries(List<Entry> entries) {
         assertIsOpen();
         checkArgument(entries != null, "expecting non-null entries");
         final List<Long> indices = new ArrayList<>(entries.size());