blob: b2acca4bd21903dbe4120a589a65b5fe96b16352 [file] [log] [blame]
package org.onlab.nio;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.lang.System.currentTimeMillis;
import static java.nio.ByteBuffer.allocateDirect;
/**
* Bi-directional message stream for transferring messages to & from the
* network via two byte buffers.
*
* @param <M> message type
*/
public abstract class MessageStream<M extends Message> {
protected Logger log = LoggerFactory.getLogger(getClass());
private final IOLoop<M, ?> loop;
private final ByteChannel channel;
private final int maxIdleMillis;
private final ByteBuffer inbound;
private ByteBuffer outbound;
private SelectionKey key;
private volatile boolean closed = false;
private volatile boolean writePending;
private volatile boolean writeOccurred;
private Exception ioError;
private long lastActiveTime;
/**
* Creates a message stream associated with the specified IO loop and
* backed by the given byte channel.
*
* @param loop IO loop
* @param byteChannel backing byte channel
* @param bufferSize size of the backing byte buffers
* @param maxIdleMillis maximum number of millis the stream can be idle
* before it will be closed
*/
protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
int bufferSize, int maxIdleMillis) {
this.loop = checkNotNull(loop, "Loop cannot be null");
this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");
checkArgument(maxIdleMillis > 0, "Idle time must be positive");
this.maxIdleMillis = maxIdleMillis;
inbound = allocateDirect(bufferSize);
outbound = allocateDirect(bufferSize);
}
/**
* Gets a single message from the specified byte buffer; this is
* to be done without manipulating the buffer via flip, reset or clear.
*
* @param buffer byte buffer
* @return read message or null if there are not enough bytes to read
* a complete message
*/
protected abstract M read(ByteBuffer buffer);
/**
* Puts the specified message into the specified byte buffer; this is
* to be done without manipulating the buffer via flip, reset or clear.
*
* @param message message to be write into the buffer
* @param buffer byte buffer
*/
protected abstract void write(M message, ByteBuffer buffer);
/**
* Closes the message buffer.
*/
public void close() {
synchronized (this) {
if (closed) {
return;
}
closed = true;
}
loop.removeStream(this);
if (key != null) {
try {
key.cancel();
key.channel().close();
} catch (IOException e) {
log.warn("Unable to close stream", e);
}
}
}
/**
* Indicates whether this buffer has been closed.
*
* @return true if this stream has been closed
*/
public synchronized boolean isClosed() {
return closed;
}
/**
* Returns the stream IO selection key.
*
* @return socket channel registration selection key
*/
public SelectionKey key() {
return key;
}
/**
* Binds the selection key to be used for driving IO operations on the stream.
*
* @param key IO selection key
*/
public void setKey(SelectionKey key) {
this.key = key;
this.lastActiveTime = currentTimeMillis();
}
/**
* Returns the IO loop to which this stream is bound.
*
* @return I/O loop used to drive this stream
*/
public IOLoop<M, ?> loop() {
return loop;
}
/**
* Indicates whether the any prior IO encountered an error.
*
* @return true if a write failed
*/
public boolean hadError() {
return ioError != null;
}
/**
* Gets the prior IO error, if one occurred.
*
* @return IO error; null if none occurred
*/
public Exception getError() {
return ioError;
}
/**
* Reads, withouth blocking, a list of messages from the stream.
* The list will be empty if there were not messages pending.
*
* @return list of messages or null if backing channel has been closed
* @throws IOException if messages could not be read
*/
public List<M> read() throws IOException {
try {
int read = channel.read(inbound);
if (read != -1) {
// Read the messages one-by-one and add them to the list.
List<M> messages = new ArrayList<>();
M message;
inbound.flip();
while ((message = read(inbound)) != null) {
messages.add(message);
}
inbound.compact();
// Mark the stream with current time to indicate liveness.
lastActiveTime = currentTimeMillis();
return messages;
}
return null;
} catch (Exception e) {
throw new IOException("Unable to read messages", e);
}
}
/**
* Writes the specified list of messages to the stream.
*
* @param messages list of messages to write
* @throws IOException if error occurred while writing the data
*/
public void write(List<M> messages) throws IOException {
synchronized (this) {
// First write all messages.
for (M m : messages) {
append(m);
}
flushUnlessAlreadyPlanningTo();
}
}
/**
* Writes the given message to the stream.
*
* @param message message to write
* @throws IOException if error occurred while writing the data
*/
public void write(M message) throws IOException {
synchronized (this) {
append(message);
flushUnlessAlreadyPlanningTo();
}
}
// Appends the specified message into the internal buffer, growing the
// buffer if required.
private void append(M message) {
// If the buffer does not have sufficient length double it.
while (outbound.remaining() < message.length()) {
doubleSize();
}
// Place the message into the buffer and bump the output trackers.
write(message, outbound);
}
// Forces a flush, unless one is planned already.
private void flushUnlessAlreadyPlanningTo() throws IOException {
if (!writeOccurred && !writePending) {
flush();
}
}
/**
* Flushes any pending writes.
*
* @throws IOException if flush failed
*/
public void flush() throws IOException {
synchronized (this) {
if (!writeOccurred && !writePending) {
outbound.flip();
try {
channel.write(outbound);
} catch (IOException e) {
if (!closed && !e.getMessage().equals("Broken pipe")) {
log.warn("Unable to write data", e);
ioError = e;
}
}
lastActiveTime = currentTimeMillis();
writeOccurred = true;
writePending = outbound.hasRemaining();
outbound.compact();
}
}
}
/**
* Indicates whether the stream has bytes to be written to the channel.
*
* @return true if there are bytes to be written
*/
boolean isWritePending() {
synchronized (this) {
return writePending;
}
}
/**
* Attempts to flush data, internal stream state and channel availability
* permitting. Invoked by the driver I/O loop during handling of writable
* selection key.
* <p/>
* Resets the internal state flags {@code writeOccurred} and
* {@code writePending}.
*
* @throws IOException if implicit flush failed
*/
void flushIfPossible() throws IOException {
synchronized (this) {
writePending = false;
writeOccurred = false;
if (outbound.position() > 0) {
flush();
}
}
key.interestOps(SelectionKey.OP_READ);
}
/**
* Attempts to flush data, internal stream state and channel availability
* permitting and if other writes are not pending. Invoked by the driver
* I/O loop prior to entering select wait. Resets the internal
* {@code writeOccurred} state flag.
*
* @throws IOException if implicit flush failed
*/
void flushIfWriteNotPending() throws IOException {
synchronized (this) {
writeOccurred = false;
if (!writePending && outbound.position() > 0) {
flush();
}
}
if (isWritePending()) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
}
}
/**
* Doubles the size of the outbound buffer.
*/
private void doubleSize() {
ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
outbound.flip();
newBuffer.put(outbound);
outbound = newBuffer;
}
/**
* Returns the maximum number of milliseconds the stream is allowed
* without any read/write operations.
*
* @return number if millis of permissible idle time
*/
protected int maxIdleMillis() {
return maxIdleMillis;
}
/**
* Returns true if the given stream has gone stale.
*
* @return true if the stream is stale
*/
boolean isStale() {
return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
}
}