Add sample accessing database service to Foo

Change-Id: I514c57a278dea368448d284eb5bf0d41bb0013e3
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
index 117a9a0..5f65030 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
@@ -15,6 +15,9 @@
  */
 package org.onlab.onos.foo;
 
+import java.nio.ByteBuffer;
+import java.util.concurrent.ScheduledExecutorService;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -33,9 +36,20 @@
 import org.onlab.onos.net.intent.IntentEvent;
 import org.onlab.onos.net.intent.IntentListener;
 import org.onlab.onos.net.intent.IntentService;
+import org.onlab.onos.store.service.DatabaseAdminService;
+import org.onlab.onos.store.service.DatabaseException;
+import org.onlab.onos.store.service.DatabaseService;
+import org.onlab.onos.store.service.OptionalResult;
+import org.onlab.onos.store.service.PreconditionFailedException;
+import org.onlab.onos.store.service.ReadRequest;
+import org.onlab.onos.store.service.ReadResult;
+import org.onlab.onos.store.service.WriteRequest;
+import org.onlab.onos.store.service.WriteResult;
 import org.slf4j.Logger;
 
+import static org.onlab.util.Tools.namedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
 
 /**
  * Playground app component.
@@ -57,22 +71,42 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MastershipService mastershipService;
 
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
+    protected DatabaseAdminService dbAdminService;
+
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY)
+    protected DatabaseService dbService;
+
     private final ClusterEventListener clusterListener = new InnerClusterListener();
     private final DeviceListener deviceListener = new InnerDeviceListener();
     private final IntentListener intentListener = new InnerIntentListener();
     private final MastershipListener mastershipListener = new InnerMastershipListener();
 
+    private ScheduledExecutorService executor;
+
     @Activate
     public void activate() {
+        executor = newScheduledThreadPool(4, namedThreads("foo-executor-%d"));
+
         clusterService.addListener(clusterListener);
         deviceService.addListener(deviceListener);
         intentService.addListener(intentListener);
         mastershipService.addListener(mastershipListener);
+
+        if (dbService == null || dbAdminService == null) {
+            log.info("Couldn't find DB service");
+        } else {
+            log.info("Found DB service");
+//            longIncrementor();
+//            executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
+//            executor.scheduleAtFixedRate(new LongIncrementor(), 1, 10, TimeUnit.SECONDS);
+        }
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
+        executor.shutdown();
         clusterService.removeListener(clusterListener);
         deviceService.removeListener(deviceListener);
         intentService.removeListener(intentListener);
@@ -122,6 +156,65 @@
             }
         }
     }
+
+    private void longIncrementor() {
+        try {
+            final String someTable = "admin";
+            final String someKey = "long";
+
+            dbAdminService.createTable(someTable);
+
+            ReadResult read = dbService.read(ReadRequest.get(someTable, someKey));
+            if (!read.valueExists()) {
+                ByteBuffer zero = ByteBuffer.allocate(Long.BYTES).putLong(0);
+                try {
+                    dbService.write(WriteRequest
+                                    .putIfAbsent(someTable,
+                                                 someKey,
+                                                 zero.array()));
+                    log.info("Wrote initial value");
+                    read = dbService.read(ReadRequest.get(someTable, someKey));
+                } catch (PreconditionFailedException e) {
+                    log.info("Concurrent write detected.", e);
+
+                    // concurrent write detected, read and fall through
+                    read = dbService.read(ReadRequest.get(someTable, someKey));
+                    if (!read.valueExists()) {
+                        log.error("Shouldn't reach here");
+                    }
+                }
+            }
+            int retry = 5;
+            do {
+                ByteBuffer prev = ByteBuffer.wrap(read.value().value());
+                long next = prev.getLong() + 1;
+                byte[] newValue = ByteBuffer.allocate(Long.BYTES).putLong(next).array();
+                OptionalResult<WriteResult, DatabaseException> result
+                = dbService.writeNothrow(WriteRequest
+                                         .putIfVersionMatches(someTable,
+                                                              someKey,
+                                                              newValue,
+                                                              read.value().version()));
+                if (result.hasValidResult()) {
+                    log.info("Write success {} -> {}", result.get().previousValue(), next);
+                    break;
+                } else {
+                    log.info("Write failed trying to write{}", next);
+                }
+            } while(retry-- > 0);
+        } catch (Exception e) {
+            log.error("Exception thrown", e);
+        }
+    }
+
+    private final class LongIncrementor implements Runnable {
+
+        @Override
+        public void run() {
+            longIncrementor();
+        }
+
+    }
 }
 
 
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseService.java b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseService.java
index 11cc5ff..cf0ef0a 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseService.java
@@ -24,13 +24,24 @@
      */
     List<OptionalResult<ReadResult, DatabaseException>> batchRead(List<ReadRequest> batch);
 
+    // FIXME Give me a better name
     /**
      * Performs a write operation on the database.
      * @param request write request
      * @return write result.
      * @throws DatabaseException if there is failure in execution write.
      */
-    WriteResult write(WriteRequest request);
+    OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request);
+
+    /**
+     * Performs a write operation on the database.
+     * @param request write request
+     * @return write result.
+     * @throws OptimisticLockException FIXME define conditional failure
+     * @throws PreconditionFailedException FIXME define conditional failure
+     * @throws DatabaseException if there is failure in execution write.
+     */
+    WriteResult write(WriteRequest request)/* throws OptimisticLockException, PreconditionFailedException*/;
 
     /**
      * Performs a batch write operation on the database.
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/PreconditionFailedException.java b/core/api/src/main/java/org/onlab/onos/store/service/PreconditionFailedException.java
index 9eda528..8a631a0 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/PreconditionFailedException.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/PreconditionFailedException.java
@@ -1,6 +1,5 @@
 package org.onlab.onos.store.service;
 
-
 /**
  * Exception that indicates a precondition failure.
  * Scenarios that can cause this exception:
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java b/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java
index 6d28fc2..49e9665 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/ReadResult.java
@@ -35,6 +35,15 @@
     }
 
     /**
+     * Returns true if database table contained value for the key.
+     *
+     * @return true if database table contained value for the key
+     */
+    public boolean valueExists() {
+        return value != null;
+    }
+
+    /**
      * Returns value associated with the key.
      * @return non-null value if the table contains one, null otherwise.
      */
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java b/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
index dcf22e9..fb736e5 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/WriteRequest.java
@@ -1,6 +1,8 @@
 package org.onlab.onos.store.service;
 
 import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.onos.store.service.WriteRequest.Type.*;
 
 import java.util.Objects;
 
@@ -11,8 +13,13 @@
  */
 public class WriteRequest {
 
+    public static final int ANY_VERSION = -1;
+
     private final String tableName;
     private final String key;
+
+    private final Type type;
+
     private final byte[] newValue;
     private final long previousVersion;
     private final byte[] oldValue;
@@ -23,22 +30,22 @@
      *
      * @param tableName name of the table
      * @param key       key in the table
-     * @param newValue  value to write
+     * @param newValue  value to write, must not be null
      * @return WriteRequest
      */
     public static WriteRequest put(String tableName, String key,
                                    byte[] newValue) {
-        return new WriteRequest(tableName, key, newValue, -1, null);
+        return new WriteRequest(PUT, tableName, key,
+                                checkNotNull(newValue), ANY_VERSION, null);
     }
 
-    // FIXME: Is there a special version value to realize putIfAbsent?
     /**
      * Creates a write request, which will
      * put the specified value to the table if the previous version matches.
      *
      * @param tableName name of the table
      * @param key       key in the table
-     * @param newValue  value to write
+     * @param newValue  value to write, must not be null
      * @param previousVersion previous version expected
      * @return WriteRequest
      */
@@ -46,37 +53,107 @@
                                                    byte[] newValue,
                                                    long previousVersion) {
         checkArgument(previousVersion >= 0);
-        return new WriteRequest(tableName, key, newValue, previousVersion, null);
+        return new WriteRequest(PUT_IF_VERSION, tableName, key,
+                                checkNotNull(newValue), previousVersion, null);
     }
 
-    // FIXME: What is the behavior of oldValue=null? putIfAbsent?
     /**
      * Creates a write request, which will
      * put the specified value to the table if the previous value matches.
      *
      * @param tableName name of the table
      * @param key       key in the table
-     * @param newValue  value to write
-     * @param oldValue  previous value expected
+     * @param newValue  value to write, must not be null
+     * @param oldValue  previous value expected, must not be null
      * @return WriteRequest
      */
     public static WriteRequest putIfValueMatches(String tableName, String key,
                                                  byte[] newValue,
                                                  byte[] oldValue) {
-        return new WriteRequest(tableName, key, newValue, -1, oldValue);
+        return new WriteRequest(PUT_IF_VALUE, tableName, key,
+                                checkNotNull(newValue), ANY_VERSION,
+                                checkNotNull(oldValue));
     }
 
-    // FIXME: How do we remove value? newValue=null?
+    /**
+     * Creates a write request, which will
+     * put the specified value to the table if the previous value does not exist.
+     *
+     * @param tableName name of the table
+     * @param key       key in the table
+     * @param newValue  value to write, must not be null
+     * @return WriteRequest
+     */
+    public static WriteRequest putIfAbsent(String tableName, String key,
+                                           byte[] newValue) {
+        return new WriteRequest(PUT_IF_ABSENT, tableName, key,
+                                checkNotNull(newValue), ANY_VERSION, null);
+    }
+
+    /**
+     * Creates a write request, which will
+     * remove the specified entry from the table regardless of the previous value.
+     *
+     * @param tableName name of the table
+     * @param key       key in the table
+     * @return WriteRequest
+     */
+    public static WriteRequest remove(String tableName, String key) {
+        return new WriteRequest(REMOVE, tableName, key,
+                                null, ANY_VERSION, null);
+    }
+
+    /**
+     * Creates a write request, which will
+     * remove the specified entry from the table if the previous version matches.
+     *
+     * @param tableName name of the table
+     * @param key       key in the table
+     * @param previousVersion previous version expected
+     * @return WriteRequest
+     */
+    public static WriteRequest remove(String tableName, String key,
+                                      long previousVersion) {
+        return new WriteRequest(REMOVE_IF_VALUE, tableName, key,
+                                null, previousVersion, null);
+    }
+
+    /**
+     * Creates a write request, which will
+     * remove the specified entry from the table if the previous value matches.
+     *
+     * @param tableName name of the table
+     * @param key       key in the table
+     * @param oldValue  previous value expected, must not be null
+     * @return WriteRequest
+     */
+    public static WriteRequest remove(String tableName, String key,
+                                      byte[] oldValue) {
+        return new WriteRequest(Type.REMOVE_IF_VALUE, tableName, key,
+                                null, ANY_VERSION, checkNotNull(oldValue));
+    }
+
+    public enum Type {
+        PUT,
+        PUT_IF_VERSION,
+        PUT_IF_VALUE,
+        PUT_IF_ABSENT,
+        REMOVE,
+        REMOVE_IF_VERSION,
+        REMOVE_IF_VALUE,
+    }
 
     // hidden constructor
-    protected WriteRequest(String tableName, String key, byte[] newValue, long previousVersion, byte[] oldValue) {
+    protected WriteRequest(Type type, String tableName, String key,
+                           byte[] newValue,
+                           long previousVersion, byte[] oldValue) {
 
-        checkArgument(tableName != null);
-        checkArgument(key != null);
-        checkArgument(newValue != null);
+        checkNotNull(tableName);
+        checkNotNull(key);
 
         this.tableName = tableName;
         this.key = key;
+        this.type = type;
         this.newValue = newValue;
         this.previousVersion = previousVersion;
         this.oldValue = oldValue;
@@ -90,6 +167,10 @@
         return key;
     }
 
+    public WriteRequest.Type type() {
+        return type;
+    }
+
     public byte[] newValue() {
         return newValue;
     }
@@ -105,6 +186,7 @@
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
+                .add("type", type)
                 .add("tableName", tableName)
                 .add("key", key)
                 .add("newValue", newValue)
@@ -113,9 +195,10 @@
                 .toString();
     }
 
+    // TODO: revisit hashCode, equals condition
     @Override
     public int hashCode() {
-        return Objects.hash(key, tableName, previousVersion);
+        return Objects.hash(type, key, tableName, previousVersion);
     }
 
     @Override
@@ -130,7 +213,8 @@
             return false;
         }
         WriteRequest other = (WriteRequest) obj;
-        return Objects.equals(this.key, other.key) &&
+        return Objects.equals(this.type, other.type) &&
+                Objects.equals(this.key, other.key) &&
                 Objects.equals(this.tableName, other.tableName) &&
                 Objects.equals(this.previousVersion, other.previousVersion);
     }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
index 56dba79..9a792bb 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/ClusterMessagingProtocol.java
@@ -105,6 +105,7 @@
     private static final KryoNamespace DATABASE = KryoNamespace.newBuilder()
             .register(ReadRequest.class)
             .register(WriteRequest.class)
+            .register(WriteRequest.Type.class)
             .register(InternalReadResult.class)
             .register(InternalWriteResult.class)
             .register(InternalReadResult.Status.class)
@@ -135,6 +136,7 @@
                     byte[].class)
             .build();
 
+    // serializer used for CopyCat Protocol
     public static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
         protected void setupKryoPool() {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index 2779b35..421f9cc 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -2,6 +2,7 @@
 
 import static org.slf4j.LoggerFactory.getLogger;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -57,7 +58,7 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DatabaseProtocolService copycatMessagingProtocol;
 
-    public static final String LOG_FILE_PREFIX = "onos-copy-cat-log";
+    public static final String LOG_FILE_PREFIX = "/tmp/onos-copy-cat-log";
 
     private Copycat copycat;
     private DatabaseClient client;
@@ -126,9 +127,11 @@
 
 
         StateMachine stateMachine = new DatabaseStateMachine();
-        // FIXME resolve Chronicle + OSGi issue
+        // Chronicle + OSGi issue
         //Log consensusLog = new ChronicleLog(LOG_FILE_PREFIX + "_" + thisNode.id());
-        Log consensusLog = new KryoRegisteredInMemoryLog();
+        //Log consensusLog = new KryoRegisteredInMemoryLog();
+        Log consensusLog = new MapDBLog(new File(LOG_FILE_PREFIX + localNode.id()),
+                                        ClusterMessagingProtocol.SERIALIZER);
 
         copycat = new Copycat(stateMachine, consensusLog, cluster, copycatMessagingProtocol);
         copycat.start();
@@ -187,8 +190,14 @@
     }
 
     @Override
-    public WriteResult write(WriteRequest request) {
-        return batchWrite(Arrays.asList(request)).get(0).get();
+    public OptionalResult<WriteResult, DatabaseException> writeNothrow(WriteRequest request) {
+        return batchWrite(Arrays.asList(request)).get(0);
+    }
+
+    @Override
+    public WriteResult write(WriteRequest request)
+            throws OptimisticLockException, PreconditionFailedException {
+        return writeNothrow(request).get();
     }
 
     @Override
@@ -199,13 +208,13 @@
             if (internalWriteResult.status() == InternalWriteResult.Status.NO_SUCH_TABLE) {
                 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
                         new NoSuchTableException()));
-            } else if (internalWriteResult.status() == InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE) {
+            } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH) {
                 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
                         new OptimisticLockException()));
             } else if (internalWriteResult.status() == InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH) {
                 // TODO: throw a different exception?
                 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
-                        new PreconditionFailedException()));
+                        new OptimisticLockException()));
             } else if (internalWriteResult.status() == InternalWriteResult.Status.ABORTED) {
                 writeResults.add(new DatabaseOperationResult<WriteResult, DatabaseException>(
                         new WriteAborted()));
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 9662976..9c323f0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -3,8 +3,10 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+
 import net.kuujo.copycat.Command;
 import net.kuujo.copycat.Query;
 import net.kuujo.copycat.StateMachine;
@@ -15,6 +17,7 @@
 import org.onlab.onos.store.service.VersionedValue;
 import org.onlab.onos.store.service.WriteRequest;
 import org.onlab.onos.store.service.WriteResult;
+import org.onlab.onos.store.service.impl.InternalWriteResult.Status;
 import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
@@ -32,6 +35,7 @@
 
     private final Logger log = getLogger(getClass());
 
+    // serializer used for snapshot
     public static final KryoSerializer SERIALIZER = new KryoSerializer() {
         @Override
         protected void setupKryoPool() {
@@ -87,6 +91,37 @@
         return results;
     }
 
+    InternalWriteResult.Status checkIfApplicable(WriteRequest request,
+                                                 VersionedValue value) {
+
+        switch (request.type()) {
+        case PUT:
+            return InternalWriteResult.Status.OK;
+
+        case PUT_IF_ABSENT:
+            if (value == null) {
+                return InternalWriteResult.Status.OK;
+            }
+            return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
+        case PUT_IF_VALUE:
+        case REMOVE_IF_VALUE:
+            if (value != null && Arrays.equals(value.value(), request.oldValue())) {
+                return InternalWriteResult.Status.OK;
+            }
+            return InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH;
+        case PUT_IF_VERSION:
+        case REMOVE_IF_VERSION:
+            if (value != null && request.previousVersion() == value.version()) {
+                return InternalWriteResult.Status.OK;
+            }
+            return InternalWriteResult.Status.PREVIOUS_VERSION_MISMATCH;
+        case REMOVE:
+            return InternalWriteResult.Status.OK;
+        }
+        log.error("Should never reach here {}", request);
+        return InternalWriteResult.Status.ABORTED;
+    }
+
     @Command
     public List<InternalWriteResult> write(List<WriteRequest> requests) {
 
@@ -100,25 +135,12 @@
                 abort = true;
                 continue;
             }
-            VersionedValue value = table.get(request.key());
-            if (value == null) {
-                if (request.oldValue() != null) {
-                    validationResults.add(InternalWriteResult.Status.PREVIOUS_VALUE_MISMATCH);
-                    abort = true;
-                    continue;
-                } else if (request.previousVersion() >= 0) {
-                    validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
-                    abort = true;
-                    continue;
-                }
-            }
-            if (request.previousVersion() >= 0 && value.version() != request.previousVersion()) {
-                validationResults.add(InternalWriteResult.Status.OPTIMISTIC_LOCK_FAILURE);
+            final VersionedValue value = table.get(request.key());
+            Status result = checkIfApplicable(request, value);
+            validationResults.add(result);
+            if (result != Status.OK) {
                 abort = true;
-                continue;
             }
-
-            validationResults.add(InternalWriteResult.Status.OK);
         }
 
         List<InternalWriteResult> results = new ArrayList<>(requests.size());
@@ -126,6 +148,7 @@
         if (abort) {
             for (InternalWriteResult.Status validationResult : validationResults) {
                 if (validationResult == InternalWriteResult.Status.OK) {
+                    // aborted due to applicability check failure on other request
                     results.add(new InternalWriteResult(InternalWriteResult.Status.ABORTED, null));
                 } else {
                     results.add(new InternalWriteResult(validationResult, null));
@@ -141,12 +164,31 @@
             // synchronization scope is wrong.
             // Whole function including applicability check needs to be protected.
             // Confirm copycat's thread safety requirement for StateMachine
+            // TODO: If we need isolation, we need to block reads also
             synchronized (table) {
-                VersionedValue previousValue =
-                        table.put(request.key(), new VersionedValue(request.newValue(), state.nextVersion()));
-                results.add(new InternalWriteResult(
-                        InternalWriteResult.Status.OK,
-                        new WriteResult(request.tableName(), request.key(), previousValue)));
+                switch (request.type()) {
+                case PUT:
+                case PUT_IF_ABSENT:
+                case PUT_IF_VALUE:
+                case PUT_IF_VERSION:
+                    VersionedValue newValue = new VersionedValue(request.newValue(), state.nextVersion());
+                    VersionedValue previousValue = table.put(request.key(), newValue);
+                    WriteResult putResult = new WriteResult(request.tableName(), request.key(), previousValue);
+                    results.add(InternalWriteResult.ok(putResult));
+                    break;
+
+                case REMOVE:
+                case REMOVE_IF_VALUE:
+                case REMOVE_IF_VERSION:
+                    VersionedValue removedValue = table.remove(request.key());
+                    WriteResult removeResult = new WriteResult(request.tableName(), request.key(), removedValue);
+                    results.add(InternalWriteResult.ok(removeResult));
+                    break;
+
+                default:
+                    log.error("Invalid WriteRequest type {}", request.type());
+                    break;
+                }
             }
         }
         return results;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java
index aadbcfb..d016ba1 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalReadResult.java
@@ -1,11 +1,14 @@
 package org.onlab.onos.store.service.impl;
 
+import java.io.Serializable;
+
 import org.onlab.onos.store.service.ReadResult;
 
 /**
  * Result of a read operation executed on the DatabaseStateMachine.
  */
-public class InternalReadResult {
+@SuppressWarnings("serial")
+public class InternalReadResult implements Serializable {
 
     public enum Status {
         OK,
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java
index d757dfb..ba714af 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/InternalWriteResult.java
@@ -11,13 +11,17 @@
         OK,
         ABORTED,
         NO_SUCH_TABLE,
-        OPTIMISTIC_LOCK_FAILURE,
+        PREVIOUS_VERSION_MISMATCH,
         PREVIOUS_VALUE_MISMATCH
     }
 
     private final Status status;
     private final WriteResult result;
 
+    public static InternalWriteResult ok(WriteResult result) {
+        return new InternalWriteResult(Status.OK, result);
+    }
+
     public InternalWriteResult(Status status, WriteResult result) {
         this.status = status;
         this.result = result;