Fixed an issue with out-of-order app activation in distributed context.

Change-Id: Ibaad5cec977f69c8ba077634ad9ff6f9a41ae2d7
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
index c65d1aa..c0b124c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
@@ -67,7 +67,6 @@
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -78,6 +77,7 @@
 import static com.google.common.collect.Multimaps.newSetMultimap;
 import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
 import static com.google.common.io.ByteStreams.toByteArray;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.onlab.util.Tools.groupedThreads;
@@ -95,9 +95,6 @@
 public class DistributedApplicationStore extends ApplicationArchive
         implements ApplicationStore {
 
-    // FIXME: eliminate the need for this
-    private static final int FIXME_ACTIVATION_DELAY = 500;
-
     private final Logger log = getLogger(getClass());
 
     private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
@@ -116,7 +113,7 @@
     }
 
     private ScheduledExecutorService executor;
-    private ExecutorService messageHandlingExecutor;
+    private ExecutorService messageHandlingExecutor, activationExecutor;
 
     private ConsistentMap<ApplicationId, InternalApplicationHolder> apps;
     private Topic<Application> appActivationTopic;
@@ -148,12 +145,13 @@
 
     @Activate
     public void activate() {
-        messageHandlingExecutor = Executors.newSingleThreadExecutor(
-                groupedThreads("onos/store/app", "message-handler", log));
+        messageHandlingExecutor = newSingleThreadExecutor(groupedThreads("onos/store/app",
+                                                                         "message-handler", log));
         clusterCommunicator.addSubscriber(APP_BITS_REQUEST,
                                           bytes -> new String(bytes, Charsets.UTF_8),
                                           name -> {
                                               try {
+                                                  log.info("Sending bits for application {}", name);
                                                   return toByteArray(getApplicationInputStream(name));
                                               } catch (IOException e) {
                                                   throw new StorageException(e);
@@ -173,7 +171,9 @@
         appActivationTopic = storageService.getTopic("onos-apps-activation-topic",
                                                      Serializer.using(KryoNamespaces.API));
 
-        appActivationTopic.subscribe(appActivator, messageHandlingExecutor);
+        activationExecutor = newSingleThreadExecutor(groupedThreads("onos/store/app",
+                                                                    "app-activation", log));
+        appActivationTopic.subscribe(appActivator, activationExecutor);
 
         executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
         statusChangeListener = status -> {
@@ -181,7 +181,7 @@
                 executor.execute(this::bootstrapExistingApplications);
             }
         };
-        apps.addListener(appsListener, messageHandlingExecutor);
+        apps.addListener(appsListener, activationExecutor);
         apps.addStatusChangeListener(statusChangeListener);
         coreAppId = getId(CoreService.CORE_APP_NAME);
         log.info("Started");
@@ -257,6 +257,7 @@
         apps.removeListener(appsListener);
         appActivationTopic.unsubscribe(appActivator);
         messageHandlingExecutor.shutdown();
+        activationExecutor.shutdown();
         executor.shutdown();
         log.info("Stopped");
     }
@@ -508,7 +509,7 @@
      */
     private void fetchBitsIfNeeded(Application app) {
         if (!appBitsAvailable(app)) {
-            fetchBits(app);
+            fetchBits(app, false);
         }
     }
 
@@ -517,15 +518,14 @@
      */
     private void installAppIfNeeded(Application app) {
         if (!appBitsAvailable(app)) {
-            fetchBits(app);
-            notifyDelegate(new ApplicationEvent(APP_INSTALLED, app));
+            fetchBits(app, true);
         }
     }
 
     /**
      * Fetches the bits from the cluster peers.
      */
-    private void fetchBits(Application app) {
+    private void fetchBits(Application app, boolean delegateInstallation) {
         ControllerNode localNode = clusterService.getLocalNode();
         CountDownLatch latch = new CountDownLatch(1);
 
@@ -550,6 +550,9 @@
                             log.info("Downloaded bits for application {} from node {}",
                                      app.id().name(), node.id());
                             latch.countDown();
+                            if (delegateInstallation) {
+                                notifyDelegate(new ApplicationEvent(APP_INSTALLED, app));
+                            }
                         } else if (error != null) {
                             log.warn("Unable to fetch bits for application {} from node {}",
                                      app.id().name(), node.id());