ONOS-4423: Support for invalidating cached map entries when a client session is suspended

Change-Id: Icb5e73dc7a37d9459d26cd3a5c9ca1e1a05b0436
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
index 3f1e53b..5360e50 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/DefaultConsistentMap.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
@@ -187,6 +188,21 @@
     }
 
     @Override
+    public void addStatusChangeListener(Consumer<Status> listener) {
+        asyncMap.addStatusChangeListener(listener);
+    }
+
+    @Override
+    public void removeStatusChangeListener(Consumer<Status> listener) {
+        asyncMap.removeStatusChangeListener(listener);
+    }
+
+    @Override
+    public Collection<Consumer<Status>> statusChangeListeners() {
+        return asyncMap.statusChangeListeners();
+    }
+
+    @Override
     public Map<K, V> asJavaMap() {
         synchronized (this) {
             if (javaMap == null) {
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
index e8111d6..38179df 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedPrimitive.java
@@ -15,7 +15,10 @@
  */
 package org.onosproject.store.service;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 
 import org.onosproject.core.ApplicationId;
 
@@ -74,6 +77,29 @@
         TRANSACTION_CONTEXT
     }
 
+    /**
+     * Status of distributed primitive.
+     */
+    public enum Status {
+
+        /**
+         * Signifies a state wherein the primitive is operating correctly and is capable of meeting the advertised
+         * consistency and reliability guarantees.
+         */
+        ACTIVE,
+
+        /**
+         * Signifies a state wherein the primitive is temporarily incapable of providing the advertised
+         * consistency properties.
+         */
+        SUSPENDED,
+
+        /**
+         * Signifies a state wherein the primitive has been shutdown and therefore cannot perform its functions.
+         */
+        INACTIVE
+    }
+
     static final long DEFAULT_OPERTATION_TIMEOUT_MILLIS = 60000L;
 
     /**
@@ -107,4 +133,24 @@
     default CompletableFuture<Void> destroy() {
         return CompletableFuture.completedFuture(null);
     }
+
+    /**
+     * Registers a listener to be called when the primitive's status changes.
+     * @param listener The listener to be called when the status changes.
+     */
+    default void addStatusChangeListener(Consumer<Status> listener) {}
+
+    /**
+     * Unregisters a previously registered listener to be called when the primitive's status changes.
+     * @param listener The listener to unregister
+     */
+    default void removeStatusChangeListener(Consumer<Status> listener) {}
+
+    /**
+     * Returns the collection of status change listeners previously registered.
+     * @return collection of status change listeners
+     */
+    default Collection<Consumer<Status>> statusChangeListeners() {
+        return Collections.emptyList();
+    }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMap.java
index 8d49a83..34d4e21 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CachingAsyncConsistentMap.java
@@ -15,18 +15,25 @@
  */
 package org.onosproject.store.primitives.impl;
 
+import static org.slf4j.LoggerFactory.getLogger;
+
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Versioned;
+import org.slf4j.Logger;
 
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 
+import static org.onosproject.store.service.DistributedPrimitive.Status.INACTIVE;
+import static org.onosproject.store.service.DistributedPrimitive.Status.SUSPENDED;
+
 /**
  * {@code AsyncConsistentMap} that caches entries on read.
  * <p>
@@ -39,20 +46,13 @@
  * @param <V> value type
  */
 public class CachingAsyncConsistentMap<K, V> extends DelegatingAsyncConsistentMap<K, V> {
-    private int maxCacheSize = 10000;
+    private static final int DEFAULT_CACHE_SIZE = 10000;
+    private final Logger log = getLogger(getClass());
 
-    private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache =
-            CacheBuilder.newBuilder()
-                    .maximumSize(maxCacheSize)
-                    .build(new CacheLoader<K, CompletableFuture<Versioned<V>>>() {
-                        @Override
-                        public CompletableFuture<Versioned<V>> load(K key)
-                                throws Exception {
-                            return CachingAsyncConsistentMap.super.get(key);
-                        }
-                    });
+    private final LoadingCache<K, CompletableFuture<Versioned<V>>> cache;
 
-    private final MapEventListener<K, V> cacheInvalidator = event -> cache.invalidate(event.key());
+    private final MapEventListener<K, V> cacheInvalidator;
+    private final Consumer<Status> statusListener;
 
     /**
      * Default constructor.
@@ -60,24 +60,36 @@
      * @param backingMap a distributed, strongly consistent map for backing
      */
     public CachingAsyncConsistentMap(AsyncConsistentMap<K, V> backingMap) {
-        super(backingMap);
-        super.addListener(cacheInvalidator);
+        this(backingMap, DEFAULT_CACHE_SIZE);
     }
 
     /**
-     * Constructor to configure cache size of LoadingCache.
+     * Constructor to configure cache size.
      *
      * @param backingMap a distributed, strongly consistent map for backing
      * @param cacheSize the maximum size of the cache
      */
     public CachingAsyncConsistentMap(AsyncConsistentMap<K, V> backingMap, int cacheSize) {
         super(backingMap);
+        cache = CacheBuilder.newBuilder()
+                            .maximumSize(cacheSize)
+                            .build(CacheLoader.from(CachingAsyncConsistentMap.super::get));
+        cacheInvalidator = event -> cache.invalidate(event.key());
+        statusListener = status -> {
+            log.debug("{} status changed to {}", this.name(), status);
+            // If the status of the underlying map is SUSPENDED or INACTIVE
+            // we can no longer guarantee that the cache will be in sync.
+            if (status == SUSPENDED || status == INACTIVE) {
+                cache.invalidateAll();
+            }
+        };
         super.addListener(cacheInvalidator);
-        maxCacheSize = cacheSize;
+        super.addStatusChangeListener(statusListener);
     }
 
     @Override
     public CompletableFuture<Void> destroy() {
+        super.removeStatusChangeListener(statusListener);
         return super.destroy().thenCompose(v -> removeListener(cacheInvalidator));
     }
 
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index c02dd91..f2a86d1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -24,6 +24,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 
 import org.onosproject.core.ApplicationId;
@@ -32,7 +33,6 @@
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
-
 import com.google.common.base.MoreObjects;
 
 /**
@@ -183,6 +183,21 @@
     }
 
     @Override
+    public void addStatusChangeListener(Consumer<Status> listener) {
+        delegateMap.addStatusChangeListener(listener);
+    }
+
+    @Override
+    public void removeStatusChangeListener(Consumer<Status> listener) {
+        delegateMap.removeStatusChangeListener(listener);
+    }
+
+    @Override
+    public Collection<Consumer<Status>> statusChangeListeners() {
+        return delegateMap.statusChangeListeners();
+    }
+
+    @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
                           .add("delegateMap", delegateMap)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 37ae453..9535134 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -27,6 +27,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -38,7 +39,6 @@
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
-
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -254,6 +254,21 @@
                     .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
     }
 
+    @Override
+    public void addStatusChangeListener(Consumer<Status> listener) {
+        partitions.values().forEach(map -> map.addStatusChangeListener(listener));
+    }
+
+    @Override
+    public void removeStatusChangeListener(Consumer<Status> listener) {
+        partitions.values().forEach(map -> map.removeStatusChangeListener(listener));
+    }
+
+    @Override
+    public Collection<Consumer<Status>> statusChangeListeners() {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * Returns the map (partition) to which the specified key maps.
      * @param key key
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index 230060a..f2aa0a8 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -23,6 +23,7 @@
 import io.atomix.catalyst.util.concurrent.CatalystThreadFactory;
 import io.atomix.copycat.client.ConnectionStrategies;
 import io.atomix.copycat.client.CopycatClient;
+import io.atomix.copycat.client.CopycatClient.State;
 import io.atomix.copycat.client.RecoveryStrategies;
 import io.atomix.copycat.client.RetryStrategies;
 import io.atomix.copycat.client.ServerSelectionStrategies;
@@ -36,6 +37,8 @@
 import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import org.onlab.util.HexString;
 import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -48,6 +51,7 @@
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AsyncLeaderElector;
+import org.onosproject.store.service.DistributedPrimitive.Status;
 import org.onosproject.store.service.DistributedQueue;
 import org.onosproject.store.service.Serializer;
 import org.slf4j.Logger;
@@ -71,6 +75,18 @@
     private final Supplier<AsyncConsistentMap<String, byte[]>> onosAtomicValuesMap =
             Suppliers.memoize(() -> newAsyncConsistentMap(ATOMIC_VALUES_CONSISTENT_MAP_NAME,
                                                           Serializer.using(KryoNamespaces.BASIC)));
+    Function<State, Status> mapper = state -> {
+                                        switch (state) {
+                                        case CONNECTED:
+                                            return Status.ACTIVE;
+                                        case SUSPENDED:
+                                            return Status.SUSPENDED;
+                                        case CLOSED:
+                                            return Status.INACTIVE;
+                                        default:
+                                            throw new IllegalStateException("Unknown state " + state);
+                                        }
+                                    };
 
     public StoragePartitionClient(StoragePartition partition,
             io.atomix.catalyst.serializer.Serializer serializer,
@@ -90,7 +106,8 @@
                                              transport,
                                              serializer.clone(),
                                              StoragePartition.RESOURCE_TYPES);
-            copycatClient.onStateChange(state -> log.info("Client state {}", state));
+          copycatClient.onStateChange(state -> log.debug("Partition {} client state"
+                    + " changed to {}", partition.getId(), state));
             client = new AtomixClient(new ResourceClient(copycatClient));
         }
         return client.open().whenComplete((r, e) -> {
@@ -109,9 +126,14 @@
 
     @Override
     public <K, V> AsyncConsistentMap<K, V> newAsyncConsistentMap(String name, Serializer serializer) {
+        AtomixConsistentMap atomixConsistentMap = client.getResource(name, AtomixConsistentMap.class).join();
+        Consumer<State> statusListener = state -> {
+            atomixConsistentMap.statusChangeListeners()
+                               .forEach(listener -> listener.accept(mapper.apply(state)));
+        };
+        copycatClient.onStateChange(statusListener);
         AsyncConsistentMap<String, byte[]> rawMap =
-                new DelegatingAsyncConsistentMap<String, byte[]>(client.getResource(name, AtomixConsistentMap.class)
-                                                                       .join()) {
+                new DelegatingAsyncConsistentMap<String, byte[]>(atomixConsistentMap) {
                     @Override
                     public String name() {
                         return name;
@@ -139,9 +161,7 @@
 
     @Override
     public <V> AsyncAtomicValue<V> newAsyncAtomicValue(String name, Serializer serializer) {
-       return new DefaultAsyncAtomicValue<>(name,
-                                        serializer,
-                                        onosAtomicValuesMap.get());
+       return new DefaultAsyncAtomicValue<>(name, serializer, onosAtomicValuesMap.get());
     }
 
     @Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index ee882a7..0178730 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -22,6 +22,7 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -33,7 +34,6 @@
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
-
 import com.google.common.collect.Maps;
 
 /**
@@ -281,6 +281,21 @@
         }
     }
 
+    @Override
+    public void addStatusChangeListener(Consumer<Status> listener) {
+        backingMap.addStatusChangeListener(listener);
+    }
+
+    @Override
+    public void removeStatusChangeListener(Consumer<Status> listener) {
+        backingMap.removeStatusChangeListener(listener);
+    }
+
+    @Override
+    public Collection<Consumer<Status>> statusChangeListeners() {
+        return backingMap.statusChangeListeners();
+    }
+
     private class InternalBackingMapEventListener implements MapEventListener<K2, V2> {
 
         private final MapEventListener<K1, V1> listener;
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 4813849..2cb4a50 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Predicate;
 
 import org.onlab.util.Match;
@@ -53,7 +54,7 @@
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.MapTransaction;
 import org.onosproject.store.service.Versioned;
-
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 
 /**
@@ -63,6 +64,7 @@
 public class AtomixConsistentMap extends AbstractResource<AtomixConsistentMap>
     implements AsyncConsistentMap<String, byte[]> {
 
+    private final Set<Consumer<Status>> statusChangeListeners = Sets.newCopyOnWriteArraySet();
     private final Set<MapEventListener<String, byte[]>> mapEventListeners = Sets.newCopyOnWriteArraySet();
 
     public static final String CHANGE_SUBJECT = "changeEvents";
@@ -291,4 +293,19 @@
     public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
         return submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
     }
+
+    @Override
+    public void addStatusChangeListener(Consumer<Status> listener) {
+        statusChangeListeners.add(listener);
+    }
+
+    @Override
+    public void removeStatusChangeListener(Consumer<Status> listener) {
+        statusChangeListeners.remove(listener);
+    }
+
+    @Override
+    public Collection<Consumer<Status>> statusChangeListeners() {
+        return ImmutableSet.copyOf(statusChangeListeners);
+    }
 }
\ No newline at end of file