blob: c38f0f514d73444dbc635b0746201f2de241bc71 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
tom2d6d3972014-09-25 22:38:57 -07003import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -07004import org.slf4j.Logger;
5import org.slf4j.LoggerFactory;
6
7import java.io.IOException;
8import java.nio.ByteBuffer;
9import java.nio.channels.ByteChannel;
10import java.nio.channels.SelectionKey;
11import java.util.ArrayList;
12import java.util.List;
tom5a8779c2014-09-29 14:48:43 -070013import java.util.Objects;
toma7083182014-09-25 21:38:03 -070014
15import static com.google.common.base.Preconditions.checkArgument;
16import static com.google.common.base.Preconditions.checkNotNull;
17import static java.lang.System.currentTimeMillis;
18import static java.nio.ByteBuffer.allocateDirect;
19
20/**
21 * Bi-directional message stream for transferring messages to & from the
22 * network via two byte buffers.
23 *
24 * @param <M> message type
25 */
26public abstract class MessageStream<M extends Message> {
27
28 protected Logger log = LoggerFactory.getLogger(getClass());
29
30 private final IOLoop<M, ?> loop;
31 private final ByteChannel channel;
32 private final int maxIdleMillis;
33
34 private final ByteBuffer inbound;
35 private ByteBuffer outbound;
36 private SelectionKey key;
37
38 private volatile boolean closed = false;
39 private volatile boolean writePending;
40 private volatile boolean writeOccurred;
41
42 private Exception ioError;
43 private long lastActiveTime;
44
tom2d6d3972014-09-25 22:38:57 -070045 private final Counter bytesIn = new Counter();
46 private final Counter messagesIn = new Counter();
47 private final Counter bytesOut = new Counter();
48 private final Counter messagesOut = new Counter();
toma7083182014-09-25 21:38:03 -070049
50 /**
51 * Creates a message stream associated with the specified IO loop and
52 * backed by the given byte channel.
53 *
54 * @param loop IO loop
55 * @param byteChannel backing byte channel
56 * @param bufferSize size of the backing byte buffers
57 * @param maxIdleMillis maximum number of millis the stream can be idle
58 * before it will be closed
59 */
60 protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
61 int bufferSize, int maxIdleMillis) {
62 this.loop = checkNotNull(loop, "Loop cannot be null");
63 this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");
64
65 checkArgument(maxIdleMillis > 0, "Idle time must be positive");
66 this.maxIdleMillis = maxIdleMillis;
67
68 inbound = allocateDirect(bufferSize);
69 outbound = allocateDirect(bufferSize);
70 }
71
72 /**
73 * Gets a single message from the specified byte buffer; this is
74 * to be done without manipulating the buffer via flip, reset or clear.
75 *
76 * @param buffer byte buffer
77 * @return read message or null if there are not enough bytes to read
78 * a complete message
79 */
80 protected abstract M read(ByteBuffer buffer);
81
82 /**
83 * Puts the specified message into the specified byte buffer; this is
84 * to be done without manipulating the buffer via flip, reset or clear.
85 *
86 * @param message message to be write into the buffer
87 * @param buffer byte buffer
88 */
89 protected abstract void write(M message, ByteBuffer buffer);
90
91 /**
92 * Closes the message buffer.
93 */
94 public void close() {
95 synchronized (this) {
96 if (closed) {
97 return;
98 }
99 closed = true;
100 }
101
tom2d6d3972014-09-25 22:38:57 -0700102 bytesIn.freeze();
103 bytesOut.freeze();
104 messagesIn.freeze();
105 messagesOut.freeze();
106
toma7083182014-09-25 21:38:03 -0700107 loop.removeStream(this);
108 if (key != null) {
109 try {
110 key.cancel();
111 key.channel().close();
112 } catch (IOException e) {
113 log.warn("Unable to close stream", e);
114 }
115 }
116 }
117
118 /**
119 * Indicates whether this buffer has been closed.
120 *
121 * @return true if this stream has been closed
122 */
123 public synchronized boolean isClosed() {
124 return closed;
125 }
126
127 /**
128 * Returns the stream IO selection key.
129 *
130 * @return socket channel registration selection key
131 */
132 public SelectionKey key() {
133 return key;
134 }
135
136 /**
137 * Binds the selection key to be used for driving IO operations on the stream.
138 *
139 * @param key IO selection key
140 */
141 public void setKey(SelectionKey key) {
142 this.key = key;
143 this.lastActiveTime = currentTimeMillis();
144 }
145
146 /**
147 * Returns the IO loop to which this stream is bound.
148 *
149 * @return I/O loop used to drive this stream
150 */
151 public IOLoop<M, ?> loop() {
152 return loop;
153 }
154
155 /**
156 * Indicates whether the any prior IO encountered an error.
157 *
158 * @return true if a write failed
159 */
160 public boolean hadError() {
161 return ioError != null;
162 }
163
164 /**
165 * Gets the prior IO error, if one occurred.
166 *
167 * @return IO error; null if none occurred
168 */
169 public Exception getError() {
170 return ioError;
171 }
172
173 /**
tom73094832014-09-29 13:47:08 -0700174 * Reads, without blocking, a list of messages from the stream.
toma7083182014-09-25 21:38:03 -0700175 * The list will be empty if there were not messages pending.
176 *
177 * @return list of messages or null if backing channel has been closed
178 * @throws IOException if messages could not be read
179 */
180 public List<M> read() throws IOException {
181 try {
182 int read = channel.read(inbound);
183 if (read != -1) {
184 // Read the messages one-by-one and add them to the list.
185 List<M> messages = new ArrayList<>();
186 M message;
187 inbound.flip();
188 while ((message = read(inbound)) != null) {
189 messages.add(message);
tom2d6d3972014-09-25 22:38:57 -0700190 messagesIn.add(1);
191 bytesIn.add(message.length());
toma7083182014-09-25 21:38:03 -0700192 }
193 inbound.compact();
194
195 // Mark the stream with current time to indicate liveness.
196 lastActiveTime = currentTimeMillis();
197 return messages;
198 }
199 return null;
200
201 } catch (Exception e) {
202 throw new IOException("Unable to read messages", e);
203 }
204 }
205
206 /**
207 * Writes the specified list of messages to the stream.
208 *
209 * @param messages list of messages to write
210 * @throws IOException if error occurred while writing the data
211 */
212 public void write(List<M> messages) throws IOException {
213 synchronized (this) {
214 // First write all messages.
215 for (M m : messages) {
216 append(m);
217 }
218 flushUnlessAlreadyPlanningTo();
219 }
220 }
221
222 /**
223 * Writes the given message to the stream.
224 *
225 * @param message message to write
226 * @throws IOException if error occurred while writing the data
227 */
228 public void write(M message) throws IOException {
229 synchronized (this) {
230 append(message);
231 flushUnlessAlreadyPlanningTo();
232 }
233 }
234
235 // Appends the specified message into the internal buffer, growing the
236 // buffer if required.
237 private void append(M message) {
238 // If the buffer does not have sufficient length double it.
239 while (outbound.remaining() < message.length()) {
240 doubleSize();
241 }
toma7083182014-09-25 21:38:03 -0700242 write(message, outbound);
tom2d6d3972014-09-25 22:38:57 -0700243 messagesOut.add(1);
244 bytesOut.add(message.length());
toma7083182014-09-25 21:38:03 -0700245 }
246
247 // Forces a flush, unless one is planned already.
248 private void flushUnlessAlreadyPlanningTo() throws IOException {
249 if (!writeOccurred && !writePending) {
250 flush();
251 }
252 }
253
254 /**
255 * Flushes any pending writes.
256 *
257 * @throws IOException if flush failed
258 */
259 public void flush() throws IOException {
260 synchronized (this) {
261 if (!writeOccurred && !writePending) {
262 outbound.flip();
263 try {
264 channel.write(outbound);
265 } catch (IOException e) {
tom5a8779c2014-09-29 14:48:43 -0700266 if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
toma7083182014-09-25 21:38:03 -0700267 log.warn("Unable to write data", e);
268 ioError = e;
269 }
270 }
271 lastActiveTime = currentTimeMillis();
272 writeOccurred = true;
273 writePending = outbound.hasRemaining();
274 outbound.compact();
275 }
276 }
277 }
278
279 /**
280 * Indicates whether the stream has bytes to be written to the channel.
281 *
282 * @return true if there are bytes to be written
283 */
284 boolean isWritePending() {
285 synchronized (this) {
286 return writePending;
287 }
288 }
289
tom2d6d3972014-09-25 22:38:57 -0700290
291 /**
292 * Indicates whether data has been written but not flushed yet.
293 *
294 * @return true if flush is required
295 */
296 boolean isFlushRequired() {
297 synchronized (this) {
298 return outbound.position() > 0;
299 }
300 }
301
toma7083182014-09-25 21:38:03 -0700302 /**
303 * Attempts to flush data, internal stream state and channel availability
304 * permitting. Invoked by the driver I/O loop during handling of writable
305 * selection key.
306 * <p/>
307 * Resets the internal state flags {@code writeOccurred} and
308 * {@code writePending}.
309 *
310 * @throws IOException if implicit flush failed
311 */
312 void flushIfPossible() throws IOException {
313 synchronized (this) {
314 writePending = false;
315 writeOccurred = false;
316 if (outbound.position() > 0) {
317 flush();
318 }
319 }
320 key.interestOps(SelectionKey.OP_READ);
321 }
322
323 /**
324 * Attempts to flush data, internal stream state and channel availability
325 * permitting and if other writes are not pending. Invoked by the driver
326 * I/O loop prior to entering select wait. Resets the internal
327 * {@code writeOccurred} state flag.
328 *
329 * @throws IOException if implicit flush failed
330 */
331 void flushIfWriteNotPending() throws IOException {
332 synchronized (this) {
333 writeOccurred = false;
334 if (!writePending && outbound.position() > 0) {
335 flush();
336 }
337 }
338 if (isWritePending()) {
339 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
340 }
341 }
342
343 /**
344 * Doubles the size of the outbound buffer.
345 */
346 private void doubleSize() {
347 ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
348 outbound.flip();
349 newBuffer.put(outbound);
350 outbound = newBuffer;
351 }
352
353 /**
354 * Returns the maximum number of milliseconds the stream is allowed
355 * without any read/write operations.
356 *
357 * @return number if millis of permissible idle time
358 */
359 protected int maxIdleMillis() {
360 return maxIdleMillis;
361 }
362
363
364 /**
365 * Returns true if the given stream has gone stale.
366 *
367 * @return true if the stream is stale
368 */
369 boolean isStale() {
370 return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
371 }
372
tom2d6d3972014-09-25 22:38:57 -0700373 /**
374 * Returns the inbound bytes counter.
375 *
376 * @return inbound bytes counter
377 */
378 public Counter bytesIn() {
379 return bytesIn;
380 }
381
382 /**
383 * Returns the outbound bytes counter.
384 *
385 * @return outbound bytes counter
386 */
387 public Counter bytesOut() {
388 return bytesOut;
389 }
390
391 /**
392 * Returns the inbound messages counter.
393 *
394 * @return inbound messages counter
395 */
396 public Counter messagesIn() {
397 return messagesIn;
398 }
399
400 /**
401 * Returns the outbound messages counter.
402 *
403 * @return outbound messages counter
404 */
405 public Counter messagesOut() {
406 return messagesOut;
407 }
408
toma7083182014-09-25 21:38:03 -0700409}