Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 9fe62db..54b02de 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -20,6 +20,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import io.netty.bootstrap.Bootstrap;
@@ -27,6 +28,7 @@
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
@@ -49,6 +51,8 @@
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -64,8 +68,13 @@
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
- // TODO: Once the entry expires, notify blocking threads (if any).
- .expireAfterWrite(10, TimeUnit.MINUTES)
+ .expireAfterWrite(10, TimeUnit.SECONDS)
+ .removalListener(new RemovalListener<Long, SettableFuture<byte[]>>() {
+ @Override
+ public void onRemoval(RemovalNotification<Long, SettableFuture<byte[]>> entry) {
+ entry.getValue().setException(new TimeoutException("Timedout waiting for reply"));
+ }
+ })
.build();
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
@@ -153,9 +162,12 @@
} finally {
channels.returnObject(ep, channel);
}
+ } catch (IOException e) {
+ throw e;
} catch (Exception e) {
- throw new IOException("Failed to send message to " + ep.toString(), e);
+ throw new IOException(e);
}
+
}
@Override
@@ -172,7 +184,7 @@
.build();
try {
sendAsync(ep, message);
- } catch (IOException e) {
+ } catch (Exception e) {
responseFutures.invalidate(messageId);
throw e;
}
@@ -277,7 +289,7 @@
@Override
public void run() {
- channel.writeAndFlush(message, channel.voidPromise());
+ channel.writeAndFlush(message).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
}