Limit the amount of work that happens on Netty event loop threads.
Currently we are Kryo serializing/deserializing the message envelope, which can potentially limit throughput.

Change-Id: I0ae9dab53bbb765b7618ceaefda1edf4f77b0b59
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index b611fad..69806b1 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -37,6 +37,7 @@
 import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -46,13 +47,18 @@
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.onlab.packet.IpAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Charsets;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
+import com.google.common.hash.Hashing;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -64,7 +70,7 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final Endpoint localEp;
-    private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, MessageHandler> handlers = new ConcurrentHashMap<>();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
     private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
             .maximumSize(100000)
@@ -78,6 +84,17 @@
                 }
             })
             .build();
+
+    private final LoadingCache<String, Long> messageTypeLookupCache = CacheBuilder.newBuilder()
+            .softValues()
+            .build(new CacheLoader<String, Long>() {
+
+                @Override
+                public Long load(String type) {
+                    return hashToLong(type);
+                }
+            });
+
     private final GenericKeyedObjectPool<Endpoint, Channel> channels
             = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
 
@@ -103,7 +120,7 @@
         clientChannelClass = NioSocketChannel.class;
     }
 
-    public NettyMessagingService(String ip, int port) {
+    public NettyMessagingService(IpAddress ip, int port) {
         localEp = new Endpoint(ip, port);
     }
 
@@ -113,7 +130,7 @@
 
     public NettyMessagingService(int port) {
         try {
-            localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
+            localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port);
         } catch (UnknownHostException e) {
             // Cannot resolve the local host, something is very wrong. Bailing out.
             throw new IllegalStateException("Cannot resolve local host", e);
@@ -146,7 +163,7 @@
         InternalMessage message = new InternalMessage.Builder(this)
             .withId(messageIdGenerator.incrementAndGet())
             .withSender(localEp)
-            .withType(type)
+            .withType(messageTypeLookupCache.getUnchecked(type))
             .withPayload(payload)
             .build();
         sendAsync(ep, message);
@@ -178,7 +195,7 @@
         InternalMessage message = new InternalMessage.Builder(this)
             .withId(messageId)
             .withSender(localEp)
-            .withType(type)
+            .withType(messageTypeLookupCache.getUnchecked(type))
             .withPayload(payload)
             .build();
         try {
@@ -192,7 +209,7 @@
 
     @Override
     public void registerHandler(String type, MessageHandler handler) {
-        handlers.putIfAbsent(type, handler);
+        handlers.putIfAbsent(hashToLong(type), handler);
     }
 
     @Override
@@ -200,7 +217,7 @@
         handlers.remove(type);
     }
 
-    private MessageHandler getMessageHandler(String type) {
+    private MessageHandler getMessageHandler(long type) {
         return handlers.get(type);
     }
 
@@ -245,7 +262,7 @@
             bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
             bootstrap.handler(new OnosCommunicationChannelInitializer());
             // Start the client.
-            ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
+            ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
             return f.channel();
         }
 
@@ -295,8 +312,8 @@
 
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
-            String type = message.type();
-            if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
+            long type = message.type();
+            if (type == InternalMessage.REPLY_MESSAGE_TYPE) {
                 try {
                     SettableFuture<byte[]> futureResponse =
                         NettyMessagingService.this.responseFutures.getIfPresent(message.id());
@@ -326,4 +343,13 @@
             context.close();
         }
     }
-}
+
+    /**
+     * Returns the first eight bytes of the MD5 hash of the UTF-8 encoded input string as a long.
+     * @param input input string; must not be null.
+     * @return leading eight bytes of the MD5 hash, as produced by Guava's {@code HashCode.asLong()}.
+     */
+    public static long hashToLong(String input) {
+        return Hashing.md5().hashBytes(input.getBytes(Charsets.UTF_8)).asLong();
+    }
+}
\ No newline at end of file