Use consistent hashing for map keys

Change-Id: I9a3e7947c0ffa7b39569b8b6164bd84051c3e543
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 52c91b8..4786fb4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -41,6 +41,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.function.Function;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
@@ -51,28 +52,35 @@
 public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiveCreator {
     private final TreeMap<PartitionId, DistributedPrimitiveCreator> members;
     private final List<PartitionId> sortedMemberPartitionIds;
+    private final int buckets;
 
-    public FederatedDistributedPrimitiveCreator(Map<PartitionId, DistributedPrimitiveCreator> members) {
+    public FederatedDistributedPrimitiveCreator(Map<PartitionId, DistributedPrimitiveCreator> members, int buckets) {
         this.members = Maps.newTreeMap();
         this.members.putAll(checkNotNull(members));
         this.sortedMemberPartitionIds = Lists.newArrayList(members.keySet());
+        this.buckets = buckets;
     }
 
     @Override
     public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
         checkNotNull(name);
         checkNotNull(serializer);
-        Map<PartitionId, AsyncConsistentMap<String, byte[]>> maps =
+        Map<PartitionId, AsyncConsistentMap<byte[], byte[]>> maps =
                 Maps.transformValues(members,
-                                     partition -> partition.newAsyncConsistentMap(name, null));
-        Hasher<String> hasher = key -> {
-            int hashCode = Hashing.sha256().hashString(key, Charsets.UTF_8).asInt();
-            return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
+                                     partition -> DistributedPrimitives.newTranscodingMap(
+                                             partition.<String, byte[]>newAsyncConsistentMap(name, null),
+                                             HexString::toHexString,
+                                             HexString::fromHexString,
+                                             Function.identity(),
+                                             Function.identity()));
+        Hasher<byte[]> hasher = key -> {
+            int bucket = Math.abs(Hashing.murmur3_32().hashBytes(key).asInt()) % buckets;
+            return sortedMemberPartitionIds.get(Hashing.consistentHash(bucket, sortedMemberPartitionIds.size()));
         };
-        AsyncConsistentMap<String, byte[]> partitionedMap = new PartitionedAsyncConsistentMap<>(name, maps, hasher);
+        AsyncConsistentMap<byte[], byte[]> partitionedMap = new PartitionedAsyncConsistentMap<>(name, maps, hasher);
         return DistributedPrimitives.newTranscodingMap(partitionedMap,
-                key -> HexString.toHexString(serializer.encode(key)),
-                string -> serializer.decode(HexString.fromHexString(string)),
+                key -> serializer.encode(key),
+                bytes -> serializer.decode(bytes),
                 value -> value == null ? null : serializer.encode(value),
                 bytes -> serializer.decode(bytes));
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index e92e965..822641c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -77,6 +77,8 @@
 @Component(immediate = true)
 public class StorageManager implements StorageService, StorageAdminService {
 
+    private static final int BUCKETS = 128;
+
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -105,8 +107,8 @@
         partitionService.getAllPartitionIds().stream()
             .filter(id -> !id.equals(PartitionId.from(0)))
             .forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
-        federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap);
-        transactionManager = new TransactionManager(this, partitionService);
+        federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap, BUCKETS);
+        transactionManager = new TransactionManager(this, partitionService, BUCKETS);
         log.info("Started");
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
index 0e99212..198c1ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
@@ -23,14 +23,12 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-import com.google.common.base.Charsets;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.hash.Hashing;
 import com.google.common.util.concurrent.Futures;
-import org.onlab.util.HexString;
 import org.onosproject.cluster.PartitionId;
 import org.onosproject.store.primitives.MapUpdate;
 import org.onosproject.store.primitives.PartitionService;
@@ -51,15 +49,21 @@
     private final List<PartitionId> sortedPartitions;
     private final AsyncConsistentMap<TransactionId, Transaction.State> transactions;
     private final int cacheSize;
+    private final int buckets;
     private final Map<PartitionId, Cache<String, CachedMap>> partitionCache = Maps.newConcurrentMap();
 
-    public TransactionManager(StorageService storageService, PartitionService partitionService) {
-        this(storageService, partitionService, DEFAULT_CACHE_SIZE);
+    public TransactionManager(StorageService storageService, PartitionService partitionService, int buckets) {
+        this(storageService, partitionService, DEFAULT_CACHE_SIZE, buckets);
     }
 
-    public TransactionManager(StorageService storageService, PartitionService partitionService, int cacheSize) {
+    public TransactionManager(
+            StorageService storageService,
+            PartitionService partitionService,
+            int cacheSize,
+            int buckets) {
         this.partitionService = partitionService;
         this.cacheSize = cacheSize;
+        this.buckets = buckets;
         this.transactions = storageService.<TransactionId, Transaction.State>consistentMapBuilder()
                 .withName("onos-transactions")
                 .withSerializer(Serializer.using(KryoNamespaces.API,
@@ -103,9 +107,9 @@
         }
 
         Hasher<K> hasher = key -> {
-            int hashCode = Hashing.sha256()
-                    .hashString(HexString.toHexString(serializer.encode(key)), Charsets.UTF_8).asInt();
-            return sortedPartitions.get(Math.abs(hashCode) % sortedPartitions.size());
+            int bucket = Math.abs(Hashing.murmur3_32().hashBytes(serializer.encode(key)).asInt()) % buckets;
+            int partition = Hashing.consistentHash(bucket, sortedPartitions.size());
+            return sortedPartitions.get(partition);
         };
         return new PartitionedTransactionalMap<>(partitions, hasher);
     }
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java
index 5638300..918eae3 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java
@@ -81,7 +81,7 @@
 
         replay(storageService, partitionService, asyncMap, primitiveCreator, mapBuilder);
 
-        TransactionManager transactionManager = new TransactionManager(storageService, partitionService);
+        TransactionManager transactionManager = new TransactionManager(storageService, partitionService, 128);
         TransactionId transactionId = TransactionId.from(UUID.randomUUID().toString());
         TransactionCoordinator transactionCoordinator = new TransactionCoordinator(transactionId, transactionManager);
         Serializer serializer = Serializer.using(KryoNamespaces.API);