blob: 89107bfc3b0d9cc98636443e8190a3488c3890f4 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
tom2d6d3972014-09-25 22:38:57 -07003import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -07004import org.slf4j.Logger;
5import org.slf4j.LoggerFactory;
6
7import java.io.IOException;
8import java.nio.ByteBuffer;
9import java.nio.channels.ByteChannel;
10import java.nio.channels.SelectionKey;
11import java.util.ArrayList;
12import java.util.List;
13
14import static com.google.common.base.Preconditions.checkArgument;
15import static com.google.common.base.Preconditions.checkNotNull;
16import static java.lang.System.currentTimeMillis;
17import static java.nio.ByteBuffer.allocateDirect;
18
19/**
20 * Bi-directional message stream for transferring messages to & from the
21 * network via two byte buffers.
22 *
23 * @param <M> message type
24 */
25public abstract class MessageStream<M extends Message> {
26
27 protected Logger log = LoggerFactory.getLogger(getClass());
28
29 private final IOLoop<M, ?> loop;
30 private final ByteChannel channel;
31 private final int maxIdleMillis;
32
33 private final ByteBuffer inbound;
34 private ByteBuffer outbound;
35 private SelectionKey key;
36
37 private volatile boolean closed = false;
38 private volatile boolean writePending;
39 private volatile boolean writeOccurred;
40
41 private Exception ioError;
42 private long lastActiveTime;
43
tom2d6d3972014-09-25 22:38:57 -070044 private final Counter bytesIn = new Counter();
45 private final Counter messagesIn = new Counter();
46 private final Counter bytesOut = new Counter();
47 private final Counter messagesOut = new Counter();
toma7083182014-09-25 21:38:03 -070048
49 /**
50 * Creates a message stream associated with the specified IO loop and
51 * backed by the given byte channel.
52 *
53 * @param loop IO loop
54 * @param byteChannel backing byte channel
55 * @param bufferSize size of the backing byte buffers
56 * @param maxIdleMillis maximum number of millis the stream can be idle
57 * before it will be closed
58 */
59 protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
60 int bufferSize, int maxIdleMillis) {
61 this.loop = checkNotNull(loop, "Loop cannot be null");
62 this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");
63
64 checkArgument(maxIdleMillis > 0, "Idle time must be positive");
65 this.maxIdleMillis = maxIdleMillis;
66
67 inbound = allocateDirect(bufferSize);
68 outbound = allocateDirect(bufferSize);
69 }
70
71 /**
72 * Gets a single message from the specified byte buffer; this is
73 * to be done without manipulating the buffer via flip, reset or clear.
74 *
75 * @param buffer byte buffer
76 * @return read message or null if there are not enough bytes to read
77 * a complete message
78 */
79 protected abstract M read(ByteBuffer buffer);
80
81 /**
82 * Puts the specified message into the specified byte buffer; this is
83 * to be done without manipulating the buffer via flip, reset or clear.
84 *
85 * @param message message to be write into the buffer
86 * @param buffer byte buffer
87 */
88 protected abstract void write(M message, ByteBuffer buffer);
89
90 /**
91 * Closes the message buffer.
92 */
93 public void close() {
94 synchronized (this) {
95 if (closed) {
96 return;
97 }
98 closed = true;
99 }
100
tom2d6d3972014-09-25 22:38:57 -0700101 bytesIn.freeze();
102 bytesOut.freeze();
103 messagesIn.freeze();
104 messagesOut.freeze();
105
toma7083182014-09-25 21:38:03 -0700106 loop.removeStream(this);
107 if (key != null) {
108 try {
109 key.cancel();
110 key.channel().close();
111 } catch (IOException e) {
112 log.warn("Unable to close stream", e);
113 }
114 }
115 }
116
117 /**
118 * Indicates whether this buffer has been closed.
119 *
120 * @return true if this stream has been closed
121 */
122 public synchronized boolean isClosed() {
123 return closed;
124 }
125
126 /**
127 * Returns the stream IO selection key.
128 *
129 * @return socket channel registration selection key
130 */
131 public SelectionKey key() {
132 return key;
133 }
134
135 /**
136 * Binds the selection key to be used for driving IO operations on the stream.
137 *
138 * @param key IO selection key
139 */
140 public void setKey(SelectionKey key) {
141 this.key = key;
142 this.lastActiveTime = currentTimeMillis();
143 }
144
145 /**
146 * Returns the IO loop to which this stream is bound.
147 *
148 * @return I/O loop used to drive this stream
149 */
150 public IOLoop<M, ?> loop() {
151 return loop;
152 }
153
154 /**
155 * Indicates whether the any prior IO encountered an error.
156 *
157 * @return true if a write failed
158 */
159 public boolean hadError() {
160 return ioError != null;
161 }
162
163 /**
164 * Gets the prior IO error, if one occurred.
165 *
166 * @return IO error; null if none occurred
167 */
168 public Exception getError() {
169 return ioError;
170 }
171
172 /**
173 * Reads, withouth blocking, a list of messages from the stream.
174 * The list will be empty if there were not messages pending.
175 *
176 * @return list of messages or null if backing channel has been closed
177 * @throws IOException if messages could not be read
178 */
179 public List<M> read() throws IOException {
180 try {
181 int read = channel.read(inbound);
182 if (read != -1) {
183 // Read the messages one-by-one and add them to the list.
184 List<M> messages = new ArrayList<>();
185 M message;
186 inbound.flip();
187 while ((message = read(inbound)) != null) {
188 messages.add(message);
tom2d6d3972014-09-25 22:38:57 -0700189 messagesIn.add(1);
190 bytesIn.add(message.length());
toma7083182014-09-25 21:38:03 -0700191 }
192 inbound.compact();
193
194 // Mark the stream with current time to indicate liveness.
195 lastActiveTime = currentTimeMillis();
196 return messages;
197 }
198 return null;
199
200 } catch (Exception e) {
201 throw new IOException("Unable to read messages", e);
202 }
203 }
204
205 /**
206 * Writes the specified list of messages to the stream.
207 *
208 * @param messages list of messages to write
209 * @throws IOException if error occurred while writing the data
210 */
211 public void write(List<M> messages) throws IOException {
212 synchronized (this) {
213 // First write all messages.
214 for (M m : messages) {
215 append(m);
216 }
217 flushUnlessAlreadyPlanningTo();
218 }
219 }
220
221 /**
222 * Writes the given message to the stream.
223 *
224 * @param message message to write
225 * @throws IOException if error occurred while writing the data
226 */
227 public void write(M message) throws IOException {
228 synchronized (this) {
229 append(message);
230 flushUnlessAlreadyPlanningTo();
231 }
232 }
233
234 // Appends the specified message into the internal buffer, growing the
235 // buffer if required.
236 private void append(M message) {
237 // If the buffer does not have sufficient length double it.
238 while (outbound.remaining() < message.length()) {
239 doubleSize();
240 }
toma7083182014-09-25 21:38:03 -0700241 write(message, outbound);
tom2d6d3972014-09-25 22:38:57 -0700242 messagesOut.add(1);
243 bytesOut.add(message.length());
toma7083182014-09-25 21:38:03 -0700244 }
245
246 // Forces a flush, unless one is planned already.
247 private void flushUnlessAlreadyPlanningTo() throws IOException {
248 if (!writeOccurred && !writePending) {
249 flush();
250 }
251 }
252
253 /**
254 * Flushes any pending writes.
255 *
256 * @throws IOException if flush failed
257 */
258 public void flush() throws IOException {
259 synchronized (this) {
260 if (!writeOccurred && !writePending) {
261 outbound.flip();
262 try {
263 channel.write(outbound);
264 } catch (IOException e) {
265 if (!closed && !e.getMessage().equals("Broken pipe")) {
266 log.warn("Unable to write data", e);
267 ioError = e;
268 }
269 }
270 lastActiveTime = currentTimeMillis();
271 writeOccurred = true;
272 writePending = outbound.hasRemaining();
273 outbound.compact();
274 }
275 }
276 }
277
278 /**
279 * Indicates whether the stream has bytes to be written to the channel.
280 *
281 * @return true if there are bytes to be written
282 */
283 boolean isWritePending() {
284 synchronized (this) {
285 return writePending;
286 }
287 }
288
tom2d6d3972014-09-25 22:38:57 -0700289
290 /**
291 * Indicates whether data has been written but not flushed yet.
292 *
293 * @return true if flush is required
294 */
295 boolean isFlushRequired() {
296 synchronized (this) {
297 return outbound.position() > 0;
298 }
299 }
300
toma7083182014-09-25 21:38:03 -0700301 /**
302 * Attempts to flush data, internal stream state and channel availability
303 * permitting. Invoked by the driver I/O loop during handling of writable
304 * selection key.
305 * <p/>
306 * Resets the internal state flags {@code writeOccurred} and
307 * {@code writePending}.
308 *
309 * @throws IOException if implicit flush failed
310 */
311 void flushIfPossible() throws IOException {
312 synchronized (this) {
313 writePending = false;
314 writeOccurred = false;
315 if (outbound.position() > 0) {
316 flush();
317 }
318 }
319 key.interestOps(SelectionKey.OP_READ);
320 }
321
322 /**
323 * Attempts to flush data, internal stream state and channel availability
324 * permitting and if other writes are not pending. Invoked by the driver
325 * I/O loop prior to entering select wait. Resets the internal
326 * {@code writeOccurred} state flag.
327 *
328 * @throws IOException if implicit flush failed
329 */
330 void flushIfWriteNotPending() throws IOException {
331 synchronized (this) {
332 writeOccurred = false;
333 if (!writePending && outbound.position() > 0) {
334 flush();
335 }
336 }
337 if (isWritePending()) {
338 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
339 }
340 }
341
342 /**
343 * Doubles the size of the outbound buffer.
344 */
345 private void doubleSize() {
346 ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
347 outbound.flip();
348 newBuffer.put(outbound);
349 outbound = newBuffer;
350 }
351
352 /**
353 * Returns the maximum number of milliseconds the stream is allowed
354 * without any read/write operations.
355 *
356 * @return number if millis of permissible idle time
357 */
358 protected int maxIdleMillis() {
359 return maxIdleMillis;
360 }
361
362
363 /**
364 * Returns true if the given stream has gone stale.
365 *
366 * @return true if the stream is stale
367 */
368 boolean isStale() {
369 return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
370 }
371
tom2d6d3972014-09-25 22:38:57 -0700372 /**
373 * Returns the inbound bytes counter.
374 *
375 * @return inbound bytes counter
376 */
377 public Counter bytesIn() {
378 return bytesIn;
379 }
380
381 /**
382 * Returns the outbound bytes counter.
383 *
384 * @return outbound bytes counter
385 */
386 public Counter bytesOut() {
387 return bytesOut;
388 }
389
390 /**
391 * Returns the inbound messages counter.
392 *
393 * @return inbound messages counter
394 */
395 public Counter messagesIn() {
396 return messagesIn;
397 }
398
399 /**
400 * Returns the outbound messages counter.
401 *
402 * @return outbound messages counter
403 */
404 public Counter messagesOut() {
405 return messagesOut;
406 }
407
toma7083182014-09-25 21:38:03 -0700408}