blob: b2acca4bd21903dbe4120a589a65b5fe96b16352 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
3import org.slf4j.Logger;
4import org.slf4j.LoggerFactory;
5
6import java.io.IOException;
7import java.nio.ByteBuffer;
8import java.nio.channels.ByteChannel;
9import java.nio.channels.SelectionKey;
10import java.util.ArrayList;
11import java.util.List;
12
13import static com.google.common.base.Preconditions.checkArgument;
14import static com.google.common.base.Preconditions.checkNotNull;
15import static java.lang.System.currentTimeMillis;
16import static java.nio.ByteBuffer.allocateDirect;
17
18/**
19 * Bi-directional message stream for transferring messages to & from the
20 * network via two byte buffers.
21 *
22 * @param <M> message type
23 */
24public abstract class MessageStream<M extends Message> {
25
26 protected Logger log = LoggerFactory.getLogger(getClass());
27
28 private final IOLoop<M, ?> loop;
29 private final ByteChannel channel;
30 private final int maxIdleMillis;
31
32 private final ByteBuffer inbound;
33 private ByteBuffer outbound;
34 private SelectionKey key;
35
36 private volatile boolean closed = false;
37 private volatile boolean writePending;
38 private volatile boolean writeOccurred;
39
40 private Exception ioError;
41 private long lastActiveTime;
42
43
44 /**
45 * Creates a message stream associated with the specified IO loop and
46 * backed by the given byte channel.
47 *
48 * @param loop IO loop
49 * @param byteChannel backing byte channel
50 * @param bufferSize size of the backing byte buffers
51 * @param maxIdleMillis maximum number of millis the stream can be idle
52 * before it will be closed
53 */
54 protected MessageStream(IOLoop<M, ?> loop, ByteChannel byteChannel,
55 int bufferSize, int maxIdleMillis) {
56 this.loop = checkNotNull(loop, "Loop cannot be null");
57 this.channel = checkNotNull(byteChannel, "Byte channel cannot be null");
58
59 checkArgument(maxIdleMillis > 0, "Idle time must be positive");
60 this.maxIdleMillis = maxIdleMillis;
61
62 inbound = allocateDirect(bufferSize);
63 outbound = allocateDirect(bufferSize);
64 }
65
66 /**
67 * Gets a single message from the specified byte buffer; this is
68 * to be done without manipulating the buffer via flip, reset or clear.
69 *
70 * @param buffer byte buffer
71 * @return read message or null if there are not enough bytes to read
72 * a complete message
73 */
74 protected abstract M read(ByteBuffer buffer);
75
76 /**
77 * Puts the specified message into the specified byte buffer; this is
78 * to be done without manipulating the buffer via flip, reset or clear.
79 *
80 * @param message message to be write into the buffer
81 * @param buffer byte buffer
82 */
83 protected abstract void write(M message, ByteBuffer buffer);
84
85 /**
86 * Closes the message buffer.
87 */
88 public void close() {
89 synchronized (this) {
90 if (closed) {
91 return;
92 }
93 closed = true;
94 }
95
96 loop.removeStream(this);
97 if (key != null) {
98 try {
99 key.cancel();
100 key.channel().close();
101 } catch (IOException e) {
102 log.warn("Unable to close stream", e);
103 }
104 }
105 }
106
107 /**
108 * Indicates whether this buffer has been closed.
109 *
110 * @return true if this stream has been closed
111 */
112 public synchronized boolean isClosed() {
113 return closed;
114 }
115
116 /**
117 * Returns the stream IO selection key.
118 *
119 * @return socket channel registration selection key
120 */
121 public SelectionKey key() {
122 return key;
123 }
124
125 /**
126 * Binds the selection key to be used for driving IO operations on the stream.
127 *
128 * @param key IO selection key
129 */
130 public void setKey(SelectionKey key) {
131 this.key = key;
132 this.lastActiveTime = currentTimeMillis();
133 }
134
135 /**
136 * Returns the IO loop to which this stream is bound.
137 *
138 * @return I/O loop used to drive this stream
139 */
140 public IOLoop<M, ?> loop() {
141 return loop;
142 }
143
144 /**
145 * Indicates whether the any prior IO encountered an error.
146 *
147 * @return true if a write failed
148 */
149 public boolean hadError() {
150 return ioError != null;
151 }
152
153 /**
154 * Gets the prior IO error, if one occurred.
155 *
156 * @return IO error; null if none occurred
157 */
158 public Exception getError() {
159 return ioError;
160 }
161
162 /**
163 * Reads, withouth blocking, a list of messages from the stream.
164 * The list will be empty if there were not messages pending.
165 *
166 * @return list of messages or null if backing channel has been closed
167 * @throws IOException if messages could not be read
168 */
169 public List<M> read() throws IOException {
170 try {
171 int read = channel.read(inbound);
172 if (read != -1) {
173 // Read the messages one-by-one and add them to the list.
174 List<M> messages = new ArrayList<>();
175 M message;
176 inbound.flip();
177 while ((message = read(inbound)) != null) {
178 messages.add(message);
179 }
180 inbound.compact();
181
182 // Mark the stream with current time to indicate liveness.
183 lastActiveTime = currentTimeMillis();
184 return messages;
185 }
186 return null;
187
188 } catch (Exception e) {
189 throw new IOException("Unable to read messages", e);
190 }
191 }
192
193 /**
194 * Writes the specified list of messages to the stream.
195 *
196 * @param messages list of messages to write
197 * @throws IOException if error occurred while writing the data
198 */
199 public void write(List<M> messages) throws IOException {
200 synchronized (this) {
201 // First write all messages.
202 for (M m : messages) {
203 append(m);
204 }
205 flushUnlessAlreadyPlanningTo();
206 }
207 }
208
209 /**
210 * Writes the given message to the stream.
211 *
212 * @param message message to write
213 * @throws IOException if error occurred while writing the data
214 */
215 public void write(M message) throws IOException {
216 synchronized (this) {
217 append(message);
218 flushUnlessAlreadyPlanningTo();
219 }
220 }
221
222 // Appends the specified message into the internal buffer, growing the
223 // buffer if required.
224 private void append(M message) {
225 // If the buffer does not have sufficient length double it.
226 while (outbound.remaining() < message.length()) {
227 doubleSize();
228 }
229 // Place the message into the buffer and bump the output trackers.
230 write(message, outbound);
231 }
232
233 // Forces a flush, unless one is planned already.
234 private void flushUnlessAlreadyPlanningTo() throws IOException {
235 if (!writeOccurred && !writePending) {
236 flush();
237 }
238 }
239
240 /**
241 * Flushes any pending writes.
242 *
243 * @throws IOException if flush failed
244 */
245 public void flush() throws IOException {
246 synchronized (this) {
247 if (!writeOccurred && !writePending) {
248 outbound.flip();
249 try {
250 channel.write(outbound);
251 } catch (IOException e) {
252 if (!closed && !e.getMessage().equals("Broken pipe")) {
253 log.warn("Unable to write data", e);
254 ioError = e;
255 }
256 }
257 lastActiveTime = currentTimeMillis();
258 writeOccurred = true;
259 writePending = outbound.hasRemaining();
260 outbound.compact();
261 }
262 }
263 }
264
265 /**
266 * Indicates whether the stream has bytes to be written to the channel.
267 *
268 * @return true if there are bytes to be written
269 */
270 boolean isWritePending() {
271 synchronized (this) {
272 return writePending;
273 }
274 }
275
276 /**
277 * Attempts to flush data, internal stream state and channel availability
278 * permitting. Invoked by the driver I/O loop during handling of writable
279 * selection key.
280 * <p/>
281 * Resets the internal state flags {@code writeOccurred} and
282 * {@code writePending}.
283 *
284 * @throws IOException if implicit flush failed
285 */
286 void flushIfPossible() throws IOException {
287 synchronized (this) {
288 writePending = false;
289 writeOccurred = false;
290 if (outbound.position() > 0) {
291 flush();
292 }
293 }
294 key.interestOps(SelectionKey.OP_READ);
295 }
296
297 /**
298 * Attempts to flush data, internal stream state and channel availability
299 * permitting and if other writes are not pending. Invoked by the driver
300 * I/O loop prior to entering select wait. Resets the internal
301 * {@code writeOccurred} state flag.
302 *
303 * @throws IOException if implicit flush failed
304 */
305 void flushIfWriteNotPending() throws IOException {
306 synchronized (this) {
307 writeOccurred = false;
308 if (!writePending && outbound.position() > 0) {
309 flush();
310 }
311 }
312 if (isWritePending()) {
313 key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
314 }
315 }
316
317 /**
318 * Doubles the size of the outbound buffer.
319 */
320 private void doubleSize() {
321 ByteBuffer newBuffer = allocateDirect(outbound.capacity() * 2);
322 outbound.flip();
323 newBuffer.put(outbound);
324 outbound = newBuffer;
325 }
326
327 /**
328 * Returns the maximum number of milliseconds the stream is allowed
329 * without any read/write operations.
330 *
331 * @return number if millis of permissible idle time
332 */
333 protected int maxIdleMillis() {
334 return maxIdleMillis;
335 }
336
337
338 /**
339 * Returns true if the given stream has gone stale.
340 *
341 * @return true if the stream is stale
342 */
343 boolean isStale() {
344 return currentTimeMillis() - lastActiveTime > maxIdleMillis() && key != null;
345 }
346
347}