Working on the cluster i/o
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
new file mode 100644
index 0000000..87ed221
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
@@ -0,0 +1,46 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+
+import java.util.Set;
+
+/**
+ * Service for assisting communications between controller cluster nodes.
+ */
+public interface ClusterCommunicationService {
+
+    /**
+     * Sends a message to the specified controller node.
+     *
+     * @param message  message to send
+     * @param toNodeId identifier of the destination node
+     * @return true if the message was sent successfully; false if there is
+     * no stream or if there was an error
+     */
+    boolean send(ClusterMessage message, NodeId toNodeId);
+
+    /**
+     * Adds a new subscriber for the specified message subject.
+     *
+     * @param subject    message subject to subscribe to
+     * @param subscriber message subscriber to add
+     */
+    void addSubscriber(MessageSubject subject, MessageSubscriber subscriber);
+
+    /**
+     * Removes the specified subscriber from the given message subject.
+     *
+     * @param subject    message subject to unsubscribe from
+     * @param subscriber message subscriber to remove
+     */
+    void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber);
+
+    /**
+     * Returns the set of subscribers for the specified message subject.
+     *
+     * @param subject message subject
+     * @return set of message subscribers registered for the subject
+     */
+    Set<MessageSubscriber> getSubscribers(MessageSubject subject);
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
new file mode 100644
index 0000000..3033ac9
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -0,0 +1,37 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.nio.AbstractMessage;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Base message for cluster-wide communications; every message carries a subject.
+ */
+public abstract class ClusterMessage extends AbstractMessage {
+
+    private final MessageSubject subject;
+
+    /**
+     * Creates a cluster message tagged with the given subject.
+     *
+     * @param subject message subject
+     */
+    protected ClusterMessage(MessageSubject subject) {
+        this.subject = subject;
+    }
+
+    /**
+     * Returns the subject this message was created with.
+     *
+     * @return message subject
+     */
+    public MessageSubject subject() {
+        return subject;
+    }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this).add("subject", subject).add("length", length).toString();
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
new file mode 100644
index 0000000..0970726
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
@@ -0,0 +1,67 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.onos.cluster.DefaultControllerNode;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Stream for transferring messages between two cluster members.
+ */
+public class ClusterMessageStream extends MessageStream<ClusterMessage> {
+
+    private static final int COMM_BUFFER_SIZE = 32 * 1024;
+    private static final int COMM_IDLE_TIME = 500;
+
+    private DefaultControllerNode node;
+    private final SerializationService serializationService;
+
+    /**
+     * Creates a message stream associated with the specified IO loop and
+     * backed by the given byte channel.
+     *
+     * @param serializationService service for encoding/decoding messages
+     * @param loop                 IO loop
+     * @param byteChannel          backing byte channel
+     */
+    public ClusterMessageStream(SerializationService serializationService,
+                                IOLoop<ClusterMessage, ?> loop,
+                                ByteChannel byteChannel) {
+        super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
+        this.serializationService = serializationService;
+    }
+
+    /**
+     * Returns the node with which this stream is associated.
+     *
+     * @return controller node; null until a node has been set
+     */
+    public DefaultControllerNode node() {
+        return node;
+    }
+
+    /**
+     * Sets the node with which this stream is affiliated; fails if one is already bound.
+     *
+     * @param node controller node
+     */
+    public void setNode(DefaultControllerNode node) {
+        checkState(this.node == null, "Stream is already bound to a node");
+        this.node = node;
+    }
+
+    @Override
+    protected ClusterMessage read(ByteBuffer buffer) {
+        return serializationService.decode(buffer); // decoding is delegated to the injected service
+    }
+
+    @Override
+    protected void write(ClusterMessage message, ByteBuffer buffer) {
+        serializationService.encode(message, buffer); // encoding is delegated to the injected service
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/EchoMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/EchoMessage.java
new file mode 100644
index 0000000..d25a341
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/EchoMessage.java
@@ -0,0 +1,37 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+
+/**
+ * Echo heart-beat message that nodes send to each other.
+ */
+public class EchoMessage extends ClusterMessage {
+
+    private NodeId nodeId;
+
+    // For serialization
+    private EchoMessage() {
+        super(MessageSubject.ECHO);
+        nodeId = null;
+    }
+
+    /**
+     * Creates a new heart-beat echo message.
+     *
+     * @param nodeId    sending node identification
+     */
+    public EchoMessage(NodeId nodeId) {
+        super(MessageSubject.ECHO); // heart-beats use the ECHO subject, not HELLO
+        this.nodeId = nodeId; // 'this.' is required: the parameter shadows the field
+    }
+
+    /**
+     * Returns the sending node identifier.
+     *
+     * @return node identifier
+     */
+    public NodeId nodeId() {
+        return nodeId;
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
new file mode 100644
index 0000000..ddc79d3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
@@ -0,0 +1,63 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.packet.IpPrefix;
+
+/**
+ * Hello message that nodes use to greet each other.
+ */
+public class HelloMessage extends ClusterMessage {
+
+    private NodeId nodeId;
+    private IpPrefix ipAddress;
+    private int tcpPort;
+
+    // For serialization
+    private HelloMessage() {
+        super(MessageSubject.HELLO);
+        nodeId = null;
+        ipAddress = null;
+        tcpPort = 0;
+    }
+
+    /**
+     * Creates a new hello message for the specified end-point data.
+     *
+     * @param nodeId    sending node identification
+     * @param ipAddress sending node IP address
+     * @param tcpPort   sending node TCP port
+     */
+    public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
+        super(MessageSubject.HELLO);
+        this.nodeId = nodeId; // 'this.' is required: each parameter shadows its field
+        this.ipAddress = ipAddress;
+        this.tcpPort = tcpPort;
+    }
+
+    /**
+     * Returns the sending node identifier.
+     *
+     * @return node identifier
+     */
+    public NodeId nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Returns the sending node IP address.
+     *
+     * @return node IP address
+     */
+    public IpPrefix ipAddress() {
+        return ipAddress;
+    }
+
+    /**
+     * Returns the sending node TCP listen port.
+     *
+     * @return TCP listen port
+     */
+    public int tcpPort() {
+        return tcpPort;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
new file mode 100644
index 0000000..3b888b3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -0,0 +1,14 @@
+package org.onlab.onos.store.cluster.messaging;
+
+/**
+ * Representation of a message subject; subscribers are registered per subject.
+ */
+public enum MessageSubject {
+
+    /** Represents a first greeting message. */
+    HELLO,
+
+    /** Signifies a heart-beat message. */
+    ECHO
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
new file mode 100644
index 0000000..6b78fec
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
@@ -0,0 +1,15 @@
+package org.onlab.onos.store.cluster.messaging;
+
+/**
+ * Represents a consumer of cluster messages.
+ */
+public interface MessageSubscriber {
+
+    /**
+     * Receives the specified cluster message.
+     *
+     * @param message cluster message to be received
+     */
+    void receive(ClusterMessage message);
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
new file mode 100644
index 0000000..79e054b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
@@ -0,0 +1,26 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Service for serializing/deserializing intra-cluster messages.
+ */
+public interface SerializationService {
+
+    /**
+     * Decodes the specified byte buffer to obtain a message within.
+     *
+     * @param buffer byte buffer with message(s)
+     * @return parsed message; may be null (MessageSerializer returns null on incomplete data)
+     */
+    ClusterMessage decode(ByteBuffer buffer);
+
+    /**
+     * Encodes the specified message into the given byte buffer.
+     *
+     * @param message message to be encoded
+     * @param buffer  byte buffer to receive the message data
+     */
+    void encode(ClusterMessage message, ByteBuffer buffer);
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
new file mode 100644
index 0000000..bafb2c3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -0,0 +1,64 @@
+package org.onlab.onos.store.cluster.messaging.impl;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.impl.CommunicationsDelegate;
+import org.onlab.onos.store.cluster.impl.MessageSender;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
+
+import java.util.Set;
+
+/**
+ * Implements the cluster communication services to use by other stores.
+ */
+@Component(immediate = true)
+@Service
+public class ClusterCommunicationManager
+        implements ClusterCommunicationService, CommunicationsDelegate {
+
+    // TODO: use something different that won't require synchronization
+    private final Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
+    private MessageSender messageSender;
+
+    @Override
+    public boolean send(ClusterMessage message, NodeId toNodeId) {
+        return messageSender.send(toNodeId, message);
+    }
+
+    @Override
+    public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
+        subscribers.put(subject, subscriber);
+    }
+
+    @Override
+    public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
+        subscribers.remove(subject, subscriber);
+    }
+
+    @Override
+    public synchronized Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
+        return ImmutableSet.copyOf(subscribers.get(subject)); // snapshot taken under the same lock as add/remove
+    }
+
+    @Override
+    public void dispatch(ClusterMessage message) {
+        // getSubscribers() returns an immutable, never-null snapshot, so no null
+        // check is needed and no lock is held while the subscribers are invoked.
+        Set<MessageSubscriber> set = getSubscribers(message.subject());
+        for (MessageSubscriber subscriber : set) {
+            subscriber.receive(message);
+        }
+    }
+
+    @Override
+    public void setSender(MessageSender messageSender) {
+        this.messageSender = messageSender;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
new file mode 100644
index 0000000..93c8310
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
@@ -0,0 +1,69 @@
+package org.onlab.onos.store.cluster.messaging.impl;
+
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
+
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Factory for parsing messages sent between cluster members (still a stub, see the TODOs below).
+ */
+public class MessageSerializer implements SerializationService {
+
+    private static final int METADATA_LENGTH = 16; // 8 (marker) + 4 (subject ordinal) + 4 (length)
+    private static final int LENGTH_OFFSET = 12; // length follows the 8-byte marker and 4-byte subject
+
+    private static final long MARKER = 0xfeedcafebeaddeadL; // expected first 8 bytes of a message
+
+    @Override
+    public ClusterMessage decode(ByteBuffer buffer) {
+        try {
+            // Do we have enough bytes to read the header? If not, bail.
+            if (buffer.remaining() < METADATA_LENGTH) {
+                return null;
+            }
+
+            // Peek at the length (absolute read, buffer position is not moved) and if we
+            // have enough to read the entire message go ahead, otherwise bail.
+            int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
+            if (buffer.remaining() < length) {
+                return null;
+            }
+
+            // At this point, we have enough data to read a complete message.
+            long marker = buffer.getLong();
+            checkState(marker == MARKER, "Incorrect message marker");
+
+            int subjectOrdinal = buffer.getInt();
+            MessageSubject subject = MessageSubject.values()[subjectOrdinal];
+            length = buffer.getInt();
+
+            // TODO: sanity checking for length; a value below METADATA_LENGTH gives a negative array size
+            byte[] data = new byte[length - METADATA_LENGTH];
+            buffer.get(data);
+
+            // TODO: add deserialization hook here; for now this hack (subject and data are read but unused)
+            return null; // actually deserialize
+
+        } catch (Exception e) {
+            // TODO: recover from exceptions by forwarding stream to next marker
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+    @Override
+    public void encode(ClusterMessage message, ByteBuffer buffer) {
+        try {
+            int i = 0;
+            // Type based lookup for proper encoder (TODO: not implemented; nothing is written to the buffer)
+        } catch (Exception e) {
+            // TODO: recover from exceptions by forwarding stream to next marker
+            e.printStackTrace();
+        }
+    }
+
+}