ApplicationStore to use Topic instead of AtomicValue for app activation notifications

Change-Id: I25cf6d1744969d0b0dfd0557ec1dd163ad3148d0
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
index 9f9cb2d..f78b30b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
@@ -48,15 +48,13 @@
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.AtomicValue;
-import org.onosproject.store.service.AtomicValueEvent;
-import org.onosproject.store.service.AtomicValueEventListener;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageException;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Topic;
 import org.onosproject.store.service.Versioned;
 import org.onosproject.store.service.DistributedPrimitive.Status;
 import org.slf4j.Logger;
@@ -121,7 +119,7 @@
     private ExecutorService messageHandlingExecutor;
 
     private ConsistentMap<ApplicationId, InternalApplicationHolder> apps;
-    private AtomicValue<Application> nextAppToActivate;
+    private Topic<Application> appActivationTopic;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterCommunicationService clusterCommunicator;
@@ -136,7 +134,7 @@
     protected ApplicationIdStore idStore;
 
     private final InternalAppsListener appsListener = new InternalAppsListener();
-    private final NextAppToActivateValueListener nextAppToActivateListener = new NextAppToActivateValueListener();
+    private final Consumer<Application> appActivator = new AppActivator();
 
     private Consumer<Status> statusChangeListener;
 
@@ -172,13 +170,10 @@
                                                  InternalState.class))
                 .build();
 
-        nextAppToActivate = storageService.<Application>atomicValueBuilder()
-                .withName("onos-apps-activation-order-value")
-                .withSerializer(Serializer.using(KryoNamespaces.API))
-                .build()
-                .asAtomicValue();
+        appActivationTopic = storageService.getTopic("onos-apps-activation-topic",
+                                                     Serializer.using(KryoNamespaces.API));
 
-        nextAppToActivate.addListener(nextAppToActivateListener);
+        appActivationTopic.subscribe(appActivator, messageHandlingExecutor);
 
         executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
         statusChangeListener = status -> {
@@ -260,7 +255,7 @@
         clusterCommunicator.removeSubscriber(APP_BITS_REQUEST);
         apps.removeStatusChangeListener(statusChangeListener);
         apps.removeListener(appsListener);
-        nextAppToActivate.removeListener(nextAppToActivateListener);
+        appActivationTopic.unsubscribe(appActivator);
         messageHandlingExecutor.shutdown();
         executor.shutdown();
         log.info("Stopped");
@@ -362,7 +357,7 @@
             apps.computeIf(appId, v -> v != null && v.state() != ACTIVATED,
                     (k, v) -> new InternalApplicationHolder(
                             v.app(), ACTIVATED, v.permissions()));
-            nextAppToActivate.set(vAppHolder.value().app());
+            appActivationTopic.publish(vAppHolder.value().app());
         }
     }
 
@@ -435,17 +430,13 @@
         }
     }
 
-    private class NextAppToActivateValueListener implements AtomicValueEventListener<Application> {
-
+    private class AppActivator implements Consumer<Application> {
         @Override
-        public void event(AtomicValueEvent<Application> event) {
-            messageHandlingExecutor.execute(() -> {
-                Application app = event.newValue();
-                String appName = app.id().name();
-                installAppIfNeeded(app);
-                setActive(appName);
-                delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
-            });
+        public void accept(Application app) {
+            String appName = app.id().name();
+            installAppIfNeeded(app);
+            setActive(appName);
+            delegate.notify(new ApplicationEvent(APP_ACTIVATED, app));
         }
     }