blob: 9809d11aef71443d5c9756c689f676b7f0a876df [file] [log] [blame]
/*
* Copyright 2015-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.app;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.KryoNamespace;
import org.onosproject.app.ApplicationDescription;
import org.onosproject.app.ApplicationEvent;
import org.onosproject.app.ApplicationException;
import org.onosproject.app.ApplicationState;
import org.onosproject.app.ApplicationStore;
import org.onosproject.app.ApplicationStoreDelegate;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.common.app.ApplicationArchive;
import org.onosproject.core.Application;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.ApplicationIdStore;
import org.onosproject.core.CoreService;
import org.onosproject.core.DefaultApplication;
import org.onosproject.security.Permission;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.EventuallyConsistentMapEvent;
import org.onosproject.store.service.EventuallyConsistentMapListener;
import org.onosproject.store.service.LogicalClockService;
import org.onosproject.store.service.MultiValuedTimestamp;
import org.onosproject.store.service.StorageException;
import org.onosproject.store.service.StorageService;
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static com.google.common.collect.Multimaps.newSetMultimap;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
import static org.onlab.util.Tools.randomDelay;
import static org.onosproject.app.ApplicationEvent.Type.*;
import static org.onosproject.store.app.GossipApplicationStore.InternalState.*;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.PUT;
import static org.onosproject.store.service.EventuallyConsistentMapEvent.Type.REMOVE;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of applications in a distributed data store that uses
* optimistic replication and gossip based anti-entropy techniques.
*/
@Component(immediate = true)
@Service
public class GossipApplicationStore extends ApplicationArchive
implements ApplicationStore {
private final Logger log = getLogger(getClass());
private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
private static final int MAX_LOAD_RETRIES = 5;
private static final int RETRY_DELAY_MS = 2_000;
private static final int FETCH_TIMEOUT_MS = 10_000;
private static final int APP_LOAD_DELAY_MS = 500;
private static List<String> pendingApps = Lists.newArrayList();
public enum InternalState {
INSTALLED, ACTIVATED, DEACTIVATED
}
private ScheduledExecutorService executor;
private ExecutorService messageHandlingExecutor;
private EventuallyConsistentMap<ApplicationId, Application> apps;
private EventuallyConsistentMap<Application, InternalState> states;
private EventuallyConsistentMap<Application, Set<Permission>> permissions;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterCommunicationService clusterCommunicator;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected LogicalClockService clockService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ApplicationIdStore idStore;
// Multimap to track which apps are required by others apps
// app -> { required-by, ... }
// Apps explicitly activated will be required by the CORE app
private final Multimap<ApplicationId, ApplicationId> requiredBy =
synchronizedSetMultimap(newSetMultimap(Maps.newHashMap(), Sets::newHashSet));
private ApplicationId coreAppId;
@Activate
public void activate() {
KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(MultiValuedTimestamp.class)
.register(InternalState.class);
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
messageHandlingExecutor = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/app", "message-handler", log));
clusterCommunicator.<String, byte[]>addSubscriber(APP_BITS_REQUEST,
bytes -> new String(bytes, Charsets.UTF_8),
name -> {
try {
return toByteArray(getApplicationInputStream(name));
} catch (IOException e) {
throw new StorageException(e);
}
},
Function.identity(),
messageHandlingExecutor);
// FIXME: Consider consolidating into a single map.
apps = storageService.<ApplicationId, Application>eventuallyConsistentMapBuilder()
.withName("apps")
.withSerializer(serializer)
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.build();
states = storageService.<Application, InternalState>eventuallyConsistentMapBuilder()
.withName("app-states")
.withSerializer(serializer)
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.build();
states.addListener(new InternalAppStatesListener());
permissions = storageService.<Application, Set<Permission>>eventuallyConsistentMapBuilder()
.withName("app-permissions")
.withSerializer(serializer)
.withTimestampProvider((k, v) -> clockService.getTimestamp())
.build();
coreAppId = getId(CoreService.CORE_APP_NAME);
log.info("Started");
}
/**
* Loads the application inventory from the disk and activates apps if
* they are marked to be active.
*/
private void loadFromDisk() {
getApplicationNames().forEach(appName -> {
Application app = loadFromDisk(appName);
if (app != null && isActive(app.id().name())) {
activate(app.id(), false);
// TODO Load app permissions
}
});
}
private Application loadFromDisk(String appName) {
pendingApps.add(appName);
for (int i = 0; i < MAX_LOAD_RETRIES; i++) {
try {
// Directly return if app already exists
ApplicationId appId = getId(appName);
if (appId != null) {
Application application = getApplication(appId);
if (application != null) {
pendingApps.remove(appName);
return application;
}
}
ApplicationDescription appDesc = getApplicationDescription(appName);
Optional<String> loop = appDesc.requiredApps().stream()
.filter(app -> pendingApps.contains(app)).findAny();
if (loop.isPresent()) {
log.error("Circular app dependency detected: {} -> {}", pendingApps, loop.get());
pendingApps.remove(appName);
return null;
}
boolean success = appDesc.requiredApps().stream()
.noneMatch(requiredApp -> loadFromDisk(requiredApp) == null);
pendingApps.remove(appName);
return success ? create(appDesc, false) : null;
} catch (Exception e) {
log.warn("Unable to load application {} from disk; retrying", appName);
randomDelay(RETRY_DELAY_MS); //FIXME: This is a deliberate hack; fix in Falcon
}
}
pendingApps.remove(appName);
return null;
}
@Deactivate
public void deactivate() {
clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
messageHandlingExecutor.shutdown();
executor.shutdown();
apps.destroy();
states.destroy();
permissions.destroy();
log.info("Stopped");
}
@Override
public void setDelegate(ApplicationStoreDelegate delegate) {
super.setDelegate(delegate);
executor.schedule(() -> loadFromDisk(), APP_LOAD_DELAY_MS, TimeUnit.MILLISECONDS);
}
@Override
public Set<Application> getApplications() {
return ImmutableSet.copyOf(apps.values());
}
@Override
public ApplicationId getId(String name) {
return idStore.getAppId(name);
}
@Override
public Application getApplication(ApplicationId appId) {
return apps.get(appId);
}
@Override
public ApplicationState getState(ApplicationId appId) {
Application app = apps.get(appId);
InternalState s = app == null ? null : states.get(app);
return s == null ? null : s == ACTIVATED ?
ApplicationState.ACTIVE : ApplicationState.INSTALLED;
}
@Override
public Application create(InputStream appDescStream) {
ApplicationDescription appDesc = saveApplication(appDescStream);
if (hasPrerequisites(appDesc)) {
return create(appDesc, true);
}
throw new ApplicationException("Missing dependencies for app " + appDesc.name());
}
private boolean hasPrerequisites(ApplicationDescription app) {
return !app.requiredApps().stream().map(n -> getId(n))
.anyMatch(id -> id == null || getApplication(id) == null);
}
private Application create(ApplicationDescription appDesc, boolean updateTime) {
Application app = registerApp(appDesc);
if (updateTime) {
updateTime(app.id().name());
}
apps.put(app.id(), app);
states.put(app, INSTALLED);
return app;
}
@Override
public void remove(ApplicationId appId) {
Application app = apps.get(appId);
if (app != null) {
uninstallDependentApps(app);
apps.remove(appId);
states.remove(app);
permissions.remove(app);
}
}
// Uninstalls all apps that depend on the given app.
private void uninstallDependentApps(Application app) {
getApplications().stream()
.filter(a -> a.requiredApps().contains(app.id().name()))
.forEach(a -> remove(a.id()));
}
@Override
public void activate(ApplicationId appId) {
activate(appId, coreAppId);
}
private void activate(ApplicationId appId, ApplicationId forAppId) {
requiredBy.put(appId, forAppId);
activate(appId, true);
}
private void activate(ApplicationId appId, boolean updateTime) {
Application app = apps.get(appId);
if (app != null) {
if (updateTime) {
updateTime(appId.name());
}
activateRequiredApps(app);
states.put(app, ACTIVATED);
}
}
// Activates all apps required by this application.
private void activateRequiredApps(Application app) {
app.requiredApps().stream().map(this::getId).forEach(id -> activate(id, app.id()));
}
@Override
public void deactivate(ApplicationId appId) {
deactivateDependentApps(getApplication(appId));
deactivate(appId, coreAppId);
}
private void deactivate(ApplicationId appId, ApplicationId forAppId) {
requiredBy.remove(appId, forAppId);
if (requiredBy.get(appId).isEmpty()) {
Application app = apps.get(appId);
if (app != null) {
updateTime(appId.name());
states.put(app, DEACTIVATED);
deactivateRequiredApps(app);
}
}
}
// Deactivates all apps that require this application.
private void deactivateDependentApps(Application app) {
getApplications().stream()
.filter(a -> states.get(a) == ACTIVATED)
.filter(a -> a.requiredApps().contains(app.id().name()))
.forEach(a -> deactivate(a.id()));
}
// Deactivates all apps required by this application.
private void deactivateRequiredApps(Application app) {
app.requiredApps().stream().map(this::getId).map(this::getApplication)
.filter(a -> states.get(a) == ACTIVATED)
.forEach(a -> deactivate(a.id(), app.id()));
}
@Override
public Set<Permission> getPermissions(ApplicationId appId) {
Application app = apps.get(appId);
return app != null ? permissions.get(app) : null;
}
@Override
public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
Application app = getApplication(appId);
if (app != null) {
this.permissions.put(app, permissions);
delegate.notify(new ApplicationEvent(APP_PERMISSIONS_CHANGED, app));
}
}
/**
* Listener to application state distributed map changes.
*/
private final class InternalAppStatesListener
implements EventuallyConsistentMapListener<Application, InternalState> {
@Override
public void event(EventuallyConsistentMapEvent<Application, InternalState> event) {
// If we do not have a delegate, refuse to process any events entirely.
// This is to allow the anti-entropy to kick in and process the events
// perhaps a bit later, but with opportunity to notify delegate.
if (delegate == null) {
return;
}
Application app = event.key();
InternalState state = event.value();
if (event.type() == PUT) {
if (state == INSTALLED) {
fetchBitsIfNeeded(app);
delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
} else if (state == ACTIVATED) {
installAppIfNeeded(app);
setActive(app.id().name());
delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
} else if (state == DEACTIVATED) {
clearActive(app.id().name());
delegate.notify(new ApplicationEvent(APP_DEACTIVATED, app));
}
} else if (event.type() == REMOVE) {
delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
purgeApplication(app.id().name());
}
}
}
/**
* Determines if the application bits are available locally.
*/
private boolean appBitsAvailable(Application app) {
try {
ApplicationDescription appDesc = getApplicationDescription(app.id().name());
return appDesc.version().equals(app.version());
} catch (ApplicationException e) {
return false;
}
}
/**
* Fetches the bits from the cluster peers if necessary.
*/
private void fetchBitsIfNeeded(Application app) {
if (!appBitsAvailable(app)) {
fetchBits(app);
}
}
/**
* Installs the application if necessary from the application peers.
*/
private void installAppIfNeeded(Application app) {
if (!appBitsAvailable(app)) {
fetchBits(app);
delegate.notify(new ApplicationEvent(APP_INSTALLED, app));
}
}
/**
* Fetches the bits from the cluster peers.
*/
private void fetchBits(Application app) {
ControllerNode localNode = clusterService.getLocalNode();
CountDownLatch latch = new CountDownLatch(1);
// FIXME: send message with name & version to make sure we don't get served old bits
log.info("Downloading bits for application {}", app.id().name());
for (ControllerNode node : clusterService.getNodes()) {
if (latch.getCount() == 0) {
break;
}
if (node.equals(localNode)) {
continue;
}
clusterCommunicator.sendAndReceive(app.id().name(),
APP_BITS_REQUEST,
s -> s.getBytes(Charsets.UTF_8),
Function.identity(),
node.id())
.whenCompleteAsync((bits, error) -> {
if (error == null && latch.getCount() > 0) {
saveApplication(new ByteArrayInputStream(bits));
log.info("Downloaded bits for application {} from node {}",
app.id().name(), node.id());
latch.countDown();
} else if (error != null) {
log.warn("Unable to fetch bits for application {} from node {}",
app.id().name(), node.id());
}
}, executor);
}
try {
if (!latch.await(FETCH_TIMEOUT_MS, MILLISECONDS)) {
log.warn("Unable to fetch bits for application {}", app.id().name());
}
} catch (InterruptedException e) {
log.warn("Interrupted while fetching bits for application {}", app.id().name());
}
}
/**
* Prunes applications which are not in the map, but are on disk.
*/
private void pruneUninstalledApps() {
for (String name : getApplicationNames()) {
if (getApplication(getId(name)) == null) {
Application app = registerApp(getApplicationDescription(name));
delegate.notify(new ApplicationEvent(APP_UNINSTALLED, app));
purgeApplication(app.id().name());
}
}
}
/**
* Produces a registered application from the supplied description.
*/
private Application registerApp(ApplicationDescription appDesc) {
ApplicationId appId = idStore.registerApplication(appDesc.name());
return new DefaultApplication(appId,
appDesc.version(),
appDesc.title(),
appDesc.description(),
appDesc.origin(),
appDesc.category(),
appDesc.url(),
appDesc.readme(),
appDesc.icon(),
appDesc.role(),
appDesc.permissions(),
appDesc.featuresRepo(),
appDesc.features(),
appDesc.requiredApps());
}
}