Reworked DatabaseService API.
Initial implementation of LockManager.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index c749197..77ff062 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -8,9 +8,11 @@
 
 import net.kuujo.copycat.Copycat;
 
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchWriteRequest;
 import org.onlab.onos.store.service.DatabaseException;
-import org.onlab.onos.store.service.ReadRequest;
-import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.WriteResult;
 
 /**
  * Client for interacting with the Copycat Raft cluster.
@@ -63,9 +65,9 @@
         }
     }
 
-    public List<InternalReadResult> batchRead(List<ReadRequest> requests) {
+    public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
 
-        CompletableFuture<List<InternalReadResult>> future = copycat.submit("read", requests);
+        CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
         try {
             return future.get();
         } catch (InterruptedException | ExecutionException e) {
@@ -73,9 +75,9 @@
         }
     }
 
-    public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) {
+    public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
 
-        CompletableFuture<List<InternalWriteResult>> future = copycat.submit("write", requests);
+        CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
         try {
             return future.get();
         } catch (InterruptedException | ExecutionException e) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index afeaef9..e3de134 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -4,8 +4,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -22,7 +20,6 @@
 import net.kuujo.copycat.cluster.TcpCluster;
 import net.kuujo.copycat.cluster.TcpClusterConfig;
 import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.log.InMemoryLog;
 import net.kuujo.copycat.log.Log;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -37,17 +34,18 @@
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchReadResult;
+import org.onlab.onos.store.service.BatchWriteRequest;
+import org.onlab.onos.store.service.BatchWriteResult;
 import org.onlab.onos.store.service.DatabaseAdminService;
 import org.onlab.onos.store.service.DatabaseException;
 import org.onlab.onos.store.service.DatabaseService;
-import org.onlab.onos.store.service.NoSuchTableException;
-import org.onlab.onos.store.service.OptimisticLockException;
-import org.onlab.onos.store.service.OptionalResult;
-import org.onlab.onos.store.service.ReadRequest;
 import org.onlab.onos.store.service.ReadResult;
-import org.onlab.onos.store.service.WriteAborted;
-import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.ReadStatus;
+import org.onlab.onos.store.service.VersionedValue;
 import org.onlab.onos.store.service.WriteResult;
+import org.onlab.onos.store.service.WriteStatus;
 import org.onlab.packet.IpAddress;
 import org.slf4j.Logger;
 
@@ -199,66 +197,121 @@
     }
 
     @Override
-    public ReadResult read(ReadRequest request) {
-        return batchRead(Arrays.asList(request)).get(0).get();
-    }
-
-    @Override
-    public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
-            List<ReadRequest> batch) {
-        List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
-        for (InternalReadResult internalReadResult : client.batchRead(batch)) {
-            if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
-                readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
-                        new NoSuchTableException()));
-            } else {
-                readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
-                        internalReadResult.result()));
-            }
+    public VersionedValue get(String tableName, String key) {
+        BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
+        ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
+        if (readResult.status().equals(ReadStatus.OK)) {
+            return readResult.value();
         }
-        return readResults;
+        throw new DatabaseException("get failed due to status: " + readResult.status());
     }
 
     @Override
-    public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
-        return batchWrite(Arrays.asList(request)).get(0);
+    public BatchReadResult batchRead(BatchReadRequest batchRequest) {
+        return new BatchReadResult(client.batchRead(batchRequest));
     }
 
     @Override
-    public WriteResult write(WriteRequest request) {
-//            throws OptimisticLockException, PreconditionFailedException {
-        return writeNothrow(request).get();
+    public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
+        return new BatchWriteResult(client.batchWrite(batchRequest));
     }
 
     @Override
-    public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
-            List<WriteRequest> batch) {
-        List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
-        for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
-            if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new NoSuchTableException()));
-            } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new OptimisticLockException()));
-            } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
-                // TODO: throw a different exception?
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new OptimisticLockException()));
-            } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new WriteAborted()));
-            } else {
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        internalWriteResult.result()));
-            }
+    public VersionedValue put(String tableName, String key, byte[] value) {
+        BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
+        WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+        if (writeResult.status().equals(WriteStatus.OK)) {
+            return writeResult.previousValue();
         }
-        return writeResults;
+        throw new DatabaseException("put failed due to status: " + writeResult.status());
+    }
 
+    /**
+     * Submits a single conditional write and maps its status to a boolean.
+     * <p>
+     * Shared by the putIf... and removeIf... operations, which differ only in
+     * the request they build and the operation name reported on failure.
+     *
+     * @param operation name of the calling operation, used in the failure message
+     * @param batchRequest batch whose first result decides the outcome
+     * @return true if the status is OK, false if it is PRECONDITION_VIOLATION
+     * @throws DatabaseException if the write ended with any other status
+     */
+    private boolean conditionalWrite(String operation, BatchWriteRequest batchRequest) {
+        WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+        if (writeResult.status().equals(WriteStatus.OK)) {
+            return true;
+        } else if (writeResult.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+            // An unmet precondition is an expected outcome, not a failure.
+            return false;
+        }
+        // Any other status is surfaced to the caller as an exception.
+        throw new DatabaseException(operation + " failed due to status: " + writeResult.status());
+    }
+
+    @Override
+    public boolean putIfAbsent(String tableName, String key, byte[] value) {
+        // Single-request batch; false means the precondition was not met.
+        return conditionalWrite("putIfAbsent",
+                new BatchWriteRequest.Builder().putIfAbsent(tableName, key, value).build());
+    }
+
+    @Override
+    public boolean putIfVersionMatches(String tableName, String key,
+            byte[] value, long version) {
+        // Single-request batch; false means the precondition was not met.
+        return conditionalWrite("putIfVersionMatches", new BatchWriteRequest.Builder()
+                .putIfVersionMatches(tableName, key, value, version).build());
+    }
+
+    @Override
+    public boolean putIfValueMatches(String tableName, String key,
+            byte[] oldValue, byte[] newValue) {
+        // Single-request batch; false means the precondition was not met.
+        return conditionalWrite("putIfValueMatches", new BatchWriteRequest.Builder()
+                .putIfValueMatches(tableName, key, oldValue, newValue).build());
+    }
+
+    @Override
+    public VersionedValue remove(String tableName, String key) {
+        // Unconditional removal: any status other than OK is an error, and the
+        // previous value reported by the write result is handed back as-is.
+        BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().remove(tableName, key).build();
+        WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+        if (writeResult.status().equals(WriteStatus.OK)) {
+            return writeResult.previousValue();
+        }
+        throw new DatabaseException("remove failed due to status: " + writeResult.status());
+    }
+
+    @Override
+    public boolean removeIfVersionMatches(String tableName, String key,
+            long version) {
+        // Single-request batch; false means the precondition was not met.
+        return conditionalWrite("removeIfVersionMatches", new BatchWriteRequest.Builder()
+                .removeIfVersionMatches(tableName, key, version).build());
+    }
+
+    @Override
+    public boolean removeIfValueMatches(String tableName, String key,
+            byte[] value) {
+        // Single-request batch; false means the precondition was not met.
+        return conditionalWrite("removeIfValueMatches", new BatchWriteRequest.Builder()
+                .removeIfValueMatches(tableName, key, value).build());
+    }
+
+    @Override
+    public void addMember(final ControllerNode node) {
+        final TcpMember tcpMember = new TcpMember(node.ip().toString(),
+                                                  node.tcpPort());
+        log.info("{} was added to the cluster", tcpMember);
+        synchronized (clusterConfig) {
+            clusterConfig.addRemoteMember(tcpMember);
+        }
     }
 
     private final class InternalClusterEventListener
-            implements ClusterEventListener {
+    implements ClusterEventListener {
 
         @Override
         public void event(ClusterEvent event) {
@@ -266,7 +319,7 @@
 
             final ControllerNode node = event.subject();
             final TcpMember tcpMember = new TcpMember(node.ip().toString(),
-                                                      node.tcpPort());
+                    node.tcpPort());
 
             switch (event.type()) {
             case INSTANCE_ACTIVATED:
@@ -284,8 +337,8 @@
             case INSTANCE_REMOVED:
                 if (autoAddMember) {
                     Set<DefaultControllerNode> members
-                        = tabletMembers.getOrDefault(DEFAULT_TABLET,
-                                                     Collections.emptySet());
+                    = tabletMembers.getOrDefault(DEFAULT_TABLET,
+                            Collections.emptySet());
                     // remove only if not the initial members
                     if (!members.contains(node)) {
                         synchronized (clusterConfig) {
@@ -308,63 +361,6 @@
 
     }
 
-    public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
-        public KryoRegisteredInMemoryLog() {
-            super();
-            // required to deserialize object across bundles
-            super.kryo.register(TcpMember.class, new TcpMemberSerializer());
-            super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
-        }
-    }
-
-    private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
-
-        private final R result;
-        private final DatabaseException exception;
-
-        public DatabaseOperationResult(R result) {
-            this.result = result;
-            this.exception = null;
-        }
-
-        public DatabaseOperationResult(DatabaseException exception) {
-            this.result = null;
-            this.exception = exception;
-        }
-
-        @Override
-        public R get() {
-            if (result != null) {
-                return result;
-            }
-            throw exception;
-        }
-
-        @Override
-        public boolean hasValidResult() {
-            return result != null;
-        }
-
-        @Override
-        public String toString() {
-            if (result != null) {
-                return result.toString();
-            } else {
-                return exception.toString();
-            }
-        }
-    }
-
-    @Override
-    public void addMember(final ControllerNode node) {
-        final TcpMember tcpMember = new TcpMember(node.ip().toString(),
-                                                  node.tcpPort());
-        log.info("{} was added to the cluster", tcpMember);
-        synchronized (clusterConfig) {
-            clusterConfig.addRemoteMember(tcpMember);
-        }
-    }
-
     @Override
     public void removeMember(final ControllerNode node) {
         final TcpMember tcpMember = new TcpMember(node.ip().toString(),
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 50d1203..17b174c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -8,6 +8,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
@@ -15,16 +16,21 @@
 import net.kuujo.copycat.Query;
 import net.kuujo.copycat.StateMachine;
 
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchWriteRequest;
 import org.onlab.onos.store.service.ReadRequest;
 import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.ReadStatus;
 import org.onlab.onos.store.service.VersionedValue;
 import org.onlab.onos.store.service.WriteRequest;
 import org.onlab.onos.store.service.WriteResult;
-import org.onlab.onos.store.service.impl.InternalWriteResult.Status;
+import org.onlab.onos.store.service.WriteStatus;
 import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.ByteStreams;
@@ -40,6 +46,10 @@
 
     private final Logger log = getLogger(getClass());
 
+    // Message subject for database update notifications (final: never reassigned).
+    public static final MessageSubject DATABASE_UPDATE_EVENTS =
+            new MessageSubject("database-update-events");
+
     // serializer used for snapshot
     public static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
@@ -47,29 +57,72 @@
             serializerPool = KryoNamespace.newBuilder()
                     .register(VersionedValue.class)
                     .register(State.class)
+                    .register(BatchReadRequest.class)
+                    .register(BatchWriteRequest.class)
+                    .register(ReadStatus.class)
+                    .register(WriteStatus.class)
+                    // TODO: Move this out ?
+                    .register(TableModificationEvent.class)
                     .register(ClusterMessagingProtocol.COMMON)
                     .build()
                     .populate(1);
         }
     };
 
+    private final List<DatabaseUpdateEventListener> listeners = Lists.newLinkedList();
+
+    // durable internal state of the database.
     private State state = new State();
 
     private boolean compressSnapshot = false;
 
     @Command
     public boolean createTable(String tableName) {
-        return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
+        // Tables created without an explicit expiration are announced to
+        // listeners with Integer.MAX_VALUE as the expiration time.
+        return createTable(tableName, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Creates a table and, if it did not already exist, notifies every
+     * registered listener of the new table and its expiration time.
+     *
+     * @param tableName name of the table to create
+     * @param expirationTimeMillis expiration time passed on to the listeners
+     * @return true if the table was created, false if it already existed
+     */
+    @Command
+    public boolean createTable(String tableName, int expirationTimeMillis) {
+        if (state.getTables().putIfAbsent(tableName, Maps.newHashMap()) != null) {
+            return false;
+        }
+        for (DatabaseUpdateEventListener listener : listeners) {
+            listener.tableCreated(tableName, expirationTimeMillis);
+        }
+        return true;
     }
 
     @Command
     public boolean dropTable(String tableName) {
-        return state.getTables().remove(tableName) != null;
+        if (state.getTables().remove(tableName) == null) {
+            // No such table: nothing was dropped, so nobody is notified.
+            return false;
+        }
+        for (DatabaseUpdateEventListener listener : listeners) {
+            listener.tableDeleted(tableName);
+        }
+        return true;
     }
 
     @Command
     public boolean dropAllTables() {
+        // Copy the names first: keySet() is a live view that clear() empties.
+        Set<String> tableNames = new java.util.HashSet<>(state.getTables().keySet());
         state.getTables().clear();
+        for (DatabaseUpdateEventListener listener : listeners) {
+            // Tell this listener about every table that was just dropped.
+            tableNames.forEach(listener::tableDeleted);
+        }
         return true;
     }
 
@@ -79,96 +132,95 @@
     }
 
     @Query
-    public List<InternalReadResult> read(List<ReadRequest> requests) {
-        List<InternalReadResult> results = new ArrayList<>(requests.size());
-        for (ReadRequest request : requests) {
+    public List<ReadResult> read(BatchReadRequest batchRequest) {
+        List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
+        for (ReadRequest request : batchRequest.getAsList()) {
             Map<String, VersionedValue> table = state.getTables().get(request.tableName());
             if (table == null) {
-                results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
+                results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
                 continue;
             }
             VersionedValue value = VersionedValue.copy(table.get(request.key()));
-            results.add(new InternalReadResult(
-                    InternalReadResult.Status.OK,
-                    new ReadResult(
-                            request.tableName(),
-                            request.key(),
-                            value)));
+            results.add(new ReadResult(ReadStatus.OK, request.tableName(), request.key(), value));
         }
         return results;
     }
 
-    InternalWriteResult.Status checkIfApplicable(WriteRequest request,
-                                                 VersionedValue value) {
+    WriteStatus checkIfApplicable(WriteRequest request,
+                                        VersionedValue value) {
 
         switch (request.type()) {
         case PUT:
-            return InternalWriteResult.Status.OK;
+            return WriteStatus.OK;
 
         case PUT_IF_ABSENT:
             if (value == null) {
-                return InternalWriteResult.Status.OK;
+                return WriteStatus.OK;
             }
-            return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
+            return WriteStatus.PRECONDITION_VIOLATION;
         case PUT_IF_VALUE:
         case REMOVE_IF_VALUE:
             if (value != null && Arrays.equals(value.value(), request.oldValue())) {
-                return InternalWriteResult.Status.OK;
+                return WriteStatus.OK;
             }
-            return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
+            return WriteStatus.PRECONDITION_VIOLATION;
         case PUT_IF_VERSION:
         case REMOVE_IF_VERSION:
             if (value != null && request.previousVersion() == value.version()) {
-                return InternalWriteResult.Status.OK;
+                return WriteStatus.OK;
             }
-            return InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH;
+            return WriteStatus.PRECONDITION_VIOLATION;
         case REMOVE:
-            return InternalWriteResult.Status.OK;
+            return WriteStatus.OK;
         default:
             break;
         }
         log.error("Should never reach here {}", request);
-        return InternalWriteResult.Status.ABORTED;
+        return WriteStatus.ABORTED;
     }
 
     @Command
-    public List<InternalWriteResult> write(List<WriteRequest> requests) {
+    public List<WriteResult> write(BatchWriteRequest batchRequest) {
 
         // applicability check
         boolean abort = false;
-        List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
-        for (WriteRequest request : requests) {
+        List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
+        for (WriteRequest request : batchRequest.getAsList()) {
             Map<String, VersionedValue> table = state.getTables().get(request.tableName());
             if (table == null) {
-                validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
+                validationResults.add(WriteStatus.NO_SUCH_TABLE);
                 abort = true;
                 continue;
             }
             final VersionedValue value = table.get(request.key());
-            Status result = checkIfApplicable(request, value);
+            WriteStatus result = checkIfApplicable(request, value);
             validationResults.add(result);
-            if (result != Status.OK) {
+            if (result != WriteStatus.OK) {
                 abort = true;
             }
         }
 
-        List<InternalWriteResult> results = new ArrayList<>(requests.size());
+        List<WriteResult> results = new ArrayList<>(batchRequest.batchSize());
 
         if (abort) {
-            for (InternalWriteResult.Status validationResult : validationResults) {
-                if (validationResult == InternalWriteResult.Status.OK) {
+            for (WriteStatus validationResult : validationResults) {
+                if (validationResult == WriteStatus.OK) {
                     // aborted due to applicability check failure on other request
-                    results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
+                    results.add(new WriteResult(WriteStatus.ABORTED, null));
                 } else {
-                    results.add(new InternalWriteResult(validationResult, null));
+                    results.add(new WriteResult(validationResult, null));
                 }
             }
             return results;
         }
 
+        List<TableModificationEvent> tableModificationEvents = Lists.newLinkedList();
+
         // apply changes
-        for (WriteRequest request : requests) {
+        for (WriteRequest request : batchRequest.getAsList()) {
             Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+
+            TableModificationEvent tableModificationEvent = null;
             // FIXME: If this method could be called by multiple thread,
             // synchronization scope is wrong.
             // Whole function including applicability check needs to be protected.
@@ -182,16 +234,23 @@
                 case PUT_IF_VERSION:
                     VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
                     VersionedValue previousValue = table.put(request.key(), newValue);
-                    WriteResult putResult = new WriteResult(request.tableName(), request.key(), previousValue);
-                    results.add(InternalWriteResult.ok(putResult));
+                    WriteResult putResult = new WriteResult(WriteStatus.OK, previousValue);
+                    results.add(putResult);
+                    tableModificationEvent = (previousValue == null) ?
+                            TableModificationEvent.rowAdded(request.tableName(), request.key()) :
+                            TableModificationEvent.rowUpdated(request.tableName(), request.key());
                     break;
 
                 case REMOVE:
                 case REMOVE_IF_VALUE:
                 case REMOVE_IF_VERSION:
                     VersionedValue removedValue = table.remove(request.key());
-                    WriteResult removeResult = new WriteResult(request.tableName(), request.key(), removedValue);
-                    results.add(InternalWriteResult.ok(removeResult));
+                    WriteResult removeResult = new WriteResult(WriteStatus.OK, removedValue);
+                    results.add(removeResult);
+                    if (removedValue != null) {
+                        tableModificationEvent =
+                                TableModificationEvent.rowDeleted(request.tableName(), request.key());
+                    }
                     break;
 
                 default:
@@ -199,7 +258,19 @@
                     break;
                 }
             }
+
+            if (tableModificationEvent != null) {
+                tableModificationEvents.add(tableModificationEvent);
+            }
         }
+
+        // notify listeners of table mod events.
+        for (DatabaseUpdateEventListener listener : listeners) {
+            for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
+                listener.tableModified(tableModificationEvent);
+            }
+        }
+
         return results;
     }
 
@@ -253,4 +324,8 @@
             throw new SnapshotException(e);
         }
     }
+
+    public void addEventListener(DatabaseUpdateEventListener listener) {
+        listeners.add(listener);
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventHandler.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventHandler.java
new file mode 100644
index 0000000..58d5446
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventHandler.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onlab.onos.store.service.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import net.jodah.expiringmap.ExpiringMap;
+import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
+import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
+import net.kuujo.copycat.cluster.Member;
+import net.kuujo.copycat.event.EventHandler;
+import net.kuujo.copycat.event.LeaderElectEvent;
+
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.service.DatabaseService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Reacts to database table updates on behalf of the local Raft member.
+ * <p>
+ * Rows written to tables that were created with a positive expiration time are
+ * tracked in a per-table {@link ExpiringMap}. When a tracked row expires and the
+ * local member is the current leader, the row is removed from the database.
+ * Row deletions are broadcast to the cluster, by the leader only, on the
+ * {@link #DATABASE_UPDATES} subject.
+ */
+public class DatabaseUpdateEventHandler implements DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    public static final MessageSubject DATABASE_UPDATES = new MessageSubject("database-update-event");
+
+    // NOTE(review): nothing in this class assigns the three collaborators below.
+    // Unless they are injected by other means they are null when dereferenced in
+    // tableModified() and ExpirationObserver.expired() - confirm how they are wired.
+    private DatabaseService databaseService;
+    private ClusterService cluster;
+    private ClusterCommunicationService clusterCommunicator;
+
+    private final Member localMember;
+    private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
+
+    // Table name -> rows awaiting expiration. Only tables created with a positive
+    // expiration time have an entry here.
+    // NOTE(review): this plain HashMap is read from the expiration callback as well
+    // as from the listener methods; confirm those never run concurrently.
+    private final Map<String, Map<DatabaseRow, Void>> tableEntryExpirationMap = new HashMap<>();
+    private final ExpirationListener<DatabaseRow, Void> expirationObserver = new ExpirationObserver();
+
+    /**
+     * Creates a handler acting for the given cluster member.
+     *
+     * @param localMember the local Raft cluster member; compared against the
+     *                    elected leader to decide whether this node is leader
+     */
+    DatabaseUpdateEventHandler(Member localMember) {
+        this.localMember = localMember;
+    }
+
+    /**
+     * Handles a row level modification.
+     * <p>
+     * Deletions are broadcast to the cluster when the local member is leader.
+     * Additions and updates (re)register the row for expiration, provided the
+     * table was created with an expiration time.
+     *
+     * @param event the table modification event
+     */
+    @Override
+    public void tableModified(TableModificationEvent event) {
+        switch (event.type()) {
+        case ROW_DELETED:
+            if (isLocalMemberLeader.get()) {
+                try {
+                    clusterCommunicator.broadcast(
+                            new ClusterMessage(
+                                    cluster.getLocalNode().id(),
+                                    DATABASE_UPDATES,
+                                    DatabaseStateMachine.SERIALIZER.encode(event)));
+                } catch (IOException e) {
+                    log.error("Failed to broadcast a database table modification event.", e);
+                }
+            }
+            break;
+        case ROW_ADDED:
+        case ROW_UPDATED:
+            trackForExpiration(new DatabaseRow(event.tableName(), event.key()));
+            break;
+        default:
+            break;
+        }
+    }
+
+    /**
+     * Starts expiration tracking for a newly created table.
+     *
+     * @param tableName            name of the created table
+     * @param expirationTimeMillis entry time-to-live in milliseconds; a value
+     *                             that is not positive means entries never expire
+     */
+    @Override
+    public void tableCreated(String tableName, int expirationTimeMillis) {
+        // make this explicit instead of relying on a negative value
+        // to indicate no expiration.
+        if (expirationTimeMillis > 0) {
+            tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
+                    // The value is in milliseconds, so the unit must match.
+                    .expiration(expirationTimeMillis, TimeUnit.MILLISECONDS)
+                    .expirationListener(expirationObserver)
+                    // FIXME: make the expiration policy configurable.
+                    .expirationPolicy(ExpirationPolicy.CREATED)
+                    .build());
+        }
+    }
+
+    /**
+     * Stops expiration tracking for a deleted table.
+     *
+     * @param tableName name of the deleted table
+     */
+    @Override
+    public void tableDeleted(String tableName) {
+        tableEntryExpirationMap.remove(tableName);
+    }
+
+    /**
+     * Registers the row for expiration if its table has expiration enabled.
+     * Tables without expiration (or already deleted) have no tracking map, in
+     * which case there is nothing to do.
+     */
+    private void trackForExpiration(DatabaseRow row) {
+        Map<DatabaseRow, Void> expiringRows = tableEntryExpirationMap.get(row.tableName);
+        if (expiringRows != null) {
+            expiringRows.put(row, null);
+        }
+    }
+
+    /**
+     * Removes expired rows from the database when the local member is leader.
+     */
+    private class ExpirationObserver implements ExpirationListener<DatabaseRow, Void> {
+        @Override
+        public void expired(DatabaseRow key, Void value) {
+            try {
+                // TODO: The safety of this check needs to be verified.
+                // Couple of issues:
+                // 1. It is very likely that only one member should attempt deletion of the entry from database.
+                // 2. A potential race condition exists where the entry expires, but before it can be deleted
+                // from the database, a new entry is added or existing entry is updated.
+                // That means ttl and expiration should be for a given version.
+                if (isLocalMemberLeader.get()) {
+                    databaseService.remove(key.tableName, key.key);
+                }
+            } catch (Exception e) {
+                log.warn("Failed to delete entry from the database after ttl expiration. Will retry eviction", e);
+                // Re-register the row so that eviction is attempted again after
+                // another expiration period (no-op if the table is gone by now).
+                trackForExpiration(key);
+            }
+        }
+    }
+
+    /**
+     * Records whether the local member is the newly elected leader.
+     *
+     * @param event the leader election event
+     */
+    @Override
+    public void handle(LeaderElectEvent event) {
+        // Also clears the flag when leadership moves to another member.
+        isLocalMemberLeader.set(localMember.equals(event.leader()));
+    }
+
+    /**
+     * Immutable (table name, key) pair identifying a database row; usable as a map key.
+     */
+    private static final class DatabaseRow {
+
+        private final String tableName;
+        private final String key;
+
+        DatabaseRow(String tableName, String key) {
+            this.tableName = tableName;
+            this.key = key;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof DatabaseRow)) {
+                return false;
+            }
+            DatabaseRow that = (DatabaseRow) obj;
+
+            return Objects.equals(this.tableName, that.tableName) &&
+                   Objects.equals(this.key, that.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableName, key);
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
new file mode 100644
index 0000000..f7b147f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onlab.onos.store.service.impl;
+
+/**
+ * Listener notified of table level and row level changes to the database.
+ */
+public interface DatabaseUpdateEventListener {
+
+    /**
+     * Notifies the listener that a row of a table was added, updated or deleted.
+     *
+     * @param event describes the affected table, the row key and the kind of change
+     */
+    void tableModified(TableModificationEvent event);
+
+    /**
+     * Notifies the listener that a table was created.
+     *
+     * @param tableName            name of the created table
+     * @param expirationTimeMillis expiration time of the table's entries, in milliseconds
+     */
+    void tableCreated(String tableName, int expirationTimeMillis);
+
+    /**
+     * Notifies the listener that a table was deleted.
+     *
+     * @param tableName name of the deleted table
+     */
+    void tableDeleted(String tableName);
+
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
new file mode 100644
index 0000000..eaae063
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -0,0 +1,123 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.joda.time.DateTime;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.OptimisticLockException;
+
+/**
+ * A distributed lock implementation.
+ * <p>
+ * Ownership is recorded as a row keyed by the lock path in the
+ * {@link DistributedLockManager#ONOS_LOCK_TABLE_NAME} table, whose value is a
+ * lock id unique to this instance. Whether the lock is currently held is
+ * answered from local state: a flag plus the lease expiration time.
+ * <p>
+ * NOTE(review): the lease duration is only tracked locally; nothing in this class
+ * passes it to the database.
+ */
+public class DistributedLock implements Lock {
+
+    private final DistributedLockManager lockManager;
+    private final DatabaseService databaseService;
+    private final String path;
+    private final byte[] lockId;
+    private final AtomicBoolean isLocked = new AtomicBoolean(false);
+
+    // Written by whichever thread completes the acquisition (the caller, or the
+    // thread on which the lock manager retries it) and read by callers.
+    private volatile DateTime lockExpirationTime;
+
+    /**
+     * Creates a lock for the given path. The lock is not acquired.
+     *
+     * @param path            path identifying the lock
+     * @param databaseService database used to record lock ownership
+     * @param clusterService  used to embed the local node id in the lock id
+     * @param lockManager     manager that retries acquisition when the lock is released
+     */
+    public DistributedLock(
+            String path,
+            DatabaseService databaseService,
+            ClusterService clusterService,
+            DistributedLockManager lockManager) {
+
+        this.path = path;
+        this.databaseService = databaseService;
+        this.lockManager = lockManager;
+        // Explicit charset so the id bytes do not depend on the platform default.
+        this.lockId =
+                (UUID.randomUUID().toString() + "::" + clusterService.getLocalNode().id().toString())
+                        .getBytes(java.nio.charset.StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public String path() {
+        return path;
+    }
+
+    /**
+     * Acquires the lock, waiting for it if necessary. Returns immediately when
+     * the lock is already held with a lease outlasting the requested one.
+     *
+     * @param leaseDurationMillis requested lease duration in milliseconds
+     */
+    @Override
+    public void lock(int leaseDurationMillis) {
+        if (!holdsLeaseBeyond(leaseDurationMillis)) {
+            tryLock(Long.MAX_VALUE, leaseDurationMillis);
+        }
+    }
+
+    /**
+     * Attempts to acquire the lock without waiting.
+     *
+     * @param leaseDurationMillis requested lease duration in milliseconds
+     * @return true if the lock was acquired, false if it is held elsewhere
+     */
+    @Override
+    public boolean tryLock(int leaseDurationMillis) {
+        try {
+            // NOTE(review): failure is assumed to surface as OptimisticLockException;
+            // if putIfAbsent reports it through a return value instead, check it here.
+            databaseService.putIfAbsent(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
+        } catch (OptimisticLockException e) {
+            return false;
+        }
+        // Publish the expiration time before the flag so that isLocked() never
+        // observes the flag without a matching expiration time.
+        lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
+        isLocked.set(true);
+        return true;
+    }
+
+    /**
+     * Attempts to acquire the lock, waiting up to the given time for it to be released.
+     *
+     * @param waitTimeMillis      maximum time to wait in milliseconds
+     * @param leaseDurationMillis requested lease duration in milliseconds
+     * @return true if the lock was acquired; false on timeout, interruption or failure
+     */
+    @Override
+    public boolean tryLock(
+            long waitTimeMillis,
+            int leaseDurationMillis) {
+        if (tryLock(leaseDurationMillis)) {
+            return true;
+        }
+
+        CompletableFuture<Void> future =
+                lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
+        try {
+            future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            // Preserve the interrupt for the caller.
+            Thread.currentThread().interrupt();
+            return false;
+        } catch (ExecutionException | TimeoutException e) {
+            // TODO: ExecutionException could indicate something
+            // wrong with the backing database.
+            // Throw an exception?
+            // NOTE(review): the request stays registered with the lock manager after a
+            // timeout, so the lock may still be acquired later on behalf of this instance.
+            return false;
+        }
+        // The lock manager completes the future only after tryLock(int) succeeded
+        // on this instance, which already recorded the lock state.
+        return true;
+    }
+
+    /**
+     * Tells whether this instance holds the lock, judged from local state only.
+     *
+     * @return true if the lock was acquired by this instance and its lease has not expired
+     */
+    @Override
+    public boolean isLocked() {
+        if (!isLocked.get()) {
+            return false;
+        }
+        // We rely on local information to check whether the lease has expired.
+        // This keeps the call lightweight.
+        if (DateTime.now().isAfter(lockExpirationTime)) {
+            isLocked.set(false);
+            return false;
+        }
+        return true;
+    }
+
+    /**
+     * Releases the lock if this instance holds it; otherwise does nothing.
+     */
+    @Override
+    public void unlock() {
+        if (!isLocked()) {
+            return;
+        }
+        databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
+        isLocked.set(false);
+    }
+
+    /**
+     * Ensures the lock is held for at least the given duration from now.
+     *
+     * @param leaseDurationMillis requested lease duration in milliseconds
+     * @return true if the current lease already covers the request or the lock was acquired
+     */
+    @Override
+    public boolean extendExpiration(int leaseDurationMillis) {
+        // NOTE(review): when the lock is held with a shorter lease this falls back to
+        // tryLock(), i.e. putIfAbsent on a row this instance already owns - confirm
+        // that this is the intended way to renew a lease.
+        return holdsLeaseBeyond(leaseDurationMillis) || tryLock(leaseDurationMillis);
+    }
+
+    /**
+     * Tells whether the lock is held and its lease expires later than
+     * {@code leaseDurationMillis} from now.
+     */
+    private boolean holdsLeaseBeyond(int leaseDurationMillis) {
+        if (!isLocked()) {
+            return false;
+        }
+        DateTime expiration = lockExpirationTime;
+        return expiration != null && expiration.isAfter(DateTime.now().plusMillis(leaseDurationMillis));
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
new file mode 100644
index 0000000..c87ab37
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -0,0 +1,159 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.joda.time.DateTime;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.LockEventListener;
+import org.onlab.onos.store.service.LockService;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ArrayListMultimap;
+
+/**
+ * Lock service backed by the database.
+ * <p>
+ * Locks are rows of the {@link #ONOS_LOCK_TABLE_NAME} table. Requests that could
+ * not acquire a lock immediately are parked per lock path and retried whenever a
+ * row deletion for that path is announced on the cluster.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedLockManager implements LockService {
+
+    private final Logger log = getLogger(getClass());
+
+    public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
+
+    // Lock path -> requests waiting for that lock. Touched by threads requesting
+    // locks and by the cluster message handler, so every access synchronizes on it.
+    private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap.create();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private DatabaseService databaseService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    @Activate
+    public void activate() {
+        // NOTE(review): confirm this is the subject on which row deletions are
+        // actually broadcast; otherwise waiting requests are never retried.
+        clusterCommunicator.addSubscriber(
+                DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
+                new LockEventMessageListener());
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        synchronized (locksToAcquire) {
+            locksToAcquire.clear();
+        }
+        log.info("Stopped.");
+    }
+
+    /**
+     * Creates a lock for the given path. The lock is not acquired.
+     *
+     * @param path path identifying the lock
+     * @return a new lock instance
+     */
+    @Override
+    public Lock create(String path) {
+        return new DistributedLock(
+                path,
+                databaseService,
+                clusterService,
+                this);
+    }
+
+    @Override
+    public void addListener(LockEventListener listener) {
+        // FIXME:
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void removeListener(LockEventListener listener) {
+        // FIXME:
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Registers a request to acquire the lock as soon as it is released.
+     *
+     * @param lock                lock to acquire
+     * @param waitTimeMillis      how long the request stays valid, in milliseconds
+     * @param leaseDurationMillis lease duration to request on acquisition, in milliseconds
+     * @return a future completed (with null) once the lock has been acquired
+     */
+    protected CompletableFuture<Void> lockIfAvailable(Lock lock, long waitTimeMillis, int leaseDurationMillis) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        LockRequest request = new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future);
+        synchronized (locksToAcquire) {
+            locksToAcquire.put(lock.path(), request);
+        }
+        return future;
+    }
+
+    /**
+     * Retries parked lock requests when a lock row is deleted.
+     */
+    private class LockEventMessageListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            TableModificationEvent event = DatabaseStateMachine.SERIALIZER.decode(message.payload());
+            if (!ONOS_LOCK_TABLE_NAME.equals(event.tableName())
+                    || event.type() != TableModificationEvent.Type.ROW_DELETED) {
+                return;
+            }
+
+            synchronized (locksToAcquire) {
+                // get() returns a live view (empty, never null, for an unknown path),
+                // so removing through its iterator updates the multimap.
+                List<LockRequest> pendingRequests = locksToAcquire.get(event.key());
+                Iterator<LockRequest> pending = pendingRequests.iterator();
+                while (pending.hasNext()) {
+                    LockRequest request = pending.next();
+                    if (request.expirationTime().isBeforeNow()) {
+                        // The requester has stopped waiting; drop the request.
+                        pending.remove();
+                    } else if (request.lock().tryLock(request.leaseDurationMillis())) {
+                        pending.remove();
+                        request.future().complete(null);
+                        // The lock is taken again; the remaining requests wait for the next release.
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    /**
+     * A parked request to acquire a lock, valid until its expiration time.
+     */
+    private static final class LockRequest {
+
+        private final Lock lock;
+        private final DateTime expirationTime;
+        private final int leaseDurationMillis;
+        private final CompletableFuture<Void> future;
+
+        LockRequest(
+            Lock lock,
+            long waitTimeMillis,
+            int leaseDurationMillis,
+            CompletableFuture<Void> future) {
+
+            this.lock = lock;
+            // Saturating addition: very large waits (e.g. Long.MAX_VALUE, meaning
+            // "wait forever") must not overflow or be truncated into the past.
+            long now = DateTime.now().getMillis();
+            long deadline = waitTimeMillis > Long.MAX_VALUE - now ? Long.MAX_VALUE : now + waitTimeMillis;
+            this.expirationTime = new DateTime(deadline);
+            this.leaseDurationMillis = leaseDurationMillis;
+            this.future = future;
+        }
+
+        public Lock lock() {
+            return lock;
+        }
+
+        public DateTime expirationTime() {
+            return expirationTime;
+        }
+
+        public int leaseDurationMillis() {
+            return leaseDurationMillis;
+        }
+
+        public CompletableFuture<Void> future() {
+            return future;
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
new file mode 100644
index 0000000..25956de
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
@@ -0,0 +1,44 @@
+package org.onlab.onos.store.service.impl;
+
+/**
+ * Describes a change to a single row of a database table.
+ */
+public class TableModificationEvent {
+
+    /**
+     * Kind of row change.
+     */
+    public enum Type {
+        ROW_ADDED,
+        ROW_DELETED,
+        ROW_UPDATED
+    }
+
+    private final String tableName;
+    private final String key;
+    private final Type type;
+
+    // Instances are obtained through the static factory methods below.
+    private TableModificationEvent(String table, String rowKey, Type changeType) {
+        tableName = table;
+        key = rowKey;
+        type = changeType;
+    }
+
+    /**
+     * Creates an event for a row that was added.
+     *
+     * @param tableName name of the table
+     * @param key       key of the row
+     * @return a {@link Type#ROW_ADDED} event
+     */
+    public static TableModificationEvent rowAdded(String tableName, String key) {
+        return new TableModificationEvent(tableName, key, Type.ROW_ADDED);
+    }
+
+    /**
+     * Creates an event for a row that was updated.
+     *
+     * @param tableName name of the table
+     * @param key       key of the row
+     * @return a {@link Type#ROW_UPDATED} event
+     */
+    public static TableModificationEvent rowUpdated(String tableName, String key) {
+        return new TableModificationEvent(tableName, key, Type.ROW_UPDATED);
+    }
+
+    /**
+     * Creates an event for a row that was deleted.
+     *
+     * @param tableName name of the table
+     * @param key       key of the row
+     * @return a {@link Type#ROW_DELETED} event
+     */
+    public static TableModificationEvent rowDeleted(String tableName, String key) {
+        return new TableModificationEvent(tableName, key, Type.ROW_DELETED);
+    }
+
+    /**
+     * Returns the name of the table the row belongs to.
+     *
+     * @return table name
+     */
+    public String tableName() {
+        return this.tableName;
+    }
+
+    /**
+     * Returns the key of the affected row.
+     *
+     * @return row key
+     */
+    public String key() {
+        return this.key;
+    }
+
+    /**
+     * Returns the kind of change.
+     *
+     * @return event type
+     */
+    public Type type() {
+        return this.type;
+    }
+}