blob: b003945620fb7c944ffa2bffa8d553d527b0f1b3 [file] [log] [blame]
tom73094832014-09-29 13:47:08 -07001package org.onlab.onos.store.cluster.impl;
2
3import org.onlab.nio.IOLoop;
4import org.onlab.nio.MessageStream;
5import org.onlab.onos.cluster.DefaultControllerNode;
6
7import java.nio.ByteBuffer;
8import java.nio.channels.ByteChannel;
9
10import static com.google.common.base.Preconditions.checkState;
11
12/**
13 * Stream for transferring TLV messages between cluster members.
14 */
15public class TLVMessageStream extends MessageStream<TLVMessage> {
16
17 public static final int METADATA_LENGTH = 16; // 8 + 4 + 4
18
19 private static final int LENGTH_OFFSET = 12;
20 private static final long MARKER = 0xfeedcafecafefeedL;
21
22 private DefaultControllerNode node;
23
24 /**
25 * Creates a message stream associated with the specified IO loop and
26 * backed by the given byte channel.
27 *
28 * @param loop IO loop
29 * @param byteChannel backing byte channel
30 * @param bufferSize size of the backing byte buffers
31 * @param maxIdleMillis maximum number of millis the stream can be idle
32 */
33 protected TLVMessageStream(IOLoop<TLVMessage, ?> loop, ByteChannel byteChannel,
34 int bufferSize, int maxIdleMillis) {
35 super(loop, byteChannel, bufferSize, maxIdleMillis);
36 }
37
38 /**
39 * Returns the node with which this stream is associated.
40 *
41 * @return controller node
42 */
43 DefaultControllerNode node() {
44 return node;
45 }
46
47 /**
48 * Sets the node with which this stream is affiliated.
49 *
50 * @param node controller node
51 */
52 void setNode(DefaultControllerNode node) {
53 checkState(this.node == null, "Stream is already bound to a node");
54 this.node = node;
55 }
56
57 @Override
58 protected TLVMessage read(ByteBuffer buffer) {
59 // Do we have enough bytes to read the header? If not, bail.
60 if (buffer.remaining() < METADATA_LENGTH) {
61 return null;
62 }
63
64 // Peek at the length and if we have enough to read the entire message
65 // go ahead, otherwise bail.
66 int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
67 if (buffer.remaining() < length) {
68 return null;
69 }
70
71 // At this point, we have enough data to read a complete message.
72 long marker = buffer.getLong();
73 checkState(marker == MARKER, "Incorrect message marker");
74
75 int type = buffer.getInt();
76 length = buffer.getInt();
77
78 // TODO: add deserialization hook here
79 byte[] data = new byte[length - METADATA_LENGTH];
80 buffer.get(data);
81
82 return new TLVMessage(type, data);
83 }
84
85 @Override
86 protected void write(TLVMessage message, ByteBuffer buffer) {
87 buffer.putLong(MARKER);
88 buffer.putInt(message.type());
89 buffer.putInt(message.length());
90
91 // TODO: add serialization hook here
92 buffer.put(message.data());
93 }
94
95}