ONOS-785 Adding distributed store for apps & app admin CLIs

Change-Id: Ia7639f3258fca2a18ba513f0c95de0ab8ea7ceee
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
new file mode 100644
index 0000000..39fddd8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -0,0 +1,398 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.app;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.app.ApplicationDescription;
+import org.onosproject.app.ApplicationEvent;
+import org.onosproject.app.ApplicationException;
+import org.onosproject.app.ApplicationState;
+import org.onosproject.app.ApplicationStore;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.common.app.ApplicationArchive;
+import org.onosproject.core.Application;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.ApplicationIdStore;
+import org.onosproject.core.DefaultApplication;
+import org.onosproject.core.Permission;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.impl.EventuallyConsistentMap;
+import org.onosproject.store.impl.EventuallyConsistentMapEvent;
+import org.onosproject.store.impl.EventuallyConsistentMapImpl;
+import org.onosproject.store.impl.EventuallyConsistentMapListener;
+import org.onosproject.store.impl.WallclockClockManager;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.slf4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+
+import static com.google.common.io.ByteStreams.toByteArray;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.onlab.util.Tools.namedThreads;
+import static org.onosproject.app.ApplicationEvent.Type.*;
+import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
+import static org.onosproject.store.impl.EventuallyConsistentMapEvent.Type.PUT;
+import static org.onosproject.store.impl.EventuallyConsistentMapEvent.Type.REMOVE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Manages inventory of applications in a distributed data store that uses
+ * optimistic replication and gossip based anti-entropy techniques.
+ */
+@Component(immediate = true)
+@Service
+public class GossipApplicationStore extends ApplicationArchive
+        implements ApplicationStore {
+
+    private final Logger log = getLogger(getClass());
+
+    // Subject of the cluster messages used to ask peers for an application's bits.
+    private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
+
+    // Longest time, in milliseconds, that a bit fetch waits for any peer to deliver.
+    private static final int FETCH_TIMEOUT_MS = 10_000;
+    // Delay, in milliseconds, between activation and the pruning pass over on-disk apps.
+    private static final int LOAD_TIMEOUT_MS = 5_000;
+
+    /**
+     * Application state as recorded in the replicated state map. Only
+     * ACTIVATED is reported to callers as active; the other two values are
+     * reported as installed.
+     */
+    public enum InternalState {
+        INSTALLED, ACTIVATED, DEACTIVATED
+    }
+
+    // Single worker thread; runs the delayed prune and the callbacks of
+    // completed bit-fetch requests.
+    private final ScheduledExecutorService executor =
+            Executors.newSingleThreadScheduledExecutor(namedThreads("onos-app-store"));
+
+    // Replicated maps, created on activation:
+    // application id -> application
+    private EventuallyConsistentMap<ApplicationId, Application> apps;
+    // application -> internal state
+    private EventuallyConsistentMap<Application, InternalState> states;
+    // application -> granted permissions
+    private EventuallyConsistentMap<Application, Set<Permission>> permissions;
+
+    // Cluster messaging; backs the replicated maps and the bit requests.
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    // Source of the local node identity and of the list of cluster nodes.
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    // Registry used to look up and register application ids by name.
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ApplicationIdStore idStore;
+
+    /**
+     * Brings the store up: subscribes to application bit requests, creates
+     * the three replicated maps and schedules the delayed prune of
+     * applications that are on disk but absent from the map.
+     */
+    @Activate
+    public void activate() {
+        KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(InternalState.class);
+
+        // Serve application bits to peers that ask for them.
+        clusterCommunicator.addSubscriber(APP_BITS_REQUEST, new InternalBitServer());
+
+        apps = new EventuallyConsistentMapImpl<>(
+                "apps", clusterService, clusterCommunicator,
+                serializer, new WallclockClockManager<>());
+
+        states = new EventuallyConsistentMapImpl<>(
+                "app-states", clusterService, clusterCommunicator,
+                serializer, new WallclockClockManager<>());
+        // State changes drive bit fetching and the events sent to the delegate.
+        states.addListener(new InternalAppStatesListener());
+
+        permissions = new EventuallyConsistentMapImpl<>(
+                "app-permissions", clusterService, clusterCommunicator,
+                serializer, new WallclockClockManager<>());
+
+        // FIXME: figure out load from disk; this will require resolving the dual authority problem
+
+        executor.schedule(this::pruneUninstalledApps, LOAD_TIMEOUT_MS, MILLISECONDS);
+
+        log.info("Started");
+    }
+
+    /**
+     * Creates an application entry for every application name reported by
+     * the archive. Not invoked anywhere in this class at present.
+     */
+    private void loadFromDisk() {
+        for (String appName : getApplicationNames()) {
+            ApplicationDescription description = getApplicationDescription(appName);
+            create(description);
+            // load app permissions
+        }
+    }
+
+    /**
+     * Shuts the store down: stops the background executor and destroys the
+     * replicated maps.
+     */
+    @Deactivate
+    public void deactivate() {
+        // Stop the worker thread before tearing the maps down. shutdownNow()
+        // is used rather than shutdown() because it also discards the delayed
+        // prune task, which would otherwise still fire against destroyed maps.
+        executor.shutdownNow();
+        apps.destroy();
+        states.destroy();
+        permissions.destroy();
+        log.info("Stopped");
+    }
+
+    @Override
+    public Set<Application> getApplications() {
+        // Immutable snapshot of the applications currently held in the map.
+        return ImmutableSet.<Application>builder().addAll(apps.values()).build();
+    }
+
+    @Override
+    public ApplicationId getId(String name) {
+        return idStore.getAppId(name);
+    }
+
+    @Override
+    public Application getApplication(ApplicationId appId) {
+        return apps.get(appId);
+    }
+
+    @Override
+    public ApplicationState getState(ApplicationId appId) {
+        Application app = apps.get(appId);
+        if (app == null) {
+            return null;
+        }
+        InternalState internal = states.get(app);
+        if (internal == null) {
+            return null;
+        }
+        // Anything other than ACTIVATED is reported as INSTALLED.
+        return internal == ACTIVATED ? ApplicationState.ACTIVE : ApplicationState.INSTALLED;
+    }
+
+    @Override
+    public Application create(InputStream appDescStream) {
+        // Persist the uploaded archive first, then register what it describes.
+        return create(saveApplication(appDescStream));
+    }
+
+    /**
+     * Registers the described application and records it as installed in the
+     * replicated maps.
+     */
+    private Application create(ApplicationDescription appDesc) {
+        Application registered = registerApp(appDesc);
+        ApplicationId id = registered.id();
+        apps.put(id, registered);
+        states.put(registered, INSTALLED);
+        return registered;
+    }
+
+    @Override
+    public void remove(ApplicationId appId) {
+        Application existing = apps.get(appId);
+        if (existing == null) {
+            // Unknown application; nothing to remove.
+            return;
+        }
+        apps.remove(appId);
+        states.remove(existing);
+        permissions.remove(existing);
+    }
+
+    @Override
+    public void activate(ApplicationId appId) {
+        Application target = apps.get(appId);
+        if (target == null) {
+            // Unknown application; leave the state map untouched.
+            return;
+        }
+        states.put(target, ACTIVATED);
+    }
+
+    @Override
+    public void deactivate(ApplicationId appId) {
+        Application target = apps.get(appId);
+        if (target == null) {
+            // Unknown application; leave the state map untouched.
+            return;
+        }
+        states.put(target, DEACTIVATED);
+    }
+
+    @Override
+    public Set<Permission> getPermissions(ApplicationId appId) {
+        Application app = apps.get(appId);
+        if (app == null) {
+            // No such application, hence no permission set to report.
+            return null;
+        }
+        return permissions.get(app);
+    }
+
+    @Override
+    public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
+        Application app = getApplication(appId);
+        if (app == null) {
+            // Unknown application; nothing is stored and no event is sent.
+            return;
+        }
+        // The parameter shadows the field, hence the explicit "this".
+        this.permissions.put(app, permissions);
+        ApplicationEvent changed = new ApplicationEvent(APP_PERMISSIONS_CHANGED, app);
+        delegate.notify(changed);
+    }
+
+    /**
+     * Reacts to changes in the replicated application state map by fetching
+     * or purging application bits as needed and notifying the store delegate.
+     */
+    private final class InternalAppStatesListener
+            implements EventuallyConsistentMapListener<Application, InternalState> {
+        @Override
+        public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
+            Application app = event.key();
+            InternalState state = event.value();
+            EventuallyConsistentMapEvent.Type type = event.type();
+
+            if (type == PUT) {
+                handlePut(app, state);
+            } else if (type == REMOVE) {
+                // The delegate is told before the local bits are purged.
+                delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
+                purgeApplication(app.id().name());
+            }
+        }
+
+        /**
+         * Handles a new state value recorded for the given application.
+         */
+        private void handlePut(Application app, InternalState state) {
+            if (state == null) {
+                // No recognisable state; nothing to do.
+                return;
+            }
+            switch (state) {
+                case INSTALLED:
+                    fetchBitsIfNeeded(app);
+                    delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
+                    break;
+                case ACTIVATED:
+                    installAppIfNeeded(app);
+                    setActive(app.id().name());
+                    delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
+                    break;
+                case DEACTIVATED:
+                    clearActive(app.id().name());
+                    delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+
+    /**
+     * Tells whether the local archive holds this application at the same
+     * version as the supplied one. A failure to read the local description
+     * counts as "not available".
+     */
+    private boolean appBitsAvailable(Application app) {
+        boolean available;
+        try {
+            String name = app.id().name();
+            available = getApplicationDescription(name).version().equals(app.version());
+        } catch (ApplicationException e) {
+            available = false;
+        }
+        return available;
+    }
+
+    /**
+     * Downloads the application bits from the cluster peers unless a matching
+     * copy is already present locally.
+     */
+    private void fetchBitsIfNeeded(Application app) {
+        if (appBitsAvailable(app)) {
+            return;
+        }
+        fetchBits(app);
+    }
+
+    /**
+     * Downloads the application bits from the cluster peers when no matching
+     * copy is present locally, then announces the application as installed.
+     * The announcement follows the fetch attempt unconditionally.
+     */
+    private void installAppIfNeeded(Application app) {
+        if (appBitsAvailable(app)) {
+            return;
+        }
+        fetchBits(app);
+        delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
+    }
+
+    /**
+     * Fetches the bits from the cluster peers.
+     * <p>
+     * A request carrying the application name is sent to every other node;
+     * the call then blocks until one of the responses has been saved or
+     * {@link #FETCH_TIMEOUT_MS} milliseconds have elapsed.
+     *
+     * @param app application whose bits are to be downloaded
+     */
+    private void fetchBits(Application app) {
+        ControllerNode localNode = clusterService.getLocalNode();
+        // Encode explicitly so the payload does not depend on the sender's
+        // platform-default charset.
+        ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
+                                                    app.id().name().getBytes(StandardCharsets.UTF_8));
+        // NOTE(review): never populated or read below; presumably reserved for
+        // cancelling outstanding requests - confirm or remove with its imports.
+        Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
+        CountDownLatch latch = new CountDownLatch(1);
+
+        // FIXME: send message with name & version to make sure we don't get served old bits
+
+        log.info("Downloading bits for application {}", app.id().name());
+        for (ControllerNode node : clusterService.getNodes()) {
+            if (node.id().equals(localNode.id())) {
+                // This method only runs when the local copy is missing or has a
+                // different version, so asking ourselves could at best hand
+                // back those stale bits.
+                continue;
+            }
+            try {
+                ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
+                future.addListener(new InternalBitListener(app, node, future, latch), executor);
+            } catch (IOException e) {
+                log.debug("Unable to request bits for application {} from node {}",
+                          app.id().name(), node.id());
+            }
+        }
+
+        try {
+            if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
+                log.warn("Unable to fetch bits for application {}", app.id().name());
+            }
+        } catch (InterruptedException e) {
+            log.warn("Interrupted while fetching bits for application {}", app.id().name());
+            // Restore the flag so callers further up can see the interruption.
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Responder to requests for application bits. The request payload is the
+     * application name; the response is the content of the stream that the
+     * archive returns for that name. No response is sent when the bits cannot
+     * be read.
+     */
+    private class InternalBitServer implements ClusterMessageHandler {
+        @Override
+        public void handle(ClusterMessage message) {
+            // Decode explicitly so the name does not depend on the receiver's
+            // platform-default charset.
+            String name = new String(message.payload(), StandardCharsets.UTF_8);
+            // try-with-resources closes the archive stream once it has been read.
+            try (InputStream bits = getApplicationInputStream(name)) {
+                message.respond(toByteArray(bits));
+            } catch (Exception e) {
+                log.debug("Unable to read bits for application {}", name, e);
+            }
+        }
+    }
+
+    /**
+     * Callback run when a bit request to one peer completes. The first
+     * response that is saved successfully releases the shared latch;
+     * responses arriving after that are ignored.
+     */
+    private class InternalBitListener implements Runnable {
+        private final Application app;
+        private final ControllerNode node;
+        private final ListenableFuture<byte[]> future;
+        private final CountDownLatch latch;
+
+        public InternalBitListener(Application app, ControllerNode node,
+                                   ListenableFuture<byte[]> future, CountDownLatch latch) {
+            this.app = app;
+            this.node = node;
+            this.future = future;
+            this.latch = latch;
+        }
+
+        @Override
+        public void run() {
+            // Skip when the latch was already released or this request was cancelled.
+            if (latch.getCount() <= 0 || future.isCancelled()) {
+                return;
+            }
+            try {
+                byte[] payload = future.get(1, MILLISECONDS);
+                InputStream stream = new ByteArrayInputStream(payload);
+                saveApplication(stream);
+                log.info("Downloaded bits for application {} from node {}",
+                         app.id().name(), node.id());
+                latch.countDown();
+            } catch (Exception e) {
+                log.warn("Unable to fetch bits for application {} from node {}",
+                         app.id().name(), node.id());
+            }
+        }
+    }
+
+    /**
+     * Prunes applications which are not in the map, but are on disk.
+     * <p>
+     * For each such application the delegate is sent an uninstall event and
+     * the local bits are purged. A failure on one application is logged and
+     * does not stop the pass over the remaining ones.
+     */
+    private void pruneUninstalledApps() {
+        for (String name : getApplicationNames()) {
+            try {
+                ApplicationId appId = getId(name);
+                // NOTE(review): presumably the id store yields null for a name it
+                // has never registered; treat that the same as "not in the map"
+                // instead of passing a null key to the lookup - confirm.
+                if (appId == null || getApplication(appId) == null) {
+                    Application app = registerApp(getApplicationDescription(name));
+                    delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
+                    purgeApplication(app.id().name());
+                }
+            } catch (ApplicationException e) {
+                // This runs as a scheduled task, where an escaping exception is
+                // captured by the future and would end the pass without a trace.
+                log.warn("Unable to prune application {}", name, e);
+            }
+        }
+    }
+
+    /**
+     * Registers the described application's name with the id store and
+     * builds an application object from the resulting id and the remaining
+     * fields of the description.
+     */
+    private Application registerApp(ApplicationDescription appDesc) {
+        String appName = appDesc.name();
+        ApplicationId registeredId = idStore.registerApplication(appName);
+        return new DefaultApplication(registeredId,
+                                      appDesc.version(),
+                                      appDesc.description(),
+                                      appDesc.origin(),
+                                      appDesc.permissions(),
+                                      appDesc.featuresRepo(),
+                                      appDesc.features());
+    }
+}
+