New ApplicationStore that uses a single ConsistentMap to track all app related state

Change-Id: Ieacc97f213add8ece8f462cd9971fb6ef3d0dde5
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
new file mode 100644
index 0000000..d2707d6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
@@ -0,0 +1,596 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.app;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.app.ApplicationDescription;
+import org.onosproject.app.ApplicationEvent;
+import org.onosproject.app.ApplicationException;
+import org.onosproject.app.ApplicationState;
+import org.onosproject.app.ApplicationStore;
+import org.onosproject.app.ApplicationStoreDelegate;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.common.app.ApplicationArchive;
+import org.onosproject.core.Application;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.ApplicationIdStore;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.DefaultApplication;
+import org.onosproject.security.Permission;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.DistributedPrimitive.Status;
+import org.slf4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static com.google.common.collect.Multimaps.newSetMultimap;
+import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
+import static com.google.common.io.ByteStreams.toByteArray;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onlab.util.Tools.randomDelay;
+import static org.onosproject.app.ApplicationEvent.Type.*;
+import static org.onosproject.store.app.DistributedApplicationStore.InternalState.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Manages inventory of applications in a distributed data store providing
+ * stronger consistency guarantees.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class DistributedApplicationStore extends ApplicationArchive
+        implements ApplicationStore {
+
+    private final Logger log = getLogger(getClass());
+
+    private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
+
+    private static final int MAX_LOAD_RETRIES = 5;
+    private static final int RETRY_DELAY_MS = 2_000;
+
+    private static final int FETCH_TIMEOUT_MS = 10_000;
+
+    private static final int APP_LOAD_DELAY_MS = 500;
+
+    private static List<String> pendingApps = Lists.newArrayList();
+
+    public enum InternalState {
+        INSTALLED, ACTIVATED, DEACTIVATED
+    }
+
+    private ScheduledExecutorService executor;
+    private ExecutorService messageHandlingExecutor;
+
+    private ConsistentMap<ApplicationId, InternalApplicationHolder> apps;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ApplicationIdStore idStore;
+
+    private final InternalAppsListener appsListener = new InternalAppsListener();
+
+    private Consumer<Status> statusChangeListener;
+
+    // Multimap to track which apps are required by others apps
+    // app -> { required-by, ... }
+    // Apps explicitly activated will be required by the CORE app
+    private final Multimap<ApplicationId, ApplicationId> requiredBy =
+            synchronizedSetMultimap(newSetMultimap(Maps.newHashMap(), Sets::newHashSet));
+
+    private ApplicationId coreAppId;
+
+    @Activate
+    public void activate() {
+        messageHandlingExecutor = Executors.newSingleThreadExecutor(
+                groupedThreads("onos/store/app", "message-handler", log));
+        clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
+                                                          bytes -> new String(bytes, Charsets.UTF_8),
+                                                          name -> {
+                                                              try {
+                                                                  return toByteArray(getApplicationInputStream(name));
+                                                              } catch (IOException e) {
+                                                                  throw new StorageException(e);
+                                                              }
+                                                          },
+                                                          Function.identity(),
+                                                          messageHandlingExecutor);
+
+        apps = storageService.<ApplicationId, InternalApplicationHolder>consistentMapBuilder()
+                .withName("onos-apps")
+                .withRelaxedReadConsistency()
+                .withSerializer(Serializer.using(KryoNamespaces.API,
+                                    InternalApplicationHolder.class,
+                                    InternalState.class))
+                .build();
+
+        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
+        statusChangeListener = status -> {
+            if (status == Status.ACTIVE) {
+                executor.execute(this::bootstrapExistingApplications);
+            }
+        };
+        apps.addListener(appsListener, messageHandlingExecutor);
+        apps.addStatusChangeListener(statusChangeListener);
+        coreAppId = getId(CoreService.CORE_APP_NAME);
+        log.info("Started");
+    }
+
+    /**
+     * Processes existing applications from the distributed map. This is done to
+     * account for events that this instance may be have missed due to a staggered start.
+     */
+    void bootstrapExistingApplications() {
+        apps.asJavaMap().forEach((appId, holder) -> setupApplicationAndNotify(appId, holder.app(), holder.state()));
+
+    }
+
+    /**
+     * Loads the application inventory from the disk and activates apps if
+     * they are marked to be active.
+     */
+    private void loadFromDisk() {
+        getApplicationNames().forEach(appName -> {
+            Application app = loadFromDisk(appName);
+            if (app != null && isActive(app.id().name())) {
+                activate(app.id(), false);
+                // TODO Load app permissions
+            }
+        });
+    }
+
+    private Application loadFromDisk(String appName) {
+        pendingApps.add(appName);
+
+        for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
+            try {
+                // Directly return if app already exists
+                ApplicationId appId = getId(appName);
+                if (appId != null) {
+                    Application application = getApplication(appId);
+                    if (application != null) {
+                        pendingApps.remove(appName);
+                        return application;
+                    }
+                }
+
+                ApplicationDescription appDesc = getApplicationDescription(appName);
+
+                Optional<String> loop = appDesc.requiredApps().stream()
+                        .filter(app -> pendingApps.contains(app)).findAny();
+                if (loop.isPresent()) {
+                    log.error("Circular app dependency detected: {} -> {}", pendingApps, loop.get());
+                    pendingApps.remove(appName);
+                    return null;
+                }
+
+                boolean success = appDesc.requiredApps().stream()
+                        .noneMatch(requiredApp -> loadFromDisk(requiredApp) == null);
+                pendingApps.remove(appName);
+
+                return success ? create(appDesc, false) : null;
+
+            } catch (Exception e) {
+                log.warn("Unable to load application {} from disk; retrying", appName);
+                randomDelay(RETRY_DELAY_MS); //FIXME: This is a deliberate hack; fix in Falcon
+            }
+        }
+        pendingApps.remove(appName);
+        return null;
+    }
+
+    @Deactivate
+    public void deactivate() {
+        clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
+        apps.removeStatusChangeListener(statusChangeListener);
+        apps.removeListener(appsListener);
+        messageHandlingExecutor.shutdown();
+        executor.shutdown();
+        log.info("Stopped");
+    }
+
+    @Override
+    public void setDelegate(ApplicationStoreDelegate delegate) {
+        super.setDelegate(delegate);
+        executor.execute(this::bootstrapExistingApplications);
+        executor.schedule(() -> loadFromDisk(), APP_LOAD_DELAY_MS, TimeUnit.MILLISECONDS);
+    }
+
+    @Override
+    public Set<Application> getApplications() {
+        return ImmutableSet.copyOf(apps.values()
+                                       .stream()
+                                       .map(Versioned::value)
+                                       .map(InternalApplicationHolder::app)
+                                       .collect(Collectors.toSet()));
+    }
+
+    @Override
+    public ApplicationId getId(String name) {
+        return idStore.getAppId(name);
+    }
+
+    @Override
+    public Application getApplication(ApplicationId appId) {
+        InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.get(appId));
+        return appHolder != null ? appHolder.app() : null;
+    }
+
+    @Override
+    public ApplicationState getState(ApplicationId appId) {
+        InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.get(appId));
+        InternalState state = appHolder != null ? appHolder.state() : null;
+        return state == null ? null : state == ACTIVATED ? ApplicationState.ACTIVE : ApplicationState.INSTALLED;
+    }
+
+    @Override
+    public Application create(InputStream appDescStream) {
+        ApplicationDescription appDesc = saveApplication(appDescStream);
+        if (hasPrerequisites(appDesc)) {
+            return create(appDesc, true);
+        }
+        throw new ApplicationException("Missing dependencies for app " + appDesc.name());
+    }
+
+    private boolean hasPrerequisites(ApplicationDescription app) {
+        return !app.requiredApps().stream().map(n -> getId(n))
+                .anyMatch(id -> id == null || getApplication(id) == null);
+    }
+
+    private Application create(ApplicationDescription appDesc, boolean updateTime) {
+        Application app = registerApp(appDesc);
+        if (updateTime) {
+            updateTime(app.id().name());
+        }
+        InternalApplicationHolder previousApp =
+                Versioned.valueOrNull(apps.putIfAbsent(app.id(), new InternalApplicationHolder(app, INSTALLED, null)));
+        return previousApp != null ? previousApp.app() : app;
+    }
+
+    @Override
+    public void remove(ApplicationId appId) {
+        uninstallDependentApps(appId);
+        apps.remove(appId);
+    }
+
+    // Uninstalls all apps that depend on the given app.
+    private void uninstallDependentApps(ApplicationId appId) {
+        getApplications().stream()
+                .filter(a -> a.requiredApps().contains(appId.name()))
+                .forEach(a -> remove(a.id()));
+    }
+
+    @Override
+    public void activate(ApplicationId appId) {
+        activate(appId, coreAppId);
+    }
+
+    private void activate(ApplicationId appId, ApplicationId forAppId) {
+        requiredBy.put(appId, forAppId);
+        activate(appId, true);
+    }
+
+
+    private void activate(ApplicationId appId, boolean updateTime) {
+        AtomicBoolean stateChanged = new AtomicBoolean(false);
+        InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.computeIf(appId,
+            v -> v != null && v.state() != ACTIVATED,
+            (k, v) -> {
+                stateChanged.set(true);
+                return new InternalApplicationHolder(v.app(), ACTIVATED, v.permissions());
+            }));
+        if (stateChanged.get()) {
+            if (updateTime) {
+                updateTime(appId.name());
+            }
+            activateRequiredApps(appHolder.app());
+        }
+    }
+
+    // Activates all apps required by this application.
+    private void activateRequiredApps(Application app) {
+        app.requiredApps().stream().map(this::getId).forEach(id -> activate(id, app.id()));
+    }
+
+    @Override
+    public void deactivate(ApplicationId appId) {
+        deactivateDependentApps(appId);
+        deactivate(appId, coreAppId);
+    }
+
+    private void deactivate(ApplicationId appId, ApplicationId forAppId) {
+        requiredBy.remove(appId, forAppId);
+        if (requiredBy.get(appId).isEmpty()) {
+            AtomicBoolean stateChanged = new AtomicBoolean(false);
+            apps.computeIf(appId,
+                v -> v != null && v.state() != DEACTIVATED,
+                (k, v) -> {
+                    stateChanged.set(true);
+                    return new InternalApplicationHolder(v.app(), DEACTIVATED, v.permissions());
+                });
+            if (stateChanged.get()) {
+                updateTime(appId.name());
+                deactivateRequiredApps(appId);
+            }
+        }
+    }
+
+    // Deactivates all apps that require this application.
+    private void deactivateDependentApps(ApplicationId appId) {
+        apps.values()
+            .stream()
+            .map(Versioned::value)
+            .filter(a -> a.state() == ACTIVATED)
+            .filter(a -> a.app().requiredApps().contains(appId.name()))
+            .forEach(a -> deactivate(a.app().id()));
+    }
+
+    // Deactivates all apps required by this application.
+    private void deactivateRequiredApps(ApplicationId appId) {
+        getApplication(appId).requiredApps()
+                             .stream()
+                             .map(this::getId)
+                             .map(apps::get)
+                             .map(Versioned::value)
+                             .filter(a -> a.state() == ACTIVATED)
+                             .forEach(a -> deactivate(a.app().id(), appId));
+    }
+
+    @Override
+    public Set<Permission> getPermissions(ApplicationId appId) {
+        InternalApplicationHolder app = Versioned.valueOrNull(apps.get(appId));
+        return app != null ? ImmutableSet.copyOf(app.permissions()) : ImmutableSet.of();
+    }
+
+    @Override
+    public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
+        AtomicBoolean permissionsChanged = new AtomicBoolean(false);
+        Versioned<InternalApplicationHolder> appHolder = apps.computeIf(appId,
+            v -> v != null && !Sets.symmetricDifference(v.permissions(), permissions).isEmpty(),
+            (k, v) -> {
+                permissionsChanged.set(true);
+                return new InternalApplicationHolder(v.app(), v.state(), ImmutableSet.copyOf(permissions));
+            });
+        if (permissionsChanged.get()) {
+            delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, appHolder.value().app()));
+        }
+    }
+
+    /**
+     * Listener to application state distributed map changes.
+     */
+    private final class InternalAppsListener
+            implements MapEventListener<ApplicationId, InternalApplicationHolder> {
+        @Override
+        public void event(MapEvent<ApplicationId, InternalApplicationHolder> event) {
+            if (delegate == null) {
+                return;
+            }
+
+            ApplicationId appId = event.key();
+            InternalApplicationHolder newApp = event.newValue() == null ? null : event.newValue().value();
+            InternalApplicationHolder oldApp = event.oldValue() == null ? null : event.oldValue().value();
+            if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
+                if (event.type() == MapEvent.Type.UPDATE && newApp.state() == oldApp.state()) {
+                    return;
+                }
+                setupApplicationAndNotify(appId, newApp.app(), newApp.state());
+            } else if (event.type() == MapEvent.Type.REMOVE) {
+                delegate.notify(new ApplicationEvent(APP_UNINSTALLED, oldApp.app()));
+                purgeApplication(appId.name());
+            }
+        }
+    }
+
+    private void setupApplicationAndNotify(ApplicationId appId, Application app, InternalState state) {
+        if (state == INSTALLED) {
+            fetchBitsIfNeeded(app);
+            delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
+        } else if (state == ACTIVATED) {
+            installAppIfNeeded(app);
+            setActive(appId.name());
+            delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
+        } else if (state == DEACTIVATED) {
+            clearActive(appId.name());
+            delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
+        }
+    }
+
+    /**
+     * Determines if the application bits are available locally.
+     */
+    private boolean appBitsAvailable(Application app) {
+        try {
+            ApplicationDescription appDesc = getApplicationDescription(app.id().name());
+            return appDesc.version().equals(app.version());
+        } catch (ApplicationException e) {
+            return false;
+        }
+    }
+
+    /**
+     * Fetches the bits from the cluster peers if necessary.
+     */
+    private void fetchBitsIfNeeded(Application app) {
+        if (!appBitsAvailable(app)) {
+            fetchBits(app);
+        }
+    }
+
+    /**
+     * Installs the application if necessary from the application peers.
+     */
+    private void installAppIfNeeded(Application app) {
+        if (!appBitsAvailable(app)) {
+            fetchBits(app);
+            delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
+        }
+    }
+
+    /**
+     * Fetches the bits from the cluster peers.
+     */
+    private void fetchBits(Application app) {
+        ControllerNode localNode = clusterService.getLocalNode();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        // FIXME: send message with name & version to make sure we don't get served old bits
+
+        log.info("Downloading bits for application {}", app.id().name());
+        for (ControllerNode node : clusterService.getNodes()) {
+            if (latch.getCount() == 0) {
+                break;
+            }
+            if (node.equals(localNode)) {
+                continue;
+            }
+            clusterCommunicator.sendAndReceive(app.id().name(),
+                                               APP_BITS_REQUEST,
+                                               s -> s.getBytes(Charsets.UTF_8),
+                                               Function.identity(),
+                                               node.id())
+                    .whenCompleteAsync((bits, error) -> {
+                        if (error == null && latch.getCount() > 0) {
+                            saveApplication(new ByteArrayInputStream(bits));
+                            log.info("Downloaded bits for application {} from node {}",
+                                     app.id().name(), node.id());
+                            latch.countDown();
+                        } else if (error != null) {
+                            log.warn("Unable to fetch bits for application {} from node {}",
+                                     app.id().name(), node.id());
+                        }
+                    }, executor);
+        }
+
+        try {
+            if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
+                log.warn("Unable to fetch bits for application {}", app.id().name());
+            }
+        } catch (InterruptedException e) {
+            log.warn("Interrupted while fetching bits for application {}", app.id().name());
+        }
+    }
+
+    /**
+     * Produces a registered application from the supplied description.
+     */
+    private Application registerApp(ApplicationDescription appDesc) {
+        ApplicationId appId = idStore.registerApplication(appDesc.name());
+        return new DefaultApplication(appId,
+                appDesc.version(),
+                appDesc.title(),
+                appDesc.description(),
+                appDesc.origin(),
+                appDesc.category(),
+                appDesc.url(),
+                appDesc.readme(),
+                appDesc.icon(),
+                appDesc.role(),
+                appDesc.permissions(),
+                appDesc.featuresRepo(),
+                appDesc.features(),
+                appDesc.requiredApps());
+    }
+
+    /**
+     * Internal class for holding app information.
+     */
+    private static class InternalApplicationHolder {
+        private final Application app;
+        private final InternalState state;
+        private final Set<Permission> permissions;
+
+        @SuppressWarnings("unused")
+        private InternalApplicationHolder() {
+            app = null;
+            state = null;
+            permissions = null;
+        }
+
+        public InternalApplicationHolder(Application app, InternalState state, Set<Permission> permissions) {
+            this.app = Preconditions.checkNotNull(app);
+            this.state = state;
+            this.permissions = permissions == null ? null : ImmutableSet.copyOf(permissions);
+        }
+
+        public Application app() {
+            return app;
+        }
+
+        public InternalState state() {
+            return state;
+        }
+
+        public Set<Permission> permissions() {
+            return permissions;
+        }
+
+        @Override
+        public String toString() {
+            return MoreObjects.toStringHelper(getClass())
+                    .add("app", app.id())
+                    .add("state", state)
+                    .toString();
+        }
+    }
+}