Refactor how PartitionedAsyncConsistentMaps are constructed to remove the need for serialization when mapping keys to partitions.
Change-Id: Ia0d43f2adf66a3b7e9f6bee312e8271ec98ee4af
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index ea8d075..22a59d7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -20,7 +20,9 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
+import org.onlab.util.HexString;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.service.AsyncAtomicCounter;
@@ -61,14 +63,20 @@
public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
checkNotNull(name);
checkNotNull(serializer);
- Map<PartitionId, AsyncConsistentMap<K, V>> maps =
+ Map<PartitionId, AsyncConsistentMap<String, byte[]>> maps =
Maps.transformValues(members,
- partition -> partition.newAsyncConsistentMap(name, serializer));
- Hasher<K> hasher = key -> {
- int hashCode = Hashing.sha256().hashBytes(serializer.encode(key)).asInt();
+ partition -> partition.newAsyncConsistentMap(name, null));
+ HashFunction hashFunction = Hashing.goodFastHash(32);
+ Hasher<String> hasher = key -> {
+ int hashCode = hashFunction.hashUnencodedChars(key).asInt();
return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
};
- return new PartitionedAsyncConsistentMap<>(name, maps, hasher);
+ AsyncConsistentMap<String, byte[]> partitionedMap = new PartitionedAsyncConsistentMap<>(name, maps, hasher);
+ return DistributedPrimitives.newTranscodingMap(partitionedMap,
+ key -> HexString.toHexString(serializer.encode(key)),
+ string -> serializer.decode(HexString.fromHexString(string)),
+ value -> value == null ? null : serializer.encode(value),
+ bytes -> serializer.decode(bytes));
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index b55efc4..6679ea2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -100,8 +100,9 @@
}
@Override
+ @SuppressWarnings("unchecked")
public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
- AtomixConsistentMap atomixConsistentMap =
+ AtomixConsistentMap rawMap =
new AtomixConsistentMap(client.newProxyBuilder()
.withName(name)
.withServiceType(DistributedPrimitive.Type.CONSISTENT_MAP.name())
@@ -113,27 +114,20 @@
.open()
.join());
- AsyncConsistentMap<String, byte[]> rawMap =
- new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
- @Override
- public String name() {
- return name;
- }
- };
-
- // We have to ensure serialization is done on the Copycat threads since Kryo is not thread safe.
- AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.newTranscodingMap(rawMap,
- key -> HexString.toHexString(serializer.encode(key)),
- string -> serializer.decode(HexString.fromHexString(string)),
- value -> value == null ? null : serializer.encode(value),
- bytes -> serializer.decode(bytes));
-
- return transcodedMap;
+ if (serializer != null) {
+ return DistributedPrimitives.newTranscodingMap(rawMap,
+ key -> HexString.toHexString(serializer.encode(key)),
+ string -> serializer.decode(HexString.fromHexString(string)),
+ value -> value == null ? null : serializer.encode(value),
+ bytes -> serializer.decode(bytes));
+ }
+ return (AsyncConsistentMap<K, V>) rawMap;
}
@Override
+ @SuppressWarnings("unchecked")
public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
- AtomixConsistentTreeMap atomixConsistentTreeMap =
+ AtomixConsistentTreeMap rawMap =
new AtomixConsistentTreeMap(client.newProxyBuilder()
.withName(name)
.withServiceType(DistributedPrimitive.Type.CONSISTENT_TREEMAP.name())
@@ -145,26 +139,19 @@
.open()
.join());
- AsyncConsistentTreeMap<byte[]> rawMap =
- new DelegatingAsyncConsistentTreeMap<byte[]>(atomixConsistentTreeMap) {
- @Override
- public String name() {
- return name;
- }
- };
-
- AsyncConsistentTreeMap<V> transcodedMap =
- DistributedPrimitives.<V, byte[]>newTranscodingTreeMap(
- rawMap,
- value -> value == null ? null : serializer.encode(value),
- bytes -> serializer.decode(bytes));
-
- return transcodedMap;
+ if (serializer != null) {
+ return DistributedPrimitives.newTranscodingTreeMap(
+ rawMap,
+ value -> value == null ? null : serializer.encode(value),
+ bytes -> serializer.decode(bytes));
+ }
+ return (AsyncConsistentTreeMap<V>) rawMap;
}
@Override
+ @SuppressWarnings("unchecked")
public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(String name, Serializer serializer) {
- AtomixConsistentSetMultimap atomixConsistentSetMultimap =
+ AtomixConsistentSetMultimap rawMap =
new AtomixConsistentSetMultimap(client.newProxyBuilder()
.withName(name)
.withServiceType(DistributedPrimitive.Type.CONSISTENT_MULTIMAP.name())
@@ -176,24 +163,15 @@
.open()
.join());
- AsyncConsistentMultimap<String, byte[]> rawMap =
- new DelegatingAsyncConsistentMultimap<String, byte[]>(
- atomixConsistentSetMultimap) {
- @Override
- public String name() {
- return super.name();
- }
- };
-
- AsyncConsistentMultimap<K, V> transcodedMap =
- DistributedPrimitives.newTranscodingMultimap(
- rawMap,
- key -> HexString.toHexString(serializer.encode(key)),
- string -> serializer.decode(HexString.fromHexString(string)),
- value -> serializer.encode(value),
- bytes -> serializer.decode(bytes));
-
- return transcodedMap;
+ if (serializer != null) {
+ return DistributedPrimitives.newTranscodingMultimap(
+ rawMap,
+ key -> HexString.toHexString(serializer.encode(key)),
+ string -> serializer.decode(HexString.fromHexString(string)),
+ value -> serializer.encode(value),
+ bytes -> serializer.decode(bytes));
+ }
+ return (AsyncConsistentMultimap<K, V>) rawMap;
}
@Override
@@ -202,8 +180,9 @@
}
@Override
+ @SuppressWarnings("unchecked")
public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
- AtomixAtomicCounterMap atomixAtomicCounterMap = new AtomixAtomicCounterMap(client.newProxyBuilder()
+ AtomixAtomicCounterMap rawMap = new AtomixAtomicCounterMap(client.newProxyBuilder()
.withName(name)
.withServiceType(DistributedPrimitive.Type.COUNTER_MAP.name())
.withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
@@ -214,13 +193,13 @@
.open()
.join());
- AsyncAtomicCounterMap<K> transcodedMap =
- DistributedPrimitives.newTranscodingAtomicCounterMap(
- atomixAtomicCounterMap,
- key -> HexString.toHexString(serializer.encode(key)),
- string -> serializer.decode(HexString.fromHexString(string)));
-
- return transcodedMap;
+ if (serializer != null) {
+ return DistributedPrimitives.newTranscodingAtomicCounterMap(
+ rawMap,
+ key -> HexString.toHexString(serializer.encode(key)),
+ string -> serializer.decode(HexString.fromHexString(string)));
+ }
+ return (AsyncAtomicCounterMap<K>) rawMap;
}
@Override