blob: 9965d3895d2121db047f85251d47cee7d93213a6 [file] [log] [blame]
/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
toma7083182014-09-25 21:38:03 -070016package org.onlab.nio;
17
18import org.junit.After;
19import org.junit.Before;
20import org.junit.Test;
21
22import java.io.IOException;
23import java.nio.ByteBuffer;
24import java.nio.channels.ByteChannel;
25import java.nio.channels.ClosedChannelException;
26import java.nio.channels.SelectableChannel;
27import java.nio.channels.SelectionKey;
28import java.nio.channels.Selector;
29import java.nio.channels.spi.SelectorProvider;
30import java.util.ArrayList;
31import java.util.List;
32
33import static org.junit.Assert.assertEquals;
34import static org.junit.Assert.assertNull;
35
/**
 * Tests of the message stream implementation.
 */
39public class MessageStreamTest {
40
tom1ae3d162014-09-26 09:38:16 -070041 private static final int SIZE = 64;
toma7083182014-09-25 21:38:03 -070042 private static final int BIG_SIZE = 32 * 1024;
tom1ae3d162014-09-26 09:38:16 -070043
44 private TestMessage message;
toma7083182014-09-25 21:38:03 -070045
tom2d6d3972014-09-25 22:38:57 -070046 private TestIOLoop loop;
toma7083182014-09-25 21:38:03 -070047 private TestByteChannel channel;
tom2d6d3972014-09-25 22:38:57 -070048 private TestMessageStream stream;
toma7083182014-09-25 21:38:03 -070049 private TestKey key;
50
51 @Before
52 public void setUp() throws IOException {
tom2d6d3972014-09-25 22:38:57 -070053 loop = new TestIOLoop();
toma7083182014-09-25 21:38:03 -070054 channel = new TestByteChannel();
55 key = new TestKey(channel);
tom2d6d3972014-09-25 22:38:57 -070056 stream = loop.createStream(channel);
57 stream.setKey(key);
tom1ae3d162014-09-26 09:38:16 -070058 stream.setNonStrict();
59 message = new TestMessage(SIZE, 0, 0, stream.padding());
toma7083182014-09-25 21:38:03 -070060 }
61
62 @After
63 public void tearDown() {
64 loop.shutdown();
tom2d6d3972014-09-25 22:38:57 -070065 stream.close();
toma7083182014-09-25 21:38:03 -070066 }
67
tom2d6d3972014-09-25 22:38:57 -070068 // Validates the state of the message stream
69 private void validate(boolean wp, boolean fr, int read, int written) {
70 assertEquals(wp, stream.isWritePending());
71 assertEquals(fr, stream.isFlushRequired());
toma7083182014-09-25 21:38:03 -070072 assertEquals(read, channel.readBytes);
73 assertEquals(written, channel.writtenBytes);
74 }
75
76 @Test
77 public void endOfStream() throws IOException {
78 channel.close();
tom2d6d3972014-09-25 22:38:57 -070079 List<TestMessage> messages = stream.read();
toma7083182014-09-25 21:38:03 -070080 assertNull(messages);
81 }
82
83 @Test
84 public void bufferGrowth() throws IOException {
tom2d6d3972014-09-25 22:38:57 -070085 // Create a stream for big messages and test the growth.
86 stream = new TestMessageStream(BIG_SIZE, channel, loop);
tom1ae3d162014-09-26 09:38:16 -070087 TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding());
88
89 stream.write(bigMessage);
90 stream.write(bigMessage);
91 stream.write(bigMessage);
92 stream.write(bigMessage);
93 stream.write(bigMessage);
toma7083182014-09-25 21:38:03 -070094 }
95
96 @Test
97 public void discardBeforeKey() {
tom2d6d3972014-09-25 22:38:57 -070098 // Create a stream that does not yet have the key set and discard it.
99 stream = loop.createStream(channel);
100 assertNull(stream.key());
101 stream.close();
toma7083182014-09-25 21:38:03 -0700102 // There is not key, so nothing to check; we just expect no problem.
103 }
104
105 @Test
106 public void bufferedRead() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700107 channel.bytesToRead = SIZE + 4;
108 List<TestMessage> messages = stream.read();
toma7083182014-09-25 21:38:03 -0700109 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700110 validate(false, false, SIZE + 4, 0);
toma7083182014-09-25 21:38:03 -0700111
tom2d6d3972014-09-25 22:38:57 -0700112 channel.bytesToRead = SIZE - 4;
113 messages = stream.read();
toma7083182014-09-25 21:38:03 -0700114 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700115 validate(false, false, SIZE * 2, 0);
toma7083182014-09-25 21:38:03 -0700116 }
117
118 @Test
119 public void bufferedWrite() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700120 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700121
122 // First write is immediate...
tom1ae3d162014-09-26 09:38:16 -0700123 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700124 validate(false, false, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700125
126 // Second and third get buffered...
tom1ae3d162014-09-26 09:38:16 -0700127 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700128 validate(false, true, 0, SIZE);
tom1ae3d162014-09-26 09:38:16 -0700129 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700130 validate(false, true, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700131
132 // Reset write, which will flush if needed; the next write is again buffered
tom2d6d3972014-09-25 22:38:57 -0700133 stream.flushIfWriteNotPending();
134 validate(false, false, 0, SIZE * 3);
tom1ae3d162014-09-26 09:38:16 -0700135 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700136 validate(false, true, 0, SIZE * 3);
toma7083182014-09-25 21:38:03 -0700137
138 // Select reset, which will flush if needed; the next write is again buffered
tom2d6d3972014-09-25 22:38:57 -0700139 stream.flushIfPossible();
140 validate(false, false, 0, SIZE * 4);
tom1ae3d162014-09-26 09:38:16 -0700141 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700142 validate(false, true, 0, SIZE * 4);
143 stream.flush();
144 validate(false, true, 0, SIZE * 4);
toma7083182014-09-25 21:38:03 -0700145 }
146
147 @Test
148 public void bufferedWriteList() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700149 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700150
151 // First write is immediate...
tom2d6d3972014-09-25 22:38:57 -0700152 List<TestMessage> messages = new ArrayList<>();
tom1ae3d162014-09-26 09:38:16 -0700153 messages.add(message);
154 messages.add(message);
155 messages.add(message);
156 messages.add(message);
toma7083182014-09-25 21:38:03 -0700157
tom2d6d3972014-09-25 22:38:57 -0700158 stream.write(messages);
159 validate(false, false, 0, SIZE * 4);
toma7083182014-09-25 21:38:03 -0700160
tom2d6d3972014-09-25 22:38:57 -0700161 stream.write(messages);
162 validate(false, true, 0, SIZE * 4);
toma7083182014-09-25 21:38:03 -0700163
tom2d6d3972014-09-25 22:38:57 -0700164 stream.flushIfPossible();
165 validate(false, false, 0, SIZE * 8);
toma7083182014-09-25 21:38:03 -0700166 }
167
168 @Test
169 public void bufferedPartialWrite() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700170 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700171
172 // First write is immediate...
tom1ae3d162014-09-26 09:38:16 -0700173 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700174 validate(false, false, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700175
176 // Tell test channel to accept only half.
tom2d6d3972014-09-25 22:38:57 -0700177 channel.bytesToWrite = SIZE / 2;
toma7083182014-09-25 21:38:03 -0700178
179 // Second and third get buffered...
tom1ae3d162014-09-26 09:38:16 -0700180 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700181 validate(false, true, 0, SIZE);
182 stream.flushIfPossible();
183 validate(true, true, 0, SIZE + SIZE / 2);
toma7083182014-09-25 21:38:03 -0700184 }
185
186 @Test
187 public void bufferedPartialWrite2() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700188 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700189
190 // First write is immediate...
tom1ae3d162014-09-26 09:38:16 -0700191 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700192 validate(false, false, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700193
194 // Tell test channel to accept only half.
tom2d6d3972014-09-25 22:38:57 -0700195 channel.bytesToWrite = SIZE / 2;
toma7083182014-09-25 21:38:03 -0700196
197 // Second and third get buffered...
tom1ae3d162014-09-26 09:38:16 -0700198 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700199 validate(false, true, 0, SIZE);
200 stream.flushIfWriteNotPending();
201 validate(true, true, 0, SIZE + SIZE / 2);
toma7083182014-09-25 21:38:03 -0700202 }
203
204 @Test
205 public void bufferedReadWrite() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700206 channel.bytesToRead = SIZE + 4;
207 List<TestMessage> messages = stream.read();
toma7083182014-09-25 21:38:03 -0700208 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700209 validate(false, false, SIZE + 4, 0);
toma7083182014-09-25 21:38:03 -0700210
tom1ae3d162014-09-26 09:38:16 -0700211 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700212 validate(false, false, SIZE + 4, SIZE);
toma7083182014-09-25 21:38:03 -0700213
tom2d6d3972014-09-25 22:38:57 -0700214 channel.bytesToRead = SIZE - 4;
215 messages = stream.read();
toma7083182014-09-25 21:38:03 -0700216 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700217 validate(false, false, SIZE * 2, SIZE);
toma7083182014-09-25 21:38:03 -0700218 }
219
220 // Fake IO driver loop
tom2d6d3972014-09-25 22:38:57 -0700221 private static class TestIOLoop extends IOLoop<TestMessage, TestMessageStream> {
toma7083182014-09-25 21:38:03 -0700222
tom2d6d3972014-09-25 22:38:57 -0700223 public TestIOLoop() throws IOException {
toma7083182014-09-25 21:38:03 -0700224 super(500);
225 }
226
227 @Override
228 protected TestMessageStream createStream(ByteChannel channel) {
tom2d6d3972014-09-25 22:38:57 -0700229 return new TestMessageStream(SIZE, channel, this);
toma7083182014-09-25 21:38:03 -0700230 }
231
232 @Override
233 protected void processMessages(List<TestMessage> messages,
234 MessageStream<TestMessage> stream) {
235 }
236
237 }
238
239 // Byte channel test fixture
240 private static class TestByteChannel extends SelectableChannel implements ByteChannel {
241
242 private static final int BUFFER_LENGTH = 1024;
243 byte[] bytes = new byte[BUFFER_LENGTH];
244 int bytesToWrite = BUFFER_LENGTH;
245 int bytesToRead = BUFFER_LENGTH;
246 int writtenBytes = 0;
247 int readBytes = 0;
248
249 @Override
250 public int read(ByteBuffer dst) throws IOException {
251 int l = Math.min(dst.remaining(), bytesToRead);
252 if (bytesToRead > 0) {
253 readBytes += l;
254 dst.put(bytes, 0, l);
255 }
256 return l;
257 }
258
259 @Override
260 public int write(ByteBuffer src) throws IOException {
261 int l = Math.min(src.remaining(), bytesToWrite);
262 writtenBytes += l;
263 src.get(bytes, 0, l);
264 return l;
265 }
266
267 @Override
268 public Object blockingLock() {
269 return null;
270 }
271
272 @Override
273 public SelectableChannel configureBlocking(boolean arg0) throws IOException {
274 return null;
275 }
276
277 @Override
278 public boolean isBlocking() {
279 return false;
280 }
281
282 @Override
283 public boolean isRegistered() {
284 return false;
285 }
286
287 @Override
288 public SelectionKey keyFor(Selector arg0) {
289 return null;
290 }
291
292 @Override
293 public SelectorProvider provider() {
294 return null;
295 }
296
297 @Override
298 public SelectionKey register(Selector arg0, int arg1, Object arg2)
299 throws ClosedChannelException {
300 return null;
301 }
302
303 @Override
304 public int validOps() {
305 return 0;
306 }
307
308 @Override
309 protected void implCloseChannel() throws IOException {
310 bytesToRead = -1;
311 }
312
313 }
314
315 // Selection key text fixture
316 private static class TestKey extends SelectionKey {
317
318 private SelectableChannel channel;
319
320 public TestKey(TestByteChannel channel) {
321 this.channel = channel;
322 }
323
324 @Override
325 public void cancel() {
326 }
327
328 @Override
329 public SelectableChannel channel() {
330 return channel;
331 }
332
333 @Override
334 public int interestOps() {
335 return 0;
336 }
337
338 @Override
339 public SelectionKey interestOps(int ops) {
340 return null;
341 }
342
343 @Override
344 public boolean isValid() {
345 return true;
346 }
347
348 @Override
349 public int readyOps() {
350 return 0;
351 }
352
353 @Override
354 public Selector selector() {
355 return null;
356 }
357 }
358
359}