Corrected some IO loop tests.
diff --git a/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java b/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java
index fc88e16..a74703a 100644
--- a/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java
+++ b/utils/nio/src/main/java/org/onlab/nio/SelectorLoop.java
@@ -7,6 +7,7 @@
import java.nio.channels.Selector;
import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.System.currentTimeMillis;
/**
* Abstraction of an I/O processing loop based on an NIO selector.
@@ -118,4 +119,41 @@
notifyAll();
}
+ /**
+ * Waits for the loop execution to start.
+ *
+ * @param timeout number of milliseconds to wait
+ * @return true if loop started in time
+ */
+ public final synchronized boolean awaitStart(long timeout) {
+ long max = currentTimeMillis() + timeout;
+ while (state != State.STARTED && (currentTimeMillis() < max)) {
+ try {
+ wait(timeout);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
+ }
+ }
+ return state == State.STARTED;
+ }
+
+ /**
+ * Waits for the loop execution to stop.
+ *
+ * @param timeout number of milliseconds to wait
+ * @return true if loop finished in time
+ */
+ public final synchronized boolean awaitStop(long timeout) {
+ long max = currentTimeMillis() + timeout;
+ while (state != State.STOPPED && (currentTimeMillis() < max)) {
+ try {
+ wait(timeout);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted", e);
+ }
+ }
+ return state == State.STOPPED;
+ }
+
+
}
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
index b7f706e..e61f6e2 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
@@ -1,12 +1,12 @@
package org.onlab.nio;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import java.net.InetAddress;
-import java.text.DecimalFormat;
import java.util.Random;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import static org.onlab.junit.TestTools.delay;
@@ -15,55 +15,51 @@
*/
public class IOLoopIntegrationTest {
- private static final int MILLION = 1000000;
- private static final int TIMEOUT = 60;
-
private static final int THREADS = 6;
- private static final int MSG_COUNT = 20 * MILLION;
- private static final int MSG_SIZE = 128;
+ private static final int TIMEOUT = 60;
+ private static final int MESSAGE_LENGTH = 128;
- private static final long MIN_MPS = 10 * MILLION;
+ private static final int MILLION = 1000000;
+ private static final int MSG_COUNT = 40 * MILLION;
@Before
public void warmUp() throws Exception {
+ Logger.getLogger("").setLevel(Level.SEVERE);
try {
- run(MILLION, MSG_SIZE, 15, 0);
+ runTest(MILLION, MESSAGE_LENGTH, 15);
} catch (Throwable e) {
System.err.println("Failed warmup but moving on.");
e.printStackTrace();
}
}
- @Ignore
+
@Test
public void basic() throws Exception {
- run(MSG_COUNT, MSG_SIZE, TIMEOUT, MIN_MPS);
+ runTest(MILLION, MESSAGE_LENGTH, TIMEOUT);
}
+ public void longHaul() throws Exception {
+ runTest(MSG_COUNT, MESSAGE_LENGTH, TIMEOUT);
+ }
- private void run(int count, int size, int timeout, double mps) throws Exception {
- DecimalFormat f = new DecimalFormat("#,##0");
- System.out.print(f.format(count * THREADS) +
- (mps > 0.0 ? " messages: " : " message warm-up: "));
-
- // Setup the test on a random port to avoid intermittent test failures
- // due to the port being already bound.
- int port = IOLoopServer.PORT + new Random().nextInt(100);
+ private void runTest(int count, int size, int timeout) throws Exception {
+ // Use a random port to prevent conflicts.
+ int port = IOLoopTestServer.PORT + new Random().nextInt(100);
InetAddress ip = InetAddress.getLoopbackAddress();
- IOLoopServer sss = new IOLoopServer(ip, THREADS, size, port);
- IOLoopClient ssc = new IOLoopClient(ip, THREADS, count, size, port);
+ IOLoopTestServer server = new IOLoopTestServer(ip, THREADS, size, port);
+ IOLoopTestClient client = new IOLoopTestClient(ip, THREADS, count, size, port);
- sss.start();
- ssc.start();
- delay(250); // give the server and client a chance to go
+ server.start();
+ client.start();
+ delay(100); // Pause to allow loops to get going
- ssc.await(timeout);
- ssc.report();
+ client.await(timeout);
+ client.report();
- delay(1000);
- sss.stop();
- sss.report();
+ server.stop();
+ server.report();
}
}
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopClient.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
similarity index 80%
rename from utils/nio/src/test/java/org/onlab/nio/IOLoopClient.java
rename to utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
index 2497e0e..8121cc3 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopClient.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
@@ -22,15 +22,18 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import static java.lang.String.format;
+import static java.lang.System.out;
import static org.onlab.junit.TestTools.delay;
+import static org.onlab.nio.IOLoopTestServer.PORT;
import static org.onlab.util.Tools.namedThreads;
/**
* Auxiliary test fixture to measure speed of NIO-based channels.
*/
-public class IOLoopClient {
+public class IOLoopTestClient {
- private static Logger log = LoggerFactory.getLogger(IOLoopClient.class);
+ private static Logger log = LoggerFactory.getLogger(IOLoopTestClient.class);
private final InetAddress ip;
private final int port;
@@ -55,6 +58,18 @@
*/
public static void main(String[] args)
throws IOException, InterruptedException, ExecutionException, TimeoutException {
+ startStandalone(args);
+
+ System.exit(0);
+ }
+
+ /**
+ * Starts a standalone IO loop test client.
+ *
+ * @param args command-line arguments
+ */
+ public static void startStandalone(String[] args)
+ throws IOException, InterruptedException, ExecutionException, TimeoutException {
InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
@@ -63,15 +78,13 @@
log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
wc, mc, ml, ip);
- IOLoopClient sc = new IOLoopClient(ip, wc, mc, ml, IOLoopServer.PORT);
+ IOLoopTestClient client = new IOLoopTestClient(ip, wc, mc, ml, PORT);
- sc.start();
+ client.start();
delay(2000);
- sc.await(to);
- sc.report();
-
- System.exit(0);
+ client.await(to);
+ client.report();
}
/**
@@ -84,7 +97,7 @@
* @param port socket port
* @throws IOException if unable to create IO loops
*/
- public IOLoopClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
+ public IOLoopTestClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
this.ip = ip;
this.port = port;
this.msgCount = mc;
@@ -112,8 +125,9 @@
}
// Wait for all of them to get going
-// for (CustomIOLoop l : iloops)
-// l.waitForStart(TIMEOUT);
+ for (CustomIOLoop l : iloops) {
+ l.awaitStart(1000);
+ }
// ... and Next open all connections; one-per-loop
for (CustomIOLoop l : iloops) {
@@ -162,11 +176,10 @@
*/
public void report() {
DecimalFormat f = new DecimalFormat("#,##0");
- log.info("{} messages; {} bytes; {} mps; {} Mbs",
- f.format(messages.total()),
- f.format(bytes.total()),
- f.format(messages.throughput()),
- f.format(bytes.throughput() / (1024 * 128)));
+ out.println(format("Client: %s messages; %s bytes; %s mps; %s Mbs",
+ f.format(messages.total()), f.format(bytes.total()),
+ f.format(messages.throughput()),
+ f.format(bytes.throughput() / (1024 * msgLength))));
}
@@ -186,19 +199,20 @@
}
@Override
- protected synchronized void removeStream(MessageStream<TestMessage> b) {
- super.removeStream(b);
+ protected synchronized void removeStream(MessageStream<TestMessage> stream) {
+ super.removeStream(stream);
- messages.add(b.messagesIn().total());
- bytes.add(b.bytesIn().total());
- b.messagesOut().reset();
- b.bytesOut().reset();
-//
- log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
- IOLoopServer.FORMAT.format(b.messagesIn().throughput()),
- IOLoopServer.FORMAT.format(b.bytesIn().throughput() / (1024 * 128)),
- IOLoopServer.FORMAT.format(b.messagesOut().throughput()),
- IOLoopServer.FORMAT.format(b.bytesOut().throughput() / (1024 * 128)));
+ messages.add(stream.messagesIn().total());
+ bytes.add(stream.bytesIn().total());
+
+// out.println(format("Disconnected client; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
+// FORMAT.format(stream.messagesIn().throughput()),
+// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
+// FORMAT.format(stream.messagesOut().throughput()),
+// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
+
+ stream.messagesOut().reset();
+ stream.bytesOut().reset();
}
@Override
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopServer.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
similarity index 75%
rename from utils/nio/src/test/java/org/onlab/nio/IOLoopServer.java
rename to utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
index d133e75..8a190ec 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopServer.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestServer.java
@@ -18,15 +18,17 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import static java.lang.String.format;
+import static java.lang.System.out;
import static org.onlab.junit.TestTools.delay;
import static org.onlab.util.Tools.namedThreads;
/**
* Auxiliary test fixture to measure speed of NIO-based channels.
*/
-public class IOLoopServer {
+public class IOLoopTestServer {
- private static Logger log = LoggerFactory.getLogger(IOLoopServer.class);
+ private static Logger log = LoggerFactory.getLogger(IOLoopTestServer.class);
private static final int PRUNE_FREQUENCY = 1000;
@@ -34,8 +36,8 @@
static final long TIMEOUT = 1000;
static final boolean SO_NO_DELAY = false;
- static final int SO_SEND_BUFFER_SIZE = 1024 * 1024;
- static final int SO_RCV_BUFFER_SIZE = 1024 * 1024;
+ static final int SO_SEND_BUFFER_SIZE = 128 * 1024;
+ static final int SO_RCV_BUFFER_SIZE = 128 * 1024;
static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
@@ -59,19 +61,29 @@
* @throws IOException if unable to crate IO loops
*/
public static void main(String[] args) throws IOException {
+ startStandalone(args);
+ System.exit(0);
+ }
+
+ /**
+ * Starts a standalone IO loop test server.
+ *
+ * @param args command-line arguments
+ */
+ private static void startStandalone(String[] args) throws IOException {
InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
log.info("Setting up the server with {} workers, {} byte messages on {}... ",
wc, ml, ip);
- IOLoopServer ss = new IOLoopServer(ip, wc, ml, PORT);
- ss.start();
+ IOLoopTestServer server = new IOLoopTestServer(ip, wc, ml, PORT);
+ server.start();
// Start pruning clients.
while (true) {
delay(PRUNE_FREQUENCY);
- ss.prune();
+ server.prune();
}
}
@@ -84,7 +96,7 @@
* @param port listen port
* @throws IOException if unable to create IO loops
*/
- public IOLoopServer(InetAddress ip, int wc, int ml, int port) throws IOException {
+ public IOLoopTestServer(InetAddress ip, int wc, int ml, int port) throws IOException {
this.workerCount = wc;
this.msgLength = ml;
this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
@@ -107,9 +119,10 @@
}
apool.execute(aloop);
-// for (CustomIOLoop l : iloops)
-// l.waitForStart(TIMEOUT);
-// aloop.waitForStart(TIMEOUT);
+ for (CustomIOLoop l : iloops) {
+ l.awaitStart(TIMEOUT);
+ }
+ aloop.awaitStart(TIMEOUT);
}
/**
@@ -121,10 +134,11 @@
l.shutdown();
}
-// for (CustomIOLoop l : iloops)
-// l.waitForFinish(TIMEOUT);
-// aloop.waitForFinish(TIMEOUT);
-//
+ for (CustomIOLoop l : iloops) {
+ l.awaitStop(TIMEOUT);
+ }
+ aloop.awaitStop(TIMEOUT);
+
messages.freeze();
bytes.freeze();
}
@@ -134,11 +148,10 @@
*/
public void report() {
DecimalFormat f = new DecimalFormat("#,##0");
- log.info("{} messages; {} bytes; {} mps; {} Mbs",
- f.format(messages.total()),
- f.format(bytes.total()),
- f.format(messages.throughput()),
- f.format(bytes.throughput() / (1024 * 128)));
+ out.println(format("Server: %s messages; %s bytes; %s mps; %s Mbs",
+ f.format(messages.total()), f.format(bytes.total()),
+ f.format(messages.throughput()),
+ f.format(bytes.throughput() / (1024 * msgLength))));
}
/**
@@ -175,11 +188,11 @@
messages.add(stream.messagesIn().total());
bytes.add(stream.bytesIn().total());
- log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
- FORMAT.format(stream.messagesIn().throughput()),
- FORMAT.format(stream.bytesIn().throughput() / (1024 * 128)),
- FORMAT.format(stream.messagesOut().throughput()),
- FORMAT.format(stream.bytesOut().throughput() / (1024 * 128)));
+// out.println(format("Disconnected server; inbound %s mps, %s Mbps; outbound %s mps, %s Mbps",
+// FORMAT.format(stream.messagesIn().throughput()),
+// FORMAT.format(stream.bytesIn().throughput() / (1024 * msgLength)),
+// FORMAT.format(stream.messagesOut().throughput()),
+// FORMAT.format(stream.bytesOut().throughput() / (1024 * msgLength))));
}
@Override