Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 9fe62db..54b02de 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -20,6 +20,7 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import io.netty.bootstrap.Bootstrap;
@@ -27,6 +28,7 @@
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
@@ -49,6 +51,8 @@
 
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -64,8 +68,13 @@
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
     private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
             .maximumSize(100000)
-            // TODO: Once the entry expires, notify blocking threads (if any).
-            .expireAfterWrite(10, TimeUnit.MINUTES)
+            .expireAfterWrite(10, TimeUnit.SECONDS)
+            .removalListener(new RemovalListener<Long, SettableFuture<byte[]>>() {
+                @Override
+                public void onRemoval(RemovalNotification<Long, SettableFuture<byte[]>> entry) {
+                    entry.getValue().setException(new TimeoutException("Timedout waiting for reply"));
+                }
+            })
             .build();
     private final GenericKeyedObjectPool<Endpoint, Channel> channels
             = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
@@ -153,9 +162,12 @@
             } finally {
                 channels.returnObject(ep, channel);
             }
+        } catch (IOException e) {
+            throw e;
         } catch (Exception e) {
-            throw new IOException("Failed to send message to " + ep.toString(), e);
+            throw new IOException(e);
         }
+
     }
 
     @Override
@@ -172,7 +184,7 @@
             .build();
         try {
             sendAsync(ep, message);
-        } catch (IOException e) {
+        } catch (Exception e) {
             responseFutures.invalidate(messageId);
             throw e;
         }
@@ -277,7 +289,7 @@
 
         @Override
         public void run() {
-            channel.writeAndFlush(message, channel.voidPromise());
+            channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
         }
     }