Working on IO loop stuff.
diff --git a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
new file mode 100644
index 0000000..9e1c2d3
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
@@ -0,0 +1,271 @@
+package org.onlab.nio;
+
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * I/O loop for driving inbound & outbound {@link Message} transfer via
+ * {@link MessageStream}.
+ *
+ * @param <M> message type
+ * @param <S> message stream type
+ */
+public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
+        extends SelectorLoop {
+
+    // Queue of requests for new message streams to enter the IO loop processing.
+    private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>();
+
+    // Carries information required for admitting a new message stream.
+    private class NewStreamRequest {
+        private final S stream;
+        private final SelectableChannel channel;
+        private final int op;
+
+        public NewStreamRequest(S stream, SelectableChannel channel, int op) {
+            this.stream = stream;
+            this.channel = channel;
+            this.op = op;
+        }
+    }
+
+    // Set of message streams currently admitted into the IO loop.
+    private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>();
+
+    /**
+     * Creates an IO loop with the given selection timeout.
+     *
+     * @param timeout selection timeout in milliseconds
+     * @throws IOException if the backing selector cannot be opened
+     */
+    public IOLoop(long timeout) throws IOException {
+        super(timeout);
+    }
+
+    /**
+     * Creates a new message stream backed by the specified socket channel.
+     *
+     * @param byteChannel backing byte channel
+     * @return newly created message stream
+     */
+    protected abstract S createStream(ByteChannel byteChannel);
+
+    /**
+     * Removes the specified message stream from the IO loop.
+     *
+     * @param stream message stream to remove
+     */
+    void removeStream(MessageStream<M> stream) {
+        streams.remove(stream);
+    }
+
+    /**
+     * Processes the list of messages extracted from the specified message
+     * stream.
+     *
+     * @param messages non-empty list of received messages
+     * @param stream   message stream from which the messages were extracted
+     */
+    protected abstract void processMessages(List<M> messages, MessageStream<M> stream);
+
+    /**
+     * Completes connection request pending on the given selection key.
+     *
+     * @param key selection key holding the pending connect operation.
+     */
+    protected void connect(SelectionKey key) {
+        try {
+            SocketChannel ch = (SocketChannel) key.channel();
+            ch.finishConnect();
+        } catch (IOException | IllegalStateException e) {
+            log.warn("Unable to complete connection", e);
+        }
+
+        if (key.isValid()) {
+            key.interestOps(SelectionKey.OP_READ);
+        }
+    }
+
+    /**
+     * Processes an IO operation pending on the specified key.
+     *
+     * @param key selection key holding the pending I/O operation.
+     */
+    protected void processKeyOperation(SelectionKey key) {
+        @SuppressWarnings("unchecked")
+        S stream = (S) key.attachment();
+
+        try {
+            // If the key is not valid, bail out.
+            if (!key.isValid()) {
+                stream.close();
+                return;
+            }
+
+            // If there is a pending connect operation, complete it.
+            if (key.isConnectable()) {
+                connect(key);
+            }
+
+            // If there is a read operation, slurp as much data as possible.
+            if (key.isReadable()) {
+                List<M> messages = stream.read();
+
+                // No messages or failed flush imply disconnect; bail.
+                if (messages == null || stream.hadError()) {
+                    stream.close();
+                    return;
+                }
+
+                // If there were any messages read, process them.
+                if (!messages.isEmpty()) {
+                    try {
+                        processMessages(messages, stream);
+                    } catch (RuntimeException e) {
+                        onError(stream, e);
+                    }
+                }
+            }
+
+            // If there are pending writes, flush them
+            if (key.isWritable()) {
+                stream.flushIfPossible();
+            }
+
+            // If there were any issued flushing, close the stream.
+            if (stream.hadError()) {
+                stream.close();
+            }
+
+        } catch (CancelledKeyException e) {
+            // Key was cancelled, so silently close the stream
+            stream.close();
+        } catch (IOException e) {
+            if (!stream.isClosed() && !isResetByPeer(e)) {
+                log.warn("Unable to process IO", e);
+            }
+            stream.close();
+        }
+    }
+
+    // Indicates whether or not this exception is caused by 'reset by peer'.
+    private boolean isResetByPeer(IOException e) {
+        Throwable cause = e.getCause();
+        return cause != null && cause instanceof IOException &&
+                cause.getMessage().contains("reset by peer");
+    }
+
+    /**
+     * Hook to allow intercept of any errors caused during message processing.
+     * Default behaviour is to rethrow the error.
+     *
+     * @param stream message stream involved in the error
+     * @param error  the runtime exception
+     */
+    protected void onError(S stream, RuntimeException error) {
+        throw error;
+    }
+
+    /**
+     * Admits a new message stream backed by the specified socket channel
+     * with a pending accept operation.
+     *
+     * @param channel backing socket channel
+     */
+    public void acceptStream(SocketChannel channel) {
+        createAndAdmit(channel, SelectionKey.OP_READ);
+    }
+
+
+    /**
+     * Admits a new message stream backed by the specified socket channel
+     * with a pending connect operation.
+     *
+     * @param channel backing socket channel
+     */
+    public void connectStream(SocketChannel channel) {
+        createAndAdmit(channel, SelectionKey.OP_CONNECT);
+    }
+
+    /**
+     * Creates a new message stream backed by the specified socket channel
+     * and admits it into the IO loop.
+     *
+     * @param channel socket channel
+     * @param op      pending operations mask to be applied to the selection
+     *                key as a set of initial interestedOps
+     */
+    private synchronized void createAndAdmit(SocketChannel channel, int op) {
+        S stream = createStream(channel);
+        streams.add(stream);
+        newStreamRequests.add(new NewStreamRequest(stream, channel, op));
+        selector.wakeup();
+    }
+
+    /**
+     * Safely admits new streams into the IO loop.
+     */
+    private void admitNewStreams() {
+        Iterator<NewStreamRequest> it = newStreamRequests.iterator();
+        while (isRunning() && it.hasNext()) {
+            try {
+                NewStreamRequest request = it.next();
+                it.remove();
+                SelectionKey key = request.channel.register(selector, request.op,
+                                                            request.stream);
+                request.stream.setKey(key);
+            } catch (ClosedChannelException e) {
+                log.warn("Unable to admit new message stream", e);
+            }
+        }
+    }
+
+    @Override
+    protected void loop() throws IOException {
+        notifyReady();
+
+        // Keep going until told otherwise.
+        while (isRunning()) {
+            admitNewStreams();
+
+            // Process flushes & write selects on all streams
+            for (MessageStream<M> stream : streams) {
+                stream.flushIfWriteNotPending();
+            }
+
+            // Select keys and process them.
+            int count = selector.select(selectTimeout);
+            if (count > 0 && isRunning()) {
+                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+                while (it.hasNext()) {
+                    SelectionKey key = it.next();
+                    it.remove();
+                    processKeyOperation(key);
+                }
+            }
+        }
+    }
+
+    /**
+     * Prunes the registered streams by discarding any stale ones.
+     */
+    public synchronized void pruneStaleStreams() {
+        for (MessageStream<M> stream : streams) {
+            if (stream.isStale()) {
+                stream.close();
+            }
+        }
+    }
+
+}