Added ability to measure round-trip latency and to assure message integrity.
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
index 72e7a59..29a1904 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
@@ -1,5 +1,6 @@
package org.onlab.onos.foo;
+import com.google.common.collect.Lists;
import org.onlab.nio.IOLoop;
import org.onlab.nio.MessageStream;
import org.onlab.util.Counter;
@@ -25,6 +26,7 @@
import java.util.concurrent.TimeoutException;
import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
import static java.lang.System.out;
import static org.onlab.onos.foo.IOLoopTestServer.PORT;
import static org.onlab.util.Tools.delay;
@@ -48,15 +50,18 @@
Counter messages;
Counter bytes;
+ long latencyTotal = 0;
+ long latencyCount = 0;
+
/**
* Main entry point to launch the client.
*
* @param args command-line arguments
- * @throws java.io.IOException if unable to connect to server
- * @throws InterruptedException if latch wait gets interrupted
- * @throws java.util.concurrent.ExecutionException if wait gets interrupted
- * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
+ * @throws java.io.IOException if unable to connect to server
+ * @throws InterruptedException if latch wait gets interrupted
+ * @throws java.util.concurrent.ExecutionException if wait gets interrupted
+ * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
*/
public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
@@ -158,15 +163,17 @@
* Waits for the client workers to complete.
*
* @param secs timeout in seconds
- * @throws java.util.concurrent.ExecutionException if execution failed
- * @throws InterruptedException if interrupt occurred while waiting
- * @throws java.util.concurrent.TimeoutException if timeout occurred
+ * @throws java.util.concurrent.ExecutionException if execution failed
+ * @throws InterruptedException if interrupt occurred while waiting
+ * @throws java.util.concurrent.TimeoutException if timeout occurred
*/
public void await(int secs) throws InterruptedException,
ExecutionException, TimeoutException {
for (CustomIOLoop l : iloops) {
if (l.worker.task != null) {
l.worker.task.get(secs, TimeUnit.SECONDS);
+ latencyTotal += l.latencyTotal;
+ latencyCount += l.latencyCount;
}
}
messages.freeze();
@@ -178,10 +185,11 @@
*/
public void report() {
DecimalFormat f = new DecimalFormat("#,##0");
- out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs",
+ out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency",
f.format(messages.total()), f.format(bytes.total()),
f.format(messages.throughput()),
- f.format(bytes.throughput() / (1024 * msgLength))));
+ f.format(bytes.throughput() / (1024 * msgLength)),
+ f.format(latencyTotal / latencyCount)));
}
@@ -189,6 +197,9 @@
private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
Worker worker = new Worker();
+ long latencyTotal = 0;
+ long latencyCount = 0;
+
public CustomIOLoop() throws IOException {
super(500);
@@ -219,7 +230,12 @@
@Override
protected void processMessages(List<TestMessage> messages,
- MessageStream<TestMessage> b) {
+ MessageStream<TestMessage> stream) {
+ for (TestMessage message : messages) {
+ // TODO: summarize latency data better
+ latencyTotal += currentTimeMillis() - message.requestorTime();
+ latencyCount++;
+ }
worker.release(messages.size());
}
@@ -241,15 +257,15 @@
private static final int BATCH_SIZE = 1000;
private static final int PERMITS = 2 * BATCH_SIZE;
- private TestMessageStream b;
+ private TestMessageStream stream;
private FutureTask<Worker> task;
// Stuff to throttle pump
private final Semaphore semaphore = new Semaphore(PERMITS);
private int msgWritten;
- void pump(TestMessageStream b) {
- this.b = b;
+ void pump(TestMessageStream stream) {
+ this.stream = stream;
task = new FutureTask<>(this, this);
wpool.execute(task);
}
@@ -259,18 +275,15 @@
try {
log.info("Worker started...");
- List<TestMessage> batch = new ArrayList<>();
- for (int i = 0; i < BATCH_SIZE; i++) {
- batch.add(new TestMessage(msgLength));
- }
-
while (msgWritten < msgCount) {
- msgWritten += writeBatch(b, batch);
+ int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
+ writeBatch(size);
+ msgWritten += size;
}
// Now try to get all the permits back before sending poison pill
semaphore.acquireUninterruptibly(PERMITS);
- b.close();
+ stream.close();
log.info("Worker done...");
@@ -280,18 +293,15 @@
}
- private int writeBatch(TestMessageStream b, List<TestMessage> batch)
- throws IOException {
- int count = Math.min(BATCH_SIZE, msgCount - msgWritten);
- acquire(count);
- if (count == BATCH_SIZE) {
- b.write(batch);
- } else {
- for (int i = 0; i < count; i++) {
- b.write(batch.get(i));
- }
+ private void writeBatch(int size) throws IOException {
+ // Build a batch of messages
+ List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
+ for (int i = 0; i < size; i++) {
+ batch.add(new TestMessage(msgLength, currentTimeMillis(), 0,
+ this.stream.padding(msgLength)));
}
- return count;
+ acquire(size);
+ stream.write(batch);
}