Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
index 9942740..6f6bd6d 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
@@ -1,5 +1,6 @@
 package org.onlab.nio;
 
+import com.google.common.collect.Lists;
 import org.onlab.util.Counter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -23,6 +24,7 @@
 import java.util.concurrent.TimeoutException;
 
 import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
 import static java.lang.System.out;
 import static org.onlab.nio.IOLoopTestServer.PORT;
 import static org.onlab.util.Tools.delay;
@@ -46,15 +48,18 @@
 
     Counter messages;
     Counter bytes;
+    long latencyTotal = 0;
+    long latencyCount = 0;
+
 
     /**
      * Main entry point to launch the client.
      *
      * @param args command-line arguments
-     * @throws IOException          if unable to connect to server
-     * @throws InterruptedException if latch wait gets interrupted
-     * @throws ExecutionException   if wait gets interrupted
-     * @throws TimeoutException     if timeout occurred while waiting for completion
+     * @throws java.io.IOException                     if unable to connect to server
+     * @throws InterruptedException                    if latch wait gets interrupted
+     * @throws java.util.concurrent.ExecutionException if execution of a worker task failed
+     * @throws java.util.concurrent.TimeoutException   if timeout occurred while waiting for completion
      */
     public static void main(String[] args)
             throws IOException, InterruptedException, ExecutionException, TimeoutException {
@@ -95,7 +100,7 @@
      * @param mc   message count to send per client
      * @param ml   message length in bytes
      * @param port socket port
-     * @throws IOException if unable to create IO loops
+     * @throws java.io.IOException if unable to create IO loops
      */
     public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
         this.ip = ip;
@@ -113,7 +118,7 @@
     /**
      * Starts the client workers.
      *
-     * @throws IOException if unable to open connection
+     * @throws java.io.IOException if unable to open connection
      */
     public void start() throws IOException {
         messages = new Counter();
@@ -141,7 +146,7 @@
      * channel with the given IO loop.
      *
      * @param loop loop with which the channel should be registered
-     * @throws IOException if the socket could not be open or connected
+     * @throws java.io.IOException if the socket could not be open or connected
      */
     private void openConnection(CustomIOLoop loop) throws IOException {
         SocketAddress sa = new InetSocketAddress(ip, port);
@@ -156,15 +161,17 @@
      * Waits for the client workers to complete.
      *
      * @param secs timeout in seconds
-     * @throws ExecutionException   if execution failed
-     * @throws InterruptedException if interrupt occurred while waiting
-     * @throws TimeoutException     if timeout occurred
+     * @throws java.util.concurrent.ExecutionException if execution failed
+     * @throws InterruptedException                    if interrupt occurred while waiting
+     * @throws java.util.concurrent.TimeoutException   if timeout occurred
      */
     public void await(int secs) throws InterruptedException,
             ExecutionException, TimeoutException {
         for (CustomIOLoop l : iloops) {
             if (l.worker.task != null) {
                 l.worker.task.get(secs, TimeUnit.SECONDS);
+                latencyTotal += l.latencyTotal;
+                latencyCount += l.latencyCount;
             }
         }
         messages.freeze();
@@ -176,10 +183,11 @@
      */
     public void report() {
         DecimalFormat f = new DecimalFormat("#,##0");
-        out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs",
+        out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency",
                            f.format(messages.total()), f.format(bytes.total()),
                            f.format(messages.throughput()),
-                           f.format(bytes.throughput() / (1024 * msgLength))));
+                           f.format(bytes.throughput() / (1024 * msgLength)),
+                           f.format(latencyCount == 0 ? 0 : latencyTotal / latencyCount))); // no samples => 0
     }
 
 
@@ -187,6 +195,9 @@
     private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
 
         Worker worker = new Worker();
+        long latencyTotal = 0;
+        long latencyCount = 0;
+
 
         public CustomIOLoop() throws IOException {
             super(500);
@@ -217,7 +228,12 @@
 
         @Override
         protected void processMessages(List<TestMessage> messages,
-                                       MessageStream<TestMessage> b) {
+                                       MessageStream<TestMessage> stream) {
+            for (TestMessage message : messages) {
+                // TODO: summarize latency data better
+                latencyTotal += currentTimeMillis() - message.requestorTime();
+                latencyCount++;
+            }
             worker.release(messages.size());
         }
 
@@ -239,15 +255,15 @@
         private static final int BATCH_SIZE = 1000;
         private static final int PERMITS = 2 * BATCH_SIZE;
 
-        private TestMessageStream b;
+        private TestMessageStream stream;
         private FutureTask<Worker> task;
 
         // Stuff to throttle pump
         private final Semaphore semaphore = new Semaphore(PERMITS);
         private int msgWritten;
 
-        void pump(TestMessageStream b) {
-            this.b = b;
+        void pump(TestMessageStream stream) {
+            this.stream = stream;
             task = new FutureTask<>(this, this);
             wpool.execute(task);
         }
@@ -257,18 +273,15 @@
             try {
                 log.info("Worker started...");
 
-                List<TestMessage> batch = new ArrayList<>();
-                for (int i = 0; i < BATCH_SIZE; i++) {
-                    batch.add(new TestMessage(msgLength));
-                }
-
                 while (msgWritten < msgCount) {
-                    msgWritten += writeBatch(b, batch);
+                    int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
+                    writeBatch(size);
+                    msgWritten += size;
                 }
 
                 // Now try to get all the permits back before sending poison pill
                 semaphore.acquireUninterruptibly(PERMITS);
-                b.close();
+                stream.close();
 
                 log.info("Worker done...");
 
@@ -278,18 +291,15 @@
         }
 
 
-        private int writeBatch(TestMessageStream b, List<TestMessage> batch)
-                throws IOException {
-            int count = Math.min(BATCH_SIZE, msgCount - msgWritten);
-            acquire(count);
-            if (count == BATCH_SIZE) {
-                b.write(batch);
-            } else {
-                for (int i = 0; i < count; i++) {
-                    b.write(batch.get(i));
-                }
+        private void writeBatch(int size) throws IOException {
+            // Throttle before stamping so the wait for permits is not counted as latency
+            acquire(size);
+            List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
+            for (int i = 0; i < size; i++) {
+                batch.add(new TestMessage(msgLength, currentTimeMillis(), 0,
+                                          stream.padding()));
             }
-            return count;
+            stream.write(batch);
         }
 
 
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
index 3bcbaa1..18566d7 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
@@ -1,5 +1,6 @@
 package org.onlab.nio;
 
+import com.google.common.collect.Lists;
 import org.onlab.util.Counter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -19,8 +20,9 @@
 import java.util.concurrent.Executors;
 
 import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
 import static java.lang.System.out;
-import static org.onlab.junit.TestTools.delay;
+import static org.onlab.util.Tools.delay;
 import static org.onlab.util.Tools.namedThreads;
 
 /**
@@ -58,7 +60,7 @@
      * Main entry point to launch the server.
      *
      * @param args command-line arguments
-     * @throws IOException if unable to crate IO loops
+     * @throws java.io.IOException if unable to create IO loops
      */
     public static void main(String[] args) throws IOException {
         startStandalone(args);
@@ -94,7 +96,7 @@
      * @param wc   worker count
      * @param ml   message length in bytes
      * @param port listen port
-     * @throws IOException if unable to create IO loops
+     * @throws java.io.IOException if unable to create IO loops
      */
     public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
         this.workerCount = wc;
@@ -199,11 +201,20 @@
         protected void processMessages(List<TestMessage> messages,
                                        MessageStream<TestMessage> stream) {
             try {
-                stream.write(messages);
+                stream.write(createResponses(messages));
             } catch (IOException e) {
                 log.error("Unable to echo messages", e);
             }
         }
+
+        private List<TestMessage> createResponses(List<TestMessage> messages) {
+            List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
+            for (TestMessage message : messages) {
+                responses.add(new TestMessage(message.length(), message.requestorTime(),
+                                              currentTimeMillis(), message.padding()));
+            }
+            return responses;
+        }
     }
 
     // Loop for accepting client connections
diff --git a/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java b/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
index 583d0ec..40b5a4f 100644
--- a/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
+++ b/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
@@ -23,11 +23,10 @@
  */
 public class MessageStreamTest {
 
-    private static final int SIZE = 16;
-    private static final TestMessage MESSAGE = new TestMessage(SIZE);
-
+    private static final int SIZE = 64;
     private static final int BIG_SIZE = 32 * 1024;
-    private static final TestMessage BIG_MESSAGE = new TestMessage(BIG_SIZE);
+
+    private TestMessage message;
 
     private TestIOLoop loop;
     private TestByteChannel channel;
@@ -41,6 +40,8 @@
         key = new TestKey(channel);
         stream = loop.createStream(channel);
         stream.setKey(key);
+        stream.setNonStrict();
+        message = new TestMessage(SIZE, 0, 0, stream.padding());
     }
 
     @After
@@ -68,11 +69,13 @@
     public void bufferGrowth() throws IOException {
         // Create a stream for big messages and test the growth.
         stream = new TestMessageStream(BIG_SIZE, channel, loop);
-        stream.write(BIG_MESSAGE);
-        stream.write(BIG_MESSAGE);
-        stream.write(BIG_MESSAGE);
-        stream.write(BIG_MESSAGE);
-        stream.write(BIG_MESSAGE);
+        TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding());
+
+        stream.write(bigMessage);
+        stream.write(bigMessage);
+        stream.write(bigMessage);
+        stream.write(bigMessage);
+        stream.write(bigMessage);
     }
 
     @Test
@@ -102,25 +105,25 @@
         validate(false, false, 0, 0);
 
         // First write is immediate...
-        stream.write(MESSAGE);
+        stream.write(message);
         validate(false, false, 0, SIZE);
 
         // Second and third get buffered...
-        stream.write(MESSAGE);
+        stream.write(message);
         validate(false, true, 0, SIZE);
-        stream.write(MESSAGE);
+        stream.write(message);
         validate(false, true, 0, SIZE);
 
         // Reset write, which will flush if needed; the next write is again buffered
         stream.flushIfWriteNotPending();
         validate(false, false, 0, SIZE * 3);
-        stream.write(MESSAGE);
+        stream.write(message);
         validate(false, true, 0, SIZE * 3);
 
         // Select reset, which will flush if needed; the next write is again buffered
         stream.flushIfPossible();
         validate(false, false, 0, SIZE * 4);
-        stream.write(MESSAGE);
+        stream.write(message);
         validate(false, true, 0, SIZE * 4);
         stream.flush();
         validate(false, true, 0, SIZE * 4);
@@ -132,10 +135,10 @@
 
         // First write is immediate...
         List<TestMessage> messages = new ArrayList<>();
-        messages.add(MESSAGE);
-        messages.add(MESSAGE);
-        messages.add(MESSAGE);
-        messages.add(MESSAGE);
+        messages.add(message);
+        messages.add(message);
+        messages.add(message);
+        messages.add(message);
 
         stream.write(messages);
         validate(false, false, 0, SIZE * 4);
@@ -152,14 +155,14 @@
         validate(false, false, 0, 0);
 
         // First write is immediate...
-        stream.write(MESSAGE);
+        stream.write(message);
         validate(false, false, 0, SIZE);
 
         // Tell test channel to accept only half.
         channel.bytesToWrite = SIZE / 2;
 
         // Second and third get buffered...
-        stream.write(MESSAGE);
+        stream.write(message);
         validate(false, true, 0, SIZE);
         stream.flushIfPossible();
         validate(true, true, 0, SIZE + SIZE / 2);
@@ -170,14 +173,14 @@
         validate(false, false, 0, 0);
 
         // First write is immediate...
-        stream.write(MESSAGE);
+        stream.write(message);
         validate(false, false, 0, SIZE);
 
         // Tell test channel to accept only half.
         channel.bytesToWrite = SIZE / 2;
 
         // Second and third get buffered...
-        stream.write(MESSAGE);
+        stream.write(message);
         validate(false, true, 0, SIZE);
         stream.flushIfWriteNotPending();
         validate(true, true, 0, SIZE + SIZE / 2);
@@ -190,7 +193,7 @@
         assertEquals(1, messages.size());
         validate(false, false, SIZE + 4, 0);
 
-        stream.write(MESSAGE);
+        stream.write(message);
         validate(false, false, SIZE + 4, SIZE);
 
         channel.bytesToRead = SIZE - 4;
diff --git a/utils/nio/src/test/java/org/onlab/nio/TestMessage.java b/utils/nio/src/test/java/org/onlab/nio/TestMessage.java
index 00315ec..1ce469a 100644
--- a/utils/nio/src/test/java/org/onlab/nio/TestMessage.java
+++ b/utils/nio/src/test/java/org/onlab/nio/TestMessage.java
@@ -1,39 +1,41 @@
 package org.onlab.nio;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
- * Fixed-length message.
+ * Test message for measuring rate and round-trip latency.
  */
 public class TestMessage extends AbstractMessage {
 
-    private final byte[] data;
+    private final byte[] padding;
 
-    /**
-     * Creates a new message with the specified length.
-     *
-     * @param length message length
-     */
-    public TestMessage(int length) {
-        this.length = length;
-        data = new byte[length];
-    }
+    private final long requestorTime;
+    private final long responderTime;
 
     /**
      * Creates a new message with the specified data.
      *
-     * @param data message data
+     * @param requestorTime requester time
+     * @param responderTime responder time
+     * @param padding       message padding
      */
-    TestMessage(byte[] data) {
-        this.length = data.length;
-        this.data = data;
+    TestMessage(int length, long requestorTime, long responderTime, byte[] padding) {
+        this.length = length;
+        this.requestorTime = requestorTime;
+        this.responderTime = responderTime;
+        this.padding = checkNotNull(padding, "Padding cannot be null");
     }
 
-    /**
-     * Gets the backing byte array data.
-     *
-     * @return backing byte array
-     */
-    public byte[] data() {
-        return data;
+    public long requestorTime() {
+        return requestorTime;
+    }
+
+    public long responderTime() {
+        return responderTime;
+    }
+
+    public byte[] padding() {
+        return padding;
     }
 
 }
diff --git a/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java b/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java
index a8ab8fa..c357fe4 100644
--- a/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java
+++ b/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java
@@ -3,53 +3,72 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
 /**
  * Fixed-length message transfer buffer.
  */
 public class TestMessageStream extends MessageStream<TestMessage> {
 
     private static final String E_WRONG_LEN = "Illegal message length: ";
+    private static final long START_TAG = 0xfeedcafedeaddeedL;
+    private static final long END_TAG = 0xbeadcafedeaddeedL;
+    private static final int META_LENGTH = 40;
 
     private final int length;
+    private boolean isStrict = true;
 
-    /**
-     * Create a new buffer for transferring messages of the specified length.
-     *
-     * @param length message length
-     * @param ch     backing channel
-     * @param loop   driver loop
-     */
-    public TestMessageStream(int length, ByteChannel ch,
-                             IOLoop<TestMessage, ?> loop) {
+    public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) {
         super(loop, ch, 64 * 1024, 500);
+        checkArgument(length >= META_LENGTH, "Length must be at least the header length of " + META_LENGTH);
         this.length = length;
     }
 
+    void setNonStrict() {
+        isStrict = false;
+    }
+
     @Override
     protected TestMessage read(ByteBuffer rb) {
         if (rb.remaining() < length) {
             return null;
         }
-        TestMessage message = new TestMessage(length);
-        rb.get(message.data());
-        return message;
+
+        long startTag = rb.getLong();
+        if (isStrict) {
+            checkState(startTag == START_TAG, "Incorrect message start");
+        }
+
+        long size = rb.getLong();
+        long requestorTime = rb.getLong();
+        long responderTime = rb.getLong();
+        byte[] padding = padding();
+        rb.get(padding);
+
+        long endTag = rb.getLong();
+        if (isStrict) {
+            checkState(endTag == END_TAG, "Incorrect message end");
+        }
+
+        return new TestMessage((int) size, requestorTime, responderTime, padding);
     }
 
-    /**
-     * {@inheritDoc}
-     * <p/>
-     * This implementation enforces the message length against the buffer
-     * supported length.
-     *
-     * @throws IllegalArgumentException if message size does not match the
-     *                                  supported buffer size
-     */
     @Override
     protected void write(TestMessage message, ByteBuffer wb) {
         if (message.length() != length) {
             throw new IllegalArgumentException(E_WRONG_LEN + message.length());
         }
-        wb.put(message.data());
+
+        wb.putLong(START_TAG);
+        wb.putLong(message.length());
+        wb.putLong(message.requestorTime());
+        wb.putLong(message.responderTime());
+        wb.put(message.padding(), 0, length - META_LENGTH);
+        wb.putLong(END_TAG);
     }
 
+    public byte[] padding() {
+        return new byte[length - META_LENGTH];
+    }
 }