Ensure cache map listeners are run after cache has been updated to prevent stale reads from the cache
Change-Id: Ia9abf57aa6f18037c9e5db7ef4f21f72bf8b211b
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMap.java
index 5e977d8..a25fb79 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMap.java
@@ -17,12 +17,16 @@
import static org.slf4j.LoggerFactory.getLogger;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
+import com.google.common.util.concurrent.MoreExecutors;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Versioned;
@@ -52,6 +56,7 @@
private static final int DEFAULT_CACHE_SIZE = 10000;
private final Logger log = getLogger(getClass());
+ private final Map<MapEventListener<K, V>, Executor> mapEventListeners = new ConcurrentHashMap<>();
private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache;
private final AsyncConsistentMap<K, V> backingMap;
private final MapEventListener<K, V> cacheUpdater;
@@ -85,6 +90,7 @@
} else {
cache.put(event.key(), CompletableFuture.completedFuture(newValue));
}
+ mapEventListeners.forEach((listener, executor) -> executor.execute(() -> listener.event(event)));
};
statusListener = status -> {
log.debug("{} status changed to {}", this.name(), status);
@@ -94,7 +100,7 @@
cache.invalidateAll();
}
};
- super.addListener(cacheUpdater);
+ super.addListener(cacheUpdater, MoreExecutors.directExecutor());
super.addStatusChangeListener(statusListener);
}
@@ -224,4 +230,16 @@
}
});
}
+
+ @Override
+ public CompletableFuture<Void> addListener(MapEventListener<K, V> listener, Executor executor) {
+ mapEventListeners.put(listener, executor);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(MapEventListener<K, V> listener) {
+ mapEventListeners.remove(listener);
+ return CompletableFuture.completedFuture(null);
+ }
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMapTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMapTest.java
new file mode 100644
index 0000000..cdb4ecb
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMapTest.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import io.atomix.protocols.raft.proxy.RaftProxy;
+import io.atomix.protocols.raft.service.RaftService;
+import org.junit.Test;
+import org.onosproject.store.primitives.DefaultConsistentMap;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMap;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapService;
+import org.onosproject.store.primitives.resources.impl.AtomixTestBase;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * Unit tests for cached {@link AtomixConsistentMap}.
+ */
+public class CachingAsyncConsistentMapTest extends AtomixTestBase<AtomixConsistentMap> {
+
+ @Override
+ protected RaftService createService() {
+ return new AtomixConsistentMapService();
+ }
+
+ @Override
+ protected AtomixConsistentMap createPrimitive(RaftProxy proxy) {
+ return new AtomixConsistentMap(proxy);
+ }
+
+ /**
+ * Tests that reads following events are not stale when cached.
+ */
+ @Test
+ public void testCacheConsistency() throws Throwable {
+ Serializer serializer = Serializer.using(KryoNamespaces.BASIC);
+
+ ConsistentMap<String, String> map1 = new DefaultConsistentMap<>(
+ new CachingAsyncConsistentMap<>(
+ new TranscodingAsyncConsistentMap<>(
+ newPrimitive("testCacheConsistency"),
+ k -> k,
+ k -> k,
+ v -> serializer.encode(v),
+ v -> serializer.decode(v))), 5000);
+ ConsistentMap<String, String> map2 = new DefaultConsistentMap<>(
+ new CachingAsyncConsistentMap<>(
+ new TranscodingAsyncConsistentMap<>(
+ newPrimitive("testCacheConsistency"),
+ k -> k,
+ k -> k,
+ v -> serializer.encode(v),
+ v -> serializer.decode(v))), 5000);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ AtomicBoolean failed = new AtomicBoolean();
+
+ Executor executor = Executors.newSingleThreadExecutor();
+ map1.addListener(event -> {
+ // Check only the "baz" value since it's the last one written. If we check for "bar" on the "bar" event,
+ // there's a race in the test wherein the cache can legitimately be updated to "baz" before the next read.
+ if (event.newValue().value().equals("baz")) {
+ try {
+ assertEquals(event.newValue().value(), map1.get("foo").value());
+ } catch (AssertionError e) {
+ failed.set(true);
+ }
+ latch.countDown();
+ }
+ }, executor);
+
+ map2.put("foo", "bar");
+ map2.put("foo", "baz");
+
+ latch.await(10, TimeUnit.SECONDS);
+ if (latch.getCount() == 1 || failed.get()) {
+ fail();
+ }
+ }
+}
\ No newline at end of file