Further simplified the store & connection manager relationship.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
index 492afbc..8acc8fe 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
@@ -13,7 +13,6 @@
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
 import org.onlab.onos.store.cluster.messaging.HelloMessage;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
@@ -50,8 +49,8 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-    private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
-    private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
+    private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
+    private static final long CONNECTION_CUSTODIAN_FREQUENCY = 2000;
 
     private static final long START_TIMEOUT = 1000;
     private static final int WORKERS = 3;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
index c096a7d..d442cc8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
@@ -5,7 +5,6 @@
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
 import org.onlab.onos.store.cluster.messaging.HelloMessage;
 import org.onlab.onos.store.cluster.messaging.SerializationService;
 import org.slf4j.Logger;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java
new file mode 100644
index 0000000..d182aa1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java
@@ -0,0 +1,90 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Stream for transferring messages between two cluster members.
+ * <p>
+ * Encoding and decoding of messages is delegated to the serialization
+ * service supplied at construction time.
+ * </p>
+ */
+public class ClusterMessageStream extends MessageStream<ClusterMessage> {
+
+    // Buffer size and idle time handed to the MessageStream constructor.
+    private static final int COMM_BUFFER_SIZE = 32 * 1024;
+    private static final int COMM_IDLE_TIME = 500;
+
+    // Set once in the constructor and never reassigned.
+    private final SerializationService serializationService;
+
+    // Remains null until the stream is bound through setNode.
+    private DefaultControllerNode node;
+
+    /**
+     * Creates a message stream associated with the specified IO loop and
+     * backed by the given byte channel.
+     *
+     * @param serializationService service for encoding/decoding messages
+     * @param loop                 IO loop
+     * @param byteChannel          backing byte channel
+     */
+    public ClusterMessageStream(SerializationService serializationService,
+                                IOLoop<ClusterMessage, ?> loop,
+                                ByteChannel byteChannel) {
+        super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
+        this.serializationService = serializationService;
+    }
+
+    /**
+     * Returns the node with which this stream is associated.
+     *
+     * @return controller node; null if no node has been set yet
+     */
+    public DefaultControllerNode node() {
+        return node;
+    }
+
+    /**
+     * Sets the node with which this stream is associated.
+     *
+     * @param node controller node
+     * @throws IllegalStateException if a node has already been set
+     */
+    public void setNode(DefaultControllerNode node) {
+        checkState(this.node == null, "Stream is already bound to a node");
+        this.node = node;
+    }
+
+    /**
+     * Decodes a message from the buffer using the serialization service.
+     *
+     * @param buffer buffer to decode from
+     * @return whatever the serialization service decoded
+     */
+    @Override
+    protected ClusterMessage read(ByteBuffer buffer) {
+        return serializationService.decode(buffer);
+    }
+
+    /**
+     * Encodes the message into the buffer using the serialization service.
+     *
+     * @param message message to encode
+     * @param buffer  buffer to encode into
+     */
+    @Override
+    protected void write(ClusterMessage message, ByteBuffer buffer) {
+        serializationService.encode(message, buffer);
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
new file mode 100644
index 0000000..b54f0db
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
@@ -0,0 +1,120 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.HelloMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
+import org.onlab.packet.IpPrefix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Factory for parsing messages sent between cluster members.
+ * <p>
+ * Each message is framed as an 8-byte marker, a 4-byte subject ordinal and
+ * a 4-byte total length (header included), followed by the payload bytes.
+ * </p>
+ */
+@Component(immediate = true)
+@Service
+public class MessageSerializer implements SerializationService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
+    private static final int LENGTH_OFFSET = 12;
+
+    private static final long MARKER = 0xfeedcafebeaddeadL;
+
+    /**
+     * Decodes a single message from the buffer, provided a whole one is there.
+     *
+     * @param buffer buffer to decode from
+     * @return decoded message; null if the buffer does not hold a complete
+     * message yet or if the message could not be decoded
+     */
+    @Override
+    public ClusterMessage decode(ByteBuffer buffer) {
+        try {
+            // Do we have enough bytes to read the header? If not, bail.
+            if (buffer.remaining() < METADATA_LENGTH) {
+                return null;
+            }
+
+            // Peek at the length and if we have enough to read the entire message
+            // go ahead, otherwise bail.
+            int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
+            if (buffer.remaining() < length) {
+                return null;
+            }
+
+            // At this point, we have enough data to read a complete message.
+            long marker = buffer.getLong();
+            checkState(marker == MARKER, "Incorrect message marker");
+
+            // The lookup doubles as validation: an unknown ordinal throws here.
+            int subjectOrdinal = buffer.getInt();
+            MessageSubject subject = MessageSubject.values()[subjectOrdinal];
+            length = buffer.getInt();
+
+            // The length counts the header too, so a smaller value is corrupt;
+            // reject it here rather than fail on a negative array size below.
+            checkState(length >= METADATA_LENGTH, "Invalid message length: %s", length);
+            byte[] data = new byte[length - METADATA_LENGTH];
+            buffer.get(data);
+
+            // TODO: add deserialization hook here; for now this hack
+            // Decode with a fixed charset so that all nodes agree on the text.
+            String[] fields = new String(data, StandardCharsets.UTF_8).split(":");
+            return new HelloMessage(new NodeId(fields[0]), IpPrefix.valueOf(fields[1]), Integer.parseInt(fields[2]));
+
+        } catch (Exception e) {
+            // TODO: recover from exceptions by forwarding stream to next marker
+            log.warn("Unable to decode message due to: " + e);
+        }
+        return null;
+    }
+
+    /**
+     * Encodes the message into the buffer; only hello messages are supported.
+     * If encoding fails, the failure is logged and the buffer position is
+     * restored so that no partially written message is left behind.
+     *
+     * @param message message to encode
+     * @param buffer  buffer to encode into
+     */
+    @Override
+    public void encode(ClusterMessage message, ByteBuffer buffer) {
+        int start = -1;
+        try {
+            start = buffer.position();
+
+            // Gather the pieces first so a failure here leaves the buffer untouched.
+            HelloMessage helloMessage = (HelloMessage) message;
+            int subjectOrdinal = message.subject().ordinal();
+            String str = helloMessage.nodeId() + ":" + helloMessage.ipAddress() + ":" + helloMessage.tcpPort();
+            byte[] data = str.getBytes(StandardCharsets.UTF_8);
+
+            buffer.putLong(MARKER);
+            buffer.putInt(subjectOrdinal);
+            buffer.putInt(data.length + METADATA_LENGTH);
+            buffer.put(data);
+
+        } catch (Exception e) {
+            log.warn("Unable to encode message due to: " + e);
+            // Roll back so no partially written frame stays in the buffer.
+            if (start >= 0) {
+                buffer.position(start);
+            }
+        }
+    }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
new file mode 100644
index 0000000..f4b9710
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Distributed cluster store and messaging subsystem implementation.
+ */
+package org.onlab.onos.store.cluster.impl;
\ No newline at end of file