Added ability to measure round-trip latency and to ensure message integrity.
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
index 72e7a59..29a1904 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
@@ -1,5 +1,6 @@
 package org.onlab.onos.foo;
 
+import com.google.common.collect.Lists;
 import org.onlab.nio.IOLoop;
 import org.onlab.nio.MessageStream;
 import org.onlab.util.Counter;
@@ -25,6 +26,7 @@
 import java.util.concurrent.TimeoutException;
 
 import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
 import static java.lang.System.out;
 import static org.onlab.onos.foo.IOLoopTestServer.PORT;
 import static org.onlab.util.Tools.delay;
@@ -48,15 +50,18 @@
 
     Counter messages;
     Counter bytes;
+    long latencyTotal = 0;
+    long latencyCount = 0;
+
 
     /**
      * Main entry point to launch the client.
      *
      * @param args command-line arguments
-     * @throws java.io.IOException          if unable to connect to server
-     * @throws InterruptedException if latch wait gets interrupted
-     * @throws java.util.concurrent.ExecutionException   if wait gets interrupted
-     * @throws java.util.concurrent.TimeoutException     if timeout occurred while waiting for completion
+     * @throws java.io.IOException                     if unable to connect to server
+     * @throws InterruptedException                    if latch wait gets interrupted
+     * @throws java.util.concurrent.ExecutionException if wait gets interrupted
+     * @throws java.util.concurrent.TimeoutException   if timeout occurred while waiting for completion
      */
     public static void main(String[] args)
             throws IOException, InterruptedException, ExecutionException, TimeoutException {
@@ -158,15 +163,17 @@
      * Waits for the client workers to complete.
      *
      * @param secs timeout in seconds
-     * @throws java.util.concurrent.ExecutionException   if execution failed
-     * @throws InterruptedException if interrupt occurred while waiting
-     * @throws java.util.concurrent.TimeoutException     if timeout occurred
+     * @throws java.util.concurrent.ExecutionException if execution failed
+     * @throws InterruptedException                    if interrupt occurred while waiting
+     * @throws java.util.concurrent.TimeoutException   if timeout occurred
      */
     public void await(int secs) throws InterruptedException,
             ExecutionException, TimeoutException {
         for (CustomIOLoop l : iloops) {
             if (l.worker.task != null) {
                 l.worker.task.get(secs, TimeUnit.SECONDS);
+                latencyTotal += l.latencyTotal;
+                latencyCount += l.latencyCount;
             }
         }
         messages.freeze();
@@ -178,10 +185,11 @@
      */
     public void report() {
         DecimalFormat f = new DecimalFormat("#,##0");
-        out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs",
+        out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency",
                            f.format(messages.total()), f.format(bytes.total()),
                            f.format(messages.throughput()),
-                           f.format(bytes.throughput() / (1024 * msgLength))));
+                           f.format(bytes.throughput() / (1024 * msgLength)),
+                           f.format(latencyCount > 0 ? latencyTotal / latencyCount : 0))); // guard: no replies
     }
 
 
@@ -189,6 +197,9 @@
     private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
 
         Worker worker = new Worker();
+        long latencyTotal = 0;
+        long latencyCount = 0;
+
 
         public CustomIOLoop() throws IOException {
             super(500);
@@ -219,7 +230,12 @@
 
         @Override
         protected void processMessages(List<TestMessage> messages,
-                                       MessageStream<TestMessage> b) {
+                                       MessageStream<TestMessage> stream) {
+            for (TestMessage message : messages) {
+                // TODO: summarize latency data better
+                latencyTotal += currentTimeMillis() - message.requestorTime();
+                latencyCount++;
+            }
             worker.release(messages.size());
         }
 
@@ -241,15 +257,15 @@
         private static final int BATCH_SIZE = 1000;
         private static final int PERMITS = 2 * BATCH_SIZE;
 
-        private TestMessageStream b;
+        private TestMessageStream stream;
         private FutureTask<Worker> task;
 
         // Stuff to throttle pump
         private final Semaphore semaphore = new Semaphore(PERMITS);
         private int msgWritten;
 
-        void pump(TestMessageStream b) {
-            this.b = b;
+        void pump(TestMessageStream stream) {
+            this.stream = stream;
             task = new FutureTask<>(this, this);
             wpool.execute(task);
         }
@@ -259,18 +275,15 @@
             try {
                 log.info("Worker started...");
 
-                List<TestMessage> batch = new ArrayList<>();
-                for (int i = 0; i < BATCH_SIZE; i++) {
-                    batch.add(new TestMessage(msgLength));
-                }
-
                 while (msgWritten < msgCount) {
-                    msgWritten += writeBatch(b, batch);
+                    int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
+                    writeBatch(size);
+                    msgWritten += size;
                 }
 
                 // Now try to get all the permits back before sending poison pill
                 semaphore.acquireUninterruptibly(PERMITS);
-                b.close();
+                stream.close();
 
                 log.info("Worker done...");
 
@@ -280,18 +293,15 @@
         }
 
 
-        private int writeBatch(TestMessageStream b, List<TestMessage> batch)
-                throws IOException {
-            int count = Math.min(BATCH_SIZE, msgCount - msgWritten);
-            acquire(count);
-            if (count == BATCH_SIZE) {
-                b.write(batch);
-            } else {
-                for (int i = 0; i < count; i++) {
-                    b.write(batch.get(i));
-                }
+        private void writeBatch(int size) throws IOException {
+            // Build a batch of messages
+            List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
+            for (int i = 0; i < size; i++) {
+                batch.add(new TestMessage(msgLength, currentTimeMillis(), 0,
+                                          this.stream.padding(msgLength)));
             }
-            return count;
+            acquire(size);
+            stream.write(batch);
         }