Further simplified the store & connection manager relationship.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
deleted file mode 100644
index 0970726..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package org.onlab.onos.store.cluster.messaging;
-
-import org.onlab.nio.IOLoop;
-import org.onlab.nio.MessageStream;
-import org.onlab.onos.cluster.DefaultControllerNode;
-
-import java.nio.ByteBuffer;
-import java.nio.channels.ByteChannel;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Stream for transferring messages between two cluster members.
- */
-public class ClusterMessageStream extends MessageStream<ClusterMessage> {
-
- private static final int COMM_BUFFER_SIZE = 32 * 1024;
- private static final int COMM_IDLE_TIME = 500;
-
- private DefaultControllerNode node;
- private SerializationService serializationService;
-
- /**
- * Creates a message stream associated with the specified IO loop and
- * backed by the given byte channel.
- *
- * @param serializationService service for encoding/decoding messages
- * @param loop IO loop
- * @param byteChannel backing byte channel
- */
- public ClusterMessageStream(SerializationService serializationService,
- IOLoop<ClusterMessage, ?> loop,
- ByteChannel byteChannel) {
- super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
- this.serializationService = serializationService;
- }
-
- /**
- * Returns the node with which this stream is associated.
- *
- * @return controller node
- */
- public DefaultControllerNode node() {
- return node;
- }
-
- /**
- * Sets the node with which this stream is affiliated.
- *
- * @param node controller node
- */
- public void setNode(DefaultControllerNode node) {
- checkState(this.node == null, "Stream is already bound to a node");
- this.node = node;
- }
-
- @Override
- protected ClusterMessage read(ByteBuffer buffer) {
- return serializationService.decode(buffer);
- }
-
- @Override
- protected void write(ClusterMessage message, ByteBuffer buffer) {
- serializationService.encode(message, buffer);
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
deleted file mode 100644
index 7064801..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package org.onlab.onos.store.cluster.messaging.impl;
-
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.HelloMessage;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
-import org.onlab.packet.IpPrefix;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-
-import static com.google.common.base.Preconditions.checkState;
-
-/**
- * Factory for parsing messages sent between cluster members.
- */
-@Component(immediate = true)
-@Service
-public class MessageSerializer implements SerializationService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
- private static final int LENGTH_OFFSET = 12;
-
- private static final long MARKER = 0xfeedcafebeaddeadL;
-
- @Override
- public ClusterMessage decode(ByteBuffer buffer) {
- try {
- // Do we have enough bytes to read the header? If not, bail.
- if (buffer.remaining() < METADATA_LENGTH) {
- return null;
- }
-
- // Peek at the length and if we have enough to read the entire message
- // go ahead, otherwise bail.
- int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
- if (buffer.remaining() < length) {
- return null;
- }
-
- // At this point, we have enough data to read a complete message.
- long marker = buffer.getLong();
- checkState(marker == MARKER, "Incorrect message marker");
-
- int subjectOrdinal = buffer.getInt();
- MessageSubject subject = MessageSubject.values()[subjectOrdinal];
- length = buffer.getInt();
-
- // TODO: sanity checking for length
- byte[] data = new byte[length - METADATA_LENGTH];
- buffer.get(data);
-
- // TODO: add deserialization hook here; for now this hack
- String[] fields = new String(data).split(":");
- return new HelloMessage(new NodeId(fields[0]), IpPrefix.valueOf(fields[1]), Integer.parseInt(fields[2]));
-
- } catch (Exception e) {
- // TODO: recover from exceptions by forwarding stream to next marker
- log.warn("Unable to decode message due to: " + e);
- }
- return null;
- }
-
- @Override
- public void encode(ClusterMessage message, ByteBuffer buffer) {
- try {
- HelloMessage helloMessage = (HelloMessage) message;
- buffer.putLong(MARKER);
- buffer.putInt(message.subject().ordinal());
-
- String str = helloMessage.nodeId() + ":" + helloMessage.ipAddress() + ":" + helloMessage.tcpPort();
- byte[] data = str.getBytes();
- buffer.putInt(data.length + METADATA_LENGTH);
- buffer.put(data);
-
- } catch (Exception e) {
- // TODO: recover from exceptions by forwarding stream to next marker
- log.warn("Unable to encode message due to: " + e);
- }
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java
new file mode 100644
index 0000000..5276b0b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Cluster messaging APIs for the use by the various distributed stores.
+ */
+package org.onlab.onos.store.cluster.messaging;
\ No newline at end of file