Adding fingerprints to avoid interference between clusters.
Change-Id: I5e5278916f8b9b900d7d403b6d08f1f66a866fb2
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
index d61d7dc..ca6f9c1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -48,7 +48,8 @@
public void activate() throws Exception {
ControllerNode localNode = clusterMetadataService.getLocalNode();
getTlsParameters();
- super.start(new Endpoint(localNode.ip(), localNode.tcpPort()));
+ super.start(clusterMetadataService.getClusterMetadata().getName().hashCode(),
+ new Endpoint(localNode.ip(), localNode.tcpPort()));
log.info("Started");
}
diff --git a/tools/test/bin/onos-gen-partitions b/tools/test/bin/onos-gen-partitions
index 35195b0..bbf6f93 100755
--- a/tools/test/bin/onos-gen-partitions
+++ b/tools/test/bin/onos-gen-partitions
@@ -1,19 +1,21 @@
-#!/usr/bin/env python
-'''
+#!/usr/bin/env python
+"""
Generate the partitions json file from the $OC* environment variables
-
+
Usage: onos-gen-partitions [output file]
If output file is not provided, the json is written to stdout.
-'''
+"""
from os import environ
from collections import deque, OrderedDict
import re
import json
import sys
+import random
+import string
-convert = lambda text: int(text) if text.isdigit() else text.lower()
-alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]
+convert = lambda text: int(text) if text.isdigit() else text.lower()
+alphanum_key = lambda key: [convert(c) for c in re.split('([0-9]+)', key)]
def get_OC_vars():
vars = []
@@ -42,10 +44,13 @@
vars = get_OC_vars()
nodes = get_nodes(vars)
partitions = generate_permutations([v.get('id') for v in nodes], 3)
+ name = 0
+ for node in nodes:
+ name = name ^ node['ip']
data = {
- 'name': 'default',
+ 'name': name,
'nodes': nodes,
- 'partitions': partitions
+ 'partitions': partitions
}
output = json.dumps(data, indent=4)
diff --git a/utils/netty/src/main/java/org/onlab/netty/DecoderState.java b/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
index c9fc725..c439301 100644
--- a/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
+++ b/utils/netty/src/main/java/org/onlab/netty/DecoderState.java
@@ -19,6 +19,7 @@
* State transitions a decoder goes through as it is decoding an incoming message.
*/
public enum DecoderState {
+ READ_MESSAGE_PREAMBLE,
READ_MESSAGE_ID,
READ_SENDER_IP_VERSION,
READ_SENDER_IP,
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index c34d3cc..af52a41 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -16,6 +16,7 @@
package org.onlab.netty;
import static com.google.common.base.Preconditions.checkState;
+
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
@@ -37,7 +38,9 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private final int correctPreamble;
private long messageId;
+ private int preamble;
private Version ipVersion;
private IpAddress senderIp;
private int senderPort;
@@ -45,8 +48,9 @@
private String messageType;
private int contentLength;
- public MessageDecoder() {
- super(DecoderState.READ_MESSAGE_ID);
+ public MessageDecoder(int correctPreamble) {
+ super(DecoderState.READ_MESSAGE_PREAMBLE);
+ this.correctPreamble = correctPreamble;
}
@Override
@@ -56,6 +60,12 @@
List<Object> out) throws Exception {
switch (state()) {
+ case READ_MESSAGE_PREAMBLE:
+ preamble = buffer.readInt();
+ if (preamble != correctPreamble) {
+ throw new IllegalStateException("This message had an incorrect preamble.");
+ }
+ checkpoint(DecoderState.READ_MESSAGE_ID);
case READ_MESSAGE_ID:
messageId = buffer.readLong();
checkpoint(DecoderState.READ_SENDER_IP_VERSION);
@@ -63,9 +73,9 @@
ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
checkpoint(DecoderState.READ_SENDER_IP);
case READ_SENDER_IP:
- byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
- buffer.readBytes(octects);
- senderIp = IpAddress.valueOf(ipVersion, octects);
+ byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
+ buffer.readBytes(octets);
+ senderIp = IpAddress.valueOf(ipVersion, octets);
checkpoint(DecoderState.READ_SENDER_PORT);
case READ_SENDER_PORT:
senderPort = buffer.readInt();
@@ -82,15 +92,15 @@
contentLength = buffer.readInt();
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
+ //TODO Perform a sanity check on the size before allocating
byte[] payload = new byte[contentLength];
buffer.readBytes(payload);
- InternalMessage message = new InternalMessage(
- messageId,
+ InternalMessage message = new InternalMessage(messageId,
new Endpoint(senderIp, senderPort),
messageType,
payload);
out.add(message);
- checkpoint(DecoderState.READ_MESSAGE_ID);
+ checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
break;
default:
checkState(false, "Must not be here");
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 2b7784f..c74c1de 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -36,6 +36,13 @@
@Sharable
public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
+ private final int preamble;
+
+ public MessageEncoder(int preamble) {
+ super();
+ this.preamble = preamble;
+ }
+
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
@@ -44,6 +51,8 @@
InternalMessage message,
ByteBuf out) throws Exception {
+ out.writeInt(this.preamble);
+
// write message id
out.writeLong(message.id());
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index 1cd7ca7..2dda747 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -74,6 +74,7 @@
private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
private Endpoint localEp;
+ private int preamble;
private final AtomicBoolean started = new AtomicBoolean(false);
private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
private final AtomicLong messageIdGenerator = new AtomicLong(0);
@@ -123,11 +124,12 @@
clientChannelClass = NioSocketChannel.class;
}
- public void start(Endpoint localEp) throws Exception {
+ public void start(int preamble, Endpoint localEp) throws Exception {
if (started.get()) {
log.warn("Already running at local endpoint: {}", localEp);
return;
}
+ this.preamble = preamble;
this.localEp = localEp;
channels.setLifo(true);
channels.setTestOnBorrow(true);
@@ -324,7 +326,7 @@
private class SslServerCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder();
+ private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
@@ -351,7 +353,7 @@
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(serverSslEngine))
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder())
+ .addLast("decoder", new MessageDecoder(preamble))
.addLast("handler", dispatcher);
}
@@ -360,7 +362,7 @@
private class SslClientCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder();
+ private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
@@ -386,7 +388,7 @@
channel.pipeline().addLast("ssl", new io.netty.handler.ssl.SslHandler(clientSslEngine))
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder())
+ .addLast("decoder", new MessageDecoder(preamble))
.addLast("handler", dispatcher);
}
@@ -395,13 +397,13 @@
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder();
+ private final ChannelHandler encoder = new MessageEncoder(preamble);
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder())
+ .addLast("decoder", new MessageDecoder(preamble))
.addLast("handler", dispatcher);
}
}
@@ -424,7 +426,6 @@
context.close();
}
}
-
private void dispatchLocally(InternalMessage message) throws IOException {
String type = message.type();
if (REPLY_MESSAGE_TYPE.equals(type)) {