Makes establishing connections between onos nodes asynchronous, prevents threads blocking when nodes are inaccessible.

Change-Id: I46ce54505e8c4c34b56009412ddb1d645c83aaa3
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index f48b22c..8b003f0 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -96,8 +96,8 @@
             })
             .build();
 
-    private final GenericKeyedObjectPool<Endpoint, Channel> channels
-            = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+    private final GenericKeyedObjectPool<Endpoint, Connection> channels
+            = new GenericKeyedObjectPool<Endpoint, Connection>(new OnosCommunicationChannelFactory());
 
     private EventLoopGroup serverGroup;
     private EventLoopGroup clientGroup;
@@ -179,18 +179,13 @@
 
         CompletableFuture<Void> future = new CompletableFuture<>();
         try {
-            Channel channel = null;
+            Connection connection = null;
             try {
-                channel = channels.borrowObject(ep);
-                channel.writeAndFlush(message).addListener(channelFuture -> {
-                    if (!channelFuture.isSuccess()) {
-                        future.completeExceptionally(channelFuture.cause());
-                    } else {
-                        future.complete(null);
-                    }
-                });
+                connection = channels.borrowObject(ep);
+                connection.send(message, future);
+
             } finally {
-                channels.returnObject(ep, channel);
+                channels.returnObject(ep, connection);
             }
         } catch (Exception e) {
             future.completeExceptionally(e);
@@ -292,21 +287,22 @@
     }
 
     private class OnosCommunicationChannelFactory
-        implements KeyedPoolableObjectFactory<Endpoint, Channel> {
+        implements KeyedPoolableObjectFactory<Endpoint, Connection> {
 
         @Override
-        public void activateObject(Endpoint endpoint, Channel channel)
+        public void activateObject(Endpoint endpoint,  Connection connection)
                 throws Exception {
         }
 
         @Override
-        public void destroyObject(Endpoint ep, Channel channel) throws Exception {
+        public void destroyObject(Endpoint ep, Connection connection) throws Exception {
             log.debug("Closing connection to {}", ep);
-            channel.close();
+            //Is this the right way to destroy?
+            connection.destroy();
         }
 
         @Override
-        public Channel makeObject(Endpoint ep) throws Exception {
+        public Connection makeObject(Endpoint ep) throws Exception {
             Bootstrap bootstrap = new Bootstrap();
             bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
             bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 10 * 64 * 1024);
@@ -324,19 +320,28 @@
                 bootstrap.handler(new OnosCommunicationChannelInitializer());
             }
             // Start the client.
-            ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
+            CompletableFuture<Channel> retFuture = new CompletableFuture<>();
+            ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port());
+
+            f.addListener(future -> {
+                        if (future.isSuccess()) {
+                            retFuture.complete(f.channel());
+                        } else {
+                            retFuture.completeExceptionally(future.cause());
+                        }
+                    });
             log.debug("Established a new connection to {}", ep);
-            return f.channel();
+            return new Connection(retFuture);
         }
 
         @Override
-        public void passivateObject(Endpoint ep, Channel channel)
+        public void passivateObject(Endpoint ep, Connection connection)
                 throws Exception {
         }
 
         @Override
-        public boolean validateObject(Endpoint ep, Channel channel) {
-            return channel.isOpen();
+        public boolean validateObject(Endpoint ep, Connection connection) {
+            return connection.validate();
         }
     }
 
@@ -486,4 +491,62 @@
             executor.execute(() -> future.completeExceptionally(error));
         }
     }
+    private final class Connection {
+        private final CompletableFuture<Channel> internalFuture;
+
+        public Connection(CompletableFuture<Channel> internalFuture) {
+            this.internalFuture = internalFuture;
+        }
+
+        /**
+         * Sends a message out on its channel and associated the message with a
+         * completable future used for signaling.
+         * @param message the message to be sent
+         * @param future a future that is completed normally or exceptionally if
+         *               message sending succeeds or fails respectively
+         */
+        public void send(Object message, CompletableFuture<Void> future) {
+            internalFuture.whenComplete((channel, throwable) -> {
+                if (throwable == null) {
+                    channel.writeAndFlush(message).addListener(channelFuture -> {
+                        if (!channelFuture.isSuccess()) {
+                            future.completeExceptionally(channelFuture.cause());
+                        } else {
+                            future.complete(null);
+                        }
+                    });
+                } else {
+                    future.completeExceptionally(throwable);
+                }
+
+            });
+        }
+
+        /**
+         * Destroys a channel by closing its channel (if it exists) and
+         * cancelling its future.
+         */
+        public void destroy() {
+            Channel channel = internalFuture.getNow(null);
+            if (channel != null) {
+                channel.close();
+            }
+            internalFuture.cancel(false);
+        }
+
+        /**
+         * Determines whether the connection is valid meaning it is either
+         * complete with and active channel
+         * or it has not yet completed.
+         * @return true if the channel has an active connection or has not
+         * yet completed
+         */
+        public boolean validate() {
+            if (internalFuture.isCompletedExceptionally()) {
+                return false;
+            }
+            Channel channel = internalFuture.getNow(null);
+            return channel == null || channel.isActive();
+        }
+    }
 }