1. Refactored ConsistentMap and StorageService (renamed from DatabaseService) into the api bundle.
2. Misc bug fixes uncovered during testing
Change-Id: I1219c5264831bcfa93565f764511f89de35a949d
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java
deleted file mode 100644
index 4d7f33b..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMap.java
+++ /dev/null
@@ -1,200 +0,0 @@
-package org.onosproject.store.consistent.impl;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.Map.Entry;
-
-/**
- * A distributed, strongly consistent map.
- * <p>
- * This map offers strong read-after-update (where update == create/update/delete)
- * consistency. All operations to the map are serialized and applied in a consistent
- * manner.
- * <p>
- * The stronger consistency comes at the expense of availability in
- * the event of a network partition. A network partition can be either due to
- * a temporary disruption in network connectivity between participating nodes
- * or due to a node being temporarily down.
- * </p><p>
- * All values stored in this map are versioned and the API supports optimistic
- * concurrency by allowing conditional updates that take into consideration
- * the version or value that was previously read.
- * </p><p>
- * The map also supports atomic batch updates (transactions). One can provide a list
- * of updates to be applied atomically if and only if all the operations are guaranteed
- * to succeed i.e. all their preconditions are met. For example, the precondition
- * for a putIfAbsent API call is absence of a mapping for the key. Similarly, the
- * precondition for a conditional replace operation is the presence of an expected
- * version or value
- * </p><p>
- * This map does not allow null values. All methods can throw a ConsistentMapException
- * (which extends RuntimeException) to indicate failures.
- *
- */
-public interface ConsistentMap<K, V> {
-
- /**
- * Returns the number of entries in the map.
- *
- * @return map size.
- */
- int size();
-
- /**
- * Returns true if the map is empty.
- *
- * @return true if map has no entries, false otherwise.
- */
- boolean isEmpty();
-
- /**
- * Returns true if this map contains a mapping for the specified key.
- *
- * @param key key
- * @return true if map contains key, false otherwise.
- */
- boolean containsKey(K key);
-
- /**
- * Returns true if this map contains the specified value.
- *
- * @param value value
- * @return true if map contains value, false otherwise.
- */
- boolean containsValue(V value);
-
- /**
- * Returns the value (and version) to which the specified key is mapped, or null if this
- * map contains no mapping for the key.
- *
- * @param key the key whose associated value (and version) is to be returned
- * @return the value (and version) to which the specified key is mapped, or null if
- * this map contains no mapping for the key
- */
- Versioned<V> get(K key);
-
- /**
- * Associates the specified value with the specified key in this map (optional operation).
- * If the map previously contained a mapping for the key, the old value is replaced by the
- * specified value.
- *
- * @param key key with which the specified value is to be associated
- * @param value value to be associated with the specified key
- * @return the previous value (and version) associated with key, or null if there was
- * no mapping for key.
- */
- Versioned<V> put(K key, V value);
-
- /**
- * Removes the mapping for a key from this map if it is present (optional operation).
- *
- * @param key key whose value is to be removed from the map
- * @return the value (and version) to which this map previously associated the key,
- * or null if the map contained no mapping for the key.
- */
- Versioned<V> remove(K key);
-
- /**
- * Removes all of the mappings from this map (optional operation).
- * The map will be empty after this call returns.
- */
- void clear();
-
- /**
- * Returns a Set view of the keys contained in this map.
- * This method differs from the behavior of java.util.Map.keySet() in that
- * what is returned is a unmodifiable snapshot view of the keys in the ConsistentMap.
- * Attempts to modify the returned set, whether direct or via its iterator,
- * result in an UnsupportedOperationException.
- *
- * @return a set of the keys contained in this map
- */
- Set<K> keySet();
-
- /**
- * Returns the collection of values (and associated versions) contained in this map.
- * This method differs from the behavior of java.util.Map.values() in that
- * what is returned is a unmodifiable snapshot view of the values in the ConsistentMap.
- * Attempts to modify the returned collection, whether direct or via its iterator,
- * result in an UnsupportedOperationException.
- *
- * @return a collection of the values (and associated versions) contained in this map
- */
- Collection<Versioned<V>> values();
-
- /**
- * Returns the set of entries contained in this map.
- * This method differs from the behavior of java.util.Map.entrySet() in that
- * what is returned is a unmodifiable snapshot view of the entries in the ConsistentMap.
- * Attempts to modify the returned set, whether direct or via its iterator,
- * result in an UnsupportedOperationException.
- *
- * @return set of entries contained in this map.
- */
- Set<Entry<K, Versioned<V>>> entrySet();
-
- /**
- * If the specified key is not already associated with a value
- * associates it with the given value and returns null, else returns the current value.
- *
- * @param key key with which the specified value is to be associated
- * @param value value to be associated with the specified key
- * @return the previous value associated with the specified key or null
- * if key does not already mapped to a value.
- */
- Versioned<V> putIfAbsent(K key, V value);
-
- /**
- * Removes the entry for the specified key only if it is currently
- * mapped to the specified value.
- *
- * @param key key with which the specified value is associated
- * @param value value expected to be associated with the specified key
- * @return true if the value was removed
- */
- boolean remove(K key, V value);
-
- /**
- * Removes the entry for the specified key only if its current
- * version in the map is equal to the specified version.
- *
- * @param key key with which the specified version is associated
- * @param version version expected to be associated with the specified key
- * @return true if the value was removed
- */
- boolean remove(K key, long version);
-
- /**
- * Replaces the entry for the specified key only if currently mapped
- * to the specified value.
- *
- * @param key key with which the specified value is associated
- * @param oldValue value expected to be associated with the specified key
- * @param newValue value to be associated with the specified key
- * @return true if the value was replaced
- */
- boolean replace(K key, V oldValue, V newValue);
-
- /**
- * Replaces the entry for the specified key only if it is currently mapped to the
- * specified version.
- *
- * @param key key key with which the specified value is associated
- * @param oldVersion version expected to be associated with the specified key
- * @param newValue value to be associated with the specified key
- * @return true if the value was replaced
- */
- boolean replace(K key, long oldVersion, V newValue);
-
- /**
- * Atomically apply the specified list of updates to the map.
- * If any of the updates cannot be applied due to a precondition
- * violation, none of the updates will be applied and the state of
- * the map remains unaltered.
- *
- * @param updates list of updates to apply atomically.
- * @return true if the map was updated.
- */
- boolean batchUpdate(List<UpdateOperation<K, V>> updates);
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java
deleted file mode 100644
index d98e5a3..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.onosproject.store.consistent.impl;
-
-/**
- * Top level exception for ConsistentMap failures.
- */
-@SuppressWarnings("serial")
-public class ConsistentMapException extends RuntimeException {
- public ConsistentMapException() {
- }
-
- public ConsistentMapException(Throwable t) {
- super(t);
- }
-
- /**
- * ConsistentMap operation timeout.
- */
- public static class Timeout extends ConsistentMapException {
- }
-
- /**
- * ConsistentMap operation interrupted.
- */
- public static class Interrupted extends ConsistentMapException {
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
index 1c7ef7f..569cf95 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/ConsistentMapImpl.java
@@ -2,7 +2,6 @@
import static com.google.common.base.Preconditions.*;
-import java.util.AbstractMap;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -15,8 +14,13 @@
import java.util.stream.Collectors;
import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.HexString;
-import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
@@ -33,7 +37,7 @@
private final String name;
private final DatabaseProxy<String, byte[]> proxy;
- private final StoreSerializer serializer;
+ private final Serializer serializer;
private static final int OPERATION_TIMEOUT_MILLIS = 1000;
private static final String ERROR_NULL_KEY = "Key cannot be null";
@@ -55,7 +59,7 @@
ConsistentMapImpl(String name,
DatabaseProxy<String, byte[]> proxy,
- StoreSerializer serializer) {
+ Serializer serializer) {
this.name = checkNotNull(name, "map name cannot be null");
this.proxy = checkNotNull(proxy, "database proxy cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
@@ -87,14 +91,15 @@
public Versioned<V> get(K key) {
checkNotNull(key, ERROR_NULL_KEY);
Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
- return new Versioned<>(serializer.decode(value.value()), value.version());
+ return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
}
@Override
public Versioned<V> put(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- Versioned<byte[]> previousValue = complete(proxy.get(name, keyCache.getUnchecked(key)));
+ Versioned<byte[]> previousValue =
+ complete(proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value)));
return (previousValue != null) ?
new Versioned<>(serializer.decode(previousValue.value()), previousValue.version()) : null;
@@ -103,7 +108,7 @@
@Override
public Versioned<V> remove(K key) {
checkNotNull(key, ERROR_NULL_KEY);
- Versioned<byte[]> value = complete(proxy.get(name, keyCache.getUnchecked(key)));
+ Versioned<byte[]> value = complete(proxy.remove(name, keyCache.getUnchecked(key)));
return (value != null) ? new Versioned<>(serializer.decode(value.value()), value.version()) : null;
}
@@ -198,7 +203,7 @@
}
private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
- return new AbstractMap.SimpleEntry<>(
+ return Pair.of(
dK(e.getKey()),
new Versioned<>(
serializer.decode(e.getValue().value()),
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 0fcb895..311cfe9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -22,7 +22,9 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.store.serializers.StoreSerializer;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import com.google.common.collect.Sets;
@@ -32,7 +34,7 @@
*/
@Component(immediate = true, enabled = true)
@Service
-public class DatabaseManager implements DatabaseService {
+public class DatabaseManager implements StorageService {
private final Logger log = getLogger(getClass());
private PartitionedDatabase partitionedDatabase;
@@ -44,7 +46,7 @@
protected ClusterService clusterService;
protected String nodeToUri(ControllerNode node) {
- return "tcp://" + node.ip() + ":" + COPYCAT_TCP_PORT;
+ return String.format("tcp://%s:%d", node.ip(), COPYCAT_TCP_PORT);
}
@Activate
@@ -76,7 +78,17 @@
String localNodeUri = nodeToUri(clusterService.getLocalNode());
ClusterConfig clusterConfig = new ClusterConfig()
- .withProtocol(new NettyTcpProtocol())
+ .withProtocol(new NettyTcpProtocol()
+ .withSsl(false)
+ .withConnectTimeout(60000)
+ .withAcceptBacklog(1024)
+ .withTrafficClass(-1)
+ .withSoLinger(-1)
+ .withReceiveBufferSize(32768)
+ .withSendBufferSize(8192)
+ .withThreads(1))
+ .withElectionTimeout(300)
+ .withHeartbeatInterval(150)
.withMembers(activeNodeUris)
.withLocalMember(localNodeUri);
@@ -85,8 +97,15 @@
partitionMap.forEach((name, nodes) -> {
Set<String> replicas = nodes.stream().map(this::nodeToUri).collect(Collectors.toSet());
DatabaseConfig partitionConfig = new DatabaseConfig()
+ .withElectionTimeout(300)
+ .withHeartbeatInterval(150)
.withConsistency(Consistency.STRONG)
- .withLog(new FileLog(logDir))
+ .withLog(new FileLog()
+ .withDirectory(logDir)
+ .withSegmentSize(1073741824) // 1GB
+ .withFlushOnWrite(true)
+ .withSegmentInterval(Long.MAX_VALUE))
+ .withDefaultSerializer(new DatabaseSerializer())
.withReplicas(replicas);
databaseConfig.addPartition(name, partitionConfig);
});
@@ -116,7 +135,7 @@
}
@Override
- public <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer) {
+ public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
return new ConsistentMapImpl<K, V>(name, partitionedDatabase, serializer);
}
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index f83e8f8..23aa0be 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -6,6 +6,9 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
/**
* Database proxy.
*/
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
new file mode 100644
index 0000000..0563361
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -0,0 +1,74 @@
+package org.onosproject.store.consistent.impl;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.serializers.KryoSerializer;
+import org.onosproject.store.service.Versioned;
+
+import net.kuujo.copycat.cluster.internal.MemberInfo;
+import net.kuujo.copycat.protocol.rpc.AppendRequest;
+import net.kuujo.copycat.protocol.rpc.AppendResponse;
+import net.kuujo.copycat.protocol.rpc.CommitRequest;
+import net.kuujo.copycat.protocol.rpc.CommitResponse;
+import net.kuujo.copycat.protocol.rpc.PollRequest;
+import net.kuujo.copycat.protocol.rpc.PollResponse;
+import net.kuujo.copycat.protocol.rpc.QueryRequest;
+import net.kuujo.copycat.protocol.rpc.QueryResponse;
+import net.kuujo.copycat.protocol.rpc.ReplicaInfo;
+import net.kuujo.copycat.protocol.rpc.SyncRequest;
+import net.kuujo.copycat.protocol.rpc.SyncResponse;
+import net.kuujo.copycat.util.serializer.SerializerConfig;
+
+/**
+ * Serializer for DatabaseManager's interaction with Copycat.
+ */
+public class DatabaseSerializer extends SerializerConfig {
+
+ private static final KryoNamespace COPYCAT = KryoNamespace.newBuilder()
+ .nextId(KryoNamespace.FLOATING_ID)
+ .register(AppendRequest.class)
+ .register(AppendResponse.class)
+ .register(SyncRequest.class)
+ .register(SyncResponse.class)
+ .register(PollRequest.class)
+ .register(PollResponse.class)
+ .register(QueryRequest.class)
+ .register(QueryResponse.class)
+ .register(CommitRequest.class)
+ .register(CommitResponse.class)
+ .register(ReplicaInfo.class)
+ .register(MemberInfo.class)
+ .build();
+
+ private static final KryoNamespace ONOS_STORE = KryoNamespace.newBuilder()
+ .nextId(KryoNamespace.FLOATING_ID)
+ .register(Versioned.class)
+ .register(Pair.class)
+ .register(ImmutablePair.class)
+ .build();
+
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.BASIC)
+ .register(COPYCAT)
+ .register(ONOS_STORE)
+ .build();
+ }
+ };
+
+ @Override
+ public ByteBuffer writeObject(Object object) {
+ return ByteBuffer.wrap(SERIALIZER.encode(object));
+ }
+
+ @Override
+ public <T> T readObject(ByteBuffer buffer) {
+ return SERIALIZER.decode(buffer);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java
deleted file mode 100644
index 2ae418b..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseService.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.onosproject.store.consistent.impl;
-
-import org.onosproject.store.serializers.StoreSerializer;
-
-/**
- * Database service.
- */
-public interface DatabaseService {
-
- /**
- * Creates a ConsistentMap.
- *
- * @param <K> Key type
- * @param <V> value type
- * @param name map name
- * @param serializer serializer to use for serializing keys and values.
- * @return consistent map.
- */
- <K, V> ConsistentMap<K , V> createConsistentMap(String name, StoreSerializer serializer);
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 89a51e8..1baf146 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -5,6 +5,9 @@
import java.util.Map.Entry;
import java.util.Set;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
import net.kuujo.copycat.state.Command;
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.Query;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 837f3b4..0ddaab6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -13,6 +13,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
/**
* Default database.
*/
@@ -132,7 +135,7 @@
return runStartupTasks()
.thenCompose(v -> stateMachine.open())
.thenRun(() -> {
- this.proxy = stateMachine.createProxy(DatabaseProxy.class);
+ this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
})
.thenApply(v -> null);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 2b20f53..452f80e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -6,8 +6,16 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.stream.Collectors;
import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
import net.kuujo.copycat.state.Initializer;
import net.kuujo.copycat.state.StateContext;
@@ -88,17 +96,21 @@
@Override
public Set<K> keySet(String tableName) {
- return getTableMap(tableName).keySet();
+ return ImmutableSet.copyOf(getTableMap(tableName).keySet());
}
@Override
public Collection<Versioned<V>> values(String tableName) {
- return getTableMap(tableName).values();
+ return ImmutableList.copyOf(getTableMap(tableName).values());
}
@Override
public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
- return getTableMap(tableName).entrySet();
+ return ImmutableSet.copyOf(getTableMap(tableName)
+ .entrySet()
+ .stream()
+ .map(entry -> Pair.of(entry.getKey(), entry.getValue()))
+ .collect(Collectors.toSet()));
}
@Override
@@ -110,7 +122,7 @@
@Override
public boolean remove(String tableName, K key, V value) {
Versioned<V> existing = getTableMap(tableName).get(key);
- if (existing != null && existing.value().equals(value)) {
+ if (existing != null && checkEquality(existing.value(), value)) {
getTableMap(tableName).remove(key);
return true;
}
@@ -130,7 +142,7 @@
@Override
public boolean replace(String tableName, K key, V oldValue, V newValue) {
Versioned<V> existing = getTableMap(tableName).get(key);
- if (existing != null && existing.value().equals(oldValue)) {
+ if (existing != null && checkEquality(existing.value(), oldValue)) {
put(tableName, key, newValue);
return true;
}
@@ -198,11 +210,11 @@
case PUT_IF_VERSION_MATCH:
return existingEntry != null && existingEntry.version() == update.currentVersion();
case PUT_IF_VALUE_MATCH:
- return existingEntry != null && existingEntry.value().equals(update.currentValue());
+ return existingEntry != null && checkEquality(existingEntry.value(), update.currentValue());
case REMOVE_IF_VERSION_MATCH:
return existingEntry == null || existingEntry.version() == update.currentVersion();
case REMOVE_IF_VALUE_MATCH:
- return existingEntry == null || existingEntry.value().equals(update.currentValue());
+ return existingEntry == null || checkEquality(existingEntry.value(), update.currentValue());
default:
throw new IllegalStateException("Unsupported type: " + update.type());
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 8cf1703..72f8c04 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -9,6 +9,10 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+
+import org.onosproject.store.service.UpdateOperation;
+import org.onosproject.store.service.Versioned;
+
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -112,7 +116,7 @@
return CompletableFuture.allOf(partitions
.values()
.stream()
- .map(p -> p.values(tableName))
+ .map(p -> p.values(tableName).thenApply(values::addAll))
.toArray(CompletableFuture[]::new))
.thenApply(v -> values);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
index 81f00de..d6db75d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
@@ -67,13 +67,14 @@
CopycatConfig copycatConfig = new CopycatConfig()
.withName(name)
.withClusterConfig(clusterConfig)
+ .withDefaultSerializer(new DatabaseSerializer())
.withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
partitionedDatabase.registerPartition(partitionName ,
coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
- .withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
+ .withSerializer(copycatConfig.getDefaultSerializer())
.withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
partitionedDatabase.setPartitioner(
new SimpleKeyHashPartitioner(partitionedDatabase.getRegisteredPartitions()));
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
deleted file mode 100644
index 11df520..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
+++ /dev/null
@@ -1,182 +0,0 @@
-package org.onosproject.store.consistent.impl;
-
-import static com.google.common.base.Preconditions.*;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Database update operation.
- *
- * @param <K> key type.
- * @param <V> value type.
- */
-public class UpdateOperation<K, V> {
-
- /**
- * Type of database update operation.
- */
- public static enum Type {
- PUT,
- PUT_IF_ABSENT,
- PUT_IF_VERSION_MATCH,
- PUT_IF_VALUE_MATCH,
- REMOVE,
- REMOVE_IF_VERSION_MATCH,
- REMOVE_IF_VALUE_MATCH,
- }
-
- private Type type;
- private String tableName;
- private K key;
- private V value;
- private V currentValue;
- private long currentVersion = -1;
-
- /**
- * Returns the type of update operation.
- * @return type of update.
- */
- public Type type() {
- return type;
- }
-
- /**
- * Returns the tableName being updated.
- * @return table name.
- */
- public String tableName() {
- return tableName;
- }
-
- /**
- * Returns the item key being updated.
- * @return item key
- */
- public K key() {
- return key;
- }
-
- /**
- * Returns the new value.
- * @return item's target value.
- */
- public V value() {
- return value;
- }
-
- /**
- * Returns the expected current value in the database value for the key.
- * @return current value in database.
- */
- public V currentValue() {
- return currentValue;
- }
-
- /**
- * Returns the expected current version in the database for the key.
- * @return expected version.
- */
- public long currentVersion() {
- return currentVersion;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("type", type)
- .add("tableName", tableName)
- .add("key", key)
- .add("value", value)
- .add("currentValue", currentValue)
- .add("currentVersion", currentVersion)
- .toString();
- }
-
- /**
- * Creates a new builder instance.
- * @param <K> key type.
- * @param <V> value type.
- *
- * @return builder.
- */
- public static <K, V> Builder<K, V> newBuilder() {
- return new Builder<>();
- }
-
- /**
- * UpdatOperation builder.
- *
- * @param <K> key type.
- * @param <V> value type.
- */
- public static final class Builder<K, V> {
-
- private UpdateOperation<K, V> operation = new UpdateOperation<>();
-
- public UpdateOperation<K, V> build() {
- validateInputs();
- return operation;
- }
-
- public Builder<K, V> withType(Type type) {
- operation.type = checkNotNull(type, "type cannot be null");
- return this;
- }
-
- public Builder<K, V> withTableName(String tableName) {
- operation.tableName = checkNotNull(tableName, "tableName cannot be null");
- return this;
- }
-
- public Builder<K, V> withKey(K key) {
- operation.key = checkNotNull(key, "key cannot be null");
- return this;
- }
-
- public Builder<K, V> withCurrentValue(V value) {
- operation.currentValue = checkNotNull(value, "currentValue cannot be null");
- return this;
- }
-
- public Builder<K, V> withValue(V value) {
- operation.value = checkNotNull(value, "value cannot be null");
- return this;
- }
-
- public Builder<K, V> withCurrentVersion(long version) {
- checkArgument(version >= 0, "version cannot be negative");
- operation.currentVersion = version;
- return this;
- }
-
- private void validateInputs() {
- checkNotNull(operation.type, "type must be specified");
- checkNotNull(operation.tableName, "table name must be specified");
- checkNotNull(operation.key, "key must be specified");
- switch (operation.type) {
- case PUT:
- case PUT_IF_ABSENT:
- checkNotNull(operation.value, "value must be specified.");
- break;
- case PUT_IF_VERSION_MATCH:
- checkNotNull(operation.value, "value must be specified.");
- checkState(operation.currentVersion >= 0, "current version must be specified");
- break;
- case PUT_IF_VALUE_MATCH:
- checkNotNull(operation.value, "value must be specified.");
- checkNotNull(operation.currentValue, "currentValue must be specified.");
- break;
- case REMOVE:
- break;
- case REMOVE_IF_VERSION_MATCH:
- checkState(operation.currentVersion >= 0, "current version must be specified");
- break;
- case REMOVE_IF_VALUE_MATCH:
- checkNotNull(operation.currentValue, "currentValue must be specified.");
- break;
- default:
- throw new IllegalStateException("Unknown operation type");
- }
- }
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
deleted file mode 100644
index 6eb908d..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package org.onosproject.store.consistent.impl;
-
-import com.google.common.base.MoreObjects;
-
-/**
- * Versioned value.
- *
- * @param <V> value type.
- */
-public class Versioned<V> {
-
- private final V value;
- private final long version;
-
- /**
- * Constructs a new versioned value.
- * @param value value
- * @param version version
- */
- public Versioned(V value, long version) {
- this.value = value;
- this.version = version;
- }
-
- /**
- * Returns the value.
- *
- * @return value.
- */
- public V value() {
- return value;
- }
-
- /**
- * Returns the version.
- *
- * @return version
- */
- public long version() {
- return version;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("value", value)
- .add("version", version)
- .toString();
- }
-}