Working on IO loop stuff.
diff --git a/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java b/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java
new file mode 100644
index 0000000..91fc0d7
--- /dev/null
+++ b/utils/nio/src/test/java/org/onlab/nio/AbstractLoopTest.java
@@ -0,0 +1,59 @@
+package org.onlab.nio;
+
+import org.junit.After;
+import org.junit.Before;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.junit.Assert.fail;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Base class for various NIO loop unit tests.
+ */
+public abstract class AbstractLoopTest {
+
+ protected static final long MAX_MS_WAIT = 500;
+
+ /**
+ * Blocks on the specified countdown latch. Returns when the countdown reaches
+ * zero; fails the test if the {@value #MAX_MS_WAIT} ms timeout expires or
+ * the waiting thread is interrupted.
+ *
+ * @param latch the latch
+ * @param label an identifying label
+ */
+ protected void waitForLatch(CountDownLatch latch, String label) {
+ try {
+ boolean ok = latch.await(MAX_MS_WAIT, TimeUnit.MILLISECONDS);
+ if (!ok) {
+ fail("Latch await timeout! [" + label + "]");
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupt status so code further up the stack can still see it.
+ Thread.currentThread().interrupt();
+ fail("Unexpected interrupt [" + label + "] : " + e);
+ }
+ }
+
+ /** Executor available to subclasses for running loops; recreated for every test. */
+ protected ExecutorService exec;
+
+ /** Creates a fresh single-thread executor before each test. */
+ @Before
+ public void setUp() {
+ exec = newSingleThreadExecutor(namedThreads("test"));
+ }
+
+ /** Stops the executor after each test so that its worker thread is not leaked. */
+ @After
+ public void shutdownExecutor() {
+ if (exec != null) {
+ exec.shutdownNow();
+ }
+ }
+
+}
diff --git a/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java b/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java
new file mode 100644
index 0000000..70f9cd5
--- /dev/null
+++ b/utils/nio/src/test/java/org/onlab/nio/AcceptorLoopTest.java
@@ -0,0 +1,73 @@
+package org.onlab.nio;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.onlab.junit.TestTools.delay;
+
+/**
+ * Unit tests for AcceptorLoop.
+ */
+public class AcceptorLoopTest extends AbstractLoopTest {
+
+ private static final int PORT = 9876;
+
+ private static final SocketAddress SOCK_ADDR = new InetSocketAddress("127.0.0.1", PORT);
+
+ private static class MyAcceptLoop extends AcceptorLoop { // AcceptorLoop instrumented with lifecycle latches
+ private final CountDownLatch loopStarted = new CountDownLatch(1); // released on entry to loop()
+ private final CountDownLatch loopFinished = new CountDownLatch(1); // released after super.loop() returns
+ private final CountDownLatch runDone = new CountDownLatch(1); // released after super.run() returns
+ private final CountDownLatch ceaseLatch = new CountDownLatch(1); // released after super.shutdown() returns
+
+ private int acceptCount = 0; // number of acceptConnection() callbacks received
+
+ MyAcceptLoop() throws IOException {
+ super(500, SOCK_ADDR); // NOTE(review): 500 is presumably the select timeout in ms - confirm against AcceptorLoop
+ }
+
+ @Override
+ protected void acceptConnection(ServerSocketChannel ssc) throws IOException {
+ acceptCount++; // only counts the callback; the pending connection itself is not accepted here
+ }
+
+ @Override
+ public void loop() throws IOException {
+ loopStarted.countDown();
+ super.loop();
+ loopFinished.countDown(); // not reached if super.loop() throws
+ }
+
+ @Override
+ public void run() {
+ super.run();
+ runDone.countDown();
+ }
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ ceaseLatch.countDown();
+ }
+ }
+
+ @Test
+// @Ignore("Doesn't shut down the socket")
+ public void basic() throws IOException {
+ // Run an instrumented loop on the shared test executor, then stop it again.
+ final MyAcceptLoop loop = new MyAcceptLoop();
+ exec.execute(loop);
+ waitForLatch(loop.loopStarted, "loopStarted");
+ delay(200); // take a quick nap
+ loop.shutdown();
+ waitForLatch(loop.loopFinished, "loopFinished");
+ waitForLatch(loop.runDone, "runDone");
+ assertEquals(0, loop.acceptCount); // no client connected, so no accept callback is expected
+ }
+}
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
new file mode 100644
index 0000000..21843f0
--- /dev/null
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopIntegrationTest.java
@@ -0,0 +1,95 @@
+package org.onlab.nio;
+
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.net.InetAddress;
+import java.text.DecimalFormat;
+import java.util.Random;
+
+import static org.onlab.junit.TestTools.delay;
+
+/**
+ * Integration test for the select, accept and IO loops.
+ */
+public class IOLoopIntegrationTest {
+
+ private static final int MILLION = 1000000;
+ private static final int TIMEOUT = 60;
+
+ private static final int THREADS = 6;
+ private static final int MSG_COUNT = 20 * MILLION;
+ private static final int MSG_SIZE = 128;
+
+ private static final long MIN_MPS = 10 * MILLION;
+
+ @Before
+ public void warmUp() throws Exception {
+ try {
+ run(MILLION, MSG_SIZE, 15, 0);
+ } catch (Throwable e) {
+ System.err.println("Failed warmup but moving on.");
+ e.printStackTrace();
+ }
+ }
+
+ @Ignore
+ @Test
+ public void basic() throws Exception {
+ run(MSG_COUNT, MSG_SIZE, TIMEOUT, MIN_MPS);
+ }
+
+
+ private void run(int count, int size, int timeout, double mps) throws Exception {
+ // One client/server pass: count messages of size bytes per thread, waiting up to timeout secs; mps == 0 marks a warm-up.
+ DecimalFormat f = new DecimalFormat("#,##0");
+ System.out.print(f.format(count * THREADS) +
+ (mps > 0.0 ? " messages: " : " message warm-up: "));
+ // Set up the test on a random port to avoid intermittent test failures
+ // due to the port being already bound.
+ int port = StandaloneSpeedServer.PORT + new Random().nextInt(100);
+
+ InetAddress ip = InetAddress.getLoopbackAddress();
+ StandaloneSpeedServer sss = new StandaloneSpeedServer(ip, THREADS, size, port);
+ StandaloneSpeedClient ssc = new StandaloneSpeedClient(ip, THREADS, count, size, port);
+
+ sss.start();
+ try {
+ ssc.start();
+ delay(250); // give the server and client a chance to go
+ ssc.await(timeout);
+ ssc.report();
+ delay(1000);
+ } finally {
+ sss.stop(); // always stop the server loops, even when the client fails or times out
+ }
+ sss.report();
+ // Note that the client and server will have potentially significantly
+ // differing rates. This is due to the wide variance in how tightly
+ // the throughput tracking starts & stops relative to the short
+ // test duration.
+// System.out.println(f.format(ssc.messages.throughput()) + " mps");
+
+// // Make sure client sent everything.
+// assertEquals("incorrect client message count sent",
+// (long) count * THREADS, ssc.messages.total());
+// assertEquals("incorrect client bytes count sent",
+// (long) size * count * THREADS, ssc.bytes.total());
+//
+// // Make sure server received everything.
+// assertEquals("incorrect server message count received",
+// (long) count * THREADS, sss.messages.total());
+// assertEquals("incorrect server bytes count received",
+// (long) size * count * THREADS, sss.bytes.total());
+//
+// // Make sure speeds were reasonable.
+// if (mps > 0.0) {
+// assertAboveThreshold("insufficient client speed", mps,
+// ssc.messages.throughput());
+// assertAboveThreshold("insufficient server speed", mps / 2,
+// sss.messages.throughput());
+// }
+ }
+
+}
diff --git a/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java b/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
new file mode 100644
index 0000000..f0f896f
--- /dev/null
+++ b/utils/nio/src/test/java/org/onlab/nio/MessageStreamTest.java
@@ -0,0 +1,362 @@
+package org.onlab.nio;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Tests of the message stream implementation.
+ */
+public class MessageStreamTest {
+
+ private static final int LENGTH = 16;
+
+ private static final TestMessage TM1 = new TestMessage(LENGTH);
+ private static final TestMessage TM2 = new TestMessage(LENGTH);
+ private static final TestMessage TM3 = new TestMessage(LENGTH);
+ private static final TestMessage TM4 = new TestMessage(LENGTH);
+
+ private static final int BIG_SIZE = 32 * 1024;
+ private static final TestMessage BIG_MESSAGE = new TestMessage(BIG_SIZE);
+
+ private static enum WritePending {
+ ON, OFF;
+
+ public boolean on() {
+ return this == ON;
+ }
+ }
+
+ private static enum FlushRequired {
+ ON, OFF;
+
+ public boolean on() {
+ return this == ON;
+ }
+ }
+
+ private FakeIOLoop loop;
+ private TestByteChannel channel;
+ private TestMessageStream buffer;
+ private TestKey key;
+
+ @Before
+ public void setUp() throws IOException {
+ loop = new FakeIOLoop();
+ channel = new TestByteChannel();
+ key = new TestKey(channel);
+ buffer = loop.createStream(channel);
+ buffer.setKey(key);
+ }
+
+ @After
+ public void tearDown() {
+ loop.shutdown();
+ buffer.close();
+ }
+
+ // Check state of the message buffer; fr is accepted but not verified while its assertion below stays commented out
+ private void assertState(WritePending wp, FlushRequired fr,
+ int read, int written) {
+ assertEquals(wp.on(), buffer.isWritePending());
+// assertEquals(fr.on(), buffer.requiresFlush());
+ assertEquals(read, channel.readBytes); // bytes the test channel has handed out so far
+ assertEquals(written, channel.writtenBytes); // bytes the test channel has absorbed so far
+ }
+
+ @Test
+ public void endOfStream() throws IOException {
+ channel.close();
+ List<TestMessage> messages = buffer.read();
+ assertNull(messages);
+ }
+
+ @Test
+ public void bufferGrowth() throws IOException {
+ // Create a buffer for big messages and test the growth.
+ buffer = new TestMessageStream(BIG_SIZE, channel, loop);
+ buffer.write(BIG_MESSAGE);
+ buffer.write(BIG_MESSAGE);
+ buffer.write(BIG_MESSAGE);
+ buffer.write(BIG_MESSAGE);
+ buffer.write(BIG_MESSAGE);
+ }
+
+ @Test
+ public void discardBeforeKey() {
+ // Create a buffer that does not yet have the key set and discard it.
+ buffer = loop.createStream(channel);
+ assertNull(buffer.key());
+ buffer.close();
+ // There is no key, so nothing to check; we just expect no problem.
+ }
+
+ @Test
+ public void bufferedRead() throws IOException {
+ channel.bytesToRead = LENGTH + 4;
+ List<TestMessage> messages = buffer.read();
+ assertEquals(1, messages.size());
+ assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, 0);
+
+ channel.bytesToRead = LENGTH - 4;
+ messages = buffer.read();
+ assertEquals(1, messages.size());
+ assertState(WritePending.OFF, FlushRequired.OFF, LENGTH * 2, 0);
+ }
+
+ @Test
+ public void bufferedWrite() throws IOException {
+ assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
+
+ // First write is immediate...
+ buffer.write(TM1);
+ assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH);
+
+ // Second and third get buffered...
+ buffer.write(TM2);
+ assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
+ buffer.write(TM3);
+ assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
+
+ // Reset write, which will flush if needed; the next write is again buffered
+ buffer.flushIfWriteNotPending();
+ assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 3);
+ buffer.write(TM4);
+ assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 3);
+
+ // Select reset, which will flush if needed; the next write is again buffered
+ buffer.flushIfPossible();
+ assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 4);
+ buffer.write(TM1);
+ assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4);
+ buffer.flush();
+ assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4);
+ }
+
+ @Test
+ public void bufferedWriteList() throws IOException {
+ assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
+
+ // First write is immediate...
+ List<TestMessage> messages = new ArrayList<TestMessage>();
+ messages.add(TM1);
+ messages.add(TM2);
+ messages.add(TM3);
+ messages.add(TM4);
+
+ buffer.write(messages);
+ assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 4);
+
+ buffer.write(messages);
+ assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4);
+
+ buffer.flushIfPossible();
+ assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 8);
+ }
+
+ @Test
+ public void bufferedPartialWrite() throws IOException {
+ assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
+
+ // First write is immediate...
+ buffer.write(TM1);
+ assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH);
+
+ // Tell test channel to accept only half.
+ channel.bytesToWrite = LENGTH / 2;
+
+ // Second and third get buffered...
+ buffer.write(TM2);
+ assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
+ buffer.flushIfPossible();
+ assertState(WritePending.ON, FlushRequired.ON, 0, LENGTH + LENGTH / 2);
+ }
+
+ @Test
+ public void bufferedPartialWrite2() throws IOException {
+ assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
+
+ // First write is immediate...
+ buffer.write(TM1);
+ assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH);
+
+ // Tell test channel to accept only half.
+ channel.bytesToWrite = LENGTH / 2;
+
+ // Second and third get buffered...
+ buffer.write(TM2);
+ assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
+ buffer.flushIfWriteNotPending();
+ assertState(WritePending.ON, FlushRequired.ON, 0, LENGTH + LENGTH / 2);
+ }
+
+ @Test
+ public void bufferedReadWrite() throws IOException {
+ channel.bytesToRead = LENGTH + 4;
+ List<TestMessage> messages = buffer.read();
+ assertEquals(1, messages.size());
+ assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, 0);
+
+ buffer.write(TM1);
+ assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, LENGTH);
+
+ channel.bytesToRead = LENGTH - 4;
+ messages = buffer.read();
+ assertEquals(1, messages.size());
+ assertState(WritePending.OFF, FlushRequired.OFF, LENGTH * 2, LENGTH);
+ }
+
+ // Fake IO driver loop
+ private static class FakeIOLoop extends IOLoop<TestMessage, TestMessageStream> {
+
+ public FakeIOLoop() throws IOException {
+ super(500);
+ }
+
+ @Override
+ protected TestMessageStream createStream(ByteChannel channel) {
+ return new TestMessageStream(LENGTH, channel, this);
+ }
+
+ @Override
+ protected void processMessages(List<TestMessage> messages,
+ MessageStream<TestMessage> stream) {
+ }
+
+ }
+
+ // Byte channel test fixture
+ private static class TestByteChannel extends SelectableChannel implements ByteChannel {
+
+ private static final int BUFFER_LENGTH = 1024;
+ byte[] bytes = new byte[BUFFER_LENGTH];
+ int bytesToWrite = BUFFER_LENGTH;
+ int bytesToRead = BUFFER_LENGTH;
+ int writtenBytes = 0;
+ int readBytes = 0;
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ int l = Math.min(dst.remaining(), bytesToRead); // becomes -1 once implCloseChannel() has set bytesToRead to -1
+ if (bytesToRead > 0) {
+ readBytes += l; // running total checked by assertState()
+ dst.put(bytes, 0, l); // copy out the first l bytes of the backing array
+ }
+ return l; // -1 after close, which callers see as end-of-stream
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ int l = Math.min(src.remaining(), bytesToWrite); // tests lower bytesToWrite to simulate partial writes
+ writtenBytes += l; // running total checked by assertState()
+ src.get(bytes, 0, l); // consume l bytes from src, overwriting the start of the backing array
+ return l;
+ }
+
+ @Override
+ public Object blockingLock() {
+ return null;
+ }
+
+ @Override
+ public SelectableChannel configureBlocking(boolean arg0) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean isBlocking() {
+ return false;
+ }
+
+ @Override
+ public boolean isRegistered() {
+ return false;
+ }
+
+ @Override
+ public SelectionKey keyFor(Selector arg0) {
+ return null;
+ }
+
+ @Override
+ public SelectorProvider provider() {
+ return null;
+ }
+
+ @Override
+ public SelectionKey register(Selector arg0, int arg1, Object arg2)
+ throws ClosedChannelException {
+ return null;
+ }
+
+ @Override
+ public int validOps() {
+ return 0;
+ }
+
+ @Override
+ protected void implCloseChannel() throws IOException {
+ bytesToRead = -1;
+ }
+
+ }
+
+ // Selection key test fixture
+ private static class TestKey extends SelectionKey {
+
+ private SelectableChannel channel;
+
+ public TestKey(TestByteChannel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public void cancel() {
+ }
+
+ @Override
+ public SelectableChannel channel() {
+ return channel;
+ }
+
+ @Override
+ public int interestOps() {
+ return 0;
+ }
+
+ @Override
+ public SelectionKey interestOps(int ops) {
+ return null;
+ }
+
+ @Override
+ public boolean isValid() {
+ return true;
+ }
+
+ @Override
+ public int readyOps() {
+ return 0;
+ }
+
+ @Override
+ public Selector selector() {
+ return null;
+ }
+ }
+
+}
diff --git a/utils/nio/src/test/java/org/onlab/nio/MockSelector.java b/utils/nio/src/test/java/org/onlab/nio/MockSelector.java
new file mode 100644
index 0000000..a162aed
--- /dev/null
+++ b/utils/nio/src/test/java/org/onlab/nio/MockSelector.java
@@ -0,0 +1,70 @@
+package org.onlab.nio;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.AbstractSelectableChannel;
+import java.nio.channels.spi.AbstractSelector;
+import java.util.Set;
+
+/**
+ * A selector instrumented for unit tests: select calls report nothing ready and wake-ups are counted.
+ */
+public class MockSelector extends AbstractSelector {
+
+ int wakeUpCount = 0; // number of wakeup() calls so far; package-visible for inspection by tests
+
+ /**
+ * Creates a mock selector, specifying null as the SelectorProvider.
+ */
+ public MockSelector() {
+ super(null);
+ }
+
+ @Override
+ public String toString() {
+ return "{MockSelector: wake=" + wakeUpCount + "}";
+ }
+
+ @Override
+ protected void implCloseSelector() throws IOException {
+ }
+
+ @Override
+ protected SelectionKey register(AbstractSelectableChannel ch, int ops,
+ Object att) {
+ return null; // the mock does not track registrations
+ }
+
+ @Override
+ public Set<SelectionKey> keys() {
+ return null; // the mock does not maintain a key set
+ }
+
+ @Override
+ public Set<SelectionKey> selectedKeys() {
+ return null; // the mock does not maintain a selected-key set
+ }
+
+ @Override
+ public int selectNow() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int select(long timeout) throws IOException {
+ return 0;
+ }
+
+ @Override
+ public int select() throws IOException {
+ return 0;
+ }
+
+ @Override
+ public Selector wakeup() {
+ wakeUpCount++;
+ return this; // Selector.wakeup() is specified to return this selector, never null
+ }
+
+}
diff --git a/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedClient.java b/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedClient.java
new file mode 100644
index 0000000..6445221
--- /dev/null
+++ b/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedClient.java
@@ -0,0 +1,292 @@
+package org.onlab.nio;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.onlab.junit.TestTools.delay;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Auxiliary test fixture to measure speed of NIO-based channels.
+ */
+public class StandaloneSpeedClient {
+
+ private static Logger log = LoggerFactory.getLogger(StandaloneSpeedClient.class);
+
+ private final InetAddress ip;
+ private final int port;
+ private final int msgCount;
+ private final int msgLength;
+
+ private final List<CustomIOLoop> iloops = new ArrayList<>();
+ private final ExecutorService ipool;
+ private final ExecutorService wpool;
+
+// ThroughputTracker messages;
+// ThroughputTracker bytes;
+
+ /**
+ * Main entry point to launch the client.
+ *
+ * @param args command-line arguments
+ * @throws IOException if unable to connect to server
+ * @throws InterruptedException if latch wait gets interrupted
+ * @throws ExecutionException if wait gets interrupted
+ * @throws TimeoutException if timeout occurred while waiting for completion
+ */
+ public static void main(String[] args)
+ throws IOException, InterruptedException, ExecutionException, TimeoutException {
+ InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
+ int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
+ int mc = args.length > 2 ? Integer.parseInt(args[2]) : 50 * 1000000;
+ int ml = args.length > 3 ? Integer.parseInt(args[3]) : 128;
+ int to = args.length > 4 ? Integer.parseInt(args[4]) : 30;
+
+ log.info("Setting up client with {} workers sending {} {}-byte messages to {} server... ",
+ wc, mc, ml, ip);
+ StandaloneSpeedClient sc = new StandaloneSpeedClient(ip, wc, mc, ml, StandaloneSpeedServer.PORT);
+
+ sc.start();
+ delay(2000);
+
+ sc.await(to);
+ sc.report();
+
+ System.exit(0);
+ }
+
+ /**
+ * Creates a speed client.
+ *
+ * @param ip ip address of server
+ * @param wc worker count
+ * @param mc message count to send per client
+ * @param ml message length in bytes
+ * @param port socket port
+ * @throws IOException if unable to create IO loops
+ */
+ public StandaloneSpeedClient(InetAddress ip, int wc, int mc, int ml, int port) throws IOException {
+ this.ip = ip;
+ this.port = port;
+ this.msgCount = mc;
+ this.msgLength = ml;
+ this.wpool = Executors.newFixedThreadPool(wc, namedThreads("worker"));
+ this.ipool = Executors.newFixedThreadPool(wc, namedThreads("io-loop"));
+
+ for (int i = 0; i < wc; i++) {
+ iloops.add(new CustomIOLoop());
+ }
+ }
+
+ /**
+ * Starts the client workers.
+ *
+ * @throws IOException if unable to open connection
+ */
+ public void start() throws IOException {
+// messages = new ThroughputTracker();
+// bytes = new ThroughputTracker();
+
+ // First start up all the IO loops
+ for (CustomIOLoop l : iloops) {
+ ipool.execute(l);
+ }
+
+// // Wait for all of them to get going
+// for (CustomIOLoop l : iloops)
+// l.waitForStart(TIMEOUT);
+
+ // ... and Next open all connections; one-per-loop
+ for (CustomIOLoop l : iloops) {
+ openConnection(l);
+ }
+ }
+
+
+ /**
+ * Initiates open connection request and registers the pending socket
+ * channel with the given IO loop.
+ *
+ * @param loop loop with which the channel should be registered
+ * @throws IOException if the socket could not be open or connected
+ */
+ private void openConnection(CustomIOLoop loop) throws IOException {
+ SocketAddress sa = new InetSocketAddress(ip, port);
+ SocketChannel ch = SocketChannel.open();
+ ch.configureBlocking(false);
+ loop.connectStream(ch);
+ ch.connect(sa);
+ }
+
+
+ /**
+ * Waits for the client workers to complete, within a single overall timeout.
+ *
+ * @param secs timeout in seconds, shared by all workers rather than applied to each in turn
+ * @throws ExecutionException if execution failed
+ * @throws InterruptedException if interrupt occurred while waiting
+ * @throws TimeoutException if timeout occurred
+ */
+ public void await(int secs) throws InterruptedException, ExecutionException, TimeoutException {
+ long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(secs);
+ for (CustomIOLoop l : iloops) {
+ if (l.worker.task != null) { // null until connect() has handed the stream to pump()
+ l.worker.task.get(Math.max(0L, deadline - System.nanoTime()), TimeUnit.NANOSECONDS);
+ }
+ }
+// messages.freeze();
+// bytes.freeze();
+ }
+
+ /**
+ * Reports on the accumulated throughput trackers.
+ */
+ public void report() {
+// DecimalFormat f = new DecimalFormat("#,##0");
+// log.info("{} messages; {} bytes; {} mps; {} Mbs",
+// f.format(messages.total()),
+// f.format(bytes.total()),
+// f.format(messages.throughput()),
+// f.format(bytes.throughput() / (1024 * 128)));
+ }
+
+
+ // Loop for transfer of fixed-length messages
+ private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
+
+ Worker worker = new Worker();
+
+ public CustomIOLoop() throws IOException {
+ super(500);
+ }
+
+
+ @Override
+ protected TestMessageStream createStream(ByteChannel channel) {
+ return new TestMessageStream(msgLength, channel, this);
+ }
+
+ @Override
+ protected synchronized void removeStream(MessageStream<TestMessage> b) {
+ super.removeStream(b);
+
+// messages.add(b.inMessages().total());
+// bytes.add(b.inBytes().total());
+// b.inMessages().reset();
+// b.inBytes().reset();
+
+// log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
+// StandaloneSpeedServer.format.format(b.inMessages().throughput()),
+// StandaloneSpeedServer.format.format(b.inBytes().throughput() / (1024 * 128)),
+// StandaloneSpeedServer.format.format(b.outMessages().throughput()),
+// StandaloneSpeedServer.format.format(b.outBytes().throughput() / (1024 * 128)));
+ }
+
+ @Override
+ protected void processMessages(List<TestMessage> messages,
+ MessageStream<TestMessage> b) {
+ worker.release(messages.size());
+ }
+
+ @Override
+ protected void connect(SelectionKey key) {
+ super.connect(key);
+ TestMessageStream b = (TestMessageStream) key.attachment();
+ Worker w = ((CustomIOLoop) b.loop()).worker;
+ w.pump(b);
+ }
+
+ }
+
+ /**
+ * Auxiliary worker to connect and pump batched messages using blocking I/O.
+ */
+ private class Worker implements Runnable {
+
+ private static final int BATCH_SIZE = 1000;
+ private static final int PERMITS = 2 * BATCH_SIZE;
+
+ private TestMessageStream b;
+ private FutureTask<Worker> task;
+
+ // Stuff to throttle pump
+ private final Semaphore semaphore = new Semaphore(PERMITS);
+ private int msgWritten;
+
+ void pump(TestMessageStream b) {
+ this.b = b;
+ task = new FutureTask<>(this, this);
+ wpool.execute(task);
+ }
+
+ @Override
+ public void run() {
+ try {
+ log.info("Worker started...");
+
+ List<TestMessage> batch = new ArrayList<>();
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ batch.add(new TestMessage(msgLength));
+ }
+
+ while (msgWritten < msgCount) {
+ msgWritten += writeBatch(b, batch);
+ }
+
+ // Now try to get all the permits back before sending poison pill
+ semaphore.acquireUninterruptibly(PERMITS);
+ b.close();
+
+ log.info("Worker done...");
+
+ } catch (IOException e) {
+ log.error("Worker unable to perform I/O", e);
+ }
+ }
+
+
+ private int writeBatch(TestMessageStream b, List<TestMessage> batch)
+ throws IOException {
+ final int pending = Math.min(BATCH_SIZE, msgCount - msgWritten); // never more than is still owed
+ acquire(pending); // blocks until that many permits are free
+ if (pending != BATCH_SIZE) {
+ for (TestMessage m : batch.subList(0, pending)) {
+ b.write(m);
+ }
+ } else {
+ b.write(batch);
+ }
+ return pending;
+ }
+
+
+ // Release permits based on the specified number of message credits
+ private void release(int permits) {
+ semaphore.release(permits);
+ }
+
+ // Acquire permit for a single batch
+ private void acquire(int permits) {
+ semaphore.acquireUninterruptibly(permits);
+ }
+
+ }
+
+}
diff --git a/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedServer.java b/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedServer.java
new file mode 100644
index 0000000..a193f50
--- /dev/null
+++ b/utils/nio/src/test/java/org/onlab/nio/StandaloneSpeedServer.java
@@ -0,0 +1,217 @@
+package org.onlab.nio;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.onlab.junit.TestTools.delay;
+import static org.onlab.util.Tools.namedThreads;
+
+/**
+ * Auxiliary test fixture to measure speed of NIO-based channels.
+ */
+public class StandaloneSpeedServer {
+
+ private static Logger log = LoggerFactory.getLogger(StandaloneSpeedServer.class);
+
+ private static final int PRUNE_FREQUENCY = 1000;
+
+ static final int PORT = 9876;
+ static final long TIMEOUT = 1000;
+
+ static final boolean SO_NO_DELAY = false;
+ static final int SO_SEND_BUFFER_SIZE = 1024 * 1024;
+ static final int SO_RCV_BUFFER_SIZE = 1024 * 1024;
+
+ static final DecimalFormat FORMAT = new DecimalFormat("#,##0");
+
+ private final AcceptorLoop aloop;
+ private final ExecutorService apool = Executors.newSingleThreadExecutor(namedThreads("accept"));
+
+ private final List<CustomIOLoop> iloops = new ArrayList<>();
+ private final ExecutorService ipool;
+
+ private final int workerCount;
+ private final int msgLength;
+ private int lastWorker = -1;
+
+// ThroughputTracker messages;
+// ThroughputTracker bytes;
+
+ /**
+ * Main entry point to launch the server.
+ *
+ * @param args command-line arguments
+ * @throws IOException if unable to create IO loops
+ */
+ public static void main(String[] args) throws IOException {
+ InetAddress ip = InetAddress.getByName(args.length > 0 ? args[0] : "127.0.0.1");
+ int wc = args.length > 1 ? Integer.parseInt(args[1]) : 6;
+ int ml = args.length > 2 ? Integer.parseInt(args[2]) : 128;
+
+ log.info("Setting up the server with {} workers, {} byte messages on {}... ",
+ wc, ml, ip);
+ StandaloneSpeedServer ss = new StandaloneSpeedServer(ip, wc, ml, PORT);
+ ss.start();
+
+ // Start pruning clients.
+ while (true) {
+ delay(PRUNE_FREQUENCY);
+ ss.prune();
+ }
+ }
+
+ /**
+ * Creates a speed server.
+ *
+ * @param ip optional ip of the adapter where to bind
+ * @param wc worker count
+ * @param ml message length in bytes
+ * @param port listen port
+ * @throws IOException if unable to create IO loops
+ */
+ public StandaloneSpeedServer(InetAddress ip, int wc, int ml, int port) throws IOException {
+ this.workerCount = wc;
+ this.msgLength = ml;
+ this.ipool = Executors.newFixedThreadPool(workerCount, namedThreads("io-loop"));
+
+ this.aloop = new CustomAcceptLoop(new InetSocketAddress(ip, port));
+ for (int i = 0; i < workerCount; i++) {
+ iloops.add(new CustomIOLoop());
+ }
+ }
+
+ /**
+ * Starts the server IO loops and kicks off throughput tracking.
+ */
+ public void start() {
+// messages = new ThroughputTracker();
+// bytes = new ThroughputTracker();
+
+ for (CustomIOLoop l : iloops) {
+ ipool.execute(l);
+ }
+ apool.execute(aloop);
+//
+// for (CustomIOLoop l : iloops)
+// l.waitForStart(TIMEOUT);
+// aloop.waitForStart(TIMEOUT);
+ }
+
+ /**
+ * Stops the accept and IO loops and releases their executors (throughput freezing is still commented out).
+ */
+ public void stop() {
+ aloop.shutdown();
+ for (CustomIOLoop l : iloops) {
+ l.shutdown();
+ }
+ apool.shutdown(); // accept no new tasks; the pool thread exits once the accept loop returns
+ ipool.shutdown(); // likewise for the IO loop pool, so a stopped server leaves no threads behind
+// for (CustomIOLoop l : iloops)
+// l.waitForFinish(TIMEOUT);
+// aloop.waitForFinish(TIMEOUT);
+// messages.freeze();
+// bytes.freeze();
+ }
+
+ /**
+ * Reports on the accumulated throughput trackers.
+ */
+ public void report() {
+// DecimalFormat f = new DecimalFormat("#,##0");
+// log.info("{} messages; {} bytes; {} mps; {} Mbs",
+// f.format(messages.total()),
+// f.format(bytes.total()),
+// f.format(messages.throughput()),
+// f.format(bytes.throughput() / (1024 * 128)));
+ }
+
+ /**
+ * Prunes the IO loops of stale message buffers.
+ */
+ public void prune() {
+ for (CustomIOLoop l : iloops) {
+ l.pruneStaleStreams();
+ }
+ }
+
+ // Get the next worker to which a client should be assigned
+ private synchronized CustomIOLoop nextWorker() {
+ lastWorker = (lastWorker + 1) % workerCount;
+ return iloops.get(lastWorker);
+ }
+
+ // Loop for transfer of fixed-length messages
+ private class CustomIOLoop extends IOLoop<TestMessage, TestMessageStream> {
+
+ public CustomIOLoop() throws IOException {
+ super(500);
+ }
+
+ @Override
+ protected TestMessageStream createStream(ByteChannel channel) {
+ return new TestMessageStream(msgLength, channel, this);
+ }
+
+ @Override
+ protected void removeStream(MessageStream<TestMessage> stream) {
+ super.removeStream(stream);
+//
+// messages.add(b.inMessages().total());
+// bytes.add(b.inBytes().total());
+//
+// log.info("Disconnected client; inbound {} mps, {} Mbps; outbound {} mps, {} Mbps",
+// format.format(b.inMessages().throughput()),
+// format.format(b.inBytes().throughput() / (1024 * 128)),
+// format.format(b.outMessages().throughput()),
+// format.format(b.outBytes().throughput() / (1024 * 128)));
+ }
+
+ @Override
+ protected void processMessages(List<TestMessage> messages,
+ MessageStream<TestMessage> stream) {
+ try {
+ stream.write(messages);
+ } catch (IOException e) {
+ log.error("Unable to echo messages", e);
+ }
+ }
+ }
+
+ // Loop for accepting client connections
+ private class CustomAcceptLoop extends AcceptorLoop {
+ public CustomAcceptLoop(SocketAddress address) throws IOException {
+ super(500, address);
+ }
+
+ @Override
+ protected void acceptConnection(ServerSocketChannel channel) throws IOException {
+ SocketChannel sc = channel.accept();
+ if (sc == null) {
+ return; // accept() returns null on a non-blocking channel with no pending connection
+ }
+ sc.configureBlocking(false);
+ Socket so = sc.socket();
+ so.setTcpNoDelay(SO_NO_DELAY);
+ so.setReceiveBufferSize(SO_RCV_BUFFER_SIZE);
+ so.setSendBufferSize(SO_SEND_BUFFER_SIZE);
+ nextWorker().acceptStream(sc); // hand the connection to the next IO loop in rotation
+ log.info("Connected client");
+ }
+ }
+
+}
diff --git a/utils/nio/src/test/java/org/onlab/nio/TestMessage.java b/utils/nio/src/test/java/org/onlab/nio/TestMessage.java
new file mode 100644
index 0000000..00315ec
--- /dev/null
+++ b/utils/nio/src/test/java/org/onlab/nio/TestMessage.java
@@ -0,0 +1,39 @@
+package org.onlab.nio;
+
+/**
+ * Fixed-length message.
+ */
+public class TestMessage extends AbstractMessage {
+
+ private final byte[] data;
+
+ /**
+ * Creates a new message with the specified length.
+ *
+ * @param length message length
+ */
+ public TestMessage(int length) {
+ this.length = length;
+ data = new byte[length];
+ }
+
+ /**
+ * Creates a new message with the specified data.
+ *
+ * @param data message data
+ */
+ TestMessage(byte[] data) {
+ this.length = data.length;
+ this.data = data;
+ }
+
+ /**
+ * Gets the backing byte array data.
+ *
+ * @return backing byte array
+ */
+ public byte[] data() {
+ return data;
+ }
+
+}
diff --git a/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java b/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java
new file mode 100644
index 0000000..a8ab8fa
--- /dev/null
+++ b/utils/nio/src/test/java/org/onlab/nio/TestMessageStream.java
@@ -0,0 +1,55 @@
+package org.onlab.nio;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+
+/**
+ * Fixed-length message transfer buffer.
+ */
+public class TestMessageStream extends MessageStream<TestMessage> {
+
+ private static final String E_WRONG_LEN = "Illegal message length: ";
+
+ private final int length;
+
+ /**
+ * Create a new buffer for transferring messages of the specified length.
+ *
+ * @param length message length
+ * @param ch backing channel
+ * @param loop driver loop
+ */
+ public TestMessageStream(int length, ByteChannel ch,
+ IOLoop<TestMessage, ?> loop) {
+ super(loop, ch, 64 * 1024, 500);
+ this.length = length;
+ }
+
+ @Override
+ protected TestMessage read(ByteBuffer rb) {
+ if (rb.remaining() >= length) {
+ TestMessage received = new TestMessage(length); // a full fixed-length message is available
+ rb.get(received.data());
+ return received;
+ }
+ return null; // not enough bytes buffered yet for a whole message
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * This implementation enforces the message length against the buffer
+ * supported length.
+ *
+ * @throws IllegalArgumentException if message size does not match the
+ * supported buffer size
+ */
+ @Override
+ protected void write(TestMessage message, ByteBuffer wb) {
+ if (message.length() != length) { // only messages of the configured fixed length are accepted
+ throw new IllegalArgumentException(E_WRONG_LEN + message.length());
+ }
+ wb.put(message.data()); // copy the message's whole backing array into the write buffer
+ }
+
+}