Removed deprecated ClusterCommunicationService APIs
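MessagingService::sendAsync now returns a CompletableFuture<Void> in place of boolean.
In NettyMessaging, sendAsync previously returned void and threw IOException; send failures now complete the returned future exceptionally.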
Change-Id: I98134c4c0ea65b9c7e9ba705eebd1669067324ef
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index c19dc59..9c63d84 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -20,7 +20,6 @@
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
@@ -136,32 +135,39 @@
}
@Override
- public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
+ public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
localEp,
type,
payload);
- sendAsync(ep, message);
+ return sendAsync(ep, message);
}
- protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
- if (ep.equals(localEp)) {
- dispatchLocally(message);
- return;
- }
- Channel channel = null;
+ protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
try {
- try {
- channel = channels.borrowObject(ep);
- channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- } finally {
- channels.returnObject(ep, channel);
+ if (ep.equals(localEp)) {
+ dispatchLocally(message);
+ future.complete(null);
+ } else {
+ Channel channel = null;
+ try {
+ channel = channels.borrowObject(ep);
+ channel.writeAndFlush(message).addListener(channelFuture -> {
+ if (!channelFuture.isSuccess()) {
+ future.completeExceptionally(channelFuture.cause());
+ } else {
+ future.complete(null);
+ }
+ });
+ } finally {
+ channels.returnObject(ep, channel);
+ }
}
- } catch (IOException e) {
- throw e;
} catch (Exception e) {
- throw new IOException(e);
+ future.completeExceptionally(e);
}
+ return future;
}
@Override
@@ -193,11 +199,11 @@
localEp,
REPLY_MESSAGE_TYPE,
responsePayload);
- try {
- sendAsync(message.sender(), response);
- } catch (IOException e) {
- log.debug("Failed to respond", e);
- }
+ sendAsync(message.sender(), response).whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to respond", error);
+ }
+ });
}
}));
}
@@ -206,17 +212,17 @@
public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
handlers.put(type, message -> {
handler.apply(message.payload()).whenComplete((result, error) -> {
- if (error == null) {
- InternalMessage response = new InternalMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- result);
- try {
- sendAsync(message.sender(), response);
- } catch (IOException e) {
- log.debug("Failed to respond", e);
+ if (error == null) {
+ InternalMessage response = new InternalMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ result);
+ sendAsync(message.sender(), response).whenComplete((r, e) -> {
+ if (e != null) {
+ log.debug("Failed to respond", e);
+ }
+ });
}
- }
});
});
}