Removed deprecated ClusterCommunicationService APIs
MessagingService::sendAsync now returns a CompletableFuture<Void> in place of boolean

Change-Id: I98134c4c0ea65b9c7e9ba705eebd1669067324ef
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index eb36202..f547a47 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -24,61 +24,12 @@
 
 import org.onosproject.cluster.NodeId;
 
-import com.google.common.util.concurrent.ListenableFuture;
-
 /**
  * Service for assisting communications between controller cluster nodes.
  */
 public interface ClusterCommunicationService {
 
     /**
-     * Broadcast a message to all controller nodes.
-     *
-     * @param message  message to send
-     * @return true if the message was sent successfully to all nodes; false otherwise.
-     */
-    @Deprecated
-    boolean broadcast(ClusterMessage message);
-
-    /**
-     * Broadcast a message to all controller nodes including self.
-     *
-     * @param message  message to send
-     * @return true if the message was sent successfully to all nodes; false otherwise.
-     */
-    @Deprecated
-    boolean broadcastIncludeSelf(ClusterMessage message);
-
-    /**
-     * Sends a message to the specified controller node.
-     *
-     * @param message  message to send
-     * @param toNodeId node identifier
-     * @return true if the message was sent successfully; false otherwise.
-     */
-    @Deprecated
-    boolean unicast(ClusterMessage message, NodeId toNodeId);
-
-    /**
-     * Multicast a message to a set of controller nodes.
-     *
-     * @param message  message to send
-     * @param nodeIds  recipient node identifiers
-     * @return true if the message was sent successfully to all nodes in the group; false otherwise.
-     */
-    @Deprecated
-    boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds);
-
-    /**
-     * Sends a message synchronously.
-     * @param message message to send
-     * @param toNodeId recipient node identifier
-     * @return reply future.
-     */
-    @Deprecated
-    ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId);
-
-    /**
      * Adds a new subscriber for the specified message subject.
      *
      * @param subject    message subject
@@ -120,9 +71,9 @@
      * @param encoder function for encoding message to byte[]
      * @param toNodeId destination node identifier
      * @param <M> message type
-     * @return true if the message was sent successfully; false otherwise
+     * @return future that is completed when the message is sent
      */
-    <M> boolean unicast(M message,
+    <M> CompletableFuture<Void> unicast(M message,
                         MessageSubject subject,
                         Function<M, byte[]> encoder,
                         NodeId toNodeId);
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
index 09f15f8..6ccd483 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.store.cluster.messaging;
 
-import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.function.Consumer;
@@ -32,9 +31,9 @@
      * @param ep end point to send the message to.
      * @param type type of message.
      * @param payload message payload bytes.
-     * @throws IOException when I/O exception of some sort has occurred
+     * @return future that is completed when the message is sent
      */
-    void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
+    CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload);
 
     /**
      * Sends a message synchronously and waits for a response.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index ec297e2..859efeb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -42,7 +42,6 @@
 import org.onosproject.store.serializers.KryoSerializer;
 import org.slf4j.Logger;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -237,11 +236,11 @@
 
     private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
         Endpoint remoteEp = new Endpoint(peer.ip(), peer.tcpPort());
-        try {
-            messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
-        } catch (IOException e) {
-            log.trace("Sending heartbeat to {} failed", remoteEp, e);
-        }
+        messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload).whenComplete((result, error) -> {
+            if (error != null) {
+                log.trace("Sending heartbeat to {} failed", remoteEp, error);
+            }
+        });
     }
 
     private class HeartbeatMessageHandler implements Consumer<byte[]> {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 21b0919..8a237ef 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -35,10 +35,6 @@
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Objects;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
-import java.io.IOException;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -62,8 +58,11 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MessagingService messagingService;
 
+    private NodeId localNodeId;
+
     @Activate
     public void activate() {
+        localNodeId = clusterService.getLocalNode().id();
         log.info("Started");
     }
 
@@ -73,60 +72,6 @@
     }
 
     @Override
-    public boolean broadcast(ClusterMessage message) {
-        boolean ok = true;
-        final ControllerNode localNode = clusterService.getLocalNode();
-        byte[] payload = message.getBytes();
-        for (ControllerNode node : clusterService.getNodes()) {
-            if (!node.equals(localNode)) {
-                ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
-            }
-        }
-        return ok;
-    }
-
-    @Override
-    public boolean broadcastIncludeSelf(ClusterMessage message) {
-        boolean ok = true;
-        byte[] payload = message.getBytes();
-        for (ControllerNode node : clusterService.getNodes()) {
-            ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
-        }
-        return ok;
-    }
-
-    @Override
-    public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
-        boolean ok = true;
-        final ControllerNode localNode = clusterService.getLocalNode();
-        byte[] payload = message.getBytes();
-        for (NodeId nodeId : nodes) {
-            if (!nodeId.equals(localNode.id())) {
-                ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
-            }
-        }
-        return ok;
-    }
-
-    @Override
-    public boolean unicast(ClusterMessage message, NodeId toNodeId) {
-        return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
-    }
-
-    @Override
-    public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
-        SettableFuture<byte[]> response = SettableFuture.create();
-        sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
-            if (e == null) {
-                response.set(r);
-            } else {
-                response.setException(e);
-            }
-        });
-        return response;
-    }
-
-    @Override
     public <M> void broadcast(M message,
                               MessageSubject subject,
                               Function<M, byte[]> encoder) {
@@ -154,15 +99,19 @@
     }
 
     @Override
-    public <M> boolean unicast(M message,
-                               MessageSubject subject,
-                               Function<M, byte[]> encoder,
-                               NodeId toNodeId) {
-        byte[] payload = new ClusterMessage(
-                clusterService.getLocalNode().id(),
-                subject,
-                encoder.apply(message)).getBytes();
-        return unicastUnchecked(subject, payload, toNodeId);
+    public <M> CompletableFuture<Void> unicast(M message,
+                                               MessageSubject subject,
+                                               Function<M, byte[]> encoder,
+                                               NodeId toNodeId) {
+        try {
+            byte[] payload = new ClusterMessage(
+                    localNodeId,
+                    subject,
+                    encoder.apply(message)).getBytes();
+            return doUnicast(subject, payload, toNodeId);
+        } catch (Exception e) {
+            return Tools.exceptionalFuture(e);
+        }
     }
 
     @Override
@@ -171,10 +120,10 @@
                               Function<M, byte[]> encoder,
                               Set<NodeId> nodes) {
         byte[] payload = new ClusterMessage(
-                clusterService.getLocalNode().id(),
+                localNodeId,
                 subject,
                 encoder.apply(message)).getBytes();
-        nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
+        nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
     }
 
     @Override
@@ -194,17 +143,11 @@
         }
     }
 
-    private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+    private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
-        try {
-            messagingService.sendAsync(nodeEp, subject.value(), payload);
-            return true;
-        } catch (IOException e) {
-            log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
-            return false;
-        }
+        return messagingService.sendAsync(nodeEp, subject.value(), payload);
     }
 
     private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
index 23b219b..823f658 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
@@ -66,27 +66,27 @@
     }
 
     @Override
-    public EventuallyConsistentMapBuilder withName(String name) {
+    public EventuallyConsistentMapBuilder<K, V> withName(String name) {
         this.name = checkNotNull(name);
         return this;
     }
 
     @Override
-    public EventuallyConsistentMapBuilder withSerializer(
+    public EventuallyConsistentMapBuilder<K, V> withSerializer(
             KryoNamespace.Builder serializerBuilder) {
         this.serializerBuilder = checkNotNull(serializerBuilder);
         return this;
     }
 
     @Override
-    public EventuallyConsistentMapBuilder withClockService(
+    public EventuallyConsistentMapBuilder<K, V> withClockService(
             ClockService<K, V> clockService) {
         this.clockService = checkNotNull(clockService);
         return this;
     }
 
     @Override
-    public EventuallyConsistentMapBuilder withEventExecutor(ExecutorService executor) {
+    public EventuallyConsistentMapBuilder<K, V> withEventExecutor(ExecutorService executor) {
         this.eventExecutor = checkNotNull(executor);
         return this;
     }
@@ -99,13 +99,13 @@
     }
 
     @Override
-    public EventuallyConsistentMapBuilder withBackgroundExecutor(ScheduledExecutorService executor) {
+    public EventuallyConsistentMapBuilder<K, V> withBackgroundExecutor(ScheduledExecutorService executor) {
         this.backgroundExecutor = checkNotNull(executor);
         return this;
     }
 
     @Override
-    public EventuallyConsistentMapBuilder withPeerUpdateFunction(
+    public EventuallyConsistentMapBuilder<K, V> withPeerUpdateFunction(
             BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
         this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
         return this;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index f03215d..18ecefe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -509,12 +509,6 @@
         );
     }
 
-    private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
-        return clusterCommunicator.unicast(event, subject, serializer::encode, peer);
-        // Note: we had this flipped before...
-//        communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
-    }
-
     private boolean underHighLoad() {
         return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
     }
@@ -556,10 +550,14 @@
                 }
 
                 AntiEntropyAdvertisement<K> ad = createAdvertisement();
+                NodeId destination = peer;
+                clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
+                                   .whenComplete((result, error) -> {
+                                       if (error != null) {
+                                           log.debug("Failed to send anti-entropy advertisement to {}", destination);
+                                       }
+                                   });
 
-                if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
-                    log.debug("Failed to send anti-entropy advertisement to {}", peer);
-                }
             } catch (Exception e) {
                 // Catch all exceptions to avoid scheduled task being suppressed.
                 log.error("Exception thrown while sending advertisement", e);
@@ -595,9 +593,14 @@
                     // Send the advertisement back if this peer is out-of-sync
                     final NodeId sender = ad.sender();
                     AntiEntropyAdvertisement<K> myAd = createAdvertisement();
-                    if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
-                        log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
-                    }
+
+                    clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
+                                       .whenComplete((result, error) -> {
+                                           if (error != null) {
+                                               log.debug("Failed to send reactive "
+                                                       + "anti-entropy advertisement to {}", sender);
+                                           }
+                                       });
                     break;
                 }
             }
@@ -801,11 +804,15 @@
                   )
             );
             communicationExecutor.submit(() -> {
-                try {
-                    unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
-                } catch (Exception e) {
-                    log.warn("broadcast error", e);
-                }
+                clusterCommunicator.unicast(Lists.newArrayList(map.values()),
+                                            updateMessageSubject,
+                                            serializer::encode,
+                                            peer)
+                                   .whenComplete((result, error) -> {
+                                       if (error != null) {
+                                           log.debug("Failed to send to {}", peer);
+                                       }
+                                   });
             });
         }
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index 35d004e..3b82655 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -407,21 +407,22 @@
         log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
                   master, deviceId);
 
-        if (!clusterCommunicator.unicast(operation,
-                                         APPLY_BATCH_FLOWS,
-                                         SERIALIZER::encode,
-                                         master)) {
-            log.warn("Failed to storeBatch: {} to {}", operation, master);
+        clusterCommunicator.unicast(operation,
+                                    APPLY_BATCH_FLOWS,
+                                    SERIALIZER::encode,
+                                    master)
+                           .whenComplete((result, error) -> {
+                               log.warn("Failed to storeBatch: {} to {}", operation, master);
 
-            Set<FlowRule> allFailures = operation.getOperations().stream()
-                    .map(op -> op.target())
-                    .collect(Collectors.toSet());
+                               Set<FlowRule> allFailures = operation.getOperations()
+                                                                    .stream()
+                                                                    .map(op -> op.target())
+                                                                    .collect(Collectors.toSet());
 
-            notifyDelegate(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(false, allFailures, deviceId)));
-            return;
-        }
+                               notifyDelegate(FlowRuleBatchEvent.completed(
+                                       new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                                       new CompletedBatchOperation(false, allFailures, deviceId)));
+                           });
     }
 
     private void storeBatchInternal(FlowRuleBatchOperation operation) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 3ebbf78..e91031a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -395,8 +395,7 @@
         }
 
         // Check if group to be created by a remote instance
-        if (mastershipService.getLocalRole(
-                     groupDesc.deviceId()) != MastershipRole.MASTER) {
+        if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
             log.debug("storeGroupDescription: Device {} local role is not MASTER",
                       groupDesc.deviceId());
             if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
@@ -410,19 +409,22 @@
                     createGroupAddRequestMsg(groupDesc.deviceId(),
                                              groupDesc);
 
-            if (!clusterCommunicator.unicast(groupOp,
+            clusterCommunicator.unicast(groupOp,
                     GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
                     m -> kryoBuilder.build().serialize(m),
-                    mastershipService.getMasterFor(groupDesc.deviceId()))) {
-                log.warn("Failed to send request to master: {} to {}",
-                         groupOp,
-                         mastershipService.getMasterFor(groupDesc.deviceId()));
-                //TODO: Send Group operation failure event
-                return;
-            }
-            log.debug("Sent Group operation request for device {} to remote MASTER {}",
-                      groupDesc.deviceId(),
-                      mastershipService.getMasterFor(groupDesc.deviceId()));
+                    mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
+                        if (error != null) {
+                            log.warn("Failed to send request to master: {} to {}",
+                                    groupOp,
+                                    mastershipService.getMasterFor(groupDesc.deviceId()));
+                            //TODO: Send Group operation failure event
+                        } else {
+                            log.debug("Sent Group operation request for device {} "
+                                    + "to remote MASTER {}",
+                                    groupDesc.deviceId(),
+                                    mastershipService.getMasterFor(groupDesc.deviceId()));
+                        }
+                    });
             return;
         }
 
@@ -512,15 +514,17 @@
                                                 newBuckets,
                                                 newAppCookie);
 
-            if (!clusterCommunicator.unicast(groupOp,
-                        GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
-                        m -> kryoBuilder.build().serialize(m),
-                        mastershipService.getMasterFor(deviceId))) {
-                log.warn("Failed to send request to master: {} to {}",
-                         groupOp,
-                         mastershipService.getMasterFor(deviceId));
-                //TODO: Send Group operation failure event
-            }
+            clusterCommunicator.unicast(groupOp,
+                    GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+                    m -> kryoBuilder.build().serialize(m),
+                    mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
+                        if (error !=  null) {
+                            log.warn("Failed to send request to master: {} to {}",
+                                    groupOp,
+                                    mastershipService.getMasterFor(deviceId), error);
+                        }
+                        //TODO: Send Group operation failure event
+                    });
             return;
         }
         log.debug("updateGroupDescription for device {} is getting handled locally",
@@ -643,15 +647,17 @@
                     createGroupDeleteRequestMsg(deviceId,
                                                 appCookie);
 
-            if (!clusterCommunicator.unicast(groupOp,
+            clusterCommunicator.unicast(groupOp,
                     GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
                     m -> kryoBuilder.build().serialize(m),
-                    mastershipService.getMasterFor(deviceId))) {
-                log.warn("Failed to send request to master: {} to {}",
-                         groupOp,
-                         mastershipService.getMasterFor(deviceId));
-                //TODO: Send Group operation failure event
-            }
+                    mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
+                        if (error != null) {
+                            log.warn("Failed to send request to master: {} to {}",
+                                    groupOp,
+                                    mastershipService.getMasterFor(deviceId), error);
+                        }
+                        //TODO: Send Group operation failure event
+                    });
             return;
         }
         log.debug("deleteGroupDescription in device {} is getting handled locally",
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 1b41d0c..77db68c 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -18,7 +18,6 @@
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.MoreExecutors;
 
 import org.junit.After;
@@ -145,7 +144,7 @@
                 .register(KryoNamespaces.API)
                 .register(TestTimestamp.class);
 
-        ecMap = new EventuallyConsistentMapBuilderImpl<>(
+        ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
                         clusterService, clusterCommunicator)
                 .withName(MAP_NAME)
                 .withSerializer(serializer)
@@ -702,7 +701,7 @@
                     anyObject(MessageSubject.class),
                     anyObject(Function.class),
                     anyObject(NodeId.class)))
-                .andReturn(true)
+                .andReturn(CompletableFuture.completedFuture(null))
                 .anyTimes();
         replay(clusterCommunicator);
     }
@@ -761,9 +760,9 @@
         }
 
         @Override
-        public <M> boolean unicast(M message, MessageSubject subject,
+        public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
                 Function<M, byte[]> encoder, NodeId toNodeId) {
-            return false;
+            return null;
         }
 
         @Override
@@ -795,33 +794,6 @@
                 Function<byte[], M> decoder, Consumer<M> handler,
                 Executor executor) {
         }
-
-        @Override
-        public boolean broadcast(ClusterMessage message) {
-            return false;
-        }
-
-        @Override
-        public boolean broadcastIncludeSelf(ClusterMessage message) {
-            return false;
-        }
-
-        @Override
-        public boolean unicast(ClusterMessage message, NodeId toNodeId) {
-            return false;
-        }
-
-        @Override
-        public boolean multicast(ClusterMessage message,
-                Iterable<NodeId> nodeIds) {
-            return false;
-        }
-
-        @Override
-        public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
-                NodeId toNodeId) {
-            return null;
-        }
     }
 
     /**
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index c19dc59..9c63d84 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -20,7 +20,6 @@
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
@@ -136,32 +135,39 @@
     }
 
     @Override
-    public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
+    public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
         InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
                                                       localEp,
                                                       type,
                                                       payload);
-        sendAsync(ep, message);
+        return sendAsync(ep, message);
     }
 
-    protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
-        if (ep.equals(localEp)) {
-            dispatchLocally(message);
-            return;
-        }
-        Channel channel = null;
+    protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
         try {
-            try {
-                channel = channels.borrowObject(ep);
-                channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
-            } finally {
-                channels.returnObject(ep, channel);
+            if (ep.equals(localEp)) {
+                dispatchLocally(message);
+                future.complete(null);
+            } else {
+                Channel channel = null;
+                try {
+                    channel = channels.borrowObject(ep);
+                    channel.writeAndFlush(message).addListener(channelFuture -> {
+                        if (!channelFuture.isSuccess()) {
+                            future.completeExceptionally(channelFuture.cause());
+                        } else {
+                            future.complete(null);
+                        }
+                    });
+                } finally {
+                    channels.returnObject(ep, channel);
+                }
             }
-        } catch (IOException e) {
-            throw e;
         } catch (Exception e) {
-            throw new IOException(e);
+            future.completeExceptionally(e);
         }
+        return future;
     }
 
     @Override
@@ -193,11 +199,11 @@
                         localEp,
                         REPLY_MESSAGE_TYPE,
                         responsePayload);
-                try {
-                    sendAsync(message.sender(), response);
-                } catch (IOException e) {
-                    log.debug("Failed to respond", e);
-                }
+                sendAsync(message.sender(), response).whenComplete((result, error) -> {
+                    if (error != null) {
+                        log.debug("Failed to respond", error);
+                    }
+                });
             }
         }));
     }
@@ -206,17 +212,17 @@
     public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
         handlers.put(type, message -> {
             handler.apply(message.payload()).whenComplete((result, error) -> {
-            if (error == null) {
-                InternalMessage response = new InternalMessage(message.id(),
-                        localEp,
-                        REPLY_MESSAGE_TYPE,
-                        result);
-                try {
-                    sendAsync(message.sender(), response);
-                } catch (IOException e) {
-                    log.debug("Failed to respond", e);
+                if (error == null) {
+                    InternalMessage response = new InternalMessage(message.id(),
+                            localEp,
+                            REPLY_MESSAGE_TYPE,
+                            result);
+                    sendAsync(message.sender(), response).whenComplete((r, e) -> {
+                        if (e != null) {
+                            log.debug("Failed to respond", e);
+                        }
+                    });
                 }
-            }
             });
         });
     }
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
index 37a6535..c195d16 100644
--- a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
+++ b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
@@ -138,29 +138,30 @@
 
 
     @Override
-    public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
+    public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
         DefaultMessage message = new DefaultMessage(
                 messageIdGenerator.incrementAndGet(),
                 localEp,
                 type,
                 payload);
-        sendAsync(ep, message);
+        return sendAsync(ep, message);
     }
 
-    protected void sendAsync(Endpoint ep, DefaultMessage message) throws IOException {
+    protected CompletableFuture<Void> sendAsync(Endpoint ep, DefaultMessage message) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
         if (ep.equals(localEp)) {
             dispatchLocally(message);
-            return;
+            future.complete(null);
+            return future;
         }
 
         DefaultMessageStream stream = null;
         try {
             stream = streams.borrowObject(ep);
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
-        try {
             stream.write(message);
+            future.complete(null);
+        } catch (Exception e) {
+            future.completeExceptionally(e);
         } finally {
             try {
                 streams.returnObject(ep, stream);
@@ -168,6 +169,7 @@
                 log.warn("Failed to return stream to pool");
             }
         }
+        return future;
     }
 
     @Override
@@ -202,30 +204,30 @@
                         localEp,
                         REPLY_MESSAGE_TYPE,
                         responsePayload);
-                try {
-                    sendAsync(message.sender(), response);
-                } catch (IOException e) {
-                    log.debug("Failed to respond", e);
-                }
+                sendAsync(message.sender(), response).whenComplete((result, error) -> {
+                    log.debug("Failed to respond", error);
+                });
             }
         }));
     }
 
     @Override
     public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
-        handlers.put(type, message -> handler.apply(message.payload()).whenComplete((result, error) -> {
-            if (error == null) {
-                DefaultMessage response = new DefaultMessage(message.id(),
+        handlers.put(type, message -> {
+            handler.apply(message.payload()).whenComplete((result, error) -> {
+                if (error == null) {
+                    DefaultMessage response = new DefaultMessage(message.id(),
                         localEp,
                         REPLY_MESSAGE_TYPE,
                         result);
-                try {
-                    sendAsync(message.sender(), response);
-                } catch (IOException e) {
-                    log.debug("Failed to respond", e);
+                    sendAsync(message.sender(), response).whenComplete((r, e) -> {
+                        if (e != null) {
+                            log.debug("Failed to respond", e);
+                        }
+                    });
                 }
-            }
-        }));
+            });
+        });
     }
 
     @Override