New ApplicationStore that uses a single ConsistentMap to track all app related state
Change-Id: Ieacc97f213add8ece8f462cd9971fb6ef3d0dde5
diff --git a/core/api/src/main/java/org/onosproject/store/service/Versioned.java b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
index ecd56b3..6c6834e 100644
--- a/core/api/src/main/java/org/onosproject/store/service/Versioned.java
+++ b/core/api/src/main/java/org/onosproject/store/service/Versioned.java
@@ -96,7 +96,7 @@
* @param <U> value type of the returned instance
* @return mapped instance
*/
- public <U> Versioned<U> map(Function<V, U> transformer) {
+ public synchronized <U> Versioned<U> map(Function<V, U> transformer) {
return new Versioned<>(transformer.apply(value), version, creationTime);
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
similarity index 64%
rename from core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
rename to core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
index 9809d11..d2707d6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-present Open Networking Laboratory
+ * Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,18 +16,20 @@
package org.onosproject.store.app;
import com.google.common.base.Charsets;
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoNamespace;
import org.onosproject.app.ApplicationDescription;
import org.onosproject.app.ApplicationEvent;
import org.onosproject.app.ApplicationException;
@@ -46,13 +48,14 @@
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.EventuallyConsistentMap;
-import org.onosproject.store.service.EventuallyConsistentMapEvent;
-import org.onosproject.store.service.EventuallyConsistentMapListener;
-import org.onosproject.store.service.LogicalClockService;
-import org.onosproject.store.service.MultiValuedTimestamp;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.onosproject.store.service.DistributedPrimitive.Status;
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
@@ -66,7 +69,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
import java.util.function.Function;
+import java.util.stream.Collectors;
import static com.google.common.collect.Multimaps.newSetMultimap;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
@@ -76,18 +82,16 @@
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.randomDelay;
import static org.onosproject.app.ApplicationEvent.Type.*;
-import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
-import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
+import static org.onosproject.store.app.DistributedApplicationStore.InternalState.*;
import static org.slf4j.LoggerFactory.getLogger;
/**
- * Manages inventory of applications in a distributed data store that uses
- * optimistic replication and gossip based anti-entropy techniques.
+ * Manages inventory of applications in a distributed data store providing
+ * stronger consistency guarantees.
*/
-@Component(immediate = true)
+@Component(immediate = true, enabled = true)
@Service
-public class GossipApplicationStore extends ApplicationArchive
+public class DistributedApplicationStore extends ApplicationArchive
implements ApplicationStore {
private final Logger log = getLogger(getClass());
@@ -110,9 +114,7 @@
private ScheduledExecutorService executor;
private ExecutorService messageHandlingExecutor;
- private EventuallyConsistentMap<ApplicationId, Application> apps;
- private EventuallyConsistentMap<Application, InternalState> states;
- private EventuallyConsistentMap<Application, Set<Permission>> permissions;
+ private ConsistentMap<ApplicationId, InternalApplicationHolder> apps;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@@ -124,11 +126,12 @@
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected LogicalClockService clockService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ApplicationIdStore idStore;
+ private final InternalAppsListener appsListener = new InternalAppsListener();
+
+ private Consumer<Status> statusChangeListener;
+
// Multimap to track which apps are required by others apps
// app -> { required-by, ... }
// Apps explicitly activated will be required by the CORE app
@@ -139,16 +142,8 @@
@Activate
public void activate() {
- KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
- .register(KryoNamespaces.API)
- .register(MultiValuedTimestamp.class)
- .register(InternalState.class);
-
- executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
-
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/app", "message-handler", log));
-
clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
bytes -> new String(bytes, Charsets.UTF_8),
name -> {
@@ -161,33 +156,36 @@
Function.identity(),
messageHandlingExecutor);
- // FIXME: Consider consolidating into a single map.
-
- apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
- .withName("apps")
- .withSerializer(serializer)
- .withTimestampProvider((k, v) -> clockService.getTimestamp())
+ apps = storageService.<ApplicationId, InternalApplicationHolder>consistentMapBuilder()
+ .withName("onos-apps")
+ .withRelaxedReadConsistency()
+ .withSerializer(Serializer.using(KryoNamespaces.API,
+ InternalApplicationHolder.class,
+ InternalState.class))
.build();
- states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
- .withName("app-states")
- .withSerializer(serializer)
- .withTimestampProvider((k, v) -> clockService.getTimestamp())
- .build();
-
- states.addListener(new InternalAppStatesListener());
-
- permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
- .withName("app-permissions")
- .withSerializer(serializer)
- .withTimestampProvider((k, v) -> clockService.getTimestamp())
- .build();
-
+ executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
+ statusChangeListener = status -> {
+ if (status == Status.ACTIVE) {
+ executor.execute(this::bootstrapExistingApplications);
+ }
+ };
+ apps.addListener(appsListener, messageHandlingExecutor);
+ apps.addStatusChangeListener(statusChangeListener);
coreAppId = getId(CoreService.CORE_APP_NAME);
log.info("Started");
}
/**
+ * Processes existing applications from the distributed map. This is done to
+ * account for events that this instance may be have missed due to a staggered start.
+ */
+ void bootstrapExistingApplications() {
+ apps.asJavaMap().forEach((appId, holder) -> setupApplicationAndNotify(appId, holder.app(), holder.state()));
+
+ }
+
+ /**
* Loads the application inventory from the disk and activates apps if
* they are marked to be active.
*/
@@ -244,23 +242,27 @@
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
+ apps.removeStatusChangeListener(statusChangeListener);
+ apps.removeListener(appsListener);
messageHandlingExecutor.shutdown();
executor.shutdown();
- apps.destroy();
- states.destroy();
- permissions.destroy();
log.info("Stopped");
}
@Override
public void setDelegate(ApplicationStoreDelegate delegate) {
super.setDelegate(delegate);
+ executor.execute(this::bootstrapExistingApplications);
executor.schedule(() -> loadFromDisk(), APP_LOAD_DELAY_MS, TimeUnit.MILLISECONDS);
}
@Override
public Set<Application> getApplications() {
- return ImmutableSet.copyOf(apps.values());
+ return ImmutableSet.copyOf(apps.values()
+ .stream()
+ .map(Versioned::value)
+ .map(InternalApplicationHolder::app)
+ .collect(Collectors.toSet()));
}
@Override
@@ -270,15 +272,15 @@
@Override
public Application getApplication(ApplicationId appId) {
- return apps.get(appId);
+ InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.get(appId));
+ return appHolder != null ? appHolder.app() : null;
}
@Override
public ApplicationState getState(ApplicationId appId) {
- Application app = apps.get(appId);
- InternalState s = app == null ? null : states.get(app);
- return s == null ? null : s == ACTIVATED ?
- ApplicationState.ACTIVE : ApplicationState.INSTALLED;
+ InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.get(appId));
+ InternalState state = appHolder != null ? appHolder.state() : null;
+ return state == null ? null : state == ACTIVATED ? ApplicationState.ACTIVE : ApplicationState.INSTALLED;
}
@Override
@@ -300,26 +302,21 @@
if (updateTime) {
updateTime(app.id().name());
}
- apps.put(app.id(), app);
- states.put(app, INSTALLED);
- return app;
+ InternalApplicationHolder previousApp =
+ Versioned.valueOrNull(apps.putIfAbsent(app.id(), new InternalApplicationHolder(app, INSTALLED, null)));
+ return previousApp != null ? previousApp.app() : app;
}
@Override
public void remove(ApplicationId appId) {
- Application app = apps.get(appId);
- if (app != null) {
- uninstallDependentApps(app);
- apps.remove(appId);
- states.remove(app);
- permissions.remove(app);
- }
+ uninstallDependentApps(appId);
+ apps.remove(appId);
}
// Uninstalls all apps that depend on the given app.
- private void uninstallDependentApps(Application app) {
+ private void uninstallDependentApps(ApplicationId appId) {
getApplications().stream()
- .filter(a -> a.requiredApps().contains(app.id().name()))
+ .filter(a -> a.requiredApps().contains(appId.name()))
.forEach(a -> remove(a.id()));
}
@@ -335,13 +332,18 @@
private void activate(ApplicationId appId, boolean updateTime) {
- Application app = apps.get(appId);
- if (app != null) {
+ AtomicBoolean stateChanged = new AtomicBoolean(false);
+ InternalApplicationHolder appHolder = Versioned.valueOrNull(apps.computeIf(appId,
+ v -> v != null && v.state() != ACTIVATED,
+ (k, v) -> {
+ stateChanged.set(true);
+ return new InternalApplicationHolder(v.app(), ACTIVATED, v.permissions());
+ }));
+ if (stateChanged.get()) {
if (updateTime) {
updateTime(appId.name());
}
- activateRequiredApps(app);
- states.put(app, ACTIVATED);
+ activateRequiredApps(appHolder.app());
}
}
@@ -352,90 +354,108 @@
@Override
public void deactivate(ApplicationId appId) {
- deactivateDependentApps(getApplication(appId));
+ deactivateDependentApps(appId);
deactivate(appId, coreAppId);
}
private void deactivate(ApplicationId appId, ApplicationId forAppId) {
requiredBy.remove(appId, forAppId);
if (requiredBy.get(appId).isEmpty()) {
- Application app = apps.get(appId);
- if (app != null) {
+ AtomicBoolean stateChanged = new AtomicBoolean(false);
+ apps.computeIf(appId,
+ v -> v != null && v.state() != DEACTIVATED,
+ (k, v) -> {
+ stateChanged.set(true);
+ return new InternalApplicationHolder(v.app(), DEACTIVATED, v.permissions());
+ });
+ if (stateChanged.get()) {
updateTime(appId.name());
- states.put(app, DEACTIVATED);
- deactivateRequiredApps(app);
+ deactivateRequiredApps(appId);
}
}
}
// Deactivates all apps that require this application.
- private void deactivateDependentApps(Application app) {
- getApplications().stream()
- .filter(a -> states.get(a) == ACTIVATED)
- .filter(a -> a.requiredApps().contains(app.id().name()))
- .forEach(a -> deactivate(a.id()));
+ private void deactivateDependentApps(ApplicationId appId) {
+ apps.values()
+ .stream()
+ .map(Versioned::value)
+ .filter(a -> a.state() == ACTIVATED)
+ .filter(a -> a.app().requiredApps().contains(appId.name()))
+ .forEach(a -> deactivate(a.app().id()));
}
// Deactivates all apps required by this application.
- private void deactivateRequiredApps(Application app) {
- app.requiredApps().stream().map(this::getId).map(this::getApplication)
- .filter(a -> states.get(a) == ACTIVATED)
- .forEach(a -> deactivate(a.id(), app.id()));
+ private void deactivateRequiredApps(ApplicationId appId) {
+ getApplication(appId).requiredApps()
+ .stream()
+ .map(this::getId)
+ .map(apps::get)
+ .map(Versioned::value)
+ .filter(a -> a.state() == ACTIVATED)
+ .forEach(a -> deactivate(a.app().id(), appId));
}
@Override
public Set<Permission> getPermissions(ApplicationId appId) {
- Application app = apps.get(appId);
- return app != null ? permissions.get(app) : null;
+ InternalApplicationHolder app = Versioned.valueOrNull(apps.get(appId));
+ return app != null ? ImmutableSet.copyOf(app.permissions()) : ImmutableSet.of();
}
@Override
public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
- Application app = getApplication(appId);
- if (app != null) {
- this.permissions.put(app, permissions);
- delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
+ AtomicBoolean permissionsChanged = new AtomicBoolean(false);
+ Versioned<InternalApplicationHolder> appHolder = apps.computeIf(appId,
+ v -> v != null && !Sets.symmetricDifference(v.permissions(), permissions).isEmpty(),
+ (k, v) -> {
+ permissionsChanged.set(true);
+ return new InternalApplicationHolder(v.app(), v.state(), ImmutableSet.copyOf(permissions));
+ });
+ if (permissionsChanged.get()) {
+ delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, appHolder.value().app()));
}
}
/**
* Listener to application state distributed map changes.
*/
- private final class InternalAppStatesListener
- implements EventuallyConsistentMapListener<Application, InternalState> {
+ private final class InternalAppsListener
+ implements MapEventListener<ApplicationId, InternalApplicationHolder> {
@Override
- public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
- // If we do not have a delegate, refuse to process any events entirely.
- // This is to allow the anti-entropy to kick in and process the events
- // perhaps a bit later, but with opportunity to notify delegate.
+ public void event(MapEvent<ApplicationId, InternalApplicationHolder> event) {
if (delegate == null) {
return;
}
- Application app = event.key();
- InternalState state = event.value();
-
- if (event.type() == PUT) {
- if (state == INSTALLED) {
- fetchBitsIfNeeded(app);
- delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
-
- } else if (state == ACTIVATED) {
- installAppIfNeeded(app);
- setActive(app.id().name());
- delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
-
- } else if (state == DEACTIVATED) {
- clearActive(app.id().name());
- delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
+ ApplicationId appId = event.key();
+ InternalApplicationHolder newApp = event.newValue() == null ? null : event.newValue().value();
+ InternalApplicationHolder oldApp = event.oldValue() == null ? null : event.oldValue().value();
+ if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
+ if (event.type() == MapEvent.Type.UPDATE && newApp.state() == oldApp.state()) {
+ return;
}
- } else if (event.type() == REMOVE) {
- delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
- purgeApplication(app.id().name());
+ setupApplicationAndNotify(appId, newApp.app(), newApp.state());
+ } else if (event.type() == MapEvent.Type.REMOVE) {
+ delegate.notify(new ApplicationEvent(APP_UNINSTALLED, oldApp.app()));
+ purgeApplication(appId.name());
}
}
}
+ private void setupApplicationAndNotify(ApplicationId appId, Application app, InternalState state) {
+ if (state == INSTALLED) {
+ fetchBitsIfNeeded(app);
+ delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
+ } else if (state == ACTIVATED) {
+ installAppIfNeeded(app);
+ setActive(appId.name());
+ delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
+ } else if (state == DEACTIVATED) {
+ clearActive(appId.name());
+ delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
+ }
+ }
+
/**
* Determines if the application bits are available locally.
*/
@@ -512,19 +532,6 @@
}
/**
- * Prunes applications which are not in the map, but are on disk.
- */
- private void pruneUninstalledApps() {
- for (String name : getApplicationNames()) {
- if (getApplication(getId(name)) == null) {
- Application app = registerApp(getApplicationDescription(name));
- delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
- purgeApplication(app.id().name());
- }
- }
- }
-
- /**
* Produces a registered application from the supplied description.
*/
private Application registerApp(ApplicationDescription appDesc) {
@@ -544,4 +551,46 @@
appDesc.features(),
appDesc.requiredApps());
}
+
+ /**
+ * Internal class for holding app information.
+ */
+ private static class InternalApplicationHolder {
+ private final Application app;
+ private final InternalState state;
+ private final Set<Permission> permissions;
+
+ @SuppressWarnings("unused")
+ private InternalApplicationHolder() {
+ app = null;
+ state = null;
+ permissions = null;
+ }
+
+ public InternalApplicationHolder(Application app, InternalState state, Set<Permission> permissions) {
+ this.app = Preconditions.checkNotNull(app);
+ this.state = state;
+ this.permissions = permissions == null ? null : ImmutableSet.copyOf(permissions);
+ }
+
+ public Application app() {
+ return app;
+ }
+
+ public InternalState state() {
+ return state;
+ }
+
+ public Set<Permission> permissions() {
+ return permissions;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("app", app.id())
+ .add("state", state)
+ .toString();
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/package-info.java b/core/store/dist/src/main/java/org/onosproject/store/app/package-info.java
index 2765d29..19ba3dc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/package-info.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/package-info.java
@@ -15,6 +15,6 @@
*/
/**
- * Implementation of distributed applications store.
+ * Implementation of distributed application store.
*/
package org.onosproject.store.app;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java b/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java
index cc98ac5..2c7b554 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/core/impl/DistributedApplicationIdStore.java
@@ -38,7 +38,6 @@
import org.onosproject.store.service.MapEventListener;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
-import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
@@ -109,8 +108,14 @@
@Override
public ApplicationId registerApplication(String name) {
- return Versioned.valueOrNull(registeredIds.computeIfAbsent(name,
- key -> new DefaultApplicationId((int) appIdCounter.incrementAndGet(), name)));
+ ApplicationId exisitingAppId = registeredIds.asJavaMap().get(name);
+ if (exisitingAppId == null) {
+ ApplicationId newAppId = new DefaultApplicationId((int) appIdCounter.incrementAndGet(), name);
+ exisitingAppId = registeredIds.asJavaMap().putIfAbsent(name, newAppId);
+ return exisitingAppId == null ? newAppId : exisitingAppId;
+ } else {
+ return exisitingAppId;
+ }
}
private void primeIdToAppIdCache() {
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java
index 45e83b25..31452ac 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/tunnel/impl/DistributedTunnelStore.java
@@ -54,7 +54,6 @@
import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.AbstractStore;
-import org.onosproject.store.app.GossipApplicationStore.InternalState;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
@@ -121,8 +120,7 @@
public void activate() {
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
- .register(MultiValuedTimestamp.class)
- .register(InternalState.class);
+ .register(MultiValuedTimestamp.class);
tunnelIdAsKeyStore = storageService
.<TunnelId, Tunnel>eventuallyConsistentMapBuilder()
.withName("all_tunnel").withSerializer(serializer)