Working on the cluster i/o
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
new file mode 100644
index 0000000..87ed221
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationService.java
@@ -0,0 +1,46 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+
+import java.util.Set;
+
+/**
+ * Service for assisting communications between controller cluster nodes.
+ */
+public interface ClusterCommunicationService {
+
+ /**
+ * Sends a message to the specified controller node.
+ *
+ * @param message message to send
+ * @param toNodeId node identifier
+ * @return true if the message was sent sucessfully; false if there is
+ * no stream or if there was an error
+ */
+ boolean send(ClusterMessage message, NodeId toNodeId);
+
+ /**
+ * Adds a new subscriber for the specified message subject.
+ *
+ * @param subject message subject
+ * @param subscriber message subscriber
+ */
+ void addSubscriber(MessageSubject subject, MessageSubscriber subscriber);
+
+ /**
+ * Removes the specified subscriber from the given message subject.
+ *
+ * @param subject message subject
+ * @param subscriber message subscriber
+ */
+ void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber);
+
+ /**
+ * Returns the set of subscribers for the specified message subject.
+ *
+ * @param subject message subject
+ * @return set of message subscribers
+ */
+ Set<MessageSubscriber> getSubscribers(MessageSubject subject);
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
new file mode 100644
index 0000000..3033ac9
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -0,0 +1,37 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.nio.AbstractMessage;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Base message for cluster-wide communications.
+ */
+public abstract class ClusterMessage extends AbstractMessage {
+
+ private final MessageSubject subject;
+
+ /**
+ * Creates a cluster message.
+ *
+ * @param subject message subject
+ */
+ protected ClusterMessage(MessageSubject subject) {
+ this.subject = subject;
+ }
+
+ /**
+ * Returns the message subject indicator.
+ *
+ * @return message subject
+ */
+ public MessageSubject subject() {
+ return subject;
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this).add("subject", subject).add("length", length).toString();
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
new file mode 100644
index 0000000..0970726
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageStream.java
@@ -0,0 +1,67 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.onos.cluster.DefaultControllerNode;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Stream for transferring messages between two cluster members.
+ */
+public class ClusterMessageStream extends MessageStream<ClusterMessage> {
+
+ private static final int COMM_BUFFER_SIZE = 32 * 1024;
+ private static final int COMM_IDLE_TIME = 500;
+
+ private DefaultControllerNode node;
+ private SerializationService serializationService;
+
+ /**
+ * Creates a message stream associated with the specified IO loop and
+ * backed by the given byte channel.
+ *
+ * @param serializationService service for encoding/decoding messages
+ * @param loop IO loop
+ * @param byteChannel backing byte channel
+ */
+ public ClusterMessageStream(SerializationService serializationService,
+ IOLoop<ClusterMessage, ?> loop,
+ ByteChannel byteChannel) {
+ super(loop, byteChannel, COMM_BUFFER_SIZE, COMM_IDLE_TIME);
+ this.serializationService = serializationService;
+ }
+
+ /**
+ * Returns the node with which this stream is associated.
+ *
+ * @return controller node
+ */
+ public DefaultControllerNode node() {
+ return node;
+ }
+
+ /**
+ * Sets the node with which this stream is affiliated.
+ *
+ * @param node controller node
+ */
+ public void setNode(DefaultControllerNode node) {
+ checkState(this.node == null, "Stream is already bound to a node");
+ this.node = node;
+ }
+
+ @Override
+ protected ClusterMessage read(ByteBuffer buffer) {
+ return serializationService.decode(buffer);
+ }
+
+ @Override
+ protected void write(ClusterMessage message, ByteBuffer buffer) {
+ serializationService.encode(message, buffer);
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/EchoMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/EchoMessage.java
new file mode 100644
index 0000000..d25a341
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/EchoMessage.java
@@ -0,0 +1,37 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+
+/**l
+ * Echo heart-beat message that nodes send to each other.
+ */
+public class EchoMessage extends ClusterMessage {
+
+ private NodeId nodeId;
+
+ // For serialization
+ private EchoMessage() {
+ super(MessageSubject.HELLO);
+ nodeId = null;
+ }
+
+ /**
+ * Creates a new heart-beat echo message.
+ *
+ * @param nodeId sending node identification
+ */
+ public EchoMessage(NodeId nodeId) {
+ super(MessageSubject.HELLO);
+ nodeId = nodeId;
+ }
+
+ /**
+ * Returns the sending node identifer.
+ *
+ * @return node identifier
+ */
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
new file mode 100644
index 0000000..ddc79d3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
@@ -0,0 +1,63 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.packet.IpPrefix;
+
+/**
+ * Hello message that nodes use to greet each other.
+ */
+public class HelloMessage extends ClusterMessage {
+
+ private NodeId nodeId;
+ private IpPrefix ipAddress;
+ private int tcpPort;
+
+ // For serialization
+ private HelloMessage() {
+ super(MessageSubject.HELLO);
+ nodeId = null;
+ ipAddress = null;
+ tcpPort = 0;
+ }
+
+ /**
+ * Creates a new hello message for the specified end-point data.
+ *
+ * @param nodeId sending node identification
+ * @param ipAddress sending node IP address
+ * @param tcpPort sending node TCP port
+ */
+ public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
+ super(MessageSubject.HELLO);
+ nodeId = nodeId;
+ ipAddress = ipAddress;
+ tcpPort = tcpPort;
+ }
+
+ /**
+ * Returns the sending node identifer.
+ *
+ * @return node identifier
+ */
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Returns the sending node IP address.
+ *
+ * @return node IP address
+ */
+ public IpPrefix ipAddress() {
+ return ipAddress;
+ }
+
+ /**
+ * Returns the sending node TCP listen port.
+ *
+ * @return TCP listen port
+ */
+ public int tcpPort() {
+ return tcpPort;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
new file mode 100644
index 0000000..3b888b3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -0,0 +1,14 @@
+package org.onlab.onos.store.cluster.messaging;
+
+/**
+ * Representation of a message subject.
+ */
+public enum MessageSubject {
+
+ /** Represents a first greeting message. */
+ HELLO,
+
+ /** Signifies a heart-beat message. */
+ ECHO
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
new file mode 100644
index 0000000..6b78fec
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
@@ -0,0 +1,15 @@
+package org.onlab.onos.store.cluster.messaging;
+
+/**
+ * Represents a message consumer.
+ */
+public interface MessageSubscriber {
+
+ /**
+ * Receives the specified cluster message.
+ *
+ * @param message message to be received
+ */
+ void receive(ClusterMessage message);
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
new file mode 100644
index 0000000..79e054b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
@@ -0,0 +1,26 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Service for serializing/deserializing intra-cluster messages.
+ */
+public interface SerializationService {
+
+ /**
+ * Decodes the specified byte buffer to obtain a message within.
+ *
+ * @param buffer byte buffer with message(s)
+ * @return parsed message
+ */
+ ClusterMessage decode(ByteBuffer buffer);
+
+ /**
+ * Encodes the specified message into the given byte buffer.
+ *
+ * @param message message to be encoded
+ * @param buffer byte buffer to receive the message data
+ */
+ void encode(ClusterMessage message, ByteBuffer buffer);
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
new file mode 100644
index 0000000..bafb2c3
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -0,0 +1,64 @@
+package org.onlab.onos.store.cluster.messaging.impl;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.impl.CommunicationsDelegate;
+import org.onlab.onos.store.cluster.impl.MessageSender;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
+
+import java.util.Set;
+
+/**
+ * Implements the cluster communication services to use by other stores.
+ */
+@Component(immediate = true)
+@Service
+public class ClusterCommunicationManager
+ implements ClusterCommunicationService, CommunicationsDelegate {
+
+ // TODO: use something different that won't require synchronization
+ private Multimap<MessageSubject, MessageSubscriber> subscribers = HashMultimap.create();
+ private MessageSender messageSender;
+
+ @Override
+ public boolean send(ClusterMessage message, NodeId toNodeId) {
+ return messageSender.send(toNodeId, message);
+ }
+
+ @Override
+ public synchronized void addSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
+ subscribers.put(subject, subscriber);
+ }
+
+ @Override
+ public synchronized void removeSubscriber(MessageSubject subject, MessageSubscriber subscriber) {
+ subscribers.remove(subject, subscriber);
+ }
+
+ @Override
+ public Set<MessageSubscriber> getSubscribers(MessageSubject subject) {
+ return ImmutableSet.copyOf(subscribers.get(subject));
+ }
+
+ @Override
+ public void dispatch(ClusterMessage message) {
+ Set<MessageSubscriber> set = getSubscribers(message.subject());
+ if (set != null) {
+ for (MessageSubscriber subscriber : set) {
+ subscriber.receive(message);
+ }
+ }
+ }
+
+ @Override
+ public void setSender(MessageSender messageSender) {
+ this.messageSender = messageSender;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
new file mode 100644
index 0000000..93c8310
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
@@ -0,0 +1,69 @@
+package org.onlab.onos.store.cluster.messaging.impl;
+
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
+
+import java.nio.ByteBuffer;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Factory for parsing messages sent between cluster members.
+ */
+public class MessageSerializer implements SerializationService {
+
+ private static final int METADATA_LENGTH = 16; // 8 + 4 + 4
+ private static final int LENGTH_OFFSET = 12;
+
+ private static final long MARKER = 0xfeedcafebeaddeadL;
+
+ @Override
+ public ClusterMessage decode(ByteBuffer buffer) {
+ try {
+ // Do we have enough bytes to read the header? If not, bail.
+ if (buffer.remaining() < METADATA_LENGTH) {
+ return null;
+ }
+
+ // Peek at the length and if we have enough to read the entire message
+ // go ahead, otherwise bail.
+ int length = buffer.getInt(buffer.position() + LENGTH_OFFSET);
+ if (buffer.remaining() < length) {
+ return null;
+ }
+
+ // At this point, we have enough data to read a complete message.
+ long marker = buffer.getLong();
+ checkState(marker == MARKER, "Incorrect message marker");
+
+ int subjectOrdinal = buffer.getInt();
+ MessageSubject subject = MessageSubject.values()[subjectOrdinal];
+ length = buffer.getInt();
+
+ // TODO: sanity checking for length
+ byte[] data = new byte[length - METADATA_LENGTH];
+ buffer.get(data);
+
+ // TODO: add deserialization hook here; for now this hack
+ return null; // actually deserialize
+
+ } catch (Exception e) {
+ // TODO: recover from exceptions by forwarding stream to next marker
+ e.printStackTrace();
+ }
+ return null;
+ }
+
+ @Override
+ public void encode(ClusterMessage message, ByteBuffer buffer) {
+ try {
+ int i = 0;
+ // Type based lookup for proper encoder
+ } catch (Exception e) {
+ // TODO: recover from exceptions by forwarding stream to next marker
+ e.printStackTrace();
+ }
+ }
+
+}