Refactor how PartitionedAsyncConsistentMaps are constructed to remove the need for serialization when mapping keys to partitions.

Change-Id: Ia0d43f2adf66a3b7e9f6bee312e8271ec98ee4af
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index ea8d075..22a59d7 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -20,7 +20,9 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
+import org.onlab.util.HexString;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
 import org.onosproject.store.service.AsyncAtomicCounter;
@@ -61,14 +63,20 @@
     public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
         checkNotNull(name);
         checkNotNull(serializer);
-        Map<PartitionId, AsyncConsistentMap<K, V>> maps =
+        Map<PartitionId, AsyncConsistentMap<String, byte[]>> maps =
                 Maps.transformValues(members,
-                                     partition -> partition.newAsyncConsistentMap(name, serializer));
-        Hasher<K> hasher = key -> {
-            int hashCode = Hashing.sha256().hashBytes(serializer.encode(key)).asInt();
+                                     partition -> partition.newAsyncConsistentMap(name, null));
+        HashFunction hashFunction = Hashing.goodFastHash(32);
+        Hasher<String> hasher = key -> {
+            int hashCode = hashFunction.hashUnencodedChars(key).asInt();
             return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
         };
-        return new PartitionedAsyncConsistentMap<>(name, maps, hasher);
+        AsyncConsistentMap<String, byte[]> partitionedMap = new PartitionedAsyncConsistentMap<>(name, maps, hasher);
+        return DistributedPrimitives.newTranscodingMap(partitionedMap,
+                key -> HexString.toHexString(serializer.encode(key)),
+                string -> serializer.decode(HexString.fromHexString(string)),
+                value -> value == null ? null : serializer.encode(value),
+                bytes -> serializer.decode(bytes));
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index b55efc4..6679ea2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -100,8 +100,9 @@
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
-        AtomixConsistentMap atomixConsistentMap =
+        AtomixConsistentMap rawMap =
                 new AtomixConsistentMap(client.newProxyBuilder()
                         .withName(name)
                         .withServiceType(DistributedPrimitive.Type.CONSISTENT_MAP.name())
@@ -113,27 +114,20 @@
                         .open()
                         .join());
 
-        AsyncConsistentMap<String, byte[]> rawMap =
-                new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
-                    @Override
-                    public String name() {
-                        return name;
-                    }
-                };
-
-        // We have to ensure serialization is done on the Copycat threads since Kryo is not thread safe.
-        AsyncConsistentMap<K, V> transcodedMap = DistributedPrimitives.newTranscodingMap(rawMap,
-                key -> HexString.toHexString(serializer.encode(key)),
-                string -> serializer.decode(HexString.fromHexString(string)),
-                value -> value == null ? null : serializer.encode(value),
-                bytes -> serializer.decode(bytes));
-
-        return transcodedMap;
+        if (serializer != null) {
+            return DistributedPrimitives.newTranscodingMap(rawMap,
+                    key -> HexString.toHexString(serializer.encode(key)),
+                    string -> serializer.decode(HexString.fromHexString(string)),
+                    value -> value == null ? null : serializer.encode(value),
+                    bytes -> serializer.decode(bytes));
+        }
+        return (AsyncConsistentMap<K, V>) rawMap;
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <V> AsyncConsistentTreeMap<V> newAsyncConsistentTreeMap(String name, Serializer serializer) {
-        AtomixConsistentTreeMap atomixConsistentTreeMap =
+        AtomixConsistentTreeMap rawMap =
                 new AtomixConsistentTreeMap(client.newProxyBuilder()
                         .withName(name)
                         .withServiceType(DistributedPrimitive.Type.CONSISTENT_TREEMAP.name())
@@ -145,26 +139,19 @@
                         .open()
                         .join());
 
-        AsyncConsistentTreeMap<byte[]> rawMap =
-                new DelegatingAsyncConsistentTreeMap<byte[]>(atomixConsistentTreeMap) {
-                    @Override
-                    public String name() {
-                        return name;
-                    }
-                };
-
-        AsyncConsistentTreeMap<V> transcodedMap =
-                DistributedPrimitives.<V, byte[]>newTranscodingTreeMap(
-                        rawMap,
-                        value -> value == null ? null : serializer.encode(value),
-                        bytes -> serializer.decode(bytes));
-
-        return transcodedMap;
+        if (serializer != null) {
+            return DistributedPrimitives.newTranscodingTreeMap(
+                            rawMap,
+                            value -> value == null ? null : serializer.encode(value),
+                            bytes -> serializer.decode(bytes));
+        }
+        return (AsyncConsistentTreeMap<V>) rawMap;
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(String name, Serializer serializer) {
-        AtomixConsistentSetMultimap atomixConsistentSetMultimap =
+        AtomixConsistentSetMultimap rawMap =
                 new AtomixConsistentSetMultimap(client.newProxyBuilder()
                         .withName(name)
                         .withServiceType(DistributedPrimitive.Type.CONSISTENT_MULTIMAP.name())
@@ -176,24 +163,15 @@
                         .open()
                         .join());
 
-        AsyncConsistentMultimap<String, byte[]> rawMap =
-                new DelegatingAsyncConsistentMultimap<String, byte[]>(
-                        atomixConsistentSetMultimap) {
-                    @Override
-                    public String name() {
-                        return super.name();
-                    }
-                };
-
-        AsyncConsistentMultimap<K, V> transcodedMap =
-                DistributedPrimitives.newTranscodingMultimap(
-                        rawMap,
-                        key -> HexString.toHexString(serializer.encode(key)),
-                        string -> serializer.decode(HexString.fromHexString(string)),
-                        value -> serializer.encode(value),
-                        bytes -> serializer.decode(bytes));
-
-        return transcodedMap;
+        if (serializer != null) {
+            return DistributedPrimitives.newTranscodingMultimap(
+                            rawMap,
+                            key -> HexString.toHexString(serializer.encode(key)),
+                            string -> serializer.decode(HexString.fromHexString(string)),
+                            value -> serializer.encode(value),
+                            bytes -> serializer.decode(bytes));
+        }
+        return (AsyncConsistentMultimap<K, V>) rawMap;
     }
 
     @Override
@@ -202,8 +180,9 @@
     }
 
     @Override
+    @SuppressWarnings("unchecked")
     public <K> AsyncAtomicCounterMap<K> newAsyncAtomicCounterMap(String name, Serializer serializer) {
-        AtomixAtomicCounterMap atomixAtomicCounterMap = new AtomixAtomicCounterMap(client.newProxyBuilder()
+        AtomixAtomicCounterMap rawMap = new AtomixAtomicCounterMap(client.newProxyBuilder()
                 .withName(name)
                 .withServiceType(DistributedPrimitive.Type.COUNTER_MAP.name())
                 .withReadConsistency(ReadConsistency.LINEARIZABLE_LEASE)
@@ -214,13 +193,13 @@
                 .open()
                 .join());
 
-        AsyncAtomicCounterMap<K> transcodedMap =
-                DistributedPrimitives.newTranscodingAtomicCounterMap(
-                        atomixAtomicCounterMap,
-                        key -> HexString.toHexString(serializer.encode(key)),
-                        string -> serializer.decode(HexString.fromHexString(string)));
-
-        return transcodedMap;
+        if (serializer != null) {
+            return DistributedPrimitives.newTranscodingAtomicCounterMap(
+                            rawMap,
+                            key -> HexString.toHexString(serializer.encode(key)),
+                            string -> serializer.decode(HexString.fromHexString(string)));
+        }
+        return (AsyncAtomicCounterMap<K>) rawMap;
     }
 
     @Override