Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/apps/foo/pom.xml b/apps/foo/pom.xml
index 868b992..860d70b 100644
--- a/apps/foo/pom.xml
+++ b/apps/foo/pom.xml
@@ -28,6 +28,10 @@
             <version>${project.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.livetribe.slp</groupId>
+            <artifactId>livetribe-slp</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.apache.karaf.shell</groupId>
             <artifactId>org.apache.karaf.shell.console</artifactId>
         </dependency>
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
index 29a1904..3ec8c07 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
@@ -26,7 +26,7 @@
 import java.util.concurrent.TimeoutException;
 
 import static java.lang.String.format;
-import static java.lang.System.currentTimeMillis;
+import static java.lang.System.nanoTime;
 import static java.lang.System.out;
 import static org.onlab.onos.foo.IOLoopTestServer.PORT;
 import static org.onlab.util.Tools.delay;
@@ -81,7 +81,7 @@
         int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
         int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
         int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
-        int to = args.length > 4 ? Integer.parseInt(args[4]) : 30;
+        int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
 
         log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
                  wc, mc, ml, ip);
@@ -185,7 +185,7 @@
      */
     public void report() {
         DecimalFormat f = new DecimalFormat("#,##0");
-        out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency",
+        out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
                            f.format(messages.total()), f.format(bytes.total()),
                            f.format(messages.throughput()),
                            f.format(bytes.throughput() / (1024 * msgLength)),
@@ -217,13 +217,6 @@
 
             messages.add(stream.messagesIn().total());
             bytes.add(stream.bytesIn().total());
-
-//            out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
-//                               FORMAT.format(stream.messagesIn().throughput()),
-//                               FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
-//                               FORMAT.format(stream.messagesOut().throughput()),
-//                               FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
-
             stream.messagesOut().reset();
             stream.bytesOut().reset();
         }
@@ -233,7 +226,7 @@
                                        MessageStream<TestMessage> stream) {
             for (TestMessage message : messages) {
                 // TODO: summarize latency data better
-                latencyTotal += currentTimeMillis() - message.requestorTime();
+                latencyTotal += nanoTime() - message.requestorTime();
                 latencyCount++;
             }
             worker.release(messages.size());
@@ -254,7 +247,7 @@
      */
     private class Worker implements Runnable {
 
-        private static final int BATCH_SIZE = 1000;
+        private static final int BATCH_SIZE = 10;
         private static final int PERMITS = 2 * BATCH_SIZE;
 
         private TestMessageStream stream;
@@ -297,8 +290,8 @@
             // Build a batch of messages
             List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
             for (int i = 0; i < size; i++) {
-                batch.add(new TestMessage(msgLength, currentTimeMillis(), 0,
-                                          this.stream.padding(msgLength)));
+                batch.add(new TestMessage(msgLength, nanoTime(), 0,
+                                          stream.padding()));
             }
             acquire(size);
             stream.write(batch);
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java
index bb5fee7..12cd273 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java
@@ -23,7 +23,7 @@
 import java.util.concurrent.Executors;
 
 import static java.lang.String.format;
-import static java.lang.System.currentTimeMillis;
+import static java.lang.System.nanoTime;
 import static java.lang.System.out;
 import static org.onlab.util.Tools.delay;
 import static org.onlab.util.Tools.namedThreads;
@@ -85,11 +85,14 @@
         IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
         server.start();
 
-        // Start pruning clients.
-        while (true) {
+        // Prune stale clients; once any have been seen, keep going until none remain.
+        int remaining = -1;
+        while (remaining == -1 || remaining > 0) {
             delay(PRUNE_FREQUENCY);
-            server.prune();
+            int r = server.prune();
+            remaining = remaining == -1 && r == 0 ? remaining : r;
         }
+        server.stop();
     }
 
     /**
@@ -153,7 +156,7 @@
      */
     public void report() {
         DecimalFormat f = new DecimalFormat("#,##0");
-        out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs",
+        out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
                            f.format(messages.total()), f.format(bytes.total()),
                            f.format(messages.throughput()),
                            f.format(bytes.throughput() / (1024 * msgLength))));
@@ -161,11 +164,15 @@
 
     /**
      * Prunes the IO loops of stale message buffers.
+     *
+     * @return total number of streams remaining across all IO loops
      */
-    public void prune() {
+    public int prune() {
+        int count = 0;
         for (CustomIOLoop l : iloops) {
-            l.pruneStaleStreams();
+            count += l.pruneStaleStreams();
         }
+        return count;
     }
 
     // Get the next worker to which a client should be assigned
@@ -189,15 +196,8 @@
         @Override
         protected void removeStream(MessageStream<TestMessage> stream) {
             super.removeStream(stream);
-
             messages.add(stream.messagesIn().total());
             bytes.add(stream.bytesIn().total());
-
-//            out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
-//                               FORMAT.format(stream.messagesIn().throughput()),
-//                               FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
-//                               FORMAT.format(stream.messagesOut().throughput()),
-//                               FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
         }
 
         @Override
@@ -214,7 +214,7 @@
             List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
             for (TestMessage message : messages) {
                 responses.add(new TestMessage(message.length(), message.requestorTime(),
-                                              currentTimeMillis(), message.padding()));
+                                              nanoTime(), message.padding()));
             }
             return responses;
         }
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/TestIOClientCommand.java b/apps/foo/src/main/java/org/onlab/onos/foo/TestIOClientCommand.java
index 52128e3..b48fe53 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/TestIOClientCommand.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/TestIOClientCommand.java
@@ -1,5 +1,6 @@
 package org.onlab.onos.foo;
 
+import org.apache.karaf.shell.commands.Argument;
 import org.apache.karaf.shell.commands.Command;
 import org.onlab.onos.cli.AbstractShellCommand;
 
@@ -12,12 +13,32 @@
          description = "Starts the test IO loop client")
 public class TestIOClientCommand extends AbstractShellCommand {
 
+    @Argument(index = 0, name = "serverIp", description = "Server IP address",
+              required = false, multiValued = false)
+    String serverIp = "127.0.0.1";
+
+    @Argument(index = 1, name = "workers", description = "IO workers",
+              required = false, multiValued = false)
+    String workers = "6";
+
+    @Argument(index = 2, name = "messageCount", description = "Message count",
+              required = false, multiValued = false)
+    String messageCount = "1000000";
+
+    @Argument(index = 3, name = "messageLength", description = "Message length (bytes)",
+              required = false, multiValued = false)
+    String messageLength = "128";
+
+    @Argument(index = 4, name = "timeoutSecs", description = "Test timeout (seconds)",
+              required = false, multiValued = false)
+    String timeoutSecs = "60";
+
     @Override
     protected void execute() {
         try {
-            startStandalone(new String[]{});
+            startStandalone(new String[]{serverIp, workers, messageCount, messageLength, timeoutSecs});
         } catch (Exception e) {
-            error("Unable to start server %s", e);
+            error("Unable to start client %s", e);
         }
     }
 
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/TestIOServerCommand.java b/apps/foo/src/main/java/org/onlab/onos/foo/TestIOServerCommand.java
index 313141d..4777e71 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/TestIOServerCommand.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/TestIOServerCommand.java
@@ -1,11 +1,11 @@
 package org.onlab.onos.foo;
 
+import org.apache.karaf.shell.commands.Argument;
 import org.apache.karaf.shell.commands.Command;
 import org.onlab.onos.cli.AbstractShellCommand;
 
 import static org.onlab.onos.foo.IOLoopTestServer.startStandalone;
 
-
 /**
  * Starts the test IO loop server.
  */
@@ -13,10 +13,22 @@
          description = "Starts the test IO loop server")
 public class TestIOServerCommand extends AbstractShellCommand {
 
+    @Argument(index = 0, name = "serverIp", description = "Server IP address",
+              required = false, multiValued = false)
+    String serverIp = "127.0.0.1";
+
+    @Argument(index = 1, name = "workers", description = "IO workers",
+              required = false, multiValued = false)
+    String workers = "6";
+
+    @Argument(index = 2, name = "messageLength", description = "Message length (bytes)",
+              required = false, multiValued = false)
+    String messageLength = "128";
+
     @Override
     protected void execute() {
         try {
-            startStandalone(new String[]{});
+            startStandalone(new String[]{serverIp, workers, messageLength});
         } catch (Exception e) {
             error("Unable to start server %s", e);
         }
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java b/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java
index a7a3e6a..0ed9e8d 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java
@@ -6,6 +6,7 @@
 import java.nio.ByteBuffer;
 import java.nio.channels.ByteChannel;
 
+import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
 /**
@@ -19,12 +20,18 @@
     private static final int META_LENGTH = 40;
 
     private final int length;
+    private boolean isStrict = true;
 
     public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) {
         super(loop, ch, 64 * 1024, 500);
+        checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40");
         this.length = length;
     }
 
+    void setNonStrict() {
+        isStrict = false;
+    }
+
     @Override
     protected TestMessage read(ByteBuffer rb) {
         if (rb.remaining() < length) {
@@ -32,16 +39,20 @@
         }
 
         long startTag = rb.getLong();
-        checkState(startTag == START_TAG, "Incorrect message start");
+        if (isStrict) {
+            checkState(startTag == START_TAG, "Incorrect message start");
+        }
 
         long size = rb.getLong();
         long requestorTime = rb.getLong();
         long responderTime = rb.getLong();
-        byte[] padding = padding(length);
+        byte[] padding = padding();
         rb.get(padding);
 
         long endTag = rb.getLong();
-        checkState(endTag == END_TAG, "Incorrect message end");
+        if (isStrict) {
+            checkState(endTag == END_TAG, "Incorrect message end");
+        }
 
         return new TestMessage((int) size, requestorTime, responderTime, padding);
     }
@@ -60,7 +71,7 @@
         wb.putLong(END_TAG);
     }
 
-    public byte[] padding(int msgLength) {
-        return new byte[msgLength - META_LENGTH];
+    public byte[] padding() {
+        return new byte[length - META_LENGTH];
     }
 }
diff --git a/core/api/src/main/java/org/onlab/onos/cluster/MastershipTerm.java b/core/api/src/main/java/org/onlab/onos/cluster/MastershipTerm.java
index f015ae5..71d08f2 100644
--- a/core/api/src/main/java/org/onlab/onos/cluster/MastershipTerm.java
+++ b/core/api/src/main/java/org/onlab/onos/cluster/MastershipTerm.java
@@ -5,7 +5,7 @@
 public final class MastershipTerm {
 
     private final NodeId master;
-    private int termNumber;
+    private final int termNumber;
 
     private MastershipTerm(NodeId master, int term) {
         this.master = master;
diff --git a/pom.xml b/pom.xml
index 26e555e..b98a9e5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -134,6 +134,12 @@
             </dependency>
 
             <dependency>
+                <groupId>org.livetribe.slp</groupId>
+                <artifactId>livetribe-slp</artifactId>
+                <version>2.2.1</version>
+            </dependency>
+
+            <dependency>
               <groupId>com.hazelcast</groupId>
               <artifactId>hazelcast</artifactId>
               <version>3.3</version>
diff --git a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
index 94cd688..1309330 100644
--- a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
+++ b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
@@ -259,13 +259,16 @@
 
     /**
      * Prunes the registered streams by discarding any stale ones.
+     *
+     * @return number of remaining streams
      */
-    public synchronized void pruneStaleStreams() {
+    public synchronized int pruneStaleStreams() {
         for (MessageStream<M> stream : streams) {
             if (stream.isStale()) {
                 stream.close();
             }
         }
+        return streams.size();
     }
 
 }
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
index 6f6bd6d..bdcc97a 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
@@ -24,7 +24,7 @@
 import java.util.concurrent.TimeoutException;
 
 import static java.lang.String.format;
-import static java.lang.System.currentTimeMillis;
+import static java.lang.System.nanoTime;
 import static java.lang.System.out;
 import static org.onlab.nio.IOLoopTestServer.PORT;
 import static org.onlab.util.Tools.delay;
@@ -79,7 +79,7 @@
         int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
         int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
         int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
-        int to = args.length > 4 ? Integer.parseInt(args[4]) : 30;
+        int to = args.length > 4 ? Integer.parseInt(args[4]) : 60;
 
         log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
                  wc, mc, ml, ip);
@@ -183,7 +183,7 @@
      */
     public void report() {
         DecimalFormat f = new DecimalFormat("#,##0");
-        out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency",
+        out.println(format("Client: %s messages; %s bytes; %s mps; %s MBs; %s ns latency",
                            f.format(messages.total()), f.format(bytes.total()),
                            f.format(messages.throughput()),
                            f.format(bytes.throughput() / (1024 * msgLength)),
@@ -212,16 +212,8 @@
         @Override
         protected synchronized void removeStream(MessageStream<TestMessage> stream) {
             super.removeStream(stream);
-
             messages.add(stream.messagesIn().total());
             bytes.add(stream.bytesIn().total());
-
-//            out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
-//                               FORMAT.format(stream.messagesIn().throughput()),
-//                               FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
-//                               FORMAT.format(stream.messagesOut().throughput()),
-//                               FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
-
             stream.messagesOut().reset();
             stream.bytesOut().reset();
         }
@@ -231,7 +223,7 @@
                                        MessageStream<TestMessage> stream) {
             for (TestMessage message : messages) {
                 // TODO: summarize latency data better
-                latencyTotal += currentTimeMillis() - message.requestorTime();
+                latencyTotal += nanoTime() - message.requestorTime();
                 latencyCount++;
             }
             worker.release(messages.size());
@@ -252,7 +244,7 @@
      */
     private class Worker implements Runnable {
 
-        private static final int BATCH_SIZE = 1000;
+        private static final int BATCH_SIZE = 50;
         private static final int PERMITS = 2 * BATCH_SIZE;
 
         private TestMessageStream stream;
@@ -295,8 +287,7 @@
             // Build a batch of messages
             List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
             for (int i = 0; i < size; i++) {
-                batch.add(new TestMessage(msgLength, currentTimeMillis(), 0,
-                                          stream.padding()));
+                batch.add(new TestMessage(msgLength, nanoTime(), 0, stream.padding()));
             }
             acquire(size);
             stream.write(batch);
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
index 18566d7..a295c86 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
@@ -20,7 +20,6 @@
 import java.util.concurrent.Executors;
 
 import static java.lang.String.format;
-import static java.lang.System.currentTimeMillis;
 import static java.lang.System.out;
 import static org.onlab.util.Tools.delay;
 import static org.onlab.util.Tools.namedThreads;
@@ -82,11 +81,14 @@
         IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
         server.start();
 
-        // Start pruning clients.
-        while (true) {
+        // Prune stale clients; once any have been seen, keep going until none remain.
+        int remaining = -1;
+        while (remaining == -1 || remaining > 0) {
             delay(PRUNE_FREQUENCY);
-            server.prune();
+            int r = server.prune();
+            remaining = remaining == -1 && r == 0 ? remaining : r;
         }
+        server.stop();
     }
 
     /**
@@ -150,7 +152,7 @@
      */
     public void report() {
         DecimalFormat f = new DecimalFormat("#,##0");
-        out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs",
+        out.println(format("Server: %s messages; %s bytes; %s mps; %s MBs",
                            f.format(messages.total()), f.format(bytes.total()),
                            f.format(messages.throughput()),
                            f.format(bytes.throughput() / (1024 * msgLength))));
@@ -158,11 +160,15 @@
 
     /**
      * Prunes the IO loops of stale message buffers.
+     *
+     * @return total number of streams remaining across all IO loops
      */
-    public void prune() {
+    public int prune() {
+        int count = 0;
         for (CustomIOLoop l : iloops) {
-            l.pruneStaleStreams();
+            count += l.pruneStaleStreams();
         }
+        return count;
     }
 
     // Get the next worker to which a client should be assigned
@@ -186,15 +192,8 @@
         @Override
         protected void removeStream(MessageStream<TestMessage> stream) {
             super.removeStream(stream);
-
             messages.add(stream.messagesIn().total());
             bytes.add(stream.bytesIn().total());
-
-//            out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
-//                               FORMAT.format(stream.messagesIn().throughput()),
-//                               FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
-//                               FORMAT.format(stream.messagesOut().throughput()),
-//                               FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
         }
 
         @Override
@@ -211,7 +210,7 @@
             List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
             for (TestMessage message : messages) {
                 responses.add(new TestMessage(message.length(), message.requestorTime(),
-                                              currentTimeMillis(), message.padding()));
+                                              System.nanoTime(), message.padding()));
             }
             return responses;
         }