More IO loop work: MessageStream gains byte/message traffic counters and an isFlushRequired() check.
diff --git a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
index b2acca4..89107bf 100644
--- a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
+++ b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
@@ -1,5 +1,6 @@
 package org.onlab.nio;
 
+import org.onlab.util.Counter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +41,10 @@
     private Exception ioError;
     private long lastActiveTime;
 
+    private final Counter bytesIn = new Counter();
+    private final Counter messagesIn = new Counter();
+    private final Counter bytesOut = new Counter();
+    private final Counter messagesOut = new Counter();
 
     /**
      * Creates a message stream associated with the specified IO loop and
@@ -93,6 +98,11 @@
             closed = true;
         }
 
+        bytesIn.freeze();
+        bytesOut.freeze();
+        messagesIn.freeze();
+        messagesOut.freeze();
+
         loop.removeStream(this);
         if (key != null) {
             try {
@@ -176,6 +186,8 @@
                 inbound.flip();
                 while ((message = read(inbound)) != null) {
                     messages.add(message);
+                    messagesIn.add(1);
+                    bytesIn.add(message.length());
                 }
                 inbound.compact();
 
@@ -226,8 +238,9 @@
         while (outbound.remaining() < message.length()) {
             doubleSize();
         }
-        // Place the message into the buffer and bump the output trackers.
         write(message, outbound);
+        messagesOut.add(1);
+        bytesOut.add(message.length());
     }
 
     // Forces a flush, unless one is planned already.
@@ -273,6 +286,18 @@
         }
     }
 
+
+    /**
+     * Indicates whether the outbound buffer holds data that has been written but not flushed yet.
+     *
+     * @return true if the outbound buffer position is non-zero, read while holding this stream's lock
+     */
+    boolean isFlushRequired() {
+        synchronized (this) {
+            return outbound.position() > 0;
+        }
+    }
+
     /**
      * Attempts to flush data, internal stream state and channel availability
      * permitting. Invoked by the driver I/O loop during handling of writable
@@ -344,4 +369,40 @@
         return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
     }
 
+    /**
+     * Returns the inbound bytes counter, which adds up the length of each message read.
+     *
+     * @return inbound bytes counter; freeze() is invoked on it when the stream is closed
+     */
+    public Counter bytesIn() {
+        return bytesIn;
+    }
+
+    /**
+     * Returns the outbound bytes counter, which adds up the length of each message written to the outbound buffer.
+     *
+     * @return outbound bytes counter; freeze() is invoked on it when the stream is closed
+     */
+    public Counter bytesOut() {
+        return bytesOut;
+    }
+
+    /**
+     * Returns the inbound messages counter, which is bumped by one for each message read.
+     *
+     * @return inbound messages counter; freeze() is invoked on it when the stream is closed
+     */
+    public Counter messagesIn() {
+        return messagesIn;
+    }
+
+    /**
+     * Returns the outbound messages counter, which is bumped by one for each message written to the outbound buffer.
+     *
+     * @return outbound messages counter; freeze() is invoked on it when the stream is closed
+     */
+    public Counter messagesOut() {
+        return messagesOut;
+    }
+
 }