blob: 583d0ec5a6c32f0daf0e56cf318c7825570a66b0 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ByteChannel;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
20
21/**
22 * Tests of the message message stream implementation.
23 */
24public class MessageStreamTest {
25
tom2d6d3972014-09-25 22:38:57 -070026 private static final int SIZE = 16;
27 private static final TestMessage MESSAGE = new TestMessage(SIZE);
toma7083182014-09-25 21:38:03 -070028
29 private static final int BIG_SIZE = 32 * 1024;
30 private static final TestMessage BIG_MESSAGE = new TestMessage(BIG_SIZE);
31
tom2d6d3972014-09-25 22:38:57 -070032 private TestIOLoop loop;
toma7083182014-09-25 21:38:03 -070033 private TestByteChannel channel;
tom2d6d3972014-09-25 22:38:57 -070034 private TestMessageStream stream;
toma7083182014-09-25 21:38:03 -070035 private TestKey key;
36
37 @Before
38 public void setUp() throws IOException {
tom2d6d3972014-09-25 22:38:57 -070039 loop = new TestIOLoop();
toma7083182014-09-25 21:38:03 -070040 channel = new TestByteChannel();
41 key = new TestKey(channel);
tom2d6d3972014-09-25 22:38:57 -070042 stream = loop.createStream(channel);
43 stream.setKey(key);
toma7083182014-09-25 21:38:03 -070044 }
45
46 @After
47 public void tearDown() {
48 loop.shutdown();
tom2d6d3972014-09-25 22:38:57 -070049 stream.close();
toma7083182014-09-25 21:38:03 -070050 }
51
tom2d6d3972014-09-25 22:38:57 -070052 // Validates the state of the message stream
53 private void validate(boolean wp, boolean fr, int read, int written) {
54 assertEquals(wp, stream.isWritePending());
55 assertEquals(fr, stream.isFlushRequired());
toma7083182014-09-25 21:38:03 -070056 assertEquals(read, channel.readBytes);
57 assertEquals(written, channel.writtenBytes);
58 }
59
60 @Test
61 public void endOfStream() throws IOException {
62 channel.close();
tom2d6d3972014-09-25 22:38:57 -070063 List<TestMessage> messages = stream.read();
toma7083182014-09-25 21:38:03 -070064 assertNull(messages);
65 }
66
67 @Test
68 public void bufferGrowth() throws IOException {
tom2d6d3972014-09-25 22:38:57 -070069 // Create a stream for big messages and test the growth.
70 stream = new TestMessageStream(BIG_SIZE, channel, loop);
71 stream.write(BIG_MESSAGE);
72 stream.write(BIG_MESSAGE);
73 stream.write(BIG_MESSAGE);
74 stream.write(BIG_MESSAGE);
75 stream.write(BIG_MESSAGE);
toma7083182014-09-25 21:38:03 -070076 }
77
78 @Test
79 public void discardBeforeKey() {
tom2d6d3972014-09-25 22:38:57 -070080 // Create a stream that does not yet have the key set and discard it.
81 stream = loop.createStream(channel);
82 assertNull(stream.key());
83 stream.close();
toma7083182014-09-25 21:38:03 -070084 // There is not key, so nothing to check; we just expect no problem.
85 }
86
87 @Test
88 public void bufferedRead() throws IOException {
tom2d6d3972014-09-25 22:38:57 -070089 channel.bytesToRead = SIZE + 4;
90 List<TestMessage> messages = stream.read();
toma7083182014-09-25 21:38:03 -070091 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -070092 validate(false, false, SIZE + 4, 0);
toma7083182014-09-25 21:38:03 -070093
tom2d6d3972014-09-25 22:38:57 -070094 channel.bytesToRead = SIZE - 4;
95 messages = stream.read();
toma7083182014-09-25 21:38:03 -070096 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -070097 validate(false, false, SIZE * 2, 0);
toma7083182014-09-25 21:38:03 -070098 }
99
100 @Test
101 public void bufferedWrite() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700102 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700103
104 // First write is immediate...
tom2d6d3972014-09-25 22:38:57 -0700105 stream.write(MESSAGE);
106 validate(false, false, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700107
108 // Second and third get buffered...
tom2d6d3972014-09-25 22:38:57 -0700109 stream.write(MESSAGE);
110 validate(false, true, 0, SIZE);
111 stream.write(MESSAGE);
112 validate(false, true, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700113
114 // Reset write, which will flush if needed; the next write is again buffered
tom2d6d3972014-09-25 22:38:57 -0700115 stream.flushIfWriteNotPending();
116 validate(false, false, 0, SIZE * 3);
117 stream.write(MESSAGE);
118 validate(false, true, 0, SIZE * 3);
toma7083182014-09-25 21:38:03 -0700119
120 // Select reset, which will flush if needed; the next write is again buffered
tom2d6d3972014-09-25 22:38:57 -0700121 stream.flushIfPossible();
122 validate(false, false, 0, SIZE * 4);
123 stream.write(MESSAGE);
124 validate(false, true, 0, SIZE * 4);
125 stream.flush();
126 validate(false, true, 0, SIZE * 4);
toma7083182014-09-25 21:38:03 -0700127 }
128
129 @Test
130 public void bufferedWriteList() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700131 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700132
133 // First write is immediate...
tom2d6d3972014-09-25 22:38:57 -0700134 List<TestMessage> messages = new ArrayList<>();
135 messages.add(MESSAGE);
136 messages.add(MESSAGE);
137 messages.add(MESSAGE);
138 messages.add(MESSAGE);
toma7083182014-09-25 21:38:03 -0700139
tom2d6d3972014-09-25 22:38:57 -0700140 stream.write(messages);
141 validate(false, false, 0, SIZE * 4);
toma7083182014-09-25 21:38:03 -0700142
tom2d6d3972014-09-25 22:38:57 -0700143 stream.write(messages);
144 validate(false, true, 0, SIZE * 4);
toma7083182014-09-25 21:38:03 -0700145
tom2d6d3972014-09-25 22:38:57 -0700146 stream.flushIfPossible();
147 validate(false, false, 0, SIZE * 8);
toma7083182014-09-25 21:38:03 -0700148 }
149
150 @Test
151 public void bufferedPartialWrite() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700152 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700153
154 // First write is immediate...
tom2d6d3972014-09-25 22:38:57 -0700155 stream.write(MESSAGE);
156 validate(false, false, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700157
158 // Tell test channel to accept only half.
tom2d6d3972014-09-25 22:38:57 -0700159 channel.bytesToWrite = SIZE / 2;
toma7083182014-09-25 21:38:03 -0700160
161 // Second and third get buffered...
tom2d6d3972014-09-25 22:38:57 -0700162 stream.write(MESSAGE);
163 validate(false, true, 0, SIZE);
164 stream.flushIfPossible();
165 validate(true, true, 0, SIZE + SIZE / 2);
toma7083182014-09-25 21:38:03 -0700166 }
167
168 @Test
169 public void bufferedPartialWrite2() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700170 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700171
172 // First write is immediate...
tom2d6d3972014-09-25 22:38:57 -0700173 stream.write(MESSAGE);
174 validate(false, false, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700175
176 // Tell test channel to accept only half.
tom2d6d3972014-09-25 22:38:57 -0700177 channel.bytesToWrite = SIZE / 2;
toma7083182014-09-25 21:38:03 -0700178
179 // Second and third get buffered...
tom2d6d3972014-09-25 22:38:57 -0700180 stream.write(MESSAGE);
181 validate(false, true, 0, SIZE);
182 stream.flushIfWriteNotPending();
183 validate(true, true, 0, SIZE + SIZE / 2);
toma7083182014-09-25 21:38:03 -0700184 }
185
186 @Test
187 public void bufferedReadWrite() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700188 channel.bytesToRead = SIZE + 4;
189 List<TestMessage> messages = stream.read();
toma7083182014-09-25 21:38:03 -0700190 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700191 validate(false, false, SIZE + 4, 0);
toma7083182014-09-25 21:38:03 -0700192
tom2d6d3972014-09-25 22:38:57 -0700193 stream.write(MESSAGE);
194 validate(false, false, SIZE + 4, SIZE);
toma7083182014-09-25 21:38:03 -0700195
tom2d6d3972014-09-25 22:38:57 -0700196 channel.bytesToRead = SIZE - 4;
197 messages = stream.read();
toma7083182014-09-25 21:38:03 -0700198 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700199 validate(false, false, SIZE * 2, SIZE);
toma7083182014-09-25 21:38:03 -0700200 }
201
202 // Fake IO driver loop
tom2d6d3972014-09-25 22:38:57 -0700203 private static class TestIOLoop extends IOLoop<TestMessage, TestMessageStream> {
toma7083182014-09-25 21:38:03 -0700204
tom2d6d3972014-09-25 22:38:57 -0700205 public TestIOLoop() throws IOException {
toma7083182014-09-25 21:38:03 -0700206 super(500);
207 }
208
209 @Override
210 protected TestMessageStream createStream(ByteChannel channel) {
tom2d6d3972014-09-25 22:38:57 -0700211 return new TestMessageStream(SIZE, channel, this);
toma7083182014-09-25 21:38:03 -0700212 }
213
214 @Override
215 protected void processMessages(List<TestMessage> messages,
216 MessageStream<TestMessage> stream) {
217 }
218
219 }
220
221 // Byte channel test fixture
222 private static class TestByteChannel extends SelectableChannel implements ByteChannel {
223
224 private static final int BUFFER_LENGTH = 1024;
225 byte[] bytes = new byte[BUFFER_LENGTH];
226 int bytesToWrite = BUFFER_LENGTH;
227 int bytesToRead = BUFFER_LENGTH;
228 int writtenBytes = 0;
229 int readBytes = 0;
230
231 @Override
232 public int read(ByteBuffer dst) throws IOException {
233 int l = Math.min(dst.remaining(), bytesToRead);
234 if (bytesToRead > 0) {
235 readBytes += l;
236 dst.put(bytes, 0, l);
237 }
238 return l;
239 }
240
241 @Override
242 public int write(ByteBuffer src) throws IOException {
243 int l = Math.min(src.remaining(), bytesToWrite);
244 writtenBytes += l;
245 src.get(bytes, 0, l);
246 return l;
247 }
248
249 @Override
250 public Object blockingLock() {
251 return null;
252 }
253
254 @Override
255 public SelectableChannel configureBlocking(boolean arg0) throws IOException {
256 return null;
257 }
258
259 @Override
260 public boolean isBlocking() {
261 return false;
262 }
263
264 @Override
265 public boolean isRegistered() {
266 return false;
267 }
268
269 @Override
270 public SelectionKey keyFor(Selector arg0) {
271 return null;
272 }
273
274 @Override
275 public SelectorProvider provider() {
276 return null;
277 }
278
279 @Override
280 public SelectionKey register(Selector arg0, int arg1, Object arg2)
281 throws ClosedChannelException {
282 return null;
283 }
284
285 @Override
286 public int validOps() {
287 return 0;
288 }
289
290 @Override
291 protected void implCloseChannel() throws IOException {
292 bytesToRead = -1;
293 }
294
295 }
296
297 // Selection key text fixture
298 private static class TestKey extends SelectionKey {
299
300 private SelectableChannel channel;
301
302 public TestKey(TestByteChannel channel) {
303 this.channel = channel;
304 }
305
306 @Override
307 public void cancel() {
308 }
309
310 @Override
311 public SelectableChannel channel() {
312 return channel;
313 }
314
315 @Override
316 public int interestOps() {
317 return 0;
318 }
319
320 @Override
321 public SelectionKey interestOps(int ops) {
322 return null;
323 }
324
325 @Override
326 public boolean isValid() {
327 return true;
328 }
329
330 @Override
331 public int readyOps() {
332 return 0;
333 }
334
335 @Override
336 public Selector selector() {
337 return null;
338 }
339 }
340
341}