blob: 9d417ac6d9db08bc287fea3b4a385f78d699b6b1 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
toma7083182014-09-25 21:38:03 -070016package org.onlab.nio;
17
18import java.io.IOException;
19import java.nio.channels.ByteChannel;
20import java.nio.channels.CancelledKeyException;
21import java.nio.channels.ClosedChannelException;
22import java.nio.channels.SelectableChannel;
23import java.nio.channels.SelectionKey;
24import java.nio.channels.SocketChannel;
25import java.util.Iterator;
26import java.util.List;
27import java.util.Queue;
28import java.util.Set;
29import java.util.concurrent.ConcurrentLinkedQueue;
30import java.util.concurrent.CopyOnWriteArraySet;
31
32/**
33 * I/O loop for driving inbound & outbound {@link Message} transfer via
34 * {@link MessageStream}.
35 *
36 * @param <M> message type
37 * @param <S> message stream type
38 */
39public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
40 extends SelectorLoop {
41
42 // Queue of requests for new message streams to enter the IO loop processing.
43 private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>();
44
45 // Carries information required for admitting a new message stream.
46 private class NewStreamRequest {
47 private final S stream;
48 private final SelectableChannel channel;
49 private final int op;
50
51 public NewStreamRequest(S stream, SelectableChannel channel, int op) {
52 this.stream = stream;
53 this.channel = channel;
54 this.op = op;
55 }
56 }
57
58 // Set of message streams currently admitted into the IO loop.
59 private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>();
60
61 /**
62 * Creates an IO loop with the given selection timeout.
63 *
64 * @param timeout selection timeout in milliseconds
65 * @throws IOException if the backing selector cannot be opened
66 */
67 public IOLoop(long timeout) throws IOException {
68 super(timeout);
69 }
70
71 /**
tom9710fb42014-09-29 11:35:28 -070072 * Returns the number of message stream in custody of the loop.
73 *
74 * @return number of message streams
75 */
76 public int streamCount() {
77 return streams.size();
78 }
79
80 /**
toma7083182014-09-25 21:38:03 -070081 * Creates a new message stream backed by the specified socket channel.
82 *
83 * @param byteChannel backing byte channel
84 * @return newly created message stream
85 */
86 protected abstract S createStream(ByteChannel byteChannel);
87
88 /**
89 * Removes the specified message stream from the IO loop.
90 *
91 * @param stream message stream to remove
92 */
tomf110fff2014-09-26 00:38:18 -070093 protected void removeStream(MessageStream<M> stream) {
toma7083182014-09-25 21:38:03 -070094 streams.remove(stream);
95 }
96
97 /**
98 * Processes the list of messages extracted from the specified message
99 * stream.
100 *
101 * @param messages non-empty list of received messages
102 * @param stream message stream from which the messages were extracted
103 */
104 protected abstract void processMessages(List<M> messages, MessageStream<M> stream);
105
106 /**
107 * Completes connection request pending on the given selection key.
108 *
109 * @param key selection key holding the pending connect operation.
110 */
tom5a8779c2014-09-29 14:48:43 -0700111 protected void connect(SelectionKey key) throws IOException {
112 SocketChannel ch = (SocketChannel) key.channel();
113 ch.finishConnect();
toma7083182014-09-25 21:38:03 -0700114 if (key.isValid()) {
115 key.interestOps(SelectionKey.OP_READ);
116 }
117 }
118
119 /**
120 * Processes an IO operation pending on the specified key.
121 *
122 * @param key selection key holding the pending I/O operation.
123 */
124 protected void processKeyOperation(SelectionKey key) {
125 @SuppressWarnings("unchecked")
126 S stream = (S) key.attachment();
127
128 try {
129 // If the key is not valid, bail out.
130 if (!key.isValid()) {
131 stream.close();
132 return;
133 }
134
135 // If there is a pending connect operation, complete it.
136 if (key.isConnectable()) {
tom5a8779c2014-09-29 14:48:43 -0700137 try {
138 connect(key);
139 } catch (IOException | IllegalStateException e) {
140 log.warn("Unable to complete connection", e);
141 }
toma7083182014-09-25 21:38:03 -0700142 }
143
144 // If there is a read operation, slurp as much data as possible.
145 if (key.isReadable()) {
146 List<M> messages = stream.read();
147
148 // No messages or failed flush imply disconnect; bail.
149 if (messages == null || stream.hadError()) {
150 stream.close();
151 return;
152 }
153
154 // If there were any messages read, process them.
155 if (!messages.isEmpty()) {
156 try {
157 processMessages(messages, stream);
158 } catch (RuntimeException e) {
159 onError(stream, e);
160 }
161 }
162 }
163
164 // If there are pending writes, flush them
165 if (key.isWritable()) {
166 stream.flushIfPossible();
167 }
168
169 // If there were any issued flushing, close the stream.
170 if (stream.hadError()) {
171 stream.close();
172 }
173
174 } catch (CancelledKeyException e) {
175 // Key was cancelled, so silently close the stream
176 stream.close();
177 } catch (IOException e) {
178 if (!stream.isClosed() && !isResetByPeer(e)) {
179 log.warn("Unable to process IO", e);
180 }
181 stream.close();
182 }
183 }
184
185 // Indicates whether or not this exception is caused by 'reset by peer'.
186 private boolean isResetByPeer(IOException e) {
187 Throwable cause = e.getCause();
188 return cause != null && cause instanceof IOException &&
189 cause.getMessage().contains("reset by peer");
190 }
191
192 /**
193 * Hook to allow intercept of any errors caused during message processing.
194 * Default behaviour is to rethrow the error.
195 *
196 * @param stream message stream involved in the error
197 * @param error the runtime exception
198 */
199 protected void onError(S stream, RuntimeException error) {
200 throw error;
201 }
202
203 /**
204 * Admits a new message stream backed by the specified socket channel
205 * with a pending accept operation.
206 *
207 * @param channel backing socket channel
tom9710fb42014-09-29 11:35:28 -0700208 * @return newly accepted message stream
toma7083182014-09-25 21:38:03 -0700209 */
tom9710fb42014-09-29 11:35:28 -0700210 public S acceptStream(SocketChannel channel) {
211 return createAndAdmit(channel, SelectionKey.OP_READ);
toma7083182014-09-25 21:38:03 -0700212 }
213
214
215 /**
216 * Admits a new message stream backed by the specified socket channel
217 * with a pending connect operation.
218 *
219 * @param channel backing socket channel
tom9710fb42014-09-29 11:35:28 -0700220 * @return newly connected message stream
toma7083182014-09-25 21:38:03 -0700221 */
tom9710fb42014-09-29 11:35:28 -0700222 public S connectStream(SocketChannel channel) {
223 return createAndAdmit(channel, SelectionKey.OP_CONNECT);
toma7083182014-09-25 21:38:03 -0700224 }
225
226 /**
227 * Creates a new message stream backed by the specified socket channel
228 * and admits it into the IO loop.
229 *
230 * @param channel socket channel
231 * @param op pending operations mask to be applied to the selection
232 * key as a set of initial interestedOps
tom9710fb42014-09-29 11:35:28 -0700233 * @return newly created message stream
toma7083182014-09-25 21:38:03 -0700234 */
tom9710fb42014-09-29 11:35:28 -0700235 private synchronized S createAndAdmit(SocketChannel channel, int op) {
toma7083182014-09-25 21:38:03 -0700236 S stream = createStream(channel);
237 streams.add(stream);
238 newStreamRequests.add(new NewStreamRequest(stream, channel, op));
239 selector.wakeup();
tom9710fb42014-09-29 11:35:28 -0700240 return stream;
toma7083182014-09-25 21:38:03 -0700241 }
242
243 /**
244 * Safely admits new streams into the IO loop.
245 */
246 private void admitNewStreams() {
247 Iterator<NewStreamRequest> it = newStreamRequests.iterator();
248 while (isRunning() && it.hasNext()) {
249 try {
250 NewStreamRequest request = it.next();
251 it.remove();
252 SelectionKey key = request.channel.register(selector, request.op,
253 request.stream);
254 request.stream.setKey(key);
255 } catch (ClosedChannelException e) {
256 log.warn("Unable to admit new message stream", e);
257 }
258 }
259 }
260
261 @Override
262 protected void loop() throws IOException {
263 notifyReady();
264
265 // Keep going until told otherwise.
266 while (isRunning()) {
267 admitNewStreams();
268
269 // Process flushes & write selects on all streams
270 for (MessageStream<M> stream : streams) {
271 stream.flushIfWriteNotPending();
272 }
273
274 // Select keys and process them.
275 int count = selector.select(selectTimeout);
276 if (count > 0 && isRunning()) {
277 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
278 while (it.hasNext()) {
279 SelectionKey key = it.next();
280 it.remove();
281 processKeyOperation(key);
282 }
283 }
284 }
285 }
286
287 /**
288 * Prunes the registered streams by discarding any stale ones.
tomf7e13b02014-09-26 11:12:25 -0700289 *
290 * @return number of remaining streams
toma7083182014-09-25 21:38:03 -0700291 */
tomf7e13b02014-09-26 11:12:25 -0700292 public synchronized int pruneStaleStreams() {
toma7083182014-09-25 21:38:03 -0700293 for (MessageStream<M> stream : streams) {
294 if (stream.isStale()) {
295 stream.close();
296 }
297 }
tomf7e13b02014-09-26 11:12:25 -0700298 return streams.size();
toma7083182014-09-25 21:38:03 -0700299 }
300
301}