blob: 35a8b7c3227fd81f35acb5a8752ca372e8f7b79c [file] [log] [blame]
Thomas Vachuska24c849c2014-10-27 09:53:05 -07001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
toma7083182014-09-25 21:38:03 -070019package org.onlab.nio;
20
21import java.io.IOException;
22import java.nio.channels.ByteChannel;
23import java.nio.channels.CancelledKeyException;
24import java.nio.channels.ClosedChannelException;
25import java.nio.channels.SelectableChannel;
26import java.nio.channels.SelectionKey;
27import java.nio.channels.SocketChannel;
28import java.util.Iterator;
29import java.util.List;
30import java.util.Queue;
31import java.util.Set;
32import java.util.concurrent.ConcurrentLinkedQueue;
33import java.util.concurrent.CopyOnWriteArraySet;
34
35/**
36 * I/O loop for driving inbound & outbound {@link Message} transfer via
37 * {@link MessageStream}.
38 *
39 * @param <M> message type
40 * @param <S> message stream type
41 */
42public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
43 extends SelectorLoop {
44
45 // Queue of requests for new message streams to enter the IO loop processing.
46 private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>();
47
48 // Carries information required for admitting a new message stream.
49 private class NewStreamRequest {
50 private final S stream;
51 private final SelectableChannel channel;
52 private final int op;
53
54 public NewStreamRequest(S stream, SelectableChannel channel, int op) {
55 this.stream = stream;
56 this.channel = channel;
57 this.op = op;
58 }
59 }
60
61 // Set of message streams currently admitted into the IO loop.
62 private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>();
63
64 /**
65 * Creates an IO loop with the given selection timeout.
66 *
67 * @param timeout selection timeout in milliseconds
68 * @throws IOException if the backing selector cannot be opened
69 */
70 public IOLoop(long timeout) throws IOException {
71 super(timeout);
72 }
73
74 /**
tom9710fb42014-09-29 11:35:28 -070075 * Returns the number of message stream in custody of the loop.
76 *
77 * @return number of message streams
78 */
79 public int streamCount() {
80 return streams.size();
81 }
82
83 /**
toma7083182014-09-25 21:38:03 -070084 * Creates a new message stream backed by the specified socket channel.
85 *
86 * @param byteChannel backing byte channel
87 * @return newly created message stream
88 */
89 protected abstract S createStream(ByteChannel byteChannel);
90
91 /**
92 * Removes the specified message stream from the IO loop.
93 *
94 * @param stream message stream to remove
95 */
tomf110fff2014-09-26 00:38:18 -070096 protected void removeStream(MessageStream<M> stream) {
toma7083182014-09-25 21:38:03 -070097 streams.remove(stream);
98 }
99
100 /**
101 * Processes the list of messages extracted from the specified message
102 * stream.
103 *
104 * @param messages non-empty list of received messages
105 * @param stream message stream from which the messages were extracted
106 */
107 protected abstract void processMessages(List<M> messages, MessageStream<M> stream);
108
109 /**
110 * Completes connection request pending on the given selection key.
111 *
112 * @param key selection key holding the pending connect operation.
113 */
tom5a8779c2014-09-29 14:48:43 -0700114 protected void connect(SelectionKey key) throws IOException {
115 SocketChannel ch = (SocketChannel) key.channel();
116 ch.finishConnect();
toma7083182014-09-25 21:38:03 -0700117 if (key.isValid()) {
118 key.interestOps(SelectionKey.OP_READ);
119 }
120 }
121
122 /**
123 * Processes an IO operation pending on the specified key.
124 *
125 * @param key selection key holding the pending I/O operation.
126 */
127 protected void processKeyOperation(SelectionKey key) {
128 @SuppressWarnings("unchecked")
129 S stream = (S) key.attachment();
130
131 try {
132 // If the key is not valid, bail out.
133 if (!key.isValid()) {
134 stream.close();
135 return;
136 }
137
138 // If there is a pending connect operation, complete it.
139 if (key.isConnectable()) {
tom5a8779c2014-09-29 14:48:43 -0700140 try {
141 connect(key);
142 } catch (IOException | IllegalStateException e) {
143 log.warn("Unable to complete connection", e);
144 }
toma7083182014-09-25 21:38:03 -0700145 }
146
147 // If there is a read operation, slurp as much data as possible.
148 if (key.isReadable()) {
149 List<M> messages = stream.read();
150
151 // No messages or failed flush imply disconnect; bail.
152 if (messages == null || stream.hadError()) {
153 stream.close();
154 return;
155 }
156
157 // If there were any messages read, process them.
158 if (!messages.isEmpty()) {
159 try {
160 processMessages(messages, stream);
161 } catch (RuntimeException e) {
162 onError(stream, e);
163 }
164 }
165 }
166
167 // If there are pending writes, flush them
168 if (key.isWritable()) {
169 stream.flushIfPossible();
170 }
171
172 // If there were any issued flushing, close the stream.
173 if (stream.hadError()) {
174 stream.close();
175 }
176
177 } catch (CancelledKeyException e) {
178 // Key was cancelled, so silently close the stream
179 stream.close();
180 } catch (IOException e) {
181 if (!stream.isClosed() && !isResetByPeer(e)) {
182 log.warn("Unable to process IO", e);
183 }
184 stream.close();
185 }
186 }
187
188 // Indicates whether or not this exception is caused by 'reset by peer'.
189 private boolean isResetByPeer(IOException e) {
190 Throwable cause = e.getCause();
191 return cause != null && cause instanceof IOException &&
192 cause.getMessage().contains("reset by peer");
193 }
194
195 /**
196 * Hook to allow intercept of any errors caused during message processing.
197 * Default behaviour is to rethrow the error.
198 *
199 * @param stream message stream involved in the error
200 * @param error the runtime exception
201 */
202 protected void onError(S stream, RuntimeException error) {
203 throw error;
204 }
205
206 /**
207 * Admits a new message stream backed by the specified socket channel
208 * with a pending accept operation.
209 *
210 * @param channel backing socket channel
tom9710fb42014-09-29 11:35:28 -0700211 * @return newly accepted message stream
toma7083182014-09-25 21:38:03 -0700212 */
tom9710fb42014-09-29 11:35:28 -0700213 public S acceptStream(SocketChannel channel) {
214 return createAndAdmit(channel, SelectionKey.OP_READ);
toma7083182014-09-25 21:38:03 -0700215 }
216
217
218 /**
219 * Admits a new message stream backed by the specified socket channel
220 * with a pending connect operation.
221 *
222 * @param channel backing socket channel
tom9710fb42014-09-29 11:35:28 -0700223 * @return newly connected message stream
toma7083182014-09-25 21:38:03 -0700224 */
tom9710fb42014-09-29 11:35:28 -0700225 public S connectStream(SocketChannel channel) {
226 return createAndAdmit(channel, SelectionKey.OP_CONNECT);
toma7083182014-09-25 21:38:03 -0700227 }
228
229 /**
230 * Creates a new message stream backed by the specified socket channel
231 * and admits it into the IO loop.
232 *
233 * @param channel socket channel
234 * @param op pending operations mask to be applied to the selection
235 * key as a set of initial interestedOps
tom9710fb42014-09-29 11:35:28 -0700236 * @return newly created message stream
toma7083182014-09-25 21:38:03 -0700237 */
tom9710fb42014-09-29 11:35:28 -0700238 private synchronized S createAndAdmit(SocketChannel channel, int op) {
toma7083182014-09-25 21:38:03 -0700239 S stream = createStream(channel);
240 streams.add(stream);
241 newStreamRequests.add(new NewStreamRequest(stream, channel, op));
242 selector.wakeup();
tom9710fb42014-09-29 11:35:28 -0700243 return stream;
toma7083182014-09-25 21:38:03 -0700244 }
245
246 /**
247 * Safely admits new streams into the IO loop.
248 */
249 private void admitNewStreams() {
250 Iterator<NewStreamRequest> it = newStreamRequests.iterator();
251 while (isRunning() && it.hasNext()) {
252 try {
253 NewStreamRequest request = it.next();
254 it.remove();
255 SelectionKey key = request.channel.register(selector, request.op,
256 request.stream);
257 request.stream.setKey(key);
258 } catch (ClosedChannelException e) {
259 log.warn("Unable to admit new message stream", e);
260 }
261 }
262 }
263
264 @Override
265 protected void loop() throws IOException {
266 notifyReady();
267
268 // Keep going until told otherwise.
269 while (isRunning()) {
270 admitNewStreams();
271
272 // Process flushes & write selects on all streams
273 for (MessageStream<M> stream : streams) {
274 stream.flushIfWriteNotPending();
275 }
276
277 // Select keys and process them.
278 int count = selector.select(selectTimeout);
279 if (count > 0 && isRunning()) {
280 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
281 while (it.hasNext()) {
282 SelectionKey key = it.next();
283 it.remove();
284 processKeyOperation(key);
285 }
286 }
287 }
288 }
289
290 /**
291 * Prunes the registered streams by discarding any stale ones.
tomf7e13b02014-09-26 11:12:25 -0700292 *
293 * @return number of remaining streams
toma7083182014-09-25 21:38:03 -0700294 */
tomf7e13b02014-09-26 11:12:25 -0700295 public synchronized int pruneStaleStreams() {
toma7083182014-09-25 21:38:03 -0700296 for (MessageStream<M> stream : streams) {
297 if (stream.isStale()) {
298 stream.close();
299 }
300 }
tomf7e13b02014-09-26 11:12:25 -0700301 return streams.size();
toma7083182014-09-25 21:38:03 -0700302 }
303
304}