Limit the amount of work that happens on netty event loop threads.
Currently we are Kryo serializing/deserializing the message envelope, which can potentially limit throughput.
Change-Id: I0ae9dab53bbb765b7618ceaefda1edf4f77b0b59
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index b611fad..69806b1 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -37,6 +37,7 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -46,13 +47,18 @@
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Charsets;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
+import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -64,7 +70,7 @@
private final Logger log = LoggerFactory.getLogger(getClass());
private final Endpoint localEp;
- private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, MessageHandler> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
@@ -78,6 +84,17 @@
}
})
.build();
+
+ private final LoadingCache<String, Long> messageTypeLookupCache = CacheBuilder.newBuilder()
+ .softValues()
+ .build(new CacheLoader<String, Long>() {
+
+ @Override
+ public Long load(String type) {
+ return hashToLong(type);
+ }
+ });
+
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
@@ -103,7 +120,7 @@
clientChannelClass = NioSocketChannel.class;
}
- public NettyMessagingService(String ip, int port) {
+ public NettyMessagingService(IpAddress ip, int port) {
localEp = new Endpoint(ip, port);
}
@@ -113,7 +130,7 @@
public NettyMessagingService(int port) {
try {
- localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
+ localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port);
} catch (UnknownHostException e) {
// Cannot resolve the local host, something is very wrong. Bailing out.
throw new IllegalStateException("Cannot resolve local host", e);
@@ -146,7 +163,7 @@
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageIdGenerator.incrementAndGet())
.withSender(localEp)
- .withType(type)
+ .withType(messageTypeLookupCache.getUnchecked(type))
.withPayload(payload)
.build();
sendAsync(ep, message);
@@ -178,7 +195,7 @@
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
.withSender(localEp)
- .withType(type)
+ .withType(messageTypeLookupCache.getUnchecked(type))
.withPayload(payload)
.build();
try {
@@ -192,7 +209,7 @@
@Override
public void registerHandler(String type, MessageHandler handler) {
- handlers.putIfAbsent(type, handler);
+ handlers.putIfAbsent(hashToLong(type), handler);
}
@Override
@@ -200,7 +217,7 @@
handlers.remove(type);
}
- private MessageHandler getMessageHandler(String type) {
+ private MessageHandler getMessageHandler(long type) {
return handlers.get(type);
}
@@ -245,7 +262,7 @@
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new OnosCommunicationChannelInitializer());
// Start the client.
- ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
+ ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
return f.channel();
}
@@ -295,8 +312,8 @@
@Override
protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
- String type = message.type();
- if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
+ long type = message.type();
+ if (type == InternalMessage.REPLY_MESSAGE_TYPE) {
try {
SettableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
@@ -326,4 +343,13 @@
context.close();
}
}
-}
+
+ /**
+ * Returns the first eight bytes of the md5 hash of the specified input string as a long.
+ * @param input input string.
+ * @return first eight bytes of the md5 hash as a long.
+ */
+ public static long hashToLong(String input) {
+ return Hashing.md5().hashBytes(input.getBytes(Charsets.UTF_8)).asLong();
+ }
+}
\ No newline at end of file