Bypass netty stack for messages that are sent to self

Change-Id: Ifb1fd610892bd22a291cda472a8a5ef7a1dcfe6d

Manual serde for ClusterMessage to avoid one additional kryo serialization overhead for each message sent/received

Change-Id: I08d9a2c10403b0e9e9e1736c6bd36fa008bb8db0
diff --git a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java
index cadcfa2..fef1018 100644
--- a/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java
+++ b/core/api/src/main/java/org/onosproject/store/cluster/messaging/ClusterMessage.java
@@ -15,13 +15,16 @@
  */
 package org.onosproject.store.cluster.messaging;
 
-import com.google.common.base.MoreObjects;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+
 import org.onlab.util.ByteArraySizeHashPrinter;
 import org.onosproject.cluster.NodeId;
 
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Objects;
+import com.google.common.base.Charsets;
+import com.google.common.base.MoreObjects;
 
 // TODO: Should payload type be ByteBuffer?
 /**
@@ -105,6 +108,43 @@
                 Arrays.equals(this.payload, that.payload);
     }
 
+    /**
+     * Serializes this instance.
+     * @return bytes
+     */
+    public byte[] getBytes() {
+        byte[] senderBytes = sender.toString().getBytes(Charsets.UTF_8);
+        byte[] subjectBytes = subject.value().getBytes(Charsets.UTF_8);
+        int capacity = 12 + senderBytes.length + subjectBytes.length + payload.length;
+        ByteBuffer buffer = ByteBuffer.allocate(capacity);
+        buffer.putInt(senderBytes.length);
+        buffer.put(senderBytes);
+        buffer.putInt(subjectBytes.length);
+        buffer.put(subjectBytes);
+        buffer.putInt(payload.length);
+        buffer.put(payload);
+        return buffer.array();
+    }
+
+    /**
+     * Decodes a new ClusterMessage from raw bytes.
+     * @param bytes raw bytes
+     * @return cluster message
+     */
+    public static ClusterMessage fromBytes(byte[] bytes) {
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        byte[] senderBytes = new byte[buffer.getInt()];
+        buffer.get(senderBytes);
+        byte[] subjectBytes = new byte[buffer.getInt()];
+        buffer.get(subjectBytes);
+        byte[] payloadBytes = new byte[buffer.getInt()];
+        buffer.get(payloadBytes);
+
+        return new ClusterMessage(new NodeId(new String(senderBytes, Charsets.UTF_8)),
+                new MessageSubject(new String(senderBytes, Charsets.UTF_8)),
+                payloadBytes);
+    }
+
     @Override
     public int hashCode() {
         return Objects.hash(sender, subject, payload);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index b27e009..59671c0 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -28,7 +28,6 @@
 import org.onlab.netty.MessageHandler;
 import org.onlab.netty.MessagingService;
 import org.onlab.netty.NettyMessagingService;
-import org.onlab.util.KryoNamespace;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
@@ -36,10 +35,6 @@
 import org.onosproject.store.cluster.messaging.ClusterMessage;
 import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
 import org.onosproject.store.cluster.messaging.MessageSubject;
-import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.serializers.KryoSerializer;
-import org.onosproject.store.serializers.impl.ClusterMessageSerializer;
-import org.onosproject.store.serializers.impl.MessageSubjectSerializer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,19 +57,6 @@
     // TODO: This probably should not be a OSGi service.
     private MessagingService messagingService;
 
-    private static final KryoSerializer SERIALIZER = new KryoSerializer() {
-        @Override
-        protected void setupKryoPool() {
-            serializerPool = KryoNamespace.newBuilder()
-                    .register(KryoNamespaces.API)
-                    .nextId(KryoNamespaces.BEGIN_USER_CUSTOM_ID)
-                    .register(new ClusterMessageSerializer(), ClusterMessage.class)
-                    .register(new MessageSubjectSerializer(), MessageSubject.class)
-                    .build();
-        }
-
-    };
-
     @Activate
     public void activate() {
         ControllerNode localNode = clusterService.getLocalNode();
@@ -105,7 +87,7 @@
     public boolean broadcast(ClusterMessage message) {
         boolean ok = true;
         final ControllerNode localNode = clusterService.getLocalNode();
-        byte[] payload = SERIALIZER.encode(message);
+        byte[] payload = message.getBytes();
         for (ControllerNode node : clusterService.getNodes()) {
             if (!node.equals(localNode)) {
                 ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
@@ -117,7 +99,7 @@
     @Override
     public boolean broadcastIncludeSelf(ClusterMessage message) {
         boolean ok = true;
-        byte[] payload = SERIALIZER.encode(message);
+        byte[] payload = message.getBytes();
         for (ControllerNode node : clusterService.getNodes()) {
             ok = unicastUnchecked(message.subject(), payload, node.id()) && ok;
         }
@@ -128,7 +110,7 @@
     public boolean multicast(ClusterMessage message, Set<NodeId> nodes) {
         boolean ok = true;
         final ControllerNode localNode = clusterService.getLocalNode();
-        byte[] payload = SERIALIZER.encode(message);
+        byte[] payload = message.getBytes();
         for (NodeId nodeId : nodes) {
             if (!nodeId.equals(localNode.id())) {
                 ok = unicastUnchecked(message.subject(), payload, nodeId) && ok;
@@ -139,7 +121,7 @@
 
     @Override
     public boolean unicast(ClusterMessage message, NodeId toNodeId) throws IOException {
-        return unicast(message.subject(), SERIALIZER.encode(message), toNodeId);
+        return unicast(message.subject(), message.getBytes(), toNodeId);
     }
 
     private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
@@ -170,7 +152,7 @@
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
         try {
-            return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
+            return messagingService.sendAndReceive(nodeEp, message.subject().value(), message.getBytes());
 
         } catch (IOException e) {
             log.trace("Failed interaction with remote nodeId: " + toNodeId, e);
@@ -209,7 +191,7 @@
         public void handle(Message message) {
             final ClusterMessage clusterMessage;
             try {
-                clusterMessage = SERIALIZER.decode(message.payload());
+                clusterMessage = ClusterMessage.fromBytes(message.payload());
             } catch (Exception e) {
                 log.error("Failed decoding {}", message, e);
                 throw e;
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index f11a513..317f01a 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -87,9 +87,7 @@
             .build();
 
     private final LoadingCache<String, Long> messageTypeLookupCache = CacheBuilder.newBuilder()
-            .softValues()
             .build(new CacheLoader<String, Long>() {
-
                 @Override
                 public Long load(String type) {
                     return hashToLong(type);
@@ -171,6 +169,10 @@
     }
 
     protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
+        if (ep.equals(localEp)) {
+            dispatchLocally(message);
+            return;
+        }
         Channel channel = null;
         try {
             try {
@@ -329,29 +331,7 @@
 
         @Override
         protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
-            long type = message.type();
-            if (type == InternalMessage.REPLY_MESSAGE_TYPE) {
-                try {
-                    SettableFuture<byte[]> futureResponse =
-                        NettyMessagingService.this.responseFutures.getIfPresent(message.id());
-                    if (futureResponse != null) {
-                        futureResponse.set(message.payload());
-                    } else {
-                        log.warn("Received a reply for message id:[{}]. "
-                                + " from {}. But was unable to locate the"
-                                + " request handle", message.id(), message.sender());
-                    }
-                } finally {
-                    NettyMessagingService.this.responseFutures.invalidate(message.id());
-                }
-                return;
-            }
-            MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
-            if (handler != null) {
-                handler.handle(message);
-            } else {
-                log.debug("No handler registered for {}", type);
-            }
+            dispatchLocally(message);
         }
 
         @Override
@@ -361,6 +341,32 @@
         }
     }
 
+    private void dispatchLocally(InternalMessage message) throws IOException {
+        long type = message.type();
+        if (type == InternalMessage.REPLY_MESSAGE_TYPE) {
+            try {
+                SettableFuture<byte[]> futureResponse =
+                    NettyMessagingService.this.responseFutures.getIfPresent(message.id());
+                if (futureResponse != null) {
+                    futureResponse.set(message.payload());
+                } else {
+                    log.warn("Received a reply for message id:[{}]. "
+                            + " from {}. But was unable to locate the"
+                            + " request handle", message.id(), message.sender());
+                }
+            } finally {
+                NettyMessagingService.this.responseFutures.invalidate(message.id());
+            }
+            return;
+        }
+        MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
+        if (handler != null) {
+            handler.handle(message);
+        } else {
+            log.debug("No handler registered for {}", type);
+        }
+    }
+
     /**
      * Returns the md5 hash of the specified input string as a long.
      * @param input input string.