blob: 40b5a4f6255e6eb7fad11eec179b57de681c4a52 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
3import org.junit.After;
4import org.junit.Before;
5import org.junit.Test;
6
7import java.io.IOException;
8import java.nio.ByteBuffer;
9import java.nio.channels.ByteChannel;
10import java.nio.channels.ClosedChannelException;
11import java.nio.channels.SelectableChannel;
12import java.nio.channels.SelectionKey;
13import java.nio.channels.Selector;
14import java.nio.channels.spi.SelectorProvider;
15import java.util.ArrayList;
16import java.util.List;
17
18import static org.junit.Assert.assertEquals;
19import static org.junit.Assert.assertNull;
20
/**
 * Tests of the message stream implementation.
 */
24public class MessageStreamTest {
25
tom1ae3d162014-09-26 09:38:16 -070026 private static final int SIZE = 64;
toma7083182014-09-25 21:38:03 -070027 private static final int BIG_SIZE = 32 * 1024;
tom1ae3d162014-09-26 09:38:16 -070028
29 private TestMessage message;
toma7083182014-09-25 21:38:03 -070030
tom2d6d3972014-09-25 22:38:57 -070031 private TestIOLoop loop;
toma7083182014-09-25 21:38:03 -070032 private TestByteChannel channel;
tom2d6d3972014-09-25 22:38:57 -070033 private TestMessageStream stream;
toma7083182014-09-25 21:38:03 -070034 private TestKey key;
35
36 @Before
37 public void setUp() throws IOException {
tom2d6d3972014-09-25 22:38:57 -070038 loop = new TestIOLoop();
toma7083182014-09-25 21:38:03 -070039 channel = new TestByteChannel();
40 key = new TestKey(channel);
tom2d6d3972014-09-25 22:38:57 -070041 stream = loop.createStream(channel);
42 stream.setKey(key);
tom1ae3d162014-09-26 09:38:16 -070043 stream.setNonStrict();
44 message = new TestMessage(SIZE, 0, 0, stream.padding());
toma7083182014-09-25 21:38:03 -070045 }
46
47 @After
48 public void tearDown() {
49 loop.shutdown();
tom2d6d3972014-09-25 22:38:57 -070050 stream.close();
toma7083182014-09-25 21:38:03 -070051 }
52
tom2d6d3972014-09-25 22:38:57 -070053 // Validates the state of the message stream
54 private void validate(boolean wp, boolean fr, int read, int written) {
55 assertEquals(wp, stream.isWritePending());
56 assertEquals(fr, stream.isFlushRequired());
toma7083182014-09-25 21:38:03 -070057 assertEquals(read, channel.readBytes);
58 assertEquals(written, channel.writtenBytes);
59 }
60
61 @Test
62 public void endOfStream() throws IOException {
63 channel.close();
tom2d6d3972014-09-25 22:38:57 -070064 List<TestMessage> messages = stream.read();
toma7083182014-09-25 21:38:03 -070065 assertNull(messages);
66 }
67
68 @Test
69 public void bufferGrowth() throws IOException {
tom2d6d3972014-09-25 22:38:57 -070070 // Create a stream for big messages and test the growth.
71 stream = new TestMessageStream(BIG_SIZE, channel, loop);
tom1ae3d162014-09-26 09:38:16 -070072 TestMessage bigMessage = new TestMessage(BIG_SIZE, 0, 0, stream.padding());
73
74 stream.write(bigMessage);
75 stream.write(bigMessage);
76 stream.write(bigMessage);
77 stream.write(bigMessage);
78 stream.write(bigMessage);
toma7083182014-09-25 21:38:03 -070079 }
80
81 @Test
82 public void discardBeforeKey() {
tom2d6d3972014-09-25 22:38:57 -070083 // Create a stream that does not yet have the key set and discard it.
84 stream = loop.createStream(channel);
85 assertNull(stream.key());
86 stream.close();
toma7083182014-09-25 21:38:03 -070087 // There is not key, so nothing to check; we just expect no problem.
88 }
89
90 @Test
91 public void bufferedRead() throws IOException {
tom2d6d3972014-09-25 22:38:57 -070092 channel.bytesToRead = SIZE + 4;
93 List<TestMessage> messages = stream.read();
toma7083182014-09-25 21:38:03 -070094 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -070095 validate(false, false, SIZE + 4, 0);
toma7083182014-09-25 21:38:03 -070096
tom2d6d3972014-09-25 22:38:57 -070097 channel.bytesToRead = SIZE - 4;
98 messages = stream.read();
toma7083182014-09-25 21:38:03 -070099 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700100 validate(false, false, SIZE * 2, 0);
toma7083182014-09-25 21:38:03 -0700101 }
102
103 @Test
104 public void bufferedWrite() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700105 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700106
107 // First write is immediate...
tom1ae3d162014-09-26 09:38:16 -0700108 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700109 validate(false, false, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700110
111 // Second and third get buffered...
tom1ae3d162014-09-26 09:38:16 -0700112 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700113 validate(false, true, 0, SIZE);
tom1ae3d162014-09-26 09:38:16 -0700114 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700115 validate(false, true, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700116
117 // Reset write, which will flush if needed; the next write is again buffered
tom2d6d3972014-09-25 22:38:57 -0700118 stream.flushIfWriteNotPending();
119 validate(false, false, 0, SIZE * 3);
tom1ae3d162014-09-26 09:38:16 -0700120 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700121 validate(false, true, 0, SIZE * 3);
toma7083182014-09-25 21:38:03 -0700122
123 // Select reset, which will flush if needed; the next write is again buffered
tom2d6d3972014-09-25 22:38:57 -0700124 stream.flushIfPossible();
125 validate(false, false, 0, SIZE * 4);
tom1ae3d162014-09-26 09:38:16 -0700126 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700127 validate(false, true, 0, SIZE * 4);
128 stream.flush();
129 validate(false, true, 0, SIZE * 4);
toma7083182014-09-25 21:38:03 -0700130 }
131
132 @Test
133 public void bufferedWriteList() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700134 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700135
136 // First write is immediate...
tom2d6d3972014-09-25 22:38:57 -0700137 List<TestMessage> messages = new ArrayList<>();
tom1ae3d162014-09-26 09:38:16 -0700138 messages.add(message);
139 messages.add(message);
140 messages.add(message);
141 messages.add(message);
toma7083182014-09-25 21:38:03 -0700142
tom2d6d3972014-09-25 22:38:57 -0700143 stream.write(messages);
144 validate(false, false, 0, SIZE * 4);
toma7083182014-09-25 21:38:03 -0700145
tom2d6d3972014-09-25 22:38:57 -0700146 stream.write(messages);
147 validate(false, true, 0, SIZE * 4);
toma7083182014-09-25 21:38:03 -0700148
tom2d6d3972014-09-25 22:38:57 -0700149 stream.flushIfPossible();
150 validate(false, false, 0, SIZE * 8);
toma7083182014-09-25 21:38:03 -0700151 }
152
153 @Test
154 public void bufferedPartialWrite() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700155 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700156
157 // First write is immediate...
tom1ae3d162014-09-26 09:38:16 -0700158 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700159 validate(false, false, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700160
161 // Tell test channel to accept only half.
tom2d6d3972014-09-25 22:38:57 -0700162 channel.bytesToWrite = SIZE / 2;
toma7083182014-09-25 21:38:03 -0700163
164 // Second and third get buffered...
tom1ae3d162014-09-26 09:38:16 -0700165 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700166 validate(false, true, 0, SIZE);
167 stream.flushIfPossible();
168 validate(true, true, 0, SIZE + SIZE / 2);
toma7083182014-09-25 21:38:03 -0700169 }
170
171 @Test
172 public void bufferedPartialWrite2() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700173 validate(false, false, 0, 0);
toma7083182014-09-25 21:38:03 -0700174
175 // First write is immediate...
tom1ae3d162014-09-26 09:38:16 -0700176 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700177 validate(false, false, 0, SIZE);
toma7083182014-09-25 21:38:03 -0700178
179 // Tell test channel to accept only half.
tom2d6d3972014-09-25 22:38:57 -0700180 channel.bytesToWrite = SIZE / 2;
toma7083182014-09-25 21:38:03 -0700181
182 // Second and third get buffered...
tom1ae3d162014-09-26 09:38:16 -0700183 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700184 validate(false, true, 0, SIZE);
185 stream.flushIfWriteNotPending();
186 validate(true, true, 0, SIZE + SIZE / 2);
toma7083182014-09-25 21:38:03 -0700187 }
188
189 @Test
190 public void bufferedReadWrite() throws IOException {
tom2d6d3972014-09-25 22:38:57 -0700191 channel.bytesToRead = SIZE + 4;
192 List<TestMessage> messages = stream.read();
toma7083182014-09-25 21:38:03 -0700193 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700194 validate(false, false, SIZE + 4, 0);
toma7083182014-09-25 21:38:03 -0700195
tom1ae3d162014-09-26 09:38:16 -0700196 stream.write(message);
tom2d6d3972014-09-25 22:38:57 -0700197 validate(false, false, SIZE + 4, SIZE);
toma7083182014-09-25 21:38:03 -0700198
tom2d6d3972014-09-25 22:38:57 -0700199 channel.bytesToRead = SIZE - 4;
200 messages = stream.read();
toma7083182014-09-25 21:38:03 -0700201 assertEquals(1, messages.size());
tom2d6d3972014-09-25 22:38:57 -0700202 validate(false, false, SIZE * 2, SIZE);
toma7083182014-09-25 21:38:03 -0700203 }
204
205 // Fake IO driver loop
tom2d6d3972014-09-25 22:38:57 -0700206 private static class TestIOLoop extends IOLoop<TestMessage, TestMessageStream> {
toma7083182014-09-25 21:38:03 -0700207
tom2d6d3972014-09-25 22:38:57 -0700208 public TestIOLoop() throws IOException {
toma7083182014-09-25 21:38:03 -0700209 super(500);
210 }
211
212 @Override
213 protected TestMessageStream createStream(ByteChannel channel) {
tom2d6d3972014-09-25 22:38:57 -0700214 return new TestMessageStream(SIZE, channel, this);
toma7083182014-09-25 21:38:03 -0700215 }
216
217 @Override
218 protected void processMessages(List<TestMessage> messages,
219 MessageStream<TestMessage> stream) {
220 }
221
222 }
223
224 // Byte channel test fixture
225 private static class TestByteChannel extends SelectableChannel implements ByteChannel {
226
227 private static final int BUFFER_LENGTH = 1024;
228 byte[] bytes = new byte[BUFFER_LENGTH];
229 int bytesToWrite = BUFFER_LENGTH;
230 int bytesToRead = BUFFER_LENGTH;
231 int writtenBytes = 0;
232 int readBytes = 0;
233
234 @Override
235 public int read(ByteBuffer dst) throws IOException {
236 int l = Math.min(dst.remaining(), bytesToRead);
237 if (bytesToRead > 0) {
238 readBytes += l;
239 dst.put(bytes, 0, l);
240 }
241 return l;
242 }
243
244 @Override
245 public int write(ByteBuffer src) throws IOException {
246 int l = Math.min(src.remaining(), bytesToWrite);
247 writtenBytes += l;
248 src.get(bytes, 0, l);
249 return l;
250 }
251
252 @Override
253 public Object blockingLock() {
254 return null;
255 }
256
257 @Override
258 public SelectableChannel configureBlocking(boolean arg0) throws IOException {
259 return null;
260 }
261
262 @Override
263 public boolean isBlocking() {
264 return false;
265 }
266
267 @Override
268 public boolean isRegistered() {
269 return false;
270 }
271
272 @Override
273 public SelectionKey keyFor(Selector arg0) {
274 return null;
275 }
276
277 @Override
278 public SelectorProvider provider() {
279 return null;
280 }
281
282 @Override
283 public SelectionKey register(Selector arg0, int arg1, Object arg2)
284 throws ClosedChannelException {
285 return null;
286 }
287
288 @Override
289 public int validOps() {
290 return 0;
291 }
292
293 @Override
294 protected void implCloseChannel() throws IOException {
295 bytesToRead = -1;
296 }
297
298 }
299
300 // Selection key text fixture
301 private static class TestKey extends SelectionKey {
302
303 private SelectableChannel channel;
304
305 public TestKey(TestByteChannel channel) {
306 this.channel = channel;
307 }
308
309 @Override
310 public void cancel() {
311 }
312
313 @Override
314 public SelectableChannel channel() {
315 return channel;
316 }
317
318 @Override
319 public int interestOps() {
320 return 0;
321 }
322
323 @Override
324 public SelectionKey interestOps(int ops) {
325 return null;
326 }
327
328 @Override
329 public boolean isValid() {
330 return true;
331 }
332
333 @Override
334 public int readyOps() {
335 return 0;
336 }
337
338 @Override
339 public Selector selector() {
340 return null;
341 }
342 }
343
344}