Revamped ClusterCommunicationService API

Change-Id: I9326369de3d2413b0882b324979d10483c093de9
diff --git a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
index 5c1fc33..7e7ec06 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/app/GossipApplicationStore.java
@@ -17,7 +17,6 @@
 
 import com.google.common.base.Charsets;
 import com.google.common.collect.ImmutableSet;
-import com.google.common.util.concurrent.ListenableFuture;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -54,13 +53,14 @@
 import org.slf4j.Logger;
 
 import java.io.ByteArrayInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
+
 import static com.google.common.io.ByteStreams.toByteArray;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.onlab.util.Tools.groupedThreads;
@@ -351,22 +351,34 @@
      */
     private void fetchBits(Application app) {
         ControllerNode localNode = clusterService.getLocalNode();
-        ClusterMessage message = new ClusterMessage(localNode.id(), APP_BITS_REQUEST,
-                                                    app.id().name().getBytes(Charsets.UTF_8));
-        //Map<ControllerNode, ListenableFuture<byte[]>> futures = new HashMap<>();
         CountDownLatch latch = new CountDownLatch(1);
 
         // FIXME: send message with name & version to make sure we don't get served old bits
 
         log.info("Downloading bits for application {}", app.id().name());
         for (ControllerNode node : clusterService.getNodes()) {
-            try {
-                ListenableFuture<byte[]> future = clusterCommunicator.sendAndReceive(message, node.id());
-                future.addListener(new InternalBitListener(app, node, future, latch), executor);
-            } catch (IOException e) {
-                log.debug("Unable to request bits for application {} from node {}",
-                          app.id().name(), node.id());
+            if (latch.getCount() == 0) {
+                break;
             }
+            if (node.equals(localNode)) {
+                continue;
+            }
+            clusterCommunicator.sendAndReceive(app.id().name(),
+                                    APP_BITS_REQUEST,
+                                    s -> s.getBytes(Charsets.UTF_8),
+                                    Function.identity(),
+                                    node.id())
+                               .whenCompleteAsync((bits, error) -> {
+                                   if (error == null && latch.getCount() > 0) {
+                                       saveApplication(new ByteArrayInputStream(bits));
+                                       log.info("Downloaded bits for application {} from node {}",
+                                               app.id().name(), node.id());
+                                       latch.countDown();
+                                   } else if (error != null) {
+                                       log.warn("Unable to fetch bits for application {} from node {}",
+                                               app.id().name(), node.id(), error);
+                                   }
+                               }, executor);
         }
 
         try {
@@ -392,41 +404,6 @@
             }
         }
     }
-
-    /**
-     * Processes completed fetch requests.
-     */
-    private class InternalBitListener implements Runnable {
-        private final Application app;
-        private final ControllerNode node;
-        private final ListenableFuture<byte[]> future;
-        private final CountDownLatch latch;
-
-        public InternalBitListener(Application app, ControllerNode node,
-                                   ListenableFuture<byte[]> future, CountDownLatch latch) {
-            this.app = app;
-            this.node = node;
-            this.future = future;
-            this.latch = latch;
-        }
-
-        @Override
-        public void run() {
-            if (latch.getCount() > 0 && !future.isCancelled()) {
-                try {
-                    byte[] bits = future.get(1, MILLISECONDS);
-                    saveApplication(new ByteArrayInputStream(bits));
-                    log.info("Downloaded bits for application {} from node {}",
-                             app.id().name(), node.id());
-                    latch.countDown();
-                } catch (Exception e) {
-                    log.warn("Unable to fetch bits for application {} from node {}",
-                             app.id().name(), node.id());
-                }
-            }
-        }
-    }
-
     /**
      * Prunes applications which are not in the map, but are on disk.
      */
@@ -449,6 +426,4 @@
                                       appDesc.origin(), appDesc.permissions(),
                                       appDesc.featuresRepo(), appDesc.features());
     }
-
 }
-