Adding fingerprints to avoid interference between clusters.

Change-Id: I5e5278916f8b9b900d7d403b6d08f1f66a866fb2
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index c34d3cc..af52a41 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -16,6 +16,7 @@
 package org.onlab.netty;
 
 import static com.google.common.base.Preconditions.checkState;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ReplayingDecoder;
@@ -37,7 +38,9 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private final int correctPreamble;
     private long messageId;
+    private int preamble;
     private Version ipVersion;
     private IpAddress senderIp;
     private int senderPort;
@@ -45,8 +48,9 @@
     private String messageType;
     private int contentLength;
 
-    public MessageDecoder() {
-        super(DecoderState.READ_MESSAGE_ID);
+    public MessageDecoder(int correctPreamble) {
+        super(DecoderState.READ_MESSAGE_PREAMBLE);
+        this.correctPreamble = correctPreamble;
     }
 
     @Override
@@ -56,6 +60,12 @@
             List<Object> out) throws Exception {
 
         switch (state()) {
+        case READ_MESSAGE_PREAMBLE:
+            preamble = buffer.readInt();
+            if (preamble != correctPreamble) {
+                throw new IllegalStateException("This message had an incorrect preamble.");
+            }
+            checkpoint(DecoderState.READ_MESSAGE_ID);
         case READ_MESSAGE_ID:
             messageId = buffer.readLong();
             checkpoint(DecoderState.READ_SENDER_IP_VERSION);
@@ -63,9 +73,9 @@
             ipVersion = buffer.readByte() == 0x0 ? Version.INET : Version.INET6;
             checkpoint(DecoderState.READ_SENDER_IP);
         case READ_SENDER_IP:
-            byte[] octects = new byte[IpAddress.byteLength(ipVersion)];
-            buffer.readBytes(octects);
-            senderIp = IpAddress.valueOf(ipVersion, octects);
+            byte[] octets = new byte[IpAddress.byteLength(ipVersion)];
+            buffer.readBytes(octets);
+            senderIp = IpAddress.valueOf(ipVersion, octets);
             checkpoint(DecoderState.READ_SENDER_PORT);
         case READ_SENDER_PORT:
             senderPort = buffer.readInt();
@@ -82,15 +92,15 @@
             contentLength = buffer.readInt();
             checkpoint(DecoderState.READ_CONTENT);
         case READ_CONTENT:
+            //TODO Perform a sanity check on the size before allocating
             byte[] payload = new byte[contentLength];
             buffer.readBytes(payload);
-            InternalMessage message = new InternalMessage(
-                    messageId,
+            InternalMessage message = new InternalMessage(messageId,
                     new Endpoint(senderIp, senderPort),
                     messageType,
                     payload);
             out.add(message);
-            checkpoint(DecoderState.READ_MESSAGE_ID);
+            checkpoint(DecoderState.READ_MESSAGE_PREAMBLE);
             break;
          default:
             checkState(false, "Must not be here");