1. Refactored ConsistentMap and StorageServive (renamed from DatabaseService) to api bundle.
2. Misc bug fixes uncovered during testing
Change-Id: I1219c5264831bcfa93565f764511f89de35a949d
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
similarity index 98%
rename from core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java
rename to core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index 4d7f33b..cf90ab0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -1,4 +1,4 @@
-package org.onosproject.store.consistent.impl;
+package org.onosproject.store.service;
import java.util.Collection;
import java.util.List;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
similarity index 91%
rename from core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java
rename to core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
index d98e5a3..2ba4dee 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapException.java
@@ -1,4 +1,4 @@
-package org.onosproject.store.consistent.impl;
+package org.onosproject.store.service;
/**
* Top level exception for ConsistentMap failures.
diff --git a/core/api/src/main/java/org/onosproject/store/service/Serializer.java b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
new file mode 100644
index 0000000..f43090f
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/Serializer.java
@@ -0,0 +1,22 @@
+package org.onosproject.store.service;
+
+/**
+ * Interface for serialization for store artifacts.
+ */
+public interface Serializer {
+ /**
+ * Serialize the specified object.
+ * @param object object to serialize.
+ * @return serialized bytes.
+ * @param <T> encoded type
+ */
+ <T> byte[] encode(T object);
+
+ /**
+ * Deserialize the specified bytes.
+ * @param bytes byte array to deserialize.
+ * @return deserialized object.
+ * @param <T> decoded type
+ */
+ <T> T decode(byte[] bytes);
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
new file mode 100644
index 0000000..cfae271
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -0,0 +1,27 @@
+package org.onosproject.store.service;
+
+/**
+ * Storage service.
+ * <p>
+ * This service provides operations for creating key-value stores.
+ * One can chose to create key-value stores with varying properties such
+ * as strongly consistent vs eventually consistent, durable vs volatile.
+ * <p>
+ * Various store implementations should leverage the data structures provided
+ * by this service
+ */
+public interface StorageService {
+
+ /**
+ * Creates a ConsistentMap.
+ *
+ * @param name map name
+ * @param serializer serializer to use for serializing keys and values.
+ * @return consistent map.
+ * @param <K> key type
+ * @param <V> value type
+ */
+ <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer);
+
+ // TODO: add API for creating Eventually Consistent Map.
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java b/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
similarity index 98%
rename from core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
rename to core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
index 11df520..224efbe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
+++ b/core/api/src/main/java/org/onosproject/store/service/UpdateOperation.java
@@ -1,4 +1,4 @@
-package org.onosproject.store.consistent.impl;
+package org.onosproject.store.service;
import static com.google.common.base.Preconditions.*;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
similarity index 94%
rename from core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
rename to core/api/src/main/java/org/onosproject/store/service/Versioned.java
index 6eb908d..2026437 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
@@ -1,4 +1,4 @@
-package org.onosproject.store.consistent.impl;
+package org.onosproject.store.service;
import com.google.common.base.MoreObjects;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
index 1c7ef7f..569cf95 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
@@ -2,7 +2,6 @@
import static com.google.common.base.Preconditions.*;
-import java.util.AbstractMap;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -15,8 +14,13 @@
import java.util.stream.Collectors;
import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.HexString;
-import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -33,7 +37,7 @@
private final String name;
private final DatabaseProxy<String, byte[]> proxy;
- private final StoreSerializer serializer;
+ private final Serializer serializer;
private static final int OPERATION_TIMEOUT_MILLIS = 1000;
private static final String ERROR_NULL_KEY = "Key cannot be null";
@@ -55,7 +59,7 @@
ConsistentMapImpl(String name,
DatabaseProxy<String, byte[]> proxy,
- StoreSerializer serializer) {
+ Serializer serializer) {
this.name = checkNotNull(name, "map name cannot be null");
this.proxy = checkNotNull(proxy, "database proxy cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
@@ -87,14 +91,15 @@
public Versioned<V> get(K key) {
checkNotNull(key, ERROR_NULL_KEY);
Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
- return new Versioned<>(serializer.decode(value.value()), value.version());
+ return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
}
@Override
public Versioned<V> put(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- Versioned<byte[]> previousValue = complete(proxy.get(name, keyCache.getUnchecked(key)));
+ Versioned<byte[]> previousValue =
+ complete(proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)));
return (previousValue != null) ?
new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
@@ -103,7 +108,7 @@
@Override
public Versioned<V> remove(K key) {
checkNotNull(key, ERROR_NULL_KEY);
- Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
+ Versioned<byte[]> value = complete(proxy.remove(name, keyCache.getUnchecked(key)));
return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
}
@@ -198,7 +203,7 @@
}
private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
- return new AbstractMap.SimpleEntry<>(
+ return Pair.of(
dK(e.getKey()),
new Versioned<>(
serializer.decode(e.getValue().value()),
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 0fcb895..311cfe9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -22,7 +22,9 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import com.google.common.collect.Sets;
@@ -32,7 +34,7 @@
*/
@Component(immediate = true, enabled = true)
@Service
-public class DatabaseManager implements DatabaseService {
+public class DatabaseManager implements StorageService {
private final Logger log = getLogger(getClass());
private PartitionedDatabase partitionedDatabase;
@@ -44,7 +46,7 @@
protected ClusterService clusterService;
protected String nodeToUri(ControllerNode node) {
- return "tcp://" + node.ip() + ":" + COPYCAT_TCP_PORT;
+ return String.format("tcp://%s:%d", node.ip(), COPYCAT_TCP_PORT);
}
@Activate
@@ -76,7 +78,17 @@
String localNodeUri = nodeToUri(clusterService.getLocalNode());
ClusterConfig clusterConfig = new ClusterConfig()
- .withProtocol(new NettyTcpProtocol())
+ .withProtocol(new NettyTcpProtocol()
+ .withSsl(false)
+ .withConnectTimeout(60000)
+ .withAcceptBacklog(1024)
+ .withTrafficClass(-1)
+ .withSoLinger(-1)
+ .withReceiveBufferSize(32768)
+ .withSendBufferSize(8192)
+ .withThreads(1))
+ .withElectionTimeout(300)
+ .withHeartbeatInterval(150)
.withMembers(activeNodeUris)
.withLocalMember(localNodeUri);
@@ -85,8 +97,15 @@
partitionMap.forEach((name, nodes) -> {
Set<String> replicas = nodes.stream().map(this::nodeToUri).collect(Collectors.toSet());
DatabaseConfig partitionConfig = new DatabaseConfig()
+ .withElectionTimeout(300)
+ .withHeartbeatInterval(150)
.withConsistency(Consistency.STRONG)
- .withLog(new FileLog(logDir))
+ .withLog(new FileLog()
+ .withDirectory(logDir)
+ .withSegmentSize(1073741824) // 1GB
+ .withFlushOnWrite(true)
+ .withSegmentInterval(Long.MAX_VALUE))
+ .withDefaultSerializer(new DatabaseSerializer())
.withReplicas(replicas);
databaseConfig.addPartition(name, partitionConfig);
});
@@ -116,7 +135,7 @@
}
@Override
- public <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer) {
+ public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
return new ConsistentMapImpl<K, V>(name, partitionedDatabase, serializer);
}
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index f83e8f8..23aa0be 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -6,6 +6,9 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
/**
* Database proxy.
*/
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
new file mode 100644
index 0000000..0563361
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -0,0 +1,74 @@
+package org.onosproject.store.consistent.impl;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.Versioned;
+
+import net.kuujo.copycat.cluster.internal.MemberInfo;
+import net.kuujo.copycat.protocol.rpc.AppendRequest;
+import net.kuujo.copycat.protocol.rpc.AppendResponse;
+import net.kuujo.copycat.protocol.rpc.CommitRequest;
+import net.kuujo.copycat.protocol.rpc.CommitResponse;
+import net.kuujo.copycat.protocol.rpc.PollRequest;
+import net.kuujo.copycat.protocol.rpc.PollResponse;
+import net.kuujo.copycat.protocol.rpc.QueryRequest;
+import net.kuujo.copycat.protocol.rpc.QueryResponse;
+import net.kuujo.copycat.protocol.rpc.ReplicaInfo;
+import net.kuujo.copycat.protocol.rpc.SyncRequest;
+import net.kuujo.copycat.protocol.rpc.SyncResponse;
+import net.kuujo.copycat.util.serializer.SerializerConfig;
+
+/**
+ * Serializer for DatabaseManager's interaction with Copycat.
+ */
+public class DatabaseSerializer extends SerializerConfig {
+
+ private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
+ .nextId(KryoNamespace.FLOATING_ID)
+ .register(AppendRequest.class)
+ .register(AppendResponse.class)
+ .register(SyncRequest.class)
+ .register(SyncResponse.class)
+ .register(PollRequest.class)
+ .register(PollResponse.class)
+ .register(QueryRequest.class)
+ .register(QueryResponse.class)
+ .register(CommitRequest.class)
+ .register(CommitResponse.class)
+ .register(ReplicaInfo.class)
+ .register(MemberInfo.class)
+ .build();
+
+ private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
+ .nextId(KryoNamespace.FLOATING_ID)
+ .register(Versioned.class)
+ .register(Pair.class)
+ .register(ImmutablePair.class)
+ .build();
+
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .register(COPYCAT)
+ .register(ONOS_STORE)
+ .build();
+ }
+ };
+
+ @Override
+ public ByteBuffer writeObject(Object object) {
+ return ByteBuffer.wrap(SERIALIZER.encode(object));
+ }
+
+ @Override
+ public <T> T readObject(ByteBuffer buffer) {
+ return SERIALIZER.decode(buffer);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java
deleted file mode 100644
index 2ae418b..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.store.serializers.StoreSerializer;
-
-/**
- * Database service.
- */
-public interface DatabaseService {
-
- /**
- * Creates a ConsistentMap.
- *
- * @param <K> Key type
- * @param <V> value type
- * @param name map name
- * @param serializer serializer to use for serializing keys and values.
- * @return consistent map.
- */
- <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer);
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 89a51e8..1baf146 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -5,6 +5,9 @@
import java.util.Map.Entry;
import java.util.Set;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
import net.kuujo.copycat.state.Command;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.Query;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 837f3b4..0ddaab6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -13,6 +13,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
/**
* Default database.
*/
@@ -132,7 +135,7 @@
return runStartupTasks()
.thenCompose(v -> stateMachine.open())
.thenRun(() -> {
- this.proxy = stateMachine.createProxy(DatabaseProxy.class);
+ this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
})
.thenApply(v -> null);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 2b20f53..452f80e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -6,8 +6,16 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.stream.Collectors;
import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.StateContext;
@@ -88,17 +96,21 @@
@Override
public Set<K> keySet(String tableName) {
- return getTableMap(tableName).keySet();
+ return ImmutableSet.copyOf(getTableMap(tableName).keySet());
}
@Override
public Collection<Versioned<V>> values(String tableName) {
- return getTableMap(tableName).values();
+ return ImmutableList.copyOf(getTableMap(tableName).values());
}
@Override
public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
- return getTableMap(tableName).entrySet();
+ return ImmutableSet.copyOf(getTableMap(tableName)
+ .entrySet()
+ .stream()
+ .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
+ .collect(Collectors.toSet()));
}
@Override
@@ -110,7 +122,7 @@
@Override
public boolean remove(String tableName, K key, V value) {
Versioned<V> existing = getTableMap(tableName).get(key);
- if (existing != null && existing.value().equals(value)) {
+ if (existing != null && checkEquality(existing.value(), value)) {
getTableMap(tableName).remove(key);
return true;
}
@@ -130,7 +142,7 @@
@Override
public boolean replace(String tableName, K key, V oldValue, V newValue) {
Versioned<V> existing = getTableMap(tableName).get(key);
- if (existing != null && existing.value().equals(oldValue)) {
+ if (existing != null && checkEquality(existing.value(), oldValue)) {
put(tableName, key, newValue);
return true;
}
@@ -198,11 +210,11 @@
case PUT_IF_VERSION_MATCH:
return existingEntry != null && existingEntry.version() == update.currentVersion();
case PUT_IF_VALUE_MATCH:
- return existingEntry != null && existingEntry.value().equals(update.currentValue());
+ return existingEntry != null && checkEquality(existingEntry.value(), update.currentValue());
case REMOVE_IF_VERSION_MATCH:
return existingEntry == null || existingEntry.version() == update.currentVersion();
case REMOVE_IF_VALUE_MATCH:
- return existingEntry == null || existingEntry.value().equals(update.currentValue());
+ return existingEntry == null || checkEquality(existingEntry.value(), update.currentValue());
default:
throw new IllegalStateException("Unsupported type: " + update.type());
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 8cf1703..72f8c04 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -9,6 +9,10 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -112,7 +116,7 @@
return CompletableFuture.allOf(partitions
.values()
.stream()
- .map(p -> p.values(tableName))
+ .map(p -> p.values(tableName).thenApply(values::addAll))
.toArray(CompletableFuture[]::new))
.thenApply(v -> values);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
index 81f00de..d6db75d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
@@ -67,13 +67,14 @@
CopycatConfig copycatConfig = new CopycatConfig()
.withName(name)
.withClusterConfig(clusterConfig)
+ .withDefaultSerializer(new DatabaseSerializer())
.withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
partitionedDatabase.registerPartition(partitionName ,
coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
- .withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
+ .withSerializer(copycatConfig.getDefaultSerializer())
.withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
partitionedDatabase.setPartitioner(
new SimpleKeyHashPartitioner(partitionedDatabase.getRegisteredPartitions()));