Changed netty message type to String from Long to avoid potential collisions
Change-Id: I42014a920917a8022744ae15a9fefa6bae6890a7
diff --git a/utils/netty/src/main/java/org/onlab/netty/DecoderState.java b/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
index dc885aa..1850183 100644
--- a/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
+++ b/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
@@ -23,6 +23,7 @@
READ_SENDER_IP_VERSION,
READ_SENDER_IP,
READ_SENDER_PORT,
+ READ_MESSAGE_TYPE_LENGTH,
READ_MESSAGE_TYPE,
READ_CONTENT_LENGTH,
READ_CONTENT
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index f9d4e0b..ba3c896 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -27,19 +27,18 @@
*/
public final class InternalMessage implements Message {
- public static final long REPLY_MESSAGE_TYPE =
- NettyMessagingService.hashToLong("NETTY_MESSAGING_REQUEST_REPLY");
+ public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
private long id;
private Endpoint sender;
- private long type;
+ private String type;
private byte[] payload;
private transient NettyMessagingService messagingService;
// Must be created using the Builder.
private InternalMessage() {}
- InternalMessage(long id, Endpoint sender, long type, byte[] payload) {
+ InternalMessage(long id, Endpoint sender, String type, byte[] payload) {
this.id = id;
this.sender = sender;
this.type = type;
@@ -50,7 +49,7 @@
return id;
}
- public long type() {
+ public String type() {
return type;
}
@@ -104,7 +103,7 @@
return this;
}
- public Builder withType(long type) {
+ public Builder withType(String type) {
message.type = type;
return this;
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index 1697539..edb2d52 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -27,6 +27,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Charsets;
+
/**
* Decoder for inbound messages.
*/
@@ -40,8 +42,9 @@
private Version ipVersion;
private IpAddress senderIp;
private int senderPort;
+ private int messageTypeLength;
+ private String messageType;
private int contentLength;
- private long messageType;
public MessageDecoder(NettyMessagingService messagingService) {
super(DecoderState.READ_MESSAGE_ID);
@@ -68,9 +71,14 @@
checkpoint(DecoderState.READ_SENDER_PORT);
case READ_SENDER_PORT:
senderPort = buffer.readInt();
+ checkpoint(DecoderState.READ_MESSAGE_TYPE_LENGTH);
+ case READ_MESSAGE_TYPE_LENGTH:
+ messageTypeLength = buffer.readInt();
checkpoint(DecoderState.READ_MESSAGE_TYPE);
case READ_MESSAGE_TYPE:
- messageType = buffer.readLong();
+ byte[] messageTypeBytes = new byte[messageTypeLength];
+ buffer.readBytes(messageTypeBytes);
+ messageType = new String(messageTypeBytes, Charsets.UTF_8);
checkpoint(DecoderState.READ_CONTENT_LENGTH);
case READ_CONTENT_LENGTH:
contentLength = buffer.readInt();
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index c05eae2..14c7b18 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -27,6 +27,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Charsets;
+
/**
* Encode InternalMessage out into a byte buffer.
*/
@@ -57,8 +59,13 @@
// write sender port
out.writeInt(sender.port());
- // write message type.
- out.writeLong(message.type());
+ byte[] messageTypeBytes = message.type().getBytes(Charsets.UTF_8);
+
+ // write length of message type
+ out.writeInt(messageTypeBytes.length);
+
+ // write message type bytes
+ out.writeBytes(messageTypeBytes);
byte[] payload = message.payload();
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index 317f01a..6628bb2 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -52,14 +52,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Charsets;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
-import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
@@ -71,7 +67,7 @@
private final Logger log = LoggerFactory.getLogger(getClass());
private final Endpoint localEp;
- private final ConcurrentMap<Long, MessageHandler> handlers = new ConcurrentHashMap<>();
+ private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
private final Cache<Long, SettableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
@@ -86,14 +82,6 @@
})
.build();
- private final LoadingCache<String, Long> messageTypeLookupCache = CacheBuilder.newBuilder()
- .build(new CacheLoader<String, Long>() {
- @Override
- public Long load(String type) {
- return hashToLong(type);
- }
- });
-
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
@@ -162,7 +150,7 @@
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageIdGenerator.incrementAndGet())
.withSender(localEp)
- .withType(messageTypeLookupCache.getUnchecked(type))
+ .withType(type)
.withPayload(payload)
.build();
sendAsync(ep, message);
@@ -198,7 +186,7 @@
InternalMessage message = new InternalMessage.Builder(this)
.withId(messageId)
.withSender(localEp)
- .withType(messageTypeLookupCache.getUnchecked(type))
+ .withType(type)
.withPayload(payload)
.build();
try {
@@ -212,12 +200,12 @@
@Override
public void registerHandler(String type, MessageHandler handler) {
- handlers.putIfAbsent(hashToLong(type), handler);
+ handlers.putIfAbsent(type, handler);
}
@Override
public void registerHandler(String type, MessageHandler handler, ExecutorService executor) {
- handlers.putIfAbsent(hashToLong(type), new MessageHandler() {
+ handlers.putIfAbsent(type, new MessageHandler() {
@Override
public void handle(Message message) throws IOException {
executor.submit(() -> {
@@ -233,10 +221,10 @@
@Override
public void unregisterHandler(String type) {
- handlers.remove(hashToLong(type));
+ handlers.remove(type);
}
- private MessageHandler getMessageHandler(long type) {
+ private MessageHandler getMessageHandler(String type) {
return handlers.get(type);
}
@@ -342,8 +330,8 @@
}
private void dispatchLocally(InternalMessage message) throws IOException {
- long type = message.type();
- if (type == InternalMessage.REPLY_MESSAGE_TYPE) {
+ String type = message.type();
+ if (InternalMessage.REPLY_MESSAGE_TYPE.equals(type)) {
try {
SettableFuture<byte[]> futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
@@ -366,13 +354,4 @@
log.debug("No handler registered for {}", type);
}
}
-
- /**
- * Returns the md5 hash of the specified input string as a long.
- * @param input input string.
- * @return md5 hash as long.
- */
- public static long hashToLong(String input) {
- return Hashing.md5().hashBytes(input.getBytes(Charsets.UTF_8)).asLong();
- }
}
\ No newline at end of file