Working on IO loop stuff.
diff --git a/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java b/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java
new file mode 100644
index 0000000..e7503e9
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/AbstractMessage.java
@@ -0,0 +1,15 @@
+package org.onlab.nio;
+
+/**
+ * Base {@link Message} implementation that keeps the message length in a field.
+ */
+public abstract class AbstractMessage implements Message {
+ // Value returned by length(); protected, so subclasses can assign it directly.
+ protected int length;
+
+ @Override
+ public int length() {
+ return length;
+ }
+
+}
diff --git a/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java b/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java
index a5acc41..785dbf9 100644
--- a/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java
+++ b/utils/nio/src/main/java/org/onlab/nio/AcceptorLoop.java
@@ -28,7 +28,7 @@
public AcceptorLoop(long selectTimeout, SocketAddress listenAddress)
throws IOException {
super(selectTimeout);
- this.listenAddress = checkNotNull(this.listenAddress, "Address cannot be null");
+ this.listenAddress = checkNotNull(listenAddress, "Address cannot be null");
}
/**
diff --git a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
new file mode 100644
index 0000000..9e1c2d3
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
@@ -0,0 +1,271 @@
+package org.onlab.nio;
+
+import java.io.IOException;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArraySet;
+
+/**
+ * I/O loop for driving inbound & outbound {@link Message} transfer via
+ * {@link MessageStream}.
+ *
+ * @param <M> message type
+ * @param <S> message stream type
+ */
+public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
+ extends SelectorLoop {
+
+ // Queue of requests for new message streams to enter the IO loop processing.
+ private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>();
+
+ // Holds what admitNewStreams() needs to register a stream: the stream, its channel and the initial interest ops.
+ private class NewStreamRequest {
+ private final S stream;
+ private final SelectableChannel channel;
+ private final int op;
+
+ public NewStreamRequest(S stream, SelectableChannel channel, int op) {
+ this.stream = stream;
+ this.channel = channel;
+ this.op = op;
+ }
+ }
+
+ // Set of message streams currently admitted into the IO loop.
+ private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>();
+
+ /**
+ * Creates an IO loop, passing the selection timeout on to {@link SelectorLoop}.
+ *
+ * @param timeout selection timeout in milliseconds
+ * @throws IOException if the backing selector cannot be opened
+ */
+ public IOLoop(long timeout) throws IOException {
+ super(timeout);
+ }
+
+ /**
+ * Creates a new message stream backed by the specified byte channel.
+ *
+ * @param byteChannel backing byte channel
+ * @return newly created message stream
+ */
+ protected abstract S createStream(ByteChannel byteChannel);
+
+ /**
+ * Removes the specified message stream from the set tracked by this IO loop.
+ *
+ * @param stream message stream to remove
+ */
+ void removeStream(MessageStream<M> stream) {
+ streams.remove(stream);
+ }
+
+ /**
+ * Processes the list of messages extracted from the specified message
+ * stream; runtime exceptions thrown here are handed to {@code onError}.
+ *
+ * @param messages non-empty list of received messages
+ * @param stream message stream from which the messages were extracted
+ */
+ protected abstract void processMessages(List<M> messages, MessageStream<M> stream);
+
+    /**
+     * Finishes the connect operation pending on the given selection key and,
+     * if the key is still valid afterwards, switches its interest to reads.
+     * @param key selection key holding the pending connect operation.
+     */
+    protected void connect(SelectionKey key) {
+        try {
+            ((SocketChannel) key.channel()).finishConnect();
+        } catch (IOException | IllegalStateException e) {
+            // Failure is only logged; key validity decides what happens next.
+            log.warn("Unable to complete connection", e);
+        }
+        if (!key.isValid()) {
+            return;
+        }
+        key.interestOps(SelectionKey.OP_READ);
+    }
+
+ /**
+ * Handles connect, read and write readiness on the given key, closing the
+ * attached stream on disconnect or IO failure.
+ * @param key selection key holding the pending I/O operation.
+ */
+ protected void processKeyOperation(SelectionKey key) {
+ @SuppressWarnings("unchecked")
+ S stream = (S) key.attachment();
+
+ try {
+ // If the key is no longer valid, close the stream and bail out.
+ if (!key.isValid()) {
+ stream.close();
+ return;
+ }
+
+ // If there is a pending connect operation, complete it.
+ if (key.isConnectable()) {
+ connect(key);
+ }
+
+ // If there is a read operation, slurp as much data as possible.
+ if (key.isReadable()) {
+ List<M> messages = stream.read();
+
+ // A null list (channel closed) or a recorded IO error means disconnect; bail.
+ if (messages == null || stream.hadError()) {
+ stream.close();
+ return;
+ }
+
+ // If there were any messages read, process them; failures go to onError.
+ if (!messages.isEmpty()) {
+ try {
+ processMessages(messages, stream);
+ } catch (RuntimeException e) {
+ onError(stream, e);
+ }
+ }
+ }
+
+ // If the channel is writable, flush whatever is buffered.
+ if (key.isWritable()) {
+ stream.flushIfPossible();
+ }
+
+ // If a flush recorded an IO error, close the stream.
+ if (stream.hadError()) {
+ stream.close();
+ }
+
+ } catch (CancelledKeyException e) {
+ // Key was cancelled, so silently close the stream
+ stream.close();
+ } catch (IOException e) {
+ if (!stream.isClosed() && !isResetByPeer(e)) {
+ log.warn("Unable to process IO", e);
+ }
+ stream.close();
+ }
+ }
+
+ // Indicates whether or not this exception is caused by 'reset by peer'.
+ private boolean isResetByPeer(IOException e) {
+ Throwable cause = e.getCause();
+ return cause != null && cause instanceof IOException &&
+ cause.getMessage().contains("reset by peer");
+ }
+
+ /**
+ * Hook invoked when {@code processMessages} throws a runtime exception.
+ * The default implementation rethrows the error unchanged.
+ *
+ * @param stream message stream involved in the error
+ * @param error the runtime exception
+ */
+ protected void onError(S stream, RuntimeException error) {
+ throw error;
+ }
+
+ /**
+ * Admits a new message stream backed by the specified socket channel;
+ * its selection key is registered with read interest (OP_READ).
+ *
+ * @param channel backing socket channel
+ */
+ public void acceptStream(SocketChannel channel) {
+ createAndAdmit(channel, SelectionKey.OP_READ);
+ }
+
+
+ /**
+ * Admits a new message stream backed by the specified socket channel;
+ * its selection key is registered with connect interest (OP_CONNECT).
+ *
+ * @param channel backing socket channel
+ */
+ public void connectStream(SocketChannel channel) {
+ createAndAdmit(channel, SelectionKey.OP_CONNECT);
+ }
+
+ /**
+ * Creates a new message stream backed by the specified socket channel,
+ * tracks it, queues it for registration and wakes up the selector.
+ *
+ * @param channel socket channel
+ * @param op pending operations mask to be applied to the selection
+ * key as its initial set of interest ops
+ */
+ private synchronized void createAndAdmit(SocketChannel channel, int op) {
+ S stream = createStream(channel);
+ streams.add(stream);
+ newStreamRequests.add(new NewStreamRequest(stream, channel, op));
+ selector.wakeup();
+ }
+
+    /**
+     * Registers queued streams with the selector; a stream whose channel is already closed is discarded.
+     */
+    private void admitNewStreams() {
+        NewStreamRequest request;
+        while (isRunning() && (request = newStreamRequests.poll()) != null) {
+            try {
+                SelectionKey key = request.channel.register(selector, request.op,
+                                                            request.stream);
+                request.stream.setKey(key);
+            } catch (ClosedChannelException e) {
+                log.warn("Unable to admit new message stream", e);
+                // Drop the stream so it does not linger, keyless, in the loop.
+                request.stream.close();
+            }
+        }
+    }
+
+ @Override
+ protected void loop() throws IOException {
+ notifyReady();
+
+ // Each pass: admit queued streams, flush, then select and dispatch ready keys.
+ while (isRunning()) {
+ admitNewStreams();
+
+ // Flush buffered data; streams left with unwritten bytes get write interest.
+ for (MessageStream<M> stream : streams) {
+ stream.flushIfWriteNotPending();
+ }
+
+ // Select keys using the configured timeout and process them.
+ int count = selector.select(selectTimeout);
+ if (count > 0 && isRunning()) {
+ Iterator<SelectionKey> it = selector.selectedKeys().iterator();
+ while (it.hasNext()) {
+ SelectionKey key = it.next();
+ it.remove();
+ processKeyOperation(key);
+ }
+ }
+ }
+ }
+
+    /** Closes every stream tracked by this loop that reports itself as stale. */
+    public synchronized void pruneStaleStreams() {
+        Iterator<MessageStream<M>> it = streams.iterator();
+        while (it.hasNext()) {
+            MessageStream<M> candidate = it.next();
+            if (candidate.isStale()) {
+                candidate.close();
+            }
+        }
+    }
+
+}
diff --git a/utils/nio/src/main/java/org/onlab/nio/Message.java b/utils/nio/src/main/java/org/onlab/nio/Message.java
new file mode 100644
index 0000000..5ce6d44
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/Message.java
@@ -0,0 +1,15 @@
+package org.onlab.nio;
+
+/**
+ * Representation of a message transferred via {@link MessageStream}.
+ */
+public interface Message {
+
+ /**
+ * Gets the message length in bytes; used to size the outbound buffer.
+ *
+ * @return number of bytes
+ */
+ int length();
+
+}
diff --git a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
new file mode 100644
index 0000000..b2acca4
--- /dev/null
+++ b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
@@ -0,0 +1,347 @@
+package org.onlab.nio;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ByteChannel;
+import java.nio.channels.SelectionKey;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.lang.System.currentTimeMillis;
+import static java.nio.ByteBuffer.allocateDirect;
+
+/**
+ * Bi-directional message stream for transferring messages to & from the
+ * network via two byte buffers.
+ *
+ * @param <M> message type
+ */
+public abstract class MessageStream<M extends Message> {
+
+ protected Logger log = LoggerFactory.getLogger(getClass());
+
+ private final IOLoop<M, ?> loop;
+ private final ByteChannel channel;
+ private final int maxIdleMillis;
+
+ private final ByteBuffer inbound;
+ private ByteBuffer outbound;
+ private SelectionKey key;
+
+ private volatile boolean closed = false;
+ private volatile boolean writePending;
+ private volatile boolean writeOccurred;
+
+ private Exception ioError;
+ private long lastActiveTime;
+
+
+    /**
+     * Creates a message stream associated with the specified IO loop and
+     * backed by the given byte channel.
+     *
+     * @param loop          IO loop
+     * @param byteChannel   backing byte channel
+     * @param bufferSize    initial size of each backing byte buffer; must be positive
+     * @param maxIdleMillis maximum number of millis the stream can be idle
+     *                      before it will be closed; must be positive
+     */
+    protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
+                            int bufferSize, int maxIdleMillis) {
+        this.loop = checkNotNull(loop, "Loop cannot be null");
+        this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");
+        // A zero-capacity outbound buffer could never grow by doubling.
+        checkArgument(bufferSize > 0, "Buffer size must be positive");
+        checkArgument(maxIdleMillis > 0, "Idle time must be positive");
+        this.maxIdleMillis = maxIdleMillis;
+        inbound = allocateDirect(bufferSize);
+        outbound = allocateDirect(bufferSize);
+    }
+
+ /**
+ * Gets a single message from the specified byte buffer; this is
+ * to be done without manipulating the buffer via flip, reset or clear.
+ *
+ * @param buffer byte buffer, already flipped for reading by the caller
+ * @return read message or null if there are not enough bytes to read
+ * a complete message
+ */
+ protected abstract M read(ByteBuffer buffer);
+
+ /**
+ * Puts the specified message into the specified byte buffer; this is
+ * to be done without manipulating the buffer via flip, reset or clear.
+ *
+ * @param message message to be written into the buffer
+ * @param buffer byte buffer with at least {@code message.length()} bytes remaining
+ */
+ protected abstract void write(M message, ByteBuffer buffer);
+
+    /**
+     * Closes the stream once: detaches it from the loop, cancels its key and closes the channel.
+     */
+    public void close() {
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        }
+        loop.removeStream(this);
+        if (key != null) {
+            key.cancel();
+        }
+        try {
+            // Fall back to the backing channel for streams never given a key.
+            (key != null ? key.channel() : channel).close();
+        } catch (IOException e) {
+            log.warn("Unable to close stream", e);
+        }
+    }
+
+ /**
+ * Indicates whether this stream has been closed.
+ *
+ * @return true if this stream has been closed
+ */
+ public synchronized boolean isClosed() {
+ return closed;
+ }
+
+ /**
+ * Returns the stream IO selection key.
+ *
+ * @return selection key; null until one is bound via {@code setKey}
+ */
+ public SelectionKey key() {
+ return key;
+ }
+
+ /**
+ * Binds the selection key used for driving IO operations on the stream and records the current time as last activity.
+ *
+ * @param key IO selection key
+ */
+ public void setKey(SelectionKey key) {
+ this.key = key;
+ this.lastActiveTime = currentTimeMillis();
+ }
+
+ /**
+ * Returns the IO loop this stream was constructed with.
+ *
+ * @return I/O loop used to drive this stream
+ */
+ public IOLoop<M, ?> loop() {
+ return loop;
+ }
+
+ /**
+ * Indicates whether any prior IO operation recorded an error.
+ *
+ * @return true if an IO error has been recorded
+ */
+ public boolean hadError() {
+ return ioError != null;
+ }
+
+ /**
+ * Gets the recorded IO error, if one occurred.
+ *
+ * @return recorded IO error; null if none occurred
+ */
+ public Exception getError() {
+ return ioError;
+ }
+
+    /**
+     * Reads, without blocking, a list of messages from the stream.
+     * The list will be empty if there were no complete messages pending.
+     *
+     * @return list of messages or null if backing channel has been closed
+     * @throws IOException if messages could not be read
+     */
+    public List<M> read() throws IOException {
+        try {
+            // End-of-stream on the channel is reported to the caller as null.
+            if (channel.read(inbound) == -1) {
+                return null;
+            }
+            // Switch the buffer to draining mode and extract whole messages.
+            List<M> extracted = new ArrayList<>();
+            inbound.flip();
+            for (M next = read(inbound); next != null; next = read(inbound)) {
+                extracted.add(next);
+            }
+            // Retain any partial trailing message for the next read.
+            inbound.compact();
+
+            // Any completed read counts as activity for staleness tracking.
+            lastActiveTime = currentTimeMillis();
+            return extracted;
+        } catch (Exception e) {
+            // Preserve the cause; the IO loop inspects it for 'reset by peer'.
+            throw new IOException("Unable to read messages", e);
+        }
+    }
+
+ /**
+ * Buffers the specified list of messages and flushes them to the channel unless a flush is already planned.
+ *
+ * @param messages list of messages to write
+ * @throws IOException if error occurred while writing the data
+ */
+ public void write(List<M> messages) throws IOException {
+ synchronized (this) {
+ // Buffer every message first, so that a single flush covers them all.
+ for (M m : messages) {
+ append(m);
+ }
+ flushUnlessAlreadyPlanningTo();
+ }
+ }
+
+ /**
+ * Buffers the given message and flushes it to the channel unless a flush is already planned.
+ *
+ * @param message message to write
+ * @throws IOException if error occurred while writing the data
+ */
+ public void write(M message) throws IOException {
+ synchronized (this) {
+ append(message);
+ flushUnlessAlreadyPlanningTo();
+ }
+ }
+
+ // Appends the specified message into the outbound buffer, growing the
+ // buffer if required.
+ private void append(M message) {
+ // Keep doubling the buffer until the whole message fits.
+ while (outbound.remaining() < message.length()) {
+ doubleSize();
+ }
+ // Hand the actual encoding of the message over to the subclass.
+ write(message, outbound);
+ }
+
+ // Flushes now, unless a write has already occurred or one is still pending.
+ private void flushUnlessAlreadyPlanningTo() throws IOException {
+ if (!writeOccurred && !writePending) {
+ flush();
+ }
+ }
+
+    /**
+     * Writes buffered outbound data to the channel, unless a write already occurred or is
+     * pending. Write failures are logged and recorded rather than thrown; see {@link #hadError()}.
+     * @throws IOException declared for callers; write failures are handled internally
+     */
+    public void flush() throws IOException {
+        synchronized (this) {
+            if (!writeOccurred && !writePending) {
+                outbound.flip();
+                try {
+                    channel.write(outbound);
+                } catch (IOException e) {
+                    // Constant-first equals: the exception message may be null.
+                    if (!closed && !"Broken pipe".equals(e.getMessage())) {
+                        log.warn("Unable to write data", e);
+                        ioError = e;
+                    }
+                }
+                lastActiveTime = currentTimeMillis();
+                writeOccurred = true;
+                writePending = outbound.hasRemaining();
+                outbound.compact();
+            }
+        }
+    }
+
+ /**
+ * Indicates whether the last flush left unwritten bytes in the outbound buffer.
+ *
+ * @return true if there are bytes to be written
+ */
+ boolean isWritePending() {
+ synchronized (this) {
+ return writePending;
+ }
+ }
+
+ /**
+ * Attempts to flush data, internal stream state and channel availability
+ * permitting. Invoked by the driver I/O loop during handling of writable
+ * selection key; afterwards the key's interest set is reset to reads only.
+ * <p>
+ * Resets the internal state flags {@code writeOccurred} and
+ * {@code writePending}.
+ *
+ * @throws IOException if implicit flush failed
+ */
+ void flushIfPossible() throws IOException {
+ synchronized (this) {
+ writePending = false;
+ writeOccurred = false;
+ if (outbound.position() > 0) {
+ flush();
+ }
+ }
+ key.interestOps(SelectionKey.OP_READ);
+ }
+
+ /**
+ * Attempts to flush data, internal stream state and channel availability
+ * permitting and if other writes are not pending. Invoked by the driver
+ * I/O loop prior to entering select wait. Resets the internal
+ * {@code writeOccurred} flag; adds write interest if bytes remain unwritten.
+ *
+ * @throws IOException if implicit flush failed
+ */
+ void flushIfWriteNotPending() throws IOException {
+ synchronized (this) {
+ writeOccurred = false;
+ if (!writePending && outbound.position() > 0) {
+ flush();
+ }
+ }
+ if (isWritePending()) {
+ key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
+ }
+ }
+
+ /**
+ * Replaces the outbound buffer with one of twice the capacity, copying over the bytes already buffered.
+ */
+ private void doubleSize() {
+ ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
+ outbound.flip();
+ newBuffer.put(outbound);
+ outbound = newBuffer;
+ }
+
+ /**
+ * Returns the maximum number of milliseconds the stream is allowed
+ * to go without any read/write operations.
+ *
+ * @return number of millis of permissible idle time
+ */
+ protected int maxIdleMillis() {
+ return maxIdleMillis;
+ }
+
+
+    /**
+     * Tells whether a key is bound and the idle time exceeds {@link #maxIdleMillis()}.
+     * @return true if the stream is stale
+     */
+    boolean isStale() {
+        boolean idleTooLong = currentTimeMillis() - lastActiveTime > maxIdleMillis();
+        return idleTooLong && key != null;
+    }
+
+}