Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
index 8beed85..db8514d 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
@@ -15,8 +15,13 @@
  */
 package org.onlab.onos.foo;
 
+import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static org.onlab.util.Tools.namedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
 import java.nio.ByteBuffer;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -37,20 +42,10 @@
 import org.onlab.onos.net.intent.IntentListener;
 import org.onlab.onos.net.intent.IntentService;
 import org.onlab.onos.store.service.DatabaseAdminService;
-import org.onlab.onos.store.service.DatabaseException;
 import org.onlab.onos.store.service.DatabaseService;
-import org.onlab.onos.store.service.OptionalResult;
-import org.onlab.onos.store.service.PreconditionFailedException;
-import org.onlab.onos.store.service.ReadRequest;
-import org.onlab.onos.store.service.ReadResult;
-import org.onlab.onos.store.service.WriteRequest;
-import org.onlab.onos.store.service.WriteResult;
+import org.onlab.onos.store.service.VersionedValue;
 import org.slf4j.Logger;
 
-import static org.onlab.util.Tools.namedThreads;
-import static org.slf4j.LoggerFactory.getLogger;
-import static java.util.concurrent.Executors.newScheduledThreadPool;
-
 /**
  * Playground app component.
  */
@@ -97,9 +92,9 @@
             log.info("Couldn't find DB service");
         } else {
             log.info("Found DB service");
-//            longIncrementor();
-//            executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
-//            executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
+            longIncrementor();
+            executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
+            executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
         }
         log.info("Started");
     }
@@ -164,44 +159,33 @@
 
             dbAdminService.createTable(someTable);
 
-            ReadResult read = dbService.read(ReadRequest.get(someTable, someKey));
-            if (!read.valueExists()) {
+            VersionedValue vv = dbService.get(someTable, someKey);
+            if (vv == null) {
                 ByteBuffer zero = ByteBuffer.allocate(Long.BYTES).putLong(0);
-                try {
-                    dbService.write(WriteRequest
-                                    .putIfAbsent(someTable,
-                                                 someKey,
-                                                 zero.array()));
-                    log.info("Wrote initial value");
-                    read = dbService.read(ReadRequest.get(someTable, someKey));
-                } catch (PreconditionFailedException e) {
-                    log.info("Concurrent write detected.", e);
-
-                    // concurrent write detected, read and fall through
-                    read = dbService.read(ReadRequest.get(someTable, someKey));
-                    if (!read.valueExists()) {
-                        log.error("Shouldn't reach here");
-                    }
+                if (dbService.putIfAbsent(someTable, someKey, zero.array())) {
+                	log.info("Wrote initial value");
+                	vv = dbService.get(someTable, someKey);
+                } else {
+                	log.info("Concurrent write detected.");
+                	// concurrent write detected, read and fall through
+                	vv = dbService.get(someTable, someKey);
+                	if (vv == null) {
+                		log.error("Shouldn't reach here");
+                	}
                 }
             }
             int retry = 5;
             do {
-                ByteBuffer prev = ByteBuffer.wrap(read.value().value());
+                ByteBuffer prev = ByteBuffer.wrap(vv.value());
                 long next = prev.getLong() + 1;
                 byte[] newValue = ByteBuffer.allocate(Long.BYTES).putLong(next).array();
-                OptionalResult<WriteResult, DatabaseException> result
-                = dbService.writeNothrow(WriteRequest
-                                         .putIfVersionMatches(someTable,
-                                                              someKey,
-                                                              newValue,
-                                                              read.value().version()));
-                if (result.hasValidResult()) {
-                    log.info("Write success {} -> {}", result.get().previousValue(), next);
+                if (dbService.putIfVersionMatches(someTable, someKey, newValue, vv.version())) {
+                    log.info("Write success. New value: {}", next);
                     break;
                 } else {
                     log.info("Write failed trying to write {}", next);
-                    read = dbService.read(ReadRequest.get(someTable, someKey));
-                    if (!read.valueExists()) {
+                    vv = dbService.get(someTable, someKey);
+                    if (vv == null) {
                         log.error("Shouldn't reach here");
                     }
                 }
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchReadRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchReadRequest.java
new file mode 100644
index 0000000..1cf422c
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchReadRequest.java
@@ -0,0 +1,72 @@
+package org.onlab.onos.store.service;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Collection of read requests to be submitted as one batch.
+ */
+public class BatchReadRequest {
+
+    private final List<ReadRequest> readRequests;
+
+    /**
+     * Creates a new BatchReadRequest object from the specified list of read requests.
+     * @param readRequests read requests.
+     * @return BatchReadRequest object.
+     */
+    public static BatchReadRequest create(List<ReadRequest> readRequests) {
+        return new BatchReadRequest(readRequests);
+    }
+
+    private BatchReadRequest(List<ReadRequest> readRequests) {
+        // Snapshot the list: wrapping alone would leave this batch aliased to the
+        // caller's (or the Builder's) list, so later additions would leak in.
+        this.readRequests = Collections.unmodifiableList(Lists.newLinkedList(readRequests));
+    }
+
+    /**
+     * Returns the number of requests in this batch.
+     * @return size of request batch.
+     */
+    public int batchSize() {
+        return readRequests.size();
+    }
+
+    /**
+     * Returns the requests in this batch as a list.
+     * @return unmodifiable list of read requests
+     */
+    public List<ReadRequest> getAsList() {
+        return readRequests;
+    }
+
+    /**
+     * Builder for BatchReadRequest.
+     */
+    public static class Builder {
+
+        private final List<ReadRequest> readRequests = Lists.newLinkedList();
+
+        /**
+         * Appends a get request.
+         * @param tableName table name
+         * @param key key to fetch.
+         * @return this Builder
+         */
+        public Builder get(String tableName, String key) {
+            readRequests.add(new ReadRequest(tableName, key));
+            return this;
+        }
+
+        /**
+         * Builds a BatchReadRequest from the requests appended so far.
+         * @return BatchReadRequest
+         */
+        public BatchReadRequest build() {
+            return new BatchReadRequest(readRequests);
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchReadResult.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchReadResult.java
new file mode 100644
index 0000000..7281eff
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchReadResult.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.store.service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Collection of read results returned for a batch read operation.
+ */
+public class BatchReadResult {
+
+    private final List<ReadResult> readResults;
+
+    /**
+     * Creates a new BatchReadResult from the specified list of read results.
+     * @param readResults read results.
+     */
+    public BatchReadResult(List<ReadResult> readResults) {
+        // Copy before wrapping so later changes to the caller's list cannot alter this result.
+        this.readResults = Collections.unmodifiableList(new ArrayList<>(readResults));
+    }
+
+    /**
+     * Returns the results in this batch as a list.
+     * @return unmodifiable list of read results
+     */
+    public List<ReadResult> getAsList() {
+        return readResults;
+    }
+
+    /**
+     * Returns the number of results in this batch.
+     * @return size of result batch.
+     */
+    public int batchSize() {
+        return readResults.size();
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java
new file mode 100644
index 0000000..9ce14a1
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteRequest.java
@@ -0,0 +1,146 @@
+package org.onlab.onos.store.service;
+
+import java.util.Collections;
+import java.util.List;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Collection of write requests to be submitted as one batch.
+ */
+public class BatchWriteRequest {
+
+    private final List<WriteRequest> writeRequests;
+
+    /**
+     * Creates a new BatchWriteRequest object from the specified list of write requests.
+     * @param writeRequests write requests.
+     * @return BatchWriteRequest object.
+     */
+    public static BatchWriteRequest create(List<WriteRequest> writeRequests) {
+        return new BatchWriteRequest(writeRequests);
+    }
+
+    private BatchWriteRequest(List<WriteRequest> writeRequests) {
+        // Snapshot the list: wrapping alone would leave this batch aliased to the
+        // caller's (or the Builder's) list, so later additions would leak in.
+        this.writeRequests = Collections.unmodifiableList(Lists.newLinkedList(writeRequests));
+    }
+
+    /**
+     * Returns the requests in this batch as a list.
+     * @return unmodifiable list of write requests
+     */
+    public List<WriteRequest> getAsList() {
+        return writeRequests;
+    }
+
+    /**
+     * Returns the number of requests in this batch.
+     * @return size of request batch.
+     */
+    public int batchSize() {
+        return writeRequests.size();
+    }
+
+    /**
+     * Builder for BatchWriteRequest.
+     */
+    public static class Builder {
+
+        private final List<WriteRequest> writeRequests = Lists.newLinkedList();
+
+        /**
+         * Appends a put request.
+         * @param tableName table name
+         * @param key key to write.
+         * @param value value to write.
+         * @return this Builder
+         */
+        public Builder put(String tableName, String key, byte[] value) {
+            writeRequests.add(WriteRequest.put(tableName, key, value));
+            return this;
+        }
+
+        /**
+         * Appends a putIfAbsent request.
+         * @param tableName table name
+         * @param key key to write.
+         * @param value value to write.
+         * @return this Builder
+         */
+        public Builder putIfAbsent(String tableName, String key, byte[] value) {
+            writeRequests.add(WriteRequest.putIfAbsent(tableName, key, value));
+            return this;
+        }
+
+        /**
+         * Appends a putIfValueMatches request.
+         * @param tableName table name
+         * @param key key to write.
+         * @param oldValue value expected to be currently stored.
+         * @param newValue value to write.
+         * @return this Builder
+         */
+        public Builder putIfValueMatches(String tableName, String key, byte[] oldValue, byte[] newValue) {
+            writeRequests.add(WriteRequest.putIfValueMatches(tableName, key, oldValue, newValue));
+            return this;
+        }
+
+        /**
+         * Appends a putIfVersionMatches request.
+         * @param tableName table name
+         * @param key key to write.
+         * @param value value to write.
+         * @param version version expected to be currently stored.
+         * @return this Builder
+         */
+        public Builder putIfVersionMatches(String tableName, String key, byte[] value, long version) {
+            writeRequests.add(WriteRequest.putIfVersionMatches(tableName, key, value, version));
+            return this;
+        }
+
+        /**
+         * Appends a remove request.
+         * @param tableName table name
+         * @param key key to remove.
+         * @return this Builder
+         */
+        public Builder remove(String tableName, String key) {
+            writeRequests.add(WriteRequest.remove(tableName, key));
+            return this;
+        }
+
+        /**
+         * Appends a removeIfVersionMatches request.
+         * @param tableName table name
+         * @param key key to remove.
+         * @param version version expected to be currently stored.
+         * @return this Builder
+         */
+        public Builder removeIfVersionMatches(String tableName, String key, long version) {
+            writeRequests.add(WriteRequest.removeIfVersionMatches(tableName, key, version));
+            return this;
+        }
+
+        /**
+         * Appends a removeIfValueMatches request.
+         * @param tableName table name
+         * @param key key to remove.
+         * @param value value expected to be currently stored.
+         * @return this Builder
+         */
+        public Builder removeIfValueMatches(String tableName, String key, byte[] value) {
+            writeRequests.add(WriteRequest.removeIfValueMatches(tableName, key, value));
+            return this;
+        }
+
+        /**
+         * Builds a BatchWriteRequest from the requests appended so far.
+         * @return BatchWriteRequest
+         */
+        public BatchWriteRequest build() {
+            return new BatchWriteRequest(writeRequests);
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteResult.java b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteResult.java
new file mode 100644
index 0000000..22c653c
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/service/BatchWriteResult.java
@@ -0,0 +1,52 @@
+package org.onlab.onos.store.service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Collection of write results returned for a batch write operation.
+ */
+public class BatchWriteResult {
+
+    private final List<WriteResult> writeResults;
+
+    /**
+     * Creates a new BatchWriteResult from the specified list of write results.
+     * @param writeResults write results.
+     */
+    public BatchWriteResult(List<WriteResult> writeResults) {
+        // Copy before wrapping so later changes to the caller's list cannot alter this result.
+        this.writeResults = Collections.unmodifiableList(new ArrayList<>(writeResults));
+    }
+
+    /**
+     * Checks whether every write in this batch completed with status OK.
+     * An empty batch is reported as successful.
+     * @return true if all results have status OK, false otherwise.
+     */
+    public boolean isSuccessful() {
+        for (WriteResult result : writeResults) {
+            if (result.status() != WriteStatus.OK) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Returns the results in this batch as a list.
+     * @return unmodifiable list of write results
+     */
+    public List<WriteResult> getAsList() {
+        return this.writeResults;
+    }
+
+    /**
+     * Returns the number of results in this batch.
+     * @return size of result batch.
+     */
+    public int batchSize() {
+        return writeResults.size();
+    }
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseService.java b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseService.java
index cf0ef0a..57d81f2 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseService.java
@@ -1,7 +1,5 @@
 package org.onlab.onos.store.service;
 
-import java.util.List;
-
 /**
  * Service interface for a strongly consistent and durable
  * key value data store.
@@ -9,46 +7,93 @@
 public interface DatabaseService {
 
     /**
-     * Performs a read on the database.
-     * @param request read request.
-     * @return ReadResult
-     * @throws DatabaseException if there is a failure in executing read.
+     * Reads the specified key.
+     * @param tableName name of the table associated with this operation.
+     * @param key key to read.
+     * @return value (and version) associated with this key. This call returns null if the key does not exist.
      */
-    ReadResult read(ReadRequest request);
-
+    VersionedValue get(String tableName, String key);
+    
     /**
-     * Performs a batch read operation on the database.
-     * The main advantage of batch read operation is parallelization.
-     * @param batch batch of read requests to execute.
-     * @return batch read result.
+     * Associate the key with a value.
+     * @param tableName table name in which this key/value resides.
+     * @param key key with which the specified value is to be associated
+     * @param value value to be associated with the specified key
+     * @return the previous value associated with the specified key, or null if there was no mapping for the key.
      */
-    List<OptionalResult<ReadResult, DatabaseException>> batchRead(List<ReadRequest> batch);
-
-    // FIXME Give me a better name
+    VersionedValue put(String tableName, String key, byte[] value);
+    
     /**
-     * Performs a write operation on the database.
-     * @param request write request
-     * @return write result.
-     * @throws DatabaseException if there is failure in execution write.
+     * If the specified key is not already associated with a value, associate it with the given value.
+     * @param tableName table name in which this key/value resides.
+     * @param key key with which the specified value is to be associated
+     * @param value value to be associated with the specified key
+     * @return true if put was successful, false if there is already a value associated with this key
      */
-    OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request);
-
+    boolean putIfAbsent(String tableName, String key, byte[] value);
+    
     /**
-     * Performs a write operation on the database.
-     * @param request write request
-     * @return write result.
-     * @throws OptimisticLockException FIXME define conditional failure
-     * @throws PreconditionFailedException FIXME define conditional failure
-     * @throws DatabaseException if there is failure in execution write.
+     * Sets the key to the specified value if the version in the database (for that key)
+     * matches the specified version.
+     * @param tableName name of table associated with this operation.
+     * @param key key
+     * @param value value
+     * @param version version that should be present in the database for the put to be successful.
+     * @return true if put was successful, false if the version in the database is different from what is specified.
      */
-    WriteResult write(WriteRequest request)/* throws OptimisticLockException, PreconditionFailedException*/;
-
+    boolean putIfVersionMatches(String tableName, String key, byte[] value, long version);
+    
     /**
-     * Performs a batch write operation on the database.
-     * Batch write provides transactional semantics. Either all operations
-     * succeed or none of them do.
-     * @param batch batch of write requests to execute as a transaction.
-     * @return result of executing the batch write operation.
+     * Replaces the entry for a key only if currently mapped to a given value.
+     * @param tableName name of table associated with this operation.
+     * @param key with which the specified value is associated
+     * @param oldValue value expected to be associated with the specified key
+     * @param newValue value to be associated with the specified key
+     * @return true if put was successful, false if the value in the database is different from oldValue.
      */
-    List<OptionalResult<WriteResult, DatabaseException>> batchWrite(List<WriteRequest> batch);
-}
+    boolean putIfValueMatches(String tableName, String key, byte[] oldValue, byte[] newValue);
+    
+    /**
+     * Removes the key (and associated value).
+     * @param tableName name of table associated with this operation.
+     * @param key key to remove
+     * @return value previously associated with the key. This call returns null if the key does not exist.
+     */
+    VersionedValue remove(String tableName, String key);
+    
+    /**
+     * Removes the key (and associated value) if the version in the database matches specified version.
+     * @param tableName name of table associated with this operation.
+     * @param key key to remove
+     * @param version version that should be present in the database for the remove to be successful.
+     * @return true if remove was successful, false if the version in the database is different from what is specified.
+     */
+    boolean removeIfVersionMatches(String tableName, String key, long version);
+    
+    /**
+     * Removes the key (and associated value) if the value in the database matches specified value.
+     * @param tableName name of table associated with this operation.
+     * @param key key to remove
+     * @param value value that should be present in the database for the remove to be successful.
+     * @return true if remove was successful, false if the value in the database is different from what is specified.
+     */
+    boolean removeIfValueMatches(String tableName, String key, byte[] value);
+    
+    /**
+     * Performs a batch read operation and returns the results.
+     * @param batchRequest batch request.
+     * @return result of the batch operation.
+     */
+    BatchReadResult batchRead(BatchReadRequest batchRequest);
+    
+    /**
+     * Performs a batch write operation and returns the results.
+     * This method provides transactional semantics. Either all writes succeed or none do.
+     * Even a single write failure would cause the entire batch to be aborted.
+     * In the case of unsuccessful operation, the batch result can be inspected to determine
+     * which operation(s) caused the batch to fail.
+     * @param batchRequest batch request.
+     * @return result of the batch operation.
+     */
+    BatchWriteResult batchWrite(BatchWriteRequest batchRequest);
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
index c1eb0ff..510df82 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/Lock.java
@@ -9,6 +9,12 @@
  */
 public interface Lock {
 
+	/**
+	 * Returns the path this lock will be used to guard from concurrent access.
+	 * @return path.
+	 */
+	String path();
+
     /**
      * Acquires the lock.
      * If the lock is not available then the caller thread becomes
@@ -24,19 +30,19 @@
      * @param leaseDurationMillis the number of milliseconds to hold the
      * lock after granting it, before automatically releasing it if it hasn't
      * already been released by invoking unlock(). Must be in the range
-     * (0, MAX_LEASE_MILLIS]
+     * (0, LockManager.MAX_LEASE_MILLIS]
      */
-    void lock(long leaseDurationMillis);
+    void lock(int leaseDurationMillis);
 
     /**
      * Acquires the lock only if it is free at the time of invocation.
      * @param leaseDurationMillis the number of milliseconds the must be
      * locked after it is granted, before automatically releasing it if it hasn't
      * already been released by an invocation of unlock(). Must be in the range
-     * (0, MAX_LEASE_MILLIS]
+     * (0, LockManager.MAX_LEASE_MILLIS]
      * @return true if the lock was acquired and false otherwise
      */
-    boolean tryLock(long leaseDurationMillis);
+    boolean tryLock(int leaseDurationMillis);
 
     /**
      * Acquires the lock if it is free within the given waiting
@@ -45,11 +51,11 @@
      * @param leaseDurationMillis the number of milliseconds to hold the
      * lock after granting it, before automatically releasing it if it hasn't
      * already been released by invoking unlock(Object). Must be in the range
-     * (0, MAX_LEASE_MILLIS]
+     * (0, LockManager.MAX_LEASE_MILLIS]
      * @return true if the lock was acquired and false if the waiting time
      * elapsed before the lock was acquired
      */
-    boolean tryLock(long waitTimeMillis, long leaseDurationMillis);
+    boolean tryLock(long waitTimeMillis, int leaseDurationMillis);
 
     /**
      * Returns true if this Lock instance currently holds the lock.
@@ -63,16 +69,14 @@
     void unlock();
 
     /**
-     * Extends the lease for this lock.
-     * @param extensionDurationMillis is the amount of additional
-     * time to add to the end of the current expiration time. For example,
-     * if the lock is currently set to expire at time T, a successful call to
-     * extendLease with an argument of 5000 will cause the lock to
-     * now expire at 5 seconds past T.
-     * @return true if the extension is successful, false otherwise. Note
-     * that a failure to extend the lease does not result in unlocking. The lock
-     * will be released either by an explicit call to unlock or when previously
-     * acquired lease runs out.
+     * Extends, by the specified duration, the expiration time of a lock
+     * that is currently owned. The new expiration time is computed
+     * by adding the specified duration to the current time. If this point
+     * in time is earlier than the existing expiration time then this method
+     * has no effect.
+     * @param leaseDurationMillis extension duration.
+     * @return true if successfully extended expiration, false if attempt to
+     * extend expiration fails or if the path is currently not locked by this instance.
      */
-    boolean extendLease(long extensionDurationMillis);
+    boolean extendExpiration(int leaseDurationMillis);
 }
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/LockService.java b/core/api/src/main/java/org/onlab/onos/store/service/LockService.java
index 7b742b1..a4ac30b 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/LockService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/LockService.java
@@ -1,13 +1,16 @@
 package org.onlab.onos.store.service;
 
+/**
+ * Service interface for mutual exclusion primitives.
+ */
 public interface LockService {
 
     /**
-     * Create a new lock instance.
+     * Creates a new lock instance.
      * A successful return from this method does not mean the resource guarded by the path is locked.
-     * The caller is expect to call Lock.lock() to acquire the lock.
+     * The caller is expected to call Lock.lock() to acquire the lock.
      * @param path unique lock name.
-     * @return
+     * @return a Lock instance that can be used to acquire the lock.
      */
     Lock create(String path);
 
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/ReadRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/ReadRequest.java
index 9f7af4a..f30529e 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/ReadRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/ReadRequest.java
@@ -75,5 +75,4 @@
         return Objects.equals(this.key, other.key) &&
                 Objects.equals(this.tableName, other.tableName);
     }
-
 }
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java b/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java
index 49e9665..3f253f2 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java
@@ -11,12 +11,21 @@
     private final String tableName;
     private final String key;
     private final VersionedValue value;
+    private final ReadStatus status;
 
-    public ReadResult(String tableName, String key, VersionedValue value) {
+    public ReadResult(ReadStatus status, String tableName, String key, VersionedValue value) {
+    	this.status = status;
         this.tableName = tableName;
         this.key = key;
         this.value = value;
     }
+    
+    /**
+     * Returns the status of the read operation.
+     */
+    public ReadStatus status() {
+    	return status;
+    }
 
     /**
      * Returns database table name.
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/ReadStatus.java b/core/api/src/main/java/org/onlab/onos/store/service/ReadStatus.java
new file mode 100644
index 0000000..2039a1c
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/service/ReadStatus.java
@@ -0,0 +1,9 @@
+package org.onlab.onos.store.service;
+
+/**
+ * Status of a database read operation, as carried by ReadResult.
+ */
+public enum ReadStatus {
+    OK,
+    NO_SUCH_TABLE
+}
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
index fb736e5..6607cfe 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
@@ -112,7 +112,7 @@
      * @param previousVersion previous version expected
      * @return WriteRequest
      */
-    public static WriteRequest remove(String tableName, String key,
+    public static WriteRequest removeIfVersionMatches(String tableName, String key,
                                       long previousVersion) {
         return new WriteRequest(REMOVE_IF_VALUE, tableName, key,
                                 null, previousVersion, null);
@@ -127,7 +127,7 @@
      * @param oldValue  previous value expected, must not be null
      * @return WriteRequest
      */
-    public static WriteRequest remove(String tableName, String key,
+    public static WriteRequest removeIfValueMatches(String tableName, String key,
                                       byte[] oldValue) {
         return new WriteRequest(Type.REMOVE_IF_VALUE, tableName, key,
                                 null, ANY_VERSION, checkNotNull(oldValue));
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/WriteResult.java b/core/api/src/main/java/org/onlab/onos/store/service/WriteResult.java
index aec3046..3cc11b0 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/WriteResult.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/WriteResult.java
@@ -7,34 +7,27 @@
  * Database write result.
  */
 public class WriteResult {
-
-    private final String tableName;
-    private final String key;
+	
+    private final WriteStatus status;
     private final VersionedValue previousValue;
-
-    public WriteResult(String tableName, String key, VersionedValue previousValue) {
-        this.tableName = tableName;
-        this.key = key;
+    
+    public WriteResult(WriteStatus status, VersionedValue previousValue) {
+    	this.status = status;
         this.previousValue = previousValue;
     }
 
-    public String tableName() {
-        return tableName;
-    }
-
-    public String key() {
-        return key;
-    }
-
     public VersionedValue previousValue() {
         return previousValue;
     }
+    
+    public WriteStatus status() {
+    	return status;
+    }
 
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
-                .add("tableName", tableName)
-                .add("key", key)
+        		.add("status", status)
                 .add("previousValue", previousValue)
                 .toString();
     }
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/WriteStatus.java b/core/api/src/main/java/org/onlab/onos/store/service/WriteStatus.java
new file mode 100644
index 0000000..ebb4f6d
--- /dev/null
+++ b/core/api/src/main/java/org/onlab/onos/store/service/WriteStatus.java
@@ -0,0 +1,11 @@
+package org.onlab.onos.store.service;
+
+/**
+ * Status of a database write operation, as carried by WriteResult.
+ */
+public enum WriteStatus {
+    OK,
+    ABORTED,
+    PRECONDITION_VIOLATION,
+    NO_SUCH_TABLE,
+}
diff --git a/core/store/dist/pom.xml b/core/store/dist/pom.xml
index f0f37c7..10196c2 100644
--- a/core/store/dist/pom.xml
+++ b/core/store/dist/pom.xml
@@ -103,6 +103,12 @@
           <artifactId>hazelcast</artifactId>
         </dependency>
 
+        <dependency>
+          <groupId>net.jodah</groupId>
+          <artifactId>expiringmap</artifactId>
+          <version>0.3.1</version>
+        </dependency>
+
         <!-- for shaded copycat -->
         <dependency>
           <groupId>org.onlab.onos</groupId>
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index c749197..77ff062 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -8,9 +8,11 @@
 
 import net.kuujo.copycat.Copycat;
 
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchWriteRequest;
 import org.onlab.onos.store.service.DatabaseException;
-import org.onlab.onos.store.service.ReadRequest;
-import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.WriteResult;
 
 /**
  * Client for interacting with the Copycat Raft cluster.
@@ -63,9 +65,9 @@
         }
     }
 
-    public List<InternalReadResult> batchRead(List<ReadRequest> requests) {
+    public List<ReadResult> batchRead(BatchReadRequest batchRequest) {
 
-        CompletableFuture<List<InternalReadResult>> future = copycat.submit("read", requests);
+        CompletableFuture<List<ReadResult>> future = copycat.submit("read", batchRequest);
         try {
             return future.get();
         } catch (InterruptedException | ExecutionException e) {
@@ -73,9 +75,9 @@
         }
     }
 
-    public List<InternalWriteResult> batchWrite(List<WriteRequest> requests) {
+    public List<WriteResult> batchWrite(BatchWriteRequest batchRequest) {
 
-        CompletableFuture<List<InternalWriteResult>> future = copycat.submit("write", requests);
+        CompletableFuture<List<WriteResult>> future = copycat.submit("write", batchRequest);
         try {
             return future.get();
         } catch (InterruptedException | ExecutionException e) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index afeaef9..e3de134 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -4,8 +4,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -22,7 +20,6 @@
 import net.kuujo.copycat.cluster.TcpCluster;
 import net.kuujo.copycat.cluster.TcpClusterConfig;
 import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.log.InMemoryLog;
 import net.kuujo.copycat.log.Log;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -37,17 +34,18 @@
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchReadResult;
+import org.onlab.onos.store.service.BatchWriteRequest;
+import org.onlab.onos.store.service.BatchWriteResult;
 import org.onlab.onos.store.service.DatabaseAdminService;
 import org.onlab.onos.store.service.DatabaseException;
 import org.onlab.onos.store.service.DatabaseService;
-import org.onlab.onos.store.service.NoSuchTableException;
-import org.onlab.onos.store.service.OptimisticLockException;
-import org.onlab.onos.store.service.OptionalResult;
-import org.onlab.onos.store.service.ReadRequest;
 import org.onlab.onos.store.service.ReadResult;
-import org.onlab.onos.store.service.WriteAborted;
-import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.ReadStatus;
+import org.onlab.onos.store.service.VersionedValue;
 import org.onlab.onos.store.service.WriteResult;
+import org.onlab.onos.store.service.WriteStatus;
 import org.onlab.packet.IpAddress;
 import org.slf4j.Logger;
 
@@ -199,66 +197,121 @@
     }
 
     @Override
-    public ReadResult read(ReadRequest request) {
-        return batchRead(Arrays.asList(request)).get(0).get();
-    }
-
-    @Override
-    public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
-            List<ReadRequest> batch) {
-        List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
-        for (InternalReadResult internalReadResult : client.batchRead(batch)) {
-            if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
-                readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
-                        new NoSuchTableException()));
-            } else {
-                readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
-                        internalReadResult.result()));
-            }
+    public VersionedValue get(String tableName, String key) {
+        // a single-key lookup travels as a batch holding exactly one read
+        ReadResult result = batchRead(new BatchReadRequest.Builder().get(tableName, key).build()).getAsList().get(0);
+        if (!result.status().equals(ReadStatus.OK)) {
+            throw new DatabaseException("get failed due to status: " + result.status());
         }
-        return readResults;
+        return result.value();
     }
 
     @Override
-    public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
-        return batchWrite(Arrays.asList(request)).get(0);
+    public BatchReadResult batchRead(BatchReadRequest batchRequest) {
+        return new BatchReadResult(client.batchRead(batchRequest));
     }
 
     @Override
-    public WriteResult write(WriteRequest request) {
-//            throws OptimisticLockException, PreconditionFailedException {
-        return writeNothrow(request).get();
+    public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
+        return new BatchWriteResult(client.batchWrite(batchRequest));
     }
 
     @Override
-    public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
-            List<WriteRequest> batch) {
-        List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
-        for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
-            if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new NoSuchTableException()));
-            } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new OptimisticLockException()));
-            } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
-                // TODO: throw a different exception?
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new OptimisticLockException()));
-            } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new WriteAborted()));
-            } else {
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        internalWriteResult.result()));
-            }
+    public VersionedValue put(String tableName, String key, byte[] value) {
+        WriteResult outcome = batchWrite(
+                new BatchWriteRequest.Builder().put(tableName, key, value).build()).getAsList().get(0);
+        if (!outcome.status().equals(WriteStatus.OK)) {
+            throw new DatabaseException("put failed due to status: " + outcome.status());
         }
-        return writeResults;
+        return outcome.previousValue(); // the value this write replaced, if any
+    }
 
+    @Override
+    public boolean putIfAbsent(String tableName, String key, byte[] value) {
+        // succeeds only while no value is stored under the key
+        return conditionalWrite("putIfAbsent",
+                new BatchWriteRequest.Builder().putIfAbsent(tableName, key, value).build());
+    }
+
+    @Override
+    public boolean putIfVersionMatches(String tableName, String key,
+            byte[] value, long version) {
+        // succeeds only while the stored entry still carries the given version
+        return conditionalWrite("putIfVersionMatches",
+                new BatchWriteRequest.Builder().putIfVersionMatches(tableName, key, value, version).build());
+    }
+
+    @Override
+    public boolean putIfValueMatches(String tableName, String key,
+            byte[] oldValue, byte[] newValue) {
+        // succeeds only while the stored bytes still equal oldValue
+        return conditionalWrite("putIfValueMatches",
+                new BatchWriteRequest.Builder().putIfValueMatches(tableName, key, oldValue, newValue).build());
+    }
+
+    @Override
+    public VersionedValue remove(String tableName, String key) {
+        BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().remove(tableName, key).build();
+        WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+        // anything but OK (e.g. NO_SUCH_TABLE) is reported as a failure.
+        if (writeResult.status() != WriteStatus.OK) {
+            throw new DatabaseException("remove failed due to status: " + writeResult.status());
+        }
+        // the value that was stored before the removal
+        return writeResult.previousValue();
+    }
+
+    @Override
+    public boolean removeIfVersionMatches(String tableName, String key,
+            long version) {
+        // succeeds only while the stored entry still carries the given version
+        return conditionalWrite("removeIfVersionMatches",
+                new BatchWriteRequest.Builder().removeIfVersionMatches(tableName, key, version).build());
+    }
+
+    @Override
+    public boolean removeIfValueMatches(String tableName, String key,
+            byte[] value) {
+        // succeeds only while the stored bytes still equal value
+        return conditionalWrite("removeIfValueMatches",
+                new BatchWriteRequest.Builder().removeIfValueMatches(tableName, key, value).build());
+    }
+
+    /**
+     * Submits a batch holding one conditional write and maps its status to a boolean.
+     * <p>
+     * Shared by the putIf and removeIf operations so their status handling stays identical.
+     *
+     * @param operation name of the calling operation, used in the failure message
+     * @param batchRequest batch containing exactly the one write to apply
+     * @return true if the write was applied, false if its precondition did not hold
+     * @throws DatabaseException if the write ended with any other status
+     */
+    private boolean conditionalWrite(String operation, BatchWriteRequest batchRequest) {
+        // one request in, one result out: the first entry is the only entry.
+        WriteStatus status = batchWrite(batchRequest).getAsList().get(0).status();
+        if (status == WriteStatus.OK) {
+            return true;
+        }
+        // the precondition failed; the state machine applied nothing.
+        if (status == WriteStatus.PRECONDITION_VIOLATION) {
+            return false;
+        }
+        throw new DatabaseException(operation + " failed due to status: " + status);
+    }
+
+    @Override
+    public void addMember(final ControllerNode node) {
+        final TcpMember newMember = new TcpMember(node.ip().toString(), node.tcpPort());
+        log.info("{} was added to the cluster", newMember);
+        // clusterConfig doubles as the monitor guarding membership changes
+        synchronized (clusterConfig) {
+            clusterConfig.addRemoteMember(newMember);
+        }
     }
 
     private final class InternalClusterEventListener
-            implements ClusterEventListener {
+    implements ClusterEventListener {
 
         @Override
         public void event(ClusterEvent event) {
@@ -266,7 +319,7 @@
 
             final ControllerNode node = event.subject();
             final TcpMember tcpMember = new TcpMember(node.ip().toString(),
-                                                      node.tcpPort());
+                    node.tcpPort());
 
             switch (event.type()) {
             case INSTANCE_ACTIVATED:
@@ -284,8 +337,8 @@
             case INSTANCE_REMOVED:
                 if (autoAddMember) {
                     Set<DefaultControllerNode> members
-                        = tabletMembers.getOrDefault(DEFAULT_TABLET,
-                                                     Collections.emptySet());
+                    = tabletMembers.getOrDefault(DEFAULT_TABLET,
+                            Collections.emptySet());
                     // remove only if not the initial members
                     if (!members.contains(node)) {
                         synchronized (clusterConfig) {
@@ -308,63 +361,6 @@
 
     }
 
-    public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
-        public KryoRegisteredInMemoryLog() {
-            super();
-            // required to deserialize object across bundles
-            super.kryo.register(TcpMember.class, new TcpMemberSerializer());
-            super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
-        }
-    }
-
-    private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
-
-        private final R result;
-        private final DatabaseException exception;
-
-        public DatabaseOperationResult(R result) {
-            this.result = result;
-            this.exception = null;
-        }
-
-        public DatabaseOperationResult(DatabaseException exception) {
-            this.result = null;
-            this.exception = exception;
-        }
-
-        @Override
-        public R get() {
-            if (result != null) {
-                return result;
-            }
-            throw exception;
-        }
-
-        @Override
-        public boolean hasValidResult() {
-            return result != null;
-        }
-
-        @Override
-        public String toString() {
-            if (result != null) {
-                return result.toString();
-            } else {
-                return exception.toString();
-            }
-        }
-    }
-
-    @Override
-    public void addMember(final ControllerNode node) {
-        final TcpMember tcpMember = new TcpMember(node.ip().toString(),
-                                                  node.tcpPort());
-        log.info("{} was added to the cluster", tcpMember);
-        synchronized (clusterConfig) {
-            clusterConfig.addRemoteMember(tcpMember);
-        }
-    }
-
     @Override
     public void removeMember(final ControllerNode node) {
         final TcpMember tcpMember = new TcpMember(node.ip().toString(),
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 50d1203..17b174c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -8,6 +8,7 @@
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
@@ -15,16 +16,21 @@
 import net.kuujo.copycat.Query;
 import net.kuujo.copycat.StateMachine;
 
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchWriteRequest;
 import org.onlab.onos.store.service.ReadRequest;
 import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.ReadStatus;
 import org.onlab.onos.store.service.VersionedValue;
 import org.onlab.onos.store.service.WriteRequest;
 import org.onlab.onos.store.service.WriteResult;
-import org.onlab.onos.store.service.impl.InternalWriteResult.Status;
+import org.onlab.onos.store.service.WriteStatus;
 import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
+import com.beust.jcommander.internal.Lists;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.common.io.ByteStreams;
@@ -40,6 +46,10 @@
 
     private final Logger log = getLogger(getClass());
 
+    // message subject for database update notifications; final so it stays a true constant.
+    public static final MessageSubject DATABASE_UPDATE_EVENTS =
+            new MessageSubject("database-update-events");
+
     // serializer used for snapshot
     public static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
@@ -47,29 +57,72 @@
             serializerPool = KryoNamespace.newBuilder()
                     .register(VersionedValue.class)
                     .register(State.class)
+                    .register(BatchReadRequest.class)
+                    .register(BatchWriteRequest.class)
+                    .register(ReadStatus.class)
+                    .register(WriteStatus.class)
+                    // TODO: Move this out ?
+                    .register(TableModificationEvent.class)
                     .register(ClusterMessagingProtocol.COMMON)
                     .build()
                     .populate(1);
         }
     };
 
+    private final List<DatabaseUpdateEventListener> listeners = Lists.newLinkedList();
+
+    // durable internal state of the database.
     private State state = new State();
 
     private boolean compressSnapshot = false;
 
     @Command
     public boolean createTable(String tableName) {
-        return state.getTables().putIfAbsent(tableName, Maps.newHashMap()) == null;
+        // No expiration was requested: Integer.MAX_VALUE is what listeners are told.
+        return createTable(tableName, Integer.MAX_VALUE);
+    }
+
+    /**
+     * Creates a table, unless one with the same name already exists, and
+     * informs every registered listener about the newly created table.
+     *
+     * @param tableName name of the table to create
+     * @param expirationTimeMillis expiration value handed to the listeners
+     * @return true if the table was created, false if it already existed
+     */
+    @Command
+    public boolean createTable(String tableName, int expirationTimeMillis) {
+        if (state.getTables().putIfAbsent(tableName, Maps.newHashMap()) != null) {
+            // the name is taken; the existing table is left untouched.
+            return false;
+        }
+        for (DatabaseUpdateEventListener listener : listeners) {
+            listener.tableCreated(tableName, expirationTimeMillis);
+        }
+        return true;
     }
 
     @Command
     public boolean dropTable(String tableName) {
-        return state.getTables().remove(tableName) != null;
+        if (state.getTables().remove(tableName) == null) {
+            // nothing was removed, so there is nothing to announce.
+            return false;
+        }
+        for (DatabaseUpdateEventListener listener : listeners) {
+            listener.tableDeleted(tableName);
+        }
+        return true;
     }
 
     @Command
     public boolean dropAllTables() {
+        Set<String> tableNames = state.getTables().keySet();
         state.getTables().clear();
+        for (DatabaseUpdateEventListener listener : listeners) {
+            for (String tableName : tableNames) {
+                listener.tableDeleted(tableName);
+            }
+        }
         return true;
     }
 
@@ -79,96 +132,95 @@
     }
 
     @Query
-    public List<InternalReadResult> read(List<ReadRequest> requests) {
-        List<InternalReadResult> results = new ArrayList<>(requests.size());
-        for (ReadRequest request : requests) {
+    public List<ReadResult> read(BatchReadRequest batchRequest) {
+        List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
+        for (ReadRequest request : batchRequest.getAsList()) {
             Map<String, VersionedValue> table = state.getTables().get(request.tableName());
             if (table == null) {
-                results.add(new InternalReadResult(InternalReadResult.Status.NO_SUCH_TABLE, null));
+                results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
                 continue;
             }
             VersionedValue value = VersionedValue.copy(table.get(request.key()));
-            results.add(new InternalReadResult(
-                    InternalReadResult.Status.OK,
-                    new ReadResult(
-                            request.tableName(),
-                            request.key(),
-                            value)));
+            results.add(new ReadResult(ReadStatus.OK, request.tableName(), request.key(), value));
         }
         return results;
     }
 
-    InternalWriteResult.Status checkIfApplicable(WriteRequest request,
-                                                 VersionedValue value) {
+    WriteStatus checkIfApplicable(WriteRequest request,
+                                        VersionedValue value) {
 
         switch (request.type()) {
         case PUT:
-            return InternalWriteResult.Status.OK;
+            return WriteStatus.OK;
 
         case PUT_IF_ABSENT:
             if (value == null) {
-                return InternalWriteResult.Status.OK;
+                return WriteStatus.OK;
             }
-            return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
+            return WriteStatus.PRECONDITION_VIOLATION;
         case PUT_IF_VALUE:
         case REMOVE_IF_VALUE:
             if (value != null && Arrays.equals(value.value(), request.oldValue())) {
-                return InternalWriteResult.Status.OK;
+                return WriteStatus.OK;
             }
-            return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
+            return WriteStatus.PRECONDITION_VIOLATION;
         case PUT_IF_VERSION:
         case REMOVE_IF_VERSION:
             if (value != null && request.previousVersion() == value.version()) {
-                return InternalWriteResult.Status.OK;
+                return WriteStatus.OK;
             }
-            return InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH;
+            return WriteStatus.PRECONDITION_VIOLATION;
         case REMOVE:
-            return InternalWriteResult.Status.OK;
+            return WriteStatus.OK;
         default:
             break;
         }
         log.error("Should never reach here {}", request);
-        return InternalWriteResult.Status.ABORTED;
+        return WriteStatus.ABORTED;
     }
 
     @Command
-    public List<InternalWriteResult> write(List<WriteRequest> requests) {
+    public List<WriteResult> write(BatchWriteRequest batchRequest) {
 
         // applicability check
         boolean abort = false;
-        List<InternalWriteResult.Status> validationResults = new ArrayList<>(requests.size());
-        for (WriteRequest request : requests) {
+        List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
+        for (WriteRequest request : batchRequest.getAsList()) {
             Map<String, VersionedValue> table = state.getTables().get(request.tableName());
             if (table == null) {
-                validationResults.add(InternalWriteResult.Status.NO_SUCH_TABLE);
+                validationResults.add(WriteStatus.NO_SUCH_TABLE);
                 abort = true;
                 continue;
             }
             final VersionedValue value = table.get(request.key());
-            Status result = checkIfApplicable(request, value);
+            WriteStatus result = checkIfApplicable(request, value);
             validationResults.add(result);
-            if (result != Status.OK) {
+            if (result != WriteStatus.OK) {
                 abort = true;
             }
         }
 
-        List<InternalWriteResult> results = new ArrayList<>(requests.size());
+        List<WriteResult> results = new ArrayList<>(batchRequest.batchSize());
 
         if (abort) {
-            for (InternalWriteResult.Status validationResult : validationResults) {
-                if (validationResult == InternalWriteResult.Status.OK) {
+            for (WriteStatus validationResult : validationResults) {
+                if (validationResult == WriteStatus.OK) {
                     // aborted due to applicability check failure on other request
-                    results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
+                    results.add(new WriteResult(WriteStatus.ABORTED, null));
                 } else {
-                    results.add(new InternalWriteResult(validationResult, null));
+                    results.add(new WriteResult(validationResult, null));
                 }
             }
             return results;
         }
 
+        List<TableModificationEvent> tableModificationEvents = Lists.newLinkedList();
+
         // apply changes
-        for (WriteRequest request : requests) {
+        for (WriteRequest request : batchRequest.getAsList()) {
             Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+
+            TableModificationEvent tableModificationEvent = null;
             // FIXME: If this method could be called by multiple thread,
             // synchronization scope is wrong.
             // Whole function including applicability check needs to be protected.
@@ -182,16 +234,23 @@
                 case PUT_IF_VERSION:
                     VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
                     VersionedValue previousValue = table.put(request.key(), newValue);
-                    WriteResult putResult = new WriteResult(request.tableName(), request.key(), previousValue);
-                    results.add(InternalWriteResult.ok(putResult));
+                    WriteResult putResult = new WriteResult(WriteStatus.OK, previousValue);
+                    results.add(putResult);
+                    tableModificationEvent = (previousValue == null) ?
+                            TableModificationEvent.rowAdded(request.tableName(), request.key()) :
+                            TableModificationEvent.rowUpdated(request.tableName(), request.key());
                     break;
 
                 case REMOVE:
                 case REMOVE_IF_VALUE:
                 case REMOVE_IF_VERSION:
                     VersionedValue removedValue = table.remove(request.key());
-                    WriteResult removeResult = new WriteResult(request.tableName(), request.key(), removedValue);
-                    results.add(InternalWriteResult.ok(removeResult));
+                    WriteResult removeResult = new WriteResult(WriteStatus.OK, removedValue);
+                    results.add(removeResult);
+                    if (removedValue != null) {
+                        tableModificationEvent =
+                                TableModificationEvent.rowDeleted(request.tableName(), request.key());
+                    }
                     break;
 
                 default:
@@ -199,7 +258,19 @@
                     break;
                 }
             }
+
+            if (tableModificationEvent != null) {
+                tableModificationEvents.add(tableModificationEvent);
+            }
         }
+
+        // notify listeners of table mod events.
+        for (DatabaseUpdateEventListener listener : listeners) {
+            for (TableModificationEvent tableModificationEvent : tableModificationEvents) {
+                listener.tableModified(tableModificationEvent);
+            }
+        }
+
         return results;
     }
 
@@ -253,4 +324,8 @@
             throw new SnapshotException(e);
         }
     }
+
+    public void addEventListener(DatabaseUpdateEventListener listener) {
+        listeners.add(listener);
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventHandler.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventHandler.java
new file mode 100644
index 0000000..58d5446
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventHandler.java
@@ -0,0 +1,163 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onlab.onos.store.service.impl;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import net.jodah.expiringmap.ExpiringMap;
+import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
+import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
+import net.kuujo.copycat.cluster.Member;
+import net.kuujo.copycat.event.EventHandler;
+import net.kuujo.copycat.event.LeaderElectEvent;
+
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.service.DatabaseService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tracks per-table row expiration and, on the leader, broadcasts row deletions to the cluster. */
+public class DatabaseUpdateEventHandler implements DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    public static final MessageSubject DATABASE_UPDATES = new MessageSubject("database-update-event");
+
+    // NOTE(review): nothing visible in this class assigns these three; confirm how they are injected.
+    private DatabaseService databaseService;
+    private ClusterService cluster;
+    private ClusterCommunicationService clusterCommunicator;
+
+    private final Member localMember;
+    private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
+    private final Map<String, Map<DatabaseRow, Void>> tableEntryExpirationMap = new HashMap<>();
+    private final ExpirationListener<DatabaseRow, Void> expirationObserver = new ExpirationObserver();
+
+    DatabaseUpdateEventHandler(Member localMember) {
+        this.localMember = localMember;
+    }
+
+    @Override
+    public void tableModified(TableModificationEvent event) {
+        DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
+        // null when the table has no expiration map, i.e. its rows never expire.
+        Map<DatabaseRow, Void> map = tableEntryExpirationMap.get(event.tableName());
+
+        switch (event.type()) {
+        case ROW_DELETED:
+            if (map != null) {
+                // the row is gone, so it must not expire (and be removed) again later.
+                map.remove(row);
+            }
+            if (isLocalMemberLeader.get()) {
+                try {
+                    clusterCommunicator.broadcast(new ClusterMessage(cluster.getLocalNode().id(),
+                            DATABASE_UPDATES, DatabaseStateMachine.SERIALIZER.encode(event)));
+                } catch (IOException e) {
+                    log.error("Failed to broadcast a database table modification event.", e);
+                }
+            }
+            break;
+        case ROW_ADDED:
+        case ROW_UPDATED:
+            if (map != null) {
+                map.put(row, null);
+            }
+            break;
+        default:
+            break;
+        }
+    }
+
+    @Override
+    public void tableCreated(String tableName, int expirationTimeMillis) {
+        // TODO: signal "no expiration" explicitly. Until then a non-positive value, or the
+        // Integer.MAX_VALUE placeholder used for tables created without one, means "never".
+        if (expirationTimeMillis > 0 && expirationTimeMillis != Integer.MAX_VALUE) {
+            tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
+                    .expiration(expirationTimeMillis, TimeUnit.MILLISECONDS)
+                    .expirationListener(expirationObserver)
+                    // FIXME: make the expiration policy configurable.
+                    .expirationPolicy(ExpirationPolicy.CREATED)
+                    .build());
+        }
+    }
+
+    @Override
+    public void tableDeleted(String tableName) {
+        tableEntryExpirationMap.remove(tableName);
+    }
+
+    private class ExpirationObserver implements ExpirationListener<DatabaseRow, Void> {
+        @Override
+        public void expired(DatabaseRow key, Void value) {
+            try {
+                // TODO: verify the safety of this check. Known issues:
+                // 1. Very likely only one member should attempt to delete the entry from the database.
+                // 2. Race: the entry may be re-added or updated after it expires but before it is
+                //    deleted here, so ttl and expiration should be tracked for a given version.
+                if (isLocalMemberLeader.get()) {
+                    databaseService.remove(key.tableName, key.key);
+                }
+            } catch (Exception e) {
+                log.warn("Failed to delete entry from the database after ttl expiration. Will retry eviction", e);
+                Map<DatabaseRow, Void> map = tableEntryExpirationMap.get(key.tableName);
+                if (map != null) {
+                    map.put(key, null);
+                }
+            }
+        }
+    }
+
+    @Override
+    public void handle(LeaderElectEvent event) {
+        // set, not just raise: the flag must drop when another member wins the election.
+        isLocalMemberLeader.set(localMember.equals(event.leader()));
+    }
+
+    private static final class DatabaseRow {
+        private final String tableName;
+        private final String key;
+
+        DatabaseRow(String tableName, String key) {
+            this.tableName = tableName;
+            this.key = key;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof DatabaseRow)) {
+                return false;
+            }
+            DatabaseRow that = (DatabaseRow) obj;
+            return Objects.equals(tableName, that.tableName) && Objects.equals(key, that.key);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableName, key);
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
new file mode 100644
index 0000000..f7b147f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2014 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onlab.onos.store.service.impl;
+
+/** Callbacks for database table changes; delivery semantics are defined by the notifying code (not shown here). */
+public interface DatabaseUpdateEventListener {
+    /**
+     * Reports a row-level change to a table (see {@link TableModificationEvent.Type}).
+     * @param event carries the affected table name, row key and kind of change
+     */
+    void tableModified(TableModificationEvent event);
+
+    /**
+     * Reports that a table was created (as the method name indicates).
+     * @param tableName name of the table
+     * @param expirationTimeMillis expiration time in milliseconds; presumably the TTL of the table's entries
+     */
+    void tableCreated(String tableName, int expirationTimeMillis);
+
+    /**
+     * Reports that a table was deleted (as the method name indicates).
+     * @param tableName name of the table
+     */
+    void tableDeleted(String tableName);
+
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
new file mode 100644
index 0000000..eaae063
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLock.java
@@ -0,0 +1,123 @@
+package org.onlab.onos.store.service.impl;
+
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.joda.time.DateTime;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.OptimisticLockException;
+
+/** A distributed lock held as a row in the lock table; lease expiry is checked against local state. */
+public class DistributedLock implements Lock {
+
+    private final DistributedLockManager lockManager;
+    private final DatabaseService databaseService;
+    private final String path;
+    // Always written before isLocked is set to true, so it is non-null whenever isLocked is true.
+    private DateTime lockExpirationTime;
+    private final AtomicBoolean isLocked = new AtomicBoolean(false);
+    private final byte[] lockId;
+
+    public DistributedLock(
+            String path,
+            DatabaseService databaseService,
+            ClusterService clusterService,
+            DistributedLockManager lockManager) {
+        this.path = path;
+        this.databaseService = databaseService;
+        this.lockManager = lockManager;
+        this.lockId =
+                (UUID.randomUUID().toString() + "::" + clusterService.getLocalNode().id().toString()).getBytes();
+    }
+
+    @Override
+    public String path() {
+        return path;
+    }
+
+    @Override
+    public void lock(int leaseDurationMillis) {
+        if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
+            // Nothing to do: the current lease already outlasts the requested one.
+            return;
+        }
+        tryLock(Long.MAX_VALUE, leaseDurationMillis);
+    }
+
+    @Override
+    public boolean tryLock(int leaseDurationMillis) {
+        try {
+            databaseService.putIfAbsent(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
+        } catch (OptimisticLockException e) {
+            return false;
+        }
+        // Record the acquisition locally; otherwise isLocked() and unlock() never see the lock as held.
+        lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
+        isLocked.set(true);
+        return true;
+    }
+
+    @Override
+    public boolean tryLock(
+            long waitTimeMillis,
+            int leaseDurationMillis) {
+        if (tryLock(leaseDurationMillis)) {
+            return true;
+        }
+        CompletableFuture<Void> future =
+                lockManager.lockIfAvailable(this, waitTimeMillis, leaseDurationMillis);
+        try {
+            future.get(waitTimeMillis, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            // Restore the interrupt status so the caller can still observe the interruption.
+            Thread.currentThread().interrupt();
+            return false;
+        } catch (ExecutionException | TimeoutException e) {
+            // TODO: ExecutionException could indicate something wrong with the
+            // backing database. Throw an exception?
+            return false;
+        }
+        // The wait ended successfully: start the local lease from now, as the original code did.
+        lockExpirationTime = DateTime.now().plusMillis(leaseDurationMillis);
+        isLocked.set(true);
+        return true;
+    }
+
+    @Override
+    public boolean isLocked() {
+        if (!isLocked.get()) {
+            return false;
+        }
+        // Expiry is decided from local lease information only, which keeps
+        // this call lightweight (no database access).
+        if (DateTime.now().isAfter(lockExpirationTime)) {
+            isLocked.set(false);
+            return false;
+        }
+        return true;
+    }
+
+    @Override
+    public void unlock() {
+        if (!isLocked()) {
+            return;
+        }
+        databaseService.removeIfValueMatches(DistributedLockManager.ONOS_LOCK_TABLE_NAME, path, lockId);
+        isLocked.set(false);
+    }
+
+    @Override
+    public boolean extendExpiration(int leaseDurationMillis) {
+        if (isLocked() && lockExpirationTime.isAfter(DateTime.now().plusMillis(leaseDurationMillis))) {
+            return true;
+        }
+        // NOTE(review): tryLock() uses putIfAbsent, which presumably fails while our own row exists - confirm.
+        return tryLock(leaseDurationMillis);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
new file mode 100644
index 0000000..c87ab37
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DistributedLockManager.java
@@ -0,0 +1,159 @@
+package org.onlab.onos.store.service.impl;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.joda.time.DateTime;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.Lock;
+import org.onlab.onos.store.service.LockEventListener;
+import org.onlab.onos.store.service.LockService;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ArrayListMultimap;
+
+@Component(immediate = true)
+@Service
+public class DistributedLockManager implements LockService {
+
+    private final Logger log = getLogger(getClass());
+
+    public static final String ONOS_LOCK_TABLE_NAME = "onos-locks";
+
+    // Pending requests keyed by lock path. The multimap is not thread-safe, so every access is
+    // synchronized on it: callers enqueue while the cluster message handler iterates and removes.
+    private final ArrayListMultimap<String, LockRequest> locksToAcquire = ArrayListMultimap.create();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private DatabaseService databaseService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    @Activate
+    public void activate() {
+        clusterCommunicator.addSubscriber(
+                DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
+                new LockEventMessageListener());
+        log.info("Started.");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        synchronized (locksToAcquire) {
+            locksToAcquire.clear();
+        }
+        log.info("Stopped.");
+    }
+
+    @Override
+    public Lock create(String path) {
+        return new DistributedLock(path, databaseService, clusterService, this);
+    }
+
+    @Override
+    public void addListener(LockEventListener listener) {
+        // FIXME:
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void removeListener(LockEventListener listener) {
+        // FIXME:
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Queues a request to acquire {@code lock} when its row is next deleted from the lock table.
+     * The returned future is completed (with null) once the handler has acquired the lock for it.
+     */
+    protected CompletableFuture<Void> lockIfAvailable(Lock lock, long waitTimeMillis, int leaseDurationMillis) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        LockRequest request = new LockRequest(lock, waitTimeMillis, leaseDurationMillis, future);
+        synchronized (locksToAcquire) {
+            locksToAcquire.put(lock.path(), request);
+        }
+        return future;
+    }
+
+    private class LockEventMessageListener implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            TableModificationEvent event = DatabaseStateMachine.SERIALIZER.decode(message.payload());
+            if (!event.tableName().equals(ONOS_LOCK_TABLE_NAME)
+                    || event.type() != TableModificationEvent.Type.ROW_DELETED) {
+                return;
+            }
+            synchronized (locksToAcquire) {
+                // get() returns a live, never-null view: removing via the iterator updates the multimap.
+                List<LockRequest> existingRequests = locksToAcquire.get(event.key());
+                Iterator<LockRequest> existingRequestIterator = existingRequests.iterator();
+                while (existingRequestIterator.hasNext()) {
+                    LockRequest request = existingRequestIterator.next();
+                    if (request.expirationTime().isBefore(DateTime.now())) {
+                        // The requester's wait time has elapsed: drop the stale request.
+                        existingRequestIterator.remove();
+                    } else if (request.lock().tryLock(request.leaseDurationMillis())) {
+                        request.future().complete(null);
+                        existingRequestIterator.remove();
+                        // The row exists again, so the remaining waiters cannot succeed on this event.
+                        break;
+                    }
+                }
+            }
+        }
+    }
+
+    private static class LockRequest {
+
+        private final Lock lock;
+        private final DateTime expirationTime;
+        private final int leaseDurationMillis;
+        private final CompletableFuture<Void> future;
+
+        public LockRequest(
+            Lock lock,
+            long waitTimeMillis,
+            int leaseDurationMillis,
+            CompletableFuture<Void> future) {
+
+            this.lock = lock;
+            // Capped rather than cast to int: Long.MAX_VALUE (passed by lock()) used to wrap to -1.
+            this.expirationTime = DateTime.now().plus(Math.min(waitTimeMillis, Long.MAX_VALUE / 2));
+            this.leaseDurationMillis = leaseDurationMillis;
+            this.future = future;
+        }
+
+        public Lock lock() {
+            return lock;
+        }
+
+        public DateTime expirationTime() {
+            return expirationTime;
+        }
+
+        public int leaseDurationMillis() {
+            return leaseDurationMillis;
+        }
+
+        public CompletableFuture<Void> future() {
+            return future;
+        }
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
new file mode 100644
index 0000000..25956de
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/TableModificationEvent.java
@@ -0,0 +1,44 @@
+package org.onlab.onos.store.service.impl;
+
+/** Immutable description of one row-level change (add, update or delete) in a named table. */
+public class TableModificationEvent {
+    /** Kind of change applied to the row. */
+    public enum Type { ROW_ADDED, ROW_DELETED, ROW_UPDATED }
+
+    private final String tableName;
+    private final String key;
+    private final Type type;
+
+    private TableModificationEvent(String table, String rowKey, Type changeType) {
+        tableName = table;
+        key = rowKey;
+        type = changeType;
+    }
+
+    /** Creates an event of type {@code ROW_ADDED} for the given table name and row key. */
+    public static TableModificationEvent rowAdded(String tableName, String key) {
+        return new TableModificationEvent(tableName, key, Type.ROW_ADDED);
+    }
+
+    /** Creates an event of type {@code ROW_UPDATED} for the given table name and row key. */
+    public static TableModificationEvent rowUpdated(String tableName, String key) {
+        return new TableModificationEvent(tableName, key, Type.ROW_UPDATED);
+    }
+
+    /** Creates an event of type {@code ROW_DELETED} for the given table name and row key. */
+    public static TableModificationEvent rowDeleted(String tableName, String key) {
+        return new TableModificationEvent(tableName, key, Type.ROW_DELETED);
+    }
+
+    public String tableName() {
+        return tableName;
+    }
+
+    public String key() {
+        return key;
+    }
+
+    public Type type() {
+        return type;
+    }
+}