Implement partitioned multimap
Change-Id: Ia1823a9aa263ec0e4b0171b139df300e6757d5d4
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index b6f27f0..3fe721f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -110,7 +110,25 @@
@Override
public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(ConsistentMultimapOptions options) {
- return getCreator(options.name()).newAsyncConsistentSetMultimap(options);
+ Map<PartitionId, AsyncConsistentMultimap<byte[], byte[]>> maps =
+ Maps.transformValues(members,
+ partition -> DistributedPrimitives.newTranscodingMultimap(
+ partition.<String, byte[]>newAsyncConsistentSetMultimap(options.name(), null),
+ HexString::toHexString,
+ HexString::fromHexString,
+ Function.identity(),
+ Function.identity()));
+ Hasher<byte[]> hasher = key -> {
+ int bucket = Math.abs(Hashing.murmur3_32().hashBytes(key).asInt()) % buckets;
+ return sortedMemberPartitionIds.get(Hashing.consistentHash(bucket, sortedMemberPartitionIds.size()));
+ };
+ AsyncConsistentMultimap<byte[], byte[]> partitionedMap =
+ new PartitionedAsyncConsistentMultimap<>(options.name(), maps, hasher);
+ return DistributedPrimitives.newTranscodingMultimap(partitionedMap,
+ key -> options.serializer().encode(key),
+ bytes -> options.serializer().decode(bytes),
+ value -> value == null ? null : options.serializer().encode(value),
+ bytes -> options.serializer().decode(bytes));
}
@Override