Makes establishing connections between onos nodes asynchronous, prevents threads blocking when nodes are inaccessible.
Change-Id: I46ce54505e8c4c34b56009412ddb1d645c83aaa3
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index f48b22c..8b003f0 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -96,8 +96,8 @@
})
.build();
- private final GenericKeyedObjectPool<Endpoint, Channel> channels
- = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+ private final GenericKeyedObjectPool<Endpoint, Connection> channels
+ = new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
private EventLoopGroup serverGroup;
private EventLoopGroup clientGroup;
@@ -179,18 +179,13 @@
CompletableFuture<Void> future = new CompletableFuture<>();
try {
- Channel channel = null;
+ Connection connection = null;
try {
- channel = channels.borrowObject(ep);
- channel.writeAndFlush(message).addListener(channelFuture -> {
- if (!channelFuture.isSuccess()) {
- future.completeExceptionally(channelFuture.cause());
- } else {
- future.complete(null);
- }
- });
+ connection = channels.borrowObject(ep);
+ connection.send(message, future);
+
} finally {
- channels.returnObject(ep, channel);
+ channels.returnObject(ep, connection);
}
} catch (Exception e) {
future.completeExceptionally(e);
@@ -292,21 +287,22 @@
}
private class OnosCommunicationChannelFactory
- implements KeyedPoolableObjectFactory<Endpoint, Channel> {
+ implements KeyedPoolableObjectFactory<Endpoint, Connection> {
@Override
- public void activateObject(Endpoint endpoint, Channel channel)
+ public void activateObject(Endpoint endpoint, Connection connection)
throws Exception {
}
@Override
- public void destroyObject(Endpoint ep, Channel channel) throws Exception {
+ public void destroyObject(Endpoint ep, Connection connection) throws Exception {
log.debug("Closing connection to {}", ep);
- channel.close();
+ //Is this the right way to destroy?
+ connection.destroy();
}
@Override
- public Channel makeObject(Endpoint ep) throws Exception {
+ public Connection makeObject(Endpoint ep) throws Exception {
Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
@@ -324,19 +320,28 @@
bootstrap.handler(new OnosCommunicationChannelInitializer());
}
// Start the client.
- ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
+ CompletableFuture<Channel> retFuture = new CompletableFuture<>();
+ ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
+
+ f.addListener(future -> {
+ if (future.isSuccess()) {
+ retFuture.complete(f.channel());
+ } else {
+ retFuture.completeExceptionally(future.cause());
+ }
+ });
log.debug("Established a new connection to {}", ep);
- return f.channel();
+ return new Connection(retFuture);
}
@Override
- public void passivateObject(Endpoint ep, Channel channel)
+ public void passivateObject(Endpoint ep, Connection connection)
throws Exception {
}
@Override
- public boolean validateObject(Endpoint ep, Channel channel) {
- return channel.isOpen();
+ public boolean validateObject(Endpoint ep, Connection connection) {
+ return connection.validate();
}
}
@@ -486,4 +491,62 @@
executor.execute(() -> future.completeExceptionally(error));
}
}
+ private final class Connection {
+ private final CompletableFuture<Channel> internalFuture;
+
+ public Connection(CompletableFuture<Channel> internalFuture) {
+ this.internalFuture = internalFuture;
+ }
+
+ /**
+ * Sends a message out on its channel and associated the message with a
+ * completable future used for signaling.
+ * @param message the message to be sent
+ * @param future a future that is completed normally or exceptionally if
+ * message sending succeeds or fails respectively
+ */
+ public void send(Object message, CompletableFuture<Void> future) {
+ internalFuture.whenComplete((channel, throwable) -> {
+ if (throwable == null) {
+ channel.writeAndFlush(message).addListener(channelFuture -> {
+ if (!channelFuture.isSuccess()) {
+ future.completeExceptionally(channelFuture.cause());
+ } else {
+ future.complete(null);
+ }
+ });
+ } else {
+ future.completeExceptionally(throwable);
+ }
+
+ });
+ }
+
+ /**
+ * Destroys a channel by closing its channel (if it exists) and
+ * cancelling its future.
+ */
+ public void destroy() {
+ Channel channel = internalFuture.getNow(null);
+ if (channel != null) {
+ channel.close();
+ }
+ internalFuture.cancel(false);
+ }
+
+ /**
+ * Determines whether the connection is valid meaning it is either
+ * complete with and active channel
+ * or it has not yet completed.
+ * @return true if the channel has an active connection or has not
+ * yet completed
+ */
+ public boolean validate() {
+ if (internalFuture.isCompletedExceptionally()) {
+ return false;
+ }
+ Channel channel = internalFuture.getNow(null);
+ return channel == null || channel.isActive();
+ }
+ }
}