tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 1 | package org.onlab.nio; |
| 2 | |
| 3 | import java.io.IOException; |
| 4 | import java.nio.channels.ByteChannel; |
| 5 | import java.nio.channels.CancelledKeyException; |
| 6 | import java.nio.channels.ClosedChannelException; |
| 7 | import java.nio.channels.SelectableChannel; |
| 8 | import java.nio.channels.SelectionKey; |
| 9 | import java.nio.channels.SocketChannel; |
| 10 | import java.util.Iterator; |
| 11 | import java.util.List; |
| 12 | import java.util.Queue; |
| 13 | import java.util.Set; |
| 14 | import java.util.concurrent.ConcurrentLinkedQueue; |
| 15 | import java.util.concurrent.CopyOnWriteArraySet; |
| 16 | |
| 17 | /** |
| 18 | * I/O loop for driving inbound & outbound {@link Message} transfer via |
| 19 | * {@link MessageStream}. |
| 20 | * |
| 21 | * @param <M> message type |
| 22 | * @param <S> message stream type |
| 23 | */ |
| 24 | public abstract class IOLoop<M extends Message, S extends MessageStream<M>> |
| 25 | extends SelectorLoop { |
| 26 | |
| 27 | // Queue of requests for new message streams to enter the IO loop processing. |
| 28 | private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>(); |
| 29 | |
| 30 | // Carries information required for admitting a new message stream. |
| 31 | private class NewStreamRequest { |
| 32 | private final S stream; |
| 33 | private final SelectableChannel channel; |
| 34 | private final int op; |
| 35 | |
| 36 | public NewStreamRequest(S stream, SelectableChannel channel, int op) { |
| 37 | this.stream = stream; |
| 38 | this.channel = channel; |
| 39 | this.op = op; |
| 40 | } |
| 41 | } |
| 42 | |
| 43 | // Set of message streams currently admitted into the IO loop. |
| 44 | private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>(); |
| 45 | |
| 46 | /** |
| 47 | * Creates an IO loop with the given selection timeout. |
| 48 | * |
| 49 | * @param timeout selection timeout in milliseconds |
| 50 | * @throws IOException if the backing selector cannot be opened |
| 51 | */ |
| 52 | public IOLoop(long timeout) throws IOException { |
| 53 | super(timeout); |
| 54 | } |
| 55 | |
| 56 | /** |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 57 | * Returns the number of message stream in custody of the loop. |
| 58 | * |
| 59 | * @return number of message streams |
| 60 | */ |
| 61 | public int streamCount() { |
| 62 | return streams.size(); |
| 63 | } |
| 64 | |
| 65 | /** |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 66 | * Creates a new message stream backed by the specified socket channel. |
| 67 | * |
| 68 | * @param byteChannel backing byte channel |
| 69 | * @return newly created message stream |
| 70 | */ |
| 71 | protected abstract S createStream(ByteChannel byteChannel); |
| 72 | |
| 73 | /** |
| 74 | * Removes the specified message stream from the IO loop. |
| 75 | * |
| 76 | * @param stream message stream to remove |
| 77 | */ |
tom | f110fff | 2014-09-26 00:38:18 -0700 | [diff] [blame] | 78 | protected void removeStream(MessageStream<M> stream) { |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 79 | streams.remove(stream); |
| 80 | } |
| 81 | |
| 82 | /** |
| 83 | * Processes the list of messages extracted from the specified message |
| 84 | * stream. |
| 85 | * |
| 86 | * @param messages non-empty list of received messages |
| 87 | * @param stream message stream from which the messages were extracted |
| 88 | */ |
| 89 | protected abstract void processMessages(List<M> messages, MessageStream<M> stream); |
| 90 | |
| 91 | /** |
| 92 | * Completes connection request pending on the given selection key. |
| 93 | * |
| 94 | * @param key selection key holding the pending connect operation. |
| 95 | */ |
| 96 | protected void connect(SelectionKey key) { |
| 97 | try { |
| 98 | SocketChannel ch = (SocketChannel) key.channel(); |
| 99 | ch.finishConnect(); |
| 100 | } catch (IOException | IllegalStateException e) { |
| 101 | log.warn("Unable to complete connection", e); |
| 102 | } |
| 103 | |
| 104 | if (key.isValid()) { |
| 105 | key.interestOps(SelectionKey.OP_READ); |
| 106 | } |
| 107 | } |
| 108 | |
| 109 | /** |
| 110 | * Processes an IO operation pending on the specified key. |
| 111 | * |
| 112 | * @param key selection key holding the pending I/O operation. |
| 113 | */ |
| 114 | protected void processKeyOperation(SelectionKey key) { |
| 115 | @SuppressWarnings("unchecked") |
| 116 | S stream = (S) key.attachment(); |
| 117 | |
| 118 | try { |
| 119 | // If the key is not valid, bail out. |
| 120 | if (!key.isValid()) { |
| 121 | stream.close(); |
| 122 | return; |
| 123 | } |
| 124 | |
| 125 | // If there is a pending connect operation, complete it. |
| 126 | if (key.isConnectable()) { |
| 127 | connect(key); |
| 128 | } |
| 129 | |
| 130 | // If there is a read operation, slurp as much data as possible. |
| 131 | if (key.isReadable()) { |
| 132 | List<M> messages = stream.read(); |
| 133 | |
| 134 | // No messages or failed flush imply disconnect; bail. |
| 135 | if (messages == null || stream.hadError()) { |
| 136 | stream.close(); |
| 137 | return; |
| 138 | } |
| 139 | |
| 140 | // If there were any messages read, process them. |
| 141 | if (!messages.isEmpty()) { |
| 142 | try { |
| 143 | processMessages(messages, stream); |
| 144 | } catch (RuntimeException e) { |
| 145 | onError(stream, e); |
| 146 | } |
| 147 | } |
| 148 | } |
| 149 | |
| 150 | // If there are pending writes, flush them |
| 151 | if (key.isWritable()) { |
| 152 | stream.flushIfPossible(); |
| 153 | } |
| 154 | |
| 155 | // If there were any issued flushing, close the stream. |
| 156 | if (stream.hadError()) { |
| 157 | stream.close(); |
| 158 | } |
| 159 | |
| 160 | } catch (CancelledKeyException e) { |
| 161 | // Key was cancelled, so silently close the stream |
| 162 | stream.close(); |
| 163 | } catch (IOException e) { |
| 164 | if (!stream.isClosed() && !isResetByPeer(e)) { |
| 165 | log.warn("Unable to process IO", e); |
| 166 | } |
| 167 | stream.close(); |
| 168 | } |
| 169 | } |
| 170 | |
| 171 | // Indicates whether or not this exception is caused by 'reset by peer'. |
| 172 | private boolean isResetByPeer(IOException e) { |
| 173 | Throwable cause = e.getCause(); |
| 174 | return cause != null && cause instanceof IOException && |
| 175 | cause.getMessage().contains("reset by peer"); |
| 176 | } |
| 177 | |
| 178 | /** |
| 179 | * Hook to allow intercept of any errors caused during message processing. |
| 180 | * Default behaviour is to rethrow the error. |
| 181 | * |
| 182 | * @param stream message stream involved in the error |
| 183 | * @param error the runtime exception |
| 184 | */ |
| 185 | protected void onError(S stream, RuntimeException error) { |
| 186 | throw error; |
| 187 | } |
| 188 | |
| 189 | /** |
| 190 | * Admits a new message stream backed by the specified socket channel |
| 191 | * with a pending accept operation. |
| 192 | * |
| 193 | * @param channel backing socket channel |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 194 | * @return newly accepted message stream |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 195 | */ |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 196 | public S acceptStream(SocketChannel channel) { |
| 197 | return createAndAdmit(channel, SelectionKey.OP_READ); |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 198 | } |
| 199 | |
| 200 | |
| 201 | /** |
| 202 | * Admits a new message stream backed by the specified socket channel |
| 203 | * with a pending connect operation. |
| 204 | * |
| 205 | * @param channel backing socket channel |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 206 | * @return newly connected message stream |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 207 | */ |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 208 | public S connectStream(SocketChannel channel) { |
| 209 | return createAndAdmit(channel, SelectionKey.OP_CONNECT); |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 210 | } |
| 211 | |
| 212 | /** |
| 213 | * Creates a new message stream backed by the specified socket channel |
| 214 | * and admits it into the IO loop. |
| 215 | * |
| 216 | * @param channel socket channel |
| 217 | * @param op pending operations mask to be applied to the selection |
| 218 | * key as a set of initial interestedOps |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 219 | * @return newly created message stream |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 220 | */ |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 221 | private synchronized S createAndAdmit(SocketChannel channel, int op) { |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 222 | S stream = createStream(channel); |
| 223 | streams.add(stream); |
| 224 | newStreamRequests.add(new NewStreamRequest(stream, channel, op)); |
| 225 | selector.wakeup(); |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 226 | return stream; |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 227 | } |
| 228 | |
| 229 | /** |
| 230 | * Safely admits new streams into the IO loop. |
| 231 | */ |
| 232 | private void admitNewStreams() { |
| 233 | Iterator<NewStreamRequest> it = newStreamRequests.iterator(); |
| 234 | while (isRunning() && it.hasNext()) { |
| 235 | try { |
| 236 | NewStreamRequest request = it.next(); |
| 237 | it.remove(); |
| 238 | SelectionKey key = request.channel.register(selector, request.op, |
| 239 | request.stream); |
| 240 | request.stream.setKey(key); |
| 241 | } catch (ClosedChannelException e) { |
| 242 | log.warn("Unable to admit new message stream", e); |
| 243 | } |
| 244 | } |
| 245 | } |
| 246 | |
| 247 | @Override |
| 248 | protected void loop() throws IOException { |
| 249 | notifyReady(); |
| 250 | |
| 251 | // Keep going until told otherwise. |
| 252 | while (isRunning()) { |
| 253 | admitNewStreams(); |
| 254 | |
| 255 | // Process flushes & write selects on all streams |
| 256 | for (MessageStream<M> stream : streams) { |
| 257 | stream.flushIfWriteNotPending(); |
| 258 | } |
| 259 | |
| 260 | // Select keys and process them. |
| 261 | int count = selector.select(selectTimeout); |
| 262 | if (count > 0 && isRunning()) { |
| 263 | Iterator<SelectionKey> it = selector.selectedKeys().iterator(); |
| 264 | while (it.hasNext()) { |
| 265 | SelectionKey key = it.next(); |
| 266 | it.remove(); |
| 267 | processKeyOperation(key); |
| 268 | } |
| 269 | } |
| 270 | } |
| 271 | } |
| 272 | |
| 273 | /** |
| 274 | * Prunes the registered streams by discarding any stale ones. |
tom | f7e13b0 | 2014-09-26 11:12:25 -0700 | [diff] [blame] | 275 | * |
| 276 | * @return number of remaining streams |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 277 | */ |
tom | f7e13b0 | 2014-09-26 11:12:25 -0700 | [diff] [blame] | 278 | public synchronized int pruneStaleStreams() { |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 279 | for (MessageStream<M> stream : streams) { |
| 280 | if (stream.isStale()) { |
| 281 | stream.close(); |
| 282 | } |
| 283 | } |
tom | f7e13b0 | 2014-09-26 11:12:25 -0700 | [diff] [blame] | 284 | return streams.size(); |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 285 | } |
| 286 | |
| 287 | } |