Added IO loop test to the foo app.
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
new file mode 100644
index 0000000..72e7a59
--- /dev/null
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
@@ -0,0 +1,310 @@
+package org.onlab.onos.foo;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.util.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.lang.String.format;
+import static java.lang.System.out;
+import static org.onlab.onos.foo.IOLoopTestServer.PORT;
+import static org.onlab.util.Tools.delay;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Auxiliary test fixture to measure speed of NIO-based channels.
+ */
+public class IOLoopTestClient {
+
+    private static final Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
+
+    private final InetAddress ip;
+    private final int port;
+    private final int msgCount;
+    private final int msgLength;
+
+    private final List<CustomIOLoop> iloops = new ArrayList<>();
+    private final ExecutorService ipool;
+    private final ExecutorService wpool;
+
+    Counter messages;
+    Counter bytes;
+
+    /**
+     * Main entry point to launch the client; runs one test session and exits the JVM.
+     *
+     * @param args command-line arguments
+     * @throws java.io.IOException          if unable to connect to server
+     * @throws InterruptedException if interrupted while waiting for the workers
+     * @throws java.util.concurrent.ExecutionException   if a worker task failed
+     * @throws java.util.concurrent.TimeoutException     if timeout occurred while waiting for completion
+     */
+    public static void main(String[] args)
+            throws IOException, InterruptedException, ExecutionException, TimeoutException {
+        startStandalone(args);
+        // Terminate the JVM explicitly; this class never shuts down its IO-loop and worker pools.
+        System.exit(0);
+    }
+
+    /**
+     * Runs one complete client session: start, wait for the workers, then report.
+     *
+     * @param args optional positional overrides: server ip, workers, messages, length, timeout secs
+     */
+    public static void startStandalone(String[] args)
+            throws IOException, InterruptedException, ExecutionException, TimeoutException {
+        final int argc = args.length;
+        InetAddress serverIp = InetAddress.getByName(argc > 0 ? args[0] : "127.0.0.1");
+        int workers = argc > 1 ? Integer.parseInt(args[1]) : 6;
+        int msgTotal = argc > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
+        int msgBytes = argc > 3 ? Integer.parseInt(args[3]) : 128;
+        int timeoutSecs = argc > 4 ? Integer.parseInt(args[4]) : 30;
+
+        log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
+                 workers, msgTotal, msgBytes, serverIp);
+        IOLoopTestClient client = new IOLoopTestClient(serverIp, workers, msgTotal, msgBytes, PORT);
+
+        client.start();
+        delay(500);
+        client.await(timeoutSecs);
+        client.report();
+    }
+
+    /**
+     * Creates a speed client with one IO loop per worker.
+     *
+     * @param ip   ip address of server
+     * @param wc   worker count; also the size of both thread pools
+     * @param mc   number of messages each worker sends
+     * @param ml   message length in bytes
+     * @param port socket port
+     * @throws java.io.IOException if unable to create IO loops
+     */
+    public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
+        this.msgCount = mc;
+        this.msgLength = ml;
+        this.ip = ip;
+        this.port = port;
+        this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
+        this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
+        // iloops starts out empty, so this adds exactly wc loops
+        while (iloops.size() < wc) {
+            iloops.add(new CustomIOLoop());
+        }
+    }
+
+    /**
+     * Resets the counters, starts all IO loops and opens one connection per loop.
+     *
+     * @throws java.io.IOException if unable to open connection
+     */
+    public void start() throws IOException {
+        messages = new Counter();
+        bytes = new Counter();
+
+        // First start up all the IO loops
+        for (CustomIOLoop l : iloops) {
+            ipool.execute(l);
+        }
+
+        // Wait for all of them to get going
+        for (CustomIOLoop l : iloops) {
+            l.awaitStart(1000);
+        }
+
+        // ... and next open all connections; one per loop
+        for (CustomIOLoop l : iloops) {
+            openConnection(l);
+        }
+    }
+
+
+    /**
+     * Initiates open connection request and registers the pending socket
+     * channel with the given IO loop.
+     *
+     * @param loop loop with which the channel should be registered
+     * @throws java.io.IOException if the socket could not be opened or connected
+     */
+    private void openConnection(CustomIOLoop loop) throws IOException {
+        SocketAddress sa = new InetSocketAddress(ip, port);
+        SocketChannel ch = SocketChannel.open();
+        ch.configureBlocking(false);
+        loop.connectStream(ch); // hand the channel to the loop before starting the connect
+        ch.connect(sa); // channel is non-blocking, so this may return before the connect finishes
+    }
+
+
+    /**
+     * Waits for the client workers to complete and then freezes the counters.
+     *
+     * @param secs timeout in seconds, applied to each worker task in turn
+     * @throws java.util.concurrent.ExecutionException   if execution failed
+     * @throws InterruptedException if interrupt occurred while waiting
+     * @throws java.util.concurrent.TimeoutException     if timeout occurred
+     */
+    public void await(int secs) throws InterruptedException,
+            ExecutionException, TimeoutException {
+        for (CustomIOLoop l : iloops) {
+            if (l.worker.task != null) { // the task only exists once the worker has been pumped
+                l.worker.task.get(secs, TimeUnit.SECONDS);
+            }
+        }
+        messages.freeze();
+        bytes.freeze();
+    }
+
+    /**
+     * Prints the accumulated message and byte totals and throughputs to standard output.
+     */
+    public void report() {
+        DecimalFormat fmt = new DecimalFormat("#,##0");
+        Object[] cells = {fmt.format(messages.total()), fmt.format(bytes.total()),
+                          fmt.format(messages.throughput()),
+                          fmt.format(bytes.throughput() / (1024 * msgLength))};
+        out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs", cells));
+    }
+
+
+    // IO loop for transfer of fixed-length messages; owns the worker that feeds its stream
+    private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
+
+        Worker worker = new Worker();
+
+        public CustomIOLoop() throws IOException {
+            super(500);
+        }
+
+        // Streams created by this loop carry messages of the client's configured length
+        @Override
+        protected TestMessageStream createStream(ByteChannel channel) {
+            return new TestMessageStream(msgLength, channel, this);
+        }
+
+        @Override
+        protected synchronized void removeStream(MessageStream<TestMessage> stream) {
+            super.removeStream(stream);
+
+            messages.add(stream.messagesIn().total()); // fold inbound totals into the client counters
+            bytes.add(stream.bytesIn().total());
+
+//            out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
+//                               FORMAT.format(stream.messagesIn().throughput()),
+//                               FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
+//                               FORMAT.format(stream.messagesOut().throughput()),
+//                               FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
+
+            stream.messagesOut().reset();
+            stream.bytesOut().reset();
+        }
+
+        @Override
+        protected void processMessages(List<TestMessage> messages,
+                                       MessageStream<TestMessage> b) {
+            worker.release(messages.size()); // every message handed to this callback frees one send permit
+        }
+
+        @Override
+        protected void connect(SelectionKey key) {
+            super.connect(key);
+            TestMessageStream b = (TestMessageStream) key.attachment();
+            Worker w = ((CustomIOLoop) b.loop()).worker; // presumably this loop's own worker - TODO confirm
+            w.pump(b); // start pumping messages over the stream attached to the key
+        }
+
+    }
+
+    /**
+     * Auxiliary worker that pumps batched messages into its stream from a worker-pool thread.
+     */
+    private class Worker implements Runnable {
+
+        private static final int BATCH_SIZE = 1000;
+        private static final int PERMITS = 2 * BATCH_SIZE;
+
+        private TestMessageStream b;
+        private volatile FutureTask<Worker> task; // volatile: await() may read it from another thread
+
+        // Stuff to throttle pump
+        private final Semaphore semaphore = new Semaphore(PERMITS);
+        private int msgWritten;
+
+        void pump(TestMessageStream b) {
+            this.b = b;
+            task = new FutureTask<>(this, this);
+            wpool.execute(task);
+        }
+
+        @Override
+        public void run() {
+            try {
+                log.info("Worker started...");
+
+                List<TestMessage> batch = new ArrayList<>();
+                for (int i = 0; i < BATCH_SIZE; i++) {
+                    batch.add(new TestMessage(msgLength));
+                }
+
+                while (msgWritten < msgCount) {
+                    msgWritten += writeBatch(b, batch);
+                }
+
+                // Now try to get all the permits back before sending poison pill
+                semaphore.acquireUninterruptibly(PERMITS);
+                b.close();
+
+                log.info("Worker done...");
+
+            } catch (IOException e) {
+                log.error("Worker unable to perform I/O", e);
+            }
+        }
+
+        // Writes the next batch (a partial one at the end) and returns how many were written
+        private int writeBatch(TestMessageStream b, List<TestMessage> batch)
+                throws IOException {
+            int count = Math.min(BATCH_SIZE, msgCount - msgWritten);
+            acquire(count);
+            if (count == BATCH_SIZE) {
+                b.write(batch);
+            } else {
+                for (int i = 0; i < count; i++) {
+                    b.write(batch.get(i));
+                }
+            }
+            return count;
+        }
+
+        // Release permits based on the specified number of message credits;
+        // invoked by the owning IO loop for each list of messages given to processMessages
+        private void release(int permits) {
+            semaphore.release(permits);
+        }
+
+        // Acquire permit for a single batch
+        private void acquire(int permits) {
+            semaphore.acquireUninterruptibly(permits);
+        }
+
+    }
+
+}
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java
new file mode 100644
index 0000000..9c1f649
--- /dev/null
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java
@@ -0,0 +1,234 @@
+package org.onlab.onos.foo;
+
+import org.onlab.nio.AcceptorLoop;
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+import org.onlab.util.Counter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static java.lang.String.format;
+import static java.lang.System.out;
+import static org.onlab.util.Tools.delay;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Auxiliary test fixture to measure speed of NIO-based channels.
+ */
+public class IOLoopTestServer {
+
+    private static final Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
+
+    private static final int PRUNE_FREQUENCY = 1000;
+
+    static final int PORT = 9876;
+    static final long TIMEOUT = 1000;
+
+    static final boolean SO_NO_DELAY = false;
+    static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
+    static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
+
+    static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
+
+    private final AcceptorLoop aloop;
+    private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
+
+    private final List<CustomIOLoop> iloops = new ArrayList<>();
+    private final ExecutorService ipool;
+
+    private final int workerCount;
+    private final int msgLength;
+    private int lastWorker = -1;
+
+    Counter messages;
+    Counter bytes;
+
+    /**
+     * Main entry point to launch the server.
+     *
+     * @param args command-line arguments
+     * @throws java.io.IOException if unable to create IO loops
+     */
+    public static void main(String[] args) throws IOException {
+        startStandalone(args); // loops forever pruning, so it only comes back by throwing
+        System.exit(0);
+    }
+
+    /**
+     * Starts a standalone IO loop test server and then prunes its loops forever.
+     *
+     * @param args optional positional overrides: bind ip, worker count, message length
+     * @throws java.io.IOException if the address cannot be resolved or the server cannot be created
+     */
+    public static void startStandalone(String[] args) throws IOException {
+        InetAddress bindIp = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
+        int workers = args.length > 1 ? Integer.parseInt(args[1]) : 6;
+        int msgBytes = args.length > 2 ? Integer.parseInt(args[2]) : 128;
+
+        log.info("Setting up the server with {} workers, {} byte messages on {}... ",
+                 workers, msgBytes, bindIp);
+        IOLoopTestServer server = new IOLoopTestServer(bindIp, workers, msgBytes, PORT);
+        server.start();
+        // Never returns normally: prune stale streams after every delay(PRUNE_FREQUENCY)
+        for (;;) {
+            delay(PRUNE_FREQUENCY);
+            server.prune();
+        }
+    }
+
+    /**
+     * Creates a speed server with one acceptor loop and one IO loop per worker.
+     *
+     * @param ip   optional ip of the adapter where to bind
+     * @param wc   worker count; also the size of the IO loop thread pool
+     * @param ml   message length in bytes
+     * @param port listen port
+     * @throws java.io.IOException if unable to create IO loops
+     */
+    public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
+        this.msgLength = ml;
+        this.workerCount = wc;
+        this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
+        this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
+
+        while (iloops.size() < wc) {
+            iloops.add(new CustomIOLoop());
+        }
+    }
+
+    /**
+     * Starts the server IO loops and the acceptor loop, with fresh throughput counters.
+     */
+    public void start() {
+        messages = new Counter();
+        bytes = new Counter();
+
+        for (CustomIOLoop l : iloops) {
+            ipool.execute(l);
+        }
+        apool.execute(aloop); // the acceptor is submitted after all IO loops
+
+        for (CustomIOLoop l : iloops) {
+            l.awaitStart(TIMEOUT); // wait for each loop to report it has started
+        }
+        aloop.awaitStart(TIMEOUT);
+    }
+
+    /**
+     * Stops the acceptor and IO loops and freezes throughput tracking.
+     */
+    public void stop() {
+        aloop.shutdown(); // request shutdown of the acceptor first, then of every IO loop
+        for (CustomIOLoop l : iloops) {
+            l.shutdown();
+        }
+
+        for (CustomIOLoop l : iloops) {
+            l.awaitStop(TIMEOUT); // then wait for each loop to stop
+        }
+        aloop.awaitStop(TIMEOUT);
+
+        messages.freeze();
+        bytes.freeze();
+    }
+
+    /**
+     * Prints the accumulated message and byte totals and throughputs to standard output.
+     */
+    public void report() {
+        DecimalFormat fmt = new DecimalFormat("#,##0");
+        Object[] cells = {fmt.format(messages.total()), fmt.format(bytes.total()),
+                          fmt.format(messages.throughput()),
+                          fmt.format(bytes.throughput() / (1024 * msgLength))};
+        out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs", cells));
+    }
+
+    /**
+     * Asks every IO loop to prune its stale streams.
+     */
+    public void prune() {
+        for (int i = 0, n = iloops.size(); i < n; i++) {
+            iloops.get(i).pruneStaleStreams();
+        }
+    }
+
+    // Round-robin selection of the IO loop that takes the next client
+    private synchronized CustomIOLoop nextWorker() {
+        lastWorker = lastWorker + 1 < workerCount ? lastWorker + 1 : 0;
+        return iloops.get(lastWorker);
+    }
+
+    // IO loop for transfer of fixed-length messages; writes every received batch back to its stream
+    private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
+
+        public CustomIOLoop() throws IOException {
+            super(500);
+        }
+
+        @Override
+        protected TestMessageStream createStream(ByteChannel channel) {
+            return new TestMessageStream(msgLength, channel, this);
+        }
+
+        @Override
+        protected void removeStream(MessageStream<TestMessage> stream) {
+            super.removeStream(stream);
+            // NOTE(review): several loops may call this on the shared counters - confirm Counter.add is thread-safe
+            messages.add(stream.messagesIn().total()); // fold inbound totals into the server counters
+            bytes.add(stream.bytesIn().total());
+
+//            out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
+//                               FORMAT.format(stream.messagesIn().throughput()),
+//                               FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
+//                               FORMAT.format(stream.messagesOut().throughput()),
+//                               FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
+        }
+
+        @Override
+        protected void processMessages(List<TestMessage> messages,
+                                       MessageStream<TestMessage> stream) {
+            try {
+                stream.write(messages); // echo the batch back on the stream it arrived on
+            } catch (IOException e) {
+                log.error("Unable to echo messages", e);
+            }
+        }
+    }
+
+    // Loop for accepting client connections and handing them to the IO loops
+    private class CustomAcceptLoop extends AcceptorLoop {
+        public CustomAcceptLoop(SocketAddress address) throws IOException {
+            super(500, address);
+        }
+        // Accepts a pending client, tunes its socket and assigns it to the next IO loop
+        @Override
+        protected void acceptConnection(ServerSocketChannel channel) throws IOException {
+            SocketChannel sc = channel.accept();
+            if (sc == null) {
+                return; // accept() yields null when a non-blocking channel has nothing pending
+            }
+            sc.configureBlocking(false);
+            Socket so = sc.socket();
+            so.setTcpNoDelay(SO_NO_DELAY);
+            so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
+            so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
+            nextWorker().acceptStream(sc);
+            log.info("Connected client");
+        }
+    }
+
+}
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/TestIOClientCommand.java b/apps/foo/src/main/java/org/onlab/onos/foo/TestIOClientCommand.java
new file mode 100644
index 0000000..52128e3
--- /dev/null
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/TestIOClientCommand.java
@@ -0,0 +1,24 @@
+package org.onlab.onos.foo;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onlab.onos.cli.AbstractShellCommand;
+
+import static org.onlab.onos.foo.IOLoopTestClient.startStandalone;
+
+/**
+ * Runs the test IO loop client with default settings; returns once the run has finished.
+ */
+@Command(scope = "onos", name = "test-io-client",
+         description = "Starts the test IO loop client")
+public class TestIOClientCommand extends AbstractShellCommand {
+
+    @Override
+    protected void execute() {
+        try {
+            startStandalone(new String[]{});
+        } catch (Exception e) {
+            error("Unable to start client %s", e);
+        }
+    }
+
+}
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/TestIOServerCommand.java b/apps/foo/src/main/java/org/onlab/onos/foo/TestIOServerCommand.java
new file mode 100644
index 0000000..313141d
--- /dev/null
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/TestIOServerCommand.java
@@ -0,0 +1,25 @@
+package org.onlab.onos.foo;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onlab.onos.cli.AbstractShellCommand;
+
+import static org.onlab.onos.foo.IOLoopTestServer.startStandalone;
+
+
+/**
+ * Starts the test IO loop server with default settings; the call does not return normally.
+ */
+@Command(scope = "onos", name = "test-io-server",
+         description = "Starts the test IO loop server")
+public class TestIOServerCommand extends AbstractShellCommand {
+
+    @Override
+    protected void execute() {
+        try {
+            startStandalone(new String[]{}); // empty args: the server falls back to its defaults
+        } catch (Exception e) {
+            error("Unable to start server %s", e);
+        }
+    }
+
+}
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/TestMessage.java b/apps/foo/src/main/java/org/onlab/onos/foo/TestMessage.java
new file mode 100644
index 0000000..b6f1768
--- /dev/null
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/TestMessage.java
@@ -0,0 +1,41 @@
+package org.onlab.onos.foo;
+
+import org.onlab.nio.AbstractMessage;
+
+/**
+ * Message of a fixed size backed by a plain byte array.
+ */
+public class TestMessage extends AbstractMessage {
+
+    private final byte[] payload;
+
+    /**
+     * Creates a new message of the given length.
+     * The backing array is freshly allocated and therefore zero-filled.
+     *
+     * @param length message length in bytes
+     */
+    public TestMessage(int length) {
+        this(new byte[length]);
+    }
+
+    /**
+     * Creates a new message wrapping the given array (the array is not copied).
+     *
+     * @param data message data; its size becomes the message length
+     */
+    TestMessage(byte[] data) {
+        this.payload = data;
+        this.length = payload.length;
+    }
+
+    /**
+     * Gets the backing byte array data.
+     *
+     * @return backing byte array itself, not a copy
+     */
+    public byte[] data() {
+        return payload;
+    }
+
+}
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java b/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java
new file mode 100644
index 0000000..4a44682
--- /dev/null
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java
@@ -0,0 +1,58 @@
+package org.onlab.onos.foo;
+
+import org.onlab.nio.IOLoop;
+import org.onlab.nio.MessageStream;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+/**
+ * Message stream that frames its traffic as messages of one fixed length.
+ */
+public class TestMessageStream extends MessageStream<TestMessage> {
+
+    private static final String E_WRONG_LEN = "Illegal message length: ";
+
+    private final int messageLength;
+
+    /**
+     * Creates a new stream for transferring messages of the specified length.
+     *
+     * @param length message length
+     * @param ch     backing channel
+     * @param loop   driver loop
+     */
+    public TestMessageStream(int length, ByteChannel ch,
+                             IOLoop<TestMessage, ?> loop) {
+        super(loop, ch, 64 * 1024, 500);
+        this.messageLength = length;
+    }
+
+    @Override
+    protected TestMessage read(ByteBuffer rb) {
+        if (rb.remaining() >= messageLength) {
+            byte[] payload = new byte[messageLength];
+            rb.get(payload);
+            return new TestMessage(payload);
+        }
+        return null; // fewer bytes remain than one whole message needs
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p/>
+     * Only messages of exactly the stream's fixed length are accepted.
+     * Nothing is put into the buffer for a rejected message.
+     *
+     * @throws IllegalArgumentException if the message length differs from the stream's length
+     */
+    @Override
+    protected void write(TestMessage message, ByteBuffer wb) {
+        if (message.length() == messageLength) {
+            wb.put(message.data());
+            return;
+        }
+        throw new IllegalArgumentException(E_WRONG_LEN + message.length());
+    }
+
+}
diff --git a/apps/foo/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/apps/foo/src/main/resources/OSGI-INF/blueprint/shell-config.xml
new file mode 100644
index 0000000..93bd020
--- /dev/null
+++ b/apps/foo/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -0,0 +1,12 @@
+<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">
+
+    <command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.1.0">
+        <command>
+            <action class="org.onlab.onos.foo.TestIOClientCommand"/>
+        </command>
+        <command>
+            <action class="org.onlab.onos.foo.TestIOServerCommand"/>
+        </command>
+    </command-bundle>
+
+</blueprint>