Revamped ClusterCommunicationService API
Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 5c1fc33..7e7ec06 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -17,7 +17,6 @@
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -54,13 +53,14 @@
import org.slf4j.Logger;
import java.io.ByteArrayInputStream;
-import java.io.IOException;
import java.io.InputStream;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+
import static com.google.common.io.ByteStreams.toByteArray;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onlab.util.Tools.groupedThreads;
@@ -351,22 +351,34 @@
*/
private void fetchBits(Application app) {
ControllerNode localNode = clusterService.getLocalNode();
- ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
- app.id().name().getBytes(Charsets.UTF_8));
- //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
CountDownLatch latch = new CountDownLatch(1);
// FIXME: send message with name & version to make sure we don't get served old bits
log.info("Downloading bits for application {}", app.id().name());
for (ControllerNode node : clusterService.getNodes()) {
- try {
- ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
- future.addListener(new InternalBitListener(app, node, future, latch), executor);
- } catch (IOException e) {
- log.debug("Unable to request bits for application {} from node {}",
- app.id().name(), node.id());
+ if (latch.getCount() == 0) {
+ break;
}
+ if (node.equals(localNode)) {
+ continue;
+ }
+ clusterCommunicator.sendAndReceive(app.id().name(),
+ APP_BITS_REQUEST,
+ s -> s.getBytes(Charsets.UTF_8),
+ Function.identity(),
+ node.id())
+ .whenCompleteAsync((bits, error) -> {
+ if (error == null && latch.getCount() > 0) {
+ saveApplication(new ByteArrayInputStream(bits));
+ log.info("Downloaded bits for application {} from node {}",
+ app.id().name(), node.id());
+ latch.countDown();
+ } else if (error != null) {
+ log.warn("Unable to fetch bits for application {} from node {}",
+ app.id().name(), node.id(), error);
+ }
+ }, executor);
}
try {
@@ -392,41 +404,6 @@
}
}
}
-
- /**
- * Processes completed fetch requests.
- */
- private class InternalBitListener implements Runnable {
- private final Application app;
- private final ControllerNode node;
- private final ListenableFuture<byte[]> future;
- private final CountDownLatch latch;
-
- public InternalBitListener(Application app, ControllerNode node,
- ListenableFuture<byte[]> future, CountDownLatch latch) {
- this.app = app;
- this.node = node;
- this.future = future;
- this.latch = latch;
- }
-
- @Override
- public void run() {
- if (latch.getCount() > 0 && !future.isCancelled()) {
- try {
- byte[] bits = future.get(1, MILLISECONDS);
- saveApplication(new ByteArrayInputStream(bits));
- log.info("Downloaded bits for application {} from node {}",
- app.id().name(), node.id());
- latch.countDown();
- } catch (Exception e) {
- log.warn("Unable to fetch bits for application {} from node {}",
- app.id().name(), node.id());
- }
- }
- }
- }
-
/**
* Prunes applications which are not in the map, but are on disk.
*/
@@ -449,6 +426,4 @@
appDesc.origin(), appDesc.permissions(),
appDesc.featuresRepo(), appDesc.features());
}
-
}
-