Updating change to distributed application store to re-enable reinstalling apps

Change-Id: I0ba74e5b4cbb4ae77d6cf30270f1572c4ce46e48
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
index c774bc4..05e4020 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
@@ -150,33 +150,33 @@
     @Activate
     public void activate() {
         messageHandlingExecutor = newSingleThreadExecutor(groupedThreads("onos/store/app",
-                                                                         "message-handler", log));
+                "message-handler", log));
         clusterCommunicator.addSubscriber(APP_BITS_REQUEST,
-                                          bytes -> new String(bytes, Charsets.UTF_8),
-                                          name -> {
-                                              try {
-                                                  log.info("Sending bits for application {}", name);
-                                                  return toByteArray(getApplicationInputStream(name));
-                                              } catch (IOException e) {
-                                                  throw new StorageException(e);
-                                              }
-                                          },
-                                          Function.identity(),
-                                          messageHandlingExecutor);
+                bytes -> new String(bytes, Charsets.UTF_8),
+                name -> {
+                    try {
+                        log.info("Sending bits for application {}", name);
+                        return toByteArray(getApplicationInputStream(name));
+                    } catch (IOException e) {
+                        throw new StorageException(e);
+                    }
+                },
+                Function.identity(),
+                messageHandlingExecutor);
 
         apps = storageService.<ApplicationId, InternalApplicationHolder>consistentMapBuilder()
                 .withName("onos-apps")
                 .withRelaxedReadConsistency()
                 .withSerializer(Serializer.using(KryoNamespaces.API,
-                                                 InternalApplicationHolder.class,
-                                                 InternalState.class))
+                        InternalApplicationHolder.class,
+                        InternalState.class))
                 .build();
 
         appActivationTopic = storageService.getTopic("onos-apps-activation-topic",
-                                                     Serializer.using(KryoNamespaces.API));
+                Serializer.using(KryoNamespaces.API));
 
         activationExecutor = newSingleThreadExecutor(groupedThreads("onos/store/app",
-                                                                    "app-activation", log));
+                "app-activation", log));
         appActivationTopic.subscribe(appActivator, activationExecutor);
 
         executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
@@ -313,10 +313,10 @@
     @Override
     public Set<Application> getApplications() {
         return ImmutableSet.copyOf(apps.values()
-                                       .stream()
-                                       .map(Versioned::value)
-                                       .map(InternalApplicationHolder::app)
-                                       .collect(Collectors.toSet()));
+                .stream()
+                .map(Versioned::value)
+                .map(InternalApplicationHolder::app)
+                .collect(Collectors.toSet()));
     }
 
     @Override
@@ -425,11 +425,11 @@
         if (requiredBy.get(appId).isEmpty()) {
             AtomicBoolean stateChanged = new AtomicBoolean(false);
             apps.computeIf(appId,
-                v -> v != null && v.state() != DEACTIVATED,
-                (k, v) -> {
-                    stateChanged.set(true);
-                    return new InternalApplicationHolder(v.app(), DEACTIVATED, v.permissions());
-                });
+                    v -> v != null && v.state() != DEACTIVATED,
+                    (k, v) -> {
+                        stateChanged.set(true);
+                        return new InternalApplicationHolder(v.app(), DEACTIVATED, v.permissions());
+                    });
             if (stateChanged.get()) {
                 updateTime(appId.name());
                 deactivateRequiredApps(appId);
@@ -440,22 +440,22 @@
     // Deactivates all apps that require this application.
     private void deactivateDependentApps(ApplicationId appId) {
         apps.values()
-            .stream()
-            .map(Versioned::value)
-            .filter(a -> a.state() == ACTIVATED)
-            .filter(a -> a.app().requiredApps().contains(appId.name()))
-            .forEach(a -> deactivate(a.app().id()));
+                .stream()
+                .map(Versioned::value)
+                .filter(a -> a.state() == ACTIVATED)
+                .filter(a -> a.app().requiredApps().contains(appId.name()))
+                .forEach(a -> deactivate(a.app().id()));
     }
 
     // Deactivates all apps required by this application.
     private void deactivateRequiredApps(ApplicationId appId) {
         getApplication(appId).requiredApps()
-                             .stream()
-                             .map(this::getId)
-                             .map(apps::get)
-                             .map(Versioned::value)
-                             .filter(a -> a.state() == ACTIVATED)
-                             .forEach(a -> deactivate(a.app().id(), appId));
+                .stream()
+                .map(this::getId)
+                .map(apps::get)
+                .map(Versioned::value)
+                .filter(a -> a.state() == ACTIVATED)
+                .forEach(a -> deactivate(a.app().id(), appId));
     }
 
     @Override
@@ -468,11 +468,11 @@
     public void setPermissions(ApplicationId appId, Set<Permission> permissions) {
         AtomicBoolean permissionsChanged = new AtomicBoolean(false);
         Versioned<InternalApplicationHolder> appHolder = apps.computeIf(appId,
-            v -> v != null && !Sets.symmetricDifference(v.permissions(), permissions).isEmpty(),
-            (k, v) -> {
-                permissionsChanged.set(true);
-                return new InternalApplicationHolder(v.app(), v.state(), ImmutableSet.copyOf(permissions));
-            });
+                v -> v != null && !Sets.symmetricDifference(v.permissions(), permissions).isEmpty(),
+                (k, v) -> {
+                    permissionsChanged.set(true);
+                    return new InternalApplicationHolder(v.app(), v.state(), ImmutableSet.copyOf(permissions));
+                });
         if (permissionsChanged.get()) {
             notifyDelegate(new ApplicationEvent(APP_PERMISSIONS_CHANGED, appHolder.value().app()));
         }
@@ -509,17 +509,18 @@
             ApplicationId appId = event.key();
             InternalApplicationHolder newApp = event.newValue() == null ? null : event.newValue().value();
             InternalApplicationHolder oldApp = event.oldValue() == null ? null : event.oldValue().value();
-            if (newApp == null || oldApp == null) {
+            if (event.type() == MapEvent.Type.UPDATE && (newApp == null || oldApp == null ||
+                    newApp.state() == oldApp.state())) {
+                log.warn("Can't update the application {}", event.key());
                 return;
             }
-            if (event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) {
-                if (event.type() == MapEvent.Type.UPDATE && newApp.state() == oldApp.state()) {
-                    return;
-                }
+            if ((event.type() == MapEvent.Type.INSERT || event.type() == MapEvent.Type.UPDATE) && newApp != null) {
                 setupApplicationAndNotify(appId, newApp.app(), newApp.state());
-            } else if (event.type() == MapEvent.Type.REMOVE) {
+            } else if (event.type() == MapEvent.Type.REMOVE && oldApp != null) {
                 notifyDelegate(new ApplicationEvent(APP_UNINSTALLED, oldApp.app()));
                 purgeApplication(appId.name());
+            } else {
+                log.warn("Can't perform {} on application {}", event.type(), event.key());
             }
         }
     }
@@ -583,22 +584,22 @@
                 continue;
             }
             clusterCommunicator.sendAndReceive(app.id().name(),
-                                               APP_BITS_REQUEST,
-                                               s -> s.getBytes(Charsets.UTF_8),
-                                               Function.identity(),
-                                               node.id())
+                    APP_BITS_REQUEST,
+                    s -> s.getBytes(Charsets.UTF_8),
+                    Function.identity(),
+                    node.id())
                     .whenCompleteAsync((bits, error) -> {
                         if (error == null && latch.getCount() > 0) {
                             saveApplication(new ByteArrayInputStream(bits));
                             log.info("Downloaded bits for application {} from node {}",
-                                     app.id().name(), node.id());
+                                    app.id().name(), node.id());
                             latch.countDown();
                             if (delegateInstallation) {
                                 notifyDelegate(new ApplicationEvent(APP_INSTALLED, app));
                             }
                         } else if (error != null) {
                             log.warn("Unable to fetch bits for application {} from node {}",
-                                     app.id().name(), node.id());
+                                    app.id().name(), node.id());
                         }
                     }, messageHandlingExecutor);
         }