Implement partitioned multimap
Change-Id: Ia1823a9aa263ec0e4b0171b139df300e6757d5d4
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index b6f27f0..3fe721f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -110,7 +110,25 @@
@Override
public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(ConsistentMultimapOptions options) {
- return getCreator(options.name()).newAsyncConsistentSetMultimap(options);
+ Map<PartitionId, AsyncConsistentMultimap<byte[], byte[]>> maps =
+ Maps.transformValues(members,
+ partition -> DistributedPrimitives.newTranscodingMultimap(
+ partition.<String, byte[]>newAsyncConsistentSetMultimap(options.name(), null),
+ HexString::toHexString,
+ HexString::fromHexString,
+ Function.identity(),
+ Function.identity()));
+ Hasher<byte[]> hasher = key -> {
+ int bucket = Math.abs(Hashing.murmur3_32().hashBytes(key).asInt()) % buckets;
+ return sortedMemberPartitionIds.get(Hashing.consistentHash(bucket, sortedMemberPartitionIds.size()));
+ };
+ AsyncConsistentMultimap<byte[], byte[]> partitionedMap =
+ new PartitionedAsyncConsistentMultimap<>(options.name(), maps, hasher);
+ return DistributedPrimitives.newTranscodingMultimap(partitionedMap,
+ key -> options.serializer().encode(key),
+ bytes -> options.serializer().decode(bytes),
+ value -> value == null ? null : options.serializer().encode(value),
+ bytes -> options.serializer().decode(bytes));
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
new file mode 100644
index 0000000..50b919f
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Multisets;
+import org.onlab.util.Match;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link AsyncConsistentMultimap} that has its entries partitioned horizontally across
+ * several {@link AsyncConsistentMultimap maps}.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class PartitionedAsyncConsistentMultimap<K, V> implements AsyncConsistentMultimap<K, V> {
+
+ private final String name;
+ private final TreeMap<PartitionId, AsyncConsistentMultimap<K, V>> partitions = Maps.newTreeMap();
+ private final Hasher<K> keyHasher;
+
+ public PartitionedAsyncConsistentMultimap(String name,
+ Map<PartitionId, AsyncConsistentMultimap<K, V>> partitions,
+ Hasher<K> keyHasher) {
+ this.name = name;
+ this.partitions.putAll(checkNotNull(partitions));
+ this.keyHasher = checkNotNull(keyHasher);
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ return Tools.allOf(getMultimaps().stream().map(m -> m.size()).collect(Collectors.toList()),
+ Math::addExact,
+ 0);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ return size().thenApply(size -> size == 0);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsKey(K key) {
+ return getMultimap(key).containsKey(key);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsValue(V value) {
+ return Tools.firstOf(getMultimaps().stream().map(m -> m.containsValue(value)).collect(Collectors.toList()),
+ Match.ifValue(true),
+ false);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
+ return getMultimap(key).get(key);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsEntry(K key, V value) {
+ return getMultimap(key).containsEntry(key, value);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> put(K key, V value) {
+ return getMultimap(key).put(key, value);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
+ return getMultimap(key).removeAll(key, values);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key) {
+ return getMultimap(key).removeAll(key);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
+ return getMultimap(key).putAll(key, values);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
+ return getMultimap(key).replaceValues(key, values);
+ }
+
+ @Override
+ public CompletableFuture<Map<K, Collection<V>>> asMap() {
+ throw new UnsupportedOperationException("Expensive operation.");
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ return CompletableFuture.allOf(getMultimaps().stream()
+ .map(map -> map.clear())
+ .toArray(CompletableFuture[]::new));
+ }
+
+ @Override
+ public CompletableFuture<Set<K>> keySet() {
+ return Tools.allOf(getMultimaps().stream().map(m -> m.keySet()).collect(Collectors.toList()),
+ (s1, s2) -> ImmutableSet.<K>builder().addAll(s1).addAll(s2).build(),
+ ImmutableSet.of());
+ }
+
+ @Override
+ public CompletableFuture<Multiset<K>> keys() {
+ return Tools.allOf(getMultimaps().stream().map(m -> m.keys()).collect(Collectors.toList()))
+ .thenApply(results -> results.stream().reduce(Multisets::sum).orElse(HashMultiset.create()));
+ }
+
+ @Override
+ public CompletableFuture<Multiset<V>> values() {
+ return Tools.allOf(getMultimaps().stream().map(m -> m.values()).collect(Collectors.toList()))
+ .thenApply(results -> results.stream().reduce(Multisets::sum).orElse(HashMultiset.create()));
+ }
+
+ @Override
+ public CompletableFuture<Collection<Entry<K, V>>> entries() {
+ return Tools.allOf(getMultimaps().stream().map(m -> m.entries()).collect(Collectors.toList()))
+ .thenApply(results -> results.stream().reduce((s1, s2) -> ImmutableList.copyOf(Iterables.concat(s1, s2)))
+ .orElse(ImmutableList.of()));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(K key, V value) {
+ return getMultimap(key).remove(key, value);
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+ return CompletableFuture.allOf(getMultimaps().stream()
+ .map(map -> map.addListener(listener, executor))
+ .toArray(CompletableFuture[]::new));
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+ return CompletableFuture.allOf(getMultimaps().stream()
+ .map(map -> map.removeListener(listener))
+ .toArray(CompletableFuture[]::new));
+ }
+
+ @Override
+ public void addStatusChangeListener(Consumer<Status> listener) {
+ partitions.values().forEach(map -> map.addStatusChangeListener(listener));
+ }
+
+ @Override
+ public void removeStatusChangeListener(Consumer<Status> listener) {
+ partitions.values().forEach(map -> map.removeStatusChangeListener(listener));
+ }
+
+ @Override
+ public Collection<Consumer<Status>> statusChangeListeners() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Returns the map (partition) to which the specified key maps.
+ *
+ * @param key key
+ * @return AsyncConsistentMap to which key maps
+ */
+ private AsyncConsistentMultimap<K, V> getMultimap(K key) {
+ return partitions.get(keyHasher.hash(key));
+ }
+
+ /**
+ * Returns all the constituent maps.
+ *
+ * @return collection of maps.
+ */
+ private Collection<AsyncConsistentMultimap<K, V>> getMultimaps() {
+ return partitions.values();
+ }
+}