Updating change to distributed application store to re-enable reinstalling apps
Change-Id: I0ba74e5b4cbb4ae77d6cf30270f1572c4ce46e48
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
index fd6826a..c9981da 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
@@ -146,33 +146,33 @@
@Activate
public void activate() {
messageHandlingExecutor = newSingleThreadExecutor(groupedThreads("onos/store/app",
- "message-handler", log));
+ "message-handler", log));
clusterCommunicator.addSubscriber(APP_BITS_REQUEST,
- bytes -> new String(bytes, Charsets.UTF_8),
- name -> {
- try {
- log.info("Sending bits for application {}", name);
- return toByteArray(getApplicationInputStream(name));
- } catch (IOException e) {
- throw new StorageException(e);
- }
- },
- Function.identity(),
- messageHandlingExecutor);
+ bytes -> new String(bytes, Charsets.UTF_8),
+ name -> {
+ try {
+ log.info("Sending bits for application {}", name);
+ return toByteArray(getApplicationInputStream(name));
+ } catch (IOException e) {
+ throw new StorageException(e);
+ }
+ },
+ Function.identity(),
+ messageHandlingExecutor);
apps = storageService.<ApplicationId, InternalApplicationHolder>consistentMapBuilder()
.withName("onos-apps")
.withRelaxedReadConsistency()
.withSerializer(Serializer.using(KryoNamespaces.API,
- InternalApplicationHolder.class,
- InternalState.class))
+ InternalApplicationHolder.class,
+ InternalState.class))
.build();
appActivationTopic = storageService.getTopic("onos-apps-activation-topic",
- Serializer.using(KryoNamespaces.API));
+ Serializer.using(KryoNamespaces.API));
activationExecutor = newSingleThreadExecutor(groupedThreads("onos/store/app",
- "app-activation", log));
+ "app-activation", log));
appActivationTopic.subscribe(appActivator, activationExecutor);
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
@@ -272,10 +272,10 @@
@Override
public Set<Application> getApplications() {
return ImmutableSet.copyOf(apps.values()
- .stream()
- .map(Versioned::value)
- .map(InternalApplicationHolder::app)
- .collect(Collectors.toSet()));
+ .stream()
+ .map(Versioned::value)
+ .map(InternalApplicationHolder::app)
+ .collect(Collectors.toSet()));
}
@Override
@@ -385,11 +385,11 @@
if (requiredBy.get(appId).isEmpty()) {
AtomicBoolean stateChanged = new AtomicBoolean(false);
apps.computeIf(appId,
- v -> v != null && v.state() != DEACTIVATED,
- (k, v) -> {
- stateChanged.set(true);
- return new InternalApplicationHolder(v.app(), DEACTIVATED, v.permissions());
- });
+ v -> v != null && v.state() != DEACTIVATED,
+ (k, v) -> {
+ stateChanged.set(true);
+ return new InternalApplicationHolder(v.app(), DEACTIVATED, v.permissions());
+ });
if (stateChanged.get()) {
updateTime(appId.name());
deactivateRequiredApps(appId);
@@ -400,22 +400,22 @@
// Deactivates all apps that require this application.
private void deactivateDependentApps(ApplicationId appId) {
apps.values()
- .stream()
- .map(Versioned::value)
- .filter(a -> a.state() == ACTIVATED)
- .filter(a -> a.app().requiredApps().contains(appId.name()))
- .forEach(a -> deactivate(a.app().id()));
+ .stream()
+ .map(Versioned::value)
+ .filter(a -> a.state() == ACTIVATED)
+ .filter(a -> a.app().requiredApps().contains(appId.name()))
+ .forEach(a -> deactivate(a.app().id()));
}
// Deactivates all apps required by this application.
private void deactivateRequiredApps(ApplicationId appId) {
getApplication(appId).requiredApps()
- .stream()
- .map(this::getId)
- .map(apps::get)
- .map(Versioned::value)
- .filter(a -> a.state() == ACTIVATED)
- .forEach(a -> deactivate(a.app().id(), appId));
+ .stream()
+ .map(this::getId)
+ .map(apps::get)
+ .map(Versioned::value)
+ .filter(a -> a.state() == ACTIVATED)
+ .forEach(a -> deactivate(a.app().id(), appId));
}
@Override
@@ -428,11 +428,11 @@
public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
AtomicBoolean permissionsChanged = new AtomicBoolean(false);
Versioned<InternalApplicationHolder> appHolder = apps.computeIf(appId,
- v -> v != null && !Sets.symmetricDifference(v.permissions(), permissions).isEmpty(),
- (k, v) -> {
- permissionsChanged.set(true);
- return new InternalApplicationHolder(v.app(), v.state(), ImmutableSet.copyOf(permissions));
- });
+ v -> v != null && !Sets.symmetricDifference(v.permissions(), permissions).isEmpty(),
+ (k, v) -> {
+ permissionsChanged.set(true);
+ return new InternalApplicationHolder(v.app(), v.state(), ImmutableSet.copyOf(permissions));
+ });
if (permissionsChanged.get()) {
notifyDelegate(new ApplicationEvent(APP_PERMISSIONS_CHANGED, appHolder.value().app()));
}
@@ -464,17 +464,18 @@
ApplicationId appId = event.key();
InternalApplicationHolder newApp = event.newValue() == null ? null : event.newValue().value();
InternalApplicationHolder oldApp = event.oldValue() == null ? null : event.oldValue().value();
- if (newApp == null || oldApp == null) {
+ if (event.type() == MapEvent.Type.UPDATE && (newApp == null || oldApp == null ||
+ newApp.state() == oldApp.state())) {
+ log.warn("Can't update the application {}", event.key());
return;
}
- if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
- if (event.type() == MapEvent.Type.UPDATE && newApp.state() == oldApp.state()) {
- return;
- }
+ if ((event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) && newApp != null) {
setupApplicationAndNotify(appId, newApp.app(), newApp.state());
- } else if (event.type() == MapEvent.Type.REMOVE) {
+ } else if (event.type() == MapEvent.Type.REMOVE && oldApp != null) {
notifyDelegate(new ApplicationEvent(APP_UNINSTALLED, oldApp.app()));
purgeApplication(appId.name());
+ } else {
+ log.warn("Can't perform {} on application {}", event.type(), event.key());
}
}
}
@@ -538,22 +539,22 @@
continue;
}
clusterCommunicator.sendAndReceive(app.id().name(),
- APP_BITS_REQUEST,
- s -> s.getBytes(Charsets.UTF_8),
- Function.identity(),
- node.id())
+ APP_BITS_REQUEST,
+ s -> s.getBytes(Charsets.UTF_8),
+ Function.identity(),
+ node.id())
.whenCompleteAsync((bits, error) -> {
if (error == null && latch.getCount() > 0) {
saveApplication(new ByteArrayInputStream(bits));
log.info("Downloaded bits for application {} from node {}",
- app.id().name(), node.id());
+ app.id().name(), node.id());
latch.countDown();
if (delegateInstallation) {
notifyDelegate(new ApplicationEvent(APP_INSTALLED, app));
}
} else if (error != null) {
log.warn("Unable to fetch bits for application {} from node {}",
- app.id().name(), node.id());
+ app.id().name(), node.id());
}
}, executor);
}