blob: 38c9cf692c7fdfe6ccbf5a174dbb9f0d1ea1d80c [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
3import java.io.IOException;
4import java.nio.channels.ByteChannel;
5import java.nio.channels.CancelledKeyException;
6import java.nio.channels.ClosedChannelException;
7import java.nio.channels.SelectableChannel;
8import java.nio.channels.SelectionKey;
9import java.nio.channels.SocketChannel;
10import java.util.Iterator;
11import java.util.List;
12import java.util.Queue;
13import java.util.Set;
14import java.util.concurrent.ConcurrentLinkedQueue;
15import java.util.concurrent.CopyOnWriteArraySet;
16
17/**
18 * I/O loop for driving inbound & outbound {@link Message} transfer via
19 * {@link MessageStream}.
20 *
21 * @param <M> message type
22 * @param <S> message stream type
23 */
24public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
25 extends SelectorLoop {
26
27 // Queue of requests for new message streams to enter the IO loop processing.
28 private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>();
29
30 // Carries information required for admitting a new message stream.
31 private class NewStreamRequest {
32 private final S stream;
33 private final SelectableChannel channel;
34 private final int op;
35
36 public NewStreamRequest(S stream, SelectableChannel channel, int op) {
37 this.stream = stream;
38 this.channel = channel;
39 this.op = op;
40 }
41 }
42
43 // Set of message streams currently admitted into the IO loop.
44 private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>();
45
46 /**
47 * Creates an IO loop with the given selection timeout.
48 *
49 * @param timeout selection timeout in milliseconds
50 * @throws IOException if the backing selector cannot be opened
51 */
52 public IOLoop(long timeout) throws IOException {
53 super(timeout);
54 }
55
56 /**
tom16555622014-09-29 08:49:27 -070057 * Returns the number of streams in custody of the IO loop.
58 *
59 * @return number of message streams using this loop
60 */
61 public int streamCount() {
62 return streams.size();
63 }
64
65 /**
toma7083182014-09-25 21:38:03 -070066 * Creates a new message stream backed by the specified socket channel.
67 *
68 * @param byteChannel backing byte channel
69 * @return newly created message stream
70 */
71 protected abstract S createStream(ByteChannel byteChannel);
72
73 /**
74 * Removes the specified message stream from the IO loop.
75 *
76 * @param stream message stream to remove
77 */
tomf110fff2014-09-26 00:38:18 -070078 protected void removeStream(MessageStream<M> stream) {
toma7083182014-09-25 21:38:03 -070079 streams.remove(stream);
80 }
81
82 /**
83 * Processes the list of messages extracted from the specified message
84 * stream.
85 *
86 * @param messages non-empty list of received messages
87 * @param stream message stream from which the messages were extracted
88 */
89 protected abstract void processMessages(List<M> messages, MessageStream<M> stream);
90
91 /**
92 * Completes connection request pending on the given selection key.
93 *
94 * @param key selection key holding the pending connect operation.
95 */
96 protected void connect(SelectionKey key) {
97 try {
98 SocketChannel ch = (SocketChannel) key.channel();
99 ch.finishConnect();
100 } catch (IOException | IllegalStateException e) {
101 log.warn("Unable to complete connection", e);
102 }
103
104 if (key.isValid()) {
105 key.interestOps(SelectionKey.OP_READ);
106 }
107 }
108
109 /**
110 * Processes an IO operation pending on the specified key.
111 *
112 * @param key selection key holding the pending I/O operation.
113 */
114 protected void processKeyOperation(SelectionKey key) {
115 @SuppressWarnings("unchecked")
116 S stream = (S) key.attachment();
117
118 try {
119 // If the key is not valid, bail out.
120 if (!key.isValid()) {
121 stream.close();
122 return;
123 }
124
125 // If there is a pending connect operation, complete it.
126 if (key.isConnectable()) {
127 connect(key);
128 }
129
130 // If there is a read operation, slurp as much data as possible.
131 if (key.isReadable()) {
132 List<M> messages = stream.read();
133
134 // No messages or failed flush imply disconnect; bail.
135 if (messages == null || stream.hadError()) {
136 stream.close();
137 return;
138 }
139
140 // If there were any messages read, process them.
141 if (!messages.isEmpty()) {
142 try {
143 processMessages(messages, stream);
144 } catch (RuntimeException e) {
145 onError(stream, e);
146 }
147 }
148 }
149
150 // If there are pending writes, flush them
151 if (key.isWritable()) {
152 stream.flushIfPossible();
153 }
154
155 // If there were any issued flushing, close the stream.
156 if (stream.hadError()) {
157 stream.close();
158 }
159
160 } catch (CancelledKeyException e) {
161 // Key was cancelled, so silently close the stream
162 stream.close();
163 } catch (IOException e) {
164 if (!stream.isClosed() && !isResetByPeer(e)) {
165 log.warn("Unable to process IO", e);
166 }
167 stream.close();
168 }
169 }
170
171 // Indicates whether or not this exception is caused by 'reset by peer'.
172 private boolean isResetByPeer(IOException e) {
173 Throwable cause = e.getCause();
174 return cause != null && cause instanceof IOException &&
175 cause.getMessage().contains("reset by peer");
176 }
177
178 /**
179 * Hook to allow intercept of any errors caused during message processing.
180 * Default behaviour is to rethrow the error.
181 *
182 * @param stream message stream involved in the error
183 * @param error the runtime exception
184 */
185 protected void onError(S stream, RuntimeException error) {
186 throw error;
187 }
188
189 /**
190 * Admits a new message stream backed by the specified socket channel
191 * with a pending accept operation.
192 *
193 * @param channel backing socket channel
tom16555622014-09-29 08:49:27 -0700194 * @return newly accepted message stream
toma7083182014-09-25 21:38:03 -0700195 */
tom16555622014-09-29 08:49:27 -0700196 public S acceptStream(SocketChannel channel) {
197 return createAndAdmit(channel, SelectionKey.OP_READ);
toma7083182014-09-25 21:38:03 -0700198 }
199
200
201 /**
202 * Admits a new message stream backed by the specified socket channel
203 * with a pending connect operation.
204 *
205 * @param channel backing socket channel
tom16555622014-09-29 08:49:27 -0700206 * @return newly connected message stream
toma7083182014-09-25 21:38:03 -0700207 */
tom16555622014-09-29 08:49:27 -0700208 public S connectStream(SocketChannel channel) {
209 return createAndAdmit(channel, SelectionKey.OP_CONNECT);
toma7083182014-09-25 21:38:03 -0700210 }
211
212 /**
213 * Creates a new message stream backed by the specified socket channel
214 * and admits it into the IO loop.
215 *
216 * @param channel socket channel
217 * @param op pending operations mask to be applied to the selection
218 * key as a set of initial interestedOps
tom16555622014-09-29 08:49:27 -0700219 * @return newly created message stream
toma7083182014-09-25 21:38:03 -0700220 */
tom16555622014-09-29 08:49:27 -0700221 private synchronized S createAndAdmit(SocketChannel channel, int op) {
toma7083182014-09-25 21:38:03 -0700222 S stream = createStream(channel);
223 streams.add(stream);
224 newStreamRequests.add(new NewStreamRequest(stream, channel, op));
225 selector.wakeup();
tom16555622014-09-29 08:49:27 -0700226 return stream;
toma7083182014-09-25 21:38:03 -0700227 }
228
229 /**
230 * Safely admits new streams into the IO loop.
231 */
232 private void admitNewStreams() {
233 Iterator<NewStreamRequest> it = newStreamRequests.iterator();
234 while (isRunning() && it.hasNext()) {
235 try {
236 NewStreamRequest request = it.next();
237 it.remove();
238 SelectionKey key = request.channel.register(selector, request.op,
239 request.stream);
240 request.stream.setKey(key);
241 } catch (ClosedChannelException e) {
242 log.warn("Unable to admit new message stream", e);
243 }
244 }
245 }
246
247 @Override
248 protected void loop() throws IOException {
249 notifyReady();
250
251 // Keep going until told otherwise.
252 while (isRunning()) {
253 admitNewStreams();
254
255 // Process flushes & write selects on all streams
256 for (MessageStream<M> stream : streams) {
257 stream.flushIfWriteNotPending();
258 }
259
260 // Select keys and process them.
261 int count = selector.select(selectTimeout);
262 if (count > 0 && isRunning()) {
263 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
264 while (it.hasNext()) {
265 SelectionKey key = it.next();
266 it.remove();
267 processKeyOperation(key);
268 }
269 }
270 }
271 }
272
273 /**
274 * Prunes the registered streams by discarding any stale ones.
tomf7e13b02014-09-26 11:12:25 -0700275 *
276 * @return number of remaining streams
toma7083182014-09-25 21:38:03 -0700277 */
tomf7e13b02014-09-26 11:12:25 -0700278 public synchronized int pruneStaleStreams() {
toma7083182014-09-25 21:38:03 -0700279 for (MessageStream<M> stream : streams) {
280 if (stream.isStale()) {
281 stream.close();
282 }
283 }
tomf7e13b02014-09-26 11:12:25 -0700284 return streams.size();
toma7083182014-09-25 21:38:03 -0700285 }
286
287}