Working on the cluster I/O
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
new file mode 100644
index 0000000..0970726
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
@@ -0,0 +1,67 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.onos.cluster.DefaultControllerNode;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Stream for transferring {@link ClusterMessage}s between two cluster members.
+ */
+public class ClusterMessageStream extends MessageStream<ClusterMessage> {
+
+    private static final int COMM_BUFFER_SIZE = 32 * 1024; // buffer size handed to MessageStream
+    private static final int COMM_IDLE_TIME = 500; // idle time handed to MessageStream
+
+    private DefaultControllerNode node; // stays null until setNode is called
+    private final SerializationService serializationService;
+
+    /**
+     * Creates a message stream associated with the specified IO loop and
+     * backed by the given byte channel.
+     *
+     * @param serializationService service for encoding/decoding messages
+     * @param loop                 IO loop
+     * @param byteChannel          backing byte channel
+     */
+    public ClusterMessageStream(SerializationService serializationService,
+                                IOLoop<ClusterMessage, ?> loop,
+                                ByteChannel byteChannel) {
+        super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
+        this.serializationService = serializationService;
+    }
+
+    /**
+     * Returns the node with which this stream is associated.
+     *
+     * @return controller node, or null if no node has been set yet
+     */
+    public DefaultControllerNode node() {
+        return node;
+    }
+
+    /**
+     * Sets the node with which this stream is associated.
+     *
+     * @param node controller node
+     * @throws IllegalStateException if the stream is already bound to a node
+     */
+    public void setNode(DefaultControllerNode node) {
+        checkState(this.node == null, "Stream is already bound to a node");
+        this.node = node;
+    }
+
+    @Override
+    protected ClusterMessage read(ByteBuffer buffer) {
+        return serializationService.decode(buffer);
+    }
+
+    @Override
+    protected void write(ClusterMessage message, ByteBuffer buffer) {
+        serializationService.encode(message, buffer);
+    }
+}