New ApplicationStore that uses a single ConsistentMap to track all app related state
Change-Id: Ieacc97f213add8ece8f462cd9971fb6ef3d0dde5
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
new file mode 100644
index 0000000..d2707d6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
@@ -0,0 +1,596 @@
+/*
+ * Copyright 2016-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.app;
+
+import com.google.common.base.Charsets;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Sets;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onosproject.app.ApplicationDescription;
+import org.onosproject.app.ApplicationEvent;
+import org.onosproject.app.ApplicationException;
+import org.onosproject.app.ApplicationState;
+import org.onosproject.app.ApplicationStore;
+import org.onosproject.app.ApplicationStoreDelegate;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.common.app.ApplicationArchive;
+import org.onosproject.core.Application;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.ApplicationIdStore;
+import org.onosproject.core.CoreService;
+import org.onosproject.core.DefaultApplication;
+import org.onosproject.security.Permission;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.DistributedPrimitive.Status;
+import org.slf4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static com.google.common.collect.Multimaps.newSetMultimap;
+import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
+import static com.google.common.io.ByteStreams.toByteArray;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onlab.util.Tools.randomDelay;
+import static org.onosproject.app.ApplicationEvent.Type.*;
+import static org.onosproject.store.app.DistributedApplicationStore.InternalState.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Manages inventory of applications in a distributed data store providing
+ * stronger consistency guarantees.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class DistributedApplicationStore extends ApplicationArchive
+ implements ApplicationStore {
+
+ private final Logger log = getLogger(getClass());
+
+ private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
+
+ private static final int MAX_LOAD_RETRIES = 5;
+ private static final int RETRY_DELAY_MS = 2_000;
+
+ private static final int FETCH_TIMEOUT_MS = 10_000;
+
+ private static final int APP_LOAD_DELAY_MS = 500;
+
+ private static List<String> pendingApps = Lists.newArrayList();
+
+ public enum InternalState {
+ INSTALLED, ACTIVATED, DEACTIVATED
+ }
+
+ private ScheduledExecutorService executor;
+ private ExecutorService messageHandlingExecutor;
+
+ private ConsistentMap<ApplicationId, InternalApplicationHolder> apps;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ApplicationIdStore idStore;
+
+ private final InternalAppsListener appsListener = new InternalAppsListener();
+
+ private Consumer<Status> statusChangeListener;
+
+ // Multimap to track which apps are required by others apps
+ // app -> { required-by, ... }
+ // Apps explicitly activated will be required by the CORE app
+ private final Multimap<ApplicationId, ApplicationId> requiredBy =
+ synchronizedSetMultimap(newSetMultimap(Maps.newHashMap(), Sets::newHashSet));
+
+ private ApplicationId coreAppId;
+
+ @Activate
+ public void activate() {
+ messageHandlingExecutor = Executors.newSingleThreadExecutor(
+ groupedThreads("onos/store/app", "message-handler", log));
+ clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
+ bytes -> new String(bytes, Charsets.UTF_8),
+ name -> {
+ try {
+ return toByteArray(getApplicationInputStream(name));
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ },
+ Function.identity(),
+ messageHandlingExecutor);
+
+ apps = storageService.<ApplicationId, InternalApplicationHolder>consistentMapBuilder()
+ .withName("onos-apps")
+ .withRelaxedReadConsistency()
+ .withSerializer(Serializer.using(KryoNamespaces.API,
+ InternalApplicationHolder.class,
+ InternalState.class))
+ .build();
+
+ executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
+ statusChangeListener = status -> {
+ if (status == Status.ACTIVE) {
+ executor.execute(this::bootstrapExistingApplications);
+ }
+ };
+ apps.addListener(appsListener, messageHandlingExecutor);
+ apps.addStatusChangeListener(statusChangeListener);
+ coreAppId = getId(CoreService.CORE_APP_NAME);
+ log.info("Started");
+ }
+
+ /**
+ * Processes existing applications from the distributed map. This is done to
+ * account for events that this instance may be have missed due to a staggered start.
+ */
+ void bootstrapExistingApplications() {
+ apps.asJavaMap().forEach((appId, holder) -> setupApplicationAndNotify(appId, holder.app(), holder.state()));
+
+ }
+
+ /**
+ * Loads the application inventory from the disk and activates apps if
+ * they are marked to be active.
+ */
+ private void loadFromDisk() {
+ getApplicationNames().forEach(appName -> {
+ Application app = loadFromDisk(appName);
+ if (app != null && isActive(app.id().name())) {
+ activate(app.id(), false);
+ // TODO Load app permissions
+ }
+ });
+ }
+
+ private Application loadFromDisk(String appName) {
+ pendingApps.add(appName);
+
+ for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
+ try {
+ // Directly return if app already exists
+ ApplicationId appId = getId(appName);
+ if (appId != null) {
+ Application application = getApplication(appId);
+ if (application != null) {
+ pendingApps.remove(appName);
+ return application;
+ }
+ }
+
+ ApplicationDescription appDesc = getApplicationDescription(appName);
+
+ Optional<String> loop = appDesc.requiredApps().stream()
+ .filter(app -> pendingApps.contains(app)).findAny();
+ if (loop.isPresent()) {
+ log.error("Circular app dependency detected: {} -> {}", pendingApps, loop.get());
+ pendingApps.remove(appName);
+ return null;
+ }
+
+ boolean success = appDesc.requiredApps().stream()
+ .noneMatch(requiredApp -> loadFromDisk(requiredApp) == null);
+ pendingApps.remove(appName);
+
+ return success ? create(appDesc, false) : null;
+
+ } catch (Exception e) {
+ log.warn("Unable to load application {} from disk; retrying", appName);
+ randomDelay(RETRY_DELAY_MS); //FIXME: This is a deliberate hack; fix in Falcon
+ }
+ }
+ pendingApps.remove(appName);
+ return null;
+ }
+
+ @Deactivate
+ public void deactivate() {
+ clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
+ apps.removeStatusChangeListener(statusChangeListener);
+ apps.removeListener(appsListener);
+ messageHandlingExecutor.shutdown();
+ executor.shutdown();
+ log.info("Stopped");
+ }
+
+ @Override
+ public void setDelegate(ApplicationStoreDelegate delegate) {
+ super.setDelegate(delegate);
+ executor.execute(this::bootstrapExistingApplications);
+ executor.schedule(() -> loadFromDisk(), APP_LOAD_DELAY_MS, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public Set<Application> getApplications() {
+ return ImmutableSet.copyOf(apps.values()
+ .stream()
+ .map(Versioned::value)
+ .map(InternalApplicationHolder::app)
+ .collect(Collectors.toSet()));
+ }
+
+ @Override
+ public ApplicationId getId(String name) {
+ return idStore.getAppId(name);
+ }
+
+ @Override
+ public Application getApplication(ApplicationId appId) {
+ InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.get(appId));
+ return appHolder != null ? appHolder.app() : null;
+ }
+
+ @Override
+ public ApplicationState getState(ApplicationId appId) {
+ InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.get(appId));
+ InternalState state = appHolder != null ? appHolder.state() : null;
+ return state == null ? null : state == ACTIVATED ? ApplicationState.ACTIVE : ApplicationState.INSTALLED;
+ }
+
+ @Override
+ public Application create(InputStream appDescStream) {
+ ApplicationDescription appDesc = saveApplication(appDescStream);
+ if (hasPrerequisites(appDesc)) {
+ return create(appDesc, true);
+ }
+ throw new ApplicationException("Missing dependencies for app " + appDesc.name());
+ }
+
+ private boolean hasPrerequisites(ApplicationDescription app) {
+ return !app.requiredApps().stream().map(n -> getId(n))
+ .anyMatch(id -> id == null || getApplication(id) == null);
+ }
+
+ private Application create(ApplicationDescription appDesc, boolean updateTime) {
+ Application app = registerApp(appDesc);
+ if (updateTime) {
+ updateTime(app.id().name());
+ }
+ InternalApplicationHolder previousApp =
+ Versioned.valueOrNull(apps.putIfAbsent(app.id(), new InternalApplicationHolder(app, INSTALLED, null)));
+ return previousApp != null ? previousApp.app() : app;
+ }
+
+ @Override
+ public void remove(ApplicationId appId) {
+ uninstallDependentApps(appId);
+ apps.remove(appId);
+ }
+
+ // Uninstalls all apps that depend on the given app.
+ private void uninstallDependentApps(ApplicationId appId) {
+ getApplications().stream()
+ .filter(a -> a.requiredApps().contains(appId.name()))
+ .forEach(a -> remove(a.id()));
+ }
+
+ @Override
+ public void activate(ApplicationId appId) {
+ activate(appId, coreAppId);
+ }
+
+ private void activate(ApplicationId appId, ApplicationId forAppId) {
+ requiredBy.put(appId, forAppId);
+ activate(appId, true);
+ }
+
+
+ private void activate(ApplicationId appId, boolean updateTime) {
+ AtomicBoolean stateChanged = new AtomicBoolean(false);
+ InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.computeIf(appId,
+ v -> v != null && v.state() != ACTIVATED,
+ (k, v) -> {
+ stateChanged.set(true);
+ return new InternalApplicationHolder(v.app(), ACTIVATED, v.permissions());
+ }));
+ if (stateChanged.get()) {
+ if (updateTime) {
+ updateTime(appId.name());
+ }
+ activateRequiredApps(appHolder.app());
+ }
+ }
+
+ // Activates all apps required by this application.
+ private void activateRequiredApps(Application app) {
+ app.requiredApps().stream().map(this::getId).forEach(id -> activate(id, app.id()));
+ }
+
+ @Override
+ public void deactivate(ApplicationId appId) {
+ deactivateDependentApps(appId);
+ deactivate(appId, coreAppId);
+ }
+
+ private void deactivate(ApplicationId appId, ApplicationId forAppId) {
+ requiredBy.remove(appId, forAppId);
+ if (requiredBy.get(appId).isEmpty()) {
+ AtomicBoolean stateChanged = new AtomicBoolean(false);
+ apps.computeIf(appId,
+ v -> v != null && v.state() != DEACTIVATED,
+ (k, v) -> {
+ stateChanged.set(true);
+ return new InternalApplicationHolder(v.app(), DEACTIVATED, v.permissions());
+ });
+ if (stateChanged.get()) {
+ updateTime(appId.name());
+ deactivateRequiredApps(appId);
+ }
+ }
+ }
+
+ // Deactivates all apps that require this application.
+ private void deactivateDependentApps(ApplicationId appId) {
+ apps.values()
+ .stream()
+ .map(Versioned::value)
+ .filter(a -> a.state() == ACTIVATED)
+ .filter(a -> a.app().requiredApps().contains(appId.name()))
+ .forEach(a -> deactivate(a.app().id()));
+ }
+
+ // Deactivates all apps required by this application.
+ private void deactivateRequiredApps(ApplicationId appId) {
+ getApplication(appId).requiredApps()
+ .stream()
+ .map(this::getId)
+ .map(apps::get)
+ .map(Versioned::value)
+ .filter(a -> a.state() == ACTIVATED)
+ .forEach(a -> deactivate(a.app().id(), appId));
+ }
+
+ @Override
+ public Set<Permission> getPermissions(ApplicationId appId) {
+ InternalApplicationHolder app = Versioned.valueOrNull(apps.get(appId));
+ return app != null ? ImmutableSet.copyOf(app.permissions()) : ImmutableSet.of();
+ }
+
+ @Override
+ public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
+ AtomicBoolean permissionsChanged = new AtomicBoolean(false);
+ Versioned<InternalApplicationHolder> appHolder = apps.computeIf(appId,
+ v -> v != null && !Sets.symmetricDifference(v.permissions(), permissions).isEmpty(),
+ (k, v) -> {
+ permissionsChanged.set(true);
+ return new InternalApplicationHolder(v.app(), v.state(), ImmutableSet.copyOf(permissions));
+ });
+ if (permissionsChanged.get()) {
+ delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, appHolder.value().app()));
+ }
+ }
+
+ /**
+ * Listener to application state distributed map changes.
+ */
+ private final class InternalAppsListener
+ implements MapEventListener<ApplicationId, InternalApplicationHolder> {
+ @Override
+ public void event(MapEvent<ApplicationId, InternalApplicationHolder> event) {
+ if (delegate == null) {
+ return;
+ }
+
+ ApplicationId appId = event.key();
+ InternalApplicationHolder newApp = event.newValue() == null ? null : event.newValue().value();
+ InternalApplicationHolder oldApp = event.oldValue() == null ? null : event.oldValue().value();
+ if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
+ if (event.type() == MapEvent.Type.UPDATE && newApp.state() == oldApp.state()) {
+ return;
+ }
+ setupApplicationAndNotify(appId, newApp.app(), newApp.state());
+ } else if (event.type() == MapEvent.Type.REMOVE) {
+ delegate.notify(new ApplicationEvent(APP_UNINSTALLED, oldApp.app()));
+ purgeApplication(appId.name());
+ }
+ }
+ }
+
+ private void setupApplicationAndNotify(ApplicationId appId, Application app, InternalState state) {
+ if (state == INSTALLED) {
+ fetchBitsIfNeeded(app);
+ delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
+ } else if (state == ACTIVATED) {
+ installAppIfNeeded(app);
+ setActive(appId.name());
+ delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
+ } else if (state == DEACTIVATED) {
+ clearActive(appId.name());
+ delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
+ }
+ }
+
+ /**
+ * Determines if the application bits are available locally.
+ */
+ private boolean appBitsAvailable(Application app) {
+ try {
+ ApplicationDescription appDesc = getApplicationDescription(app.id().name());
+ return appDesc.version().equals(app.version());
+ } catch (ApplicationException e) {
+ return false;
+ }
+ }
+
+ /**
+ * Fetches the bits from the cluster peers if necessary.
+ */
+ private void fetchBitsIfNeeded(Application app) {
+ if (!appBitsAvailable(app)) {
+ fetchBits(app);
+ }
+ }
+
+ /**
+ * Installs the application if necessary from the application peers.
+ */
+ private void installAppIfNeeded(Application app) {
+ if (!appBitsAvailable(app)) {
+ fetchBits(app);
+ delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
+ }
+ }
+
+ /**
+ * Fetches the bits from the cluster peers.
+ */
+ private void fetchBits(Application app) {
+ ControllerNode localNode = clusterService.getLocalNode();
+ CountDownLatch latch = new CountDownLatch(1);
+
+ // FIXME: send message with name & version to make sure we don't get served old bits
+
+ log.info("Downloading bits for application {}", app.id().name());
+ for (ControllerNode node : clusterService.getNodes()) {
+ if (latch.getCount() == 0) {
+ break;
+ }
+ if (node.equals(localNode)) {
+ continue;
+ }
+ clusterCommunicator.sendAndReceive(app.id().name(),
+ APP_BITS_REQUEST,
+ s -> s.getBytes(Charsets.UTF_8),
+ Function.identity(),
+ node.id())
+ .whenCompleteAsync((bits, error) -> {
+ if (error == null && latch.getCount() > 0) {
+ saveApplication(new ByteArrayInputStream(bits));
+ log.info("Downloaded bits for application {} from node {}",
+ app.id().name(), node.id());
+ latch.countDown();
+ } else if (error != null) {
+ log.warn("Unable to fetch bits for application {} from node {}",
+ app.id().name(), node.id());
+ }
+ }, executor);
+ }
+
+ try {
+ if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
+ log.warn("Unable to fetch bits for application {}", app.id().name());
+ }
+ } catch (InterruptedException e) {
+ log.warn("Interrupted while fetching bits for application {}", app.id().name());
+ }
+ }
+
+ /**
+ * Produces a registered application from the supplied description.
+ */
+ private Application registerApp(ApplicationDescription appDesc) {
+ ApplicationId appId = idStore.registerApplication(appDesc.name());
+ return new DefaultApplication(appId,
+ appDesc.version(),
+ appDesc.title(),
+ appDesc.description(),
+ appDesc.origin(),
+ appDesc.category(),
+ appDesc.url(),
+ appDesc.readme(),
+ appDesc.icon(),
+ appDesc.role(),
+ appDesc.permissions(),
+ appDesc.featuresRepo(),
+ appDesc.features(),
+ appDesc.requiredApps());
+ }
+
+ /**
+ * Internal class for holding app information.
+ */
+ private static class InternalApplicationHolder {
+ private final Application app;
+ private final InternalState state;
+ private final Set<Permission> permissions;
+
+ @SuppressWarnings("unused")
+ private InternalApplicationHolder() {
+ app = null;
+ state = null;
+ permissions = null;
+ }
+
+ public InternalApplicationHolder(Application app, InternalState state, Set<Permission> permissions) {
+ this.app = Preconditions.checkNotNull(app);
+ this.state = state;
+ this.permissions = permissions == null ? null : ImmutableSet.copyOf(permissions);
+ }
+
+ public Application app() {
+ return app;
+ }
+
+ public InternalState state() {
+ return state;
+ }
+
+ public Set<Permission> permissions() {
+ return permissions;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("app", app.id())
+ .add("state", state)
+ .toString();
+ }
+ }
+}