Ensure multimap events are published on replaceValues
Change-Id: Ie4fb007ae70e618f0adfd16fd506e326b24580bc
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 6c864b0..fd2e676 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -486,11 +486,28 @@
protected Versioned<Collection<? extends byte[]>> replace(
Commit<? extends Replace> commit) {
- if (!backingMap.containsKey(commit.value().key())) {
- backingMap.put(commit.value().key(),
- new NonTransactionalCommit());
+ String key = commit.value().key();
+ if (!backingMap.containsKey(key)) {
+ backingMap.put(key, new NonTransactionalCommit());
}
- return backingMap.get(commit.value().key()).addCommit(commit);
+
+ Versioned<Collection<? extends byte[]>> values = backingMap.get(commit.value().key()).addCommit(commit);
+ if (values != null) {
+ Set<byte[]> addedValues = Sets.newTreeSet(new ByteArrayComparator());
+ addedValues.addAll(commit.value().values());
+
+ Set<byte[]> removedValues = Sets.newTreeSet(new ByteArrayComparator());
+ removedValues.addAll(values.value());
+
+ List<MultimapEvent<String, byte[]>> events = Lists.newArrayList();
+ Sets.difference(removedValues, addedValues)
+ .forEach(value -> events.add(new MultimapEvent<>("", key, null, value)));
+ Sets.difference(addedValues, removedValues)
+ .forEach(value -> events.add(new MultimapEvent<>("", key, value, null)));
+
+ publish(events);
+ }
+ return values;
}
/**
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
index b14afb3..44d09bf 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
@@ -23,6 +23,8 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Lists;
@@ -34,9 +36,13 @@
import org.junit.Test;
import org.onlab.util.Tools;
import org.onosproject.store.service.AsyncIterator;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
/**
@@ -441,6 +447,45 @@
map.destroy().join();
}
+ @Test
+ public void testMultimapEvents() throws Throwable {
+ final byte[] value1 = Tools.getBytesUtf8("value1");
+ final byte[] value2 = Tools.getBytesUtf8("value2");
+ final byte[] value3 = Tools.getBytesUtf8("value3");
+
+ AtomixConsistentSetMultimap map = createResource("testFourMap");
+ TestMultimapEventListener listener = new TestMultimapEventListener();
+
+ // add listener; insert new value into map and verify an INSERT event is received.
+ map.addListener(listener).thenCompose(v -> map.put("foo", value1)).join();
+ MultimapEvent<String, byte[]> event = listener.event();
+ assertNotNull(event);
+ assertEquals(MultimapEvent.Type.INSERT, event.type());
+ assertTrue(Arrays.equals(value1, event.newValue()));
+
+ // remove listener and verify listener is not notified.
+ map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
+ assertFalse(listener.eventReceived());
+
+ // add listener; insert new value into map and verify an INSERT event is received.
+ map.addListener(listener)
+ .thenCompose(v -> map.replaceValues("foo", Arrays.asList(value2, value3))).join();
+ event = listener.event();
+ assertNotNull(event);
+ assertEquals(MultimapEvent.Type.REMOVE, event.type());
+ assertArrayEquals(value1, event.oldValue());
+ event = listener.event();
+ assertNotNull(event);
+ assertEquals(MultimapEvent.Type.INSERT, event.type());
+ assertArrayEquals(value3, event.newValue());
+
+ // remove listener and verify listener is not notified.
+ map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
+ assertFalse(listener.eventReceived());
+
+ map.removeListener(listener).join();
+ }
+
private AtomixConsistentSetMultimap createResource(String mapName) {
try {
AtomixConsistentSetMultimap map = newPrimitive(mapName);
@@ -548,4 +593,26 @@
}
}
}
+
+ private static class TestMultimapEventListener implements MultimapEventListener<String, byte[]> {
+
+ private final BlockingQueue<MultimapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
+
+ @Override
+ public void event(MultimapEvent<String, byte[]> event) {
+ try {
+ queue.put(event);
+ } catch (InterruptedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public boolean eventReceived() {
+ return !queue.isEmpty();
+ }
+
+ public MultimapEvent<String, byte[]> event() throws InterruptedException {
+ return queue.take();
+ }
+ }
}