| package org.onlab.nio; |
| |
| import java.io.IOException; |
| import java.nio.channels.ByteChannel; |
| import java.nio.channels.CancelledKeyException; |
| import java.nio.channels.ClosedChannelException; |
| import java.nio.channels.SelectableChannel; |
| import java.nio.channels.SelectionKey; |
| import java.nio.channels.SocketChannel; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.CopyOnWriteArraySet; |
| |
| /** |
| * I/O loop for driving inbound & outbound {@link Message} transfer via |
| * {@link MessageStream}. |
| * |
| * @param <M> message type |
| * @param <S> message stream type |
| */ |
| public abstract class IOLoop<M extends Message, S extends MessageStream<M>> |
| extends SelectorLoop { |
| |
| // Queue of requests for new message streams to enter the IO loop processing. |
| private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>(); |
| |
| // Carries information required for admitting a new message stream. |
| private class NewStreamRequest { |
| private final S stream; |
| private final SelectableChannel channel; |
| private final int op; |
| |
| public NewStreamRequest(S stream, SelectableChannel channel, int op) { |
| this.stream = stream; |
| this.channel = channel; |
| this.op = op; |
| } |
| } |
| |
| // Set of message streams currently admitted into the IO loop. |
| private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>(); |
| |
| /** |
| * Creates an IO loop with the given selection timeout. |
| * |
| * @param timeout selection timeout in milliseconds |
| * @throws IOException if the backing selector cannot be opened |
| */ |
| public IOLoop(long timeout) throws IOException { |
| super(timeout); |
| } |
| |
| /** |
| * Returns the number of message stream in custody of the loop. |
| * |
| * @return number of message streams |
| */ |
| public int streamCount() { |
| return streams.size(); |
| } |
| |
| /** |
| * Creates a new message stream backed by the specified socket channel. |
| * |
| * @param byteChannel backing byte channel |
| * @return newly created message stream |
| */ |
| protected abstract S createStream(ByteChannel byteChannel); |
| |
| /** |
| * Removes the specified message stream from the IO loop. |
| * |
| * @param stream message stream to remove |
| */ |
| protected void removeStream(MessageStream<M> stream) { |
| streams.remove(stream); |
| } |
| |
| /** |
| * Processes the list of messages extracted from the specified message |
| * stream. |
| * |
| * @param messages non-empty list of received messages |
| * @param stream message stream from which the messages were extracted |
| */ |
| protected abstract void processMessages(List<M> messages, MessageStream<M> stream); |
| |
| /** |
| * Completes connection request pending on the given selection key. |
| * |
| * @param key selection key holding the pending connect operation. |
| */ |
| protected void connect(SelectionKey key) { |
| try { |
| SocketChannel ch = (SocketChannel) key.channel(); |
| ch.finishConnect(); |
| } catch (IOException | IllegalStateException e) { |
| log.warn("Unable to complete connection", e); |
| } |
| |
| if (key.isValid()) { |
| key.interestOps(SelectionKey.OP_READ); |
| } |
| } |
| |
| /** |
| * Processes an IO operation pending on the specified key. |
| * |
| * @param key selection key holding the pending I/O operation. |
| */ |
| protected void processKeyOperation(SelectionKey key) { |
| @SuppressWarnings("unchecked") |
| S stream = (S) key.attachment(); |
| |
| try { |
| // If the key is not valid, bail out. |
| if (!key.isValid()) { |
| stream.close(); |
| return; |
| } |
| |
| // If there is a pending connect operation, complete it. |
| if (key.isConnectable()) { |
| connect(key); |
| } |
| |
| // If there is a read operation, slurp as much data as possible. |
| if (key.isReadable()) { |
| List<M> messages = stream.read(); |
| |
| // No messages or failed flush imply disconnect; bail. |
| if (messages == null || stream.hadError()) { |
| stream.close(); |
| return; |
| } |
| |
| // If there were any messages read, process them. |
| if (!messages.isEmpty()) { |
| try { |
| processMessages(messages, stream); |
| } catch (RuntimeException e) { |
| onError(stream, e); |
| } |
| } |
| } |
| |
| // If there are pending writes, flush them |
| if (key.isWritable()) { |
| stream.flushIfPossible(); |
| } |
| |
| // If there were any issued flushing, close the stream. |
| if (stream.hadError()) { |
| stream.close(); |
| } |
| |
| } catch (CancelledKeyException e) { |
| // Key was cancelled, so silently close the stream |
| stream.close(); |
| } catch (IOException e) { |
| if (!stream.isClosed() && !isResetByPeer(e)) { |
| log.warn("Unable to process IO", e); |
| } |
| stream.close(); |
| } |
| } |
| |
| // Indicates whether or not this exception is caused by 'reset by peer'. |
| private boolean isResetByPeer(IOException e) { |
| Throwable cause = e.getCause(); |
| return cause != null && cause instanceof IOException && |
| cause.getMessage().contains("reset by peer"); |
| } |
| |
| /** |
| * Hook to allow intercept of any errors caused during message processing. |
| * Default behaviour is to rethrow the error. |
| * |
| * @param stream message stream involved in the error |
| * @param error the runtime exception |
| */ |
| protected void onError(S stream, RuntimeException error) { |
| throw error; |
| } |
| |
| /** |
| * Admits a new message stream backed by the specified socket channel |
| * with a pending accept operation. |
| * |
| * @param channel backing socket channel |
| * @return newly accepted message stream |
| */ |
| public S acceptStream(SocketChannel channel) { |
| return createAndAdmit(channel, SelectionKey.OP_READ); |
| } |
| |
| |
| /** |
| * Admits a new message stream backed by the specified socket channel |
| * with a pending connect operation. |
| * |
| * @param channel backing socket channel |
| * @return newly connected message stream |
| */ |
| public S connectStream(SocketChannel channel) { |
| return createAndAdmit(channel, SelectionKey.OP_CONNECT); |
| } |
| |
| /** |
| * Creates a new message stream backed by the specified socket channel |
| * and admits it into the IO loop. |
| * |
| * @param channel socket channel |
| * @param op pending operations mask to be applied to the selection |
| * key as a set of initial interestedOps |
| * @return newly created message stream |
| */ |
| private synchronized S createAndAdmit(SocketChannel channel, int op) { |
| S stream = createStream(channel); |
| streams.add(stream); |
| newStreamRequests.add(new NewStreamRequest(stream, channel, op)); |
| selector.wakeup(); |
| return stream; |
| } |
| |
| /** |
| * Safely admits new streams into the IO loop. |
| */ |
| private void admitNewStreams() { |
| Iterator<NewStreamRequest> it = newStreamRequests.iterator(); |
| while (isRunning() && it.hasNext()) { |
| try { |
| NewStreamRequest request = it.next(); |
| it.remove(); |
| SelectionKey key = request.channel.register(selector, request.op, |
| request.stream); |
| request.stream.setKey(key); |
| } catch (ClosedChannelException e) { |
| log.warn("Unable to admit new message stream", e); |
| } |
| } |
| } |
| |
| @Override |
| protected void loop() throws IOException { |
| notifyReady(); |
| |
| // Keep going until told otherwise. |
| while (isRunning()) { |
| admitNewStreams(); |
| |
| // Process flushes & write selects on all streams |
| for (MessageStream<M> stream : streams) { |
| stream.flushIfWriteNotPending(); |
| } |
| |
| // Select keys and process them. |
| int count = selector.select(selectTimeout); |
| if (count > 0 && isRunning()) { |
| Iterator<SelectionKey> it = selector.selectedKeys().iterator(); |
| while (it.hasNext()) { |
| SelectionKey key = it.next(); |
| it.remove(); |
| processKeyOperation(key); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Prunes the registered streams by discarding any stale ones. |
| * |
| * @return number of remaining streams |
| */ |
| public synchronized int pruneStaleStreams() { |
| for (MessageStream<M> stream : streams) { |
| if (stream.isStale()) { |
| stream.close(); |
| } |
| } |
| return streams.size(); |
| } |
| |
| } |