Ensure multimap event listeners are executed after cache update to avoid stale reads from the cache.
Change-Id: I527cb2905cfde85b63a0b61ebca3e1a56ef49f48
(cherry picked from commit 837be006017353e5af14addd6d36c7dfe46d978c)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java
index 27cfd34..3d49f7f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimap.java
@@ -16,8 +16,11 @@
package org.onosproject.store.primitives.impl;
import java.util.Collection;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.function.Consumer;
import java.util.stream.Collectors;
@@ -25,6 +28,7 @@
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.MoreExecutors;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.MultimapEventListener;
import org.onosproject.store.service.Versioned;
@@ -41,6 +45,7 @@
private static final int DEFAULT_CACHE_SIZE = 10000;
private final Logger log = getLogger(getClass());
+ private final Map<MultimapEventListener<K, V>, Executor> mapEventListeners = new ConcurrentHashMap<>();
private final LoadingCache<K, CompletableFuture<Versioned<Collection<? extends V>>>> cache;
private final MultimapEventListener<K, V> cacheUpdater;
private final Consumer<Status> statusListener;
@@ -102,6 +107,7 @@
default:
break;
}
+ mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event)));
};
statusListener = status -> {
log.debug("{} status changed to {}", this.name(), status);
@@ -111,7 +117,7 @@
cache.invalidateAll();
}
};
- super.addListener(cacheUpdater);
+ super.addListener(cacheUpdater, MoreExecutors.directExecutor());
super.addStatusChangeListener(statusListener);
}
@@ -173,6 +179,18 @@
}
@Override
+ public CompletableFuture<Void> addListener(MultimapEventListener<K, V> listener, Executor executor) {
+ mapEventListeners.put(listener, executor);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MultimapEventListener<K, V> listener) {
+ mapEventListeners.remove(listener);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
public CompletableFuture<Void> destroy() {
super.removeStatusChangeListener(statusListener);
return super.destroy().thenCompose(v -> removeListener(cacheUpdater));
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimapTest.java
new file mode 100644
index 0000000..84f7595
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMultimapTest.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
+import org.junit.Test;
+import org.onosproject.store.primitives.DefaultConsistentMultimap;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimap;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentSetMultimapService;
+import org.onosproject.store.primitives.resources.impl.AtomixTestBase;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for cached {@link AtomixConsistentSetMultimap}.
+ */
+public class CachingAsyncConsistentMultimapTest extends AtomixTestBase<AtomixConsistentSetMultimap> {
+
+ @Override
+ protected RaftService createService() {
+ return new AtomixConsistentSetMultimapService();
+ }
+
+ @Override
+ protected AtomixConsistentSetMultimap createPrimitive(RaftProxy proxy) {
+ return new AtomixConsistentSetMultimap(proxy);
+ }
+
+ /**
+ * Tests that reads following events are not stale when cached.
+ */
+ @Test
+ public void testCacheConsistency() throws Throwable {
+ Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
+
+ ConsistentMultimap<String, String> multimap1 = new DefaultConsistentMultimap<>(
+ new CachingAsyncConsistentMultimap<>(
+ new TranscodingAsyncConsistentMultimap<>(
+ newPrimitive("testCacheConsistency"),
+ k -> k,
+ k -> k,
+ v -> serializer.decode(v),
+ v -> serializer.encode(v))), 5000);
+ ConsistentMultimap<String, String> multimap2 = new DefaultConsistentMultimap<>(
+ new CachingAsyncConsistentMultimap<>(
+ new TranscodingAsyncConsistentMultimap<>(
+ newPrimitive("testCacheConsistency"),
+ k -> k,
+ k -> k,
+ v -> serializer.decode(v),
+ v -> serializer.encode(v))), 5000);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean failed = new AtomicBoolean();
+
+ Executor executor = Executors.newSingleThreadExecutor();
+ multimap1.addListener(event -> {
+ if (event.newValue().equals("baz")) {
+ Collection<? extends String> values = multimap1.get("foo").value();
+ try {
+ assertEquals(2, values.size());
+ } catch (AssertionError e) {
+ failed.set(true);
+ }
+ latch.countDown();
+ }
+ }, executor);
+
+ multimap2.put("foo", "bar");
+ multimap2.put("foo", "baz");
+
+ latch.await(10, TimeUnit.SECONDS);
+ if (latch.getCount() == 1 || failed.get()) {
+ fail();
+ }
+ }
+}