blob: 106df7b279303ea624b072391fc9dfb079e5091c [file] [log] [blame]
Thomas Vachuska24c849c2014-10-27 09:53:05 -07001/*
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07002 * Copyright 2014 Open Networking Laboratory
Thomas Vachuska24c849c2014-10-27 09:53:05 -07003 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07004 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
Thomas Vachuska24c849c2014-10-27 09:53:05 -07007 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07008 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
Thomas Vachuska24c849c2014-10-27 09:53:05 -070015 */
toma7083182014-09-25 21:38:03 -070016package org.onlab.nio;
17
18import java.io.IOException;
19import java.nio.channels.ByteChannel;
20import java.nio.channels.CancelledKeyException;
21import java.nio.channels.ClosedChannelException;
22import java.nio.channels.SelectableChannel;
23import java.nio.channels.SelectionKey;
24import java.nio.channels.SocketChannel;
25import java.util.Iterator;
26import java.util.List;
27import java.util.Queue;
28import java.util.Set;
29import java.util.concurrent.ConcurrentLinkedQueue;
30import java.util.concurrent.CopyOnWriteArraySet;
31
32/**
33 * I/O loop for driving inbound & outbound {@link Message} transfer via
34 * {@link MessageStream}.
35 *
36 * @param <M> message type
37 * @param <S> message stream type
38 */
39public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
40 extends SelectorLoop {
41
42 // Queue of requests for new message streams to enter the IO loop processing.
43 private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>();
44
45 // Carries information required for admitting a new message stream.
46 private class NewStreamRequest {
47 private final S stream;
48 private final SelectableChannel channel;
49 private final int op;
50
51 public NewStreamRequest(S stream, SelectableChannel channel, int op) {
52 this.stream = stream;
53 this.channel = channel;
54 this.op = op;
55 }
56 }
57
58 // Set of message streams currently admitted into the IO loop.
59 private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>();
60
61 /**
62 * Creates an IO loop with the given selection timeout.
63 *
64 * @param timeout selection timeout in milliseconds
65 * @throws IOException if the backing selector cannot be opened
66 */
67 public IOLoop(long timeout) throws IOException {
68 super(timeout);
69 }
70
71 /**
tom9710fb42014-09-29 11:35:28 -070072 * Returns the number of message stream in custody of the loop.
73 *
74 * @return number of message streams
75 */
76 public int streamCount() {
77 return streams.size();
78 }
79
80 /**
toma7083182014-09-25 21:38:03 -070081 * Creates a new message stream backed by the specified socket channel.
82 *
83 * @param byteChannel backing byte channel
84 * @return newly created message stream
85 */
86 protected abstract S createStream(ByteChannel byteChannel);
87
88 /**
89 * Removes the specified message stream from the IO loop.
90 *
91 * @param stream message stream to remove
92 */
tomf110fff2014-09-26 00:38:18 -070093 protected void removeStream(MessageStream<M> stream) {
toma7083182014-09-25 21:38:03 -070094 streams.remove(stream);
95 }
96
97 /**
98 * Processes the list of messages extracted from the specified message
99 * stream.
100 *
101 * @param messages non-empty list of received messages
102 * @param stream message stream from which the messages were extracted
103 */
104 protected abstract void processMessages(List<M> messages, MessageStream<M> stream);
105
106 /**
107 * Completes connection request pending on the given selection key.
108 *
109 * @param key selection key holding the pending connect operation.
Yuta HIGUCHI5c947272014-11-03 21:39:21 -0800110 * @throws IOException when I/O exception of some sort has occurred
toma7083182014-09-25 21:38:03 -0700111 */
tom5a8779c2014-09-29 14:48:43 -0700112 protected void connect(SelectionKey key) throws IOException {
113 SocketChannel ch = (SocketChannel) key.channel();
114 ch.finishConnect();
toma7083182014-09-25 21:38:03 -0700115 if (key.isValid()) {
116 key.interestOps(SelectionKey.OP_READ);
117 }
118 }
119
120 /**
121 * Processes an IO operation pending on the specified key.
122 *
123 * @param key selection key holding the pending I/O operation.
124 */
125 protected void processKeyOperation(SelectionKey key) {
126 @SuppressWarnings("unchecked")
127 S stream = (S) key.attachment();
128
129 try {
130 // If the key is not valid, bail out.
131 if (!key.isValid()) {
132 stream.close();
133 return;
134 }
135
136 // If there is a pending connect operation, complete it.
137 if (key.isConnectable()) {
tom5a8779c2014-09-29 14:48:43 -0700138 try {
139 connect(key);
140 } catch (IOException | IllegalStateException e) {
141 log.warn("Unable to complete connection", e);
142 }
toma7083182014-09-25 21:38:03 -0700143 }
144
145 // If there is a read operation, slurp as much data as possible.
146 if (key.isReadable()) {
147 List<M> messages = stream.read();
148
149 // No messages or failed flush imply disconnect; bail.
150 if (messages == null || stream.hadError()) {
151 stream.close();
152 return;
153 }
154
155 // If there were any messages read, process them.
156 if (!messages.isEmpty()) {
157 try {
158 processMessages(messages, stream);
159 } catch (RuntimeException e) {
160 onError(stream, e);
161 }
162 }
163 }
164
165 // If there are pending writes, flush them
166 if (key.isWritable()) {
167 stream.flushIfPossible();
168 }
169
170 // If there were any issued flushing, close the stream.
171 if (stream.hadError()) {
172 stream.close();
173 }
174
175 } catch (CancelledKeyException e) {
176 // Key was cancelled, so silently close the stream
177 stream.close();
178 } catch (IOException e) {
179 if (!stream.isClosed() && !isResetByPeer(e)) {
180 log.warn("Unable to process IO", e);
181 }
182 stream.close();
183 }
184 }
185
186 // Indicates whether or not this exception is caused by 'reset by peer'.
187 private boolean isResetByPeer(IOException e) {
188 Throwable cause = e.getCause();
189 return cause != null && cause instanceof IOException &&
190 cause.getMessage().contains("reset by peer");
191 }
192
193 /**
194 * Hook to allow intercept of any errors caused during message processing.
195 * Default behaviour is to rethrow the error.
196 *
197 * @param stream message stream involved in the error
198 * @param error the runtime exception
199 */
200 protected void onError(S stream, RuntimeException error) {
201 throw error;
202 }
203
204 /**
205 * Admits a new message stream backed by the specified socket channel
206 * with a pending accept operation.
207 *
208 * @param channel backing socket channel
tom9710fb42014-09-29 11:35:28 -0700209 * @return newly accepted message stream
toma7083182014-09-25 21:38:03 -0700210 */
tom9710fb42014-09-29 11:35:28 -0700211 public S acceptStream(SocketChannel channel) {
212 return createAndAdmit(channel, SelectionKey.OP_READ);
toma7083182014-09-25 21:38:03 -0700213 }
214
215
216 /**
217 * Admits a new message stream backed by the specified socket channel
218 * with a pending connect operation.
219 *
220 * @param channel backing socket channel
tom9710fb42014-09-29 11:35:28 -0700221 * @return newly connected message stream
toma7083182014-09-25 21:38:03 -0700222 */
tom9710fb42014-09-29 11:35:28 -0700223 public S connectStream(SocketChannel channel) {
224 return createAndAdmit(channel, SelectionKey.OP_CONNECT);
toma7083182014-09-25 21:38:03 -0700225 }
226
227 /**
228 * Creates a new message stream backed by the specified socket channel
229 * and admits it into the IO loop.
230 *
231 * @param channel socket channel
232 * @param op pending operations mask to be applied to the selection
233 * key as a set of initial interestedOps
tom9710fb42014-09-29 11:35:28 -0700234 * @return newly created message stream
toma7083182014-09-25 21:38:03 -0700235 */
tom9710fb42014-09-29 11:35:28 -0700236 private synchronized S createAndAdmit(SocketChannel channel, int op) {
toma7083182014-09-25 21:38:03 -0700237 S stream = createStream(channel);
238 streams.add(stream);
239 newStreamRequests.add(new NewStreamRequest(stream, channel, op));
240 selector.wakeup();
tom9710fb42014-09-29 11:35:28 -0700241 return stream;
toma7083182014-09-25 21:38:03 -0700242 }
243
244 /**
245 * Safely admits new streams into the IO loop.
246 */
247 private void admitNewStreams() {
248 Iterator<NewStreamRequest> it = newStreamRequests.iterator();
249 while (isRunning() && it.hasNext()) {
250 try {
251 NewStreamRequest request = it.next();
252 it.remove();
253 SelectionKey key = request.channel.register(selector, request.op,
254 request.stream);
255 request.stream.setKey(key);
256 } catch (ClosedChannelException e) {
257 log.warn("Unable to admit new message stream", e);
258 }
259 }
260 }
261
262 @Override
263 protected void loop() throws IOException {
264 notifyReady();
265
266 // Keep going until told otherwise.
267 while (isRunning()) {
268 admitNewStreams();
269
270 // Process flushes & write selects on all streams
271 for (MessageStream<M> stream : streams) {
272 stream.flushIfWriteNotPending();
273 }
274
275 // Select keys and process them.
276 int count = selector.select(selectTimeout);
277 if (count > 0 && isRunning()) {
278 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
279 while (it.hasNext()) {
280 SelectionKey key = it.next();
281 it.remove();
282 processKeyOperation(key);
283 }
284 }
285 }
286 }
287
288 /**
289 * Prunes the registered streams by discarding any stale ones.
tomf7e13b02014-09-26 11:12:25 -0700290 *
291 * @return number of remaining streams
toma7083182014-09-25 21:38:03 -0700292 */
tomf7e13b02014-09-26 11:12:25 -0700293 public synchronized int pruneStaleStreams() {
toma7083182014-09-25 21:38:03 -0700294 for (MessageStream<M> stream : streams) {
295 if (stream.isStale()) {
296 stream.close();
297 }
298 }
tomf7e13b02014-09-26 11:12:25 -0700299 return streams.size();
toma7083182014-09-25 21:38:03 -0700300 }
301
302}