Thomas Vachuska | 24c849c | 2014-10-27 09:53:05 -0700 | [diff] [blame] | 1 | /* |
Thomas Vachuska | 4f1a60c | 2014-10-28 13:39:07 -0700 | [diff] [blame^] | 2 | * Copyright 2014 Open Networking Laboratory |
Thomas Vachuska | 24c849c | 2014-10-27 09:53:05 -0700 | [diff] [blame] | 3 | * |
Thomas Vachuska | 4f1a60c | 2014-10-28 13:39:07 -0700 | [diff] [blame^] | 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
Thomas Vachuska | 24c849c | 2014-10-27 09:53:05 -0700 | [diff] [blame] | 7 | * |
Thomas Vachuska | 4f1a60c | 2014-10-28 13:39:07 -0700 | [diff] [blame^] | 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
Thomas Vachuska | 24c849c | 2014-10-27 09:53:05 -0700 | [diff] [blame] | 15 | */ |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 16 | package org.onlab.nio; |
| 17 | |
| 18 | import java.io.IOException; |
| 19 | import java.nio.channels.ByteChannel; |
| 20 | import java.nio.channels.CancelledKeyException; |
| 21 | import java.nio.channels.ClosedChannelException; |
| 22 | import java.nio.channels.SelectableChannel; |
| 23 | import java.nio.channels.SelectionKey; |
| 24 | import java.nio.channels.SocketChannel; |
| 25 | import java.util.Iterator; |
| 26 | import java.util.List; |
| 27 | import java.util.Queue; |
| 28 | import java.util.Set; |
| 29 | import java.util.concurrent.ConcurrentLinkedQueue; |
| 30 | import java.util.concurrent.CopyOnWriteArraySet; |
| 31 | |
| 32 | /** |
| 33 | * I/O loop for driving inbound & outbound {@link Message} transfer via |
| 34 | * {@link MessageStream}. |
| 35 | * |
| 36 | * @param <M> message type |
| 37 | * @param <S> message stream type |
| 38 | */ |
| 39 | public abstract class IOLoop<M extends Message, S extends MessageStream<M>> |
| 40 | extends SelectorLoop { |
| 41 | |
| 42 | // Queue of requests for new message streams to enter the IO loop processing. |
| 43 | private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>(); |
| 44 | |
| 45 | // Carries information required for admitting a new message stream. |
| 46 | private class NewStreamRequest { |
| 47 | private final S stream; |
| 48 | private final SelectableChannel channel; |
| 49 | private final int op; |
| 50 | |
| 51 | public NewStreamRequest(S stream, SelectableChannel channel, int op) { |
| 52 | this.stream = stream; |
| 53 | this.channel = channel; |
| 54 | this.op = op; |
| 55 | } |
| 56 | } |
| 57 | |
| 58 | // Set of message streams currently admitted into the IO loop. |
| 59 | private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>(); |
| 60 | |
| 61 | /** |
| 62 | * Creates an IO loop with the given selection timeout. |
| 63 | * |
| 64 | * @param timeout selection timeout in milliseconds |
| 65 | * @throws IOException if the backing selector cannot be opened |
| 66 | */ |
| 67 | public IOLoop(long timeout) throws IOException { |
| 68 | super(timeout); |
| 69 | } |
| 70 | |
| 71 | /** |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 72 | * Returns the number of message stream in custody of the loop. |
| 73 | * |
| 74 | * @return number of message streams |
| 75 | */ |
| 76 | public int streamCount() { |
| 77 | return streams.size(); |
| 78 | } |
| 79 | |
| 80 | /** |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 81 | * Creates a new message stream backed by the specified socket channel. |
| 82 | * |
| 83 | * @param byteChannel backing byte channel |
| 84 | * @return newly created message stream |
| 85 | */ |
| 86 | protected abstract S createStream(ByteChannel byteChannel); |
| 87 | |
| 88 | /** |
| 89 | * Removes the specified message stream from the IO loop. |
| 90 | * |
| 91 | * @param stream message stream to remove |
| 92 | */ |
tom | f110fff | 2014-09-26 00:38:18 -0700 | [diff] [blame] | 93 | protected void removeStream(MessageStream<M> stream) { |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 94 | streams.remove(stream); |
| 95 | } |
| 96 | |
| 97 | /** |
| 98 | * Processes the list of messages extracted from the specified message |
| 99 | * stream. |
| 100 | * |
| 101 | * @param messages non-empty list of received messages |
| 102 | * @param stream message stream from which the messages were extracted |
| 103 | */ |
| 104 | protected abstract void processMessages(List<M> messages, MessageStream<M> stream); |
| 105 | |
| 106 | /** |
| 107 | * Completes connection request pending on the given selection key. |
| 108 | * |
| 109 | * @param key selection key holding the pending connect operation. |
| 110 | */ |
tom | 5a8779c | 2014-09-29 14:48:43 -0700 | [diff] [blame] | 111 | protected void connect(SelectionKey key) throws IOException { |
| 112 | SocketChannel ch = (SocketChannel) key.channel(); |
| 113 | ch.finishConnect(); |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 114 | if (key.isValid()) { |
| 115 | key.interestOps(SelectionKey.OP_READ); |
| 116 | } |
| 117 | } |
| 118 | |
| 119 | /** |
| 120 | * Processes an IO operation pending on the specified key. |
| 121 | * |
| 122 | * @param key selection key holding the pending I/O operation. |
| 123 | */ |
| 124 | protected void processKeyOperation(SelectionKey key) { |
| 125 | @SuppressWarnings("unchecked") |
| 126 | S stream = (S) key.attachment(); |
| 127 | |
| 128 | try { |
| 129 | // If the key is not valid, bail out. |
| 130 | if (!key.isValid()) { |
| 131 | stream.close(); |
| 132 | return; |
| 133 | } |
| 134 | |
| 135 | // If there is a pending connect operation, complete it. |
| 136 | if (key.isConnectable()) { |
tom | 5a8779c | 2014-09-29 14:48:43 -0700 | [diff] [blame] | 137 | try { |
| 138 | connect(key); |
| 139 | } catch (IOException | IllegalStateException e) { |
| 140 | log.warn("Unable to complete connection", e); |
| 141 | } |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 142 | } |
| 143 | |
| 144 | // If there is a read operation, slurp as much data as possible. |
| 145 | if (key.isReadable()) { |
| 146 | List<M> messages = stream.read(); |
| 147 | |
| 148 | // No messages or failed flush imply disconnect; bail. |
| 149 | if (messages == null || stream.hadError()) { |
| 150 | stream.close(); |
| 151 | return; |
| 152 | } |
| 153 | |
| 154 | // If there were any messages read, process them. |
| 155 | if (!messages.isEmpty()) { |
| 156 | try { |
| 157 | processMessages(messages, stream); |
| 158 | } catch (RuntimeException e) { |
| 159 | onError(stream, e); |
| 160 | } |
| 161 | } |
| 162 | } |
| 163 | |
| 164 | // If there are pending writes, flush them |
| 165 | if (key.isWritable()) { |
| 166 | stream.flushIfPossible(); |
| 167 | } |
| 168 | |
| 169 | // If there were any issued flushing, close the stream. |
| 170 | if (stream.hadError()) { |
| 171 | stream.close(); |
| 172 | } |
| 173 | |
| 174 | } catch (CancelledKeyException e) { |
| 175 | // Key was cancelled, so silently close the stream |
| 176 | stream.close(); |
| 177 | } catch (IOException e) { |
| 178 | if (!stream.isClosed() && !isResetByPeer(e)) { |
| 179 | log.warn("Unable to process IO", e); |
| 180 | } |
| 181 | stream.close(); |
| 182 | } |
| 183 | } |
| 184 | |
| 185 | // Indicates whether or not this exception is caused by 'reset by peer'. |
| 186 | private boolean isResetByPeer(IOException e) { |
| 187 | Throwable cause = e.getCause(); |
| 188 | return cause != null && cause instanceof IOException && |
| 189 | cause.getMessage().contains("reset by peer"); |
| 190 | } |
| 191 | |
| 192 | /** |
| 193 | * Hook to allow intercept of any errors caused during message processing. |
| 194 | * Default behaviour is to rethrow the error. |
| 195 | * |
| 196 | * @param stream message stream involved in the error |
| 197 | * @param error the runtime exception |
| 198 | */ |
| 199 | protected void onError(S stream, RuntimeException error) { |
| 200 | throw error; |
| 201 | } |
| 202 | |
| 203 | /** |
| 204 | * Admits a new message stream backed by the specified socket channel |
| 205 | * with a pending accept operation. |
| 206 | * |
| 207 | * @param channel backing socket channel |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 208 | * @return newly accepted message stream |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 209 | */ |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 210 | public S acceptStream(SocketChannel channel) { |
| 211 | return createAndAdmit(channel, SelectionKey.OP_READ); |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 212 | } |
| 213 | |
| 214 | |
| 215 | /** |
| 216 | * Admits a new message stream backed by the specified socket channel |
| 217 | * with a pending connect operation. |
| 218 | * |
| 219 | * @param channel backing socket channel |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 220 | * @return newly connected message stream |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 221 | */ |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 222 | public S connectStream(SocketChannel channel) { |
| 223 | return createAndAdmit(channel, SelectionKey.OP_CONNECT); |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 224 | } |
| 225 | |
| 226 | /** |
| 227 | * Creates a new message stream backed by the specified socket channel |
| 228 | * and admits it into the IO loop. |
| 229 | * |
| 230 | * @param channel socket channel |
| 231 | * @param op pending operations mask to be applied to the selection |
| 232 | * key as a set of initial interestedOps |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 233 | * @return newly created message stream |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 234 | */ |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 235 | private synchronized S createAndAdmit(SocketChannel channel, int op) { |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 236 | S stream = createStream(channel); |
| 237 | streams.add(stream); |
| 238 | newStreamRequests.add(new NewStreamRequest(stream, channel, op)); |
| 239 | selector.wakeup(); |
tom | 9710fb4 | 2014-09-29 11:35:28 -0700 | [diff] [blame] | 240 | return stream; |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 241 | } |
| 242 | |
| 243 | /** |
| 244 | * Safely admits new streams into the IO loop. |
| 245 | */ |
| 246 | private void admitNewStreams() { |
| 247 | Iterator<NewStreamRequest> it = newStreamRequests.iterator(); |
| 248 | while (isRunning() && it.hasNext()) { |
| 249 | try { |
| 250 | NewStreamRequest request = it.next(); |
| 251 | it.remove(); |
| 252 | SelectionKey key = request.channel.register(selector, request.op, |
| 253 | request.stream); |
| 254 | request.stream.setKey(key); |
| 255 | } catch (ClosedChannelException e) { |
| 256 | log.warn("Unable to admit new message stream", e); |
| 257 | } |
| 258 | } |
| 259 | } |
| 260 | |
| 261 | @Override |
| 262 | protected void loop() throws IOException { |
| 263 | notifyReady(); |
| 264 | |
| 265 | // Keep going until told otherwise. |
| 266 | while (isRunning()) { |
| 267 | admitNewStreams(); |
| 268 | |
| 269 | // Process flushes & write selects on all streams |
| 270 | for (MessageStream<M> stream : streams) { |
| 271 | stream.flushIfWriteNotPending(); |
| 272 | } |
| 273 | |
| 274 | // Select keys and process them. |
| 275 | int count = selector.select(selectTimeout); |
| 276 | if (count > 0 && isRunning()) { |
| 277 | Iterator<SelectionKey> it = selector.selectedKeys().iterator(); |
| 278 | while (it.hasNext()) { |
| 279 | SelectionKey key = it.next(); |
| 280 | it.remove(); |
| 281 | processKeyOperation(key); |
| 282 | } |
| 283 | } |
| 284 | } |
| 285 | } |
| 286 | |
| 287 | /** |
| 288 | * Prunes the registered streams by discarding any stale ones. |
tom | f7e13b0 | 2014-09-26 11:12:25 -0700 | [diff] [blame] | 289 | * |
| 290 | * @return number of remaining streams |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 291 | */ |
tom | f7e13b0 | 2014-09-26 11:12:25 -0700 | [diff] [blame] | 292 | public synchronized int pruneStaleStreams() { |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 293 | for (MessageStream<M> stream : streams) { |
| 294 | if (stream.isStale()) { |
| 295 | stream.close(); |
| 296 | } |
| 297 | } |
tom | f7e13b0 | 2014-09-26 11:12:25 -0700 | [diff] [blame] | 298 | return streams.size(); |
tom | a708318 | 2014-09-25 21:38:03 -0700 | [diff] [blame] | 299 | } |
| 300 | |
| 301 | } |