Removed deprecated ClusterCommunicationService APIs
MessagingService::sendAsync now returns a CompletableFuture<Void> in place of boolean
Change-Id: I98134c4c0ea65b9c7e9ba705eebd1669067324ef
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
index eb36202..f547a47 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterCommunicationService.java
@@ -24,61 +24,12 @@
import org.onosproject.cluster.NodeId;
-import com.google.common.util.concurrent.ListenableFuture;
-
/**
* Service for assisting communications between controller cluster nodes.
*/
public interface ClusterCommunicationService {
/**
- * Broadcast a message to all controller nodes.
- *
- * @param message message to send
- * @return true if the message was sent successfully to all nodes; false otherwise.
- */
- @Deprecated
- boolean broadcast(ClusterMessage message);
-
- /**
- * Broadcast a message to all controller nodes including self.
- *
- * @param message message to send
- * @return true if the message was sent successfully to all nodes; false otherwise.
- */
- @Deprecated
- boolean broadcastIncludeSelf(ClusterMessage message);
-
- /**
- * Sends a message to the specified controller node.
- *
- * @param message message to send
- * @param toNodeId node identifier
- * @return true if the message was sent successfully; false otherwise.
- */
- @Deprecated
- boolean unicast(ClusterMessage message, NodeId toNodeId);
-
- /**
- * Multicast a message to a set of controller nodes.
- *
- * @param message message to send
- * @param nodeIds recipient node identifiers
- * @return true if the message was sent successfully to all nodes in the group; false otherwise.
- */
- @Deprecated
- boolean multicast(ClusterMessage message, Iterable<NodeId> nodeIds);
-
- /**
- * Sends a message synchronously.
- * @param message message to send
- * @param toNodeId recipient node identifier
- * @return reply future.
- */
- @Deprecated
- ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId);
-
- /**
* Adds a new subscriber for the specified message subject.
*
* @param subject message subject
@@ -120,9 +71,9 @@
* @param encoder function for encoding message to byte[]
* @param toNodeId destination node identifier
* @param <M> message type
- * @return true if the message was sent successfully; false otherwise
+ * @return future that is completed when the message is sent
*/
- <M> boolean unicast(M message,
+ <M> CompletableFuture<Void> unicast(M message,
MessageSubject subject,
Function<M, byte[]> encoder,
NodeId toNodeId);
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
index 09f15f8..6ccd483 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/MessagingService.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.store.cluster.messaging;
-import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
@@ -32,9 +31,9 @@
* @param ep end point to send the message to.
* @param type type of message.
* @param payload message payload bytes.
- * @throws IOException when I/O exception of some sort has occurred
+ * @return future that is completed when the message is sent
*/
- void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
+ CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload);
/**
* Sends a message synchronously and waits for a response.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index ec297e2..859efeb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -42,7 +42,6 @@
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
-import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
@@ -237,11 +236,11 @@
private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
Endpoint remoteEp = new Endpoint(peer.ip(), peer.tcpPort());
- try {
- messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
- } catch (IOException e) {
- log.trace("Sending heartbeat to {} failed", remoteEp, e);
- }
+ messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload).whenComplete((result, error) -> {
+ if (error != null) {
+ log.trace("Sending heartbeat to {} failed", remoteEp, error);
+ }
+ });
}
private class HeartbeatMessageHandler implements Consumer<byte[]> {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 21b0919..8a237ef 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -35,10 +35,6 @@
import org.slf4j.LoggerFactory;
import com.google.common.base.Objects;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.SettableFuture;
-
-import java.io.IOException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
@@ -62,8 +58,11 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected MessagingService messagingService;
+ private NodeId localNodeId;
+
@Activate
public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
log.info("Started");
}
@@ -73,60 +72,6 @@
}
@Override
- public boolean broadcast(ClusterMessage message) {
- boolean ok = true;
- final ControllerNode localNode = clusterService.getLocalNode();
- byte[] payload = message.getBytes();
- for (ControllerNode node : clusterService.getNodes()) {
- if (!node.equals(localNode)) {
- ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
- }
- }
- return ok;
- }
-
- @Override
- public boolean broadcastIncludeSelf(ClusterMessage message) {
- boolean ok = true;
- byte[] payload = message.getBytes();
- for (ControllerNode node : clusterService.getNodes()) {
- ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
- }
- return ok;
- }
-
- @Override
- public boolean multicast(ClusterMessage message, Iterable<NodeId> nodes) {
- boolean ok = true;
- final ControllerNode localNode = clusterService.getLocalNode();
- byte[] payload = message.getBytes();
- for (NodeId nodeId : nodes) {
- if (!nodeId.equals(localNode.id())) {
- ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
- }
- }
- return ok;
- }
-
- @Override
- public boolean unicast(ClusterMessage message, NodeId toNodeId) {
- return unicastUnchecked(message.subject(), message.getBytes(), toNodeId);
- }
-
- @Override
- public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) {
- SettableFuture<byte[]> response = SettableFuture.create();
- sendAndReceive(message.subject(), message.getBytes(), toNodeId).whenComplete((r, e) -> {
- if (e == null) {
- response.set(r);
- } else {
- response.setException(e);
- }
- });
- return response;
- }
-
- @Override
public <M> void broadcast(M message,
MessageSubject subject,
Function<M, byte[]> encoder) {
@@ -154,15 +99,19 @@
}
@Override
- public <M> boolean unicast(M message,
- MessageSubject subject,
- Function<M, byte[]> encoder,
- NodeId toNodeId) {
- byte[] payload = new ClusterMessage(
- clusterService.getLocalNode().id(),
- subject,
- encoder.apply(message)).getBytes();
- return unicastUnchecked(subject, payload, toNodeId);
+ public <M> CompletableFuture<Void> unicast(M message,
+ MessageSubject subject,
+ Function<M, byte[]> encoder,
+ NodeId toNodeId) {
+ try {
+ byte[] payload = new ClusterMessage(
+ localNodeId,
+ subject,
+ encoder.apply(message)).getBytes();
+ return doUnicast(subject, payload, toNodeId);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
}
@Override
@@ -171,10 +120,10 @@
Function<M, byte[]> encoder,
Set<NodeId> nodes) {
byte[] payload = new ClusterMessage(
- clusterService.getLocalNode().id(),
+ localNodeId,
subject,
encoder.apply(message)).getBytes();
- nodes.forEach(nodeId -> unicastUnchecked(subject, payload, nodeId));
+ nodes.forEach(nodeId -> doUnicast(subject, payload, nodeId));
}
@Override
@@ -194,17 +143,11 @@
}
}
- private boolean unicastUnchecked(MessageSubject subject, byte[] payload, NodeId toNodeId) {
+ private CompletableFuture<Void> doUnicast(MessageSubject subject, byte[] payload, NodeId toNodeId) {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
- try {
- messagingService.sendAsync(nodeEp, subject.value(), payload);
- return true;
- } catch (IOException e) {
- log.debug("Failed to send cluster message to nodeId: " + toNodeId, e);
- return false;
- }
+ return messagingService.sendAsync(nodeEp, subject.value(), payload);
}
private CompletableFuture<byte[]> sendAndReceive(MessageSubject subject, byte[] payload, NodeId toNodeId) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
index 23b219b..823f658 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapBuilderImpl.java
@@ -66,27 +66,27 @@
}
@Override
- public EventuallyConsistentMapBuilder withName(String name) {
+ public EventuallyConsistentMapBuilder<K, V> withName(String name) {
this.name = checkNotNull(name);
return this;
}
@Override
- public EventuallyConsistentMapBuilder withSerializer(
+ public EventuallyConsistentMapBuilder<K, V> withSerializer(
KryoNamespace.Builder serializerBuilder) {
this.serializerBuilder = checkNotNull(serializerBuilder);
return this;
}
@Override
- public EventuallyConsistentMapBuilder withClockService(
+ public EventuallyConsistentMapBuilder<K, V> withClockService(
ClockService<K, V> clockService) {
this.clockService = checkNotNull(clockService);
return this;
}
@Override
- public EventuallyConsistentMapBuilder withEventExecutor(ExecutorService executor) {
+ public EventuallyConsistentMapBuilder<K, V> withEventExecutor(ExecutorService executor) {
this.eventExecutor = checkNotNull(executor);
return this;
}
@@ -99,13 +99,13 @@
}
@Override
- public EventuallyConsistentMapBuilder withBackgroundExecutor(ScheduledExecutorService executor) {
+ public EventuallyConsistentMapBuilder<K, V> withBackgroundExecutor(ScheduledExecutorService executor) {
this.backgroundExecutor = checkNotNull(executor);
return this;
}
@Override
- public EventuallyConsistentMapBuilder withPeerUpdateFunction(
+ public EventuallyConsistentMapBuilder<K, V> withPeerUpdateFunction(
BiFunction<K, V, Collection<NodeId>> peerUpdateFunction) {
this.peerUpdateFunction = checkNotNull(peerUpdateFunction);
return this;
diff --git a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
index f03215d..18ecefe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/ecmap/EventuallyConsistentMapImpl.java
@@ -509,12 +509,6 @@
);
}
- private boolean unicastMessage(NodeId peer, MessageSubject subject, Object event) {
- return clusterCommunicator.unicast(event, subject, serializer::encode, peer);
- // Note: we had this flipped before...
-// communicationExecutor.execute(() -> clusterCommunicator.unicast(message, peer));
- }
-
private boolean underHighLoad() {
return counter.get(LOAD_WINDOW) > HIGH_LOAD_THRESHOLD;
}
@@ -556,10 +550,14 @@
}
AntiEntropyAdvertisement<K> ad = createAdvertisement();
+ NodeId destination = peer;
+ clusterCommunicator.unicast(ad, antiEntropyAdvertisementSubject, serializer::encode, peer)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to send anti-entropy advertisement to {}", destination);
+ }
+ });
- if (!unicastMessage(peer, antiEntropyAdvertisementSubject, ad)) {
- log.debug("Failed to send anti-entropy advertisement to {}", peer);
- }
} catch (Exception e) {
// Catch all exceptions to avoid scheduled task being suppressed.
log.error("Exception thrown while sending advertisement", e);
@@ -595,9 +593,14 @@
// Send the advertisement back if this peer is out-of-sync
final NodeId sender = ad.sender();
AntiEntropyAdvertisement<K> myAd = createAdvertisement();
- if (!unicastMessage(sender, antiEntropyAdvertisementSubject, myAd)) {
- log.debug("Failed to send reactive anti-entropy advertisement to {}", sender);
- }
+
+ clusterCommunicator.unicast(myAd, antiEntropyAdvertisementSubject, serializer::encode, sender)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to send reactive "
+ + "anti-entropy advertisement to {}", sender);
+ }
+ });
break;
}
}
@@ -801,11 +804,15 @@
)
);
communicationExecutor.submit(() -> {
- try {
- unicastMessage(peer, updateMessageSubject, Lists.newArrayList(map.values()));
- } catch (Exception e) {
- log.warn("broadcast error", e);
- }
+ clusterCommunicator.unicast(Lists.newArrayList(map.values()),
+ updateMessageSubject,
+ serializer::encode,
+ peer)
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to send to {}", peer);
+ }
+ });
});
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
index 35d004e..3b82655 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/NewDistributedFlowRuleStore.java
@@ -407,21 +407,22 @@
log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}",
master, deviceId);
- if (!clusterCommunicator.unicast(operation,
- APPLY_BATCH_FLOWS,
- SERIALIZER::encode,
- master)) {
- log.warn("Failed to storeBatch: {} to {}", operation, master);
+ clusterCommunicator.unicast(operation,
+ APPLY_BATCH_FLOWS,
+ SERIALIZER::encode,
+ master)
+ .whenComplete((result, error) -> {
+ log.warn("Failed to storeBatch: {} to {}", operation, master);
- Set<FlowRule> allFailures = operation.getOperations().stream()
- .map(op -> op.target())
- .collect(Collectors.toSet());
+ Set<FlowRule> allFailures = operation.getOperations()
+ .stream()
+ .map(op -> op.target())
+ .collect(Collectors.toSet());
- notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(false, allFailures, deviceId)));
- return;
- }
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(false, allFailures, deviceId)));
+ });
}
private void storeBatchInternal(FlowRuleBatchOperation operation) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
index 3ebbf78..e91031a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/group/impl/DistributedGroupStore.java
@@ -395,8 +395,7 @@
}
// Check if group to be created by a remote instance
- if (mastershipService.getLocalRole(
- groupDesc.deviceId()) != MastershipRole.MASTER) {
+ if (mastershipService.getLocalRole(groupDesc.deviceId()) != MastershipRole.MASTER) {
log.debug("storeGroupDescription: Device {} local role is not MASTER",
groupDesc.deviceId());
if (mastershipService.getMasterFor(groupDesc.deviceId()) == null) {
@@ -410,19 +409,22 @@
createGroupAddRequestMsg(groupDesc.deviceId(),
groupDesc);
- if (!clusterCommunicator.unicast(groupOp,
+ clusterCommunicator.unicast(groupOp,
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
m -> kryoBuilder.build().serialize(m),
- mastershipService.getMasterFor(groupDesc.deviceId()))) {
- log.warn("Failed to send request to master: {} to {}",
- groupOp,
- mastershipService.getMasterFor(groupDesc.deviceId()));
- //TODO: Send Group operation failure event
- return;
- }
- log.debug("Sent Group operation request for device {} to remote MASTER {}",
- groupDesc.deviceId(),
- mastershipService.getMasterFor(groupDesc.deviceId()));
+ mastershipService.getMasterFor(groupDesc.deviceId())).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to send request to master: {} to {}",
+ groupOp,
+ mastershipService.getMasterFor(groupDesc.deviceId()));
+ //TODO: Send Group operation failure event
+ } else {
+ log.debug("Sent Group operation request for device {} "
+ + "to remote MASTER {}",
+ groupDesc.deviceId(),
+ mastershipService.getMasterFor(groupDesc.deviceId()));
+ }
+ });
return;
}
@@ -512,15 +514,17 @@
newBuckets,
newAppCookie);
- if (!clusterCommunicator.unicast(groupOp,
- GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
- m -> kryoBuilder.build().serialize(m),
- mastershipService.getMasterFor(deviceId))) {
- log.warn("Failed to send request to master: {} to {}",
- groupOp,
- mastershipService.getMasterFor(deviceId));
- //TODO: Send Group operation failure event
- }
+ clusterCommunicator.unicast(groupOp,
+ GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
+ m -> kryoBuilder.build().serialize(m),
+ mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to send request to master: {} to {}",
+ groupOp,
+ mastershipService.getMasterFor(deviceId), error);
+ }
+ //TODO: Send Group operation failure event
+ });
return;
}
log.debug("updateGroupDescription for device {} is getting handled locally",
@@ -643,15 +647,17 @@
createGroupDeleteRequestMsg(deviceId,
appCookie);
- if (!clusterCommunicator.unicast(groupOp,
+ clusterCommunicator.unicast(groupOp,
GroupStoreMessageSubjects.REMOTE_GROUP_OP_REQUEST,
m -> kryoBuilder.build().serialize(m),
- mastershipService.getMasterFor(deviceId))) {
- log.warn("Failed to send request to master: {} to {}",
- groupOp,
- mastershipService.getMasterFor(deviceId));
- //TODO: Send Group operation failure event
- }
+ mastershipService.getMasterFor(deviceId)).whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to send request to master: {} to {}",
+ groupOp,
+ mastershipService.getMasterFor(deviceId), error);
+ }
+ //TODO: Send Group operation failure event
+ });
return;
}
log.debug("deleteGroupDescription in device {} is getting handled locally",
diff --git a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
index 1b41d0c..77db68c 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/ecmap/EventuallyConsistentMapImplTest.java
@@ -18,7 +18,6 @@
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import org.junit.After;
@@ -145,7 +144,7 @@
.register(KryoNamespaces.API)
.register(TestTimestamp.class);
- ecMap = new EventuallyConsistentMapBuilderImpl<>(
+ ecMap = new EventuallyConsistentMapBuilderImpl<String, String>(
clusterService, clusterCommunicator)
.withName(MAP_NAME)
.withSerializer(serializer)
@@ -702,7 +701,7 @@
anyObject(MessageSubject.class),
anyObject(Function.class),
anyObject(NodeId.class)))
- .andReturn(true)
+ .andReturn(CompletableFuture.completedFuture(null))
.anyTimes();
replay(clusterCommunicator);
}
@@ -761,9 +760,9 @@
}
@Override
- public <M> boolean unicast(M message, MessageSubject subject,
+ public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
Function<M, byte[]> encoder, NodeId toNodeId) {
- return false;
+ return null;
}
@Override
@@ -795,33 +794,6 @@
Function<byte[], M> decoder, Consumer<M> handler,
Executor executor) {
}
-
- @Override
- public boolean broadcast(ClusterMessage message) {
- return false;
- }
-
- @Override
- public boolean broadcastIncludeSelf(ClusterMessage message) {
- return false;
- }
-
- @Override
- public boolean unicast(ClusterMessage message, NodeId toNodeId) {
- return false;
- }
-
- @Override
- public boolean multicast(ClusterMessage message,
- Iterable<NodeId> nodeIds) {
- return false;
- }
-
- @Override
- public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message,
- NodeId toNodeId) {
- return null;
- }
}
/**
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index c19dc59..9c63d84 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -20,7 +20,6 @@
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
@@ -136,32 +135,39 @@
}
@Override
- public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
+ public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
InternalMessage message = new InternalMessage(messageIdGenerator.incrementAndGet(),
localEp,
type,
payload);
- sendAsync(ep, message);
+ return sendAsync(ep, message);
}
- protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
- if (ep.equals(localEp)) {
- dispatchLocally(message);
- return;
- }
- Channel channel = null;
+ protected CompletableFuture<Void> sendAsync(Endpoint ep, InternalMessage message) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
try {
- try {
- channel = channels.borrowObject(ep);
- channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
- } finally {
- channels.returnObject(ep, channel);
+ if (ep.equals(localEp)) {
+ dispatchLocally(message);
+ future.complete(null);
+ } else {
+ Channel channel = null;
+ try {
+ channel = channels.borrowObject(ep);
+ channel.writeAndFlush(message).addListener(channelFuture -> {
+ if (!channelFuture.isSuccess()) {
+ future.completeExceptionally(channelFuture.cause());
+ } else {
+ future.complete(null);
+ }
+ });
+ } finally {
+ channels.returnObject(ep, channel);
+ }
}
- } catch (IOException e) {
- throw e;
} catch (Exception e) {
- throw new IOException(e);
+ future.completeExceptionally(e);
}
+ return future;
}
@Override
@@ -193,11 +199,11 @@
localEp,
REPLY_MESSAGE_TYPE,
responsePayload);
- try {
- sendAsync(message.sender(), response);
- } catch (IOException e) {
- log.debug("Failed to respond", e);
- }
+ sendAsync(message.sender(), response).whenComplete((result, error) -> {
+ if (error != null) {
+ log.debug("Failed to respond", error);
+ }
+ });
}
}));
}
@@ -206,17 +212,17 @@
public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
handlers.put(type, message -> {
handler.apply(message.payload()).whenComplete((result, error) -> {
- if (error == null) {
- InternalMessage response = new InternalMessage(message.id(),
- localEp,
- REPLY_MESSAGE_TYPE,
- result);
- try {
- sendAsync(message.sender(), response);
- } catch (IOException e) {
- log.debug("Failed to respond", e);
+ if (error == null) {
+ InternalMessage response = new InternalMessage(message.id(),
+ localEp,
+ REPLY_MESSAGE_TYPE,
+ result);
+ sendAsync(message.sender(), response).whenComplete((r, e) -> {
+ if (e != null) {
+ log.debug("Failed to respond", e);
+ }
+ });
}
- }
});
});
}
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
index 37a6535..c195d16 100644
--- a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
+++ b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
@@ -138,29 +138,30 @@
@Override
- public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
+ public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
DefaultMessage message = new DefaultMessage(
messageIdGenerator.incrementAndGet(),
localEp,
type,
payload);
- sendAsync(ep, message);
+ return sendAsync(ep, message);
}
- protected void sendAsync(Endpoint ep, DefaultMessage message) throws IOException {
+ protected CompletableFuture<Void> sendAsync(Endpoint ep, DefaultMessage message) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
if (ep.equals(localEp)) {
dispatchLocally(message);
- return;
+ future.complete(null);
+ return future;
}
DefaultMessageStream stream = null;
try {
stream = streams.borrowObject(ep);
- } catch (Exception e) {
- throw new IOException(e);
- }
- try {
stream.write(message);
+ future.complete(null);
+ } catch (Exception e) {
+ future.completeExceptionally(e);
} finally {
try {
streams.returnObject(ep, stream);
@@ -168,6 +169,7 @@
log.warn("Failed to return stream to pool");
}
}
+ return future;
}
@Override
@@ -202,30 +204,30 @@
localEp,
REPLY_MESSAGE_TYPE,
responsePayload);
- try {
- sendAsync(message.sender(), response);
- } catch (IOException e) {
- log.debug("Failed to respond", e);
- }
+ sendAsync(message.sender(), response).whenComplete((result, error) -> {
+ log.debug("Failed to respond", error);
+ });
}
}));
}
@Override
public void registerHandler(String type, Function<byte[], CompletableFuture<byte[]>> handler) {
- handlers.put(type, message -> handler.apply(message.payload()).whenComplete((result, error) -> {
- if (error == null) {
- DefaultMessage response = new DefaultMessage(message.id(),
+ handlers.put(type, message -> {
+ handler.apply(message.payload()).whenComplete((result, error) -> {
+ if (error == null) {
+ DefaultMessage response = new DefaultMessage(message.id(),
localEp,
REPLY_MESSAGE_TYPE,
result);
- try {
- sendAsync(message.sender(), response);
- } catch (IOException e) {
- log.debug("Failed to respond", e);
+ sendAsync(message.sender(), response).whenComplete((r, e) -> {
+ if (e != null) {
+ log.debug("Failed to respond", e);
+ }
+ });
}
- }
- }));
+ });
+ });
}
@Override