blob: 9e1c2d3411801dbcfcfb183a31272d47118b4953 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
3import java.io.IOException;
4import java.nio.channels.ByteChannel;
5import java.nio.channels.CancelledKeyException;
6import java.nio.channels.ClosedChannelException;
7import java.nio.channels.SelectableChannel;
8import java.nio.channels.SelectionKey;
9import java.nio.channels.SocketChannel;
10import java.util.Iterator;
11import java.util.List;
12import java.util.Queue;
13import java.util.Set;
14import java.util.concurrent.ConcurrentLinkedQueue;
15import java.util.concurrent.CopyOnWriteArraySet;
16
17/**
18 * I/O loop for driving inbound & outbound {@link Message} transfer via
19 * {@link MessageStream}.
20 *
21 * @param <M> message type
22 * @param <S> message stream type
23 */
24public abstract class IOLoop<M extends Message, S extends MessageStream<M>>
25 extends SelectorLoop {
26
27 // Queue of requests for new message streams to enter the IO loop processing.
28 private final Queue<NewStreamRequest> newStreamRequests = new ConcurrentLinkedQueue<>();
29
30 // Carries information required for admitting a new message stream.
31 private class NewStreamRequest {
32 private final S stream;
33 private final SelectableChannel channel;
34 private final int op;
35
36 public NewStreamRequest(S stream, SelectableChannel channel, int op) {
37 this.stream = stream;
38 this.channel = channel;
39 this.op = op;
40 }
41 }
42
43 // Set of message streams currently admitted into the IO loop.
44 private final Set<MessageStream<M>> streams = new CopyOnWriteArraySet<>();
45
46 /**
47 * Creates an IO loop with the given selection timeout.
48 *
49 * @param timeout selection timeout in milliseconds
50 * @throws IOException if the backing selector cannot be opened
51 */
52 public IOLoop(long timeout) throws IOException {
53 super(timeout);
54 }
55
56 /**
57 * Creates a new message stream backed by the specified socket channel.
58 *
59 * @param byteChannel backing byte channel
60 * @return newly created message stream
61 */
62 protected abstract S createStream(ByteChannel byteChannel);
63
64 /**
65 * Removes the specified message stream from the IO loop.
66 *
67 * @param stream message stream to remove
68 */
69 void removeStream(MessageStream<M> stream) {
70 streams.remove(stream);
71 }
72
73 /**
74 * Processes the list of messages extracted from the specified message
75 * stream.
76 *
77 * @param messages non-empty list of received messages
78 * @param stream message stream from which the messages were extracted
79 */
80 protected abstract void processMessages(List<M> messages, MessageStream<M> stream);
81
82 /**
83 * Completes connection request pending on the given selection key.
84 *
85 * @param key selection key holding the pending connect operation.
86 */
87 protected void connect(SelectionKey key) {
88 try {
89 SocketChannel ch = (SocketChannel) key.channel();
90 ch.finishConnect();
91 } catch (IOException | IllegalStateException e) {
92 log.warn("Unable to complete connection", e);
93 }
94
95 if (key.isValid()) {
96 key.interestOps(SelectionKey.OP_READ);
97 }
98 }
99
100 /**
101 * Processes an IO operation pending on the specified key.
102 *
103 * @param key selection key holding the pending I/O operation.
104 */
105 protected void processKeyOperation(SelectionKey key) {
106 @SuppressWarnings("unchecked")
107 S stream = (S) key.attachment();
108
109 try {
110 // If the key is not valid, bail out.
111 if (!key.isValid()) {
112 stream.close();
113 return;
114 }
115
116 // If there is a pending connect operation, complete it.
117 if (key.isConnectable()) {
118 connect(key);
119 }
120
121 // If there is a read operation, slurp as much data as possible.
122 if (key.isReadable()) {
123 List<M> messages = stream.read();
124
125 // No messages or failed flush imply disconnect; bail.
126 if (messages == null || stream.hadError()) {
127 stream.close();
128 return;
129 }
130
131 // If there were any messages read, process them.
132 if (!messages.isEmpty()) {
133 try {
134 processMessages(messages, stream);
135 } catch (RuntimeException e) {
136 onError(stream, e);
137 }
138 }
139 }
140
141 // If there are pending writes, flush them
142 if (key.isWritable()) {
143 stream.flushIfPossible();
144 }
145
146 // If there were any issued flushing, close the stream.
147 if (stream.hadError()) {
148 stream.close();
149 }
150
151 } catch (CancelledKeyException e) {
152 // Key was cancelled, so silently close the stream
153 stream.close();
154 } catch (IOException e) {
155 if (!stream.isClosed() && !isResetByPeer(e)) {
156 log.warn("Unable to process IO", e);
157 }
158 stream.close();
159 }
160 }
161
162 // Indicates whether or not this exception is caused by 'reset by peer'.
163 private boolean isResetByPeer(IOException e) {
164 Throwable cause = e.getCause();
165 return cause != null && cause instanceof IOException &&
166 cause.getMessage().contains("reset by peer");
167 }
168
169 /**
170 * Hook to allow intercept of any errors caused during message processing.
171 * Default behaviour is to rethrow the error.
172 *
173 * @param stream message stream involved in the error
174 * @param error the runtime exception
175 */
176 protected void onError(S stream, RuntimeException error) {
177 throw error;
178 }
179
180 /**
181 * Admits a new message stream backed by the specified socket channel
182 * with a pending accept operation.
183 *
184 * @param channel backing socket channel
185 */
186 public void acceptStream(SocketChannel channel) {
187 createAndAdmit(channel, SelectionKey.OP_READ);
188 }
189
190
191 /**
192 * Admits a new message stream backed by the specified socket channel
193 * with a pending connect operation.
194 *
195 * @param channel backing socket channel
196 */
197 public void connectStream(SocketChannel channel) {
198 createAndAdmit(channel, SelectionKey.OP_CONNECT);
199 }
200
201 /**
202 * Creates a new message stream backed by the specified socket channel
203 * and admits it into the IO loop.
204 *
205 * @param channel socket channel
206 * @param op pending operations mask to be applied to the selection
207 * key as a set of initial interestedOps
208 */
209 private synchronized void createAndAdmit(SocketChannel channel, int op) {
210 S stream = createStream(channel);
211 streams.add(stream);
212 newStreamRequests.add(new NewStreamRequest(stream, channel, op));
213 selector.wakeup();
214 }
215
216 /**
217 * Safely admits new streams into the IO loop.
218 */
219 private void admitNewStreams() {
220 Iterator<NewStreamRequest> it = newStreamRequests.iterator();
221 while (isRunning() && it.hasNext()) {
222 try {
223 NewStreamRequest request = it.next();
224 it.remove();
225 SelectionKey key = request.channel.register(selector, request.op,
226 request.stream);
227 request.stream.setKey(key);
228 } catch (ClosedChannelException e) {
229 log.warn("Unable to admit new message stream", e);
230 }
231 }
232 }
233
234 @Override
235 protected void loop() throws IOException {
236 notifyReady();
237
238 // Keep going until told otherwise.
239 while (isRunning()) {
240 admitNewStreams();
241
242 // Process flushes & write selects on all streams
243 for (MessageStream<M> stream : streams) {
244 stream.flushIfWriteNotPending();
245 }
246
247 // Select keys and process them.
248 int count = selector.select(selectTimeout);
249 if (count > 0 && isRunning()) {
250 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
251 while (it.hasNext()) {
252 SelectionKey key = it.next();
253 it.remove();
254 processKeyOperation(key);
255 }
256 }
257 }
258 }
259
260 /**
261 * Prunes the registered streams by discarding any stale ones.
262 */
263 public synchronized void pruneStaleStreams() {
264 for (MessageStream<M> stream : streams) {
265 if (stream.isStale()) {
266 stream.close();
267 }
268 }
269 }
270
271}