Working on the cluster i/o
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
new file mode 100644
index 0000000..0970726
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
@@ -0,0 +1,67 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.onos.cluster.DefaultControllerNode;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Stream for transferring messages between two cluster members.
+ */
+public class ClusterMessageStream extends MessageStream<ClusterMessage> {
+
+ private static final int COMM_BUFFER_SIZE = 32 * 1024;
+ private static final int COMM_IDLE_TIME = 500;
+
+ private DefaultControllerNode node;
+ private SerializationService serializationService;
+
+ /**
+ * Creates a message stream associated with the specified IO loop and
+ * backed by the given byte channel.
+ *
+ * @param serializationService service for encoding/decoding messages
+ * @param loop IO loop
+ * @param byteChannel backing byte channel
+ */
+ public ClusterMessageStream(SerializationService serializationService,
+ IOLoop<ClusterMessage, ?> loop,
+ ByteChannel byteChannel) {
+ super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
+ this.serializationService = serializationService;
+ }
+
+ /**
+ * Returns the node with which this stream is associated.
+ *
+ * @return controller node
+ */
+ public DefaultControllerNode node() {
+ return node;
+ }
+
+ /**
+ * Sets the node with which this stream is affiliated.
+ *
+ * @param node controller node
+ */
+ public void setNode(DefaultControllerNode node) {
+ checkState(this.node == null, "Stream is already bound to a node");
+ this.node = node;
+ }
+
+ @Override
+ protected ClusterMessage read(ByteBuffer buffer) {
+ return serializationService.decode(buffer);
+ }
+
+ @Override
+ protected void write(ClusterMessage message, ByteBuffer buffer) {
+ serializationService.encode(message, buffer);
+ }
+
+}