Further simplified the store & connection manager relationship.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
index 492afbc..8acc8fe 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
@@ -13,7 +13,6 @@
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
@@ -50,8 +49,8 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private static final long CONNECTION_CUSTODIAN_DELAY = 1000L;
- private static final long CONNECTION_CUSTODIAN_FREQUENCY = 5000;
+ private static final long CONNECTION_CUSTODIAN_DELAY = 100L;
+ private static final long CONNECTION_CUSTODIAN_FREQUENCY = 2000;
private static final long START_TIMEOUT = 1000;
private static final int WORKERS = 3;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
index c096a7d..d442cc8 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterIOWorker.java
@@ -5,7 +5,6 @@
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.ClusterMessageStream;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.slf4j.Logger;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java
new file mode 100644
index 0000000..d182aa1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMessageStream.java
@@ -0,0 +1,69 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Stream for transferring messages between two cluster members.
+ */
+public class ClusterMessageStream extends MessageStream<ClusterMessage> {
+
+ private static final int COMM_BUFFER_SIZE = 32 * 1024;
+ private static final int COMM_IDLE_TIME = 500;
+
+ private DefaultControllerNode node;
+ private SerializationService serializationService;
+
+ /**
+ * Creates a message stream associated with the specified IO loop and
+ * backed by the given byte channel.
+ *
+ * @param serializationService service for encoding/decoding messages
+ * @param loop IO loop
+ * @param byteChannel backing byte channel
+ */
+ public ClusterMessageStream(SerializationService serializationService,
+ IOLoop<ClusterMessage, ?> loop,
+ ByteChannel byteChannel) {
+ super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
+ this.serializationService = serializationService;
+ }
+
+ /**
+ * Returns the node with which this stream is associated.
+ *
+ * @return controller node
+ */
+ public DefaultControllerNode node() {
+ return node;
+ }
+
+ /**
+ * Sets the node with which this stream is affiliated.
+ *
+ * @param node controller node
+ */
+ public void setNode(DefaultControllerNode node) {
+ checkState(this.node == null, "Stream is already bound to a node");
+ this.node = node;
+ }
+
+ @Override
+ protected ClusterMessage read(ByteBuffer buffer) {
+ return serializationService.decode(buffer);
+ }
+
+ @Override
+ protected void write(ClusterMessage message, ByteBuffer buffer) {
+ serializationService.encode(message, buffer);
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
new file mode 100644
index 0000000..b54f0db
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
@@ -0,0 +1,88 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.HelloMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
+import org.onlab.packet.IpPrefix;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Factory for parsing messages sent between cluster members.
+ */
+@Component(immediate = true)
+@Service
+public class MessageSerializer implements SerializationService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
+ private static final int LENGTH_OFFSET = 12;
+
+ private static final long MARKER = 0xfeedcafebeaddeadL;
+
+ @Override
+ public ClusterMessage decode(ByteBuffer buffer) {
+ try {
+ // Do we have enough bytes to read the header? If not, bail.
+ if (buffer.remaining() < METADATA_LENGTH) {
+ return null;
+ }
+
+ // Peek at the length and if we have enough to read the entire message
+ // go ahead, otherwise bail.
+ int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
+ if (buffer.remaining() < length) {
+ return null;
+ }
+
+ // At this point, we have enough data to read a complete message.
+ long marker = buffer.getLong();
+ checkState(marker == MARKER, "Incorrect message marker");
+
+ int subjectOrdinal = buffer.getInt();
+ MessageSubject subject = MessageSubject.values()[subjectOrdinal];
+ length = buffer.getInt();
+
+ // TODO: sanity checking for length
+ byte[] data = new byte[length - METADATA_LENGTH];
+ buffer.get(data);
+
+ // TODO: add deserialization hook here; for now this hack
+ String[] fields = new String(data).split(":");
+ return new HelloMessage(new NodeId(fields[0]), IpPrefix.valueOf(fields[1]), Integer.parseInt(fields[2]));
+
+ } catch (Exception e) {
+ // TODO: recover from exceptions by forwarding stream to next marker
+ log.warn("Unable to decode message due to: " + e);
+ }
+ return null;
+ }
+
+ @Override
+ public void encode(ClusterMessage message, ByteBuffer buffer) {
+ try {
+ HelloMessage helloMessage = (HelloMessage) message;
+ buffer.putLong(MARKER);
+ buffer.putInt(message.subject().ordinal());
+
+ String str = helloMessage.nodeId() + ":" + helloMessage.ipAddress() + ":" + helloMessage.tcpPort();
+ byte[] data = str.getBytes();
+ buffer.putInt(data.length + METADATA_LENGTH);
+ buffer.put(data);
+
+ } catch (Exception e) {
+ // TODO: recover from exceptions by forwarding stream to next marker
+ log.warn("Unable to encode message due to: " + e);
+ }
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
new file mode 100644
index 0000000..f4b9710
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Distributed cluster store and messaging subsystem implementation.
+ */
+package org.onlab.onos.store.cluster.impl;
\ No newline at end of file