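More IO loop work: rename speed-test fixtures and re-enable throughput counters.

Renames the StandaloneSpeedClient/StandaloneSpeedServer test fixtures to
IOLoopClient/IOLoopServer, restores their previously commented-out throughput
tracking using Counter, removes the commented-out assertions from
IOLoopIntegrationTest, and simplifies MessageStreamTest (single shared test
message, boolean state validation, buffer -> stream naming).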
diff --git a/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedClient.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopClient.java
similarity index 83%
rename from utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedClient.java
rename to utils/nio/src/test/java/org/onlab/nio/IOLoopClient.java
index 6445221..2497e0e 100644
--- a/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedClient.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopClient.java
@@ -1,5 +1,6 @@
package org.onlab.nio;
+import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -10,6 +11,7 @@
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
+import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -26,9 +28,9 @@
/**
* Auxiliary test fixture to measure speed of NIO-based channels.
*/
-public class StandaloneSpeedClient {
+public class IOLoopClient {
- private static Logger log = LoggerFactory.getLogger(StandaloneSpeedClient.class);
+ private static Logger log = LoggerFactory.getLogger(IOLoopClient.class);
private final InetAddress ip;
private final int port;
@@ -39,8 +41,8 @@
private final ExecutorService ipool;
private final ExecutorService wpool;
-// ThroughputTracker messages;
-// ThroughputTracker bytes;
+ Counter messages;
+ Counter bytes;
/**
* Main entry point to launch the client.
@@ -61,7 +63,7 @@
log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
wc, mc, ml, ip);
- StandaloneSpeedClient sc = new StandaloneSpeedClient(ip, wc, mc, ml, StandaloneSpeedServer.PORT);
+ IOLoopClient sc = new IOLoopClient(ip, wc, mc, ml, IOLoopServer.PORT);
sc.start();
delay(2000);
@@ -82,7 +84,7 @@
* @param port socket port
* @throws IOException if unable to create IO loops
*/
- public StandaloneSpeedClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
+ public IOLoopClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
this.ip = ip;
this.port = port;
this.msgCount = mc;
@@ -101,15 +103,15 @@
* @throws IOException if unable to open connection
*/
public void start() throws IOException {
-// messages = new ThroughputTracker();
-// bytes = new ThroughputTracker();
+ messages = new Counter();
+ bytes = new Counter();
// First start up all the IO loops
for (CustomIOLoop l : iloops) {
ipool.execute(l);
}
-// // Wait for all of them to get going
+ // Wait for all of them to get going
// for (CustomIOLoop l : iloops)
// l.waitForStart(TIMEOUT);
@@ -151,20 +153,20 @@
l.worker.task.get(secs, TimeUnit.SECONDS);
}
}
-// messages.freeze();
-// bytes.freeze();
+ messages.freeze();
+ bytes.freeze();
}
/**
* Reports on the accumulated throughput trackers.
*/
public void report() {
-// DecimalFormat f = new DecimalFormat("#,##0");
-// log.info("{} messages; {} bytes; {} mps; {} Mbs",
-// f.format(messages.total()),
-// f.format(bytes.total()),
-// f.format(messages.throughput()),
-// f.format(bytes.throughput() / (1024 * 128)));
+ DecimalFormat f = new DecimalFormat("#,##0");
+ log.info("{} messages; {} bytes; {} mps; {} Mbs",
+ f.format(messages.total()),
+ f.format(bytes.total()),
+ f.format(messages.throughput()),
+ f.format(bytes.throughput() / (1024 * 128)));
}
@@ -187,16 +189,16 @@
protected synchronized void removeStream(MessageStream<TestMessage> b) {
super.removeStream(b);
-// messages.add(b.inMessages().total());
-// bytes.add(b.inBytes().total());
-// b.inMessages().reset();
-// b.inBytes().reset();
-
-// log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
-// StandaloneSpeedServer.format.format(b.inMessages().throughput()),
-// StandaloneSpeedServer.format.format(b.inBytes().throughput() / (1024 * 128)),
-// StandaloneSpeedServer.format.format(b.outMessages().throughput()),
-// StandaloneSpeedServer.format.format(b.outBytes().throughput() / (1024 * 128)));
+ messages.add(b.messagesIn().total());
+ bytes.add(b.bytesIn().total());
+ b.messagesOut().reset();
+ b.bytesOut().reset();
+//
+ log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
+ IOLoopServer.FORMAT.format(b.messagesIn().throughput()),
+ IOLoopServer.FORMAT.format(b.bytesIn().throughput() / (1024 * 128)),
+ IOLoopServer.FORMAT.format(b.messagesOut().throughput()),
+ IOLoopServer.FORMAT.format(b.bytesOut().throughput() / (1024 * 128)));
}
@Override
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
index 21843f0..b7f706e 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
@@ -48,11 +48,11 @@
// Setup the test on a random port to avoid intermittent test failures
// due to the port being already bound.
- int port = StandaloneSpeedServer.PORT + new Random().nextInt(100);
+ int port = IOLoopServer.PORT + new Random().nextInt(100);
InetAddress ip = InetAddress.getLoopbackAddress();
- StandaloneSpeedServer sss = new StandaloneSpeedServer(ip, THREADS, size, port);
- StandaloneSpeedClient ssc = new StandaloneSpeedClient(ip, THREADS, count, size, port);
+ IOLoopServer sss = new IOLoopServer(ip, THREADS, size, port);
+ IOLoopClient ssc = new IOLoopClient(ip, THREADS, count, size, port);
sss.start();
ssc.start();
@@ -64,32 +64,6 @@
delay(1000);
sss.stop();
sss.report();
-
- // Note that the client and server will have potentially significantly
- // differing rates. This is due to the wide variance in how tightly
- // the throughput tracking starts & stops relative to to the short
- // test duration.
-// System.out.println(f.format(ssc.messages.throughput()) + " mps");
-
-// // Make sure client sent everything.
-// assertEquals("incorrect client message count sent",
-// (long) count * THREADS, ssc.messages.total());
-// assertEquals("incorrect client bytes count sent",
-// (long) size * count * THREADS, ssc.bytes.total());
-//
-// // Make sure server received everything.
-// assertEquals("incorrect server message count received",
-// (long) count * THREADS, sss.messages.total());
-// assertEquals("incorrect server bytes count received",
-// (long) size * count * THREADS, sss.bytes.total());
-//
-// // Make sure speeds were reasonable.
-// if (mps > 0.0) {
-// assertAboveThreshold("insufficient client speed", mps,
-// ssc.messages.throughput());
-// assertAboveThreshold("insufficient server speed", mps / 2,
-// sss.messages.throughput());
-// }
}
}
diff --git a/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedServer.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopServer.java
similarity index 80%
rename from utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedServer.java
rename to utils/nio/src/test/java/org/onlab/nio/IOLoopServer.java
index a193f50..d133e75 100644
--- a/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedServer.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopServer.java
@@ -1,5 +1,6 @@
package org.onlab.nio;
+import org.onlab.util.Counter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,9 +24,9 @@
/**
* Auxiliary test fixture to measure speed of NIO-based channels.
*/
-public class StandaloneSpeedServer {
+public class IOLoopServer {
- private static Logger log = LoggerFactory.getLogger(StandaloneSpeedServer.class);
+ private static Logger log = LoggerFactory.getLogger(IOLoopServer.class);
private static final int PRUNE_FREQUENCY = 1000;
@@ -48,8 +49,8 @@
private final int msgLength;
private int lastWorker = -1;
-// ThroughputTracker messages;
-// ThroughputTracker bytes;
+ Counter messages;
+ Counter bytes;
/**
* Main entry point to launch the server.
@@ -64,7 +65,7 @@
log.info("Setting up the server with {} workers, {} byte messages on {}... ",
wc, ml, ip);
- StandaloneSpeedServer ss = new StandaloneSpeedServer(ip, wc, ml, PORT);
+ IOLoopServer ss = new IOLoopServer(ip, wc, ml, PORT);
ss.start();
// Start pruning clients.
@@ -83,7 +84,7 @@
* @param port listen port
* @throws IOException if unable to create IO loops
*/
- public StandaloneSpeedServer(InetAddress ip, int wc, int ml, int port) throws IOException {
+ public IOLoopServer(InetAddress ip, int wc, int ml, int port) throws IOException {
this.workerCount = wc;
this.msgLength = ml;
this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
@@ -98,14 +99,14 @@
* Start the server IO loops and kicks off throughput tracking.
*/
public void start() {
-// messages = new ThroughputTracker();
-// bytes = new ThroughputTracker();
+ messages = new Counter();
+ bytes = new Counter();
for (CustomIOLoop l : iloops) {
ipool.execute(l);
}
apool.execute(aloop);
-//
+
// for (CustomIOLoop l : iloops)
// l.waitForStart(TIMEOUT);
// aloop.waitForStart(TIMEOUT);
@@ -124,20 +125,20 @@
// l.waitForFinish(TIMEOUT);
// aloop.waitForFinish(TIMEOUT);
//
-// messages.freeze();
-// bytes.freeze();
+ messages.freeze();
+ bytes.freeze();
}
/**
* Reports on the accumulated throughput trackers.
*/
public void report() {
-// DecimalFormat f = new DecimalFormat("#,##0");
-// log.info("{} messages; {} bytes; {} mps; {} Mbs",
-// f.format(messages.total()),
-// f.format(bytes.total()),
-// f.format(messages.throughput()),
-// f.format(bytes.throughput() / (1024 * 128)));
+ DecimalFormat f = new DecimalFormat("#,##0");
+ log.info("{} messages; {} bytes; {} mps; {} Mbs",
+ f.format(messages.total()),
+ f.format(bytes.total()),
+ f.format(messages.throughput()),
+ f.format(bytes.throughput() / (1024 * 128)));
}
/**
@@ -170,15 +171,15 @@
@Override
protected void removeStream(MessageStream<TestMessage> stream) {
super.removeStream(stream);
-//
-// messages.add(b.inMessages().total());
-// bytes.add(b.inBytes().total());
-//
-// log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
-// format.format(b.inMessages().throughput()),
-// format.format(b.inBytes().throughput() / (1024 * 128)),
-// format.format(b.outMessages().throughput()),
-// format.format(b.outBytes().throughput() / (1024 * 128)));
+
+ messages.add(stream.messagesIn().total());
+ bytes.add(stream.bytesIn().total());
+
+ log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
+ FORMAT.format(stream.messagesIn().throughput()),
+ FORMAT.format(stream.bytesIn().throughput() / (1024 * 128)),
+ FORMAT.format(stream.messagesOut().throughput()),
+ FORMAT.format(stream.bytesOut().throughput() / (1024 * 128)));
}
@Override
diff --git a/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java b/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
index f0f896f..583d0ec 100644
--- a/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
+++ b/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
@@ -23,57 +23,36 @@
*/
public class MessageStreamTest {
- private static final int LENGTH = 16;
-
- private static final TestMessage TM1 = new TestMessage(LENGTH);
- private static final TestMessage TM2 = new TestMessage(LENGTH);
- private static final TestMessage TM3 = new TestMessage(LENGTH);
- private static final TestMessage TM4 = new TestMessage(LENGTH);
+ private static final int SIZE = 16;
+ private static final TestMessage MESSAGE = new TestMessage(SIZE);
private static final int BIG_SIZE = 32 * 1024;
private static final TestMessage BIG_MESSAGE = new TestMessage(BIG_SIZE);
- private static enum WritePending {
- ON, OFF;
-
- public boolean on() {
- return this == ON;
- }
- }
-
- private static enum FlushRequired {
- ON, OFF;
-
- public boolean on() {
- return this == ON;
- }
- }
-
- private FakeIOLoop loop;
+ private TestIOLoop loop;
private TestByteChannel channel;
- private TestMessageStream buffer;
+ private TestMessageStream stream;
private TestKey key;
@Before
public void setUp() throws IOException {
- loop = new FakeIOLoop();
+ loop = new TestIOLoop();
channel = new TestByteChannel();
key = new TestKey(channel);
- buffer = loop.createStream(channel);
- buffer.setKey(key);
+ stream = loop.createStream(channel);
+ stream.setKey(key);
}
@After
public void tearDown() {
loop.shutdown();
- buffer.close();
+ stream.close();
}
- // Check state of the message buffer
- private void assertState(WritePending wp, FlushRequired fr,
- int read, int written) {
- assertEquals(wp.on(), buffer.isWritePending());
-// assertEquals(fr.on(), buffer.requiresFlush());
+ // Validates the state of the message stream
+ private void validate(boolean wp, boolean fr, int read, int written) {
+ assertEquals(wp, stream.isWritePending());
+ assertEquals(fr, stream.isFlushRequired());
assertEquals(read, channel.readBytes);
assertEquals(written, channel.writtenBytes);
}
@@ -81,155 +60,155 @@
@Test
public void endOfStream() throws IOException {
channel.close();
- List<TestMessage> messages = buffer.read();
+ List<TestMessage> messages = stream.read();
assertNull(messages);
}
@Test
public void bufferGrowth() throws IOException {
- // Create a buffer for big messages and test the growth.
- buffer = new TestMessageStream(BIG_SIZE, channel, loop);
- buffer.write(BIG_MESSAGE);
- buffer.write(BIG_MESSAGE);
- buffer.write(BIG_MESSAGE);
- buffer.write(BIG_MESSAGE);
- buffer.write(BIG_MESSAGE);
+ // Create a stream for big messages and test the growth.
+ stream = new TestMessageStream(BIG_SIZE, channel, loop);
+ stream.write(BIG_MESSAGE);
+ stream.write(BIG_MESSAGE);
+ stream.write(BIG_MESSAGE);
+ stream.write(BIG_MESSAGE);
+ stream.write(BIG_MESSAGE);
}
@Test
public void discardBeforeKey() {
- // Create a buffer that does not yet have the key set and discard it.
- buffer = loop.createStream(channel);
- assertNull(buffer.key());
- buffer.close();
+ // Create a stream that does not yet have the key set and discard it.
+ stream = loop.createStream(channel);
+ assertNull(stream.key());
+ stream.close();
// There is not key, so nothing to check; we just expect no problem.
}
@Test
public void bufferedRead() throws IOException {
- channel.bytesToRead = LENGTH + 4;
- List<TestMessage> messages = buffer.read();
+ channel.bytesToRead = SIZE + 4;
+ List<TestMessage> messages = stream.read();
assertEquals(1, messages.size());
- assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, 0);
+ validate(false, false, SIZE + 4, 0);
- channel.bytesToRead = LENGTH - 4;
- messages = buffer.read();
+ channel.bytesToRead = SIZE - 4;
+ messages = stream.read();
assertEquals(1, messages.size());
- assertState(WritePending.OFF, FlushRequired.OFF, LENGTH * 2, 0);
+ validate(false, false, SIZE * 2, 0);
}
@Test
public void bufferedWrite() throws IOException {
- assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
+ validate(false, false, 0, 0);
// First write is immediate...
- buffer.write(TM1);
- assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH);
+ stream.write(MESSAGE);
+ validate(false, false, 0, SIZE);
// Second and third get buffered...
- buffer.write(TM2);
- assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
- buffer.write(TM3);
- assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
+ stream.write(MESSAGE);
+ validate(false, true, 0, SIZE);
+ stream.write(MESSAGE);
+ validate(false, true, 0, SIZE);
// Reset write, which will flush if needed; the next write is again buffered
- buffer.flushIfWriteNotPending();
- assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 3);
- buffer.write(TM4);
- assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 3);
+ stream.flushIfWriteNotPending();
+ validate(false, false, 0, SIZE * 3);
+ stream.write(MESSAGE);
+ validate(false, true, 0, SIZE * 3);
// Select reset, which will flush if needed; the next write is again buffered
- buffer.flushIfPossible();
- assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 4);
- buffer.write(TM1);
- assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4);
- buffer.flush();
- assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4);
+ stream.flushIfPossible();
+ validate(false, false, 0, SIZE * 4);
+ stream.write(MESSAGE);
+ validate(false, true, 0, SIZE * 4);
+ stream.flush();
+ validate(false, true, 0, SIZE * 4);
}
@Test
public void bufferedWriteList() throws IOException {
- assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
+ validate(false, false, 0, 0);
// First write is immediate...
- List<TestMessage> messages = new ArrayList<TestMessage>();
- messages.add(TM1);
- messages.add(TM2);
- messages.add(TM3);
- messages.add(TM4);
+ List<TestMessage> messages = new ArrayList<>();
+ messages.add(MESSAGE);
+ messages.add(MESSAGE);
+ messages.add(MESSAGE);
+ messages.add(MESSAGE);
- buffer.write(messages);
- assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 4);
+ stream.write(messages);
+ validate(false, false, 0, SIZE * 4);
- buffer.write(messages);
- assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4);
+ stream.write(messages);
+ validate(false, true, 0, SIZE * 4);
- buffer.flushIfPossible();
- assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 8);
+ stream.flushIfPossible();
+ validate(false, false, 0, SIZE * 8);
}
@Test
public void bufferedPartialWrite() throws IOException {
- assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
+ validate(false, false, 0, 0);
// First write is immediate...
- buffer.write(TM1);
- assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH);
+ stream.write(MESSAGE);
+ validate(false, false, 0, SIZE);
// Tell test channel to accept only half.
- channel.bytesToWrite = LENGTH / 2;
+ channel.bytesToWrite = SIZE / 2;
// Second and third get buffered...
- buffer.write(TM2);
- assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
- buffer.flushIfPossible();
- assertState(WritePending.ON, FlushRequired.ON, 0, LENGTH + LENGTH / 2);
+ stream.write(MESSAGE);
+ validate(false, true, 0, SIZE);
+ stream.flushIfPossible();
+ validate(true, true, 0, SIZE + SIZE / 2);
}
@Test
public void bufferedPartialWrite2() throws IOException {
- assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
+ validate(false, false, 0, 0);
// First write is immediate...
- buffer.write(TM1);
- assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH);
+ stream.write(MESSAGE);
+ validate(false, false, 0, SIZE);
// Tell test channel to accept only half.
- channel.bytesToWrite = LENGTH / 2;
+ channel.bytesToWrite = SIZE / 2;
// Second and third get buffered...
- buffer.write(TM2);
- assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
- buffer.flushIfWriteNotPending();
- assertState(WritePending.ON, FlushRequired.ON, 0, LENGTH + LENGTH / 2);
+ stream.write(MESSAGE);
+ validate(false, true, 0, SIZE);
+ stream.flushIfWriteNotPending();
+ validate(true, true, 0, SIZE + SIZE / 2);
}
@Test
public void bufferedReadWrite() throws IOException {
- channel.bytesToRead = LENGTH + 4;
- List<TestMessage> messages = buffer.read();
+ channel.bytesToRead = SIZE + 4;
+ List<TestMessage> messages = stream.read();
assertEquals(1, messages.size());
- assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, 0);
+ validate(false, false, SIZE + 4, 0);
- buffer.write(TM1);
- assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, LENGTH);
+ stream.write(MESSAGE);
+ validate(false, false, SIZE + 4, SIZE);
- channel.bytesToRead = LENGTH - 4;
- messages = buffer.read();
+ channel.bytesToRead = SIZE - 4;
+ messages = stream.read();
assertEquals(1, messages.size());
- assertState(WritePending.OFF, FlushRequired.OFF, LENGTH * 2, LENGTH);
+ validate(false, false, SIZE * 2, SIZE);
}
// Fake IO driver loop
- private static class FakeIOLoop extends IOLoop<TestMessage, TestMessageStream> {
+ private static class TestIOLoop extends IOLoop<TestMessage, TestMessageStream> {
- public FakeIOLoop() throws IOException {
+ public TestIOLoop() throws IOException {
super(500);
}
@Override
protected TestMessageStream createStream(ByteChannel channel) {
- return new TestMessageStream(LENGTH, channel, this);
+ return new TestMessageStream(SIZE, channel, this);
}
@Override