Implement partitioned multimap

Change-Id: Ia1823a9aa263ec0e4b0171b139df300e6757d5d4
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
index b6f27f0..3fe721f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/FederatedDistributedPrimitiveCreator.java
@@ -110,7 +110,25 @@
 
     @Override
     public <K, V> AsyncConsistentMultimap<K, V> newAsyncConsistentSetMultimap(ConsistentMultimapOptions options) {
-        return getCreator(options.name()).newAsyncConsistentSetMultimap(options);
+        Map<PartitionId, AsyncConsistentMultimap<byte[], byte[]>> maps =
+            Maps.transformValues(members,
+                partition -> DistributedPrimitives.newTranscodingMultimap(
+                    partition.<String, byte[]>newAsyncConsistentSetMultimap(options.name(), null),
+                    HexString::toHexString,
+                    HexString::fromHexString,
+                    Function.identity(),
+                    Function.identity()));
+        Hasher<byte[]> hasher = key -> {
+            int bucket = Math.abs(Hashing.murmur3_32().hashBytes(key).asInt()) % buckets;
+            return sortedMemberPartitionIds.get(Hashing.consistentHash(bucket, sortedMemberPartitionIds.size()));
+        };
+        AsyncConsistentMultimap<byte[], byte[]> partitionedMap =
+            new PartitionedAsyncConsistentMultimap<>(options.name(), maps, hasher);
+        return DistributedPrimitives.newTranscodingMultimap(partitionedMap,
+            key -> options.serializer().encode(key),
+            bytes -> options.serializer().decode(bytes),
+            value -> value == null ? null : options.serializer().encode(value),
+            bytes -> options.serializer().decode(bytes));
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
new file mode 100644
index 0000000..50b919f
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
+import com.google.common.collect.Multisets;
+import org.onlab.util.Match;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.Versioned;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * {@link AsyncConsistentMultimap} that has its entries partitioned horizontally across
+ * several {@link AsyncConsistentMultimap maps}.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class PartitionedAsyncConsistentMultimap<K, V> implements AsyncConsistentMultimap<K, V> {
+
+    private final String name;
+    private final TreeMap<PartitionId, AsyncConsistentMultimap<K, V>> partitions = Maps.newTreeMap();
+    private final Hasher<K> keyHasher;
+
+    public PartitionedAsyncConsistentMultimap(String name,
+        Map<PartitionId, AsyncConsistentMultimap<K, V>> partitions,
+        Hasher<K> keyHasher) {
+        this.name = name;
+        this.partitions.putAll(checkNotNull(partitions));
+        this.keyHasher = checkNotNull(keyHasher);
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public CompletableFuture<Integer> size() {
+        return Tools.allOf(getMultimaps().stream().map(m -> m.size()).collect(Collectors.toList()),
+            Math::addExact,
+            0);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isEmpty() {
+        return size().thenApply(size -> size == 0);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsKey(K key) {
+        return getMultimap(key).containsKey(key);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsValue(V value) {
+        return Tools.firstOf(getMultimaps().stream().map(m -> m.containsValue(value)).collect(Collectors.toList()),
+            Match.ifValue(true),
+            false);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
+        return getMultimap(key).get(key);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsEntry(K key, V value) {
+        return getMultimap(key).containsEntry(key, value);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> put(K key, V value) {
+        return getMultimap(key).put(key, value);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
+        return getMultimap(key).removeAll(key, values);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key) {
+        return getMultimap(key).removeAll(key);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
+        return getMultimap(key).putAll(key, values);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
+        return getMultimap(key).replaceValues(key, values);
+    }
+
+    @Override
+    public CompletableFuture<Map<K, Collection<V>>> asMap() {
+        throw new UnsupportedOperationException("Expensive operation.");
+    }
+
+    @Override
+    public CompletableFuture<Void> clear() {
+        return CompletableFuture.allOf(getMultimaps().stream()
+            .map(map -> map.clear())
+            .toArray(CompletableFuture[]::new));
+    }
+
+    @Override
+    public CompletableFuture<Set<K>> keySet() {
+        return Tools.allOf(getMultimaps().stream().map(m -> m.keySet()).collect(Collectors.toList()),
+            (s1, s2) -> ImmutableSet.<K>builder().addAll(s1).addAll(s2).build(),
+            ImmutableSet.of());
+    }
+
+    @Override
+    public CompletableFuture<Multiset<K>> keys() {
+        return Tools.allOf(getMultimaps().stream().map(m -> m.keys()).collect(Collectors.toList()))
+            .thenApply(results -> results.stream().reduce(Multisets::sum).orElse(HashMultiset.create()));
+    }
+
+    @Override
+    public CompletableFuture<Multiset<V>> values() {
+        return Tools.allOf(getMultimaps().stream().map(m -> m.values()).collect(Collectors.toList()))
+            .thenApply(results -> results.stream().reduce(Multisets::sum).orElse(HashMultiset.create()));
+    }
+
+    @Override
+    public CompletableFuture<Collection<Entry<K, V>>> entries() {
+        return Tools.allOf(getMultimaps().stream().map(m -> m.entries()).collect(Collectors.toList()))
+            .thenApply(results -> results.stream().reduce((s1, s2) -> ImmutableList.copyOf(Iterables.concat(s1, s2)))
+                .orElse(ImmutableList.of()));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(K key, V value) {
+        return getMultimap(key).remove(key, value);
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        return CompletableFuture.allOf(getMultimaps().stream()
+            .map(map -> map.addListener(listener, executor))
+            .toArray(CompletableFuture[]::new));
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+        return CompletableFuture.allOf(getMultimaps().stream()
+            .map(map -> map.removeListener(listener))
+            .toArray(CompletableFuture[]::new));
+    }
+
+    @Override
+    public void addStatusChangeListener(Consumer<Status> listener) {
+        partitions.values().forEach(map -> map.addStatusChangeListener(listener));
+    }
+
+    @Override
+    public void removeStatusChangeListener(Consumer<Status> listener) {
+        partitions.values().forEach(map -> map.removeStatusChangeListener(listener));
+    }
+
+    @Override
+    public Collection<Consumer<Status>> statusChangeListeners() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Returns the map (partition) to which the specified key maps.
+     *
+     * @param key key
+     * @return AsyncConsistentMap to which key maps
+     */
+    private AsyncConsistentMultimap<K, V> getMultimap(K key) {
+        return partitions.get(keyHasher.hash(key));
+    }
+
+    /**
+     * Returns all the constituent maps.
+     *
+     * @return collection of maps.
+     */
+    private Collection<AsyncConsistentMultimap<K, V>> getMultimaps() {
+        return partitions.values();
+    }
+}