Removed deprecated ClusterCommunicationService APIs

MessagingService::sendAsync now returns a CompletableFuture<Void> instead of a boolean.

Change-Id: I98134c4c0ea65b9c7e9ba705eebd1669067324ef
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 21b0919..8a237ef 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -35,10 +35,6 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
-import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -62,8 +58,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MessagingService messagingService;
 
+    private NodeId localNodeId;
+
     @Activate
     public void activate() {
+        localNodeId = clusterService.getLocalNode().id();
         log.info("Started");
     }
 
@@ -73,60 +72,6 @@
     }
 
     @Override
-    public boolean broadcast(ClusterMessage message) {
-        boolean ok = true;
-        final ControllerNode localNode = clusterService.getLocalNode();
-        byte[] payload = message.getBytes();
-        for (ControllerNode node : clusterService.getNodes()) {
-            if (!node.equals(localNode)) {
-                ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
-            }
-        }
-        return ok;
-    }
-
-    @Override
-    public boolean broadcastIncludeSelf(ClusterMessage message) {
-        boolean ok = true;
-        byte[] payload = message.getBytes();
-        for (ControllerNode node : clusterService.getNodes()) {
-            ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
-        }
-        return ok;
-    }
-
-    @Override
-    public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
-        boolean ok = true;
-        final ControllerNode localNode = clusterService.getLocalNode();
-        byte[] payload = message.getBytes();
-        for (NodeId nodeId : nodes) {
-            if (!nodeId.equals(localNode.id())) {
-                ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
-            }
-        }
-        return ok;
-    }
-
-    @Override
-    public boolean unicast(ClusterMessage message, NodeId toNodeId) {
-        return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
-    }
-
-    @Override
-    public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
-        SettableFuture<byte[]> response = SettableFuture.create();
-        sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
-            if (e == null) {
-                response.set(r);
-            } else {
-                response.setException(e);
-            }
-        });
-        return response;
-    }
-
-    @Override
     public <M> void broadcast(M message,
                               MessageSubject subject,
                               Function<M, byte[]> encoder) {
@@ -154,15 +99,19 @@
     }
 
     @Override
-    public <M> boolean unicast(M message,
-                               MessageSubject subject,
-                               Function<M, byte[]> encoder,
-                               NodeId toNodeId) {
-        byte[] payload = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                encoder.apply(message)).getBytes();
-        return unicastUnchecked(subject, payload, toNodeId);
+    public <M> CompletableFuture<Void> unicast(M message,
+                                               MessageSubject subject,
+                                               Function<M, byte[]> encoder,
+                                               NodeId toNodeId) {
+        try {
+            byte[] payload = new ClusterMessage(
+                    localNodeId,
+                    subject,
+                    encoder.apply(message)).getBytes();
+            return doUnicast(subject, payload, toNodeId);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
     }
 
     @Override
@@ -171,10 +120,10 @@
                               Function<M, byte[]> encoder,
                               Set<NodeId> nodes) {
         byte[] payload = new ClusterMessage(
-                clusterService.getLocalNode().id(),
+                localNodeId,
                 subject,
                 encoder.apply(message)).getBytes();
-        nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
+        nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
     }
 
     @Override
@@ -194,17 +143,11 @@
         }
     }
 
-    private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+    private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
-        try {
-            messagingService.sendAsync(nodeEp, subject.value(), payload);
-            return true;
-        } catch (IOException e) {
-            log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
-            return false;
-        }
+        return messagingService.sendAsync(nodeEp, subject.value(), payload);
     }
 
     private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {