Refactor DistributedPacketStore to store packet requests in a ConsistentMultimap
Change-Id: Ia4a93c47fee726009673e99609b2f8800807e675
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
index 600af28..c14b187 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMultimap.java
@@ -86,11 +86,21 @@
}
@Override
+ public Versioned<Collection<? extends V>> putAndGet(K key, V value) {
+ return complete(asyncMultimap.putAndGet(key, value));
+ }
+
+ @Override
public boolean remove(K key, V value) {
return complete(asyncMultimap.remove(key, value));
}
@Override
+ public Versioned<Collection<? extends V>> removeAndGet(K key, V value) {
+ return complete(asyncMultimap.removeAndGet(key, value));
+ }
+
+ @Override
public boolean removeAll(K key, Collection<? extends V> values) {
return complete(asyncMultimap.removeAll(key, values));
}
diff --git a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
index 12b4645..94943b8 100644
--- a/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/AsyncConsistentMultimap.java
@@ -104,6 +104,19 @@
CompletableFuture<Boolean> put(K key, V value);
/**
+ * If the key-value pair does not already exist adds either the key value
+ * pair or the value to the set of values associated with the key and
+ * returns the updated value, if the key-value pair already exists then behavior
+ * is implementation specific with some implementations allowing duplicates
+ * and others ignoring put requests for existing entries.
+ *
+ * @param key the key to add
+ * @param value the value to add
+ * @return a future to be completed with the updated values
+ */
+ CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value);
+
+ /**
* Removes the key-value pair with the specified values if it exists. In
* implementations that allow duplicates which matching entry will be
* removed is undefined.
@@ -116,6 +129,17 @@
CompletableFuture<Boolean> remove(K key, V value);
/**
+ * Removes the key-value pair with the specified values if it exists. In
+ * implementations that allow duplicates which matching entry will be
+ * removed is undefined.
+ *
+ * @param key the key of the pair to be removed
+ * @param value the value of the pair to be removed
+ * @return a future to be completed with the updated values
+ */
+ CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value);
+
+ /**
* Removes the key-value pairs with the specified key and values if they
* exist. In implementations that allow duplicates each instance of a key
* will remove one matching entry, which one is not defined. Equivalent to
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
index bdbbee0..99045a8 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMultimap.java
@@ -91,6 +91,19 @@
boolean put(K key, V value);
/**
+ * If the key-value pair does not already exist adds either the key value
+ * pair or the value to the set of values associated with the key and
+ * returns the updated value, if the key-value pair already exists then behavior
+ * is implementation specific with some implementations allowing duplicates
+ * and others ignoring put requests for existing entries.
+ *
+ * @param key the key to add
+ * @param value the value to add
+ * @return the updated values
+ */
+ Versioned<Collection<? extends V>> putAndGet(K key, V value);
+
+ /**
* Removes the key-value pair with the specified values if it exists. In
* implementations that allow duplicates which matching entry will be
* removed is undefined.
@@ -102,6 +115,17 @@
boolean remove(K key, V value);
/**
+ * Removes the key-value pair with the specified values if it exists. In
+ * implementations that allow duplicates which matching entry will be
+ * removed is undefined.
+ *
+ * @param key the key of the pair to be removed
+ * @param value the value of the pair to be removed
+ * @return the updated values
+ */
+ Versioned<Collection<? extends V>> removeAndGet(K key, V value);
+
+ /**
* Removes the key-value pairs with the specified key and values if they
* exist. In implementations that allow duplicates each instance of a key
* will remove one matching entry, which one is not defined. Equivalent to
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
index a97337e..08be2e4 100644
--- a/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
+++ b/core/api/src/test/java/org/onosproject/store/service/TestConsistentMultimap.java
@@ -76,11 +76,23 @@
}
@Override
+ public Versioned<Collection<? extends V>> putAndGet(K key, V value) {
+ innermap.put(key, version(value));
+ return (Versioned<Collection<? extends V>>) innermap.get(key);
+ }
+
+ @Override
public boolean remove(K key, V value) {
return innermap.remove(key, value);
}
@Override
+ public Versioned<Collection<? extends V>> removeAndGet(K key, V value) {
+ innermap.remove(key, value);
+ return (Versioned<Collection<? extends V>>) innermap.get(key);
+ }
+
+ @Override
public boolean removeAll(K key, Collection<? extends V> values) {
return false;
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
index 5fdea05..1c02da9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/packet/impl/DistributedPacketStore.java
@@ -15,9 +15,7 @@
*/
package org.onosproject.store.packet.impl;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -43,20 +41,20 @@
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMultimap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
+import java.util.Collection;
import java.util.Dictionary;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
@@ -205,10 +203,10 @@
private final class PacketRequestTracker {
- private ConsistentMap<RequestKey, Set<PacketRequest>> requests;
+ private ConsistentMultimap<RequestKey, PacketRequest> requests;
private PacketRequestTracker() {
- requests = storageService.<RequestKey, Set<PacketRequest>>consistentMapBuilder()
+ requests = storageService.<RequestKey, PacketRequest>consistentMultimapBuilder()
.withName("onos-packet-requests")
.withSerializer(Serializer.using(KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
@@ -218,68 +216,36 @@
}
private void add(PacketRequest request) {
- AtomicBoolean firstRequest = addInternal(request);
- if (firstRequest.get() && delegate != null) {
+ boolean firstRequest = addInternal(request);
+ if (firstRequest && delegate != null) {
// The instance that makes the first request will push to all devices
delegate.requestPackets(request);
}
}
- private AtomicBoolean addInternal(PacketRequest request) {
- AtomicBoolean firstRequest = new AtomicBoolean(false);
- requests.compute(key(request), (s, existingRequests) -> {
- // Reset to false just in case this is a retry due to
- // ConcurrentModificationException
- firstRequest.set(false);
- if (existingRequests == null) {
- firstRequest.set(true);
- return ImmutableSet.of(request);
- } else if (!existingRequests.contains(request)) {
- firstRequest.set(true);
- return ImmutableSet.<PacketRequest>builder()
- .addAll(existingRequests)
- .add(request)
- .build();
- } else {
- return existingRequests;
- }
- });
- return firstRequest;
+ private boolean addInternal(PacketRequest request) {
+ Collection<? extends PacketRequest> values =
+ Versioned.valueOrNull(requests.putAndGet(key(request), request));
+ return values.size() == 1;
}
private void remove(PacketRequest request) {
- AtomicBoolean removedLast = removeInternal(request);
- if (removedLast.get() && delegate != null) {
+ boolean removedLast = removeInternal(request);
+ if (removedLast && delegate != null) {
// The instance that removes the last request will remove from all devices
delegate.cancelPackets(request);
}
}
- private AtomicBoolean removeInternal(PacketRequest request) {
- AtomicBoolean removedLast = new AtomicBoolean(false);
- requests.computeIfPresent(key(request), (s, existingRequests) -> {
- // Reset to false just in case this is a retry due to
- // ConcurrentModificationException
- removedLast.set(false);
- if (existingRequests.contains(request)) {
- Set<PacketRequest> newRequests = Sets.newHashSet(existingRequests);
- newRequests.remove(request);
- if (newRequests.size() > 0) {
- return ImmutableSet.copyOf(newRequests);
- } else {
- removedLast.set(true);
- return null;
- }
- } else {
- return existingRequests;
- }
- });
- return removedLast;
+ private boolean removeInternal(PacketRequest request) {
+ Collection<? extends PacketRequest> values =
+ Versioned.valueOrNull(requests.removeAndGet(key(request), request));
+ return values == null || values.isEmpty();
}
private List<PacketRequest> requests() {
List<PacketRequest> list = Lists.newArrayList();
- requests.values().forEach(v -> list.addAll(v.value()));
+ requests.values().forEach(v -> list.add(v));
list.sort((o1, o2) -> o1.priority().priorityValue() - o2.priority().priorityValue());
return list;
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
index bb249e1..779bb7a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMultimap.java
@@ -78,11 +78,21 @@
}
@Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value) {
+ return delegateMap.putAndGet(key, value);
+ }
+
+ @Override
public CompletableFuture<Boolean> remove(K key, V value) {
return delegateMap.remove(key, value);
}
@Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value) {
+ return delegateMap.removeAndGet(key, value);
+ }
+
+ @Override
public CompletableFuture<Boolean> removeAll(
K key, Collection<? extends V> values) {
return delegateMap.removeAll(key, values);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
index badbf05..a4e0bf9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMultimap.java
@@ -111,6 +111,11 @@
}
@Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> putAndGet(K key, V value) {
+ return getMultimap(key).putAndGet(key, value);
+ }
+
+ @Override
public CompletableFuture<Boolean> removeAll(K key, Collection<? extends V> values) {
return getMultimap(key).removeAll(key, values);
}
@@ -174,6 +179,11 @@
}
@Override
+ public CompletableFuture<Versioned<Collection<? extends V>>> removeAndGet(K key, V value) {
+ return getMultimap(key).removeAndGet(key, value);
+ }
+
+ @Override
public CompletableFuture<AsyncIterator<Entry<K, V>>> iterator() {
return Tools.allOf(getMultimaps().stream().map(m -> m.iterator()).collect(Collectors.toList()))
.thenApply(PartitionedMultimapIterator::new);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
index b00d06a..6fab8c8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMultimap.java
@@ -141,6 +141,16 @@
}
@Override
+ public CompletableFuture<Versioned<Collection<? extends V1>>> putAndGet(K1 key, V1 value) {
+ try {
+ return backingMap.putAndGet(keyEncoder.apply(key), valueEncoder.apply(value))
+ .thenApply(versionedValueCollectionDecode);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
public CompletableFuture<Boolean> remove(K1 key, V1 value) {
try {
return backingMap.remove(keyEncoder.apply(key), valueEncoder
@@ -151,6 +161,16 @@
}
@Override
+ public CompletableFuture<Versioned<Collection<? extends V1>>> removeAndGet(K1 key, V1 value) {
+ try {
+ return backingMap.removeAndGet(keyEncoder.apply(key), valueEncoder.apply(value))
+ .thenApply(versionedValueCollectionDecode);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
public CompletableFuture<Boolean> removeAll(
K1 key, Collection<? extends V1> values) {
try {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
index 44cc614..39d49fe 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimap.java
@@ -61,9 +61,11 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.KEY_SET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
@@ -148,6 +150,15 @@
}
@Override
+ public CompletableFuture<Versioned<Collection<? extends byte[]>>> putAndGet(String key, byte[] value) {
+ return proxy.invoke(
+ PUT_AND_GET,
+ SERIALIZER::encode,
+ new Put(key, Lists.newArrayList(value), null),
+ SERIALIZER::decode);
+ }
+
+ @Override
public CompletableFuture<Boolean> remove(String key, byte[] value) {
return proxy.invoke(REMOVE, SERIALIZER::encode, new MultiRemove(key,
Lists.newArrayList(value),
@@ -155,6 +166,13 @@
}
@Override
+ public CompletableFuture<Versioned<Collection<? extends byte[]>>> removeAndGet(String key, byte[] value) {
+ return proxy.invoke(REMOVE_AND_GET, SERIALIZER::encode, new MultiRemove(key,
+ Lists.newArrayList(value),
+ null), SERIALIZER::decode);
+ }
+
+ @Override
public CompletableFuture<Boolean> removeAll(String key, Collection<? extends byte[]> values) {
return proxy.invoke(
REMOVE,
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
index 82372a2..6ad93b6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapOperations.java
@@ -45,7 +45,9 @@
VALUES(OperationType.QUERY),
ENTRIES(OperationType.QUERY),
PUT(OperationType.COMMAND),
+ PUT_AND_GET(OperationType.COMMAND),
REMOVE(OperationType.COMMAND),
+ REMOVE_AND_GET(OperationType.COMMAND),
REMOVE_ALL(OperationType.COMMAND),
REPLACE(OperationType.COMMAND),
CLEAR(OperationType.COMMAND),
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 2312d8f..5a93bef 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -77,9 +77,11 @@
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultiRemove;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.MultimapOperation;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.PUT_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.Put;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_ALL;
+import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_AND_GET;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REMOVE_LISTENER;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.REPLACE;
import static org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapOperations.RemoveAll;
@@ -153,7 +155,9 @@
executor.register(GET, serializer::decode, this::get, serializer::encode);
executor.register(REMOVE_ALL, serializer::decode, this::removeAll, serializer::encode);
executor.register(REMOVE, serializer::decode, this::multiRemove, serializer::encode);
+ executor.register(REMOVE_AND_GET, serializer::decode, this::removeAndGet, serializer::encode);
executor.register(PUT, serializer::decode, this::put, serializer::encode);
+ executor.register(PUT_AND_GET, serializer::decode, this::putAndGet, serializer::encode);
executor.register(REPLACE, serializer::decode, this::replace, serializer::encode);
executor.register(ADD_LISTENER, this::listen);
executor.register(REMOVE_LISTENER, this::unlisten);
@@ -352,8 +356,8 @@
}
Versioned<Collection<? extends byte[]>> removedValues = backingMap
- .get(key)
- .addCommit(commit);
+ .get(key)
+ .addCommit(commit);
if (removedValues != null) {
if (removedValues.value().isEmpty()) {
@@ -361,9 +365,9 @@
}
publish(removedValues.value().stream()
- .map(value -> new MultimapEvent<String, byte[]>(
- "", key, null, value))
- .collect(Collectors.toList()));
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, null, value))
+ .collect(Collectors.toList()));
return true;
}
@@ -371,6 +375,37 @@
}
/**
+ * Handles a removeAndGet commit.
+ *
+ * @param commit multiRemove commit
+ * @return the updated values or null if the values are empty
+ */
+ protected Versioned<Collection<? extends byte[]>> removeAndGet(Commit<? extends MultiRemove> commit) {
+ String key = commit.value().key();
+
+ if (!backingMap.containsKey(key)) {
+ return null;
+ }
+
+ Versioned<Collection<? extends byte[]>> removedValues = backingMap
+ .get(key)
+ .addCommit(commit);
+
+ if (removedValues != null) {
+ if (removedValues.value().isEmpty()) {
+ backingMap.remove(key);
+ }
+
+ publish(removedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, null, value))
+ .collect(Collectors.toList()));
+ }
+
+ return toVersioned(backingMap.get(key));
+ }
+
+ /**
* Handles a put commit, returns true if any change results from this
* commit.
* @param commit a put commit
@@ -386,20 +421,49 @@
}
Versioned<Collection<? extends byte[]>> addedValues = backingMap
- .get(key)
- .addCommit(commit);
+ .get(key)
+ .addCommit(commit);
if (addedValues != null) {
publish(addedValues.value().stream()
- .map(value -> new MultimapEvent<String, byte[]>(
- "", key, value, null))
- .collect(Collectors.toList()));
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, value, null))
+ .collect(Collectors.toList()));
return true;
}
return false;
}
+ /**
+ * Handles a putAndGet commit.
+ *
+ * @param commit a put commit
+ * @return the updated values
+ */
+ protected Versioned<Collection<? extends byte[]>> putAndGet(Commit<? extends Put> commit) {
+ String key = commit.value().key();
+ if (commit.value().values().isEmpty()) {
+ return null;
+ }
+ if (!backingMap.containsKey(key)) {
+ backingMap.put(key, new NonTransactionalCommit());
+ }
+
+ Versioned<Collection<? extends byte[]>> addedValues = backingMap
+ .get(key)
+ .addCommit(commit);
+
+ if (addedValues != null) {
+ publish(addedValues.value().stream()
+ .map(value -> new MultimapEvent<String, byte[]>(
+ "", key, value, null))
+ .collect(Collectors.toList()));
+ }
+
+ return toVersioned(backingMap.get(key));
+ }
+
protected Versioned<Collection<? extends byte[]>> replace(
Commit<? extends Replace> commit) {
if (!backingMap.containsKey(commit.value().key())) {
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
index 0247fca..01f5d00 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
@@ -238,6 +238,28 @@
.thenAccept(result -> assertFalse(result)).join();
});
+ allKeys.forEach(key -> {
+ map.putAndGet(key, valueOne)
+ .thenAccept(result -> assertEquals(1, result.value().size()));
+ map.putAndGet(key, valueTwo)
+ .thenAccept(result -> assertEquals(2, result.value().size()));
+ map.putAndGet(key, valueThree)
+ .thenAccept(result -> assertEquals(3, result.value().size()));
+ map.putAndGet(key, valueFour)
+ .thenAccept(result -> assertEquals(4, result.value().size()));
+ });
+
+ allKeys.forEach(key -> {
+ map.removeAndGet(key, valueOne)
+ .thenAccept(result -> assertEquals(3, result.value().size()));
+ map.removeAndGet(key, valueTwo)
+ .thenAccept(result -> assertEquals(2, result.value().size()));
+ map.removeAndGet(key, valueThree)
+ .thenAccept(result -> assertEquals(1, result.value().size()));
+ map.removeAndGet(key, valueFour)
+ .thenAccept(result -> assertEquals(0, result.value().size()));
+ });
+
map.isEmpty().thenAccept(result -> assertTrue(result)).join();
//Repopulate for next test