blob: 7ced81fe234d55ec1996c12465e4a7eac32429a7 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
toma7083182014-09-25 21:38:03 -070019package org.onlab.nio;
20
21import org.junit.After;
22import org.junit.Before;
23import org.junit.Test;
24
25import java.io.IOException;
26import java.nio.ByteBuffer;
27import java.nio.channels.ByteChannel;
28import java.nio.channels.ClosedChannelException;
29import java.nio.channels.SelectableChannel;
30import java.nio.channels.SelectionKey;
31import java.nio.channels.Selector;
32import java.nio.channels.spi.SelectorProvider;
33import java.util.ArrayList;
34import java.util.List;
35
36import static org.junit.Assert.assertEquals;
37import static org.junit.Assert.assertNull;
38
/**
 * Tests of the message stream implementation.
 */
42public class MessageStreamTest {
43
tom1ae3d162014-09-26 09:38:16 -070044 private static final int SIZE = 64;
toma7083182014-09-25 21:38:03 -070045 private static final int BIG_SIZE = 32 * 1024;
tom1ae3d162014-09-26 09:38:16 -070046
47 private TestMessage message;
toma7083182014-09-25 21:38:03 -070048
tom2d6d3972014-09-25 22:38:57 -070049 private TestIOLoop loop;
toma7083182014-09-25 21:38:03 -070050 private TestByteChannel channel;
tom2d6d3972014-09-25 22:38:57 -070051 private TestMessageStream stream;
toma7083182014-09-25 21:38:03 -070052 private TestKey key;
53
54 @Before
55 public void setUp() throws IOException {
tom2d6d3972014-09-25 22:38:57 -070056 loop = new TestIOLoop();
toma7083182014-09-25 21:38:03 -070057 channel = new TestByteChannel();
58 key = new TestKey(channel);
tom2d6d3972014-09-25 22:38:57 -070059 stream = loop.createStream(channel);
60 stream.setKey(key);
tom1ae3d162014-09-26 09:38:16 -070061 stream.setNonStrict();
62 message = new TestMessage(SIZE, 0, 0, stream.padding());
toma7083182014-09-25 21:38:03 -070063 }
64
65 @After
66 public void tearDown() {
67 loop.shutdown();
tom2d6d3972014-09-25 22:38:57 -070068 stream.close();
toma7083182014-09-25 21:38:03 -070069 }
70
tom2d6d3972014-09-25 22:38:57 -070071 // Validates the state of the message stream
72 private void validate(boolean wp, boolean fr, int read, int written) {
73 assertEquals(wp, stream.isWritePending());
74 assertEquals(fr, stream.isFlushRequired());
toma7083182014-09-25 21:38:03 -070075 assertEquals(read, channel.readBytes);
76 assertEquals(written, channel.writtenBytes);
77 }
78
79 @Test
80 public void endOfStream() throws IOException {
81 channel.close();
tom2d6d3972014-09-25 22:38:57 -070082 List<TestMessage> messages = stream.read();
toma7083182014-09-25 21:38:03 -070083 assertNull(messages);
84 }
85
86 @Test
87 public void bufferGrowth() throws IOException {
tom2d6d3972014-09-25 22:38:57 -070088 // Create a stream for big messages and test the growth.
89 stream = new TestMessageStream(BIG_SIZE, channel, loop);
tom1ae3d162014-09-26 09:38:16 -070090 TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding());
91
92 stream.write(bigMessage);
93 stream.write(bigMessage);
94 stream.write(bigMessage);
95 stream.write(bigMessage);
96 stream.write(bigMessage);
toma7083182014-09-25 21:38:03 -070097 }
98
99 @Test
100 public void discardBeforeKey() {
tom2d6d3972014-09-25 22:38:57 -0700101 // Create a stream that does not yet have the key set and discard it.
102 stream = loop.createStream(channel);
103 assertNull(stream.key());
104 stream.close();
toma7083182014-09-25 21:38:03 -0700105 // There is not key, so nothing to check; we just expect no problem.
106 }
107
108 @Test
109 public void bufferedRead() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700110 channel.bytesToRead = SIZE + 4;
111 List<TestMessage> messages = stream.read();
toma7083182014-09-25 21:38:03 -0700112 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700113 validate(false, false, SIZE + 4, 0);
toma7083182014-09-25 21:38:03 -0700114
tom2d6d3972014-09-25 22:38:57 -0700115 channel.bytesToRead = SIZE - 4;
116 messages = stream.read();
toma7083182014-09-25 21:38:03 -0700117 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700118 validate(false, false, SIZE * 2, 0);
toma7083182014-09-25 21:38:03 -0700119 }
120
121 @Test
122 public void bufferedWrite() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700123 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700124
125 // First write is immediate...
tom1ae3d162014-09-26 09:38:16 -0700126 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700127 validate(false, false, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700128
129 // Second and third get buffered...
tom1ae3d162014-09-26 09:38:16 -0700130 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700131 validate(false, true, 0, SIZE);
tom1ae3d162014-09-26 09:38:16 -0700132 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700133 validate(false, true, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700134
135 // Reset write, which will flush if needed; the next write is again buffered
tom2d6d3972014-09-25 22:38:57 -0700136 stream.flushIfWriteNotPending();
137 validate(false, false, 0, SIZE * 3);
tom1ae3d162014-09-26 09:38:16 -0700138 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700139 validate(false, true, 0, SIZE * 3);
toma7083182014-09-25 21:38:03 -0700140
141 // Select reset, which will flush if needed; the next write is again buffered
tom2d6d3972014-09-25 22:38:57 -0700142 stream.flushIfPossible();
143 validate(false, false, 0, SIZE * 4);
tom1ae3d162014-09-26 09:38:16 -0700144 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700145 validate(false, true, 0, SIZE * 4);
146 stream.flush();
147 validate(false, true, 0, SIZE * 4);
toma7083182014-09-25 21:38:03 -0700148 }
149
150 @Test
151 public void bufferedWriteList() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700152 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700153
154 // First write is immediate...
tom2d6d3972014-09-25 22:38:57 -0700155 List<TestMessage> messages = new ArrayList<>();
tom1ae3d162014-09-26 09:38:16 -0700156 messages.add(message);
157 messages.add(message);
158 messages.add(message);
159 messages.add(message);
toma7083182014-09-25 21:38:03 -0700160
tom2d6d3972014-09-25 22:38:57 -0700161 stream.write(messages);
162 validate(false, false, 0, SIZE * 4);
toma7083182014-09-25 21:38:03 -0700163
tom2d6d3972014-09-25 22:38:57 -0700164 stream.write(messages);
165 validate(false, true, 0, SIZE * 4);
toma7083182014-09-25 21:38:03 -0700166
tom2d6d3972014-09-25 22:38:57 -0700167 stream.flushIfPossible();
168 validate(false, false, 0, SIZE * 8);
toma7083182014-09-25 21:38:03 -0700169 }
170
171 @Test
172 public void bufferedPartialWrite() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700173 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700174
175 // First write is immediate...
tom1ae3d162014-09-26 09:38:16 -0700176 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700177 validate(false, false, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700178
179 // Tell test channel to accept only half.
tom2d6d3972014-09-25 22:38:57 -0700180 channel.bytesToWrite = SIZE / 2;
toma7083182014-09-25 21:38:03 -0700181
182 // Second and third get buffered...
tom1ae3d162014-09-26 09:38:16 -0700183 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700184 validate(false, true, 0, SIZE);
185 stream.flushIfPossible();
186 validate(true, true, 0, SIZE + SIZE / 2);
toma7083182014-09-25 21:38:03 -0700187 }
188
189 @Test
190 public void bufferedPartialWrite2() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700191 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700192
193 // First write is immediate...
tom1ae3d162014-09-26 09:38:16 -0700194 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700195 validate(false, false, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700196
197 // Tell test channel to accept only half.
tom2d6d3972014-09-25 22:38:57 -0700198 channel.bytesToWrite = SIZE / 2;
toma7083182014-09-25 21:38:03 -0700199
200 // Second and third get buffered...
tom1ae3d162014-09-26 09:38:16 -0700201 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700202 validate(false, true, 0, SIZE);
203 stream.flushIfWriteNotPending();
204 validate(true, true, 0, SIZE + SIZE / 2);
toma7083182014-09-25 21:38:03 -0700205 }
206
207 @Test
208 public void bufferedReadWrite() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700209 channel.bytesToRead = SIZE + 4;
210 List<TestMessage> messages = stream.read();
toma7083182014-09-25 21:38:03 -0700211 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700212 validate(false, false, SIZE + 4, 0);
toma7083182014-09-25 21:38:03 -0700213
tom1ae3d162014-09-26 09:38:16 -0700214 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700215 validate(false, false, SIZE + 4, SIZE);
toma7083182014-09-25 21:38:03 -0700216
tom2d6d3972014-09-25 22:38:57 -0700217 channel.bytesToRead = SIZE - 4;
218 messages = stream.read();
toma7083182014-09-25 21:38:03 -0700219 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700220 validate(false, false, SIZE * 2, SIZE);
toma7083182014-09-25 21:38:03 -0700221 }
222
223 // Fake IO driver loop
tom2d6d3972014-09-25 22:38:57 -0700224 private static class TestIOLoop extends IOLoop<TestMessage, TestMessageStream> {
toma7083182014-09-25 21:38:03 -0700225
tom2d6d3972014-09-25 22:38:57 -0700226 public TestIOLoop() throws IOException {
toma7083182014-09-25 21:38:03 -0700227 super(500);
228 }
229
230 @Override
231 protected TestMessageStream createStream(ByteChannel channel) {
tom2d6d3972014-09-25 22:38:57 -0700232 return new TestMessageStream(SIZE, channel, this);
toma7083182014-09-25 21:38:03 -0700233 }
234
235 @Override
236 protected void processMessages(List<TestMessage> messages,
237 MessageStream<TestMessage> stream) {
238 }
239
240 }
241
242 // Byte channel test fixture
243 private static class TestByteChannel extends SelectableChannel implements ByteChannel {
244
245 private static final int BUFFER_LENGTH = 1024;
246 byte[] bytes = new byte[BUFFER_LENGTH];
247 int bytesToWrite = BUFFER_LENGTH;
248 int bytesToRead = BUFFER_LENGTH;
249 int writtenBytes = 0;
250 int readBytes = 0;
251
252 @Override
253 public int read(ByteBuffer dst) throws IOException {
254 int l = Math.min(dst.remaining(), bytesToRead);
255 if (bytesToRead > 0) {
256 readBytes += l;
257 dst.put(bytes, 0, l);
258 }
259 return l;
260 }
261
262 @Override
263 public int write(ByteBuffer src) throws IOException {
264 int l = Math.min(src.remaining(), bytesToWrite);
265 writtenBytes += l;
266 src.get(bytes, 0, l);
267 return l;
268 }
269
270 @Override
271 public Object blockingLock() {
272 return null;
273 }
274
275 @Override
276 public SelectableChannel configureBlocking(boolean arg0) throws IOException {
277 return null;
278 }
279
280 @Override
281 public boolean isBlocking() {
282 return false;
283 }
284
285 @Override
286 public boolean isRegistered() {
287 return false;
288 }
289
290 @Override
291 public SelectionKey keyFor(Selector arg0) {
292 return null;
293 }
294
295 @Override
296 public SelectorProvider provider() {
297 return null;
298 }
299
300 @Override
301 public SelectionKey register(Selector arg0, int arg1, Object arg2)
302 throws ClosedChannelException {
303 return null;
304 }
305
306 @Override
307 public int validOps() {
308 return 0;
309 }
310
311 @Override
312 protected void implCloseChannel() throws IOException {
313 bytesToRead = -1;
314 }
315
316 }
317
318 // Selection key text fixture
319 private static class TestKey extends SelectionKey {
320
321 private SelectableChannel channel;
322
323 public TestKey(TestByteChannel channel) {
324 this.channel = channel;
325 }
326
327 @Override
328 public void cancel() {
329 }
330
331 @Override
332 public SelectableChannel channel() {
333 return channel;
334 }
335
336 @Override
337 public int interestOps() {
338 return 0;
339 }
340
341 @Override
342 public SelectionKey interestOps(int ops) {
343 return null;
344 }
345
346 @Override
347 public boolean isValid() {
348 return true;
349 }
350
351 @Override
352 public int readyOps() {
353 return 0;
354 }
355
356 @Override
357 public Selector selector() {
358 return null;
359 }
360 }
361
362}