ONOS-2133: Support for purging associated stores (ConsistentMap/DistributedSet) when the application is uninstalled

Change-Id: I5bf7678f50ff3ed2792313383ff738c356bef69f
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java
index 50d829c..eb2583b 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java
@@ -1,5 +1,6 @@
 package org.onosproject.store.service;
 
+import org.onosproject.core.ApplicationId;
 
 /**
  * Builder for consistent maps.
@@ -24,6 +25,18 @@
     ConsistentMapBuilder<K, V> withName(String name);
 
     /**
+     * Sets the owner applicationId for the map.
+     * <p>
+     * Note: If {@code purgeOnUninstall} option is enabled, applicationId
+     * must be specified.
+     * </p>
+     *
+     * @param id applicationId owning the consistent map
+     * @return this ConsistentMapBuilder
+     */
+    ConsistentMapBuilder<K, V> withApplicationId(ApplicationId id);
+
+    /**
      * Sets a serializer that can be used to serialize
      * both the keys and values inserted into the map. The serializer
      * builder should be pre-populated with any classes that will be
@@ -65,6 +78,18 @@
     ConsistentMapBuilder<K, V> withUpdatesDisabled();
 
     /**
+     * Purges map contents when the application owning the map is uninstalled.
+     * <p>
+     * When this option is enabled, the caller must provide a applicationId via
+     * the {@code withAppliationId} builder method.
+     * <p>
+     * By default map entries will NOT be purged when owning application is uninstalled.
+     *
+     * @return this ConsistentMapBuilder
+     */
+    ConsistentMapBuilder<K, V> withPurgeOnUninstall();
+
+    /**
      * Builds an consistent map based on the configuration options
      * supplied to this builder.
      *
diff --git a/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
index c5608ce..5f4f287 100644
--- a/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
+++ b/core/api/src/main/java/org/onosproject/store/service/DistributedSetBuilder.java
@@ -15,6 +15,8 @@
  */
 package org.onosproject.store.service;
 
+import org.onosproject.core.ApplicationId;
+
 /**
  * Builder for distributed set.
  *
@@ -37,6 +39,18 @@
     DistributedSetBuilder<E> withName(String name);
 
     /**
+     * Sets the owner applicationId for the set.
+     * <p>
+     * Note: If {@code purgeOnUninstall} option is enabled, applicationId
+     * must be specified.
+     * </p>
+     *
+     * @param id applicationId owning the set
+     * @return this DistributedSetBuilder
+     */
+    DistributedSetBuilder<E> withApplicationId(ApplicationId id);
+
+    /**
      * Sets a serializer that can be used to serialize
      * the elements add to the set. The serializer
      * builder should be pre-populated with any classes that will be
@@ -78,6 +92,18 @@
     DistributedSetBuilder<E> withPartitionsDisabled();
 
     /**
+     * Purges set contents when the application owning the set is uninstalled.
+     * <p>
+     * When this option is enabled, the caller must provide a applicationId via
+     * the {@code withAppliationId} builder method.
+     * <p>
+     * By default set contents will NOT be purged when owning application is uninstalled.
+     *
+     * @return this DistributedSetBuilder
+     */
+    DistributedSetBuilder<E> withPurgeOnUninstall();
+
+    /**
      * Builds an set based on the configuration options
      * supplied to this builder.
      *
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 76fd7d6..f13ce85 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -17,7 +17,10 @@
 package org.onosproject.store.consistent.impl;
 
 import com.google.common.base.Charsets;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -42,12 +45,17 @@
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.apache.felix.scr.annotations.Service;
 
 import static org.onlab.util.Tools.groupedThreads;
 
+import org.onosproject.app.ApplicationEvent;
+import org.onosproject.app.ApplicationListener;
+import org.onosproject.app.ApplicationService;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.core.IdGenerator;
 import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
 import org.onosproject.store.cluster.impl.NodeInfo;
@@ -84,6 +92,8 @@
 import java.util.stream.Collectors;
 
 import static org.slf4j.LoggerFactory.getLogger;
+import static org.onosproject.app.ApplicationEvent.Type.APP_UNINSTALLED;
+import static org.onosproject.app.ApplicationEvent.Type.APP_DEACTIVATED;
 
 /**
  * Database manager.
@@ -113,13 +123,18 @@
 
     private ExecutorService eventDispatcher;
     private ExecutorService queuePollExecutor;
+    private ApplicationListener appListener = new InternalApplicationListener();
 
     private final Map<String, DefaultAsyncConsistentMap> maps = Maps.newConcurrentMap();
+    private final ListMultimap<ApplicationId, DefaultAsyncConsistentMap> mapsByApplication = ArrayListMultimap.create();
     private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
 
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY, policy = ReferencePolicy.DYNAMIC)
+    protected ApplicationService applicationService;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
 
@@ -127,6 +142,16 @@
         return String.format("onos://%s:%d", node.getIp(), node.getTcpPort());
     }
 
+    protected void bindApplicationService(ApplicationService service) {
+        applicationService = service;
+        applicationService.addListener(appListener);
+    }
+
+    protected void unbindApplicationService(ApplicationService service) {
+        applicationService.removeListener(appListener);
+        this.applicationService = null;
+    }
+
     @Activate
     public void activate() {
         localNodeId = clusterService.getLocalNode().id();
@@ -250,6 +275,9 @@
             });
         clusterCommunicator.removeSubscriber(QUEUE_UPDATED_TOPIC);
         maps.values().forEach(this::unregisterMap);
+        if (applicationService != null) {
+            applicationService.removeListener(appListener);
+        }
         eventDispatcher.shutdown();
         queuePollExecutor.shutdown();
         log.info("Stopped");
@@ -421,6 +449,10 @@
             // FIXME: We need to cleanly support different map instances with same name.
             log.info("Map by name {} already exists", map.name());
             return existing;
+        } else {
+            if (map.applicationId() != null) {
+                mapsByApplication.put(map.applicationId(), map);
+            }
         }
 
         clusterCommunicator.<MapEvent<K, V>>addSubscriber(mapUpdatesSubject(map.name()),
@@ -434,6 +466,9 @@
         if (maps.remove(map.name()) != null) {
             clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name()));
         }
+        if (map.applicationId() != null) {
+            mapsByApplication.remove(map.applicationId(), map);
+        }
     }
 
     protected <E> void registerQueue(DefaultDistributedQueue<E> queue) {
@@ -446,4 +481,18 @@
     protected static MessageSubject mapUpdatesSubject(String mapName) {
         return new MessageSubject(mapName + "-map-updates");
     }
+
+    private class InternalApplicationListener implements ApplicationListener {
+        @Override
+        public void event(ApplicationEvent event) {
+            if (event.type() == APP_UNINSTALLED || event.type() == APP_DEACTIVATED) {
+                ApplicationId appId = event.subject().id();
+                List<DefaultAsyncConsistentMap> mapsToRemove = ImmutableList.copyOf(mapsByApplication.get(appId));
+                mapsToRemove.forEach(DatabaseManager.this::unregisterMap);
+                if (event.type() == APP_UNINSTALLED) {
+                    mapsToRemove.stream().filter(map -> map.purgeOnUninstall()).forEach(map -> map.clear());
+                }
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index 9ef33d5..298fe2f 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -37,6 +37,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.util.HexString;
 import org.onlab.util.Tools;
+import org.onosproject.core.ApplicationId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMapException;
 import org.onosproject.store.service.MapEvent;
@@ -59,9 +60,11 @@
 public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
 
     private final String name;
+    private final ApplicationId applicationId;
     private final Database database;
     private final Serializer serializer;
     private final boolean readOnly;
+    private final boolean purgeOnUninstall;
     private final Consumer<MapEvent<K, V>> eventPublisher;
 
     private final Set<MapEventListener<K, V>> listeners = new CopyOnWriteArraySet<>();
@@ -86,14 +89,18 @@
     }
 
     public DefaultAsyncConsistentMap(String name,
+            ApplicationId applicationId,
             Database database,
             Serializer serializer,
             boolean readOnly,
+            boolean purgeOnUninstall,
             Consumer<MapEvent<K, V>> eventPublisher) {
         this.name = checkNotNull(name, "map name cannot be null");
+        this.applicationId = applicationId;
         this.database = checkNotNull(database, "database cannot be null");
         this.serializer = checkNotNull(serializer, "serializer cannot be null");
         this.readOnly = readOnly;
+        this.purgeOnUninstall = purgeOnUninstall;
         this.eventPublisher = eventPublisher;
     }
 
@@ -113,6 +120,23 @@
         return serializer;
     }
 
+    /**
+     * Returns the applicationId owning this map.
+     * @return application Id
+     */
+    public ApplicationId applicationId() {
+        return applicationId;
+    }
+
+    /**
+     * Returns whether the map entries should be purged when the application
+     * owning it is uninstalled.
+     * @return true is map needs to cleared on app uninstall; false otherwise
+     */
+    public boolean purgeOnUninstall() {
+        return purgeOnUninstall;
+    }
+
     @Override
     public CompletableFuture<Integer> size() {
         return database.size(name);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
index 75b2505..23024ea 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
@@ -3,6 +3,7 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
+import org.onosproject.core.ApplicationId;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.ConsistentMapBuilder;
@@ -19,6 +20,8 @@
 
     private Serializer serializer;
     private String name;
+    private ApplicationId applicationId;
+    private boolean purgeOnUninstall = false;
     private boolean partitionsEnabled = true;
     private boolean readOnly = false;
     private final DatabaseManager manager;
@@ -35,6 +38,19 @@
     }
 
     @Override
+    public ConsistentMapBuilder<K, V> withApplicationId(ApplicationId id) {
+        checkArgument(id != null);
+        this.applicationId = id;
+        return this;
+    }
+
+    @Override
+    public ConsistentMapBuilder<K, V> withPurgeOnUninstall() {
+        purgeOnUninstall = true;
+        return this;
+    }
+
+    @Override
     public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer) {
         checkArgument(serializer != null);
         this.serializer = serializer;
@@ -53,8 +69,12 @@
         return this;
     }
 
-    private boolean validInputs() {
-        return name != null && serializer != null;
+    private void validateInputs() {
+        checkState(name != null, "name must be specified");
+        checkState(serializer != null, "serializer must be specified");
+        if (purgeOnUninstall) {
+            checkState(applicationId != null, "ApplicationId must be specified when purgeOnUninstall is enabled");
+        }
     }
 
     @Override
@@ -68,12 +88,14 @@
     }
 
     private DefaultAsyncConsistentMap<K, V> buildAndRegisterMap() {
-        checkState(validInputs());
+        validateInputs();
         DefaultAsyncConsistentMap<K, V> asyncMap = new DefaultAsyncConsistentMap<>(
                 name,
+                applicationId,
                 partitionsEnabled ? manager.partitionedDatabase : manager.inMemoryDatabase,
                 serializer,
                 readOnly,
+                purgeOnUninstall,
                 event -> manager.clusterCommunicator.<MapEvent<K, V>>broadcast(event,
                         DatabaseManager.mapUpdatesSubject(name),
                         serializer::encode));
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
index 57ec232..b3e3da3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedSetBuilder.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.store.consistent.impl;
 
+import org.onosproject.core.ApplicationId;
 import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.DistributedSet;
 import org.onosproject.store.service.Serializer;
@@ -42,6 +43,18 @@
     }
 
     @Override
+    public DistributedSetBuilder<E> withApplicationId(ApplicationId id) {
+        mapBuilder.withApplicationId(id);
+        return this;
+    }
+
+    @Override
+    public DistributedSetBuilder<E> withPurgeOnUninstall() {
+        mapBuilder.withPurgeOnUninstall();
+        return this;
+    }
+
+    @Override
     public DistributedSetBuilder<E> withSerializer(Serializer serializer) {
         mapBuilder.withSerializer(serializer);
         return this;