Limit the amont of work that happens on netty event loop threads.
Currently we are kryo serializing/deserializing the message envelope which can potentially limit throughput.

Change-Id: I0ae9dab53bbb765b7618ceaefda1edf4f77b0b59
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 1469433..5c20718 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -76,7 +76,7 @@
     @Activate
     public void activate() {
         ControllerNode localNode = clusterService.getLocalNode();
-        NettyMessagingService netty = new NettyMessagingService(localNode.ip().toString(), localNode.tcpPort());
+        NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
         // FIXME: workaround until it becomes a service.
         try {
             netty.activate();
@@ -143,7 +143,7 @@
     private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
-        Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
+        Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
         try {
             messagingService.sendAsync(nodeEp, subject.value(), payload);
             return true;
@@ -166,7 +166,7 @@
     public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
         ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
-        Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
+        Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
         try {
             return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
 
diff --git a/utils/netty/src/main/java/org/onlab/netty/DecoderState.java b/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
index c5bb4cb..dc885aa 100644
--- a/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
+++ b/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
@@ -19,9 +19,11 @@
  * State transitions a decoder goes through as it is decoding an incoming message.
  */
 public enum DecoderState {
-    READ_HEADER_VERSION,
-    READ_PREAMBLE,
+    READ_MESSAGE_ID,
+    READ_SENDER_IP_VERSION,
+    READ_SENDER_IP,
+    READ_SENDER_PORT,
+    READ_MESSAGE_TYPE,
     READ_CONTENT_LENGTH,
-    READ_SERIALIZER_VERSION,
     READ_CONTENT
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
index 50bc58d..ecbb08f 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
@@ -15,8 +15,12 @@
  */
 package org.onlab.netty;
 
+import static com.google.common.base.Preconditions.*;
+
 import java.util.Objects;
 
+import org.onlab.packet.IpAddress;
+
 import com.google.common.base.MoreObjects;
 
 /**
@@ -25,15 +29,15 @@
 public final class Endpoint {
 
     private final int port;
-    private final String host;
+    private final IpAddress ip;
 
-    public Endpoint(String host, int port) {
-        this.host = host;
+    public Endpoint(IpAddress host, int port) {
+        this.ip = checkNotNull(host);
         this.port = port;
     }
 
-    public String host() {
-        return host;
+    public IpAddress host() {
+        return ip;
     }
 
     public int port() {
@@ -43,14 +47,14 @@
     @Override
     public String toString() {
         return MoreObjects.toStringHelper(getClass())
-                .add("host", host)
+                .add("ip", ip)
                 .add("port", port)
                 .toString();
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(host, port);
+        return Objects.hash(ip, port);
     }
 
     @Override
@@ -66,6 +70,6 @@
         }
         Endpoint that = (Endpoint) obj;
         return Objects.equals(this.port, that.port) &&
-               Objects.equals(this.host, that.host);
+               Objects.equals(this.ip, that.ip);
     }
 }
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index ba3c896..f9d4e0b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -27,18 +27,19 @@
  */
 public final class InternalMessage implements Message {
 
-    public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+    public static final long REPLY_MESSAGE_TYPE =
+            NettyMessagingService.hashToLong("NETTY_MESSAGING_REQUEST_REPLY");
 
     private long id;
     private Endpoint sender;
-    private String type;
+    private long type;
     private byte[] payload;
     private transient NettyMessagingService messagingService;
 
     // Must be created using the Builder.
     private InternalMessage() {}
 
-    InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
+    InternalMessage(long id, Endpoint sender, long type, byte[] payload) {
         this.id = id;
         this.sender = sender;
         this.type = type;
@@ -49,7 +50,7 @@
         return id;
     }
 
-    public String type() {
+    public long type() {
         return type;
     }
 
@@ -103,7 +104,7 @@
             return this;
         }
 
-        public Builder withType(String type) {
+        public Builder withType(long type) {
             message.type = type;
             return this;
         }
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
deleted file mode 100644
index 6118c49..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import org.onlab.util.KryoNamespace;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import java.nio.ByteBuffer;
-
-/**
- * Kryo Serializer.
- */
-public class KryoSerializer {
-
-    private KryoNamespace serializerPool;
-
-    public KryoSerializer() {
-        setupKryoPool();
-    }
-
-    /**
-     * Sets up the common serialzers pool.
-     */
-    protected void setupKryoPool() {
-        serializerPool = KryoNamespace.newBuilder()
-                .register(byte[].class)
-                .register(new InternalMessageSerializer(), InternalMessage.class)
-                .register(new EndPointSerializer(), Endpoint.class)
-                .build();
-    }
-
-
-    public <T> T decode(byte[] data) {
-        return serializerPool.deserialize(data);
-    }
-
-    public byte[] encode(Object payload) {
-        return serializerPool.serialize(payload);
-    }
-
-    public <T> T decode(ByteBuffer buffer) {
-        return serializerPool.deserialize(buffer);
-    }
-
-    public void encode(Object obj, ByteBuffer buffer) {
-        serializerPool.serialize(obj, buffer);
-    }
-
-    public static final class InternalMessageSerializer
-            extends Serializer<InternalMessage> {
-
-        @Override
-        public void write(Kryo kryo, Output output, InternalMessage object) {
-            output.writeLong(object.id());
-            kryo.writeClassAndObject(output, object.sender());
-            output.writeString(object.type());
-            output.writeInt(object.payload().length, true);
-            output.writeBytes(object.payload());
-        }
-
-        @Override
-        public InternalMessage read(Kryo kryo, Input input,
-                                    Class<InternalMessage> type) {
-            long id = input.readLong();
-            Endpoint sender = (Endpoint) kryo.readClassAndObject(input);
-            String msgtype = input.readString();
-            int length = input.readInt(true);
-            byte[] payload = input.readBytes(length);
-            return new InternalMessage(id, sender, msgtype, payload);
-        }
-
-    }
-
-    public static final class EndPointSerializer extends Serializer<Endpoint> {
-
-        @Override
-        public void write(Kryo kryo, Output output, Endpoint object) {
-            output.writeString(object.host());
-            output.writeInt(object.port());
-        }
-
-        @Override
-        public Endpoint read(Kryo kryo, Input input, Class<Endpoint> type) {
-            String host = input.readString();
-            int port = input.readInt();
-            return new Endpoint(host, port);
-        }
-    }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index 51f8dd3..1697539 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -20,9 +20,10 @@
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ReplayingDecoder;
 
-import java.util.Arrays;
 import java.util.List;
 
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpAddress.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,12 +36,15 @@
 
     private final NettyMessagingService messagingService;
 
-    private static final KryoSerializer SERIALIZER = new KryoSerializer();
-
+    private long messageId;
+    private Version ipVersion;
+    private IpAddress senderIp;
+    private int senderPort;
     private int contentLength;
+    private long messageType;
 
     public MessageDecoder(NettyMessagingService messagingService) {
-        super(DecoderState.READ_HEADER_VERSION);
+        super(DecoderState.READ_MESSAGE_ID);
         this.messagingService = messagingService;
     }
 
@@ -51,27 +55,37 @@
             List<Object> out) throws Exception {
 
         switch (state()) {
-        case READ_HEADER_VERSION:
-            int headerVersion = buffer.readInt();
-            checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
-            checkpoint(DecoderState.READ_PREAMBLE);
-        case READ_PREAMBLE:
-            byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
-            buffer.readBytes(preamble);
-            checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
+        case READ_MESSAGE_ID:
+            messageId = buffer.readLong();
+            checkpoint(DecoderState.READ_SENDER_IP_VERSION);
+        case READ_SENDER_IP_VERSION:
+            ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
+            checkpoint(DecoderState.READ_SENDER_IP);
+        case READ_SENDER_IP:
+            byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
+            buffer.readBytes(octects);
+            senderIp = IpAddress.valueOf(ipVersion, octects);
+            checkpoint(DecoderState.READ_SENDER_PORT);
+        case READ_SENDER_PORT:
+            senderPort = buffer.readInt();
+            checkpoint(DecoderState.READ_MESSAGE_TYPE);
+        case READ_MESSAGE_TYPE:
+            messageType = buffer.readLong();
             checkpoint(DecoderState.READ_CONTENT_LENGTH);
         case READ_CONTENT_LENGTH:
             contentLength = buffer.readInt();
-            checkpoint(DecoderState.READ_SERIALIZER_VERSION);
-        case READ_SERIALIZER_VERSION:
-            int serializerVersion = buffer.readInt();
-            checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
             checkpoint(DecoderState.READ_CONTENT);
         case READ_CONTENT:
-            InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
+            byte[] payload = new byte[contentLength];
+            buffer.readBytes(payload);
+            InternalMessage message = new InternalMessage(
+                    messageId,
+                    new Endpoint(senderIp, senderPort),
+                    messageType,
+                    payload);
             message.setMessagingService(messagingService);
             out.add(message);
-            checkpoint(DecoderState.READ_HEADER_VERSION);
+            checkpoint(DecoderState.READ_MESSAGE_ID);
             break;
          default:
             checkState(false, "Must not be here");
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 88bec40..c05eae2 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -15,17 +15,18 @@
  */
 package org.onlab.netty;
 
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 
+import java.io.IOException;
+
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpAddress.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Encode InternalMessage out into a byte buffer.
  */
@@ -34,34 +35,36 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    // onosiscool in ascii
-    static final byte[] PREAMBLE = "onosiscool".getBytes(StandardCharsets.US_ASCII);
-    public static final int HEADER_VERSION = 1;
-    public static final int SERIALIZER_VERSION = 1;
-
-
-    private static final KryoSerializer SERIALIZER = new KryoSerializer();
-
     @Override
     protected void encode(
             ChannelHandlerContext context,
             InternalMessage message,
             ByteBuf out) throws Exception {
 
-        // write version
-        out.writeInt(HEADER_VERSION);
+        // write message id
+        out.writeLong(message.id());
 
-        // write preamble
-        out.writeBytes(PREAMBLE);
+        Endpoint sender = message.sender();
 
-        byte[] payload = SERIALIZER.encode(message);
+        IpAddress senderIp = sender.host();
+        if (senderIp.version() == Version.INET) {
+            out.writeByte(0);
+        } else {
+            out.writeByte(1);
+        }
+        out.writeBytes(senderIp.toOctets());
+
+        // write sender port
+        out.writeInt(sender.port());
+
+        // write message type.
+        out.writeLong(message.type());
+
+        byte[] payload = message.payload();
 
         // write payload length
         out.writeInt(payload.length);
 
-        // write payloadSerializer version
-        out.writeInt(SERIALIZER_VERSION);
-
         // write payload.
         out.writeBytes(payload);
     }
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index b611fad..69806b1 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -37,6 +37,7 @@
 import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -46,13 +47,18 @@
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.onlab.packet.IpAddress;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Charsets;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
+import com.google.common.hash.Hashing;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
 
@@ -64,7 +70,7 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final Endpoint localEp;
-    private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
+    private final ConcurrentMap<Long, MessageHandler> handlers = new ConcurrentHashMap<>();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
     private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
             .maximumSize(100000)
@@ -78,6 +84,17 @@
                 }
             })
             .build();
+
+    private final LoadingCache<String, Long> messageTypeLookupCache = CacheBuilder.newBuilder()
+            .softValues()
+            .build(new CacheLoader<String, Long>() {
+
+                @Override
+                public Long load(String type) {
+                    return hashToLong(type);
+                }
+            });
+
     private final GenericKeyedObjectPool<Endpoint, Channel> channels
             = new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
 
@@ -103,7 +120,7 @@
         clientChannelClass = NioSocketChannel.class;
     }
 
-    public NettyMessagingService(String ip, int port) {
+    public NettyMessagingService(IpAddress ip, int port) {
         localEp = new Endpoint(ip, port);
     }
 
@@ -113,7 +130,7 @@
 
     public NettyMessagingService(int port) {
         try {
-            localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
+            localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port);
         } catch (UnknownHostException e) {
             // Cannot resolve the local host, something is very wrong. Bailing out.
             throw new IllegalStateException("Cannot resolve local host", e);
@@ -146,7 +163,7 @@
         InternalMessage message = new InternalMessage.Builder(this)
             .withId(messageIdGenerator.incrementAndGet())
             .withSender(localEp)
-            .withType(type)
+            .withType(messageTypeLookupCache.getUnchecked(type))
             .withPayload(payload)
             .build();
         sendAsync(ep, message);
@@ -178,7 +195,7 @@
         InternalMessage message = new InternalMessage.Builder(this)
             .withId(messageId)
             .withSender(localEp)
-            .withType(type)
+            .withType(messageTypeLookupCache.getUnchecked(type))
             .withPayload(payload)
             .build();
         try {
@@ -192,7 +209,7 @@
 
     @Override
     public void registerHandler(String type, MessageHandler handler) {
-        handlers.putIfAbsent(type, handler);
+        handlers.putIfAbsent(hashToLong(type), handler);
     }
 
     @Override
@@ -200,7 +217,7 @@
         handlers.remove(type);
     }
 
-    private MessageHandler getMessageHandler(String type) {
+    private MessageHandler getMessageHandler(long type) {
         return handlers.get(type);
     }
 
@@ -245,7 +262,7 @@
             bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
             bootstrap.handler(new OnosCommunicationChannelInitializer());
             // Start the client.
-            ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
+            ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
             return f.channel();
         }
 
@@ -295,8 +312,8 @@
 
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
-            String type = message.type();
-            if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
+            long type = message.type();
+            if (type == InternalMessage.REPLY_MESSAGE_TYPE) {
                 try {
                     SettableFuture<byte[]> futureResponse =
                         NettyMessagingService.this.responseFutures.getIfPresent(message.id());
@@ -326,4 +343,13 @@
             context.close();
         }
     }
-}
+
+    /**
+     * Returns the md5 hash of the specified input string as a long.
+     * @param input input string.
+     * @return md5 hash as long.
+     */
+    public static long hashToLong(String input) {
+        return Hashing.md5().hashBytes(input.getBytes(Charsets.UTF_8)).asLong();
+    }
+}
\ No newline at end of file
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
index ab816b2..3eef484 100644
--- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
+++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
@@ -15,15 +15,16 @@
  */
 package org.onlab.netty;
 
+import static org.junit.Assert.assertArrayEquals;
+
+import java.net.InetAddress;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.lang3.RandomUtils;
-
-import static org.junit.Assert.*;
-
 import org.junit.Ignore;
 import org.junit.Test;
+import org.onlab.packet.IpAddress;
 
 /**
  * Simple ping-pong test that exercises NettyMessagingService.
@@ -40,7 +41,9 @@
             ponger.activate();
             ponger.registerHandler("echo", new EchoHandler());
             byte[] payload = RandomUtils.nextBytes(100);
-            Future<byte[]> responseFuture = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload);
+            Future<byte[]> responseFuture =
+                    pinger.sendAndReceive(
+                            new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), 9086), "echo", payload);
             assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));
         } finally {
             pinger.deactivate();