Reworked DatabaseService API.
Initial implementation of LockManager.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index afeaef9..e3de134 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -4,8 +4,6 @@
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -22,7 +20,6 @@
 import net.kuujo.copycat.cluster.TcpCluster;
 import net.kuujo.copycat.cluster.TcpClusterConfig;
 import net.kuujo.copycat.cluster.TcpMember;
-import net.kuujo.copycat.log.InMemoryLog;
 import net.kuujo.copycat.log.Log;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -37,17 +34,18 @@
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.service.BatchReadRequest;
+import org.onlab.onos.store.service.BatchReadResult;
+import org.onlab.onos.store.service.BatchWriteRequest;
+import org.onlab.onos.store.service.BatchWriteResult;
 import org.onlab.onos.store.service.DatabaseAdminService;
 import org.onlab.onos.store.service.DatabaseException;
 import org.onlab.onos.store.service.DatabaseService;
-import org.onlab.onos.store.service.NoSuchTableException;
-import org.onlab.onos.store.service.OptimisticLockException;
-import org.onlab.onos.store.service.OptionalResult;
-import org.onlab.onos.store.service.ReadRequest;
 import org.onlab.onos.store.service.ReadResult;
-import org.onlab.onos.store.service.WriteAborted;
-import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.ReadStatus;
+import org.onlab.onos.store.service.VersionedValue;
 import org.onlab.onos.store.service.WriteResult;
+import org.onlab.onos.store.service.WriteStatus;
 import org.onlab.packet.IpAddress;
 import org.slf4j.Logger;
 
@@ -199,66 +197,121 @@
     }
 
     @Override
-    public ReadResult read(ReadRequest request) {
-        return batchRead(Arrays.asList(request)).get(0).get();
-    }
-
-    @Override
-    public List<OptionalResult<ReadResult, DatabaseException>> batchRead(
-            List<ReadRequest> batch) {
-        List<OptionalResult<ReadResult, DatabaseException>> readResults = new ArrayList<>(batch.size());
-        for (InternalReadResult internalReadResult : client.batchRead(batch)) {
-            if (internalReadResult.status() == InternalReadResult.Status.NO_SUCH_TABLE) {
-                readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
-                        new NoSuchTableException()));
-            } else {
-                readResults.add(new DatabaseOperationResult<ReadResult, DatabaseException>(
-                        internalReadResult.result()));
-            }
+    public VersionedValue get(String tableName, String key) {
+        BatchReadRequest batchRequest = new BatchReadRequest.Builder().get(tableName, key).build();
+        ReadResult readResult = batchRead(batchRequest).getAsList().get(0);
+        if (readResult.status().equals(ReadStatus.OK)) {
+            return readResult.value();
         }
-        return readResults;
+        throw new DatabaseException("get failed due to status: " + readResult.status());
     }
 
     @Override
-    public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
-        return batchWrite(Arrays.asList(request)).get(0);
+    public BatchReadResult batchRead(BatchReadRequest batchRequest) {
+        return new BatchReadResult(client.batchRead(batchRequest));
     }
 
     @Override
-    public WriteResult write(WriteRequest request) {
-//            throws OptimisticLockException, PreconditionFailedException {
-        return writeNothrow(request).get();
+    public BatchWriteResult batchWrite(BatchWriteRequest batchRequest) {
+        return new BatchWriteResult(client.batchWrite(batchRequest));
     }
 
     @Override
-    public List<OptionalResult<WriteResult, DatabaseException>> batchWrite(
-            List<WriteRequest> batch) {
-        List<OptionalResult<WriteResult, DatabaseException>> writeResults = new ArrayList<>(batch.size());
-        for (InternalWriteResult internalWriteResult : client.batchWrite(batch)) {
-            if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new NoSuchTableException()));
-            } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new OptimisticLockException()));
-            } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
-                // TODO: throw a different exception?
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new OptimisticLockException()));
-            } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new WriteAborted()));
-            } else {
-                writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        internalWriteResult.result()));
-            }
+    public VersionedValue put(String tableName, String key, byte[] value) {
+        BatchWriteRequest batchRequest = new BatchWriteRequest.Builder().put(tableName, key, value).build();
+        WriteResult writeResult = batchWrite(batchRequest).getAsList().get(0);
+        if (writeResult.status().equals(WriteStatus.OK)) {
+            return writeResult.previousValue();
         }
-        return writeResults;
+        throw new DatabaseException("put failed due to status: " + writeResult.status());
+    }
 
+    @Override
+    public boolean putIfAbsent(String tableName, String key, byte[] value) {
+        // The batch carries only this operation, so only its first result is examined.
+        BatchWriteRequest request = new BatchWriteRequest.Builder().putIfAbsent(tableName, key, value).build();
+        WriteResult outcome = batchWrite(request).getAsList().get(0);
+        boolean applied = outcome.status().equals(WriteStatus.OK);
+        if (!applied && !outcome.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+            throw new DatabaseException("putIfAbsent failed due to status: " + outcome.status());
+        }
+        return applied; // false means PRECONDITION_VIOLATION was reported
+    }
+
+    @Override
+    public boolean putIfVersionMatches(String tableName, String key, byte[] value, long version) {
+        // The batch carries only this operation, so only its first result is examined.
+        BatchWriteRequest request = new BatchWriteRequest.Builder()
+                .putIfVersionMatches(tableName, key, value, version).build();
+        WriteResult outcome = batchWrite(request).getAsList().get(0);
+        boolean applied = outcome.status().equals(WriteStatus.OK);
+        if (!applied && !outcome.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+            throw new DatabaseException("putIfVersionMatches failed due to status: " + outcome.status());
+        }
+        return applied; // false means PRECONDITION_VIOLATION was reported
+    }
+
+    @Override
+    public boolean putIfValueMatches(String tableName, String key, byte[] oldValue, byte[] newValue) {
+        // The batch carries only this operation, so only its first result is examined.
+        BatchWriteRequest request = new BatchWriteRequest.Builder()
+                .putIfValueMatches(tableName, key, oldValue, newValue).build();
+        WriteResult outcome = batchWrite(request).getAsList().get(0);
+        boolean applied = outcome.status().equals(WriteStatus.OK);
+        if (!applied && !outcome.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+            throw new DatabaseException("putIfValueMatches failed due to status: " + outcome.status());
+        }
+        return applied; // false means PRECONDITION_VIOLATION was reported
+    }
+
+    @Override
+    public VersionedValue remove(String tableName, String key) {
+        BatchWriteRequest request = new BatchWriteRequest.Builder().remove(tableName, key).build();
+        WriteResult outcome = batchWrite(request).getAsList().get(0);
+        if (!outcome.status().equals(WriteStatus.OK)) { // any non-OK status is surfaced as an exception
+            throw new DatabaseException("remove failed due to status: " + outcome.status());
+        }
+        return outcome.previousValue();
+    }
+
+    @Override
+    public boolean removeIfVersionMatches(String tableName, String key, long version) {
+        // The batch carries only this operation, so only its first result is examined.
+        BatchWriteRequest request = new BatchWriteRequest.Builder()
+                .removeIfVersionMatches(tableName, key, version).build();
+        WriteResult outcome = batchWrite(request).getAsList().get(0);
+        boolean applied = outcome.status().equals(WriteStatus.OK);
+        if (!applied && !outcome.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+            throw new DatabaseException("removeIfVersionMatches failed due to status: " + outcome.status());
+        }
+        return applied; // false means PRECONDITION_VIOLATION was reported
+    }
+
+    @Override
+    public boolean removeIfValueMatches(String tableName, String key, byte[] value) {
+        // The batch carries only this operation, so only its first result is examined.
+        BatchWriteRequest request = new BatchWriteRequest.Builder()
+                .removeIfValueMatches(tableName, key, value).build();
+        WriteResult outcome = batchWrite(request).getAsList().get(0);
+        boolean applied = outcome.status().equals(WriteStatus.OK);
+        if (!applied && !outcome.status().equals(WriteStatus.PRECONDITION_VIOLATION)) {
+            throw new DatabaseException("removeIfValueMatches failed due to status: " + outcome.status());
+        }
+        return applied; // false means PRECONDITION_VIOLATION was reported
+    }
+
+    @Override
+    public void addMember(final ControllerNode node) {
+        final TcpMember tcpMember = new TcpMember(node.ip().toString(),
+                                                  node.tcpPort());
+        log.info("{} was added to the cluster", tcpMember);
+        synchronized (clusterConfig) {
+            clusterConfig.addRemoteMember(tcpMember);
+        }
     }
 
     private final class InternalClusterEventListener
-            implements ClusterEventListener {
+    implements ClusterEventListener {
 
         @Override
         public void event(ClusterEvent event) {
@@ -266,7 +319,7 @@
 
             final ControllerNode node = event.subject();
             final TcpMember tcpMember = new TcpMember(node.ip().toString(),
-                                                      node.tcpPort());
+                    node.tcpPort());
 
             switch (event.type()) {
             case INSTANCE_ACTIVATED:
@@ -284,8 +337,8 @@
             case INSTANCE_REMOVED:
                 if (autoAddMember) {
                     Set<DefaultControllerNode> members
-                        = tabletMembers.getOrDefault(DEFAULT_TABLET,
-                                                     Collections.emptySet());
+                    = tabletMembers.getOrDefault(DEFAULT_TABLET,
+                            Collections.emptySet());
                     // remove only if not the initial members
                     if (!members.contains(node)) {
                         synchronized (clusterConfig) {
@@ -308,63 +361,6 @@
 
     }
 
-    public static final class KryoRegisteredInMemoryLog extends InMemoryLog {
-        public KryoRegisteredInMemoryLog() {
-            super();
-            // required to deserialize object across bundles
-            super.kryo.register(TcpMember.class, new TcpMemberSerializer());
-            super.kryo.register(TcpClusterConfig.class, new TcpClusterConfigSerializer());
-        }
-    }
-
-    private class DatabaseOperationResult<R, E extends DatabaseException> implements OptionalResult<R, E> {
-
-        private final R result;
-        private final DatabaseException exception;
-
-        public DatabaseOperationResult(R result) {
-            this.result = result;
-            this.exception = null;
-        }
-
-        public DatabaseOperationResult(DatabaseException exception) {
-            this.result = null;
-            this.exception = exception;
-        }
-
-        @Override
-        public R get() {
-            if (result != null) {
-                return result;
-            }
-            throw exception;
-        }
-
-        @Override
-        public boolean hasValidResult() {
-            return result != null;
-        }
-
-        @Override
-        public String toString() {
-            if (result != null) {
-                return result.toString();
-            } else {
-                return exception.toString();
-            }
-        }
-    }
-
-    @Override
-    public void addMember(final ControllerNode node) {
-        final TcpMember tcpMember = new TcpMember(node.ip().toString(),
-                                                  node.tcpPort());
-        log.info("{} was added to the cluster", tcpMember);
-        synchronized (clusterConfig) {
-            clusterConfig.addRemoteMember(tcpMember);
-        }
-    }
-
     @Override
     public void removeMember(final ControllerNode node) {
         final TcpMember tcpMember = new TcpMember(node.ip().toString(),