| /* |
| * Copyright 2014 Open Networking Laboratory |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.onlab.nio; |
| |
| import org.onlab.util.Counter; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.ByteChannel; |
| import java.nio.channels.SelectionKey; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Objects; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static java.lang.System.currentTimeMillis; |
| import static java.nio.ByteBuffer.allocateDirect; |
| |
| /** |
| * Bi-directional message stream for transferring messages to & from the |
| * network via two byte buffers. |
| * |
| * @param <M> message type |
| */ |
| public abstract class MessageStream<M extends Message> { |
| |
| protected Logger log = LoggerFactory.getLogger(getClass()); |
| |
| private final IOLoop<M, ?> loop; |
| private final ByteChannel channel; |
| private final int maxIdleMillis; |
| |
| private final ByteBuffer inbound; |
| private ByteBuffer outbound; |
| private SelectionKey key; |
| |
| private volatile boolean closed = false; |
| private volatile boolean writePending; |
| private volatile boolean writeOccurred; |
| |
| private Exception ioError; |
| private long lastActiveTime; |
| |
| private final Counter bytesIn = new Counter(); |
| private final Counter messagesIn = new Counter(); |
| private final Counter bytesOut = new Counter(); |
| private final Counter messagesOut = new Counter(); |
| |
| /** |
| * Creates a message stream associated with the specified IO loop and |
| * backed by the given byte channel. |
| * |
| * @param loop IO loop |
| * @param byteChannel backing byte channel |
| * @param bufferSize size of the backing byte buffers |
| * @param maxIdleMillis maximum number of millis the stream can be idle |
| * before it will be closed |
| */ |
| protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel, |
| int bufferSize, int maxIdleMillis) { |
| this.loop = checkNotNull(loop, "Loop cannot be null"); |
| this.channel = checkNotNull(byteChannel, "Byte channel cannot be null"); |
| |
| checkArgument(maxIdleMillis > 0, "Idle time must be positive"); |
| this.maxIdleMillis = maxIdleMillis; |
| |
| inbound = allocateDirect(bufferSize); |
| outbound = allocateDirect(bufferSize); |
| } |
| |
| /** |
| * Gets a single message from the specified byte buffer; this is |
| * to be done without manipulating the buffer via flip, reset or clear. |
| * |
| * @param buffer byte buffer |
| * @return read message or null if there are not enough bytes to read |
| * a complete message |
| */ |
| protected abstract M read(ByteBuffer buffer); |
| |
| /** |
| * Puts the specified message into the specified byte buffer; this is |
| * to be done without manipulating the buffer via flip, reset or clear. |
| * |
| * @param message message to be write into the buffer |
| * @param buffer byte buffer |
| */ |
| protected abstract void write(M message, ByteBuffer buffer); |
| |
| /** |
| * Closes the message buffer. |
| */ |
| public void close() { |
| synchronized (this) { |
| if (closed) { |
| return; |
| } |
| closed = true; |
| } |
| |
| bytesIn.freeze(); |
| bytesOut.freeze(); |
| messagesIn.freeze(); |
| messagesOut.freeze(); |
| |
| loop.removeStream(this); |
| if (key != null) { |
| try { |
| key.cancel(); |
| key.channel().close(); |
| } catch (IOException e) { |
| log.warn("Unable to close stream", e); |
| } |
| } |
| } |
| |
| /** |
| * Indicates whether this buffer has been closed. |
| * |
| * @return true if this stream has been closed |
| */ |
| public synchronized boolean isClosed() { |
| return closed; |
| } |
| |
| /** |
| * Returns the stream IO selection key. |
| * |
| * @return socket channel registration selection key |
| */ |
| public SelectionKey key() { |
| return key; |
| } |
| |
| /** |
| * Binds the selection key to be used for driving IO operations on the stream. |
| * |
| * @param key IO selection key |
| */ |
| public void setKey(SelectionKey key) { |
| this.key = key; |
| this.lastActiveTime = currentTimeMillis(); |
| } |
| |
| /** |
| * Returns the IO loop to which this stream is bound. |
| * |
| * @return I/O loop used to drive this stream |
| */ |
| public IOLoop<M, ?> loop() { |
| return loop; |
| } |
| |
| /** |
| * Indicates whether the any prior IO encountered an error. |
| * |
| * @return true if a write failed |
| */ |
| public boolean hadError() { |
| return ioError != null; |
| } |
| |
| /** |
| * Gets the prior IO error, if one occurred. |
| * |
| * @return IO error; null if none occurred |
| */ |
| public Exception getError() { |
| return ioError; |
| } |
| |
| /** |
| * Reads, without blocking, a list of messages from the stream. |
| * The list will be empty if there were not messages pending. |
| * |
| * @return list of messages or null if backing channel has been closed |
| * @throws IOException if messages could not be read |
| */ |
| public List<M> read() throws IOException { |
| try { |
| int read = channel.read(inbound); |
| if (read != -1) { |
| // Read the messages one-by-one and add them to the list. |
| List<M> messages = new ArrayList<>(); |
| M message; |
| inbound.flip(); |
| while ((message = read(inbound)) != null) { |
| messages.add(message); |
| messagesIn.add(1); |
| bytesIn.add(message.length()); |
| } |
| inbound.compact(); |
| |
| // Mark the stream with current time to indicate liveness. |
| lastActiveTime = currentTimeMillis(); |
| return messages; |
| } |
| return null; |
| |
| } catch (Exception e) { |
| throw new IOException("Unable to read messages", e); |
| } |
| } |
| |
| /** |
| * Writes the specified list of messages to the stream. |
| * |
| * @param messages list of messages to write |
| * @throws IOException if error occurred while writing the data |
| */ |
| public void write(List<M> messages) throws IOException { |
| synchronized (this) { |
| // First write all messages. |
| for (M m : messages) { |
| append(m); |
| } |
| flushUnlessAlreadyPlanningTo(); |
| } |
| } |
| |
| /** |
| * Writes the given message to the stream. |
| * |
| * @param message message to write |
| * @throws IOException if error occurred while writing the data |
| */ |
| public void write(M message) throws IOException { |
| synchronized (this) { |
| append(message); |
| flushUnlessAlreadyPlanningTo(); |
| } |
| } |
| |
| // Appends the specified message into the internal buffer, growing the |
| // buffer if required. |
| private void append(M message) { |
| // If the buffer does not have sufficient length double it. |
| while (outbound.remaining() < message.length()) { |
| doubleSize(); |
| } |
| write(message, outbound); |
| messagesOut.add(1); |
| bytesOut.add(message.length()); |
| } |
| |
| // Forces a flush, unless one is planned already. |
| private void flushUnlessAlreadyPlanningTo() throws IOException { |
| if (!writeOccurred && !writePending) { |
| flush(); |
| } |
| } |
| |
| /** |
| * Flushes any pending writes. |
| * |
| * @throws IOException if flush failed |
| */ |
| public void flush() throws IOException { |
| synchronized (this) { |
| if (!writeOccurred && !writePending) { |
| outbound.flip(); |
| try { |
| channel.write(outbound); |
| } catch (IOException e) { |
| if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) { |
| log.warn("Unable to write data", e); |
| ioError = e; |
| } |
| } |
| lastActiveTime = currentTimeMillis(); |
| writeOccurred = true; |
| writePending = outbound.hasRemaining(); |
| outbound.compact(); |
| } |
| } |
| } |
| |
| /** |
| * Indicates whether the stream has bytes to be written to the channel. |
| * |
| * @return true if there are bytes to be written |
| */ |
| boolean isWritePending() { |
| synchronized (this) { |
| return writePending; |
| } |
| } |
| |
| |
| /** |
| * Indicates whether data has been written but not flushed yet. |
| * |
| * @return true if flush is required |
| */ |
| boolean isFlushRequired() { |
| synchronized (this) { |
| return outbound.position() > 0; |
| } |
| } |
| |
| /** |
| * Attempts to flush data, internal stream state and channel availability |
| * permitting. Invoked by the driver I/O loop during handling of writable |
| * selection key. |
| * <p> |
| * Resets the internal state flags {@code writeOccurred} and |
| * {@code writePending}. |
| * </p> |
| * @throws IOException if implicit flush failed |
| */ |
| void flushIfPossible() throws IOException { |
| synchronized (this) { |
| writePending = false; |
| writeOccurred = false; |
| if (outbound.position() > 0) { |
| flush(); |
| } |
| } |
| key.interestOps(SelectionKey.OP_READ); |
| } |
| |
| /** |
| * Attempts to flush data, internal stream state and channel availability |
| * permitting and if other writes are not pending. Invoked by the driver |
| * I/O loop prior to entering select wait. Resets the internal |
| * {@code writeOccurred} state flag. |
| * |
| * @throws IOException if implicit flush failed |
| */ |
| void flushIfWriteNotPending() throws IOException { |
| synchronized (this) { |
| writeOccurred = false; |
| if (!writePending && outbound.position() > 0) { |
| flush(); |
| } |
| } |
| if (isWritePending()) { |
| key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); |
| } |
| } |
| |
| /** |
| * Doubles the size of the outbound buffer. |
| */ |
| private void doubleSize() { |
| ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2); |
| outbound.flip(); |
| newBuffer.put(outbound); |
| outbound = newBuffer; |
| } |
| |
| /** |
| * Returns the maximum number of milliseconds the stream is allowed |
| * without any read/write operations. |
| * |
| * @return number if millis of permissible idle time |
| */ |
| protected int maxIdleMillis() { |
| return maxIdleMillis; |
| } |
| |
| |
| /** |
| * Returns true if the given stream has gone stale. |
| * |
| * @return true if the stream is stale |
| */ |
| boolean isStale() { |
| return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null; |
| } |
| |
| /** |
| * Returns the inbound bytes counter. |
| * |
| * @return inbound bytes counter |
| */ |
| public Counter bytesIn() { |
| return bytesIn; |
| } |
| |
| /** |
| * Returns the outbound bytes counter. |
| * |
| * @return outbound bytes counter |
| */ |
| public Counter bytesOut() { |
| return bytesOut; |
| } |
| |
| /** |
| * Returns the inbound messages counter. |
| * |
| * @return inbound messages counter |
| */ |
| public Counter messagesIn() { |
| return messagesIn; |
| } |
| |
| /** |
| * Returns the outbound messages counter. |
| * |
| * @return outbound messages counter |
| */ |
| public Counter messagesOut() { |
| return messagesOut; |
| } |
| |
| } |