Use consistent hashing for map keys
Change-Id: I9a3e7947c0ffa7b39569b8b6164bd84051c3e543
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 52c91b8..4786fb4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.function.Function;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -51,28 +52,35 @@
public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiveCreator {
private final TreeMap<PartitionId, DistributedPrimitiveCreator> members;
private final List<PartitionId> sortedMemberPartitionIds;
+ private final int buckets;
- public FederatedDistributedPrimitiveCreator(Map<PartitionId, DistributedPrimitiveCreator> members) {
+ public FederatedDistributedPrimitiveCreator(Map<PartitionId, DistributedPrimitiveCreator> members, int buckets) {
this.members = Maps.newTreeMap();
this.members.putAll(checkNotNull(members));
this.sortedMemberPartitionIds = Lists.newArrayList(members.keySet());
+ this.buckets = buckets;
}
@Override
public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
checkNotNull(name);
checkNotNull(serializer);
- Map<PartitionId, AsyncConsistentMap<String, byte[]>> maps =
+ Map<PartitionId, AsyncConsistentMap<byte[], byte[]>> maps =
Maps.transformValues(members,
- partition -> partition.newAsyncConsistentMap(name, null));
- Hasher<String> hasher = key -> {
- int hashCode = Hashing.sha256().hashString(key, Charsets.UTF_8).asInt();
- return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
+ partition -> DistributedPrimitives.newTranscodingMap(
+ partition.<String, byte[]>newAsyncConsistentMap(name, null),
+ HexString::toHexString,
+ HexString::fromHexString,
+ Function.identity(),
+ Function.identity()));
+ Hasher<byte[]> hasher = key -> {
+ int bucket = Math.abs(Hashing.murmur3_32().hashBytes(key).asInt()) % buckets;
+ return sortedMemberPartitionIds.get(Hashing.consistentHash(bucket, sortedMemberPartitionIds.size()));
};
- AsyncConsistentMap<String, byte[]> partitionedMap = new PartitionedAsyncConsistentMap<>(name, maps, hasher);
+ AsyncConsistentMap<byte[], byte[]> partitionedMap = new PartitionedAsyncConsistentMap<>(name, maps, hasher);
return DistributedPrimitives.newTranscodingMap(partitionedMap,
- key -> HexString.toHexString(serializer.encode(key)),
- string -> serializer.decode(HexString.fromHexString(string)),
+ key -> serializer.encode(key),
+ bytes -> serializer.decode(bytes),
value -> value == null ? null : serializer.encode(value),
bytes -> serializer.decode(bytes));
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index e92e965..822641c 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -77,6 +77,8 @@
@Component(immediate = true)
public class StorageManager implements StorageService, StorageAdminService {
+ private static final int BUCKETS = 128;
+
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -105,8 +107,8 @@
partitionService.getAllPartitionIds().stream()
.filter(id -> !id.equals(PartitionId.from(0)))
.forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
- federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap);
- transactionManager = new TransactionManager(this, partitionService);
+ federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap, BUCKETS);
+ transactionManager = new TransactionManager(this, partitionService, BUCKETS);
log.info("Started");
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
index 0e99212..198c1ae 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
@@ -23,14 +23,12 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import com.google.common.base.Charsets;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.Futures;
-import org.onlab.util.HexString;
import org.onosproject.cluster.PartitionId;
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.PartitionService;
@@ -51,15 +49,21 @@
private final List<PartitionId> sortedPartitions;
private final AsyncConsistentMap<TransactionId, Transaction.State> transactions;
private final int cacheSize;
+ private final int buckets;
private final Map<PartitionId, Cache<String, CachedMap>> partitionCache = Maps.newConcurrentMap();
- public TransactionManager(StorageService storageService, PartitionService partitionService) {
- this(storageService, partitionService, DEFAULT_CACHE_SIZE);
+ public TransactionManager(StorageService storageService, PartitionService partitionService, int buckets) {
+ this(storageService, partitionService, DEFAULT_CACHE_SIZE, buckets);
}
- public TransactionManager(StorageService storageService, PartitionService partitionService, int cacheSize) {
+ public TransactionManager(
+ StorageService storageService,
+ PartitionService partitionService,
+ int cacheSize,
+ int buckets) {
this.partitionService = partitionService;
this.cacheSize = cacheSize;
+ this.buckets = buckets;
this.transactions = storageService.<TransactionId, Transaction.State>consistentMapBuilder()
.withName("onos-transactions")
.withSerializer(Serializer.using(KryoNamespaces.API,
@@ -103,9 +107,9 @@
}
Hasher<K> hasher = key -> {
- int hashCode = Hashing.sha256()
- .hashString(HexString.toHexString(serializer.encode(key)), Charsets.UTF_8).asInt();
- return sortedPartitions.get(Math.abs(hashCode) % sortedPartitions.size());
+ int bucket = Math.abs(Hashing.murmur3_32().hashBytes(serializer.encode(key)).asInt()) % buckets;
+ int partition = Hashing.consistentHash(bucket, sortedPartitions.size());
+ return sortedPartitions.get(partition);
};
return new PartitionedTransactionalMap<>(partitions, hasher);
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java
index 5638300..918eae3 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TransactionManagerTest.java
@@ -81,7 +81,7 @@
replay(storageService, partitionService, asyncMap, primitiveCreator, mapBuilder);
- TransactionManager transactionManager = new TransactionManager(storageService, partitionService);
+ TransactionManager transactionManager = new TransactionManager(storageService, partitionService, 128);
TransactionId transactionId = TransactionId.from(UUID.randomUUID().toString());
TransactionCoordinator transactionCoordinator = new TransactionCoordinator(transactionId, transactionManager);
Serializer serializer = Serializer.using(KryoNamespaces.API);