ONOS-4423: Support for invalidating cached map entries when a client session is suspended
Change-Id: Icb5e73dc7a37d9459d26cd3a5c9ca1e1a05b0436
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
index 3f1e53b..5360e50 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
@@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
@@ -187,6 +188,21 @@
}
@Override
+ public void addStatusChangeListener(Consumer<Status> listener) {
+ asyncMap.addStatusChangeListener(listener);
+ }
+
+ @Override
+ public void removeStatusChangeListener(Consumer<Status> listener) {
+ asyncMap.removeStatusChangeListener(listener);
+ }
+
+ @Override
+ public Collection<Consumer<Status>> statusChangeListeners() {
+ return asyncMap.statusChangeListeners();
+ }
+
+ @Override
public Map<K, V> asJavaMap() {
synchronized (this) {
if (javaMap == null) {
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
index e8111d6..38179df 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
@@ -15,7 +15,10 @@
*/
package org.onosproject.store.service;
+import java.util.Collection;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import org.onosproject.core.ApplicationId;
@@ -74,6 +77,29 @@
TRANSACTION_CONTEXT
}
+ /**
+ * Status of distributed primitive.
+ */
+ public enum Status {
+
+ /**
+ * Signifies a state wherein the primitive is operating correctly and is capable of meeting the advertised
+ * consistency and reliability guarantees.
+ */
+ ACTIVE,
+
+ /**
+ * Signifies a state wherein the primitive is temporarily incapable of providing the advertised
+ * consistency properties.
+ */
+ SUSPENDED,
+
+ /**
+ * Signifies a state wherein the primitive has been shutdown and therefore cannot perform its functions.
+ */
+ INACTIVE
+ }
+
static final long DEFAULT_OPERTATION_TIMEOUT_MILLIS = 60000L;
/**
@@ -107,4 +133,24 @@
default CompletableFuture<Void> destroy() {
return CompletableFuture.completedFuture(null);
}
+
+ /**
+ * Registers a listener to be called when the primitive's status changes.
+ * @param listener The listener to be called when the status changes.
+ */
+ default void addStatusChangeListener(Consumer<Status> listener) {}
+
+ /**
+ * Unregisters a previously registered listener to be called when the primitive's status changes.
+ * @param listener The listener to unregister
+ */
+ default void removeStatusChangeListener(Consumer<Status> listener) {}
+
+ /**
+ * Returns the collection of status change listeners previously registered.
+ * @return collection of status change listeners
+ */
+ default Collection<Consumer<Status>> statusChangeListeners() {
+ return Collections.emptyList();
+ }
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMap.java
index 8d49a83..34d4e21 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMap.java
@@ -15,18 +15,25 @@
*/
package org.onosproject.store.primitives.impl;
+import static org.slf4j.LoggerFactory.getLogger;
+
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
+import static org.onosproject.store.service.DistributedPrimitive.Status.INACTIVE;
+import static org.onosproject.store.service.DistributedPrimitive.Status.SUSPENDED;
+
/**
* {@code AsyncConsistentMap} that caches entries on read.
* <p>
@@ -39,20 +46,13 @@
* @param <V> value type
*/
public class CachingAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMap<K, V> {
- private int maxCacheSize = 10000;
+ private static final int DEFAULT_CACHE_SIZE = 10000;
+ private final Logger log = getLogger(getClass());
- private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache =
- CacheBuilder.newBuilder()
- .maximumSize(maxCacheSize)
- .build(new CacheLoader<K, CompletableFuture<Versioned<V>>>() {
- @Override
- public CompletableFuture<Versioned<V>> load(K key)
- throws Exception {
- return CachingAsyncConsistentMap.super.get(key);
- }
- });
+ private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache;
- private final MapEventListener<K, V> cacheInvalidator = event -> cache.invalidate(event.key());
+ private final MapEventListener<K, V> cacheInvalidator;
+ private final Consumer<Status> statusListener;
/**
* Default constructor.
@@ -60,24 +60,36 @@
* @param backingMap a distributed, strongly consistent map for backing
*/
public CachingAsyncConsistentMap(AsyncConsistentMap<K, V> backingMap) {
- super(backingMap);
- super.addListener(cacheInvalidator);
+ this(backingMap, DEFAULT_CACHE_SIZE);
}
/**
- * Constructor to configure cache size of LoadingCache.
+ * Constructor to configure cache size.
*
* @param backingMap a distributed, strongly consistent map for backing
* @param cacheSize the maximum size of the cache
*/
public CachingAsyncConsistentMap(AsyncConsistentMap<K, V> backingMap, int cacheSize) {
super(backingMap);
+ cache = CacheBuilder.newBuilder()
+ .maximumSize(cacheSize)
+ .build(CacheLoader.from(CachingAsyncConsistentMap.super::get));
+ cacheInvalidator = event -> cache.invalidate(event.key());
+ statusListener = status -> {
+ log.debug("{} status changed to {}", this.name(), status);
+ // If the status of the underlying map is SUSPENDED or INACTIVE
+ // we can no longer guarantee that the cache will be in sync.
+ if (status == SUSPENDED || status == INACTIVE) {
+ cache.invalidateAll();
+ }
+ };
super.addListener(cacheInvalidator);
- maxCacheSize = cacheSize;
+ super.addStatusChangeListener(statusListener);
}
@Override
public CompletableFuture<Void> destroy() {
+ super.removeStatusChangeListener(statusListener);
return super.destroy().thenCompose(v -> removeListener(cacheInvalidator));
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index c02dd91..f2a86d1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -24,6 +24,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import org.onosproject.core.ApplicationId;
@@ -32,7 +33,6 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
-
import com.google.common.base.MoreObjects;
/**
@@ -183,6 +183,21 @@
}
@Override
+ public void addStatusChangeListener(Consumer<Status> listener) {
+ delegateMap.addStatusChangeListener(listener);
+ }
+
+ @Override
+ public void removeStatusChangeListener(Consumer<Status> listener) {
+ delegateMap.removeStatusChangeListener(listener);
+ }
+
+ @Override
+ public Collection<Consumer<Status>> statusChangeListeners() {
+ return delegateMap.statusChangeListeners();
+ }
+
+ @Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
.add("delegateMap", delegateMap)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 37ae453..9535134 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -38,7 +39,6 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
-
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -254,6 +254,21 @@
.thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
}
+ @Override
+ public void addStatusChangeListener(Consumer<Status> listener) {
+ partitions.values().forEach(map -> map.addStatusChangeListener(listener));
+ }
+
+ @Override
+ public void removeStatusChangeListener(Consumer<Status> listener) {
+ partitions.values().forEach(map -> map.removeStatusChangeListener(listener));
+ }
+
+ @Override
+ public Collection<Consumer<Status>> statusChangeListeners() {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Returns the map (partition) to which the specified key maps.
* @param key key
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 230060a..f2aa0a8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -23,6 +23,7 @@
import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
import io.atomix.copycat.client.ConnectionStrategies;
import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.client.CopycatClient.State;
import io.atomix.copycat.client.RecoveryStrategies;
import io.atomix.copycat.client.RetryStrategies;
import io.atomix.copycat.client.ServerSelectionStrategies;
@@ -36,6 +37,8 @@
import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
import org.onlab.util.HexString;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -48,6 +51,7 @@
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDistributedSet;
import org.onosproject.store.service.AsyncLeaderElector;
+import org.onosproject.store.service.DistributedPrimitive.Status;
import org.onosproject.store.service.DistributedQueue;
import org.onosproject.store.service.Serializer;
import org.slf4j.Logger;
@@ -71,6 +75,18 @@
private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
Serializer.using(KryoNamespaces.BASIC)));
+ Function<State, Status> mapper = state -> {
+ switch (state) {
+ case CONNECTED:
+ return Status.ACTIVE;
+ case SUSPENDED:
+ return Status.SUSPENDED;
+ case CLOSED:
+ return Status.INACTIVE;
+ default:
+ throw new IllegalStateException("Unknown state " + state);
+ }
+ };
public StoragePartitionClient(StoragePartition partition,
io.atomix.catalyst.serializer.Serializer serializer,
@@ -90,7 +106,8 @@
transport,
serializer.clone(),
StoragePartition.RESOURCE_TYPES);
- copycatClient.onStateChange(state -> log.info("Client state {}", state));
+ copycatClient.onStateChange(state -> log.debug("Partition {} client state"
+ + " changed to {}", partition.getId(), state));
client = new AtomixClient(new ResourceClient(copycatClient));
}
return client.open().whenComplete((r, e) -> {
@@ -109,9 +126,14 @@
@Override
public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
+ AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join();
+ Consumer<State> statusListener = state -> {
+ atomixConsistentMap.statusChangeListeners()
+ .forEach(listener -> listener.accept(mapper.apply(state)));
+ };
+ copycatClient.onStateChange(statusListener);
AsyncConsistentMap<String, byte[]> rawMap =
- new DelegatingAsyncConsistentMap<String, byte[]>(client.getResource(name, AtomixConsistentMap.class)
- .join()) {
+ new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
@Override
public String name() {
return name;
@@ -139,9 +161,7 @@
@Override
public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
- return new DefaultAsyncAtomicValue<>(name,
- serializer,
- onosAtomicValuesMap.get());
+ return new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index ee882a7..0178730 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -22,6 +22,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -33,7 +34,6 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
-
import com.google.common.collect.Maps;
/**
@@ -281,6 +281,21 @@
}
}
+ @Override
+ public void addStatusChangeListener(Consumer<Status> listener) {
+ backingMap.addStatusChangeListener(listener);
+ }
+
+ @Override
+ public void removeStatusChangeListener(Consumer<Status> listener) {
+ backingMap.removeStatusChangeListener(listener);
+ }
+
+ @Override
+ public Collection<Consumer<Status>> statusChangeListeners() {
+ return backingMap.statusChangeListeners();
+ }
+
private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
private final MapEventListener<K1, V1> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 4813849..2cb4a50 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -28,6 +28,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
+import java.util.function.Consumer;
import java.util.function.Predicate;
import org.onlab.util.Match;
@@ -53,7 +54,7 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.MapTransaction;
import org.onosproject.store.service.Versioned;
-
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
/**
@@ -63,6 +64,7 @@
public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
implements AsyncConsistentMap<String, byte[]> {
+ private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
public static final String CHANGE_SUBJECT = "changeEvents";
@@ -291,4 +293,19 @@
public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
}
+
+ @Override
+ public void addStatusChangeListener(Consumer<Status> listener) {
+ statusChangeListeners.add(listener);
+ }
+
+ @Override
+ public void removeStatusChangeListener(Consumer<Status> listener) {
+ statusChangeListeners.remove(listener);
+ }
+
+ @Override
+ public Collection<Consumer<Status>> statusChangeListeners() {
+ return ImmutableSet.copyOf(statusChangeListeners);
+ }
}
\ No newline at end of file