blob: 9c9bca8b096f5572e1f06a90bfba6a6d32cffa65 [file] [log] [blame]
Thomas Vachuska24c849c2014-10-27 09:53:05 -07001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
toma7083182014-09-25 21:38:03 -070019package org.onlab.nio;
20
tom2d6d3972014-09-25 22:38:57 -070021import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -070022import org.slf4j.Logger;
23import org.slf4j.LoggerFactory;
24
25import java.io.IOException;
26import java.nio.ByteBuffer;
27import java.nio.channels.ByteChannel;
28import java.nio.channels.SelectionKey;
29import java.util.ArrayList;
30import java.util.List;
tom5a8779c2014-09-29 14:48:43 -070031import java.util.Objects;
toma7083182014-09-25 21:38:03 -070032
33import static com.google.common.base.Preconditions.checkArgument;
34import static com.google.common.base.Preconditions.checkNotNull;
35import static java.lang.System.currentTimeMillis;
36import static java.nio.ByteBuffer.allocateDirect;
37
38/**
39 * Bi-directional message stream for transferring messages to & from the
40 * network via two byte buffers.
41 *
42 * @param <M> message type
43 */
44public abstract class MessageStream<M extends Message> {
45
46 protected Logger log = LoggerFactory.getLogger(getClass());
47
48 private final IOLoop<M, ?> loop;
49 private final ByteChannel channel;
50 private final int maxIdleMillis;
51
52 private final ByteBuffer inbound;
53 private ByteBuffer outbound;
54 private SelectionKey key;
55
56 private volatile boolean closed = false;
57 private volatile boolean writePending;
58 private volatile boolean writeOccurred;
59
60 private Exception ioError;
61 private long lastActiveTime;
62
tom2d6d3972014-09-25 22:38:57 -070063 private final Counter bytesIn = new Counter();
64 private final Counter messagesIn = new Counter();
65 private final Counter bytesOut = new Counter();
66 private final Counter messagesOut = new Counter();
toma7083182014-09-25 21:38:03 -070067
68 /**
69 * Creates a message stream associated with the specified IO loop and
70 * backed by the given byte channel.
71 *
72 * @param loop IO loop
73 * @param byteChannel backing byte channel
74 * @param bufferSize size of the backing byte buffers
75 * @param maxIdleMillis maximum number of millis the stream can be idle
76 * before it will be closed
77 */
78 protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
79 int bufferSize, int maxIdleMillis) {
80 this.loop = checkNotNull(loop, "Loop cannot be null");
81 this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");
82
83 checkArgument(maxIdleMillis > 0, "Idle time must be positive");
84 this.maxIdleMillis = maxIdleMillis;
85
86 inbound = allocateDirect(bufferSize);
87 outbound = allocateDirect(bufferSize);
88 }
89
90 /**
91 * Gets a single message from the specified byte buffer; this is
92 * to be done without manipulating the buffer via flip, reset or clear.
93 *
94 * @param buffer byte buffer
95 * @return read message or null if there are not enough bytes to read
96 * a complete message
97 */
98 protected abstract M read(ByteBuffer buffer);
99
100 /**
101 * Puts the specified message into the specified byte buffer; this is
102 * to be done without manipulating the buffer via flip, reset or clear.
103 *
104 * @param message message to be write into the buffer
105 * @param buffer byte buffer
106 */
107 protected abstract void write(M message, ByteBuffer buffer);
108
109 /**
110 * Closes the message buffer.
111 */
112 public void close() {
113 synchronized (this) {
114 if (closed) {
115 return;
116 }
117 closed = true;
118 }
119
tom2d6d3972014-09-25 22:38:57 -0700120 bytesIn.freeze();
121 bytesOut.freeze();
122 messagesIn.freeze();
123 messagesOut.freeze();
124
toma7083182014-09-25 21:38:03 -0700125 loop.removeStream(this);
126 if (key != null) {
127 try {
128 key.cancel();
129 key.channel().close();
130 } catch (IOException e) {
131 log.warn("Unable to close stream", e);
132 }
133 }
134 }
135
136 /**
137 * Indicates whether this buffer has been closed.
138 *
139 * @return true if this stream has been closed
140 */
141 public synchronized boolean isClosed() {
142 return closed;
143 }
144
145 /**
146 * Returns the stream IO selection key.
147 *
148 * @return socket channel registration selection key
149 */
150 public SelectionKey key() {
151 return key;
152 }
153
154 /**
155 * Binds the selection key to be used for driving IO operations on the stream.
156 *
157 * @param key IO selection key
158 */
159 public void setKey(SelectionKey key) {
160 this.key = key;
161 this.lastActiveTime = currentTimeMillis();
162 }
163
164 /**
165 * Returns the IO loop to which this stream is bound.
166 *
167 * @return I/O loop used to drive this stream
168 */
169 public IOLoop<M, ?> loop() {
170 return loop;
171 }
172
173 /**
174 * Indicates whether the any prior IO encountered an error.
175 *
176 * @return true if a write failed
177 */
178 public boolean hadError() {
179 return ioError != null;
180 }
181
182 /**
183 * Gets the prior IO error, if one occurred.
184 *
185 * @return IO error; null if none occurred
186 */
187 public Exception getError() {
188 return ioError;
189 }
190
191 /**
tom73094832014-09-29 13:47:08 -0700192 * Reads, without blocking, a list of messages from the stream.
toma7083182014-09-25 21:38:03 -0700193 * The list will be empty if there were not messages pending.
194 *
195 * @return list of messages or null if backing channel has been closed
196 * @throws IOException if messages could not be read
197 */
198 public List<M> read() throws IOException {
199 try {
200 int read = channel.read(inbound);
201 if (read != -1) {
202 // Read the messages one-by-one and add them to the list.
203 List<M> messages = new ArrayList<>();
204 M message;
205 inbound.flip();
206 while ((message = read(inbound)) != null) {
207 messages.add(message);
tom2d6d3972014-09-25 22:38:57 -0700208 messagesIn.add(1);
209 bytesIn.add(message.length());
toma7083182014-09-25 21:38:03 -0700210 }
211 inbound.compact();
212
213 // Mark the stream with current time to indicate liveness.
214 lastActiveTime = currentTimeMillis();
215 return messages;
216 }
217 return null;
218
219 } catch (Exception e) {
220 throw new IOException("Unable to read messages", e);
221 }
222 }
223
224 /**
225 * Writes the specified list of messages to the stream.
226 *
227 * @param messages list of messages to write
228 * @throws IOException if error occurred while writing the data
229 */
230 public void write(List<M> messages) throws IOException {
231 synchronized (this) {
232 // First write all messages.
233 for (M m : messages) {
234 append(m);
235 }
236 flushUnlessAlreadyPlanningTo();
237 }
238 }
239
240 /**
241 * Writes the given message to the stream.
242 *
243 * @param message message to write
244 * @throws IOException if error occurred while writing the data
245 */
246 public void write(M message) throws IOException {
247 synchronized (this) {
248 append(message);
249 flushUnlessAlreadyPlanningTo();
250 }
251 }
252
253 // Appends the specified message into the internal buffer, growing the
254 // buffer if required.
255 private void append(M message) {
256 // If the buffer does not have sufficient length double it.
257 while (outbound.remaining() < message.length()) {
258 doubleSize();
259 }
toma7083182014-09-25 21:38:03 -0700260 write(message, outbound);
tom2d6d3972014-09-25 22:38:57 -0700261 messagesOut.add(1);
262 bytesOut.add(message.length());
toma7083182014-09-25 21:38:03 -0700263 }
264
265 // Forces a flush, unless one is planned already.
266 private void flushUnlessAlreadyPlanningTo() throws IOException {
267 if (!writeOccurred && !writePending) {
268 flush();
269 }
270 }
271
272 /**
273 * Flushes any pending writes.
274 *
275 * @throws IOException if flush failed
276 */
277 public void flush() throws IOException {
278 synchronized (this) {
279 if (!writeOccurred && !writePending) {
280 outbound.flip();
281 try {
282 channel.write(outbound);
283 } catch (IOException e) {
tom5a8779c2014-09-29 14:48:43 -0700284 if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
toma7083182014-09-25 21:38:03 -0700285 log.warn("Unable to write data", e);
286 ioError = e;
287 }
288 }
289 lastActiveTime = currentTimeMillis();
290 writeOccurred = true;
291 writePending = outbound.hasRemaining();
292 outbound.compact();
293 }
294 }
295 }
296
297 /**
298 * Indicates whether the stream has bytes to be written to the channel.
299 *
300 * @return true if there are bytes to be written
301 */
302 boolean isWritePending() {
303 synchronized (this) {
304 return writePending;
305 }
306 }
307
tom2d6d3972014-09-25 22:38:57 -0700308
309 /**
310 * Indicates whether data has been written but not flushed yet.
311 *
312 * @return true if flush is required
313 */
314 boolean isFlushRequired() {
315 synchronized (this) {
316 return outbound.position() > 0;
317 }
318 }
319
toma7083182014-09-25 21:38:03 -0700320 /**
321 * Attempts to flush data, internal stream state and channel availability
322 * permitting. Invoked by the driver I/O loop during handling of writable
323 * selection key.
324 * <p/>
325 * Resets the internal state flags {@code writeOccurred} and
326 * {@code writePending}.
327 *
328 * @throws IOException if implicit flush failed
329 */
330 void flushIfPossible() throws IOException {
331 synchronized (this) {
332 writePending = false;
333 writeOccurred = false;
334 if (outbound.position() > 0) {
335 flush();
336 }
337 }
338 key.interestOps(SelectionKey.OP_READ);
339 }
340
341 /**
342 * Attempts to flush data, internal stream state and channel availability
343 * permitting and if other writes are not pending. Invoked by the driver
344 * I/O loop prior to entering select wait. Resets the internal
345 * {@code writeOccurred} state flag.
346 *
347 * @throws IOException if implicit flush failed
348 */
349 void flushIfWriteNotPending() throws IOException {
350 synchronized (this) {
351 writeOccurred = false;
352 if (!writePending && outbound.position() > 0) {
353 flush();
354 }
355 }
356 if (isWritePending()) {
357 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
358 }
359 }
360
361 /**
362 * Doubles the size of the outbound buffer.
363 */
364 private void doubleSize() {
365 ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
366 outbound.flip();
367 newBuffer.put(outbound);
368 outbound = newBuffer;
369 }
370
371 /**
372 * Returns the maximum number of milliseconds the stream is allowed
373 * without any read/write operations.
374 *
375 * @return number if millis of permissible idle time
376 */
377 protected int maxIdleMillis() {
378 return maxIdleMillis;
379 }
380
381
382 /**
383 * Returns true if the given stream has gone stale.
384 *
385 * @return true if the stream is stale
386 */
387 boolean isStale() {
388 return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
389 }
390
tom2d6d3972014-09-25 22:38:57 -0700391 /**
392 * Returns the inbound bytes counter.
393 *
394 * @return inbound bytes counter
395 */
396 public Counter bytesIn() {
397 return bytesIn;
398 }
399
400 /**
401 * Returns the outbound bytes counter.
402 *
403 * @return outbound bytes counter
404 */
405 public Counter bytesOut() {
406 return bytesOut;
407 }
408
409 /**
410 * Returns the inbound messages counter.
411 *
412 * @return inbound messages counter
413 */
414 public Counter messagesIn() {
415 return messagesIn;
416 }
417
418 /**
419 * Returns the outbound messages counter.
420 *
421 * @return outbound messages counter
422 */
423 public Counter messagesOut() {
424 return messagesOut;
425 }
426
toma7083182014-09-25 21:38:03 -0700427}