Working on IO loop stuff.
diff --git a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
new file mode 100644
index 0000000..b2acca4
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
@@ -0,0 +1,347 @@
+package org.onlab.nio;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.System.currentTimeMillis;
+import static java.nio.ByteBuffer.allocateDirect;
+
+/**
+ * Bi-directional message stream for transferring messages to & from the
+ * network via two byte buffers.
+ *
+ * @param <M> message type
+ */
+public abstract class MessageStream<M extends Message> {
+
+    protected Logger log = LoggerFactory.getLogger(getClass());
+
+    private final IOLoop<M, ?> loop;
+    private final ByteChannel channel;
+    private final int maxIdleMillis;
+
+    private final ByteBuffer inbound;
+    private ByteBuffer outbound;
+    private SelectionKey key;
+
+    private volatile boolean closed = false;
+    private volatile boolean writePending;
+    private volatile boolean writeOccurred;
+
+    private Exception ioError;
+    private long lastActiveTime;
+
+
+    /**
+     * Creates a message stream associated with the specified IO loop and
+     * backed by the given byte channel.
+     *
+     * @param loop          IO loop
+     * @param byteChannel   backing byte channel
+     * @param bufferSize    size of the backing byte buffers
+     * @param maxIdleMillis maximum number of millis the stream can be idle
+     *                      before it will be closed
+     */
+    protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
+                            int bufferSize, int maxIdleMillis) {
+        this.loop = checkNotNull(loop, "Loop cannot be null");
+        this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");
+
+        checkArgument(maxIdleMillis > 0, "Idle time must be positive");
+        this.maxIdleMillis = maxIdleMillis;
+
+        inbound = allocateDirect(bufferSize);
+        outbound = allocateDirect(bufferSize);
+    }
+
+    /**
+     * Gets a single message from the specified byte buffer; this is
+     * to be done without manipulating the buffer via flip, reset or clear.
+     *
+     * @param buffer byte buffer
+     * @return read message or null if there are not enough bytes to read
+     * a complete message
+     */
+    protected abstract M read(ByteBuffer buffer);
+
+    /**
+     * Puts the specified message into the specified byte buffer; this is
+     * to be done without manipulating the buffer via flip, reset or clear.
+     *
+     * @param message message to be write into the buffer
+     * @param buffer  byte buffer
+     */
+    protected abstract void write(M message, ByteBuffer buffer);
+
+    /**
+     * Closes the message buffer.
+     */
+    public void close() {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        }
+
+        loop.removeStream(this);
+        if (key != null) {
+            try {
+                key.cancel();
+                key.channel().close();
+            } catch (IOException e) {
+                log.warn("Unable to close stream", e);
+            }
+        }
+    }
+
+    /**
+     * Indicates whether this buffer has been closed.
+     *
+     * @return true if this stream has been closed
+     */
+    public synchronized boolean isClosed() {
+        return closed;
+    }
+
+    /**
+     * Returns the stream IO selection key.
+     *
+     * @return socket channel registration selection key
+     */
+    public SelectionKey key() {
+        return key;
+    }
+
+    /**
+     * Binds the selection key to be used for driving IO operations on the stream.
+     *
+     * @param key IO selection key
+     */
+    public void setKey(SelectionKey key) {
+        this.key = key;
+        this.lastActiveTime = currentTimeMillis();
+    }
+
+    /**
+     * Returns the IO loop to which this stream is bound.
+     *
+     * @return I/O loop used to drive this stream
+     */
+    public IOLoop<M, ?> loop() {
+        return loop;
+    }
+
+    /**
+     * Indicates whether the any prior IO encountered an error.
+     *
+     * @return true if a write failed
+     */
+    public boolean hadError() {
+        return ioError != null;
+    }
+
+    /**
+     * Gets the prior IO error, if one occurred.
+     *
+     * @return IO error; null if none occurred
+     */
+    public Exception getError() {
+        return ioError;
+    }
+
+    /**
+     * Reads, withouth blocking, a list of messages from the stream.
+     * The list will be empty if there were not messages pending.
+     *
+     * @return list of messages or null if backing channel has been closed
+     * @throws IOException if messages could not be read
+     */
+    public List<M> read() throws IOException {
+        try {
+            int read = channel.read(inbound);
+            if (read != -1) {
+                // Read the messages one-by-one and add them to the list.
+                List<M> messages = new ArrayList<>();
+                M message;
+                inbound.flip();
+                while ((message = read(inbound)) != null) {
+                    messages.add(message);
+                }
+                inbound.compact();
+
+                // Mark the stream with current time to indicate liveness.
+                lastActiveTime = currentTimeMillis();
+                return messages;
+            }
+            return null;
+
+        } catch (Exception e) {
+            throw new IOException("Unable to read messages", e);
+        }
+    }
+
+    /**
+     * Writes the specified list of messages to the stream.
+     *
+     * @param messages list of messages to write
+     * @throws IOException if error occurred while writing the data
+     */
+    public void write(List<M> messages) throws IOException {
+        synchronized (this) {
+            // First write all messages.
+            for (M m : messages) {
+                append(m);
+            }
+            flushUnlessAlreadyPlanningTo();
+        }
+    }
+
+    /**
+     * Writes the given message to the stream.
+     *
+     * @param message message to write
+     * @throws IOException if error occurred while writing the data
+     */
+    public void write(M message) throws IOException {
+        synchronized (this) {
+            append(message);
+            flushUnlessAlreadyPlanningTo();
+        }
+    }
+
+    // Appends the specified message into the internal buffer, growing the
+    // buffer if required.
+    private void append(M message) {
+        // If the buffer does not have sufficient length double it.
+        while (outbound.remaining() < message.length()) {
+            doubleSize();
+        }
+        // Place the message into the buffer and bump the output trackers.
+        write(message, outbound);
+    }
+
+    // Forces a flush, unless one is planned already.
+    private void flushUnlessAlreadyPlanningTo() throws IOException {
+        if (!writeOccurred && !writePending) {
+            flush();
+        }
+    }
+
+    /**
+     * Flushes any pending writes.
+     *
+     * @throws IOException if flush failed
+     */
+    public void flush() throws IOException {
+        synchronized (this) {
+            if (!writeOccurred && !writePending) {
+                outbound.flip();
+                try {
+                    channel.write(outbound);
+                } catch (IOException e) {
+                    if (!closed && !e.getMessage().equals("Broken pipe")) {
+                        log.warn("Unable to write data", e);
+                        ioError = e;
+                    }
+                }
+                lastActiveTime = currentTimeMillis();
+                writeOccurred = true;
+                writePending = outbound.hasRemaining();
+                outbound.compact();
+            }
+        }
+    }
+
+    /**
+     * Indicates whether the stream has bytes to be written to the channel.
+     *
+     * @return true if there are bytes to be written
+     */
+    boolean isWritePending() {
+        synchronized (this) {
+            return writePending;
+        }
+    }
+
+    /**
+     * Attempts to flush data, internal stream state and channel availability
+     * permitting. Invoked by the driver I/O loop during handling of writable
+     * selection key.
+     * <p/>
+     * Resets the internal state flags {@code writeOccurred} and
+     * {@code writePending}.
+     *
+     * @throws IOException if implicit flush failed
+     */
+    void flushIfPossible() throws IOException {
+        synchronized (this) {
+            writePending = false;
+            writeOccurred = false;
+            if (outbound.position() > 0) {
+                flush();
+            }
+        }
+        key.interestOps(SelectionKey.OP_READ);
+    }
+
+    /**
+     * Attempts to flush data, internal stream state and channel availability
+     * permitting and if other writes are not pending. Invoked by the driver
+     * I/O loop prior to entering select wait. Resets the internal
+     * {@code writeOccurred} state flag.
+     *
+     * @throws IOException if implicit flush failed
+     */
+    void flushIfWriteNotPending() throws IOException {
+        synchronized (this) {
+            writeOccurred = false;
+            if (!writePending && outbound.position() > 0) {
+                flush();
+            }
+        }
+        if (isWritePending()) {
+            key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+        }
+    }
+
+    /**
+     * Doubles the size of the outbound buffer.
+     */
+    private void doubleSize() {
+        ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
+        outbound.flip();
+        newBuffer.put(outbound);
+        outbound = newBuffer;
+    }
+
+    /**
+     * Returns the maximum number of milliseconds the stream is allowed
+     * without any read/write operations.
+     *
+     * @return number if millis of permissible idle time
+     */
+    protected int maxIdleMillis() {
+        return maxIdleMillis;
+    }
+
+
+    /**
+     * Returns true if the given stream has gone stale.
+     *
+     * @return true if the stream is stale
+     */
+    boolean isStale() {
+        return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
+    }
+
+}