More IO loop work.
diff --git a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
index b2acca4..89107bf 100644
--- a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
+++ b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
@@ -1,5 +1,6 @@
 package org.onlab.nio;
 
+import org.onlab.util.Counter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,10 @@
     private Exception ioError;
     private long lastActiveTime;
 
+    private final Counter bytesIn = new Counter();
+    private final Counter messagesIn = new Counter();
+    private final Counter bytesOut = new Counter();
+    private final Counter messagesOut = new Counter();
 
     /**
      * Creates a message stream associated with the specified IO loop and
@@ -93,6 +98,11 @@
             closed = true;
         }
 
+        bytesIn.freeze();
+        bytesOut.freeze();
+        messagesIn.freeze();
+        messagesOut.freeze();
+
         loop.removeStream(this);
         if (key != null) {
             try {
@@ -176,6 +186,8 @@
                 inbound.flip();
                 while ((message = read(inbound)) != null) {
                     messages.add(message);
+                    messagesIn.add(1);
+                    bytesIn.add(message.length());
                 }
                 inbound.compact();
 
@@ -226,8 +238,9 @@
         while (outbound.remaining() < message.length()) {
             doubleSize();
         }
-        // Place the message into the buffer and bump the output trackers.
         write(message, outbound);
+        messagesOut.add(1);
+        bytesOut.add(message.length());
     }
 
     // Forces a flush, unless one is planned already.
@@ -273,6 +286,18 @@
         }
     }
 
+
+    /**
+     * Indicates whether data has been written but not flushed yet.
+     *
+     * @return true if flush is required
+     */
+    boolean isFlushRequired() {
+        synchronized (this) {
+            return outbound.position() > 0;
+        }
+    }
+
     /**
      * Attempts to flush data, internal stream state and channel availability
      * permitting. Invoked by the driver I/O loop during handling of writable
@@ -344,4 +369,40 @@
         return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
     }
 
+    /**
+     * Returns the inbound bytes counter.
+     *
+     * @return inbound bytes counter
+     */
+    public Counter bytesIn() {
+        return bytesIn;
+    }
+
+    /**
+     * Returns the outbound bytes counter.
+     *
+     * @return outbound bytes counter
+     */
+    public Counter bytesOut() {
+        return bytesOut;
+    }
+
+    /**
+     * Returns the inbound messages counter.
+     *
+     * @return inbound messages counter
+     */
+    public Counter messagesIn() {
+        return messagesIn;
+    }
+
+    /**
+     * Returns the outbound messages counter.
+     *
+     * @return outbound messages counter
+     */
+    public Counter messagesOut() {
+        return messagesOut;
+    }
+
 }
diff --git a/utils/nio/src/main/java/org/onlab/nio/package-info.java b/utils/nio/src/main/java/org/onlab/nio/package-info.java
new file mode 100644
index 0000000..d5ddd10
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Mechanism to transfer messages over network using IO loop and
+ * message stream, backed by NIO byte buffers.
+ */
+package org.onlab.nio;
\ No newline at end of file
diff --git a/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedClient.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopClient.java
similarity index 83%
rename from utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedClient.java
rename to utils/nio/src/test/java/org/onlab/nio/IOLoopClient.java
index 6445221..2497e0e 100644
--- a/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedClient.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopClient.java
@@ -1,5 +1,6 @@
 package org.onlab.nio;
 
+import org.onlab.util.Counter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -10,6 +11,7 @@
 import java.nio.channels.ByteChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.text.DecimalFormat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -26,9 +28,9 @@
 /**
  * Auxiliary test fixture to measure speed of NIO-based channels.
  */
-public class StandaloneSpeedClient {
+public class IOLoopClient {
 
-    private static Logger log = LoggerFactory.getLogger(StandaloneSpeedClient.class);
+    private static Logger log = LoggerFactory.getLogger(IOLoopClient.class);
 
     private final InetAddress ip;
     private final int port;
@@ -39,8 +41,8 @@
     private final ExecutorService ipool;
     private final ExecutorService wpool;
 
-//    ThroughputTracker messages;
-//    ThroughputTracker bytes;
+    Counter messages;
+    Counter bytes;
 
     /**
      * Main entry point to launch the client.
@@ -61,7 +63,7 @@
 
         log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
                  wc, mc, ml, ip);
-        StandaloneSpeedClient sc = new StandaloneSpeedClient(ip, wc, mc, ml, StandaloneSpeedServer.PORT);
+        IOLoopClient sc = new IOLoopClient(ip, wc, mc, ml, IOLoopServer.PORT);
 
         sc.start();
         delay(2000);
@@ -82,7 +84,7 @@
      * @param port socket port
      * @throws IOException if unable to create IO loops
      */
-    public StandaloneSpeedClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
+    public IOLoopClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
         this.ip = ip;
         this.port = port;
         this.msgCount = mc;
@@ -101,15 +103,15 @@
      * @throws IOException if unable to open connection
      */
     public void start() throws IOException {
-//        messages = new ThroughputTracker();
-//        bytes = new ThroughputTracker();
+        messages = new Counter();
+        bytes = new Counter();
 
         // First start up all the IO loops
         for (CustomIOLoop l : iloops) {
             ipool.execute(l);
         }
 
-//        // Wait for all of them to get going
+        // Wait for all of them to get going
 //        for (CustomIOLoop l : iloops)
 //            l.waitForStart(TIMEOUT);
 
@@ -151,20 +153,20 @@
                 l.worker.task.get(secs, TimeUnit.SECONDS);
             }
         }
-//        messages.freeze();
-//        bytes.freeze();
+        messages.freeze();
+        bytes.freeze();
     }
 
     /**
      * Reports on the accumulated throughput trackers.
      */
     public void report() {
-//        DecimalFormat f = new DecimalFormat("#,##0");
-//        log.info("{} messages; {} bytes; {} mps; {} Mbs",
-//                 f.format(messages.total()),
-//                 f.format(bytes.total()),
-//                 f.format(messages.throughput()),
-//                 f.format(bytes.throughput() / (1024 * 128)));
+        DecimalFormat f = new DecimalFormat("#,##0");
+        log.info("{} messages; {} bytes; {} mps; {} Mbs",
+                 f.format(messages.total()),
+                 f.format(bytes.total()),
+                 f.format(messages.throughput()),
+                 f.format(bytes.throughput() / (1024 * 128)));
     }
 
 
@@ -187,16 +189,16 @@
         protected synchronized void removeStream(MessageStream<TestMessage> b) {
             super.removeStream(b);
 
-//            messages.add(b.inMessages().total());
-//            bytes.add(b.inBytes().total());
-//            b.inMessages().reset();
-//            b.inBytes().reset();
-
-//            log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
-//                     StandaloneSpeedServer.format.format(b.inMessages().throughput()),
-//                     StandaloneSpeedServer.format.format(b.inBytes().throughput() / (1024 * 128)),
-//                     StandaloneSpeedServer.format.format(b.outMessages().throughput()),
-//                     StandaloneSpeedServer.format.format(b.outBytes().throughput() / (1024 * 128)));
+            messages.add(b.messagesIn().total());
+            bytes.add(b.bytesIn().total());
+            b.messagesOut().reset();
+            b.bytesOut().reset();
+//
+            log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
+                     IOLoopServer.FORMAT.format(b.messagesIn().throughput()),
+                     IOLoopServer.FORMAT.format(b.bytesIn().throughput() / (1024 * 128)),
+                     IOLoopServer.FORMAT.format(b.messagesOut().throughput()),
+                     IOLoopServer.FORMAT.format(b.bytesOut().throughput() / (1024 * 128)));
         }
 
         @Override
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
index 21843f0..b7f706e 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
@@ -48,11 +48,11 @@
 
         // Setup the test on a random port to avoid intermittent test failures
         // due to the port being already bound.
-        int port = StandaloneSpeedServer.PORT + new Random().nextInt(100);
+        int port = IOLoopServer.PORT + new Random().nextInt(100);
 
         InetAddress ip = InetAddress.getLoopbackAddress();
-        StandaloneSpeedServer sss = new StandaloneSpeedServer(ip, THREADS, size, port);
-        StandaloneSpeedClient ssc = new StandaloneSpeedClient(ip, THREADS, count, size, port);
+        IOLoopServer sss = new IOLoopServer(ip, THREADS, size, port);
+        IOLoopClient ssc = new IOLoopClient(ip, THREADS, count, size, port);
 
         sss.start();
         ssc.start();
@@ -64,32 +64,6 @@
         delay(1000);
         sss.stop();
         sss.report();
-
-        // Note that the client and server will have potentially significantly
-        // differing rates. This is due to the wide variance in how tightly
-        // the throughput tracking starts & stops relative to to the short
-        // test duration.
-//        System.out.println(f.format(ssc.messages.throughput()) + " mps");
-
-//        // Make sure client sent everything.
-//        assertEquals("incorrect client message count sent",
-//                     (long) count * THREADS, ssc.messages.total());
-//        assertEquals("incorrect client bytes count sent",
-//                     (long) size * count * THREADS, ssc.bytes.total());
-//
-//        // Make sure server received everything.
-//        assertEquals("incorrect server message count received",
-//                     (long) count * THREADS, sss.messages.total());
-//        assertEquals("incorrect server bytes count received",
-//                     (long) size * count * THREADS, sss.bytes.total());
-//
-//        // Make sure speeds were reasonable.
-//        if (mps > 0.0) {
-//            assertAboveThreshold("insufficient client speed", mps,
-//                                 ssc.messages.throughput());
-//            assertAboveThreshold("insufficient server speed", mps / 2,
-//                                 sss.messages.throughput());
-//        }
     }
 
 }
diff --git a/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedServer.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopServer.java
similarity index 80%
rename from utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedServer.java
rename to utils/nio/src/test/java/org/onlab/nio/IOLoopServer.java
index a193f50..d133e75 100644
--- a/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedServer.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopServer.java
@@ -1,5 +1,6 @@
 package org.onlab.nio;
 
+import org.onlab.util.Counter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,9 +24,9 @@
 /**
  * Auxiliary test fixture to measure speed of NIO-based channels.
  */
-public class StandaloneSpeedServer {
+public class IOLoopServer {
 
-    private static Logger log = LoggerFactory.getLogger(StandaloneSpeedServer.class);
+    private static Logger log = LoggerFactory.getLogger(IOLoopServer.class);
 
     private static final int PRUNE_FREQUENCY = 1000;
 
@@ -48,8 +49,8 @@
     private final int msgLength;
     private int lastWorker = -1;
 
-//    ThroughputTracker messages;
-//    ThroughputTracker bytes;
+    Counter messages;
+    Counter bytes;
 
     /**
      * Main entry point to launch the server.
@@ -64,7 +65,7 @@
 
         log.info("Setting up the server with {} workers, {} byte messages on {}... ",
                  wc, ml, ip);
-        StandaloneSpeedServer ss = new StandaloneSpeedServer(ip, wc, ml, PORT);
+        IOLoopServer ss = new IOLoopServer(ip, wc, ml, PORT);
         ss.start();
 
         // Start pruning clients.
@@ -83,7 +84,7 @@
      * @param port listen port
      * @throws IOException if unable to create IO loops
      */
-    public StandaloneSpeedServer(InetAddress ip, int wc, int ml, int port) throws IOException {
+    public IOLoopServer(InetAddress ip, int wc, int ml, int port) throws IOException {
         this.workerCount = wc;
         this.msgLength = ml;
         this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
@@ -98,14 +99,14 @@
      * Start the server IO loops and kicks off throughput tracking.
      */
     public void start() {
-//        messages = new ThroughputTracker();
-//        bytes = new ThroughputTracker();
+        messages = new Counter();
+        bytes = new Counter();
 
         for (CustomIOLoop l : iloops) {
             ipool.execute(l);
         }
         apool.execute(aloop);
-//
+
 //        for (CustomIOLoop l : iloops)
 //            l.waitForStart(TIMEOUT);
 //        aloop.waitForStart(TIMEOUT);
@@ -124,20 +125,20 @@
 //            l.waitForFinish(TIMEOUT);
 //        aloop.waitForFinish(TIMEOUT);
 //
-//        messages.freeze();
-//        bytes.freeze();
+        messages.freeze();
+        bytes.freeze();
     }
 
     /**
      * Reports on the accumulated throughput trackers.
      */
     public void report() {
-//        DecimalFormat f = new DecimalFormat("#,##0");
-//        log.info("{} messages; {} bytes; {} mps; {} Mbs",
-//                 f.format(messages.total()),
-//                 f.format(bytes.total()),
-//                 f.format(messages.throughput()),
-//                 f.format(bytes.throughput() / (1024 * 128)));
+        DecimalFormat f = new DecimalFormat("#,##0");
+        log.info("{} messages; {} bytes; {} mps; {} Mbs",
+                 f.format(messages.total()),
+                 f.format(bytes.total()),
+                 f.format(messages.throughput()),
+                 f.format(bytes.throughput() / (1024 * 128)));
     }
 
     /**
@@ -170,15 +171,15 @@
         @Override
         protected void removeStream(MessageStream<TestMessage> stream) {
             super.removeStream(stream);
-//
-//            messages.add(b.inMessages().total());
-//            bytes.add(b.inBytes().total());
-//
-//            log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
-//                     format.format(b.inMessages().throughput()),
-//                     format.format(b.inBytes().throughput() / (1024 * 128)),
-//                     format.format(b.outMessages().throughput()),
-//                     format.format(b.outBytes().throughput() / (1024 * 128)));
+
+            messages.add(stream.messagesIn().total());
+            bytes.add(stream.bytesIn().total());
+
+            log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
+                     FORMAT.format(stream.messagesIn().throughput()),
+                     FORMAT.format(stream.bytesIn().throughput() / (1024 * 128)),
+                     FORMAT.format(stream.messagesOut().throughput()),
+                     FORMAT.format(stream.bytesOut().throughput() / (1024 * 128)));
         }
 
         @Override
diff --git a/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java b/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
index f0f896f..583d0ec 100644
--- a/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
+++ b/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
@@ -23,57 +23,36 @@
  */
 public class MessageStreamTest {
 
-    private static final int LENGTH = 16;
-
-    private static final TestMessage TM1 = new TestMessage(LENGTH);
-    private static final TestMessage TM2 = new TestMessage(LENGTH);
-    private static final TestMessage TM3 = new TestMessage(LENGTH);
-    private static final TestMessage TM4 = new TestMessage(LENGTH);
+    private static final int SIZE = 16;
+    private static final TestMessage MESSAGE = new TestMessage(SIZE);
 
     private static final int BIG_SIZE = 32 * 1024;
     private static final TestMessage BIG_MESSAGE = new TestMessage(BIG_SIZE);
 
-    private static enum WritePending {
-        ON, OFF;
-
-        public boolean on() {
-            return this == ON;
-        }
-    }
-
-    private static enum FlushRequired {
-        ON, OFF;
-
-        public boolean on() {
-            return this == ON;
-        }
-    }
-
-    private FakeIOLoop loop;
+    private TestIOLoop loop;
     private TestByteChannel channel;
-    private TestMessageStream buffer;
+    private TestMessageStream stream;
     private TestKey key;
 
     @Before
     public void setUp() throws IOException {
-        loop = new FakeIOLoop();
+        loop = new TestIOLoop();
         channel = new TestByteChannel();
         key = new TestKey(channel);
-        buffer = loop.createStream(channel);
-        buffer.setKey(key);
+        stream = loop.createStream(channel);
+        stream.setKey(key);
     }
 
     @After
     public void tearDown() {
         loop.shutdown();
-        buffer.close();
+        stream.close();
     }
 
-    // Check state of the message buffer
-    private void assertState(WritePending wp, FlushRequired fr,
-                             int read, int written) {
-        assertEquals(wp.on(), buffer.isWritePending());
-//        assertEquals(fr.on(), buffer.requiresFlush());
+    // Validates the state of the message stream
+    private void validate(boolean wp, boolean fr, int read, int written) {
+        assertEquals(wp, stream.isWritePending());
+        assertEquals(fr, stream.isFlushRequired());
         assertEquals(read, channel.readBytes);
         assertEquals(written, channel.writtenBytes);
     }
@@ -81,155 +60,155 @@
     @Test
     public void endOfStream() throws IOException {
         channel.close();
-        List<TestMessage> messages = buffer.read();
+        List<TestMessage> messages = stream.read();
         assertNull(messages);
     }
 
     @Test
     public void bufferGrowth() throws IOException {
-        // Create a buffer for big messages and test the growth.
-        buffer = new TestMessageStream(BIG_SIZE, channel, loop);
-        buffer.write(BIG_MESSAGE);
-        buffer.write(BIG_MESSAGE);
-        buffer.write(BIG_MESSAGE);
-        buffer.write(BIG_MESSAGE);
-        buffer.write(BIG_MESSAGE);
+        // Create a stream for big messages and test the growth.
+        stream = new TestMessageStream(BIG_SIZE, channel, loop);
+        stream.write(BIG_MESSAGE);
+        stream.write(BIG_MESSAGE);
+        stream.write(BIG_MESSAGE);
+        stream.write(BIG_MESSAGE);
+        stream.write(BIG_MESSAGE);
     }
 
     @Test
     public void discardBeforeKey() {
-        // Create a buffer that does not yet have the key set and discard it.
-        buffer = loop.createStream(channel);
-        assertNull(buffer.key());
-        buffer.close();
+        // Create a stream that does not yet have the key set and discard it.
+        stream = loop.createStream(channel);
+        assertNull(stream.key());
+        stream.close();
         // There is not key, so nothing to check; we just expect no problem.
     }
 
     @Test
     public void bufferedRead() throws IOException {
-        channel.bytesToRead = LENGTH + 4;
-        List<TestMessage> messages = buffer.read();
+        channel.bytesToRead = SIZE + 4;
+        List<TestMessage> messages = stream.read();
         assertEquals(1, messages.size());
-        assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, 0);
+        validate(false, false, SIZE + 4, 0);
 
-        channel.bytesToRead = LENGTH - 4;
-        messages = buffer.read();
+        channel.bytesToRead = SIZE - 4;
+        messages = stream.read();
         assertEquals(1, messages.size());
-        assertState(WritePending.OFF, FlushRequired.OFF, LENGTH * 2, 0);
+        validate(false, false, SIZE * 2, 0);
     }
 
     @Test
     public void bufferedWrite() throws IOException {
-        assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
+        validate(false, false, 0, 0);
 
         // First write is immediate...
-        buffer.write(TM1);
-        assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH);
+        stream.write(MESSAGE);
+        validate(false, false, 0, SIZE);
 
         // Second and third get buffered...
-        buffer.write(TM2);
-        assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
-        buffer.write(TM3);
-        assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
+        stream.write(MESSAGE);
+        validate(false, true, 0, SIZE);
+        stream.write(MESSAGE);
+        validate(false, true, 0, SIZE);
 
         // Reset write, which will flush if needed; the next write is again buffered
-        buffer.flushIfWriteNotPending();
-        assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 3);
-        buffer.write(TM4);
-        assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 3);
+        stream.flushIfWriteNotPending();
+        validate(false, false, 0, SIZE * 3);
+        stream.write(MESSAGE);
+        validate(false, true, 0, SIZE * 3);
 
         // Select reset, which will flush if needed; the next write is again buffered
-        buffer.flushIfPossible();
-        assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 4);
-        buffer.write(TM1);
-        assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4);
-        buffer.flush();
-        assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4);
+        stream.flushIfPossible();
+        validate(false, false, 0, SIZE * 4);
+        stream.write(MESSAGE);
+        validate(false, true, 0, SIZE * 4);
+        stream.flush();
+        validate(false, true, 0, SIZE * 4);
     }
 
     @Test
     public void bufferedWriteList() throws IOException {
-        assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
+        validate(false, false, 0, 0);
 
         // First write is immediate...
-        List<TestMessage> messages = new ArrayList<TestMessage>();
-        messages.add(TM1);
-        messages.add(TM2);
-        messages.add(TM3);
-        messages.add(TM4);
+        List<TestMessage> messages = new ArrayList<>();
+        messages.add(MESSAGE);
+        messages.add(MESSAGE);
+        messages.add(MESSAGE);
+        messages.add(MESSAGE);
 
-        buffer.write(messages);
-        assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 4);
+        stream.write(messages);
+        validate(false, false, 0, SIZE * 4);
 
-        buffer.write(messages);
-        assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4);
+        stream.write(messages);
+        validate(false, true, 0, SIZE * 4);
 
-        buffer.flushIfPossible();
-        assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 8);
+        stream.flushIfPossible();
+        validate(false, false, 0, SIZE * 8);
     }
 
     @Test
     public void bufferedPartialWrite() throws IOException {
-        assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
+        validate(false, false, 0, 0);
 
         // First write is immediate...
-        buffer.write(TM1);
-        assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH);
+        stream.write(MESSAGE);
+        validate(false, false, 0, SIZE);
 
         // Tell test channel to accept only half.
-        channel.bytesToWrite = LENGTH / 2;
+        channel.bytesToWrite = SIZE / 2;
 
         // Second and third get buffered...
-        buffer.write(TM2);
-        assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
-        buffer.flushIfPossible();
-        assertState(WritePending.ON, FlushRequired.ON, 0, LENGTH + LENGTH / 2);
+        stream.write(MESSAGE);
+        validate(false, true, 0, SIZE);
+        stream.flushIfPossible();
+        validate(true, true, 0, SIZE + SIZE / 2);
     }
 
     @Test
     public void bufferedPartialWrite2() throws IOException {
-        assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
+        validate(false, false, 0, 0);
 
         // First write is immediate...
-        buffer.write(TM1);
-        assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH);
+        stream.write(MESSAGE);
+        validate(false, false, 0, SIZE);
 
         // Tell test channel to accept only half.
-        channel.bytesToWrite = LENGTH / 2;
+        channel.bytesToWrite = SIZE / 2;
 
         // Second and third get buffered...
-        buffer.write(TM2);
-        assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
-        buffer.flushIfWriteNotPending();
-        assertState(WritePending.ON, FlushRequired.ON, 0, LENGTH + LENGTH / 2);
+        stream.write(MESSAGE);
+        validate(false, true, 0, SIZE);
+        stream.flushIfWriteNotPending();
+        validate(true, true, 0, SIZE + SIZE / 2);
     }
 
     @Test
     public void bufferedReadWrite() throws IOException {
-        channel.bytesToRead = LENGTH + 4;
-        List<TestMessage> messages = buffer.read();
+        channel.bytesToRead = SIZE + 4;
+        List<TestMessage> messages = stream.read();
         assertEquals(1, messages.size());
-        assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, 0);
+        validate(false, false, SIZE + 4, 0);
 
-        buffer.write(TM1);
-        assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, LENGTH);
+        stream.write(MESSAGE);
+        validate(false, false, SIZE + 4, SIZE);
 
-        channel.bytesToRead = LENGTH - 4;
-        messages = buffer.read();
+        channel.bytesToRead = SIZE - 4;
+        messages = stream.read();
         assertEquals(1, messages.size());
-        assertState(WritePending.OFF, FlushRequired.OFF, LENGTH * 2, LENGTH);
+        validate(false, false, SIZE * 2, SIZE);
     }
 
     // Fake IO driver loop
-    private static class FakeIOLoop extends IOLoop<TestMessage, TestMessageStream> {
+    private static class TestIOLoop extends IOLoop<TestMessage, TestMessageStream> {
 
-        public FakeIOLoop() throws IOException {
+        public TestIOLoop() throws IOException {
             super(500);
         }
 
         @Override
         protected TestMessageStream createStream(ByteChannel channel) {
-            return new TestMessageStream(LENGTH, channel, this);
+            return new TestMessageStream(SIZE, channel, this);
         }
 
         @Override