blob: 302b049603bc118dfd689f9480bbca3ed654ced1 [file] [log] [blame]
Thomas Vachuska24c849c2014-10-27 09:53:05 -07001/*
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07002 * Copyright 2014 Open Networking Laboratory
Thomas Vachuska24c849c2014-10-27 09:53:05 -07003 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07004 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
Thomas Vachuska24c849c2014-10-27 09:53:05 -07007 *
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07008 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
Thomas Vachuska24c849c2014-10-27 09:53:05 -070015 */
toma7083182014-09-25 21:38:03 -070016package org.onlab.nio;
17
tom2d6d3972014-09-25 22:38:57 -070018import org.onlab.util.Counter;
toma7083182014-09-25 21:38:03 -070019import org.slf4j.Logger;
20import org.slf4j.LoggerFactory;
21
22import java.io.IOException;
23import java.nio.ByteBuffer;
24import java.nio.channels.ByteChannel;
25import java.nio.channels.SelectionKey;
26import java.util.ArrayList;
27import java.util.List;
tom5a8779c2014-09-29 14:48:43 -070028import java.util.Objects;
toma7083182014-09-25 21:38:03 -070029
30import static com.google.common.base.Preconditions.checkArgument;
31import static com.google.common.base.Preconditions.checkNotNull;
32import static java.lang.System.currentTimeMillis;
33import static java.nio.ByteBuffer.allocateDirect;
34
35/**
36 * Bi-directional message stream for transferring messages to & from the
37 * network via two byte buffers.
38 *
39 * @param <M> message type
40 */
41public abstract class MessageStream<M extends Message> {
42
43 protected Logger log = LoggerFactory.getLogger(getClass());
44
45 private final IOLoop<M, ?> loop;
46 private final ByteChannel channel;
47 private final int maxIdleMillis;
48
49 private final ByteBuffer inbound;
50 private ByteBuffer outbound;
51 private SelectionKey key;
52
53 private volatile boolean closed = false;
54 private volatile boolean writePending;
55 private volatile boolean writeOccurred;
56
57 private Exception ioError;
58 private long lastActiveTime;
59
tom2d6d3972014-09-25 22:38:57 -070060 private final Counter bytesIn = new Counter();
61 private final Counter messagesIn = new Counter();
62 private final Counter bytesOut = new Counter();
63 private final Counter messagesOut = new Counter();
toma7083182014-09-25 21:38:03 -070064
65 /**
66 * Creates a message stream associated with the specified IO loop and
67 * backed by the given byte channel.
68 *
69 * @param loop IO loop
70 * @param byteChannel backing byte channel
71 * @param bufferSize size of the backing byte buffers
72 * @param maxIdleMillis maximum number of millis the stream can be idle
73 * before it will be closed
74 */
75 protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
76 int bufferSize, int maxIdleMillis) {
77 this.loop = checkNotNull(loop, "Loop cannot be null");
78 this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");
79
80 checkArgument(maxIdleMillis > 0, "Idle time must be positive");
81 this.maxIdleMillis = maxIdleMillis;
82
83 inbound = allocateDirect(bufferSize);
84 outbound = allocateDirect(bufferSize);
85 }
86
87 /**
88 * Gets a single message from the specified byte buffer; this is
89 * to be done without manipulating the buffer via flip, reset or clear.
90 *
91 * @param buffer byte buffer
92 * @return read message or null if there are not enough bytes to read
93 * a complete message
94 */
95 protected abstract M read(ByteBuffer buffer);
96
97 /**
98 * Puts the specified message into the specified byte buffer; this is
99 * to be done without manipulating the buffer via flip, reset or clear.
100 *
101 * @param message message to be write into the buffer
102 * @param buffer byte buffer
103 */
104 protected abstract void write(M message, ByteBuffer buffer);
105
106 /**
107 * Closes the message buffer.
108 */
109 public void close() {
110 synchronized (this) {
111 if (closed) {
112 return;
113 }
114 closed = true;
115 }
116
tom2d6d3972014-09-25 22:38:57 -0700117 bytesIn.freeze();
118 bytesOut.freeze();
119 messagesIn.freeze();
120 messagesOut.freeze();
121
toma7083182014-09-25 21:38:03 -0700122 loop.removeStream(this);
123 if (key != null) {
124 try {
125 key.cancel();
126 key.channel().close();
127 } catch (IOException e) {
128 log.warn("Unable to close stream", e);
129 }
130 }
131 }
132
133 /**
134 * Indicates whether this buffer has been closed.
135 *
136 * @return true if this stream has been closed
137 */
138 public synchronized boolean isClosed() {
139 return closed;
140 }
141
142 /**
143 * Returns the stream IO selection key.
144 *
145 * @return socket channel registration selection key
146 */
147 public SelectionKey key() {
148 return key;
149 }
150
151 /**
152 * Binds the selection key to be used for driving IO operations on the stream.
153 *
154 * @param key IO selection key
155 */
156 public void setKey(SelectionKey key) {
157 this.key = key;
158 this.lastActiveTime = currentTimeMillis();
159 }
160
161 /**
162 * Returns the IO loop to which this stream is bound.
163 *
164 * @return I/O loop used to drive this stream
165 */
166 public IOLoop<M, ?> loop() {
167 return loop;
168 }
169
170 /**
171 * Indicates whether the any prior IO encountered an error.
172 *
173 * @return true if a write failed
174 */
175 public boolean hadError() {
176 return ioError != null;
177 }
178
179 /**
180 * Gets the prior IO error, if one occurred.
181 *
182 * @return IO error; null if none occurred
183 */
184 public Exception getError() {
185 return ioError;
186 }
187
188 /**
tom73094832014-09-29 13:47:08 -0700189 * Reads, without blocking, a list of messages from the stream.
toma7083182014-09-25 21:38:03 -0700190 * The list will be empty if there were not messages pending.
191 *
192 * @return list of messages or null if backing channel has been closed
193 * @throws IOException if messages could not be read
194 */
195 public List<M> read() throws IOException {
196 try {
197 int read = channel.read(inbound);
198 if (read != -1) {
199 // Read the messages one-by-one and add them to the list.
200 List<M> messages = new ArrayList<>();
201 M message;
202 inbound.flip();
203 while ((message = read(inbound)) != null) {
204 messages.add(message);
tom2d6d3972014-09-25 22:38:57 -0700205 messagesIn.add(1);
206 bytesIn.add(message.length());
toma7083182014-09-25 21:38:03 -0700207 }
208 inbound.compact();
209
210 // Mark the stream with current time to indicate liveness.
211 lastActiveTime = currentTimeMillis();
212 return messages;
213 }
214 return null;
215
216 } catch (Exception e) {
217 throw new IOException("Unable to read messages", e);
218 }
219 }
220
221 /**
222 * Writes the specified list of messages to the stream.
223 *
224 * @param messages list of messages to write
225 * @throws IOException if error occurred while writing the data
226 */
227 public void write(List<M> messages) throws IOException {
228 synchronized (this) {
229 // First write all messages.
230 for (M m : messages) {
231 append(m);
232 }
233 flushUnlessAlreadyPlanningTo();
234 }
235 }
236
237 /**
238 * Writes the given message to the stream.
239 *
240 * @param message message to write
241 * @throws IOException if error occurred while writing the data
242 */
243 public void write(M message) throws IOException {
244 synchronized (this) {
245 append(message);
246 flushUnlessAlreadyPlanningTo();
247 }
248 }
249
250 // Appends the specified message into the internal buffer, growing the
251 // buffer if required.
252 private void append(M message) {
253 // If the buffer does not have sufficient length double it.
254 while (outbound.remaining() < message.length()) {
255 doubleSize();
256 }
toma7083182014-09-25 21:38:03 -0700257 write(message, outbound);
tom2d6d3972014-09-25 22:38:57 -0700258 messagesOut.add(1);
259 bytesOut.add(message.length());
toma7083182014-09-25 21:38:03 -0700260 }
261
262 // Forces a flush, unless one is planned already.
263 private void flushUnlessAlreadyPlanningTo() throws IOException {
264 if (!writeOccurred && !writePending) {
265 flush();
266 }
267 }
268
269 /**
270 * Flushes any pending writes.
271 *
272 * @throws IOException if flush failed
273 */
274 public void flush() throws IOException {
275 synchronized (this) {
276 if (!writeOccurred && !writePending) {
277 outbound.flip();
278 try {
279 channel.write(outbound);
280 } catch (IOException e) {
tom5a8779c2014-09-29 14:48:43 -0700281 if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
toma7083182014-09-25 21:38:03 -0700282 log.warn("Unable to write data", e);
283 ioError = e;
284 }
285 }
286 lastActiveTime = currentTimeMillis();
287 writeOccurred = true;
288 writePending = outbound.hasRemaining();
289 outbound.compact();
290 }
291 }
292 }
293
294 /**
295 * Indicates whether the stream has bytes to be written to the channel.
296 *
297 * @return true if there are bytes to be written
298 */
299 boolean isWritePending() {
300 synchronized (this) {
301 return writePending;
302 }
303 }
304
tom2d6d3972014-09-25 22:38:57 -0700305
306 /**
307 * Indicates whether data has been written but not flushed yet.
308 *
309 * @return true if flush is required
310 */
311 boolean isFlushRequired() {
312 synchronized (this) {
313 return outbound.position() > 0;
314 }
315 }
316
toma7083182014-09-25 21:38:03 -0700317 /**
318 * Attempts to flush data, internal stream state and channel availability
319 * permitting. Invoked by the driver I/O loop during handling of writable
320 * selection key.
321 * <p/>
322 * Resets the internal state flags {@code writeOccurred} and
323 * {@code writePending}.
324 *
325 * @throws IOException if implicit flush failed
326 */
327 void flushIfPossible() throws IOException {
328 synchronized (this) {
329 writePending = false;
330 writeOccurred = false;
331 if (outbound.position() > 0) {
332 flush();
333 }
334 }
335 key.interestOps(SelectionKey.OP_READ);
336 }
337
338 /**
339 * Attempts to flush data, internal stream state and channel availability
340 * permitting and if other writes are not pending. Invoked by the driver
341 * I/O loop prior to entering select wait. Resets the internal
342 * {@code writeOccurred} state flag.
343 *
344 * @throws IOException if implicit flush failed
345 */
346 void flushIfWriteNotPending() throws IOException {
347 synchronized (this) {
348 writeOccurred = false;
349 if (!writePending && outbound.position() > 0) {
350 flush();
351 }
352 }
353 if (isWritePending()) {
354 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
355 }
356 }
357
358 /**
359 * Doubles the size of the outbound buffer.
360 */
361 private void doubleSize() {
362 ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
363 outbound.flip();
364 newBuffer.put(outbound);
365 outbound = newBuffer;
366 }
367
368 /**
369 * Returns the maximum number of milliseconds the stream is allowed
370 * without any read/write operations.
371 *
372 * @return number if millis of permissible idle time
373 */
374 protected int maxIdleMillis() {
375 return maxIdleMillis;
376 }
377
378
379 /**
380 * Returns true if the given stream has gone stale.
381 *
382 * @return true if the stream is stale
383 */
384 boolean isStale() {
385 return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
386 }
387
tom2d6d3972014-09-25 22:38:57 -0700388 /**
389 * Returns the inbound bytes counter.
390 *
391 * @return inbound bytes counter
392 */
393 public Counter bytesIn() {
394 return bytesIn;
395 }
396
397 /**
398 * Returns the outbound bytes counter.
399 *
400 * @return outbound bytes counter
401 */
402 public Counter bytesOut() {
403 return bytesOut;
404 }
405
406 /**
407 * Returns the inbound messages counter.
408 *
409 * @return inbound messages counter
410 */
411 public Counter messagesIn() {
412 return messagesIn;
413 }
414
415 /**
416 * Returns the outbound messages counter.
417 *
418 * @return outbound messages counter
419 */
420 public Counter messagesOut() {
421 return messagesOut;
422 }
423
toma7083182014-09-25 21:38:03 -0700424}