Add bulk update to ConsistentMultimap
Change-Id: I61e9f0c2ed5ab368777c64b6fb4aa2c8dd31d081
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
index c14b187..51f0934 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
@@ -111,11 +111,21 @@
}
@Override
+ public boolean removeAll(Map<K, Collection<? extends V>> mapping) {
+ return complete(asyncMultimap.removeAll(mapping));
+ }
+
+ @Override
public boolean putAll(K key, Collection<? extends V> values) {
return complete(asyncMultimap.putAll(key, values));
}
@Override
+ public boolean putAll(Map<K, Collection<? extends V>> mapping) {
+ return complete(asyncMultimap.putAll(mapping));
+ }
+
+ @Override
public Versioned<Collection<? extends V>> replaceValues(
K key, Collection<V> values) {
return complete(asyncMultimap.replaceValues(key, values));
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
index 94943b8..b86845e 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
@@ -166,6 +166,19 @@
CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key);
/**
+ * Removes the set of key-value pairs with the specified key and values if they
+ * exist. In implementations that allow duplicates each instance of a key
+ * will remove one matching entry, which one is not defined. Equivalent to
+ * repeated calls to {@code remove()} for each key value pair but more
+ * efficient.
+ *
+ * @param mapping the keys-values to be removed
+ * @return a future whose value will be true if the map changes because of
+ * this call, false otherwise.
+ */
+ CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping);
+
+ /**
* Adds the set of key-value pairs of the specified key with each of the
* values in the iterable if each key-value pair does not already exist,
* if the pair does exist the behavior is implementation specific.
@@ -180,6 +193,18 @@
Collection<? extends V> values);
/**
+ * Adds the set of key-value pairs of the specified mapping with each of
+ * the values in the iterable if each key-value pair does not already exist,
+ * if the pair does exist the behavior is implementation specific.
+ * (Same as repeated puts but with efficiency gains.)
+ *
+ * @param mapping the keys-values to be added
+ * @return a future whose value will be true if any change in the map
+ * results from this call, false otherwise
+ */
+ CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping);
+
+ /**
* Stores all the values in values associated with the key specified,
* removes all preexisting values and returns a collection of the removed
* values which may be empty if the entry did not exist.
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
index 99045a8..8aabca7 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
@@ -149,6 +149,18 @@
Versioned<Collection<? extends V>> removeAll(K key);
/**
+ * Removes the set of key-value pairs with the specified key and values if they
+ * exist. In implementations that allow duplicates each instance of a key
+ * will remove one matching entry, which one is not defined. Equivalent to
+ * repeated calls to {@code remove()} for each key value pair but more
+ * efficient.
+ *
+ * @param mapping the keys-values to be removed
+ * @return true if the map changes because of this call, false otherwise.
+ */
+ boolean removeAll(Map<K, Collection<? extends V>> mapping);
+
+ /**
* Adds the set of key-value pairs of the specified key with each of the
* values in the iterable if each key-value pair does not already exist,
* if the pair does exist the behavior is implementation specific.
@@ -162,6 +174,18 @@
boolean putAll(K key, Collection<? extends V> values);
/**
+ * Adds the set of key-value pairs of the specified mapping with each of
+ * the values in the iterable if each key-value pair does not already exist,
+ * if the pair does exist the behavior is implementation specific.
+ * (Same as repeated puts but with efficiency gains.)
+ *
+ * @param mapping the keys-values to be added
+ * @return true if any change in the map results from this call,
+ * false otherwise
+ */
+ boolean putAll(Map<K, Collection<? extends V>> mapping);
+
+ /**
* Stores all the values in values associated with the key specified,
* removes all preexisting values and returns a collection of the removed
* values which may be empty if the entry did not exist.
diff --git a/core/api/src/test/java/org/onosproject/store/primitives/DefaultConsistentMultimapTest.java b/core/api/src/test/java/org/onosproject/store/primitives/DefaultConsistentMultimapTest.java
new file mode 100644
index 0000000..bc4db71
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/primitives/DefaultConsistentMultimapTest.java
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import org.junit.Test;
+import org.onosproject.store.service.AsyncConsistentMultimapAdapter;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+
+/**
+ * Tests for DefaultConsistentMultiMap.
+ */
+public class DefaultConsistentMultimapTest {
+
+ private static final String KEY1 = "AAA";
+ private static final String VALUE1 = "111";
+ private static final String KEY2 = "BBB";
+ private static final String VALUE2 = "222";
+ private static final String KEY3 = "CCC";
+ private static final String VALUE3 = "333";
+ private static final String KEY4 = "DDD";
+ private static final String VALUE4 = "444";
+ private final List<String> allKeys = Lists.newArrayList(KEY1, KEY2,
+ KEY3, KEY4);
+ private final List<String> allValues = Lists.newArrayList(VALUE1, VALUE2,
+ VALUE3, VALUE4);
+
+ /**
+ * Tests the behavior of public APIs of the default consistent multi-map
+ * implementation.
+ */
+ @Test
+ public void testBehavior() {
+ // Initialize the map
+ Multimap<String, String> baseMap = HashMultimap.create();
+ AsyncConsistentMultimapMock<String, String> asyncMultiMap = new AsyncConsistentMultimapMock<>(baseMap);
+ ConsistentMultimap<String, String> newMap = new DefaultConsistentMultimap<>(asyncMultiMap, 69);
+
+ // Verify is empty
+ assertThat(newMap.size(), is(0));
+ assertThat(newMap.isEmpty(), is(true));
+
+ // Test multi put
+ Map<String, Collection<? extends String>> mapping = Maps.newHashMap();
+ // First build the mappings having each key a different mapping
+ allKeys.forEach(key -> {
+ switch (key) {
+ case KEY1:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 1)));
+ break;
+ case KEY2:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 2)));
+ break;
+ case KEY3:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 3)));
+ break;
+ default:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 4)));
+ break;
+ }
+ });
+ // Success
+ assertThat(newMap.putAll(mapping), is(true));
+ // Failure
+ assertThat(newMap.putAll(mapping), is(false));
+ // Verify operation
+ assertThat(newMap.size(), is(10));
+ assertThat(newMap.isEmpty(), is(false));
+ // verify mapping is ok
+ allKeys.forEach(key -> {
+ switch (key) {
+ case KEY1:
+ assertThat(Lists.newArrayList(Versioned.valueOrNull(newMap.get(key))),
+ containsInAnyOrder(allValues.subList(0, 1).toArray()));
+ break;
+ case KEY2:
+ assertThat(Lists.newArrayList(Versioned.valueOrNull(newMap.get(key))),
+ containsInAnyOrder(allValues.subList(0, 2).toArray()));
+ break;
+ case KEY3:
+ assertThat(Lists.newArrayList(Versioned.valueOrNull(newMap.get(key))),
+ containsInAnyOrder(allValues.subList(0, 3).toArray()));
+ break;
+ default:
+ assertThat(Lists.newArrayList(Versioned.valueOrNull(newMap.get(key))),
+ containsInAnyOrder(allValues.subList(0, 4).toArray()));
+ break;
+ }
+ });
+ // Success
+ assertThat(newMap.removeAll(mapping), is(true));
+ // Failure
+ assertThat(newMap.removeAll(mapping), is(false));
+ // Verify operation
+ assertThat(newMap.size(), is(0));
+ assertThat(newMap.isEmpty(), is(true));
+ }
+
+ public static class AsyncConsistentMultimapMock<K, V> extends AsyncConsistentMultimapAdapter<K, V> {
+ private final Multimap<K, V> baseMap;
+ private static final int DEFAULT_CREATION_TIME = 0;
+ private static final int DEFAULT_VERSION = 0;
+
+ AsyncConsistentMultimapMock(Multimap<K, V> newBaseMap) {
+ baseMap = newBaseMap;
+ }
+
+ Versioned<Collection<? extends V>> makeVersioned(Collection<? extends V> v) {
+ return new Versioned<>(v, DEFAULT_VERSION, DEFAULT_CREATION_TIME);
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ return CompletableFuture.completedFuture(baseMap.size());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ return CompletableFuture.completedFuture(baseMap.isEmpty());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping) {
+ CompletableFuture<Boolean> result = CompletableFuture.completedFuture(false);
+ for (Map.Entry<K, Collection<? extends V>> entry : mapping.entrySet()) {
+ if (baseMap.putAll(entry.getKey(), entry.getValue())) {
+ result = CompletableFuture.completedFuture(true);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
+ return CompletableFuture.completedFuture(makeVersioned(baseMap.get(key)));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping) {
+ CompletableFuture<Boolean> result = CompletableFuture.completedFuture(false);
+ for (Map.Entry<K, Collection<? extends V>> entry : mapping.entrySet()) {
+ for (V value : entry.getValue()) {
+ if (baseMap.remove(entry.getKey(), value)) {
+ result = CompletableFuture.completedFuture(true);
+ }
+ }
+ }
+ return result;
+ }
+ }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMultimapAdapter.java b/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMultimapAdapter.java
new file mode 100644
index 0000000..4bb2618
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/AsyncConsistentMultimapAdapter.java
@@ -0,0 +1,205 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.service;
+
+import com.google.common.collect.Multiset;
+import org.onosproject.core.ApplicationId;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.Consumer;
+
+public class AsyncConsistentMultimapAdapter<K, V> implements AsyncConsistentMultimap<K, V> {
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public Type primitiveType() {
+ return null;
+ }
+
+ @Override
+ public ApplicationId applicationId() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> destroy() {
+ return null;
+ }
+
+ @Override
+ public void addStatusChangeListener(Consumer<Status> listener) {
+
+ }
+
+ @Override
+ public void removeStatusChangeListener(Consumer<Status> listener) {
+
+ }
+
+ @Override
+ public Collection<Consumer<Status>> statusChangeListeners() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsKey(K key) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsValue(V value) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsEntry(K key, V value) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> put(K key, V value) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(K key, V value) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> removeAll(K key) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> clear() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Set<K>> keySet() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Multiset<K>> keys() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Multiset<V>> values() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Collection<Map.Entry<K, V>>> entries() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Map<K, Collection<V>>> asMap() {
+ return null;
+ }
+
+ @Override
+ public ConsistentMultimap<K, V> asMultimap() {
+ return null;
+ }
+
+ @Override
+ public ConsistentMultimap<K, V> asMultimap(long timeoutMillis) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<AsyncIterator<Map.Entry<K, V>>> iterator() {
+ return null;
+ }
+}
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
index 08be2e4..f70d186 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
@@ -24,6 +24,7 @@
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
/**
* Implementation to test ConsistentMultimap. Very limited.
@@ -103,11 +104,42 @@
}
@Override
+ public boolean removeAll(Map<K, Collection<? extends V>> mapping) {
+ // Semantic is that any change in the map should return true
+ boolean result = false;
+ for (Map.Entry<K, Collection<? extends V>> entry : mapping.entrySet()) {
+ Collection<? extends Versioned<V>> versionedValues = entry.getValue().stream()
+ .map(this::version)
+ .collect(Collectors.toList());
+ for (Versioned<V> value : versionedValues) {
+ if (innermap.remove(entry.getKey(), value)) {
+ result = true;
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
public boolean putAll(K key, Collection<? extends V> values) {
return false;
}
@Override
+ public boolean putAll(Map<K, Collection<? extends V>> mapping) {
+ // Semantic is that any change in the map should return true
+ boolean result = false;
+ for (Map.Entry<K, Collection<? extends V>> entry : mapping.entrySet()) {
+ Collection<? extends Versioned<V>> versionedValues = entry.getValue().stream()
+ .map(this::version).collect(Collectors.toList());
+ if (innermap.putAll(entry.getKey(), versionedValues)) {
+ result = true;
+ }
+ }
+ return result;
+ }
+
+ @Override
public Versioned<Collection<? extends V>> replaceValues(K key, Collection<V> values) {
return null;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java
index 3d49f7f..09640f2 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java
@@ -151,12 +151,24 @@
}
@Override
+ public CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping) {
+ return super.removeAll(mapping)
+ .whenComplete((r, e) -> mapping.keySet().forEach(cache::invalidate));
+ }
+
+ @Override
public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
return super.putAll(key, values)
.whenComplete((r, e) -> cache.invalidate(key));
}
@Override
+ public CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping) {
+ return super.putAll(mapping)
+ .whenComplete((r, e) -> mapping.keySet().forEach(cache::invalidate));
+ }
+
+ @Override
public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
return super.replaceValues(key, values)
.whenComplete((r, e) -> cache.invalidate(key));
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index 779bb7a..6cfdaac 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -105,12 +105,24 @@
}
@Override
+ public CompletableFuture<Boolean> removeAll(
+ Map<K, Collection<? extends V>> mapping) {
+ return delegateMap.removeAll(mapping);
+ }
+
+ @Override
public CompletableFuture<Boolean> putAll(
K key, Collection<? extends V> values) {
return delegateMap.putAll(key, values);
}
@Override
+ public CompletableFuture<Boolean> putAll(
+ Map<K, Collection<? extends V>> mapping) {
+ return delegateMap.putAll(mapping);
+ }
+
+ @Override
public CompletableFuture<Versioned<Collection<? extends V>>>
replaceValues(K key, Collection<V> values) {
return delegateMap.replaceValues(key, values);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
index a4e0bf9..0cd3fe5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
@@ -126,11 +126,31 @@
}
@Override
+ public CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping) {
+ Map<PartitionId, Map<K, Collection<? extends V>>> subMappings = buildSubMappings(mapping);
+ // Semantic is that any change in the partitions should return true
+ return Tools.allOf(subMappings.entrySet().stream()
+ .map(entry -> partitions.get(entry.getKey()).removeAll(entry.getValue()))
+ .collect(Collectors.toList()),
+ Boolean::logicalOr, false);
+ }
+
+ @Override
public CompletableFuture<Boolean> putAll(K key, Collection<? extends V> values) {
return getMultimap(key).putAll(key, values);
}
@Override
+ public CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping) {
+ Map<PartitionId, Map<K, Collection<? extends V>>> subMappings = buildSubMappings(mapping);
+ // Semantic is that any change in the partitions should return true
+ return Tools.allOf(subMappings.entrySet().stream()
+ .map(entry -> partitions.get(entry.getKey()).putAll(entry.getValue()))
+ .collect(Collectors.toList()),
+ Boolean::logicalOr, false);
+ }
+
+ @Override
public CompletableFuture<Versioned<Collection<? extends V>>> replaceValues(K key, Collection<V> values) {
return getMultimap(key).replaceValues(key, values);
}
@@ -237,6 +257,26 @@
return partitions.values();
}
+ /**
+ * Build sub-mappings for each partition.
+ *
+ * @param mapping initial mapping key-value
+ * @return sub-mappings partition-values
+ */
+ private Map<PartitionId, Map<K, Collection<? extends V>>> buildSubMappings(
+ Map<K, Collection<? extends V>> mapping) {
+ Map<PartitionId, Map<K, Collection<? extends V>>> subMappings = Maps.newHashMap();
+ // Build first a mapping with the partitions
+ mapping.forEach((key, values) -> subMappings.compute(keyHasher.hash(key), (k, v) -> {
+ if (v == null) {
+ v = Maps.newHashMap();
+ }
+ v.put(key, values);
+ return v;
+ }));
+ return subMappings;
+ }
+
private class PartitionedMultimapIterator<K, V> implements AsyncIterator<Map.Entry<K, V>> {
private final Iterator<AsyncIterator<Entry<K, V>>> iterators;
private volatile AsyncIterator<Entry<K, V>> iterator;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index 6fab8c8..cb7fc02 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -196,6 +196,21 @@
@Override
public CompletableFuture<Boolean>
+ removeAll(Map<K1, Collection<? extends V1>> mapping) {
+ try {
+ // Transform the mapping to the new output
+ Map<K2, Collection<? extends V2>> transformedMapping = Maps.newHashMap();
+ mapping.forEach((key, value) -> transformedMapping.put(keyEncoder.apply(key),
+ valueCollectionEncode.apply(value)));
+ // Then apply the operation
+ return backingMap.removeAll(transformedMapping);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Boolean>
putAll(K1 key, Collection<? extends V1> values) {
try {
return backingMap.putAll(keyEncoder.apply(key),
@@ -206,6 +221,21 @@
}
@Override
+ public CompletableFuture<Boolean>
+ putAll(Map<K1, Collection<? extends V1>> mapping) {
+ try {
+ // Transform the mapping to the new output
+ Map<K2, Collection<? extends V2>> transformedMapping = Maps.newHashMap();
+ mapping.forEach((key, value) -> transformedMapping.put(keyEncoder.apply(key),
+ valueCollectionEncode.apply(value)));
+ // Then apply the operation
+ return backingMap.putAll(transformedMapping);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
public CompletableFuture<Versioned<Collection<? extends V1>>>
replaceValues(K1 key, Collection<V1> values) {
try {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index b04c9dd..f4c732f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -75,6 +75,10 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Replace;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.SIZE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.VALUES;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MULTI_PUT_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiPutAll;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MULTI_REMOVE_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemoveAll;
/**
@@ -171,7 +175,7 @@
return proxy.invoke(
REMOVE,
SERIALIZER::encode,
- new MultiRemove(key, (Collection<byte[]>) values, null),
+ new MultiRemove(key, values, null),
SERIALIZER::decode);
}
@@ -181,12 +185,23 @@
}
@Override
+ public CompletableFuture<Boolean> removeAll(Map<String, Collection<? extends byte[]>> mapping) {
+ return proxy.invoke(MULTI_REMOVE_ALL, SERIALIZER::encode,
+ new MultiRemoveAll(mapping, null), SERIALIZER::decode);
+ }
+
+ @Override
public CompletableFuture<Boolean> putAll(
String key, Collection<? extends byte[]> values) {
return proxy.invoke(PUT, SERIALIZER::encode, new Put(key, values, null), SERIALIZER::decode);
}
@Override
+ public CompletableFuture<Boolean> putAll(Map<String, Collection<? extends byte[]>> mapping) {
+ return proxy.invoke(MULTI_PUT_ALL, SERIALIZER::encode, new MultiPutAll(mapping, null), SERIALIZER::decode);
+ }
+
+ @Override
public CompletableFuture<Versioned<Collection<? extends byte[]>>> replaceValues(
String key, Collection<byte[]> values) {
return proxy.invoke(
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
index 4d38d71..eb6dc65 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
@@ -47,10 +47,12 @@
VALUES(OperationType.QUERY),
ENTRIES(OperationType.QUERY),
PUT(OperationType.COMMAND),
+ MULTI_PUT_ALL(OperationType.COMMAND),
PUT_AND_GET(OperationType.COMMAND),
REMOVE(OperationType.COMMAND),
REMOVE_AND_GET(OperationType.COMMAND),
REMOVE_ALL(OperationType.COMMAND),
+ MULTI_REMOVE_ALL(OperationType.COMMAND),
REPLACE(OperationType.COMMAND),
CLEAR(OperationType.COMMAND),
ADD_LISTENER(OperationType.COMMAND),
@@ -83,6 +85,8 @@
.register(ContainsValue.class)
.register(Get.class)
.register(MultiRemove.class)
+ .register(MultiRemoveAll.class)
+ .register(MultiPutAll.class)
.register(Put.class)
.register(RemoveAll.class)
.register(Replace.class)
@@ -261,13 +265,13 @@
@SuppressWarnings("serial")
public static class MultiRemove extends MultimapOperation {
private String key;
- private Collection<byte[]> values;
+ private Collection<? extends byte[]> values;
private Match<Long> versionMatch;
public MultiRemove() {
}
- public MultiRemove(String key, Collection<byte[]> valueMatches,
+ public MultiRemove(String key, Collection<? extends byte[]> valueMatches,
Match<Long> versionMatch) {
this.key = checkNotNull(key);
this.values = valueMatches;
@@ -278,7 +282,7 @@
return this.key;
}
- public Collection<byte[]> values() {
+ public Collection<? extends byte[]> values() {
return values;
}
@@ -297,10 +301,43 @@
}
/**
+ * Command to back the multi-key removeAll method.
+ */
+ @SuppressWarnings("serial")
+ public static class MultiRemoveAll extends MultimapOperation {
+ private Map<String, Collection<? extends byte[]>> mapping;
+ private Match<Long> versionMatch;
+
+ public MultiRemoveAll() {
+ }
+
+ public MultiRemoveAll(Map<String, Collection<? extends byte[]>> mapping, Match<Long> versionMatch) {
+ this.mapping = checkNotNull(mapping);
+ this.versionMatch = versionMatch;
+ }
+
+ public Map<String, Collection<? extends byte[]>> mapping() {
+ return mapping;
+ }
+
+ public Match<Long> versionMatch() {
+ return versionMatch;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("mapping", mapping)
+ .add("versionMatch", versionMatch)
+ .toString();
+ }
+ }
+
+ /**
* Command to back the put and putAll methods.
*/
@SuppressWarnings("serial")
- public static class Put extends MultimapOperation {
+ public static class Put extends MultimapOperation {
private String key;
private Collection<? extends byte[]> values;
private Match<Long> versionMatch;
@@ -337,6 +374,39 @@
}
/**
+ * Command to back the multi-key putAll method.
+ */
+ @SuppressWarnings("serial")
+ public static class MultiPutAll extends MultimapOperation {
+ private Map<String, Collection<? extends byte[]>> mapping;
+ private Match<Long> versionMatch;
+
+ public MultiPutAll() {
+ }
+
+ public MultiPutAll(Map<String, Collection<? extends byte[]>> mapping, Match<Long> versionMatch) {
+ this.mapping = checkNotNull(mapping);
+ this.versionMatch = versionMatch;
+ }
+
+ public Map<String, Collection<? extends byte[]>> mapping() {
+ return mapping;
+ }
+
+ public Match<Long> versionMatch() {
+ return versionMatch;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("mapping", mapping)
+ .add("versionMatch", versionMatch)
+ .toString();
+ }
+ }
+
+ /**
* Replace command, returns the collection that was replaced.
*/
@SuppressWarnings("serial")
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index fd2e676..2d18c35 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -49,6 +49,7 @@
import io.atomix.protocols.raft.service.AbstractRaftService;
import io.atomix.protocols.raft.service.Commit;
import io.atomix.protocols.raft.service.RaftServiceExecutor;
+import io.atomix.protocols.raft.service.impl.DefaultCommit;
import io.atomix.protocols.raft.session.RaftSession;
import io.atomix.protocols.raft.storage.snapshot.SnapshotReader;
import io.atomix.protocols.raft.storage.snapshot.SnapshotWriter;
@@ -93,6 +94,10 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Replace;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.SIZE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.VALUES;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MULTI_PUT_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiPutAll;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MULTI_REMOVE_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemoveAll;
/**
* State Machine for {@link AtomixConsistentSetMultimap} resource.
@@ -169,9 +174,11 @@
executor.register(VALUES, this::values, serializer::encode);
executor.register(ENTRIES, this::entries, serializer::encode);
executor.register(GET, serializer::decode, this::get, serializer::encode);
+ executor.register(MULTI_REMOVE_ALL, serializer::decode, this::multiRemoveAll, serializer::encode);
executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode);
executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode);
executor.register(REMOVE_AND_GET, serializer::decode, this::removeAndGet, serializer::encode);
+ executor.register(MULTI_PUT_ALL, serializer::decode, this::multiPutAll, serializer::encode);
executor.register(PUT, serializer::decode, this::put, serializer::encode);
executor.register(PUT_AND_GET, serializer::decode, this::putAndGet, serializer::encode);
executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
@@ -395,6 +402,32 @@
}
/**
+ * Handles a MultiRemoveAll commit, returns true if any change results from this
+ * commit.
+ * @param commit a MultiRemoveAll commit
+ * @return true if this commit results in a change, else false
+ */
+ protected boolean multiRemoveAll(Commit<? extends MultiRemoveAll> commit) {
+ Map<String, Collection<? extends byte[]>> mapping = commit.value().mapping();
+ // There are no updates
+ if (mapping.isEmpty()) {
+ return false;
+ }
+ // Decompose the commit in several updates
+ boolean operationResult = false;
+ for (Map.Entry<String, Collection<? extends byte[]>> entry : mapping.entrySet()) {
+ MultiRemove update = new MultiRemove(entry.getKey(), entry.getValue(), commit.value().versionMatch());
+ Commit<? extends MultiRemove> commitUpdate = new DefaultCommit<>(commit.index(), commit.operation(),
+ update, commit.session(),
+ commit.wallClockTime().unixTimestamp());
+ if (multiRemove(commitUpdate)) {
+ operationResult = true;
+ }
+ }
+ return operationResult;
+ }
+
+ /**
* Handles a removeAndGet commit.
*
* @param commit multiRemove commit
@@ -456,6 +489,32 @@
}
/**
+ * Handles a MultiPutAll commit, returns true if any change results from this
+ * commit.
+ * @param commit a MultiPutAll commit
+ * @return true if this commit results in a change, else false
+ */
+ protected boolean multiPutAll(Commit<? extends MultiPutAll> commit) {
+ Map<String, Collection<? extends byte[]>> mapping = commit.value().mapping();
+ // There are no updates
+ if (mapping.isEmpty()) {
+ return false;
+ }
+ // Decompose the commit in several updates
+ boolean operationResult = false;
+ for (Map.Entry<String, Collection<? extends byte[]>> entry : mapping.entrySet()) {
+ Put update = new Put(entry.getKey(), entry.getValue(), commit.value().versionMatch());
+ Commit<? extends Put> commitUpdate = new DefaultCommit<>(commit.index(), commit.operation(),
+ update, commit.session(),
+ commit.wallClockTime().unixTimestamp());
+ if (put(commitUpdate)) {
+ operationResult = true;
+ }
+ }
+ return operationResult;
+ }
+
+ /**
* Handles a putAndGet commit.
*
* @param commit a put commit
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/AsyncConsistentMultimapMock.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/AsyncConsistentMultimapMock.java
new file mode 100644
index 0000000..c451da9
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/AsyncConsistentMultimapMock.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+import org.onosproject.store.service.AsyncConsistentMultimapAdapter;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+public class AsyncConsistentMultimapMock<K, V> extends AsyncConsistentMultimapAdapter<K, V> {
+ private final Multimap<K, V> baseMap = HashMultimap.create();
+ private static final int DEFAULT_CREATION_TIME = 0;
+ private static final int DEFAULT_VERSION = 0;
+
+ AsyncConsistentMultimapMock() {
+ }
+
+ Versioned<Collection<? extends V>> makeVersioned(Collection<? extends V> v) {
+ return new Versioned<>(v, DEFAULT_VERSION, DEFAULT_CREATION_TIME);
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ return CompletableFuture.completedFuture(baseMap.size());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ return CompletableFuture.completedFuture(baseMap.isEmpty());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> putAll(Map<K, Collection<? extends V>> mapping) {
+ CompletableFuture<Boolean> result = CompletableFuture.completedFuture(false);
+ for (Map.Entry<K, Collection<? extends V>> entry : mapping.entrySet()) {
+ if (baseMap.putAll(entry.getKey(), entry.getValue())) {
+ result = CompletableFuture.completedFuture(true);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> get(K key) {
+ return CompletableFuture.completedFuture(makeVersioned(baseMap.get(key)));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> removeAll(Map<K, Collection<? extends V>> mapping) {
+ CompletableFuture<Boolean> result = CompletableFuture.completedFuture(false);
+ for (Map.Entry<K, Collection<? extends V>> entry : mapping.entrySet()) {
+ for (V value : entry.getValue()) {
+ if (baseMap.remove(entry.getKey(), value)) {
+ result = CompletableFuture.completedFuture(true);
+ }
+ }
+ }
+ return result;
+ }
+}
+
+
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimapTest.java
new file mode 100644
index 0000000..0ee4812
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimapTest.java
@@ -0,0 +1,149 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.Hashing;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.Matchers.*;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+
+public class PartitionedAsyncConsistentMultimapTest {
+
+ private static PartitionedAsyncConsistentMultimap<String, String> partitionedAsyncConsistentMap;
+ private static Map<PartitionId, AsyncConsistentMultimap<String, String>> partitions;
+ private static List<PartitionId> sortedMemberPartitionIds;
+ private static Serializer serializer;
+
+ private static final String PARTITION_NAME = "PartitionManager";
+
+ private static final String KEY1 = "AAA";
+ private static final String VALUE1 = "one";
+ private static final String KEY2 = "BBB";
+ private static final String VALUE2 = "two";
+ private static final String KEY3 = "CCC";
+ private static final String VALUE3 = "three";
+ private static final String KEY4 = "DDD";
+ private static final String VALUE4 = "four";
+
+ private final List<String> allKeys = Lists.newArrayList(KEY1, KEY2,
+ KEY3, KEY4);
+ private final List<String> allValues = Lists.newArrayList(VALUE1, VALUE2,
+ VALUE3, VALUE4);
+
+ @Before
+ public void setUp() throws Exception {
+ // Init maps and partitions
+ AsyncConsistentMultimap<String, String> asyncMap1 = new AsyncConsistentMultimapMock<>();
+ AsyncConsistentMultimap<String, String> asyncMap2 = new AsyncConsistentMultimapMock<>();
+ AsyncConsistentMultimap<String, String> asyncMap3 = new AsyncConsistentMultimapMock<>();
+ AsyncConsistentMultimap<String, String> asyncMap4 = new AsyncConsistentMultimapMock<>();
+ PartitionId pid1 = PartitionId.from(1);
+ PartitionId pid2 = PartitionId.from(2);
+ PartitionId pid3 = PartitionId.from(3);
+ PartitionId pid4 = PartitionId.from(4);
+ partitions = new HashMap<>();
+ serializer = Serializer.using(KryoNamespaces.BASIC);
+ partitions.put(pid1, asyncMap1);
+ partitions.put(pid2, asyncMap2);
+ partitions.put(pid3, asyncMap3);
+ partitions.put(pid4, asyncMap4);
+ sortedMemberPartitionIds = Lists.newArrayList(partitions.keySet());
+ Hasher<String> hasher = key -> {
+ int hashCode = Hashing.sha256().hashBytes(serializer.encode(key)).asInt();
+ return sortedMemberPartitionIds.get(Math.abs(hashCode) % partitions.size());
+ };
+ partitionedAsyncConsistentMap = new PartitionedAsyncConsistentMultimap<>(PARTITION_NAME,
+ partitions,
+ hasher);
+ }
+
+ @Test
+ public void testPutAllRemoveAll() {
+ // Init phase
+ assertThat(partitionedAsyncConsistentMap.isEmpty().join(), is(true));
+ assertThat(partitionedAsyncConsistentMap.size().join(), is(0));
+ assertThat(partitionedAsyncConsistentMap.name(), is("PartitionManager"));
+ // Test multi put
+ Map<String, Collection<? extends String>> mapping = Maps.newHashMap();
+ // First build the mappings having each key a different mapping
+ allKeys.forEach(key -> {
+ switch (key) {
+ case KEY1:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 1)));
+ break;
+ case KEY2:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 2)));
+ break;
+ case KEY3:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 3)));
+ break;
+ default:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 4)));
+ break;
+ }
+ });
+ // Success
+ assertThat(partitionedAsyncConsistentMap.putAll(mapping).join(), is(true));
+ // Failure
+ assertThat(partitionedAsyncConsistentMap.putAll(mapping).join(), is(false));
+ // Verify operation
+ assertThat(partitionedAsyncConsistentMap.size().join(), is(10));
+ assertThat(partitionedAsyncConsistentMap.isEmpty().join(), is(false));
+ // verify mapping is ok
+ allKeys.forEach(key -> {
+ switch (key) {
+ case KEY1:
+ assertThat(Lists.newArrayList(Versioned.valueOrNull(partitionedAsyncConsistentMap.get(key).join())),
+ containsInAnyOrder(allValues.subList(0, 1).toArray()));
+ break;
+ case KEY2:
+ assertThat(Lists.newArrayList(Versioned.valueOrNull(partitionedAsyncConsistentMap.get(key).join())),
+ containsInAnyOrder(allValues.subList(0, 2).toArray()));
+ break;
+ case KEY3:
+ assertThat(Lists.newArrayList(Versioned.valueOrNull(partitionedAsyncConsistentMap.get(key).join())),
+ containsInAnyOrder(allValues.subList(0, 3).toArray()));
+ break;
+ default:
+ assertThat(Lists.newArrayList(Versioned.valueOrNull(partitionedAsyncConsistentMap.get(key).join())),
+ containsInAnyOrder(allValues.subList(0, 4).toArray()));
+ break;
+ }
+ });
+ // Success
+ assertThat(partitionedAsyncConsistentMap.removeAll(mapping).join(), is(true));
+ // Failure
+ assertThat(partitionedAsyncConsistentMap.removeAll(mapping).join(), is(false));
+ // Verify operation
+ assertThat(partitionedAsyncConsistentMap.size().join(), is(0));
+ assertThat(partitionedAsyncConsistentMap.isEmpty().join(), is(true));
+ }
+}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimapTest.java
new file mode 100644
index 0000000..9f631db
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimapTest.java
@@ -0,0 +1,222 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.net.DeviceId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.AsyncConsistentMultimapAdapter;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.Versioned;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+
+
+public class TranscodingAsyncConsistentMultimapTest {
+
+ private static Serializer serializer;
+ private static AsyncConsistentMultimap<String, DeviceId> transcodingMap;
+ private static AsyncConsistentMultimap<String, byte[]> baseMap;
+
+ private static final String KEY1 = "Key1";
+ private static final String KEY2 = "Key2";
+ private static final String KEY3 = "Key3";
+ private static final String KEY4 = "Key4";
+ private static final DeviceId DEV1 = DeviceId.deviceId("Device1");
+ private static final DeviceId DEV2 = DeviceId.deviceId("Device2");
+ private static final DeviceId DEV3 = DeviceId.deviceId("foo");
+ private static final DeviceId DEV4 = DeviceId.deviceId("bar");
+ private final List<String> allKeys = Lists.newArrayList(KEY1, KEY2,
+ KEY3, KEY4);
+ private final List<DeviceId> allValues = Lists.newArrayList(DEV1, DEV2,
+ DEV3, DEV4);
+
+ @Before
+ public void setUp() throws Exception {
+ // Init base map and serializer
+ serializer = Serializer.using(KryoNamespaces.API);
+ baseMap = new AsyncConsistentMultimapMock();
+ // Create the transcoding map
+ transcodingMap = DistributedPrimitives.newTranscodingMultimap(
+ baseMap,
+ Function.identity(),
+ Function.identity(),
+ serializer::encode,
+ serializer::decode);
+ }
+
+ @Test
+ public void testPutAllRemoveAll() throws Exception {
+ // Init phase
+ assertThat(transcodingMap.size().join(), is(0));
+ assertThat(transcodingMap.isEmpty().join(), is(true));
+ // Test multi put
+ Map<String, Collection<? extends DeviceId>> mapping = Maps.newHashMap();
+ // First build the mappings having each key a different mapping
+ allKeys.forEach(key -> {
+ switch (key) {
+ case KEY1:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 1)));
+ break;
+ case KEY2:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 2)));
+ break;
+ case KEY3:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 3)));
+ break;
+ default:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 4)));
+ break;
+ }
+ });
+ // Success
+ assertThat(transcodingMap.putAll(mapping).join(), is(true));
+ // Failure
+ assertThat(transcodingMap.putAll(mapping).join(), is(false));
+ // Verify operation
+ assertThat(transcodingMap.size().join(), is(10));
+ assertThat(transcodingMap.isEmpty().join(), is(false));
+ // verify mapping is ok
+ allKeys.forEach(key -> {
+ switch (key) {
+ case KEY1:
+ assertThat(Lists.newArrayList(Versioned.valueOrNull(transcodingMap.get(key).join())),
+ containsInAnyOrder(allValues.subList(0, 1).toArray()));
+ break;
+ case KEY2:
+ assertThat(Lists.newArrayList(Versioned.valueOrNull(transcodingMap.get(key).join())),
+ containsInAnyOrder(allValues.subList(0, 2).toArray()));
+ break;
+ case KEY3:
+ assertThat(Lists.newArrayList(Versioned.valueOrNull(transcodingMap.get(key).join())),
+ containsInAnyOrder(allValues.subList(0, 3).toArray()));
+ break;
+ default:
+ assertThat(Lists.newArrayList(Versioned.valueOrNull(transcodingMap.get(key).join())),
+ containsInAnyOrder(allValues.subList(0, 4).toArray()));
+ break;
+ }
+ });
+ // Success
+ assertThat(transcodingMap.removeAll(mapping).join(), is(true));
+ // Failure
+ assertThat(transcodingMap.removeAll(mapping).join(), is(false));
+ // Verify operation
+ assertThat(transcodingMap.size().join(), is(0));
+ assertThat(transcodingMap.isEmpty().join(), is(true));
+ }
+
+ // It uses a special internal map for bytes comparison - otherwise the equality cannot be verified
+ public static class AsyncConsistentMultimapMock extends AsyncConsistentMultimapAdapter<String, byte[]> {
+ private final Map<String, Set<byte[]>> baseMap = new HashMap<>();
+ private static final int DEFAULT_CREATION_TIME = 0;
+ private static final int DEFAULT_VERSION = 0;
+
+ AsyncConsistentMultimapMock() { }
+
+ Versioned<Collection<? extends byte[]>> makeVersioned(Collection<? extends byte[]> v) {
+ return new Versioned<>(v, DEFAULT_VERSION, DEFAULT_CREATION_TIME);
+ }
+
+ @Override
+ public CompletableFuture<Integer> size() {
+ return CompletableFuture.completedFuture(baseMap.values().stream()
+ .map(Set::size)
+ .mapToInt(size -> size).sum());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty() {
+ return CompletableFuture.completedFuture(baseMap.isEmpty());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> putAll(Map<String, Collection<? extends byte[]>> mapping) {
+ CompletableFuture<Boolean> result = CompletableFuture.completedFuture(false);
+ for (Map.Entry<String, Collection<? extends byte[]>> entry : mapping.entrySet()) {
+ Set<byte[]> values = baseMap.computeIfAbsent(
+ entry.getKey(), k -> Sets.newTreeSet(new ByteArrayComparator()));
+ for (byte[] value : entry.getValue()) {
+ if (values.add(value)) {
+ result = CompletableFuture.completedFuture(true);
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public CompletableFuture<Versioned<Collection<? extends byte[]>>> get(String key) {
+ return CompletableFuture.completedFuture(makeVersioned(baseMap.get(key)));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> removeAll(Map<String, Collection<? extends byte[]>> mapping) {
+ CompletableFuture<Boolean> result = CompletableFuture.completedFuture(false);
+ for (Map.Entry<String, Collection<? extends byte[]>> entry : mapping.entrySet()) {
+ Set<byte[]> values = baseMap.get(entry.getKey());
+ if (values == null) {
+ return CompletableFuture.completedFuture(false);
+ }
+ for (byte[] value : entry.getValue()) {
+ if (values.remove(value)) {
+ result = CompletableFuture.completedFuture(true);
+ }
+ }
+ if (values.isEmpty()) {
+ baseMap.remove(entry.getKey());
+ }
+ }
+ return result;
+ }
+
+ private static class ByteArrayComparator implements Comparator<byte[]> {
+ @Override
+ public int compare(byte[] o1, byte[] o2) {
+ if (Arrays.equals(o1, o2)) {
+ return 0;
+ } else {
+ for (int i = 0; i < o1.length && i < o2.length; i++) {
+ if (o1[i] < o2[i]) {
+ return -1;
+ } else if (o1[i] > o2[i]) {
+ return 1;
+ }
+ }
+ return o1.length > o2.length ? 1 : -1;
+ }
+ }
+ }
+ }
+
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
index cabf215..6b7e710 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
@@ -28,11 +28,13 @@
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.TreeMultiset;
import io.atomix.protocols.raft.proxy.RaftProxy;
import io.atomix.protocols.raft.service.RaftService;
import org.apache.commons.collections.keyvalue.DefaultMapEntry;
+import org.junit.Assert;
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncIterator;
@@ -336,6 +338,83 @@
.join();
});
+ // Done let's destroy the map
+ map.destroy().join();
+ }
+
+ /**
+ * Contains tests for put, putAll, remove, removeAll and replace.
+ * @throws Exception
+ */
+ @Test
+ public void multiPutAllAndMultiRemoveAllTest() throws Exception {
+ // Init phase
+ AtomixConsistentSetMultimap map = createResource("testMultiOp");
+ allKeys.forEach(key -> {
+ allValues.forEach(value -> {
+ map.put(key, value).join();
+ map.put(key, value).join();
+ });
+ });
+
+ // Test multi put
+ Map<String, Collection<? extends byte[]>> mapping = Maps.newHashMap();
+ // First build the mappings having each key a different mapping
+ allKeys.forEach(key -> {
+ switch (key) {
+ case keyOne:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 1)));
+ break;
+ case keyTwo:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 2)));
+ break;
+ case keyThree:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 3)));
+ break;
+ default:
+ mapping.put(key, Lists.newArrayList(allValues.subList(0, 4)));
+ break;
+ }
+ });
+ map.size().thenAccept(result -> assertEquals(16, (int) result)).join();
+ // Keys are already present operation has to fail
+ map.putAll(mapping).thenAccept(Assert::assertFalse).join();
+ // clean up the map
+ allKeys.forEach(key -> {
+ map.removeAll(key).join();
+ map.removeAll(key).join();
+ });
+ // verify map is empty
+ map.size().thenAccept(result -> assertEquals(0, (int) result)).join();
+ // put all again but now operation is successful
+ map.putAll(mapping).thenAccept(Assert::assertTrue).join();
+ // verify mapping is ok
+ allKeys.forEach(key -> map.get(key).thenAccept(result -> {
+ switch (key) {
+ case keyOne:
+ assertTrue(byteArrayCollectionIsEqual(allValues.subList(0, 1), result.value()));
+ break;
+ case keyTwo:
+ assertTrue(byteArrayCollectionIsEqual(allValues.subList(0, 2), result.value()));
+ break;
+ case keyThree:
+ assertTrue(byteArrayCollectionIsEqual(allValues.subList(0, 3), result.value()));
+ break;
+ default:
+ assertTrue(byteArrayCollectionIsEqual(allValues.subList(0, 4), result.value()));
+ break;
+ }
+ }).join());
+ // We have added keyOne -> {valueOne}, keyTwo -> {valueOne, valueTwo} and so on
+ map.size().thenAccept(result -> assertEquals(10, (int) result)).join();
+ // removeAll but now operation is successful
+ map.removeAll(mapping).thenAccept(Assert::assertTrue).join();
+ // removeAll but now operation is failure
+ map.removeAll(mapping).thenAccept(Assert::assertFalse).join();
+ // No more elements
+ map.size().thenAccept(result -> assertEquals(0, (int) result)).join();
+
+ // Done let's destroy the map
map.destroy().join();
}