Use consistent hashing to assign map keys to partitions
Change-Id: I9a3e7947c0ffa7b39569b8b6164bd84051c3e543
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index 52c91b8..4786fb4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -41,6 +41,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.function.Function;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -51,28 +52,35 @@
public class FederatedDistributedPrimitiveCreator implements DistributedPrimitiveCreator {
private final TreeMap<PartitionId, DistributedPrimitiveCreator> members;
private final List<PartitionId> sortedMemberPartitionIds;
+ private final int buckets;
- public FederatedDistributedPrimitiveCreator(Map<PartitionId, DistributedPrimitiveCreator> members) {
+ public FederatedDistributedPrimitiveCreator(Map<PartitionId, DistributedPrimitiveCreator> members, int buckets) {
this.members = Maps.newTreeMap();
this.members.putAll(checkNotNull(members));
this.sortedMemberPartitionIds = Lists.newArrayList(members.keySet());
+ this.buckets = buckets;
}
@Override
public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
checkNotNull(name);
checkNotNull(serializer);
- Map<PartitionId, AsyncConsistentMap<String, byte[]>> maps =
+ Map<PartitionId, AsyncConsistentMap<byte[], byte[]>> maps =
Maps.transformValues(members,
- partition -> partition.newAsyncConsistentMap(name, null));
- Hasher<String> hasher = key -> {
- int hashCode = Hashing.sha256().hashString(key, Charsets.UTF_8).asInt();
- return sortedMemberPartitionIds.get(Math.abs(hashCode) % members.size());
+ partition -> DistributedPrimitives.newTranscodingMap(
+ partition.<String, byte[]>newAsyncConsistentMap(name, null),
+ HexString::toHexString,
+ HexString::fromHexString,
+ Function.identity(),
+ Function.identity()));
+ Hasher<byte[]> hasher = key -> {
+ int bucket = Math.abs(Hashing.murmur3_32().hashBytes(key).asInt()) % buckets;
+ return sortedMemberPartitionIds.get(Hashing.consistentHash(bucket, sortedMemberPartitionIds.size()));
};
- AsyncConsistentMap<String, byte[]> partitionedMap = new PartitionedAsyncConsistentMap<>(name, maps, hasher);
+ AsyncConsistentMap<byte[], byte[]> partitionedMap = new PartitionedAsyncConsistentMap<>(name, maps, hasher);
return DistributedPrimitives.newTranscodingMap(partitionedMap,
- key -> HexString.toHexString(serializer.encode(key)),
- string -> serializer.decode(HexString.fromHexString(string)),
+ key -> serializer.encode(key),
+ bytes -> serializer.decode(bytes),
value -> value == null ? null : serializer.encode(value),
bytes -> serializer.decode(bytes));
}