1. Refactored ConsistentMap and StorageService (renamed from DatabaseService) into the api bundle.
2. Miscellaneous fixes for bugs uncovered during testing.

Change-Id: I1219c5264831bcfa93565f764511f89de35a949d
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java
deleted file mode 100644
index 4d7f33b..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java
+++ /dev/null
@@ -1,200 +0,0 @@
-package org.onosproject.store.consistent.impl;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.Map.Entry;
-
-/**
- * A distributed, strongly consistent map.
- * <p>
- * This map offers strong read-after-update (where update == create/update/delete)
- * consistency. All operations to the map are serialized and applied in a consistent
- * manner.
- * <p>
- * The stronger consistency comes at the expense of availability in
- * the event of a network partition. A network partition can be either due to
- * a temporary disruption in network connectivity between participating nodes
- * or due to a node being temporarily down.
- * </p><p>
- * All values stored in this map are versioned and the API supports optimistic
- * concurrency by allowing conditional updates that take into consideration
- * the version or value that was previously read.
- * </p><p>
- * The map also supports atomic batch updates (transactions). One can provide a list
- * of updates to be applied atomically if and only if all the operations are guaranteed
- * to succeed i.e. all their preconditions are met. For example, the precondition
- * for a putIfAbsent API call is absence of a mapping for the key. Similarly, the
- * precondition for a conditional replace operation is the presence of an expected
- * version or value
- * </p><p>
- * This map does not allow null values. All methods can throw a ConsistentMapException
- * (which extends RuntimeException) to indicate failures.
- *
- */
-public interface ConsistentMap<K, V> {
-
-    /**
-     * Returns the number of entries in the map.
-     *
-     * @return map size.
-     */
-    int size();
-
-    /**
-     * Returns true if the map is empty.
-     *
-     * @return true if map has no entries, false otherwise.
-     */
-    boolean isEmpty();
-
-    /**
-     * Returns true if this map contains a mapping for the specified key.
-     *
-     * @param key key
-     * @return true if map contains key, false otherwise.
-     */
-    boolean containsKey(K key);
-
-    /**
-     * Returns true if this map contains the specified value.
-     *
-     * @param value value
-     * @return true if map contains value, false otherwise.
-     */
-    boolean containsValue(V value);
-
-    /**
-     * Returns the value (and version) to which the specified key is mapped, or null if this
-     * map contains no mapping for the key.
-     *
-     * @param key the key whose associated value (and version) is to be returned
-     * @return the value (and version) to which the specified key is mapped, or null if
-     * this map contains no mapping for the key
-     */
-    Versioned<V> get(K key);
-
-    /**
-     * Associates the specified value with the specified key in this map (optional operation).
-     * If the map previously contained a mapping for the key, the old value is replaced by the
-     * specified value.
-     *
-     * @param key key with which the specified value is to be associated
-     * @param value value to be associated with the specified key
-     * @return the previous value (and version) associated with key, or null if there was
-     * no mapping for key.
-     */
-    Versioned<V> put(K key, V value);
-
-    /**
-     * Removes the mapping for a key from this map if it is present (optional operation).
-     *
-     * @param key key whose value is to be removed from the map
-     * @return the value (and version) to which this map previously associated the key,
-     * or null if the map contained no mapping for the key.
-     */
-    Versioned<V> remove(K key);
-
-    /**
-     * Removes all of the mappings from this map (optional operation).
-     * The map will be empty after this call returns.
-     */
-    void clear();
-
-    /**
-     * Returns a Set view of the keys contained in this map.
-     * This method differs from the behavior of java.util.Map.keySet() in that
-     * what is returned is a unmodifiable snapshot view of the keys in the ConsistentMap.
-     * Attempts to modify the returned set, whether direct or via its iterator,
-     * result in an UnsupportedOperationException.
-     *
-     * @return a set of the keys contained in this map
-     */
-    Set<K> keySet();
-
-    /**
-     * Returns the collection of values (and associated versions) contained in this map.
-     * This method differs from the behavior of java.util.Map.values() in that
-     * what is returned is a unmodifiable snapshot view of the values in the ConsistentMap.
-     * Attempts to modify the returned collection, whether direct or via its iterator,
-     * result in an UnsupportedOperationException.
-     *
-     * @return a collection of the values (and associated versions) contained in this map
-     */
-    Collection<Versioned<V>> values();
-
-    /**
-     * Returns the set of entries contained in this map.
-     * This method differs from the behavior of java.util.Map.entrySet() in that
-     * what is returned is a unmodifiable snapshot view of the entries in the ConsistentMap.
-     * Attempts to modify the returned set, whether direct or via its iterator,
-     * result in an UnsupportedOperationException.
-     *
-     * @return set of entries contained in this map.
-     */
-    Set<Entry<K, Versioned<V>>> entrySet();
-
-    /**
-     * If the specified key is not already associated with a value
-     * associates it with the given value and returns null, else returns the current value.
-     *
-     * @param key key with which the specified value is to be associated
-     * @param value value to be associated with the specified key
-     * @return the previous value associated with the specified key or null
-     * if key does not already mapped to a value.
-     */
-    Versioned<V> putIfAbsent(K key, V value);
-
-    /**
-     * Removes the entry for the specified key only if it is currently
-     * mapped to the specified value.
-     *
-     * @param key key with which the specified value is associated
-     * @param value value expected to be associated with the specified key
-     * @return true if the value was removed
-     */
-    boolean remove(K key, V value);
-
-    /**
-     * Removes the entry for the specified key only if its current
-     * version in the map is equal to the specified version.
-     *
-     * @param key key with which the specified version is associated
-     * @param version version expected to be associated with the specified key
-     * @return true if the value was removed
-     */
-    boolean remove(K key, long version);
-
-    /**
-     * Replaces the entry for the specified key only if currently mapped
-     * to the specified value.
-     *
-     * @param key key with which the specified value is associated
-     * @param oldValue value expected to be associated with the specified key
-     * @param newValue value to be associated with the specified key
-     * @return true if the value was replaced
-     */
-    boolean replace(K key, V oldValue, V newValue);
-
-    /**
-     * Replaces the entry for the specified key only if it is currently mapped to the
-     * specified version.
-     *
-     * @param key key key with which the specified value is associated
-     * @param oldVersion version expected to be associated with the specified key
-     * @param newValue value to be associated with the specified key
-     * @return true if the value was replaced
-     */
-    boolean replace(K key, long oldVersion, V newValue);
-
-    /**
-     * Atomically apply the specified list of updates to the map.
-     * If any of the updates cannot be applied due to a precondition
-     * violation, none of the updates will be applied and the state of
-     * the map remains unaltered.
-     *
-     * @param updates list of updates to apply atomically.
-     * @return true if the map was updated.
-     */
-    boolean batchUpdate(List<UpdateOperation<K, V>> updates);
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java
deleted file mode 100644
index d98e5a3..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.onosproject.store.consistent.impl;
-
-/**
- * Top level exception for ConsistentMap failures.
- */
-@SuppressWarnings("serial")
-public class ConsistentMapException extends RuntimeException {
-    public ConsistentMapException() {
-    }
-
-    public ConsistentMapException(Throwable t) {
-        super(t);
-    }
-
-    /**
-     * ConsistentMap operation timeout.
-     */
-    public static class Timeout extends ConsistentMapException {
-    }
-
-    /**
-     * ConsistentMap operation interrupted.
-     */
-    public static class Interrupted extends ConsistentMapException {
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
index 1c7ef7f..569cf95 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
@@ -2,7 +2,6 @@
 
 import static com.google.common.base.Preconditions.*;
 
-import java.util.AbstractMap;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -15,8 +14,13 @@
 import java.util.stream.Collectors;
 import java.util.Set;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.util.HexString;
-import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -33,7 +37,7 @@
 
     private final String name;
     private final DatabaseProxy<String, byte[]> proxy;
-    private final StoreSerializer serializer;
+    private final Serializer serializer;
 
     private static final int OPERATION_TIMEOUT_MILLIS = 1000;
     private static final String ERROR_NULL_KEY = "Key cannot be null";
@@ -55,7 +59,7 @@
 
     ConsistentMapImpl(String name,
             DatabaseProxy<String, byte[]> proxy,
-            StoreSerializer serializer) {
+            Serializer serializer) {
         this.name = checkNotNull(name, "map name cannot be null");
         this.proxy = checkNotNull(proxy, "database proxy cannot be null");
         this.serializer = checkNotNull(serializer, "serializer cannot be null");
@@ -87,14 +91,15 @@
     public Versioned<V> get(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
         Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
-        return new Versioned<>(serializer.decode(value.value()), value.version());
+        return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
     }
 
     @Override
     public Versioned<V> put(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        Versioned<byte[]> previousValue = complete(proxy.get(name, keyCache.getUnchecked(key)));
+        Versioned<byte[]> previousValue =
+                complete(proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)));
         return (previousValue != null) ?
                 new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
 
@@ -103,7 +108,7 @@
     @Override
     public Versioned<V> remove(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
-        Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
+        Versioned<byte[]> value = complete(proxy.remove(name, keyCache.getUnchecked(key)));
         return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
     }
 
@@ -198,7 +203,7 @@
     }
 
     private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
-        return new AbstractMap.SimpleEntry<>(
+        return Pair.of(
                 dK(e.getKey()),
                 new Versioned<>(
                         serializer.decode(e.getValue().value()),
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 0fcb895..311cfe9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -22,7 +22,9 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
 import com.google.common.collect.Sets;
@@ -32,7 +34,7 @@
  */
 @Component(immediate = true, enabled = true)
 @Service
-public class DatabaseManager implements DatabaseService {
+public class DatabaseManager implements StorageService {
 
     private final Logger log = getLogger(getClass());
     private PartitionedDatabase partitionedDatabase;
@@ -44,7 +46,7 @@
     protected ClusterService clusterService;
 
     protected String nodeToUri(ControllerNode node) {
-        return "tcp://" + node.ip() + ":" + COPYCAT_TCP_PORT;
+        return String.format("tcp://%s:%d", node.ip(), COPYCAT_TCP_PORT);
     }
 
     @Activate
@@ -76,7 +78,17 @@
         String localNodeUri = nodeToUri(clusterService.getLocalNode());
 
         ClusterConfig clusterConfig = new ClusterConfig()
-            .withProtocol(new NettyTcpProtocol())
+            .withProtocol(new NettyTcpProtocol()
+                    .withSsl(false)
+                    .withConnectTimeout(60000)
+                    .withAcceptBacklog(1024)
+                    .withTrafficClass(-1)
+                    .withSoLinger(-1)
+                    .withReceiveBufferSize(32768)
+                    .withSendBufferSize(8192)
+                    .withThreads(1))
+            .withElectionTimeout(300)
+            .withHeartbeatInterval(150)
             .withMembers(activeNodeUris)
             .withLocalMember(localNodeUri);
 
@@ -85,8 +97,15 @@
         partitionMap.forEach((name, nodes) -> {
             Set<String> replicas = nodes.stream().map(this::nodeToUri).collect(Collectors.toSet());
             DatabaseConfig partitionConfig = new DatabaseConfig()
+                            .withElectionTimeout(300)
+                            .withHeartbeatInterval(150)
                             .withConsistency(Consistency.STRONG)
-                            .withLog(new FileLog(logDir))
+                            .withLog(new FileLog()
+                                    .withDirectory(logDir)
+                                    .withSegmentSize(1073741824) // 1GB
+                                    .withFlushOnWrite(true)
+                                    .withSegmentInterval(Long.MAX_VALUE))
+                            .withDefaultSerializer(new DatabaseSerializer())
                             .withReplicas(replicas);
             databaseConfig.addPartition(name, partitionConfig);
         });
@@ -116,7 +135,7 @@
     }
 
     @Override
-    public <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer) {
+    public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
         return new ConsistentMapImpl<K, V>(name, partitionedDatabase, serializer);
     }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index f83e8f8..23aa0be 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -6,6 +6,9 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
 /**
  * Database proxy.
  */
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
new file mode 100644
index 0000000..0563361
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -0,0 +1,74 @@
+package org.onosproject.store.consistent.impl;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.Versioned;
+
+import net.kuujo.copycat.cluster.internal.MemberInfo;
+import net.kuujo.copycat.protocol.rpc.AppendRequest;
+import net.kuujo.copycat.protocol.rpc.AppendResponse;
+import net.kuujo.copycat.protocol.rpc.CommitRequest;
+import net.kuujo.copycat.protocol.rpc.CommitResponse;
+import net.kuujo.copycat.protocol.rpc.PollRequest;
+import net.kuujo.copycat.protocol.rpc.PollResponse;
+import net.kuujo.copycat.protocol.rpc.QueryRequest;
+import net.kuujo.copycat.protocol.rpc.QueryResponse;
+import net.kuujo.copycat.protocol.rpc.ReplicaInfo;
+import net.kuujo.copycat.protocol.rpc.SyncRequest;
+import net.kuujo.copycat.protocol.rpc.SyncResponse;
+import net.kuujo.copycat.util.serializer.SerializerConfig;
+
+/**
+ * Serializer for DatabaseManager's interaction with Copycat.
+ */
+public class DatabaseSerializer extends SerializerConfig {
+
+    private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
+            .nextId(KryoNamespace.FLOATING_ID)
+            .register(AppendRequest.class)
+            .register(AppendResponse.class)
+            .register(SyncRequest.class)
+            .register(SyncResponse.class)
+            .register(PollRequest.class)
+            .register(PollResponse.class)
+            .register(QueryRequest.class)
+            .register(QueryResponse.class)
+            .register(CommitRequest.class)
+            .register(CommitResponse.class)
+            .register(ReplicaInfo.class)
+            .register(MemberInfo.class)
+            .build();
+
+    private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
+            .nextId(KryoNamespace.FLOATING_ID)
+            .register(Versioned.class)
+            .register(Pair.class)
+            .register(ImmutablePair.class)
+            .build();
+
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.BASIC)
+                    .register(COPYCAT)
+                    .register(ONOS_STORE)
+                    .build();
+        }
+    };
+
+    @Override
+    public ByteBuffer writeObject(Object object) {
+        return ByteBuffer.wrap(SERIALIZER.encode(object));
+    }
+
+    @Override
+    public <T> T readObject(ByteBuffer buffer) {
+        return SERIALIZER.decode(buffer);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java
deleted file mode 100644
index 2ae418b..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.store.serializers.StoreSerializer;
-
-/**
- * Database service.
- */
-public interface DatabaseService {
-
-    /**
-     * Creates a ConsistentMap.
-     *
-     * @param <K> Key type
-     * @param <V> value type
-     * @param name map name
-     * @param serializer serializer to use for serializing keys and values.
-     * @return consistent map.
-     */
-    <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer);
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 89a51e8..1baf146 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -5,6 +5,9 @@
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
 import net.kuujo.copycat.state.Command;
 import net.kuujo.copycat.state.Initializer;
 import net.kuujo.copycat.state.Query;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 837f3b4..0ddaab6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -13,6 +13,9 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
 /**
  * Default database.
  */
@@ -132,7 +135,7 @@
     return runStartupTasks()
       .thenCompose(v -> stateMachine.open())
       .thenRun(() -> {
-        this.proxy = stateMachine.createProxy(DatabaseProxy.class);
+        this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
       })
       .thenApply(v -> null);
   }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 2b20f53..452f80e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -6,8 +6,16 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import java.util.Set;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
 import net.kuujo.copycat.state.Initializer;
 import net.kuujo.copycat.state.StateContext;
 
@@ -88,17 +96,21 @@
 
     @Override
     public Set<K> keySet(String tableName) {
-        return getTableMap(tableName).keySet();
+        return ImmutableSet.copyOf(getTableMap(tableName).keySet());
     }
 
     @Override
     public Collection<Versioned<V>> values(String tableName) {
-        return getTableMap(tableName).values();
+        return ImmutableList.copyOf(getTableMap(tableName).values());
     }
 
     @Override
     public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
-        return getTableMap(tableName).entrySet();
+        return ImmutableSet.copyOf(getTableMap(tableName)
+                .entrySet()
+                .stream()
+                .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
+                .collect(Collectors.toSet()));
     }
 
     @Override
@@ -110,7 +122,7 @@
     @Override
     public boolean remove(String tableName, K key, V value) {
         Versioned<V> existing = getTableMap(tableName).get(key);
-        if (existing != null && existing.value().equals(value)) {
+        if (existing != null && checkEquality(existing.value(), value)) {
             getTableMap(tableName).remove(key);
             return true;
         }
@@ -130,7 +142,7 @@
     @Override
     public boolean replace(String tableName, K key, V oldValue, V newValue) {
         Versioned<V> existing = getTableMap(tableName).get(key);
-        if (existing != null && existing.value().equals(oldValue)) {
+        if (existing != null && checkEquality(existing.value(), oldValue)) {
             put(tableName, key, newValue);
             return true;
         }
@@ -198,11 +210,11 @@
         case PUT_IF_VERSION_MATCH:
             return existingEntry != null && existingEntry.version() == update.currentVersion();
         case PUT_IF_VALUE_MATCH:
-            return existingEntry != null && existingEntry.value().equals(update.currentValue());
+            return existingEntry != null && checkEquality(existingEntry.value(), update.currentValue());
         case REMOVE_IF_VERSION_MATCH:
             return existingEntry == null || existingEntry.version() == update.currentVersion();
         case REMOVE_IF_VALUE_MATCH:
-            return existingEntry == null || existingEntry.value().equals(update.currentValue());
+            return existingEntry == null || checkEquality(existingEntry.value(), update.currentValue());
         default:
             throw new IllegalStateException("Unsupported type: " + update.type());
         }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 8cf1703..72f8c04 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -9,6 +9,10 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -112,7 +116,7 @@
         return CompletableFuture.allOf(partitions
                     .values()
                     .stream()
-                    .map(p -> p.values(tableName))
+                    .map(p -> p.values(tableName).thenApply(values::addAll))
                     .toArray(CompletableFuture[]::new))
                 .thenApply(v -> values);
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
index 81f00de..d6db75d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
@@ -67,13 +67,14 @@
         CopycatConfig copycatConfig = new CopycatConfig()
             .withName(name)
             .withClusterConfig(clusterConfig)
+            .withDefaultSerializer(new DatabaseSerializer())
             .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
         ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
         PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
         partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
             partitionedDatabase.registerPartition(partitionName ,
                     coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
-                        .withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
+                        .withSerializer(copycatConfig.getDefaultSerializer())
                         .withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
         partitionedDatabase.setPartitioner(
                 new SimpleKeyHashPartitioner(partitionedDatabase.getRegisteredPartitions()));
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
deleted file mode 100644
index 11df520..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
+++ /dev/null
@@ -1,182 +0,0 @@
-package org.onosproject.store.consistent.impl;
-
-import static com.google.common.base.Preconditions.*;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Database update operation.
- *
- * @param <K> key type.
- * @param <V> value type.
- */
-public class UpdateOperation<K, V> {
-
-    /**
-     * Type of database update operation.
-     */
-    public static enum Type {
-        PUT,
-        PUT_IF_ABSENT,
-        PUT_IF_VERSION_MATCH,
-        PUT_IF_VALUE_MATCH,
-        REMOVE,
-        REMOVE_IF_VERSION_MATCH,
-        REMOVE_IF_VALUE_MATCH,
-    }
-
-    private Type type;
-    private String tableName;
-    private K key;
-    private V value;
-    private V currentValue;
-    private long currentVersion = -1;
-
-    /**
-     * Returns the type of update operation.
-     * @return type of update.
-     */
-    public Type type() {
-        return type;
-    }
-
-    /**
-     * Returns the tableName being updated.
-     * @return table name.
-     */
-    public String tableName() {
-        return tableName;
-    }
-
-    /**
-     * Returns the item key being updated.
-     * @return item key
-     */
-    public K key() {
-        return key;
-    }
-
-    /**
-     * Returns the new value.
-     * @return item's target value.
-     */
-    public V value() {
-        return value;
-    }
-
-    /**
-     * Returns the expected current value in the database value for the key.
-     * @return current value in database.
-     */
-    public V currentValue() {
-        return currentValue;
-    }
-
-    /**
-     * Returns the expected current version in the database for the key.
-     * @return expected version.
-     */
-    public long currentVersion() {
-        return currentVersion;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-            .add("type", type)
-            .add("tableName", tableName)
-            .add("key", key)
-            .add("value", value)
-            .add("currentValue", currentValue)
-            .add("currentVersion", currentVersion)
-            .toString();
-    }
-
-    /**
-     * Creates a new builder instance.
-     * @param <K> key type.
-     * @param <V> value type.
-     *
-     * @return builder.
-     */
-    public static <K, V> Builder<K, V> newBuilder() {
-        return new Builder<>();
-    }
-
-    /**
-     * UpdatOperation builder.
-     *
-     * @param <K> key type.
-     * @param <V> value type.
-     */
-    public static final class Builder<K, V> {
-
-        private UpdateOperation<K, V> operation = new UpdateOperation<>();
-
-        public UpdateOperation<K, V> build() {
-            validateInputs();
-            return operation;
-        }
-
-        public Builder<K, V> withType(Type type) {
-            operation.type = checkNotNull(type, "type cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withTableName(String tableName) {
-            operation.tableName = checkNotNull(tableName, "tableName cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withKey(K key) {
-            operation.key = checkNotNull(key, "key cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withCurrentValue(V value) {
-            operation.currentValue = checkNotNull(value, "currentValue cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withValue(V value) {
-            operation.value = checkNotNull(value, "value cannot be null");
-            return this;
-        }
-
-        public Builder<K, V> withCurrentVersion(long version) {
-            checkArgument(version >= 0, "version cannot be negative");
-            operation.currentVersion = version;
-            return this;
-        }
-
-        private void validateInputs() {
-            checkNotNull(operation.type, "type must be specified");
-            checkNotNull(operation.tableName, "table name must be specified");
-            checkNotNull(operation.key, "key must be specified");
-            switch (operation.type) {
-            case PUT:
-            case PUT_IF_ABSENT:
-                checkNotNull(operation.value, "value must be specified.");
-                break;
-            case PUT_IF_VERSION_MATCH:
-                checkNotNull(operation.value, "value must be specified.");
-                checkState(operation.currentVersion >= 0, "current version must be specified");
-                break;
-            case PUT_IF_VALUE_MATCH:
-                checkNotNull(operation.value, "value must be specified.");
-                checkNotNull(operation.currentValue, "currentValue must be specified.");
-                break;
-            case REMOVE:
-                break;
-            case REMOVE_IF_VERSION_MATCH:
-                checkState(operation.currentVersion >= 0, "current version must be specified");
-                break;
-            case REMOVE_IF_VALUE_MATCH:
-                checkNotNull(operation.currentValue, "currentValue must be specified.");
-                break;
-            default:
-                throw new IllegalStateException("Unknown operation type");
-            }
-        }
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
deleted file mode 100644
index 6eb908d..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.onosproject.store.consistent.impl;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Versioned value.
- *
- * @param <V> value type.
- */
-public class Versioned<V> {
-
-    private final V value;
-    private final long version;
-
-    /**
-     * Constructs a new versioned value.
-     * @param value value
-     * @param version version
-     */
-    public Versioned(V value, long version) {
-        this.value = value;
-        this.version = version;
-    }
-
-    /**
-     * Returns the value.
-     *
-     * @return value.
-     */
-    public V value() {
-        return value;
-    }
-
-    /**
-     * Returns the version.
-     *
-     * @return version
-     */
-    public long version() {
-        return version;
-    }
-
-    @Override
-    public String toString() {
-        return MoreObjects.toStringHelper(this)
-            .add("value", value)
-            .add("version", version)
-            .toString();
-    }
-}