Fixed an issue with out-of-order app activation in distributed context.
Change-Id: Ibaad5cec977f69c8ba077634ad9ff6f9a41ae2d7
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
index c65d1aa..c0b124c 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/DistributedApplicationStore.java
@@ -67,7 +67,6 @@
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -78,6 +77,7 @@
import static com.google.common.collect.Multimaps.newSetMultimap;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static com.google.common.io.ByteStreams.toByteArray;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
@@ -95,9 +95,6 @@
public class DistributedApplicationStore extends ApplicationArchive
implements ApplicationStore {
- // FIXME: eliminate the need for this
- private static final int FIXME_ACTIVATION_DELAY = 500;
-
private final Logger log = getLogger(getClass());
private static final MessageSubject APP_BITS_REQUEST = new MessageSubject("app-bits-request");
@@ -116,7 +113,7 @@
}
private ScheduledExecutorService executor;
- private ExecutorService messageHandlingExecutor;
+ private ExecutorService messageHandlingExecutor, activationExecutor;
private ConsistentMap<ApplicationId, InternalApplicationHolder> apps;
private Topic<Application> appActivationTopic;
@@ -148,12 +145,13 @@
@Activate
public void activate() {
- messageHandlingExecutor = Executors.newSingleThreadExecutor(
- groupedThreads("onos/store/app", "message-handler", log));
+ messageHandlingExecutor = newSingleThreadExecutor(groupedThreads("onos/store/app",
+ "message-handler", log));
clusterCommunicator.addSubscriber(APP_BITS_REQUEST,
bytes -> new String(bytes, Charsets.UTF_8),
name -> {
try {
+ log.info("Sending bits for application {}", name);
return toByteArray(getApplicationInputStream(name));
} catch (IOException e) {
throw new StorageException(e);
@@ -173,7 +171,9 @@
appActivationTopic = storageService.getTopic("onos-apps-activation-topic",
Serializer.using(KryoNamespaces.API));
- appActivationTopic.subscribe(appActivator, messageHandlingExecutor);
+ activationExecutor = newSingleThreadExecutor(groupedThreads("onos/store/app",
+ "app-activation", log));
+ appActivationTopic.subscribe(appActivator, activationExecutor);
executor = newSingleThreadScheduledExecutor(groupedThreads("onos/app", "store", log));
statusChangeListener = status -> {
@@ -181,7 +181,7 @@
executor.execute(this::bootstrapExistingApplications);
}
};
- apps.addListener(appsListener, messageHandlingExecutor);
+ apps.addListener(appsListener, activationExecutor);
apps.addStatusChangeListener(statusChangeListener);
coreAppId = getId(CoreService.CORE_APP_NAME);
log.info("Started");
@@ -257,6 +257,7 @@
apps.removeListener(appsListener);
appActivationTopic.unsubscribe(appActivator);
messageHandlingExecutor.shutdown();
+ activationExecutor.shutdown();
executor.shutdown();
log.info("Stopped");
}
@@ -508,7 +509,7 @@
*/
private void fetchBitsIfNeeded(Application app) {
if (!appBitsAvailable(app)) {
- fetchBits(app);
+ fetchBits(app, false);
}
}
@@ -517,15 +518,14 @@
*/
private void installAppIfNeeded(Application app) {
if (!appBitsAvailable(app)) {
- fetchBits(app);
- notifyDelegate(new ApplicationEvent(APP_INSTALLED, app));
+ fetchBits(app, true);
}
}
/**
* Fetches the bits from the cluster peers.
*/
- private void fetchBits(Application app) {
+ private void fetchBits(Application app, boolean delegateInstallation) {
ControllerNode localNode = clusterService.getLocalNode();
CountDownLatch latch = new CountDownLatch(1);
@@ -550,6 +550,9 @@
log.info("Downloaded bits for application {} from node {}",
app.id().name(), node.id());
latch.countDown();
+ if (delegateInstallation) {
+ notifyDelegate(new ApplicationEvent(APP_INSTALLED, app));
+ }
} else if (error != null) {
log.warn("Unable to fetch bits for application {} from node {}",
app.id().name(), node.id());