Ensure multimap events are published on replaceValues

Change-Id: Ie4fb007ae70e618f0adfd16fd506e326b24580bc
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
index 6c864b0..fd2e676 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapService.java
@@ -486,11 +486,28 @@
 
     protected Versioned<Collection<? extends byte[]>> replace(
             Commit<? extends Replace> commit) {
-        if (!backingMap.containsKey(commit.value().key())) {
-            backingMap.put(commit.value().key(),
-                    new NonTransactionalCommit());
+        String key = commit.value().key();
+        if (!backingMap.containsKey(key)) {
+            backingMap.put(key, new NonTransactionalCommit());
         }
-        return backingMap.get(commit.value().key()).addCommit(commit);
+
+        Versioned<Collection<? extends byte[]>> values = backingMap.get(commit.value().key()).addCommit(commit);
+        if (values != null) {
+            Set<byte[]> addedValues = Sets.newTreeSet(new ByteArrayComparator());
+            addedValues.addAll(commit.value().values());
+
+            Set<byte[]> removedValues = Sets.newTreeSet(new ByteArrayComparator());
+            removedValues.addAll(values.value());
+
+            List<MultimapEvent<String, byte[]>> events = Lists.newArrayList();
+            Sets.difference(removedValues, addedValues)
+                .forEach(value -> events.add(new MultimapEvent<>("", key, null, value)));
+            Sets.difference(addedValues, removedValues)
+                .forEach(value -> events.add(new MultimapEvent<>("", key, value, null)));
+
+            publish(events);
+        }
+        return values;
     }
 
     /**
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
index b14afb3..44d09bf 100644
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentSetMultimapTest.java
@@ -23,6 +23,8 @@
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
@@ -34,9 +36,13 @@
 import org.junit.Test;
 import org.onlab.util.Tools;
 import org.onosproject.store.service.AsyncIterator;
+import org.onosproject.store.service.MultimapEvent;
+import org.onosproject.store.service.MultimapEventListener;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -441,6 +447,45 @@
         map.destroy().join();
     }
 
+    @Test
+    public void testMultimapEvents() throws Throwable {
+        final byte[] value1 = Tools.getBytesUtf8("value1");
+        final byte[] value2 = Tools.getBytesUtf8("value2");
+        final byte[] value3 = Tools.getBytesUtf8("value3");
+
+        AtomixConsistentSetMultimap map = createResource("testFourMap");
+        TestMultimapEventListener listener = new TestMultimapEventListener();
+
+        // add listener; insert new value into map and verify an INSERT event is received.
+        map.addListener(listener).thenCompose(v -> map.put("foo", value1)).join();
+        MultimapEvent<String, byte[]> event = listener.event();
+        assertNotNull(event);
+        assertEquals(MultimapEvent.Type.INSERT, event.type());
+        assertTrue(Arrays.equals(value1, event.newValue()));
+
+        // remove listener and verify listener is not notified.
+        map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
+        assertFalse(listener.eventReceived());
+
+        // add listener; insert new value into map and verify an INSERT event is received.
+        map.addListener(listener)
+            .thenCompose(v -> map.replaceValues("foo", Arrays.asList(value2, value3))).join();
+        event = listener.event();
+        assertNotNull(event);
+        assertEquals(MultimapEvent.Type.REMOVE, event.type());
+        assertArrayEquals(value1, event.oldValue());
+        event = listener.event();
+        assertNotNull(event);
+        assertEquals(MultimapEvent.Type.INSERT, event.type());
+        assertArrayEquals(value3, event.newValue());
+
+        // remove listener and verify listener is not notified.
+        map.removeListener(listener).thenCompose(v -> map.put("foo", value2)).join();
+        assertFalse(listener.eventReceived());
+
+        map.removeListener(listener).join();
+    }
+
     private AtomixConsistentSetMultimap createResource(String mapName) {
         try {
             AtomixConsistentSetMultimap map = newPrimitive(mapName);
@@ -548,4 +593,26 @@
             }
         }
     }
+
+    private static class TestMultimapEventListener implements MultimapEventListener<String, byte[]> {
+
+        private final BlockingQueue<MultimapEvent<String, byte[]>> queue = new ArrayBlockingQueue<>(1);
+
+        @Override
+        public void event(MultimapEvent<String, byte[]> event) {
+            try {
+                queue.put(event);
+            } catch (InterruptedException e) {
+                throw new IllegalStateException(e);
+            }
+        }
+
+        public boolean eventReceived() {
+            return !queue.isEmpty();
+        }
+
+        public MultimapEvent<String, byte[]> event() throws InterruptedException {
+            return queue.take();
+        }
+    }
 }