1. Refactored ConsistentMap and StorageServive (renamed from DatabaseService) to api bundle.
2. Misc bug fixes uncovered during testing

Change-Id: I1219c5264831bcfa93565f764511f89de35a949d
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
similarity index 98%
rename from core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java
rename to core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 4d7f33b..cf90ab0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -1,4 +1,4 @@
-package org.onosproject.store.consistent.impl;
+package org.onosproject.store.service;
 
 import java.util.Collection;
 import java.util.List;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
similarity index 91%
rename from core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java
rename to core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
index d98e5a3..2ba4dee 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
@@ -1,4 +1,4 @@
-package org.onosproject.store.consistent.impl;
+package org.onosproject.store.service;
 
 /**
  * Top level exception for ConsistentMap failures.
diff --git a/core/api/src/main/java/org/onosproject/store/service/Serializer.java b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
new file mode 100644
index 0000000..f43090f
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
@@ -0,0 +1,22 @@
+package org.onosproject.store.service;
+
+/**
+ * Interface for serialization for store artifacts.
+ */
+public interface Serializer {
+    /**
+     * Serialize the specified object.
+     * @param object object to serialize.
+     * @return serialized bytes.
+     * @param <T> encoded type
+     */
+    <T> byte[] encode(T object);
+
+    /**
+     * Deserialize the specified bytes.
+     * @param bytes byte array to deserialize.
+     * @return deserialized object.
+     * @param <T> decoded type
+     */
+    <T> T decode(byte[] bytes);
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
new file mode 100644
index 0000000..cfae271
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -0,0 +1,27 @@
+package org.onosproject.store.service;
+
+/**
+ * Storage service.
+ * <p>
+ * This service provides operations for creating key-value stores.
+ * One can chose to create key-value stores with varying properties such
+ * as strongly consistent vs eventually consistent, durable vs volatile.
+ * <p>
+ * Various store implementations should leverage the data structures provided
+ * by this service
+ */
+public interface StorageService {
+
+    /**
+     * Creates a ConsistentMap.
+     *
+     * @param name map name
+     * @param serializer serializer to use for serializing keys and values.
+     * @return consistent map.
+     * @param <K> key type
+     * @param <V> value type
+     */
+    <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer);
+
+    // TODO: add API for creating Eventually Consistent Map.
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java b/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
similarity index 98%
rename from core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
rename to core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
index 11df520..224efbe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
+++ b/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
@@ -1,4 +1,4 @@
-package org.onosproject.store.consistent.impl;
+package org.onosproject.store.service;
 
 import static com.google.common.base.Preconditions.*;
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
similarity index 94%
rename from core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
rename to core/api/src/main/java/org/onosproject/store/service/Versioned.java
index 6eb908d..2026437 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
@@ -1,4 +1,4 @@
-package org.onosproject.store.consistent.impl;
+package org.onosproject.store.service;
 
 import com.google.common.base.MoreObjects;
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
index 1c7ef7f..569cf95 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
@@ -2,7 +2,6 @@
 
 import static com.google.common.base.Preconditions.*;
 
-import java.util.AbstractMap;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -15,8 +14,13 @@
 import java.util.stream.Collectors;
 import java.util.Set;
 
+import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.util.HexString;
-import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -33,7 +37,7 @@
 
     private final String name;
     private final DatabaseProxy<String, byte[]> proxy;
-    private final StoreSerializer serializer;
+    private final Serializer serializer;
 
     private static final int OPERATION_TIMEOUT_MILLIS = 1000;
     private static final String ERROR_NULL_KEY = "Key cannot be null";
@@ -55,7 +59,7 @@
 
     ConsistentMapImpl(String name,
             DatabaseProxy<String, byte[]> proxy,
-            StoreSerializer serializer) {
+            Serializer serializer) {
         this.name = checkNotNull(name, "map name cannot be null");
         this.proxy = checkNotNull(proxy, "database proxy cannot be null");
         this.serializer = checkNotNull(serializer, "serializer cannot be null");
@@ -87,14 +91,15 @@
     public Versioned<V> get(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
         Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
-        return new Versioned<>(serializer.decode(value.value()), value.version());
+        return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
     }
 
     @Override
     public Versioned<V> put(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        Versioned<byte[]> previousValue = complete(proxy.get(name, keyCache.getUnchecked(key)));
+        Versioned<byte[]> previousValue =
+                complete(proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)));
         return (previousValue != null) ?
                 new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
 
@@ -103,7 +108,7 @@
     @Override
     public Versioned<V> remove(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
-        Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
+        Versioned<byte[]> value = complete(proxy.remove(name, keyCache.getUnchecked(key)));
         return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
     }
 
@@ -198,7 +203,7 @@
     }
 
     private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
-        return new AbstractMap.SimpleEntry<>(
+        return Pair.of(
                 dK(e.getKey()),
                 new Versioned<>(
                         serializer.decode(e.getValue().value()),
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 0fcb895..311cfe9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -22,7 +22,9 @@
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.slf4j.Logger;
 
 import com.google.common.collect.Sets;
@@ -32,7 +34,7 @@
  */
 @Component(immediate = true, enabled = true)
 @Service
-public class DatabaseManager implements DatabaseService {
+public class DatabaseManager implements StorageService {
 
     private final Logger log = getLogger(getClass());
     private PartitionedDatabase partitionedDatabase;
@@ -44,7 +46,7 @@
     protected ClusterService clusterService;
 
     protected String nodeToUri(ControllerNode node) {
-        return "tcp://" + node.ip() + ":" + COPYCAT_TCP_PORT;
+        return String.format("tcp://%s:%d", node.ip(), COPYCAT_TCP_PORT);
     }
 
     @Activate
@@ -76,7 +78,17 @@
         String localNodeUri = nodeToUri(clusterService.getLocalNode());
 
         ClusterConfig clusterConfig = new ClusterConfig()
-            .withProtocol(new NettyTcpProtocol())
+            .withProtocol(new NettyTcpProtocol()
+                    .withSsl(false)
+                    .withConnectTimeout(60000)
+                    .withAcceptBacklog(1024)
+                    .withTrafficClass(-1)
+                    .withSoLinger(-1)
+                    .withReceiveBufferSize(32768)
+                    .withSendBufferSize(8192)
+                    .withThreads(1))
+            .withElectionTimeout(300)
+            .withHeartbeatInterval(150)
             .withMembers(activeNodeUris)
             .withLocalMember(localNodeUri);
 
@@ -85,8 +97,15 @@
         partitionMap.forEach((name, nodes) -> {
             Set<String> replicas = nodes.stream().map(this::nodeToUri).collect(Collectors.toSet());
             DatabaseConfig partitionConfig = new DatabaseConfig()
+                            .withElectionTimeout(300)
+                            .withHeartbeatInterval(150)
                             .withConsistency(Consistency.STRONG)
-                            .withLog(new FileLog(logDir))
+                            .withLog(new FileLog()
+                                    .withDirectory(logDir)
+                                    .withSegmentSize(1073741824) // 1GB
+                                    .withFlushOnWrite(true)
+                                    .withSegmentInterval(Long.MAX_VALUE))
+                            .withDefaultSerializer(new DatabaseSerializer())
                             .withReplicas(replicas);
             databaseConfig.addPartition(name, partitionConfig);
         });
@@ -116,7 +135,7 @@
     }
 
     @Override
-    public <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer) {
+    public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
         return new ConsistentMapImpl<K, V>(name, partitionedDatabase, serializer);
     }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index f83e8f8..23aa0be 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -6,6 +6,9 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
 /**
  * Database proxy.
  */
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
new file mode 100644
index 0000000..0563361
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -0,0 +1,74 @@
+package org.onosproject.store.consistent.impl;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.Versioned;
+
+import net.kuujo.copycat.cluster.internal.MemberInfo;
+import net.kuujo.copycat.protocol.rpc.AppendRequest;
+import net.kuujo.copycat.protocol.rpc.AppendResponse;
+import net.kuujo.copycat.protocol.rpc.CommitRequest;
+import net.kuujo.copycat.protocol.rpc.CommitResponse;
+import net.kuujo.copycat.protocol.rpc.PollRequest;
+import net.kuujo.copycat.protocol.rpc.PollResponse;
+import net.kuujo.copycat.protocol.rpc.QueryRequest;
+import net.kuujo.copycat.protocol.rpc.QueryResponse;
+import net.kuujo.copycat.protocol.rpc.ReplicaInfo;
+import net.kuujo.copycat.protocol.rpc.SyncRequest;
+import net.kuujo.copycat.protocol.rpc.SyncResponse;
+import net.kuujo.copycat.util.serializer.SerializerConfig;
+
+/**
+ * Serializer for DatabaseManager's interaction with Copycat.
+ */
+public class DatabaseSerializer extends SerializerConfig {
+
+    private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
+            .nextId(KryoNamespace.FLOATING_ID)
+            .register(AppendRequest.class)
+            .register(AppendResponse.class)
+            .register(SyncRequest.class)
+            .register(SyncResponse.class)
+            .register(PollRequest.class)
+            .register(PollResponse.class)
+            .register(QueryRequest.class)
+            .register(QueryResponse.class)
+            .register(CommitRequest.class)
+            .register(CommitResponse.class)
+            .register(ReplicaInfo.class)
+            .register(MemberInfo.class)
+            .build();
+
+    private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
+            .nextId(KryoNamespace.FLOATING_ID)
+            .register(Versioned.class)
+            .register(Pair.class)
+            .register(ImmutablePair.class)
+            .build();
+
+    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
+        protected void setupKryoPool() {
+            serializerPool = KryoNamespace.newBuilder()
+                    .register(KryoNamespaces.BASIC)
+                    .register(COPYCAT)
+                    .register(ONOS_STORE)
+                    .build();
+        }
+    };
+
+    @Override
+    public ByteBuffer writeObject(Object object) {
+        return ByteBuffer.wrap(SERIALIZER.encode(object));
+    }
+
+    @Override
+    public <T> T readObject(ByteBuffer buffer) {
+        return SERIALIZER.decode(buffer);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java
deleted file mode 100644
index 2ae418b..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.store.serializers.StoreSerializer;
-
-/**
- * Database service.
- */
-public interface DatabaseService {
-
-    /**
-     * Creates a ConsistentMap.
-     *
-     * @param <K> Key type
-     * @param <V> value type
-     * @param name map name
-     * @param serializer serializer to use for serializing keys and values.
-     * @return consistent map.
-     */
-    <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer);
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 89a51e8..1baf146 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -5,6 +5,9 @@
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
 import net.kuujo.copycat.state.Command;
 import net.kuujo.copycat.state.Initializer;
 import net.kuujo.copycat.state.Query;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 837f3b4..0ddaab6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -13,6 +13,9 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
 /**
  * Default database.
  */
@@ -132,7 +135,7 @@
     return runStartupTasks()
       .thenCompose(v -> stateMachine.open())
       .thenRun(() -> {
-        this.proxy = stateMachine.createProxy(DatabaseProxy.class);
+        this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
       })
       .thenApply(v -> null);
   }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 2b20f53..452f80e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -6,8 +6,16 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 import java.util.Set;
 
+import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
 import net.kuujo.copycat.state.Initializer;
 import net.kuujo.copycat.state.StateContext;
 
@@ -88,17 +96,21 @@
 
     @Override
     public Set<K> keySet(String tableName) {
-        return getTableMap(tableName).keySet();
+        return ImmutableSet.copyOf(getTableMap(tableName).keySet());
     }
 
     @Override
     public Collection<Versioned<V>> values(String tableName) {
-        return getTableMap(tableName).values();
+        return ImmutableList.copyOf(getTableMap(tableName).values());
     }
 
     @Override
     public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
-        return getTableMap(tableName).entrySet();
+        return ImmutableSet.copyOf(getTableMap(tableName)
+                .entrySet()
+                .stream()
+                .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
+                .collect(Collectors.toSet()));
     }
 
     @Override
@@ -110,7 +122,7 @@
     @Override
     public boolean remove(String tableName, K key, V value) {
         Versioned<V> existing = getTableMap(tableName).get(key);
-        if (existing != null && existing.value().equals(value)) {
+        if (existing != null && checkEquality(existing.value(), value)) {
             getTableMap(tableName).remove(key);
             return true;
         }
@@ -130,7 +142,7 @@
     @Override
     public boolean replace(String tableName, K key, V oldValue, V newValue) {
         Versioned<V> existing = getTableMap(tableName).get(key);
-        if (existing != null && existing.value().equals(oldValue)) {
+        if (existing != null && checkEquality(existing.value(), oldValue)) {
             put(tableName, key, newValue);
             return true;
         }
@@ -198,11 +210,11 @@
         case PUT_IF_VERSION_MATCH:
             return existingEntry != null && existingEntry.version() == update.currentVersion();
         case PUT_IF_VALUE_MATCH:
-            return existingEntry != null && existingEntry.value().equals(update.currentValue());
+            return existingEntry != null && checkEquality(existingEntry.value(), update.currentValue());
         case REMOVE_IF_VERSION_MATCH:
             return existingEntry == null || existingEntry.version() == update.currentVersion();
         case REMOVE_IF_VALUE_MATCH:
-            return existingEntry == null || existingEntry.value().equals(update.currentValue());
+            return existingEntry == null || checkEquality(existingEntry.value(), update.currentValue());
         default:
             throw new IllegalStateException("Unsupported type: " + update.type());
         }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 8cf1703..72f8c04 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -9,6 +9,10 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -112,7 +116,7 @@
         return CompletableFuture.allOf(partitions
                     .values()
                     .stream()
-                    .map(p -> p.values(tableName))
+                    .map(p -> p.values(tableName).thenApply(values::addAll))
                     .toArray(CompletableFuture[]::new))
                 .thenApply(v -> values);
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
index 81f00de..d6db75d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
@@ -67,13 +67,14 @@
         CopycatConfig copycatConfig = new CopycatConfig()
             .withName(name)
             .withClusterConfig(clusterConfig)
+            .withDefaultSerializer(new DatabaseSerializer())
             .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
         ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
         PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
         partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
             partitionedDatabase.registerPartition(partitionName ,
                     coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
-                        .withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
+                        .withSerializer(copycatConfig.getDefaultSerializer())
                         .withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
         partitionedDatabase.setPartitioner(
                 new SimpleKeyHashPartitioner(partitionedDatabase.getRegisteredPartitions()));