Limit the amont of work that happens on netty event loop threads.
Currently we are kryo serializing/deserializing the message envelope which can potentially limit throughput.
Change-Id: I0ae9dab53bbb765b7618ceaefda1edf4f77b0b59
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 1469433..5c20718 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -76,7 +76,7 @@
@Activate
public void activate() {
ControllerNode localNode = clusterService.getLocalNode();
- NettyMessagingService netty = new NettyMessagingService(localNode.ip().toString(), localNode.tcpPort());
+ NettyMessagingService netty = new NettyMessagingService(localNode.ip(), localNode.tcpPort());
// FIXME: workaround until it becomes a service.
try {
netty.activate();
@@ -143,7 +143,7 @@
private boolean unicast(MessageSubject subject, byte[] payload, NodeId toNodeId) throws IOException {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
- Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
+ Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
try {
messagingService.sendAsync(nodeEp, subject.value(), payload);
return true;
@@ -166,7 +166,7 @@
public ListenableFuture<byte[]> sendAndReceive(ClusterMessage message, NodeId toNodeId) throws IOException {
ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
- Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
+ Endpoint nodeEp = new Endpoint(node.ip(), node.tcpPort());
try {
return messagingService.sendAndReceive(nodeEp, message.subject().value(), SERIALIZER.encode(message));
diff --git a/utils/netty/src/main/java/org/onlab/netty/DecoderState.java b/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
index c5bb4cb..dc885aa 100644
--- a/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
+++ b/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
@@ -19,9 +19,11 @@
* State transitions a decoder goes through as it is decoding an incoming message.
*/
public enum DecoderState {
- READ_HEADER_VERSION,
- READ_PREAMBLE,
+ READ_MESSAGE_ID,
+ READ_SENDER_IP_VERSION,
+ READ_SENDER_IP,
+ READ_SENDER_PORT,
+ READ_MESSAGE_TYPE,
READ_CONTENT_LENGTH,
- READ_SERIALIZER_VERSION,
READ_CONTENT
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
index 50bc58d..ecbb08f 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
@@ -15,8 +15,12 @@
*/
package org.onlab.netty;
+import static com.google.common.base.Preconditions.*;
+
import java.util.Objects;
+import org.onlab.packet.IpAddress;
+
import com.google.common.base.MoreObjects;
/**
@@ -25,15 +29,15 @@
public final class Endpoint {
private final int port;
- private final String host;
+ private final IpAddress ip;
- public Endpoint(String host, int port) {
- this.host = host;
+ public Endpoint(IpAddress host, int port) {
+ this.ip = checkNotNull(host);
this.port = port;
}
- public String host() {
- return host;
+ public IpAddress host() {
+ return ip;
}
public int port() {
@@ -43,14 +47,14 @@
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("host", host)
+ .add("ip", ip)
.add("port", port)
.toString();
}
@Override
public int hashCode() {
- return Objects.hash(host, port);
+ return Objects.hash(ip, port);
}
@Override
@@ -66,6 +70,6 @@
}
Endpoint that = (Endpoint) obj;
return Objects.equals(this.port, that.port) &&
- Objects.equals(this.host, that.host);
+ Objects.equals(this.ip, that.ip);
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index ba3c896..f9d4e0b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -27,18 +27,19 @@
*/
public final class InternalMessage implements Message {
- public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
+ public static final long REPLY_MESSAGE_TYPE =
+ NettyMessagingService.hashToLong("NETTY_MESSAGING_REQUEST_REPLY");
private long id;
private Endpoint sender;
- private String type;
+ private long type;
private byte[] payload;
private transient NettyMessagingService messagingService;
// Must be created using the Builder.
private InternalMessage() {}
- InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
+ InternalMessage(long id, Endpoint sender, long type, byte[] payload) {
this.id = id;
this.sender = sender;
this.type = type;
@@ -49,7 +50,7 @@
return id;
}
- public String type() {
+ public long type() {
return type;
}
@@ -103,7 +104,7 @@
return this;
}
- public Builder withType(String type) {
+ public Builder withType(long type) {
message.type = type;
return this;
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
deleted file mode 100644
index 6118c49..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright 2014 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import org.onlab.util.KryoNamespace;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import java.nio.ByteBuffer;
-
-/**
- * Kryo Serializer.
- */
-public class KryoSerializer {
-
- private KryoNamespace serializerPool;
-
- public KryoSerializer() {
- setupKryoPool();
- }
-
- /**
- * Sets up the common serialzers pool.
- */
- protected void setupKryoPool() {
- serializerPool = KryoNamespace.newBuilder()
- .register(byte[].class)
- .register(new InternalMessageSerializer(), InternalMessage.class)
- .register(new EndPointSerializer(), Endpoint.class)
- .build();
- }
-
-
- public <T> T decode(byte[] data) {
- return serializerPool.deserialize(data);
- }
-
- public byte[] encode(Object payload) {
- return serializerPool.serialize(payload);
- }
-
- public <T> T decode(ByteBuffer buffer) {
- return serializerPool.deserialize(buffer);
- }
-
- public void encode(Object obj, ByteBuffer buffer) {
- serializerPool.serialize(obj, buffer);
- }
-
- public static final class InternalMessageSerializer
- extends Serializer<InternalMessage> {
-
- @Override
- public void write(Kryo kryo, Output output, InternalMessage object) {
- output.writeLong(object.id());
- kryo.writeClassAndObject(output, object.sender());
- output.writeString(object.type());
- output.writeInt(object.payload().length, true);
- output.writeBytes(object.payload());
- }
-
- @Override
- public InternalMessage read(Kryo kryo, Input input,
- Class<InternalMessage> type) {
- long id = input.readLong();
- Endpoint sender = (Endpoint) kryo.readClassAndObject(input);
- String msgtype = input.readString();
- int length = input.readInt(true);
- byte[] payload = input.readBytes(length);
- return new InternalMessage(id, sender, msgtype, payload);
- }
-
- }
-
- public static final class EndPointSerializer extends Serializer<Endpoint> {
-
- @Override
- public void write(Kryo kryo, Output output, Endpoint object) {
- output.writeString(object.host());
- output.writeInt(object.port());
- }
-
- @Override
- public Endpoint read(Kryo kryo, Input input, Class<Endpoint> type) {
- String host = input.readString();
- int port = input.readInt();
- return new Endpoint(host, port);
- }
- }
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index 51f8dd3..1697539 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -20,9 +20,10 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
-import java.util.Arrays;
import java.util.List;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpAddress.Version;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,12 +36,15 @@
private final NettyMessagingService messagingService;
- private static final KryoSerializer SERIALIZER = new KryoSerializer();
-
+ private long messageId;
+ private Version ipVersion;
+ private IpAddress senderIp;
+ private int senderPort;
private int contentLength;
+ private long messageType;
public MessageDecoder(NettyMessagingService messagingService) {
- super(DecoderState.READ_HEADER_VERSION);
+ super(DecoderState.READ_MESSAGE_ID);
this.messagingService = messagingService;
}
@@ -51,27 +55,37 @@
List<Object> out) throws Exception {
switch (state()) {
- case READ_HEADER_VERSION:
- int headerVersion = buffer.readInt();
- checkState(headerVersion == MessageEncoder.HEADER_VERSION, "Unexpected header version");
- checkpoint(DecoderState.READ_PREAMBLE);
- case READ_PREAMBLE:
- byte[] preamble = new byte[MessageEncoder.PREAMBLE.length];
- buffer.readBytes(preamble);
- checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
+ case READ_MESSAGE_ID:
+ messageId = buffer.readLong();
+ checkpoint(DecoderState.READ_SENDER_IP_VERSION);
+ case READ_SENDER_IP_VERSION:
+ ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
+ checkpoint(DecoderState.READ_SENDER_IP);
+ case READ_SENDER_IP:
+ byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
+ buffer.readBytes(octects);
+ senderIp = IpAddress.valueOf(ipVersion, octects);
+ checkpoint(DecoderState.READ_SENDER_PORT);
+ case READ_SENDER_PORT:
+ senderPort = buffer.readInt();
+ checkpoint(DecoderState.READ_MESSAGE_TYPE);
+ case READ_MESSAGE_TYPE:
+ messageType = buffer.readLong();
checkpoint(DecoderState.READ_CONTENT_LENGTH);
case READ_CONTENT_LENGTH:
contentLength = buffer.readInt();
- checkpoint(DecoderState.READ_SERIALIZER_VERSION);
- case READ_SERIALIZER_VERSION:
- int serializerVersion = buffer.readInt();
- checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
- InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
+ byte[] payload = new byte[contentLength];
+ buffer.readBytes(payload);
+ InternalMessage message = new InternalMessage(
+ messageId,
+ new Endpoint(senderIp, senderPort),
+ messageType,
+ payload);
message.setMessagingService(messagingService);
out.add(message);
- checkpoint(DecoderState.READ_HEADER_VERSION);
+ checkpoint(DecoderState.READ_MESSAGE_ID);
break;
default:
checkState(false, "Must not be here");
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 88bec40..c05eae2 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -15,17 +15,18 @@
*/
package org.onlab.netty;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
+import java.io.IOException;
+
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpAddress.Version;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Encode InternalMessage out into a byte buffer.
*/
@@ -34,34 +35,36 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- // onosiscool in ascii
- static final byte[] PREAMBLE = "onosiscool".getBytes(StandardCharsets.US_ASCII);
- public static final int HEADER_VERSION = 1;
- public static final int SERIALIZER_VERSION = 1;
-
-
- private static final KryoSerializer SERIALIZER = new KryoSerializer();
-
@Override
protected void encode(
ChannelHandlerContext context,
InternalMessage message,
ByteBuf out) throws Exception {
- // write version
- out.writeInt(HEADER_VERSION);
+ // write message id
+ out.writeLong(message.id());
- // write preamble
- out.writeBytes(PREAMBLE);
+ Endpoint sender = message.sender();
- byte[] payload = SERIALIZER.encode(message);
+ IpAddress senderIp = sender.host();
+ if (senderIp.version() == Version.INET) {
+ out.writeByte(0);
+ } else {
+ out.writeByte(1);
+ }
+ out.writeBytes(senderIp.toOctets());
+
+ // write sender port
+ out.writeInt(sender.port());
+
+ // write message type.
+ out.writeLong(message.type());
+
+ byte[] payload = message.payload();
// write payload length
out.writeInt(payload.length);
- // write payloadSerializer version
- out.writeInt(SERIALIZER_VERSION);
-
// write payload.
out.writeBytes(payload);
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index b611fad..69806b1 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -37,6 +37,7 @@
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -46,13 +47,18 @@
import org.apache.commons.pool.KeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.onlab.packet.IpAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Charsets;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
+import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -64,7 +70,7 @@
private final Logger log = LoggerFactory.getLogger(getClass());
private final Endpoint localEp;
- private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
+ private final ConcurrentMap<Long, MessageHandler> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
@@ -78,6 +84,17 @@
}
})
.build();
+
+ private final LoadingCache<String, Long> messageTypeLookupCache = CacheBuilder.newBuilder()
+ .softValues()
+ .build(new CacheLoader<String, Long>() {
+
+ @Override
+ public Long load(String type) {
+ return hashToLong(type);
+ }
+ });
+
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
@@ -103,7 +120,7 @@
clientChannelClass = NioSocketChannel.class;
}
- public NettyMessagingService(String ip, int port) {
+ public NettyMessagingService(IpAddress ip, int port) {
localEp = new Endpoint(ip, port);
}
@@ -113,7 +130,7 @@
public NettyMessagingService(int port) {
try {
- localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
+ localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port);
} catch (UnknownHostException e) {
// Cannot resolve the local host, something is very wrong. Bailing out.
throw new IllegalStateException("Cannot resolve local host", e);
@@ -146,7 +163,7 @@
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageIdGenerator.incrementAndGet())
.withSender(localEp)
- .withType(type)
+ .withType(messageTypeLookupCache.getUnchecked(type))
.withPayload(payload)
.build();
sendAsync(ep, message);
@@ -178,7 +195,7 @@
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
.withSender(localEp)
- .withType(type)
+ .withType(messageTypeLookupCache.getUnchecked(type))
.withPayload(payload)
.build();
try {
@@ -192,7 +209,7 @@
@Override
public void registerHandler(String type, MessageHandler handler) {
- handlers.putIfAbsent(type, handler);
+ handlers.putIfAbsent(hashToLong(type), handler);
}
@Override
@@ -200,7 +217,7 @@
handlers.remove(type);
}
- private MessageHandler getMessageHandler(String type) {
+ private MessageHandler getMessageHandler(long type) {
return handlers.get(type);
}
@@ -245,7 +262,7 @@
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.handler(new OnosCommunicationChannelInitializer());
// Start the client.
- ChannelFuture f = bootstrap.connect(ep.host(), ep.port()).sync();
+ ChannelFuture f = bootstrap.connect(ep.host().toString(), ep.port()).sync();
return f.channel();
}
@@ -295,8 +312,8 @@
@Override
protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
- String type = message.type();
- if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
+ long type = message.type();
+ if (type == InternalMessage.REPLY_MESSAGE_TYPE) {
try {
SettableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
@@ -326,4 +343,13 @@
context.close();
}
}
-}
+
+ /**
+ * Returns the md5 hash of the specified input string as a long.
+ * @param input input string.
+ * @return md5 hash as long.
+ */
+ public static long hashToLong(String input) {
+ return Hashing.md5().hashBytes(input.getBytes(Charsets.UTF_8)).asLong();
+ }
+}
\ No newline at end of file
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
index ab816b2..3eef484 100644
--- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
+++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
@@ -15,15 +15,16 @@
*/
package org.onlab.netty;
+import static org.junit.Assert.assertArrayEquals;
+
+import java.net.InetAddress;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomUtils;
-
-import static org.junit.Assert.*;
-
import org.junit.Ignore;
import org.junit.Test;
+import org.onlab.packet.IpAddress;
/**
* Simple ping-pong test that exercises NettyMessagingService.
@@ -40,7 +41,9 @@
ponger.activate();
ponger.registerHandler("echo", new EchoHandler());
byte[] payload = RandomUtils.nextBytes(100);
- Future<byte[]> responseFuture = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload);
+ Future<byte[]> responseFuture =
+ pinger.sendAndReceive(
+ new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), 9086), "echo", payload);
assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));
} finally {
pinger.deactivate();