New ApplicationStore that uses a single ConsistentMap to track all app-related state

Change-Id: Ieacc97f213add8ece8f462cd9971fb6ef3d0dde5
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
similarity index 64%
rename from core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
rename to core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
index 9809d11..d2707d6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2015-present Open Networking Laboratory
+ * Copyright 2016-present Open Networking Laboratory
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -16,18 +16,20 @@
 package org.onosproject.store.app;
 
 import com.google.common.base.Charsets;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
 import org.onosproject.app.ApplicationDescription;
 import org.onosproject.app.ApplicationEvent;
 import org.onosproject.app.ApplicationException;
@@ -46,13 +48,14 @@
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.LogicalClockService;
-import org.onosproject.store.service.MultiValuedTimestamp;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageException;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.DistributedPrimitive.Status;
 import org.slf4j.Logger;
 
 import java.io.ByteArrayInputStream;
@@ -66,7 +69,10 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import static com.google.common.collect.Multimaps.newSetMultimap;
 import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
@@ -76,18 +82,16 @@
 import static org.onlab.util.Tools.groupedThreads;
 import static org.onlab.util.Tools.randomDelay;
 import static org.onosproject.app.ApplicationEvent.Type.*;
-import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
+import static org.onosproject.store.app.DistributedApplicationStore.InternalState.*;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Manages inventory of applications in a distributed data store that uses
- * optimistic replication and gossip based anti-entropy techniques.
+ * Manages inventory of applications in a distributed data store providing
+ * stronger consistency guarantees.
  */
-@Component(immediate = true)
+@Component(immediate = true, enabled = true)
 @Service
-public class GossipApplicationStore extends ApplicationArchive
+public class DistributedApplicationStore extends ApplicationArchive
         implements ApplicationStore {
 
     private final Logger log = getLogger(getClass());
@@ -110,9 +114,7 @@
     private ScheduledExecutorService executor;
     private ExecutorService messageHandlingExecutor;
 
-    private EventuallyConsistentMap<ApplicationId, Application> apps;
-    private EventuallyConsistentMap<Application, InternalState> states;
-    private EventuallyConsistentMap<Application, Set<Permission>> permissions;
+    private ConsistentMap<ApplicationId, InternalApplicationHolder> apps;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
@@ -124,11 +126,12 @@
     protected StorageService storageService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    protected LogicalClockService clockService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ApplicationIdStore idStore;
 
+    private final InternalAppsListener appsListener = new InternalAppsListener();
+
+    private Consumer<Status> statusChangeListener;
+
     // Multimap to track which apps are required by others apps
     // app -> { required-by, ... }
     // Apps explicitly activated will be required by the CORE app
@@ -139,16 +142,8 @@
 
     @Activate
     public void activate() {
-        KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
-                .register(KryoNamespaces.API)
-                .register(MultiValuedTimestamp.class)
-                .register(InternalState.class);
-
-        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
-
         messageHandlingExecutor = Executors.newSingleThreadExecutor(
                 groupedThreads("onos/store/app", "message-handler", log));
-
         clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
                                                           bytes -> new String(bytes, Charsets.UTF_8),
                                                           name -> {
@@ -161,33 +156,36 @@
                                                           Function.identity(),
                                                           messageHandlingExecutor);
 
-        // FIXME: Consider consolidating into a single map.
-
-        apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
-                .withName("apps")
-                .withSerializer(serializer)
-                .withTimestampProvider((k, v) -> clockService.getTimestamp())
+        apps = storageService.<ApplicationId, InternalApplicationHolder>consistentMapBuilder()
+                .withName("onos-apps")
+                .withRelaxedReadConsistency()
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                    InternalApplicationHolder.class,
+                                    InternalState.class))
                 .build();
 
-        states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
-                .withName("app-states")
-                .withSerializer(serializer)
-                .withTimestampProvider((k, v) -> clockService.getTimestamp())
-                .build();
-
-        states.addListener(new InternalAppStatesListener());
-
-        permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
-                .withName("app-permissions")
-                .withSerializer(serializer)
-                .withTimestampProvider((k, v) -> clockService.getTimestamp())
-                .build();
-
+        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
+        statusChangeListener = status -> {
+            if (status == Status.ACTIVE) {
+                executor.execute(this::bootstrapExistingApplications);
+            }
+        };
+        apps.addListener(appsListener, messageHandlingExecutor);
+        apps.addStatusChangeListener(statusChangeListener);
         coreAppId = getId(CoreService.CORE_APP_NAME);
         log.info("Started");
     }
 
     /**
+     * Processes existing applications from the distributed map. This is done to
+     * account for events that this instance may have missed due to a staggered start.
+     */
+    void bootstrapExistingApplications() {
+        apps.asJavaMap().forEach((appId, holder) -> setupApplicationAndNotify(appId, holder.app(), holder.state()));
+
+    }
+
+    /**
      * Loads the application inventory from the disk and activates apps if
      * they are marked to be active.
      */
@@ -244,23 +242,27 @@
     @Deactivate
     public void deactivate() {
         clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
+        apps.removeStatusChangeListener(statusChangeListener);
+        apps.removeListener(appsListener);
         messageHandlingExecutor.shutdown();
         executor.shutdown();
-        apps.destroy();
-        states.destroy();
-        permissions.destroy();
         log.info("Stopped");
     }
 
     @Override
     public void setDelegate(ApplicationStoreDelegate delegate) {
         super.setDelegate(delegate);
+        executor.execute(this::bootstrapExistingApplications);
         executor.schedule(() -> loadFromDisk(), APP_LOAD_DELAY_MS, TimeUnit.MILLISECONDS);
     }
 
     @Override
     public Set<Application> getApplications() {
-        return ImmutableSet.copyOf(apps.values());
+        return ImmutableSet.copyOf(apps.values()
+                                       .stream()
+                                       .map(Versioned::value)
+                                       .map(InternalApplicationHolder::app)
+                                       .collect(Collectors.toSet()));
     }
 
     @Override
@@ -270,15 +272,15 @@
 
     @Override
     public Application getApplication(ApplicationId appId) {
-        return apps.get(appId);
+        InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.get(appId));
+        return appHolder != null ? appHolder.app() : null;
     }
 
     @Override
     public ApplicationState getState(ApplicationId appId) {
-        Application app = apps.get(appId);
-        InternalState s = app == null ? null : states.get(app);
-        return s == null ? null : s == ACTIVATED ?
-                ApplicationState.ACTIVE : ApplicationState.INSTALLED;
+        InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.get(appId));
+        InternalState state = appHolder != null ? appHolder.state() : null;
+        return state == null ? null : state == ACTIVATED ? ApplicationState.ACTIVE : ApplicationState.INSTALLED;
     }
 
     @Override
@@ -300,26 +302,21 @@
         if (updateTime) {
             updateTime(app.id().name());
         }
-        apps.put(app.id(), app);
-        states.put(app, INSTALLED);
-        return app;
+        InternalApplicationHolder previousApp =
+                Versioned.valueOrNull(apps.putIfAbsent(app.id(), new InternalApplicationHolder(app, INSTALLED, null)));
+        return previousApp != null ? previousApp.app() : app;
     }
 
     @Override
     public void remove(ApplicationId appId) {
-        Application app = apps.get(appId);
-        if (app != null) {
-            uninstallDependentApps(app);
-            apps.remove(appId);
-            states.remove(app);
-            permissions.remove(app);
-        }
+        uninstallDependentApps(appId);
+        apps.remove(appId);
     }
 
     // Uninstalls all apps that depend on the given app.
-    private void uninstallDependentApps(Application app) {
+    private void uninstallDependentApps(ApplicationId appId) {
         getApplications().stream()
-                .filter(a -> a.requiredApps().contains(app.id().name()))
+                .filter(a -> a.requiredApps().contains(appId.name()))
                 .forEach(a -> remove(a.id()));
     }
 
@@ -335,13 +332,18 @@
 
 
     private void activate(ApplicationId appId, boolean updateTime) {
-        Application app = apps.get(appId);
-        if (app != null) {
+        AtomicBoolean stateChanged = new AtomicBoolean(false);
+        InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.computeIf(appId,
+            v -> v != null && v.state() != ACTIVATED,
+            (k, v) -> {
+                stateChanged.set(true);
+                return new InternalApplicationHolder(v.app(), ACTIVATED, v.permissions());
+            }));
+        if (stateChanged.get()) {
             if (updateTime) {
                 updateTime(appId.name());
             }
-            activateRequiredApps(app);
-            states.put(app, ACTIVATED);
+            activateRequiredApps(appHolder.app());
         }
     }
 
@@ -352,90 +354,108 @@
 
     @Override
     public void deactivate(ApplicationId appId) {
-        deactivateDependentApps(getApplication(appId));
+        deactivateDependentApps(appId);
         deactivate(appId, coreAppId);
     }
 
     private void deactivate(ApplicationId appId, ApplicationId forAppId) {
         requiredBy.remove(appId, forAppId);
         if (requiredBy.get(appId).isEmpty()) {
-            Application app = apps.get(appId);
-            if (app != null) {
+            AtomicBoolean stateChanged = new AtomicBoolean(false);
+            apps.computeIf(appId,
+                v -> v != null && v.state() != DEACTIVATED,
+                (k, v) -> {
+                    stateChanged.set(true);
+                    return new InternalApplicationHolder(v.app(), DEACTIVATED, v.permissions());
+                });
+            if (stateChanged.get()) {
                 updateTime(appId.name());
-                states.put(app, DEACTIVATED);
-                deactivateRequiredApps(app);
+                deactivateRequiredApps(appId);
             }
         }
     }
 
     // Deactivates all apps that require this application.
-    private void deactivateDependentApps(Application app) {
-        getApplications().stream()
-                .filter(a -> states.get(a) == ACTIVATED)
-                .filter(a -> a.requiredApps().contains(app.id().name()))
-                .forEach(a -> deactivate(a.id()));
+    private void deactivateDependentApps(ApplicationId appId) {
+        apps.values()
+            .stream()
+            .map(Versioned::value)
+            .filter(a -> a.state() == ACTIVATED)
+            .filter(a -> a.app().requiredApps().contains(appId.name()))
+            .forEach(a -> deactivate(a.app().id()));
     }
 
     // Deactivates all apps required by this application.
-    private void deactivateRequiredApps(Application app) {
-        app.requiredApps().stream().map(this::getId).map(this::getApplication)
-                .filter(a -> states.get(a) == ACTIVATED)
-                .forEach(a -> deactivate(a.id(), app.id()));
+    private void deactivateRequiredApps(ApplicationId appId) {
+        getApplication(appId).requiredApps()
+                             .stream()
+                             .map(this::getId)
+                             .map(apps::get)
+                             .map(Versioned::value)
+                             .filter(a -> a.state() == ACTIVATED)
+                             .forEach(a -> deactivate(a.app().id(), appId));
     }
 
     @Override
     public Set<Permission> getPermissions(ApplicationId appId) {
-        Application app = apps.get(appId);
-        return app != null ? permissions.get(app) : null;
+        InternalApplicationHolder app = Versioned.valueOrNull(apps.get(appId));
+        return app != null ? ImmutableSet.copyOf(app.permissions()) : ImmutableSet.of();
     }
 
     @Override
     public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
-        Application app = getApplication(appId);
-        if (app != null) {
-            this.permissions.put(app, permissions);
-            delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
+        AtomicBoolean permissionsChanged = new AtomicBoolean(false);
+        Versioned<InternalApplicationHolder> appHolder = apps.computeIf(appId,
+            v -> v != null && !Sets.symmetricDifference(v.permissions(), permissions).isEmpty(),
+            (k, v) -> {
+                permissionsChanged.set(true);
+                return new InternalApplicationHolder(v.app(), v.state(), ImmutableSet.copyOf(permissions));
+            });
+        if (permissionsChanged.get()) {
+            delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, appHolder.value().app()));
         }
     }
 
     /**
      * Listener to application state distributed map changes.
      */
-    private final class InternalAppStatesListener
-            implements EventuallyConsistentMapListener<Application, InternalState> {
+    private final class InternalAppsListener
+            implements MapEventListener<ApplicationId, InternalApplicationHolder> {
         @Override
-        public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
-            // If we do not have a delegate, refuse to process any events entirely.
-            // This is to allow the anti-entropy to kick in and process the events
-            // perhaps a bit later, but with opportunity to notify delegate.
+        public void event(MapEvent<ApplicationId, InternalApplicationHolder> event) {
             if (delegate == null) {
                 return;
             }
 
-            Application app = event.key();
-            InternalState state = event.value();
-
-            if (event.type() == PUT) {
-                if (state == INSTALLED) {
-                    fetchBitsIfNeeded(app);
-                    delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
-
-                } else if (state == ACTIVATED) {
-                    installAppIfNeeded(app);
-                    setActive(app.id().name());
-                    delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
-
-                } else if (state == DEACTIVATED) {
-                    clearActive(app.id().name());
-                    delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
+            ApplicationId appId = event.key();
+            InternalApplicationHolder newApp = event.newValue() == null ? null : event.newValue().value();
+            InternalApplicationHolder oldApp = event.oldValue() == null ? null : event.oldValue().value();
+            if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
+                if (event.type() == MapEvent.Type.UPDATE && newApp.state() == oldApp.state()) {
+                    return;
                 }
-            } else if (event.type() == REMOVE) {
-                delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
-                purgeApplication(app.id().name());
+                setupApplicationAndNotify(appId, newApp.app(), newApp.state());
+            } else if (event.type() == MapEvent.Type.REMOVE) {
+                delegate.notify(new ApplicationEvent(APP_UNINSTALLED, oldApp.app()));
+                purgeApplication(appId.name());
             }
         }
     }
 
+    private void setupApplicationAndNotify(ApplicationId appId, Application app, InternalState state) {
+        if (state == INSTALLED) {
+            fetchBitsIfNeeded(app);
+            delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
+        } else if (state == ACTIVATED) {
+            installAppIfNeeded(app);
+            setActive(appId.name());
+            delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
+        } else if (state == DEACTIVATED) {
+            clearActive(appId.name());
+            delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
+        }
+    }
+
     /**
      * Determines if the application bits are available locally.
      */
@@ -512,19 +532,6 @@
     }
 
     /**
-     * Prunes applications which are not in the map, but are on disk.
-     */
-    private void pruneUninstalledApps() {
-        for (String name : getApplicationNames()) {
-            if (getApplication(getId(name)) == null) {
-                Application app = registerApp(getApplicationDescription(name));
-                delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
-                purgeApplication(app.id().name());
-            }
-        }
-    }
-
-    /**
      * Produces a registered application from the supplied description.
      */
     private Application registerApp(ApplicationDescription appDesc) {
@@ -544,4 +551,77 @@
                 appDesc.features(),
                 appDesc.requiredApps());
     }
+
+    /**
+     * Value stored in the {@code apps} map: an application together with
+     * its internal state and the permissions assigned to it.
+     */
+    private static class InternalApplicationHolder {
+        private final Application app;
+        private final InternalState state;
+        private final Set<Permission> permissions;
+
+        /**
+         * No-argument constructor that leaves every field {@code null}.
+         * It is not called from this class; presumably it exists for the
+         * serializer, which is why {@link #permissions()} tolerates null.
+         */
+        @SuppressWarnings("unused")
+        private InternalApplicationHolder() {
+            app = null;
+            state = null;
+            permissions = null;
+        }
+
+        /**
+         * Creates a holder for the given application.
+         *
+         * @param app application; must not be null
+         * @param state internal state of the application
+         * @param permissions permissions of the application; null means none set
+         */
+        public InternalApplicationHolder(Application app, InternalState state, Set<Permission> permissions) {
+            this.app = Preconditions.checkNotNull(app);
+            this.state = state;
+            this.permissions = permissions == null ? null : ImmutableSet.copyOf(permissions);
+        }
+
+        /**
+         * Returns the application.
+         *
+         * @return application
+         */
+        public Application app() {
+            return app;
+        }
+
+        /**
+         * Returns the internal state.
+         *
+         * @return internal state
+         */
+        public InternalState state() {
+            return state;
+        }
+
+        /**
+         * Returns the permissions of the application.
+         * A holder created without permissions (e.g. on install) reports an
+         * empty set rather than null, so callers can copy or compare the
+         * result without a null check.
+         *
+         * @return permissions; empty if none were set, never null
+         */
+        public Set<Permission> permissions() {
+            return permissions == null ? ImmutableSet.<Permission>of() : permissions;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("app", app.id())
+                    .add("state", state)
+                    .toString();
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/app/package-info.java
index 2765d29..19ba3dc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/package-info.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/package-info.java
@@ -15,6 +15,6 @@
  */
 
 /**
- * Implementation of distributed applications store.
+ * Implementation of distributed application store.
  */
 package org.onosproject.store.app;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java b/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java
index cc98ac5..2c7b554 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java
@@ -38,7 +38,6 @@
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
 
@@ -109,8 +108,14 @@
 
     @Override
     public ApplicationId registerApplication(String name) {
-        return Versioned.valueOrNull(registeredIds.computeIfAbsent(name,
-                key -> new DefaultApplicationId((int) appIdCounter.incrementAndGet(), name)));
+        ApplicationId knownAppId = registeredIds.asJavaMap().get(name);
+        if (knownAppId != null) {
+            return knownAppId;
+        }
+        // Name not registered yet: allocate an id, but yield to any id that was stored for it meanwhile.
+        ApplicationId candidateAppId = new DefaultApplicationId((int) appIdCounter.incrementAndGet(), name);
+        ApplicationId racedAppId = registeredIds.asJavaMap().putIfAbsent(name, candidateAppId);
+        return racedAppId == null ? candidateAppId : racedAppId;
     }
 
     private void primeIdToAppIdCache() {