Ensure multimap event listeners are executed after cache update to avoid stale reads from the cache.

Change-Id: I527cb2905cfde85b63a0b61ebca3e1a56ef49f48
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java
index 27cfd34..3d49f7f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java
@@ -16,8 +16,11 @@
 package org.onosproject.store.primitives.impl;
 
 import java.util.Collection;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
@@ -25,6 +28,7 @@
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.onosproject.store.service.AsyncConsistentMultimap;
 import org.onosproject.store.service.MultimapEventListener;
 import org.onosproject.store.service.Versioned;
@@ -41,6 +45,7 @@
     private static final int DEFAULT_CACHE_SIZE = 10000;
     private final Logger log = getLogger(getClass());
 
+    private final Map<MultimapEventListener<K, V>, Executor> mapEventListeners = new ConcurrentHashMap<>();
     private final LoadingCache<K, CompletableFuture<Versioned<Collection<? extends V>>>> cache;
     private final MultimapEventListener<K, V> cacheUpdater;
     private final Consumer<Status> statusListener;
@@ -102,6 +107,7 @@
                 default:
                     break;
             }
+            mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event)));
         };
         statusListener = status -> {
             log.debug("{} status changed to {}", this.name(), status);
@@ -111,7 +117,7 @@
                 cache.invalidateAll();
             }
         };
-        super.addListener(cacheUpdater);
+        super.addListener(cacheUpdater, MoreExecutors.directExecutor());
         super.addStatusChangeListener(statusListener);
     }
 
@@ -173,6 +179,18 @@
     }
 
     @Override
+    public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        mapEventListeners.put(listener, executor);
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+        mapEventListeners.remove(listener);
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
     public CompletableFuture<Void> destroy() {
         super.removeStatusChangeListener(statusListener);
         return super.destroy().thenCompose(v -> removeListener(cacheUpdater));
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimapTest.java
new file mode 100644
index 0000000..84f7595
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimapTest.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
+import org.junit.Test;
+import org.onosproject.store.primitives.DefaultConsistentMultimap;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimap;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
+import org.onosproject.store.primitives.resources.impl.AtomixTestBase;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for cached {@link AtomixConsistentSetMultimap}.
+ */
+public class CachingAsyncConsistentMultimapTest extends AtomixTestBase<AtomixConsistentSetMultimap> {
+
+    @Override
+    protected RaftService createService() {
+        return new AtomixConsistentSetMultimapService();
+    }
+
+    @Override
+    protected AtomixConsistentSetMultimap createPrimitive(RaftProxy proxy) {
+        return new AtomixConsistentSetMultimap(proxy);
+    }
+
+    /**
+     * Tests that reads following events are not stale when cached.
+     */
+    @Test
+    public void testCacheConsistency() throws Throwable {
+        Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
+
+        ConsistentMultimap<String, String> multimap1 = new DefaultConsistentMultimap<>(
+            new CachingAsyncConsistentMultimap<>(
+                new TranscodingAsyncConsistentMultimap<>(
+                    newPrimitive("testCacheConsistency"),
+                    k -> k,
+                    k -> k,
+                    v -> serializer.decode(v),
+                    v -> serializer.encode(v))), 5000);
+        ConsistentMultimap<String, String> multimap2 = new DefaultConsistentMultimap<>(
+            new CachingAsyncConsistentMultimap<>(
+                new TranscodingAsyncConsistentMultimap<>(
+                    newPrimitive("testCacheConsistency"),
+                    k -> k,
+                    k -> k,
+                    v -> serializer.decode(v),
+                    v -> serializer.encode(v))), 5000);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean failed = new AtomicBoolean();
+
+        Executor executor = Executors.newSingleThreadExecutor();
+        multimap1.addListener(event -> {
+            if (event.newValue().equals("baz")) {
+                Collection<? extends String> values = multimap1.get("foo").value();
+                try {
+                    assertEquals(2, values.size());
+                } catch (AssertionError e) {
+                    failed.set(true);
+                }
+                latch.countDown();
+            }
+        }, executor);
+
+        multimap2.put("foo", "bar");
+        multimap2.put("foo", "baz");
+
+        latch.await(10, TimeUnit.SECONDS);
+        if (latch.getCount() == 1 || failed.get()) {
+            fail();
+        }
+    }
+}