Working on IO loop tests.
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
index 9942740..6f6bd6d 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
@@ -1,5 +1,6 @@
package org.onlab.nio;
+import com.google.common.collect.Lists;
import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,6 +24,7 @@
import java.util.concurrent.TimeoutException;
import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
import static java.lang.System.out;
import static org.onlab.nio.IOLoopTestServer.PORT;
import static org.onlab.util.Tools.delay;
@@ -46,15 +48,18 @@
Counter messages;
Counter bytes;
+ long latencyTotal = 0;
+ long latencyCount = 0;
+
/**
* Main entry point to launch the client.
*
* @param args command-line arguments
- * @throws IOException if unable to connect to server
- * @throws InterruptedException if latch wait gets interrupted
- * @throws ExecutionException if wait gets interrupted
- * @throws TimeoutException if timeout occurred while waiting for completion
+ * @throws java.io.IOException if unable to connect to server
+ * @throws InterruptedException if latch wait gets interrupted
+ * @throws java.util.concurrent.ExecutionException if wait gets interrupted
+ * @throws java.util.concurrent.TimeoutException if timeout occurred while waiting for completion
*/
public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
@@ -95,7 +100,7 @@
* @param mc message count to send per client
* @param ml message length in bytes
* @param port socket port
- * @throws IOException if unable to create IO loops
+ * @throws java.io.IOException if unable to create IO loops
*/
public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
this.ip = ip;
@@ -113,7 +118,7 @@
/**
* Starts the client workers.
*
- * @throws IOException if unable to open connection
+ * @throws java.io.IOException if unable to open connection
*/
public void start() throws IOException {
messages = new Counter();
@@ -141,7 +146,7 @@
* channel with the given IO loop.
*
* @param loop loop with which the channel should be registered
- * @throws IOException if the socket could not be open or connected
+ * @throws java.io.IOException if the socket could not be open or connected
*/
private void openConnection(CustomIOLoop loop) throws IOException {
SocketAddress sa = new InetSocketAddress(ip, port);
@@ -156,15 +161,17 @@
* Waits for the client workers to complete.
*
* @param secs timeout in seconds
- * @throws ExecutionException if execution failed
- * @throws InterruptedException if interrupt occurred while waiting
- * @throws TimeoutException if timeout occurred
+ * @throws java.util.concurrent.ExecutionException if execution failed
+ * @throws InterruptedException if interrupt occurred while waiting
+ * @throws java.util.concurrent.TimeoutException if timeout occurred
*/
public void await(int secs) throws InterruptedException,
ExecutionException, TimeoutException {
for (CustomIOLoop l : iloops) {
if (l.worker.task != null) {
l.worker.task.get(secs, TimeUnit.SECONDS);
+ latencyTotal += l.latencyTotal;
+ latencyCount += l.latencyCount;
}
}
messages.freeze();
@@ -176,10 +183,11 @@
*/
public void report() {
DecimalFormat f = new DecimalFormat("#,##0");
- out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs",
+ out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs; %s ms latency",
f.format(messages.total()), f.format(bytes.total()),
f.format(messages.throughput()),
- f.format(bytes.throughput() / (1024 * msgLength))));
+ f.format(bytes.throughput() / (1024 * msgLength)),
+ f.format(latencyTotal / latencyCount)));
}
@@ -187,6 +195,9 @@
private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
Worker worker = new Worker();
+ long latencyTotal = 0;
+ long latencyCount = 0;
+
public CustomIOLoop() throws IOException {
super(500);
@@ -217,7 +228,12 @@
@Override
protected void processMessages(List<TestMessage> messages,
- MessageStream<TestMessage> b) {
+ MessageStream<TestMessage> stream) {
+ for (TestMessage message : messages) {
+ // TODO: summarize latency data better
+ latencyTotal += currentTimeMillis() - message.requestorTime();
+ latencyCount++;
+ }
worker.release(messages.size());
}
@@ -239,15 +255,15 @@
private static final int BATCH_SIZE = 1000;
private static final int PERMITS = 2 * BATCH_SIZE;
- private TestMessageStream b;
+ private TestMessageStream stream;
private FutureTask<Worker> task;
// Stuff to throttle pump
private final Semaphore semaphore = new Semaphore(PERMITS);
private int msgWritten;
- void pump(TestMessageStream b) {
- this.b = b;
+ void pump(TestMessageStream stream) {
+ this.stream = stream;
task = new FutureTask<>(this, this);
wpool.execute(task);
}
@@ -257,18 +273,15 @@
try {
log.info("Worker started...");
- List<TestMessage> batch = new ArrayList<>();
- for (int i = 0; i < BATCH_SIZE; i++) {
- batch.add(new TestMessage(msgLength));
- }
-
while (msgWritten < msgCount) {
- msgWritten += writeBatch(b, batch);
+ int size = Math.min(BATCH_SIZE, msgCount - msgWritten);
+ writeBatch(size);
+ msgWritten += size;
}
// Now try to get all the permits back before sending poison pill
semaphore.acquireUninterruptibly(PERMITS);
- b.close();
+ stream.close();
log.info("Worker done...");
@@ -278,18 +291,15 @@
}
- private int writeBatch(TestMessageStream b, List<TestMessage> batch)
- throws IOException {
- int count = Math.min(BATCH_SIZE, msgCount - msgWritten);
- acquire(count);
- if (count == BATCH_SIZE) {
- b.write(batch);
- } else {
- for (int i = 0; i < count; i++) {
- b.write(batch.get(i));
- }
+ private void writeBatch(int size) throws IOException {
+ // Build a batch of messages
+ List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
+ for (int i = 0; i < size; i++) {
+ batch.add(new TestMessage(msgLength, currentTimeMillis(), 0,
+ stream.padding()));
}
- return count;
+ acquire(size);
+ stream.write(batch);
}
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
index 3bcbaa1..18566d7 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
@@ -1,5 +1,6 @@
package org.onlab.nio;
+import com.google.common.collect.Lists;
import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -19,8 +20,9 @@
import java.util.concurrent.Executors;
import static java.lang.String.format;
+import static java.lang.System.currentTimeMillis;
import static java.lang.System.out;
-import static org.onlab.junit.TestTools.delay;
+import static org.onlab.util.Tools.delay;
import static org.onlab.util.Tools.namedThreads;
/**
@@ -58,7 +60,7 @@
* Main entry point to launch the server.
*
* @param args command-line arguments
- * @throws IOException if unable to crate IO loops
+ * @throws java.io.IOException if unable to crate IO loops
*/
public static void main(String[] args) throws IOException {
startStandalone(args);
@@ -94,7 +96,7 @@
* @param wc worker count
* @param ml message length in bytes
* @param port listen port
- * @throws IOException if unable to create IO loops
+ * @throws java.io.IOException if unable to create IO loops
*/
public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
this.workerCount = wc;
@@ -199,11 +201,20 @@
protected void processMessages(List<TestMessage> messages,
MessageStream<TestMessage> stream) {
try {
- stream.write(messages);
+ stream.write(createResponses(messages));
} catch (IOException e) {
log.error("Unable to echo messages", e);
}
}
+
+ private List<TestMessage> createResponses(List<TestMessage> messages) {
+ List<TestMessage> responses = Lists.newArrayListWithCapacity(messages.size());
+ for (TestMessage message : messages) {
+ responses.add(new TestMessage(message.length(), message.requestorTime(),
+ currentTimeMillis(), message.padding()));
+ }
+ return responses;
+ }
}
// Loop for accepting client connections
diff --git a/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java b/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
index 583d0ec..40b5a4f 100644
--- a/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
+++ b/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
@@ -23,11 +23,10 @@
*/
public class MessageStreamTest {
- private static final int SIZE = 16;
- private static final TestMessage MESSAGE = new TestMessage(SIZE);
-
+ private static final int SIZE = 64;
private static final int BIG_SIZE = 32 * 1024;
- private static final TestMessage BIG_MESSAGE = new TestMessage(BIG_SIZE);
+
+ private TestMessage message;
private TestIOLoop loop;
private TestByteChannel channel;
@@ -41,6 +40,8 @@
key = new TestKey(channel);
stream = loop.createStream(channel);
stream.setKey(key);
+ stream.setNonStrict();
+ message = new TestMessage(SIZE, 0, 0, stream.padding());
}
@After
@@ -68,11 +69,13 @@
public void bufferGrowth() throws IOException {
// Create a stream for big messages and test the growth.
stream = new TestMessageStream(BIG_SIZE, channel, loop);
- stream.write(BIG_MESSAGE);
- stream.write(BIG_MESSAGE);
- stream.write(BIG_MESSAGE);
- stream.write(BIG_MESSAGE);
- stream.write(BIG_MESSAGE);
+ TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding());
+
+ stream.write(bigMessage);
+ stream.write(bigMessage);
+ stream.write(bigMessage);
+ stream.write(bigMessage);
+ stream.write(bigMessage);
}
@Test
@@ -102,25 +105,25 @@
validate(false, false, 0, 0);
// First write is immediate...
- stream.write(MESSAGE);
+ stream.write(message);
validate(false, false, 0, SIZE);
// Second and third get buffered...
- stream.write(MESSAGE);
+ stream.write(message);
validate(false, true, 0, SIZE);
- stream.write(MESSAGE);
+ stream.write(message);
validate(false, true, 0, SIZE);
// Reset write, which will flush if needed; the next write is again buffered
stream.flushIfWriteNotPending();
validate(false, false, 0, SIZE * 3);
- stream.write(MESSAGE);
+ stream.write(message);
validate(false, true, 0, SIZE * 3);
// Select reset, which will flush if needed; the next write is again buffered
stream.flushIfPossible();
validate(false, false, 0, SIZE * 4);
- stream.write(MESSAGE);
+ stream.write(message);
validate(false, true, 0, SIZE * 4);
stream.flush();
validate(false, true, 0, SIZE * 4);
@@ -132,10 +135,10 @@
// First write is immediate...
List<TestMessage> messages = new ArrayList<>();
- messages.add(MESSAGE);
- messages.add(MESSAGE);
- messages.add(MESSAGE);
- messages.add(MESSAGE);
+ messages.add(message);
+ messages.add(message);
+ messages.add(message);
+ messages.add(message);
stream.write(messages);
validate(false, false, 0, SIZE * 4);
@@ -152,14 +155,14 @@
validate(false, false, 0, 0);
// First write is immediate...
- stream.write(MESSAGE);
+ stream.write(message);
validate(false, false, 0, SIZE);
// Tell test channel to accept only half.
channel.bytesToWrite = SIZE / 2;
// Second and third get buffered...
- stream.write(MESSAGE);
+ stream.write(message);
validate(false, true, 0, SIZE);
stream.flushIfPossible();
validate(true, true, 0, SIZE + SIZE / 2);
@@ -170,14 +173,14 @@
validate(false, false, 0, 0);
// First write is immediate...
- stream.write(MESSAGE);
+ stream.write(message);
validate(false, false, 0, SIZE);
// Tell test channel to accept only half.
channel.bytesToWrite = SIZE / 2;
// Second and third get buffered...
- stream.write(MESSAGE);
+ stream.write(message);
validate(false, true, 0, SIZE);
stream.flushIfWriteNotPending();
validate(true, true, 0, SIZE + SIZE / 2);
@@ -190,7 +193,7 @@
assertEquals(1, messages.size());
validate(false, false, SIZE + 4, 0);
- stream.write(MESSAGE);
+ stream.write(message);
validate(false, false, SIZE + 4, SIZE);
channel.bytesToRead = SIZE - 4;
diff --git a/utils/nio/src/test/java/org/onlab/nio/TestMessage.java b/utils/nio/src/test/java/org/onlab/nio/TestMessage.java
index 00315ec..1ce469a 100644
--- a/utils/nio/src/test/java/org/onlab/nio/TestMessage.java
+++ b/utils/nio/src/test/java/org/onlab/nio/TestMessage.java
@@ -1,39 +1,41 @@
package org.onlab.nio;
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
- * Fixed-length message.
+ * Test message for measuring rate and round-trip latency.
*/
public class TestMessage extends AbstractMessage {
- private final byte[] data;
+ private final byte[] padding;
- /**
- * Creates a new message with the specified length.
- *
- * @param length message length
- */
- public TestMessage(int length) {
- this.length = length;
- data = new byte[length];
- }
+ private final long requestorTime;
+ private final long responderTime;
/**
* Creates a new message with the specified data.
*
- * @param data message data
+ * @param requestorTime requester time
+ * @param responderTime responder time
+ * @param padding message padding
*/
- TestMessage(byte[] data) {
- this.length = data.length;
- this.data = data;
+ TestMessage(int length, long requestorTime, long responderTime, byte[] padding) {
+ this.length = length;
+ this.requestorTime = requestorTime;
+ this.responderTime = responderTime;
+ this.padding = checkNotNull(padding, "Padding cannot be null");
}
- /**
- * Gets the backing byte array data.
- *
- * @return backing byte array
- */
- public byte[] data() {
- return data;
+ public long requestorTime() {
+ return requestorTime;
+ }
+
+ public long responderTime() {
+ return responderTime;
+ }
+
+ public byte[] padding() {
+ return padding;
}
}
diff --git a/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java b/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java
index a8ab8fa..c357fe4 100644
--- a/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java
+++ b/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java
@@ -3,53 +3,72 @@
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
/**
* Fixed-length message transfer buffer.
*/
public class TestMessageStream extends MessageStream<TestMessage> {
private static final String E_WRONG_LEN = "Illegal message length: ";
+ private static final long START_TAG = 0xfeedcafedeaddeedL;
+ private static final long END_TAG = 0xbeadcafedeaddeedL;
+ private static final int META_LENGTH = 40;
private final int length;
+ private boolean isStrict = true;
- /**
- * Create a new buffer for transferring messages of the specified length.
- *
- * @param length message length
- * @param ch backing channel
- * @param loop driver loop
- */
- public TestMessageStream(int length, ByteChannel ch,
- IOLoop<TestMessage, ?> loop) {
+ public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) {
super(loop, ch, 64 * 1024, 500);
+ checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40");
this.length = length;
}
+ void setNonStrict() {
+ isStrict = false;
+ }
+
@Override
protected TestMessage read(ByteBuffer rb) {
if (rb.remaining() < length) {
return null;
}
- TestMessage message = new TestMessage(length);
- rb.get(message.data());
- return message;
+
+ long startTag = rb.getLong();
+ if (isStrict) {
+ checkState(startTag == START_TAG, "Incorrect message start");
+ }
+
+ long size = rb.getLong();
+ long requestorTime = rb.getLong();
+ long responderTime = rb.getLong();
+ byte[] padding = padding();
+ rb.get(padding);
+
+ long endTag = rb.getLong();
+ if (isStrict) {
+ checkState(endTag == END_TAG, "Incorrect message end");
+ }
+
+ return new TestMessage((int) size, requestorTime, responderTime, padding);
}
- /**
- * {@inheritDoc}
- * <p/>
- * This implementation enforces the message length against the buffer
- * supported length.
- *
- * @throws IllegalArgumentException if message size does not match the
- * supported buffer size
- */
@Override
protected void write(TestMessage message, ByteBuffer wb) {
if (message.length() != length) {
throw new IllegalArgumentException(E_WRONG_LEN + message.length());
}
- wb.put(message.data());
+
+ wb.putLong(START_TAG);
+ wb.putLong(message.length());
+ wb.putLong(message.requestorTime());
+ wb.putLong(message.responderTime());
+ wb.put(message.padding(), 0, length - META_LENGTH);
+ wb.putLong(END_TAG);
}
+ public byte[] padding() {
+ return new byte[length - META_LENGTH];
+ }
}