Initial check-in for new p2p cluster messaging; to be refactored.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessageStream.java
new file mode 100644
index 0000000..b003945
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/TLVMessageStream.java
@@ -0,0 +1,95 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.onos.cluster.DefaultControllerNode;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Stream for transferring TLV messages between cluster members.
+ */
+public class TLVMessageStream extends MessageStream<TLVMessage> {
+
+ public static final int METADATA_LENGTH = 16; // 8 + 4 + 4
+
+ private static final int LENGTH_OFFSET = 12;
+ private static final long MARKER = 0xfeedcafecafefeedL;
+
+ private DefaultControllerNode node;
+
+ /**
+ * Creates a message stream associated with the specified IO loop and
+ * backed by the given byte channel.
+ *
+ * @param loop IO loop
+ * @param byteChannel backing byte channel
+ * @param bufferSize size of the backing byte buffers
+ * @param maxIdleMillis maximum number of millis the stream can be idle
+ */
+ protected TLVMessageStream(IOLoop<TLVMessage, ?> loop, ByteChannel byteChannel,
+ int bufferSize, int maxIdleMillis) {
+ super(loop, byteChannel, bufferSize, maxIdleMillis);
+ }
+
+ /**
+ * Returns the node with which this stream is associated.
+ *
+ * @return controller node
+ */
+ DefaultControllerNode node() {
+ return node;
+ }
+
+ /**
+ * Sets the node with which this stream is affiliated.
+ *
+ * @param node controller node
+ */
+ void setNode(DefaultControllerNode node) {
+ checkState(this.node == null, "Stream is already bound to a node");
+ this.node = node;
+ }
+
+ @Override
+ protected TLVMessage read(ByteBuffer buffer) {
+ // Do we have enough bytes to read the header? If not, bail.
+ if (buffer.remaining() < METADATA_LENGTH) {
+ return null;
+ }
+
+ // Peek at the length and if we have enough to read the entire message
+ // go ahead, otherwise bail.
+ int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
+ if (buffer.remaining() < length) {
+ return null;
+ }
+
+ // At this point, we have enough data to read a complete message.
+ long marker = buffer.getLong();
+ checkState(marker == MARKER, "Incorrect message marker");
+
+ int type = buffer.getInt();
+ length = buffer.getInt();
+
+ // TODO: add deserialization hook here
+ byte[] data = new byte[length - METADATA_LENGTH];
+ buffer.get(data);
+
+ return new TLVMessage(type, data);
+ }
+
+ @Override
+ protected void write(TLVMessage message, ByteBuffer buffer) {
+ buffer.putLong(MARKER);
+ buffer.putInt(message.type());
+ buffer.putInt(message.length());
+
+ // TODO: add serialization hook here
+ buffer.put(message.data());
+ }
+
+}