blob: dc3ecaf0b6b687644ee68b5ccf8147abfe837786 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
3import java.io.IOException;
4import java.nio.channels.ByteChannel;
5import java.nio.channels.CancelledKeyException;
6import java.nio.channels.ClosedChannelException;
7import java.nio.channels.SelectableChannel;
8import java.nio.channels.SelectionKey;
9import java.nio.channels.SocketChannel;
10import java.util.Iterator;
11import java.util.List;
12import java.util.Queue;
13import java.util.Set;
14import java.util.concurrent.ConcurrentLinkedQueue;
15import java.util.concurrent.CopyOnWriteArraySet;
16
17/**
18 * I/O loop for driving inbound & outbound {@link Message} transfer via
19 * {@link MessageStream}.
20 *
21 * @param <M> message type
22 * @param <S> message stream type
23 */
24public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
25 extends SelectorLoop {
26
27 // Queue of requests for new message streams to enter the IO loop processing.
28 private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>();
29
30 // Carries information required for admitting a new message stream.
31 private class NewStreamRequest {
32 private final S stream;
33 private final SelectableChannel channel;
34 private final int op;
35
36 public NewStreamRequest(S stream, SelectableChannel channel, int op) {
37 this.stream = stream;
38 this.channel = channel;
39 this.op = op;
40 }
41 }
42
43 // Set of message streams currently admitted into the IO loop.
44 private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>();
45
46 /**
47 * Creates an IO loop with the given selection timeout.
48 *
49 * @param timeout selection timeout in milliseconds
50 * @throws IOException if the backing selector cannot be opened
51 */
52 public IOLoop(long timeout) throws IOException {
53 super(timeout);
54 }
55
56 /**
tom9710fb42014-09-29 11:35:28 -070057 * Returns the number of message stream in custody of the loop.
58 *
59 * @return number of message streams
60 */
61 public int streamCount() {
62 return streams.size();
63 }
64
65 /**
toma7083182014-09-25 21:38:03 -070066 * Creates a new message stream backed by the specified socket channel.
67 *
68 * @param byteChannel backing byte channel
69 * @return newly created message stream
70 */
71 protected abstract S createStream(ByteChannel byteChannel);
72
73 /**
74 * Removes the specified message stream from the IO loop.
75 *
76 * @param stream message stream to remove
77 */
tomf110fff2014-09-26 00:38:18 -070078 protected void removeStream(MessageStream<M> stream) {
toma7083182014-09-25 21:38:03 -070079 streams.remove(stream);
80 }
81
82 /**
83 * Processes the list of messages extracted from the specified message
84 * stream.
85 *
86 * @param messages non-empty list of received messages
87 * @param stream message stream from which the messages were extracted
88 */
89 protected abstract void processMessages(List<M> messages, MessageStream<M> stream);
90
91 /**
92 * Completes connection request pending on the given selection key.
93 *
94 * @param key selection key holding the pending connect operation.
95 */
tom5a8779c2014-09-29 14:48:43 -070096 protected void connect(SelectionKey key) throws IOException {
97 SocketChannel ch = (SocketChannel) key.channel();
98 ch.finishConnect();
toma7083182014-09-25 21:38:03 -070099 if (key.isValid()) {
100 key.interestOps(SelectionKey.OP_READ);
101 }
102 }
103
104 /**
105 * Processes an IO operation pending on the specified key.
106 *
107 * @param key selection key holding the pending I/O operation.
108 */
109 protected void processKeyOperation(SelectionKey key) {
110 @SuppressWarnings("unchecked")
111 S stream = (S) key.attachment();
112
113 try {
114 // If the key is not valid, bail out.
115 if (!key.isValid()) {
116 stream.close();
117 return;
118 }
119
120 // If there is a pending connect operation, complete it.
121 if (key.isConnectable()) {
tom5a8779c2014-09-29 14:48:43 -0700122 try {
123 connect(key);
124 } catch (IOException | IllegalStateException e) {
125 log.warn("Unable to complete connection", e);
126 }
toma7083182014-09-25 21:38:03 -0700127 }
128
129 // If there is a read operation, slurp as much data as possible.
130 if (key.isReadable()) {
131 List<M> messages = stream.read();
132
133 // No messages or failed flush imply disconnect; bail.
134 if (messages == null || stream.hadError()) {
135 stream.close();
136 return;
137 }
138
139 // If there were any messages read, process them.
140 if (!messages.isEmpty()) {
141 try {
142 processMessages(messages, stream);
143 } catch (RuntimeException e) {
144 onError(stream, e);
145 }
146 }
147 }
148
149 // If there are pending writes, flush them
150 if (key.isWritable()) {
151 stream.flushIfPossible();
152 }
153
154 // If there were any issued flushing, close the stream.
155 if (stream.hadError()) {
156 stream.close();
157 }
158
159 } catch (CancelledKeyException e) {
160 // Key was cancelled, so silently close the stream
161 stream.close();
162 } catch (IOException e) {
163 if (!stream.isClosed() && !isResetByPeer(e)) {
164 log.warn("Unable to process IO", e);
165 }
166 stream.close();
167 }
168 }
169
170 // Indicates whether or not this exception is caused by 'reset by peer'.
171 private boolean isResetByPeer(IOException e) {
172 Throwable cause = e.getCause();
173 return cause != null && cause instanceof IOException &&
174 cause.getMessage().contains("reset by peer");
175 }
176
177 /**
178 * Hook to allow intercept of any errors caused during message processing.
179 * Default behaviour is to rethrow the error.
180 *
181 * @param stream message stream involved in the error
182 * @param error the runtime exception
183 */
184 protected void onError(S stream, RuntimeException error) {
185 throw error;
186 }
187
188 /**
189 * Admits a new message stream backed by the specified socket channel
190 * with a pending accept operation.
191 *
192 * @param channel backing socket channel
tom9710fb42014-09-29 11:35:28 -0700193 * @return newly accepted message stream
toma7083182014-09-25 21:38:03 -0700194 */
tom9710fb42014-09-29 11:35:28 -0700195 public S acceptStream(SocketChannel channel) {
196 return createAndAdmit(channel, SelectionKey.OP_READ);
toma7083182014-09-25 21:38:03 -0700197 }
198
199
200 /**
201 * Admits a new message stream backed by the specified socket channel
202 * with a pending connect operation.
203 *
204 * @param channel backing socket channel
tom9710fb42014-09-29 11:35:28 -0700205 * @return newly connected message stream
toma7083182014-09-25 21:38:03 -0700206 */
tom9710fb42014-09-29 11:35:28 -0700207 public S connectStream(SocketChannel channel) {
208 return createAndAdmit(channel, SelectionKey.OP_CONNECT);
toma7083182014-09-25 21:38:03 -0700209 }
210
211 /**
212 * Creates a new message stream backed by the specified socket channel
213 * and admits it into the IO loop.
214 *
215 * @param channel socket channel
216 * @param op pending operations mask to be applied to the selection
217 * key as a set of initial interestedOps
tom9710fb42014-09-29 11:35:28 -0700218 * @return newly created message stream
toma7083182014-09-25 21:38:03 -0700219 */
tom9710fb42014-09-29 11:35:28 -0700220 private synchronized S createAndAdmit(SocketChannel channel, int op) {
toma7083182014-09-25 21:38:03 -0700221 S stream = createStream(channel);
222 streams.add(stream);
223 newStreamRequests.add(new NewStreamRequest(stream, channel, op));
224 selector.wakeup();
tom9710fb42014-09-29 11:35:28 -0700225 return stream;
toma7083182014-09-25 21:38:03 -0700226 }
227
228 /**
229 * Safely admits new streams into the IO loop.
230 */
231 private void admitNewStreams() {
232 Iterator<NewStreamRequest> it = newStreamRequests.iterator();
233 while (isRunning() && it.hasNext()) {
234 try {
235 NewStreamRequest request = it.next();
236 it.remove();
237 SelectionKey key = request.channel.register(selector, request.op,
238 request.stream);
239 request.stream.setKey(key);
240 } catch (ClosedChannelException e) {
241 log.warn("Unable to admit new message stream", e);
242 }
243 }
244 }
245
246 @Override
247 protected void loop() throws IOException {
248 notifyReady();
249
250 // Keep going until told otherwise.
251 while (isRunning()) {
252 admitNewStreams();
253
254 // Process flushes & write selects on all streams
255 for (MessageStream<M> stream : streams) {
256 stream.flushIfWriteNotPending();
257 }
258
259 // Select keys and process them.
260 int count = selector.select(selectTimeout);
261 if (count > 0 && isRunning()) {
262 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
263 while (it.hasNext()) {
264 SelectionKey key = it.next();
265 it.remove();
266 processKeyOperation(key);
267 }
268 }
269 }
270 }
271
272 /**
273 * Prunes the registered streams by discarding any stale ones.
tomf7e13b02014-09-26 11:12:25 -0700274 *
275 * @return number of remaining streams
toma7083182014-09-25 21:38:03 -0700276 */
tomf7e13b02014-09-26 11:12:25 -0700277 public synchronized int pruneStaleStreams() {
toma7083182014-09-25 21:38:03 -0700278 for (MessageStream<M> stream : streams) {
279 if (stream.isStale()) {
280 stream.close();
281 }
282 }
tomf7e13b02014-09-26 11:12:25 -0700283 return streams.size();
toma7083182014-09-25 21:38:03 -0700284 }
285
286}