Netty Messaging changes:
1. Lowered the timeout value for expiring unanswered sendAndReceive calls to 10s.
2. Marking the future as complete (with exception) when a entry is evicted due to timeout.
3. Improved exception behavior.
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 33870e2..6a9cf68 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -23,6 +23,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import io.netty.bootstrap.Bootstrap;
@@ -30,6 +31,7 @@
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
@@ -52,6 +54,8 @@
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -67,8 +71,13 @@
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
     private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
             .maximumSize(100000)
-            // TODO: Once the entry expires, notify blocking threads (if any).
-            .expireAfterWrite(10, TimeUnit.MINUTES)
+            .expireAfterWrite(10, TimeUnit.SECONDS)
+            .removalListener(new RemovalListener<Long, SettableFuture<byte[]>>() {
+                @Override
+                public void onRemoval(RemovalNotification<Long, SettableFuture<byte[]>> entry) {
+                    entry.getValue().setException(new TimeoutException("Timedout waiting for reply"));
+                }
+            })
             .build();
     private final GenericKeyedObjectPool<Endpoint, Channel> channels
             = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
@@ -156,9 +165,12 @@
             } finally {
                 channels.returnObject(ep, channel);
             }
+        } catch (IOException e) {
+            throw e;
         } catch (Exception e) {
-            throw new IOException("Failed to send message to " + ep.toString(), e);
+            throw new IOException(e);
         }
+
     }
 
     @Override
@@ -175,7 +187,7 @@
             .build();
         try {
             sendAsync(ep, message);
-        } catch (IOException e) {
+        } catch (Exception e) {
             responseFutures.invalidate(messageId);
             throw e;
         }
@@ -280,7 +292,7 @@
 
         @Override
         public void run() {
-            channel.writeAndFlush(message, channel.voidPromise());
+            channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
         }
     }