Working on IO loop tests commands.
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
index 29a1904..ebbe5bc 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
@@ -298,7 +298,7 @@
List<TestMessage> batch = Lists.newArrayListWithCapacity(size);
for (int i = 0; i < size; i++) {
batch.add(new TestMessage(msgLength, currentTimeMillis(), 0,
- this.stream.padding(msgLength)));
+ stream.padding()));
}
acquire(size);
stream.write(batch);
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java
index bb5fee7..778f217 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestServer.java
@@ -85,10 +85,12 @@
IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
server.start();
- // Start pruning clients.
- while (true) {
+ // Start pruning clients and keep going until their number goes to 0.
+ int remaining = -1;
+ while (remaining == -1 || remaining > 0) {
delay(PRUNE_FREQUENCY);
- server.prune();
+ int r = server.prune();
+ remaining = remaining == -1 && r == 0 ? remaining : r;
}
}
@@ -161,11 +163,15 @@
/**
* Prunes the IO loops of stale message buffers.
+ *
+ * @return number of remaining IO loops among all workers.
*/
- public void prune() {
+ public int prune() {
+ int count = 0;
for (CustomIOLoop l : iloops) {
- l.pruneStaleStreams();
+ count += l.pruneStaleStreams();
}
+ return count;
}
// Get the next worker to which a client should be assigned
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/TestIOClientCommand.java b/apps/foo/src/main/java/org/onlab/onos/foo/TestIOClientCommand.java
index 52128e3..612d320 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/TestIOClientCommand.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/TestIOClientCommand.java
@@ -1,5 +1,6 @@
package org.onlab.onos.foo;
+import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cli.AbstractShellCommand;
@@ -12,12 +13,32 @@
description = "Starts the test IO loop client")
public class TestIOClientCommand extends AbstractShellCommand {
+ @Argument(index = 0, name = "serverIp", description = "Server IP address",
+ required = false, multiValued = false)
+ String serverIp = "127.0.0.1";
+
+ @Argument(index = 1, name = "workers", description = "IO workers",
+ required = false, multiValued = false)
+ String workers = "6";
+
+ @Argument(index = 2, name = "messageCount", description = "Message count",
+ required = false, multiValued = false)
+ String messageCount = "10000000";
+
+ @Argument(index = 3, name = "messageLength", description = "Message length (bytes)",
+ required = false, multiValued = false)
+ String messageLength = "128";
+
+ @Argument(index = 4, name = "timeoutSecs", description = "Test timeout (seconds)",
+ required = false, multiValued = false)
+ String timeoutSecs = "30";
+
@Override
protected void execute() {
try {
- startStandalone(new String[]{});
+ startStandalone(new String[]{serverIp, workers, messageCount, messageLength, timeoutSecs});
} catch (Exception e) {
- error("Unable to start server %s", e);
+ error("Unable to start client %s", e);
}
}
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/TestIOServerCommand.java b/apps/foo/src/main/java/org/onlab/onos/foo/TestIOServerCommand.java
index 313141d..4777e71 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/TestIOServerCommand.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/TestIOServerCommand.java
@@ -1,11 +1,11 @@
package org.onlab.onos.foo;
+import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.cli.AbstractShellCommand;
import static org.onlab.onos.foo.IOLoopTestServer.startStandalone;
-
/**
* Starts the test IO loop server.
*/
@@ -13,10 +13,22 @@
description = "Starts the test IO loop server")
public class TestIOServerCommand extends AbstractShellCommand {
+ @Argument(index = 0, name = "serverIp", description = "Server IP address",
+ required = false, multiValued = false)
+ String serverIp = "127.0.0.1";
+
+ @Argument(index = 1, name = "workers", description = "IO workers",
+ required = false, multiValued = false)
+ String workers = "6";
+
+ @Argument(index = 2, name = "messageLength", description = "Message length (bytes)",
+ required = false, multiValued = false)
+ String messageLength = "128";
+
@Override
protected void execute() {
try {
- startStandalone(new String[]{});
+ startStandalone(new String[]{serverIp, workers, messageLength});
} catch (Exception e) {
error("Unable to start server %s", e);
}
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java b/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java
index a7a3e6a..0ed9e8d 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/TestMessageStream.java
@@ -6,6 +6,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
+import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
/**
@@ -19,12 +20,18 @@
private static final int META_LENGTH = 40;
private final int length;
+ private boolean isStrict = true;
public TestMessageStream(int length, ByteChannel ch, IOLoop<TestMessage, ?> loop) {
super(loop, ch, 64 * 1024, 500);
+ checkArgument(length >= META_LENGTH, "Length must be greater than header length of 40");
this.length = length;
}
+ void setNonStrict() {
+ isStrict = false;
+ }
+
@Override
protected TestMessage read(ByteBuffer rb) {
if (rb.remaining() < length) {
@@ -32,16 +39,20 @@
}
long startTag = rb.getLong();
- checkState(startTag == START_TAG, "Incorrect message start");
+ if (isStrict) {
+ checkState(startTag == START_TAG, "Incorrect message start");
+ }
long size = rb.getLong();
long requestorTime = rb.getLong();
long responderTime = rb.getLong();
- byte[] padding = padding(length);
+ byte[] padding = padding();
rb.get(padding);
long endTag = rb.getLong();
- checkState(endTag == END_TAG, "Incorrect message end");
+ if (isStrict) {
+ checkState(endTag == END_TAG, "Incorrect message end");
+ }
return new TestMessage((int) size, requestorTime, responderTime, padding);
}
@@ -60,7 +71,7 @@
wb.putLong(END_TAG);
}
- public byte[] padding(int msgLength) {
- return new byte[msgLength - META_LENGTH];
+ public byte[] padding() {
+ return new byte[length - META_LENGTH];
}
}
diff --git a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
index 94cd688..1309330 100644
--- a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
+++ b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
@@ -259,13 +259,16 @@
/**
* Prunes the registered streams by discarding any stale ones.
+ *
+ * @return number of remaining streams
*/
- public synchronized void pruneStaleStreams() {
+ public synchronized int pruneStaleStreams() {
for (MessageStream<M> stream : streams) {
if (stream.isStale()) {
stream.close();
}
}
+ return streams.size();
}
}