Add bulk update to ConsistentMultimap

Change-Id: I61e9f0c2ed5ab368777c64b6fb4aa2c8dd31d081
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
index c14b187..51f0934 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
@@ -111,11 +111,21 @@
     }
 
     @Override
+    public boolean removeAll(Map<K, Collection<? extends V>> mapping) {
+        return complete(asyncMultimap.removeAll(mapping));
+    }
+
+    @Override
     public boolean putAll(K key, Collection<? extends V> values) {
         return complete(asyncMultimap.putAll(key, values));
     }
 
     @Override
+    public boolean putAll(Map<K, Collection<? extends V>> mapping) {
+        return complete(asyncMultimap.putAll(mapping));
+    }
+
+    @Override
     public Versioned<Collection<? extends V>> replaceValues(
             K key, Collection<V> values) {
         return complete(asyncMultimap.replaceValues(key, values));
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
index 94943b8..b86845e 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
@@ -166,6 +166,19 @@
     CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key);
 
     /**
+     * Removes the set of key-value pairs with the specified key and values if they
+     * exist. In implementations that allow duplicates each instance of a key
+     * will remove one matching entry, which one is not defined. Equivalent to
+     * repeated calls to {@code remove()} for each key value pair but more
+     * efficient.
+     *
+     * @param mapping the keys-values to be removed
+     * @return a future whose value will be true if the map changes because of
+     * this call, false otherwise.
+     */
+    CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping);
+
+    /**
      * Adds the set of key-value pairs of the specified key with each of the
      * values in the iterable if each key-value pair does not already exist,
      * if the pair does exist the behavior is implementation specific.
@@ -180,6 +193,18 @@
                                       Collection<? extends V> values);
 
     /**
+     * Adds the set of key-value pairs of the specified mapping with each of
+     * the values in the iterable if each key-value pair does not already exist,
+     * if the pair does exist the behavior is implementation specific.
+     * (Same as repeated puts but with efficiency gains.)
+     *
+     * @param mapping the keys-values to be added
+     * @return a future whose value will be true if any change in the map
+     * results from this call, false otherwise
+     */
+    CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping);
+
+    /**
      * Stores all the values in values associated with the key specified,
      * removes all preexisting values and returns a collection of the removed
      * values which may be empty if the entry did not exist.
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
index 99045a8..8aabca7 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
@@ -149,6 +149,18 @@
     Versioned<Collection<? extends V>> removeAll(K key);
 
     /**
+     * Removes the set of key-value pairs with the specified key and values if they
+     * exist. In implementations that allow duplicates each instance of a key
+     * will remove one matching entry, which one is not defined. Equivalent to
+     * repeated calls to {@code remove()} for each key value pair but more
+     * efficient.
+     *
+     * @param mapping the keys-values to be removed
+     * @return true if the map changes because of this call, false otherwise.
+     */
+    boolean removeAll(Map<K, Collection<? extends V>> mapping);
+
+    /**
      * Adds the set of key-value pairs of the specified key with each of the
      * values in the iterable if each key-value pair does not already exist,
      * if the pair does exist the behavior is implementation specific.
@@ -162,6 +174,18 @@
     boolean putAll(K key, Collection<? extends V> values);
 
     /**
+     * Adds the set of key-value pairs of the specified mapping with each of
+     * the values in the iterable if each key-value pair does not already exist,
+     * if the pair does exist the behavior is implementation specific.
+     * (Same as repeated puts but with efficiency gains.)
+     *
+     * @param mapping the keys-values to be added
+     * @return true if any change in the map results from this call,
+     * false otherwise
+     */
+    boolean putAll(Map<K, Collection<? extends V>> mapping);
+
+    /**
      * Stores all the values in values associated with the key specified,
      * removes all preexisting values and returns a collection of the removed
      * values which may be empty if the entry did not exist.
diff --git a/core/api/src/test/java/org/onosproject/store/primitives/DefaultConsistentMultimapTest.java b/core/api/src/test/java/org/onosproject/store/primitives/DefaultConsistentMultimapTest.java
new file mode 100644
index 0000000..bc4db71
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/primitives/DefaultConsistentMultimapTest.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import org.junit.Test;
+import org.onosproject.store.service.AsyncConsistentMultimapAdapter;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests for DefaultConsistentMultiMap.
+ */
+public class DefaultConsistentMultimapTest {
+
+    private static final String KEY1 = "AAA";
+    private static final String VALUE1 = "111";
+    private static final String KEY2 = "BBB";
+    private static final String VALUE2 = "222";
+    private static final String KEY3 = "CCC";
+    private static final String VALUE3 = "333";
+    private static final String KEY4 = "DDD";
+    private static final String VALUE4 = "444";
+    private final List<String> allKeys = Lists.newArrayList(KEY1, KEY2,
+                                                            KEY3, KEY4);
+    private final List<String> allValues = Lists.newArrayList(VALUE1, VALUE2,
+                                                              VALUE3, VALUE4);
+
+    /**
+     * Tests the behavior of public APIs of the default consistent multi-map
+     * implementation.
+     */
+    @Test
+    public void testBehavior() {
+        // Initialize the map
+        Multimap<String, String> baseMap = HashMultimap.create();
+        AsyncConsistentMultimapMock<String, String> asyncMultiMap = new AsyncConsistentMultimapMock<>(baseMap);
+        ConsistentMultimap<String, String> newMap = new DefaultConsistentMultimap<>(asyncMultiMap, 69);
+
+        // Verify is empty
+        assertThat(newMap.size(), is(0));
+        assertThat(newMap.isEmpty(), is(true));
+
+        // Test multi put
+        Map<String, Collection<? extends String>> mapping = Maps.newHashMap();
+        // First build the mappings having each key a different mapping
+        allKeys.forEach(key -> {
+            switch (key) {
+                case KEY1:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 1)));
+                    break;
+                case KEY2:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 2)));
+                    break;
+                case KEY3:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 3)));
+                    break;
+                default:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 4)));
+                    break;
+            }
+        });
+        // Success
+        assertThat(newMap.putAll(mapping), is(true));
+        // Failure
+        assertThat(newMap.putAll(mapping), is(false));
+        // Verify operation
+        assertThat(newMap.size(), is(10));
+        assertThat(newMap.isEmpty(), is(false));
+        // verify mapping is ok
+        allKeys.forEach(key -> {
+            switch (key) {
+                case KEY1:
+                    assertThat(Lists.newArrayList(Versioned.valueOrNull(newMap.get(key))),
+                                      containsInAnyOrder(allValues.subList(0, 1).toArray()));
+                    break;
+                case KEY2:
+                    assertThat(Lists.newArrayList(Versioned.valueOrNull(newMap.get(key))),
+                                      containsInAnyOrder(allValues.subList(0, 2).toArray()));
+                    break;
+                case KEY3:
+                    assertThat(Lists.newArrayList(Versioned.valueOrNull(newMap.get(key))),
+                                      containsInAnyOrder(allValues.subList(0, 3).toArray()));
+                    break;
+                default:
+                    assertThat(Lists.newArrayList(Versioned.valueOrNull(newMap.get(key))),
+                                      containsInAnyOrder(allValues.subList(0, 4).toArray()));
+                    break;
+            }
+        });
+        // Success
+        assertThat(newMap.removeAll(mapping), is(true));
+        // Failure
+        assertThat(newMap.removeAll(mapping), is(false));
+        // Verify operation
+        assertThat(newMap.size(), is(0));
+        assertThat(newMap.isEmpty(), is(true));
+    }
+
+    public static class AsyncConsistentMultimapMock<K, V> extends AsyncConsistentMultimapAdapter<K, V> {
+        private final Multimap<K, V> baseMap;
+        private static final int DEFAULT_CREATION_TIME = 0;
+        private static final int DEFAULT_VERSION = 0;
+
+        AsyncConsistentMultimapMock(Multimap<K, V> newBaseMap) {
+            baseMap = newBaseMap;
+        }
+
+        Versioned<Collection<? extends V>> makeVersioned(Collection<? extends V> v) {
+            return new Versioned<>(v, DEFAULT_VERSION, DEFAULT_CREATION_TIME);
+        }
+
+        @Override
+        public CompletableFuture<Integer> size() {
+            return CompletableFuture.completedFuture(baseMap.size());
+        }
+
+        @Override
+        public CompletableFuture<Boolean> isEmpty() {
+            return CompletableFuture.completedFuture(baseMap.isEmpty());
+        }
+
+        @Override
+        public CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping) {
+            CompletableFuture<Boolean> result = CompletableFuture.completedFuture(false);
+            for (Map.Entry<K, Collection<? extends V>> entry : mapping.entrySet()) {
+                if (baseMap.putAll(entry.getKey(), entry.getValue())) {
+                    result = CompletableFuture.completedFuture(true);
+                }
+            }
+            return result;
+        }
+
+        @Override
+        public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
+            return CompletableFuture.completedFuture(makeVersioned(baseMap.get(key)));
+        }
+
+        @Override
+        public CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping) {
+            CompletableFuture<Boolean> result = CompletableFuture.completedFuture(false);
+            for (Map.Entry<K, Collection<? extends V>> entry : mapping.entrySet()) {
+                for (V value : entry.getValue()) {
+                    if (baseMap.remove(entry.getKey(), value)) {
+                        result = CompletableFuture.completedFuture(true);
+                    }
+                }
+            }
+            return result;
+        }
+    }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMultimapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMultimapAdapter.java
new file mode 100644
index 0000000..4bb2618
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMultimapAdapter.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+import com.google.common.collect.Multiset;
+import org.onosproject.core.ApplicationId;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+public class AsyncConsistentMultimapAdapter<K, V> implements AsyncConsistentMultimap<K, V> {
+
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public Type primitiveType() {
+        return null;
+    }
+
+    @Override
+    public ApplicationId applicationId() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> destroy() {
+        return null;
+    }
+
+    @Override
+    public void addStatusChangeListener(Consumer<Status> listener) {
+
+    }
+
+    @Override
+    public void removeStatusChangeListener(Consumer<Status> listener) {
+
+    }
+
+    @Override
+    public Collection<Consumer<Status>> statusChangeListeners() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Integer> size() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isEmpty() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsKey(K key) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsValue(V value) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsEntry(K key, V value) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> put(K key, V value) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(K key, V value) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> clear() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Set<K>> keySet() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Multiset<K>> keys() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Multiset<V>> values() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Collection<Map.Entry<K, V>>> entries() {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Map<K, Collection<V>>> asMap() {
+        return null;
+    }
+
+    @Override
+    public ConsistentMultimap<K, V> asMultimap() {
+        return null;
+    }
+
+    @Override
+    public ConsistentMultimap<K, V> asMultimap(long timeoutMillis) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<AsyncIterator<Map.Entry<K, V>>> iterator() {
+        return null;
+    }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
index 08be2e4..f70d186 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
@@ -24,6 +24,7 @@
 import java.util.Set;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 /**
  * Implementation to test ConsistentMultimap. Very limited.
@@ -103,11 +104,42 @@
     }
 
     @Override
+    public boolean removeAll(Map<K, Collection<? extends V>> mapping) {
+        // Semantic is that any change in the map should return true
+        boolean result = false;
+        for (Map.Entry<K, Collection<? extends V>> entry : mapping.entrySet()) {
+            Collection<? extends Versioned<V>> versionedValues = entry.getValue().stream()
+                    .map(this::version)
+                    .collect(Collectors.toList());
+            for (Versioned<V> value : versionedValues) {
+                if (innermap.remove(entry.getKey(), value)) {
+                    result = true;
+                }
+            }
+        }
+        return result;
+    }
+
+    @Override
     public boolean putAll(K key, Collection<? extends V> values) {
         return false;
     }
 
     @Override
+    public boolean putAll(Map<K, Collection<? extends V>> mapping) {
+        // Semantic is that any change in the map should return true
+        boolean result = false;
+        for (Map.Entry<K, Collection<? extends V>> entry : mapping.entrySet()) {
+            Collection<? extends Versioned<V>> versionedValues = entry.getValue().stream()
+                    .map(this::version).collect(Collectors.toList());
+            if (innermap.putAll(entry.getKey(), versionedValues)) {
+                result = true;
+            }
+        }
+        return result;
+    }
+
+    @Override
     public Versioned<Collection<? extends V>> replaceValues(K key, Collection<V> values) {
         return null;
     }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java
index 3d49f7f..09640f2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java
@@ -151,12 +151,24 @@
     }
 
     @Override
+    public CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping) {
+        return super.removeAll(mapping)
+                .whenComplete((r, e) -> mapping.keySet().forEach(cache::invalidate));
+    }
+
+    @Override
     public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
         return super.putAll(key, values)
             .whenComplete((r, e) -> cache.invalidate(key));
     }
 
     @Override
+    public CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping) {
+        return super.putAll(mapping)
+            .whenComplete((r, e) -> mapping.keySet().forEach(cache::invalidate));
+    }
+
+    @Override
     public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
         return super.replaceValues(key, values)
             .whenComplete((r, e) -> cache.invalidate(key));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index 779bb7a..6cfdaac 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -105,12 +105,24 @@
     }
 
     @Override
+    public CompletableFuture<Boolean> removeAll(
+            Map<K, Collection<? extends V>> mapping) {
+        return delegateMap.removeAll(mapping);
+    }
+
+    @Override
     public CompletableFuture<Boolean> putAll(
             K key, Collection<? extends V> values) {
         return delegateMap.putAll(key, values);
     }
 
     @Override
+    public CompletableFuture<Boolean> putAll(
+            Map<K, Collection<? extends V>> mapping) {
+        return delegateMap.putAll(mapping);
+    }
+
+    @Override
     public CompletableFuture<Versioned<Collection<? extends V>>>
         replaceValues(K key, Collection<V> values) {
         return delegateMap.replaceValues(key, values);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
index a4e0bf9..0cd3fe5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
@@ -126,11 +126,31 @@
     }
 
     @Override
+    public CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping) {
+        Map<PartitionId, Map<K, Collection<? extends V>>> subMappings = buildSubMappings(mapping);
+        // Semantic is that any change in the partitions should return true
+        return Tools.allOf(subMappings.entrySet().stream()
+                                   .map(entry -> partitions.get(entry.getKey()).removeAll(entry.getValue()))
+                                   .collect(Collectors.toList()),
+                           Boolean::logicalOr, false);
+    }
+
+    @Override
     public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
         return getMultimap(key).putAll(key, values);
     }
 
     @Override
+    public CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping) {
+        Map<PartitionId, Map<K, Collection<? extends V>>> subMappings = buildSubMappings(mapping);
+        // Semantic is that any change in the partitions should return true
+        return Tools.allOf(subMappings.entrySet().stream()
+                            .map(entry -> partitions.get(entry.getKey()).putAll(entry.getValue()))
+                            .collect(Collectors.toList()),
+                    Boolean::logicalOr, false);
+    }
+
+    @Override
     public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
         return getMultimap(key).replaceValues(key, values);
     }
@@ -237,6 +257,26 @@
         return partitions.values();
     }
 
+    /**
+     * Build sub-mappings for each partition.
+     *
+     * @param mapping initial mapping key-value
+     * @return sub-mappings partition-values
+     */
+    private Map<PartitionId, Map<K, Collection<? extends V>>> buildSubMappings(
+            Map<K, Collection<? extends V>> mapping) {
+        Map<PartitionId, Map<K, Collection<? extends V>>> subMappings = Maps.newHashMap();
+        // Build first a mapping with the partitions
+        mapping.forEach((key, values) -> subMappings.compute(keyHasher.hash(key), (k, v) -> {
+            if (v == null) {
+                v = Maps.newHashMap();
+            }
+            v.put(key, values);
+            return v;
+        }));
+        return subMappings;
+    }
+
     private class PartitionedMultimapIterator<K, V> implements AsyncIterator<Map.Entry<K, V>> {
         private final Iterator<AsyncIterator<Entry<K, V>>> iterators;
         private volatile AsyncIterator<Entry<K, V>> iterator;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index 6fab8c8..cb7fc02 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -196,6 +196,21 @@
 
     @Override
     public CompletableFuture<Boolean>
+    removeAll(Map<K1, Collection<? extends V1>> mapping) {
+        try {
+            // Transform the mapping to the new output
+            Map<K2, Collection<? extends V2>> transformedMapping = Maps.newHashMap();
+            mapping.forEach((key, value) -> transformedMapping.put(keyEncoder.apply(key),
+                                                                   valueCollectionEncode.apply(value)));
+            // Then apply the operation
+            return backingMap.removeAll(transformedMapping);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Boolean>
     putAll(K1 key, Collection<? extends V1> values) {
         try {
             return backingMap.putAll(keyEncoder.apply(key),
@@ -206,6 +221,21 @@
     }
 
     @Override
+    public CompletableFuture<Boolean>
+    putAll(Map<K1, Collection<? extends V1>> mapping) {
+        try {
+            // Transform the mapping to the new output
+            Map<K2, Collection<? extends V2>> transformedMapping = Maps.newHashMap();
+            mapping.forEach((key, value) -> transformedMapping.put(keyEncoder.apply(key),
+                                                                   valueCollectionEncode.apply(value)));
+            // Then apply the operation
+            return backingMap.putAll(transformedMapping);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
+    }
+
+    @Override
     public CompletableFuture<Versioned<Collection<? extends V1>>>
     replaceValues(K1 key, Collection<V1> values) {
         try {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index b04c9dd..f4c732f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -75,6 +75,10 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Replace;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.SIZE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.VALUES;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MULTI_PUT_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiPutAll;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MULTI_REMOVE_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemoveAll;
 
 
 /**
@@ -171,7 +175,7 @@
         return proxy.invoke(
             REMOVE,
             SERIALIZER::encode,
-            new MultiRemove(key, (Collection<byte[]>) values, null),
+            new MultiRemove(key, values, null),
             SERIALIZER::decode);
     }
 
@@ -181,12 +185,23 @@
     }
 
     @Override
+    public CompletableFuture<Boolean> removeAll(Map<String, Collection<? extends byte[]>> mapping) {
+        return proxy.invoke(MULTI_REMOVE_ALL, SERIALIZER::encode,
+                            new MultiRemoveAll(mapping, null), SERIALIZER::decode);
+    }
+
+    @Override
     public CompletableFuture<Boolean> putAll(
         String key, Collection<? extends byte[]> values) {
         return proxy.invoke(PUT, SERIALIZER::encode, new Put(key, values, null), SERIALIZER::decode);
     }
 
     @Override
+    public CompletableFuture<Boolean> putAll(Map<String, Collection<? extends byte[]>> mapping) {
+        return proxy.invoke(MULTI_PUT_ALL, SERIALIZER::encode, new MultiPutAll(mapping, null), SERIALIZER::decode);
+    }
+
+    @Override
     public CompletableFuture<Versioned<Collection<? extends byte[]>>> replaceValues(
         String key, Collection<byte[]> values) {
         return proxy.invoke(
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
index 4d38d71..eb6dc65 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
@@ -47,10 +47,12 @@
     VALUES(OperationType.QUERY),
     ENTRIES(OperationType.QUERY),
     PUT(OperationType.COMMAND),
+    MULTI_PUT_ALL(OperationType.COMMAND),
     PUT_AND_GET(OperationType.COMMAND),
     REMOVE(OperationType.COMMAND),
     REMOVE_AND_GET(OperationType.COMMAND),
     REMOVE_ALL(OperationType.COMMAND),
+    MULTI_REMOVE_ALL(OperationType.COMMAND),
     REPLACE(OperationType.COMMAND),
     CLEAR(OperationType.COMMAND),
     ADD_LISTENER(OperationType.COMMAND),
@@ -83,6 +85,8 @@
             .register(ContainsValue.class)
             .register(Get.class)
             .register(MultiRemove.class)
+            .register(MultiRemoveAll.class)
+            .register(MultiPutAll.class)
             .register(Put.class)
             .register(RemoveAll.class)
             .register(Replace.class)
@@ -261,13 +265,13 @@
     @SuppressWarnings("serial")
     public static class MultiRemove extends MultimapOperation {
         private String key;
-        private Collection<byte[]> values;
+        private Collection<? extends byte[]> values;
         private Match<Long> versionMatch;
 
         public MultiRemove() {
         }
 
-        public MultiRemove(String key, Collection<byte[]> valueMatches,
+        public MultiRemove(String key, Collection<? extends byte[]> valueMatches,
                            Match<Long> versionMatch) {
             this.key = checkNotNull(key);
             this.values = valueMatches;
@@ -278,7 +282,7 @@
             return this.key;
         }
 
-        public Collection<byte[]> values() {
+        public Collection<? extends byte[]> values() {
             return values;
         }
 
@@ -297,10 +301,43 @@
     }
 
     /**
+     * Command to back the multi-key removeAll method.
+     */
+    @SuppressWarnings("serial")
+    public static class MultiRemoveAll extends MultimapOperation {
+        private Map<String, Collection<? extends byte[]>> mapping;
+        private Match<Long> versionMatch;
+
+        public MultiRemoveAll() {
+        }
+
+        public MultiRemoveAll(Map<String, Collection<? extends byte[]>> mapping, Match<Long> versionMatch) {
+            this.mapping = checkNotNull(mapping);
+            this.versionMatch = versionMatch;
+        }
+
+        public Map<String, Collection<? extends byte[]>> mapping() {
+            return mapping;
+        }
+
+        public Match<Long> versionMatch() {
+            return versionMatch;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("mapping", mapping)
+                    .add("versionMatch", versionMatch)
+                    .toString();
+        }
+    }
+
+    /**
      * Command to back the put and putAll methods.
      */
     @SuppressWarnings("serial")
-    public static class  Put extends MultimapOperation {
+    public static class Put extends MultimapOperation {
         private String key;
         private Collection<? extends byte[]> values;
         private Match<Long> versionMatch;
@@ -337,6 +374,39 @@
     }
 
     /**
+     * Command to back the multi-key putAll method.
+     */
+    @SuppressWarnings("serial")
+    public static class MultiPutAll extends MultimapOperation {
+        private Map<String, Collection<? extends byte[]>> mapping;
+        private Match<Long> versionMatch;
+
+        public MultiPutAll() {
+        }
+
+        public MultiPutAll(Map<String, Collection<? extends byte[]>> mapping, Match<Long> versionMatch) {
+            this.mapping = checkNotNull(mapping);
+            this.versionMatch = versionMatch;
+        }
+
+        public Map<String, Collection<? extends byte[]>> mapping() {
+            return mapping;
+        }
+
+        public Match<Long> versionMatch() {
+            return versionMatch;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("mapping", mapping)
+                    .add("versionMatch", versionMatch)
+                    .toString();
+        }
+    }
+
+    /**
      * Replace command, returns the collection that was replaced.
      */
     @SuppressWarnings("serial")
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index fd2e676..2d18c35 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -49,6 +49,7 @@
 import io.atomix.protocols.raft.service.AbstractRaftService;
 import io.atomix.protocols.raft.service.Commit;
 import io.atomix.protocols.raft.service.RaftServiceExecutor;
+import io.atomix.protocols.raft.service.impl.DefaultCommit;
 import io.atomix.protocols.raft.session.RaftSession;
 import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
 import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
@@ -93,6 +94,10 @@
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Replace;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.SIZE;
 import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.VALUES;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MULTI_PUT_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiPutAll;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MULTI_REMOVE_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemoveAll;
 
 /**
  * State Machine for {@link AtomixConsistentSetMultimap} resource.
@@ -169,9 +174,11 @@
         executor.register(VALUES, this::values, serializer::encode);
         executor.register(ENTRIES, this::entries, serializer::encode);
         executor.register(GET, serializer::decode, this::get, serializer::encode);
+        executor.register(MULTI_REMOVE_ALL, serializer::decode, this::multiRemoveAll, serializer::encode);
         executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode);
         executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode);
         executor.register(REMOVE_AND_GET, serializer::decode, this::removeAndGet, serializer::encode);
+        executor.register(MULTI_PUT_ALL, serializer::decode, this::multiPutAll, serializer::encode);
         executor.register(PUT, serializer::decode, this::put, serializer::encode);
         executor.register(PUT_AND_GET, serializer::decode, this::putAndGet, serializer::encode);
         executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
@@ -395,6 +402,32 @@
     }
 
     /**
+     * Handles a MultiRemoveAll commit, returns true if any change results from this
+     * commit.
+     * @param commit a MultiRemoveAll commit
+     * @return true if this commit results in a change, else false
+     */
+    protected boolean multiRemoveAll(Commit<? extends MultiRemoveAll> commit) {
+        Map<String, Collection<? extends byte[]>> mapping = commit.value().mapping();
+        // There are no updates
+        if (mapping.isEmpty()) {
+            return false;
+        }
+        // Decompose the commit in several updates
+        boolean operationResult = false;
+        for (Map.Entry<String, Collection<? extends byte[]>> entry : mapping.entrySet()) {
+            MultiRemove update = new MultiRemove(entry.getKey(), entry.getValue(), commit.value().versionMatch());
+            Commit<? extends MultiRemove> commitUpdate = new DefaultCommit<>(commit.index(), commit.operation(),
+                                                                     update, commit.session(),
+                                                                     commit.wallClockTime().unixTimestamp());
+            if (multiRemove(commitUpdate)) {
+                operationResult = true;
+            }
+        }
+        return operationResult;
+    }
+
+    /**
      * Handles a removeAndGet commit.
      *
      * @param commit multiRemove commit
@@ -456,6 +489,32 @@
     }
 
     /**
+     * Handles a MultiPutAll commit, returns true if any change results from this
+     * commit.
+     * @param commit a MultiPutAll commit
+     * @return true if this commit results in a change, else false
+     */
+    protected boolean multiPutAll(Commit<? extends MultiPutAll> commit) {
+        Map<String, Collection<? extends byte[]>> mapping = commit.value().mapping();
+        // There are no updates
+        if (mapping.isEmpty()) {
+            return false;
+        }
+        // Decompose the commit in several updates
+        boolean operationResult = false;
+        for (Map.Entry<String, Collection<? extends byte[]>> entry : mapping.entrySet()) {
+            Put update = new Put(entry.getKey(), entry.getValue(), commit.value().versionMatch());
+            Commit<? extends Put> commitUpdate = new DefaultCommit<>(commit.index(), commit.operation(),
+                                                                     update, commit.session(),
+                                                                     commit.wallClockTime().unixTimestamp());
+            if (put(commitUpdate)) {
+                operationResult = true;
+            }
+        }
+        return operationResult;
+    }
+
+    /**
      * Handles a putAndGet commit.
      *
      * @param commit a put commit
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/AsyncConsistentMultimapMock.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/AsyncConsistentMultimapMock.java
new file mode 100644
index 0000000..c451da9
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/AsyncConsistentMultimapMock.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.onosproject.store.service.AsyncConsistentMultimapAdapter;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+public class AsyncConsistentMultimapMock<K, V> extends AsyncConsistentMultimapAdapter<K, V> {
+    private final Multimap<K, V> baseMap = HashMultimap.create();
+    private static final int DEFAULT_CREATION_TIME = 0;
+    private static final int DEFAULT_VERSION = 0;
+
+    AsyncConsistentMultimapMock() {
+    }
+
+    Versioned<Collection<? extends V>> makeVersioned(Collection<? extends V> v) {
+        return new Versioned<>(v, DEFAULT_VERSION, DEFAULT_CREATION_TIME);
+    }
+
+    @Override
+    public CompletableFuture<Integer> size() {
+        return CompletableFuture.completedFuture(baseMap.size());
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isEmpty() {
+        return CompletableFuture.completedFuture(baseMap.isEmpty());
+    }
+
+    @Override
+    public CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping) {
+        CompletableFuture<Boolean> result = CompletableFuture.completedFuture(false);
+        for (Map.Entry<K, Collection<? extends V>> entry : mapping.entrySet()) {
+            if (baseMap.putAll(entry.getKey(), entry.getValue())) {
+                result = CompletableFuture.completedFuture(true);
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
+        return CompletableFuture.completedFuture(makeVersioned(baseMap.get(key)));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping) {
+        CompletableFuture<Boolean> result = CompletableFuture.completedFuture(false);
+        for (Map.Entry<K, Collection<? extends V>> entry : mapping.entrySet()) {
+            for (V value : entry.getValue()) {
+                if (baseMap.remove(entry.getKey(), value)) {
+                    result = CompletableFuture.completedFuture(true);
+                }
+            }
+        }
+        return result;
+    }
+}
+
+
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimapTest.java
new file mode 100644
index 0000000..0ee4812
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimapTest.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.Hashing;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+public class PartitionedAsyncConsistentMultimapTest {
+
+    private static PartitionedAsyncConsistentMultimap<String, String> partitionedAsyncConsistentMap;
+    private static Map<PartitionId, AsyncConsistentMultimap<String, String>> partitions;
+    private static List<PartitionId> sortedMemberPartitionIds;
+    private static Serializer serializer;
+
+    private static final String PARTITION_NAME = "PartitionManager";
+
+    private static final String KEY1 = "AAA";
+    private static final String VALUE1 = "one";
+    private static final String KEY2 = "BBB";
+    private static final String VALUE2 = "two";
+    private static final String KEY3 = "CCC";
+    private static final String VALUE3 = "three";
+    private static final String KEY4  = "DDD";
+    private static final String VALUE4 = "four";
+
+    private final List<String> allKeys = Lists.newArrayList(KEY1, KEY2,
+                                                            KEY3, KEY4);
+    private final List<String> allValues = Lists.newArrayList(VALUE1, VALUE2,
+                                                                VALUE3, VALUE4);
+
+    @Before
+    public void setUp() throws Exception {
+        // Init maps and partitions
+        AsyncConsistentMultimap<String, String> asyncMap1 = new AsyncConsistentMultimapMock<>();
+        AsyncConsistentMultimap<String, String> asyncMap2 = new AsyncConsistentMultimapMock<>();
+        AsyncConsistentMultimap<String, String> asyncMap3 = new AsyncConsistentMultimapMock<>();
+        AsyncConsistentMultimap<String, String> asyncMap4 = new AsyncConsistentMultimapMock<>();
+        PartitionId pid1 = PartitionId.from(1);
+        PartitionId pid2 = PartitionId.from(2);
+        PartitionId pid3 = PartitionId.from(3);
+        PartitionId pid4 = PartitionId.from(4);
+        partitions = new HashMap<>();
+        serializer = Serializer.using(KryoNamespaces.BASIC);
+        partitions.put(pid1, asyncMap1);
+        partitions.put(pid2, asyncMap2);
+        partitions.put(pid3, asyncMap3);
+        partitions.put(pid4, asyncMap4);
+        sortedMemberPartitionIds = Lists.newArrayList(partitions.keySet());
+        Hasher<String> hasher = key -> {
+            int hashCode = Hashing.sha256().hashBytes(serializer.encode(key)).asInt();
+            return sortedMemberPartitionIds.get(Math.abs(hashCode) % partitions.size());
+        };
+        partitionedAsyncConsistentMap = new PartitionedAsyncConsistentMultimap<>(PARTITION_NAME,
+                                                                                 partitions,
+                                                                                 hasher);
+    }
+
+    @Test
+    public void testPutAllRemoveAll() {
+        // Init phase
+        assertThat(partitionedAsyncConsistentMap.isEmpty().join(), is(true));
+        assertThat(partitionedAsyncConsistentMap.size().join(), is(0));
+        assertThat(partitionedAsyncConsistentMap.name(), is("PartitionManager"));
+        // Test multi put
+        Map<String, Collection<? extends String>> mapping = Maps.newHashMap();
+        // First build the mappings having each key a different mapping
+        allKeys.forEach(key -> {
+            switch (key) {
+                case KEY1:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 1)));
+                    break;
+                case KEY2:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 2)));
+                    break;
+                case KEY3:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 3)));
+                    break;
+                default:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 4)));
+                    break;
+            }
+        });
+        // Success
+        assertThat(partitionedAsyncConsistentMap.putAll(mapping).join(), is(true));
+        // Failure
+        assertThat(partitionedAsyncConsistentMap.putAll(mapping).join(), is(false));
+        // Verify operation
+        assertThat(partitionedAsyncConsistentMap.size().join(), is(10));
+        assertThat(partitionedAsyncConsistentMap.isEmpty().join(), is(false));
+        // verify mapping is ok
+        allKeys.forEach(key -> {
+            switch (key) {
+                case KEY1:
+                    assertThat(Lists.newArrayList(Versioned.valueOrNull(partitionedAsyncConsistentMap.get(key).join())),
+                               containsInAnyOrder(allValues.subList(0, 1).toArray()));
+                    break;
+                case KEY2:
+                    assertThat(Lists.newArrayList(Versioned.valueOrNull(partitionedAsyncConsistentMap.get(key).join())),
+                               containsInAnyOrder(allValues.subList(0, 2).toArray()));
+                    break;
+                case KEY3:
+                    assertThat(Lists.newArrayList(Versioned.valueOrNull(partitionedAsyncConsistentMap.get(key).join())),
+                               containsInAnyOrder(allValues.subList(0, 3).toArray()));
+                    break;
+                default:
+                    assertThat(Lists.newArrayList(Versioned.valueOrNull(partitionedAsyncConsistentMap.get(key).join())),
+                               containsInAnyOrder(allValues.subList(0, 4).toArray()));
+                    break;
+            }
+        });
+        // Success
+        assertThat(partitionedAsyncConsistentMap.removeAll(mapping).join(), is(true));
+        // Failure
+        assertThat(partitionedAsyncConsistentMap.removeAll(mapping).join(), is(false));
+        // Verify operation
+        assertThat(partitionedAsyncConsistentMap.size().join(), is(0));
+        assertThat(partitionedAsyncConsistentMap.isEmpty().join(), is(true));
+    }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimapTest.java
new file mode 100644
index 0000000..9f631db
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimapTest.java
@@ -0,0 +1,222 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.net.DeviceId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncConsistentMultimapAdapter;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+
+public class TranscodingAsyncConsistentMultimapTest {
+
+    private static Serializer serializer;
+    private static AsyncConsistentMultimap<String, DeviceId> transcodingMap;
+    private static AsyncConsistentMultimap<String, byte[]> baseMap;
+
+    private static final String KEY1 = "Key1";
+    private static final String KEY2 = "Key2";
+    private static final String KEY3 = "Key3";
+    private static final String KEY4 = "Key4";
+    private static final DeviceId DEV1 = DeviceId.deviceId("Device1");
+    private static final DeviceId DEV2 = DeviceId.deviceId("Device2");
+    private static final DeviceId DEV3 = DeviceId.deviceId("foo");
+    private static final DeviceId DEV4 = DeviceId.deviceId("bar");
+    private final List<String> allKeys = Lists.newArrayList(KEY1, KEY2,
+                                                            KEY3, KEY4);
+    private final List<DeviceId> allValues = Lists.newArrayList(DEV1, DEV2,
+                                                              DEV3, DEV4);
+
+    @Before
+    public void setUp() throws Exception {
+        // Init base map and serializer
+        serializer = Serializer.using(KryoNamespaces.API);
+        baseMap = new AsyncConsistentMultimapMock();
+        // Create the transcoding map
+        transcodingMap =  DistributedPrimitives.newTranscodingMultimap(
+                baseMap,
+                Function.identity(),
+                Function.identity(),
+                serializer::encode,
+                serializer::decode);
+    }
+
+    @Test
+    public void testPutAllRemoveAll() throws Exception {
+        // Init phase
+        assertThat(transcodingMap.size().join(), is(0));
+        assertThat(transcodingMap.isEmpty().join(), is(true));
+        // Test multi put
+        Map<String, Collection<? extends DeviceId>> mapping = Maps.newHashMap();
+        // First build the mappings having each key a different mapping
+        allKeys.forEach(key -> {
+            switch (key) {
+                case KEY1:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 1)));
+                    break;
+                case KEY2:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 2)));
+                    break;
+                case KEY3:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 3)));
+                    break;
+                default:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 4)));
+                    break;
+            }
+        });
+        // Success
+        assertThat(transcodingMap.putAll(mapping).join(), is(true));
+        // Failure
+        assertThat(transcodingMap.putAll(mapping).join(), is(false));
+        // Verify operation
+        assertThat(transcodingMap.size().join(), is(10));
+        assertThat(transcodingMap.isEmpty().join(), is(false));
+        // verify mapping is ok
+        allKeys.forEach(key -> {
+            switch (key) {
+                case KEY1:
+                    assertThat(Lists.newArrayList(Versioned.valueOrNull(transcodingMap.get(key).join())),
+                               containsInAnyOrder(allValues.subList(0, 1).toArray()));
+                    break;
+                case KEY2:
+                    assertThat(Lists.newArrayList(Versioned.valueOrNull(transcodingMap.get(key).join())),
+                               containsInAnyOrder(allValues.subList(0, 2).toArray()));
+                    break;
+                case KEY3:
+                    assertThat(Lists.newArrayList(Versioned.valueOrNull(transcodingMap.get(key).join())),
+                               containsInAnyOrder(allValues.subList(0, 3).toArray()));
+                    break;
+                default:
+                    assertThat(Lists.newArrayList(Versioned.valueOrNull(transcodingMap.get(key).join())),
+                               containsInAnyOrder(allValues.subList(0, 4).toArray()));
+                    break;
+            }
+        });
+        // Success
+        assertThat(transcodingMap.removeAll(mapping).join(), is(true));
+        // Failure
+        assertThat(transcodingMap.removeAll(mapping).join(), is(false));
+        // Verify operation
+        assertThat(transcodingMap.size().join(), is(0));
+        assertThat(transcodingMap.isEmpty().join(), is(true));
+    }
+
+    // It uses a special internal map for bytes comparison - otherwise the equality cannot be verified
+    public static class AsyncConsistentMultimapMock extends AsyncConsistentMultimapAdapter<String, byte[]> {
+        private final Map<String, Set<byte[]>> baseMap = new HashMap<>();
+        private static final int DEFAULT_CREATION_TIME = 0;
+        private static final int DEFAULT_VERSION = 0;
+
+        AsyncConsistentMultimapMock() { }
+
+        Versioned<Collection<? extends byte[]>> makeVersioned(Collection<? extends byte[]> v) {
+            return new Versioned<>(v, DEFAULT_VERSION, DEFAULT_CREATION_TIME);
+        }
+
+        @Override
+        public CompletableFuture<Integer> size() {
+            return CompletableFuture.completedFuture(baseMap.values().stream()
+                                                             .map(Set::size)
+                                                             .mapToInt(size -> size).sum());
+        }
+
+        @Override
+        public CompletableFuture<Boolean> isEmpty() {
+            return CompletableFuture.completedFuture(baseMap.isEmpty());
+        }
+
+        @Override
+        public CompletableFuture<Boolean> putAll(Map<String, Collection<? extends byte[]>> mapping) {
+            CompletableFuture<Boolean> result = CompletableFuture.completedFuture(false);
+            for (Map.Entry<String, Collection<? extends byte[]>> entry : mapping.entrySet()) {
+                Set<byte[]> values = baseMap.computeIfAbsent(
+                        entry.getKey(), k -> Sets.newTreeSet(new ByteArrayComparator()));
+                for (byte[] value : entry.getValue()) {
+                    if (values.add(value)) {
+                        result = CompletableFuture.completedFuture(true);
+                    }
+                }
+            }
+            return result;
+        }
+
+        @Override
+        public CompletableFuture<Versioned<Collection<? extends byte[]>>> get(String key) {
+            return CompletableFuture.completedFuture(makeVersioned(baseMap.get(key)));
+        }
+
+        @Override
+        public CompletableFuture<Boolean> removeAll(Map<String, Collection<? extends byte[]>> mapping) {
+            CompletableFuture<Boolean> result = CompletableFuture.completedFuture(false);
+            for (Map.Entry<String, Collection<? extends byte[]>> entry : mapping.entrySet()) {
+                Set<byte[]> values = baseMap.get(entry.getKey());
+                if (values == null) {
+                    return CompletableFuture.completedFuture(false);
+                }
+                for (byte[] value : entry.getValue()) {
+                    if (values.remove(value)) {
+                        result = CompletableFuture.completedFuture(true);
+                    }
+                }
+                if (values.isEmpty()) {
+                    baseMap.remove(entry.getKey());
+                }
+            }
+            return result;
+        }
+
+        private static class ByteArrayComparator implements Comparator<byte[]> {
+            @Override
+            public int compare(byte[] o1, byte[] o2) {
+                if (Arrays.equals(o1, o2)) {
+                    return 0;
+                } else {
+                    for (int i = 0; i < o1.length && i < o2.length; i++) {
+                        if (o1[i] < o2[i]) {
+                            return -1;
+                        } else if (o1[i] > o2[i]) {
+                            return 1;
+                        }
+                    }
+                    return o1.length > o2.length ? 1 : -1;
+                }
+            }
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
index cabf215..6b7e710 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
@@ -28,11 +28,13 @@
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import com.google.common.collect.Multiset;
 import com.google.common.collect.TreeMultiset;
 import io.atomix.protocols.raft.proxy.RaftProxy;
 import io.atomix.protocols.raft.service.RaftService;
 import org.apache.commons.collections.keyvalue.DefaultMapEntry;
+import org.junit.Assert;
 import org.junit.Test;
 import org.onlab.util.Tools;
 import org.onosproject.store.service.AsyncIterator;
@@ -336,6 +338,83 @@
                     .join();
         });
 
+        // Done let's destroy the map
+        map.destroy().join();
+    }
+
+    /**
+     * Contains tests for put, putAll, remove, removeAll and replace.
+     * @throws Exception
+     */
+    @Test
+    public void multiPutAllAndMultiRemoveAllTest() throws Exception {
+        // Init phase
+        AtomixConsistentSetMultimap map = createResource("testMultiOp");
+        allKeys.forEach(key -> {
+            allValues.forEach(value -> {
+                map.put(key, value).join();
+                map.put(key, value).join();
+            });
+        });
+
+        // Test multi put
+        Map<String, Collection<? extends byte[]>> mapping = Maps.newHashMap();
+        // First build the mappings having each key a different mapping
+        allKeys.forEach(key -> {
+            switch (key) {
+                case keyOne:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 1)));
+                    break;
+                case keyTwo:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 2)));
+                    break;
+                case keyThree:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 3)));
+                    break;
+                default:
+                    mapping.put(key, Lists.newArrayList(allValues.subList(0, 4)));
+                    break;
+            }
+        });
+        map.size().thenAccept(result -> assertEquals(16, (int) result)).join();
+        // Keys are already present operation has to fail
+        map.putAll(mapping).thenAccept(Assert::assertFalse).join();
+        // clean up the map
+        allKeys.forEach(key -> {
+            map.removeAll(key).join();
+            map.removeAll(key).join();
+        });
+        // verify map is empty
+        map.size().thenAccept(result -> assertEquals(0, (int) result)).join();
+        // put all again but now operation is successful
+        map.putAll(mapping).thenAccept(Assert::assertTrue).join();
+        // verify mapping is ok
+        allKeys.forEach(key -> map.get(key).thenAccept(result -> {
+            switch (key) {
+                case keyOne:
+                    assertTrue(byteArrayCollectionIsEqual(allValues.subList(0, 1), result.value()));
+                    break;
+                case keyTwo:
+                    assertTrue(byteArrayCollectionIsEqual(allValues.subList(0, 2), result.value()));
+                    break;
+                case keyThree:
+                    assertTrue(byteArrayCollectionIsEqual(allValues.subList(0, 3), result.value()));
+                    break;
+                default:
+                    assertTrue(byteArrayCollectionIsEqual(allValues.subList(0, 4), result.value()));
+                    break;
+            }
+        }).join());
+        // We have added keyOne -> {valueOne}, keyTwo -> {valueOne, valueTwo} and so on
+        map.size().thenAccept(result -> assertEquals(10, (int) result)).join();
+        // removeAll but now operation is successful
+        map.removeAll(mapping).thenAccept(Assert::assertTrue).join();
+        // removeAll but now operation is failure
+        map.removeAll(mapping).thenAccept(Assert::assertFalse).join();
+        // No more elements
+        map.size().thenAccept(result -> assertEquals(0, (int) result)).join();
+
+        // Done let's destroy the map
         map.destroy().join();
     }