blob: f0f896f6ab230c9d8f063b5a76e62d8420d7ee43 [file] [log] [blame]
toma7083182014-09-25 21:38:03 -07001package org.onlab.nio;
2
3import org.junit.After;
4import org.junit.Before;
5import org.junit.Test;
6
7import java.io.IOException;
8import java.nio.ByteBuffer;
9import java.nio.channels.ByteChannel;
10import java.nio.channels.ClosedChannelException;
11import java.nio.channels.SelectableChannel;
12import java.nio.channels.SelectionKey;
13import java.nio.channels.Selector;
14import java.nio.channels.spi.SelectorProvider;
15import java.util.ArrayList;
16import java.util.List;
17
18import static org.junit.Assert.assertEquals;
19import static org.junit.Assert.assertNull;
20
21/**
22 * Tests of the message message stream implementation.
23 */
24public class MessageStreamTest {
25
26 private static final int LENGTH = 16;
27
28 private static final TestMessage TM1 = new TestMessage(LENGTH);
29 private static final TestMessage TM2 = new TestMessage(LENGTH);
30 private static final TestMessage TM3 = new TestMessage(LENGTH);
31 private static final TestMessage TM4 = new TestMessage(LENGTH);
32
33 private static final int BIG_SIZE = 32 * 1024;
34 private static final TestMessage BIG_MESSAGE = new TestMessage(BIG_SIZE);
35
36 private static enum WritePending {
37 ON, OFF;
38
39 public boolean on() {
40 return this == ON;
41 }
42 }
43
44 private static enum FlushRequired {
45 ON, OFF;
46
47 public boolean on() {
48 return this == ON;
49 }
50 }
51
52 private FakeIOLoop loop;
53 private TestByteChannel channel;
54 private TestMessageStream buffer;
55 private TestKey key;
56
57 @Before
58 public void setUp() throws IOException {
59 loop = new FakeIOLoop();
60 channel = new TestByteChannel();
61 key = new TestKey(channel);
62 buffer = loop.createStream(channel);
63 buffer.setKey(key);
64 }
65
66 @After
67 public void tearDown() {
68 loop.shutdown();
69 buffer.close();
70 }
71
72 // Check state of the message buffer
73 private void assertState(WritePending wp, FlushRequired fr,
74 int read, int written) {
75 assertEquals(wp.on(), buffer.isWritePending());
76// assertEquals(fr.on(), buffer.requiresFlush());
77 assertEquals(read, channel.readBytes);
78 assertEquals(written, channel.writtenBytes);
79 }
80
81 @Test
82 public void endOfStream() throws IOException {
83 channel.close();
84 List<TestMessage> messages = buffer.read();
85 assertNull(messages);
86 }
87
88 @Test
89 public void bufferGrowth() throws IOException {
90 // Create a buffer for big messages and test the growth.
91 buffer = new TestMessageStream(BIG_SIZE, channel, loop);
92 buffer.write(BIG_MESSAGE);
93 buffer.write(BIG_MESSAGE);
94 buffer.write(BIG_MESSAGE);
95 buffer.write(BIG_MESSAGE);
96 buffer.write(BIG_MESSAGE);
97 }
98
99 @Test
100 public void discardBeforeKey() {
101 // Create a buffer that does not yet have the key set and discard it.
102 buffer = loop.createStream(channel);
103 assertNull(buffer.key());
104 buffer.close();
105 // There is not key, so nothing to check; we just expect no problem.
106 }
107
108 @Test
109 public void bufferedRead() throws IOException {
110 channel.bytesToRead = LENGTH + 4;
111 List<TestMessage> messages = buffer.read();
112 assertEquals(1, messages.size());
113 assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, 0);
114
115 channel.bytesToRead = LENGTH - 4;
116 messages = buffer.read();
117 assertEquals(1, messages.size());
118 assertState(WritePending.OFF, FlushRequired.OFF, LENGTH * 2, 0);
119 }
120
121 @Test
122 public void bufferedWrite() throws IOException {
123 assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
124
125 // First write is immediate...
126 buffer.write(TM1);
127 assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH);
128
129 // Second and third get buffered...
130 buffer.write(TM2);
131 assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
132 buffer.write(TM3);
133 assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
134
135 // Reset write, which will flush if needed; the next write is again buffered
136 buffer.flushIfWriteNotPending();
137 assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 3);
138 buffer.write(TM4);
139 assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 3);
140
141 // Select reset, which will flush if needed; the next write is again buffered
142 buffer.flushIfPossible();
143 assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 4);
144 buffer.write(TM1);
145 assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4);
146 buffer.flush();
147 assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4);
148 }
149
150 @Test
151 public void bufferedWriteList() throws IOException {
152 assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
153
154 // First write is immediate...
155 List<TestMessage> messages = new ArrayList<TestMessage>();
156 messages.add(TM1);
157 messages.add(TM2);
158 messages.add(TM3);
159 messages.add(TM4);
160
161 buffer.write(messages);
162 assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 4);
163
164 buffer.write(messages);
165 assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH * 4);
166
167 buffer.flushIfPossible();
168 assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH * 8);
169 }
170
171 @Test
172 public void bufferedPartialWrite() throws IOException {
173 assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
174
175 // First write is immediate...
176 buffer.write(TM1);
177 assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH);
178
179 // Tell test channel to accept only half.
180 channel.bytesToWrite = LENGTH / 2;
181
182 // Second and third get buffered...
183 buffer.write(TM2);
184 assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
185 buffer.flushIfPossible();
186 assertState(WritePending.ON, FlushRequired.ON, 0, LENGTH + LENGTH / 2);
187 }
188
189 @Test
190 public void bufferedPartialWrite2() throws IOException {
191 assertState(WritePending.OFF, FlushRequired.OFF, 0, 0);
192
193 // First write is immediate...
194 buffer.write(TM1);
195 assertState(WritePending.OFF, FlushRequired.OFF, 0, LENGTH);
196
197 // Tell test channel to accept only half.
198 channel.bytesToWrite = LENGTH / 2;
199
200 // Second and third get buffered...
201 buffer.write(TM2);
202 assertState(WritePending.OFF, FlushRequired.ON, 0, LENGTH);
203 buffer.flushIfWriteNotPending();
204 assertState(WritePending.ON, FlushRequired.ON, 0, LENGTH + LENGTH / 2);
205 }
206
207 @Test
208 public void bufferedReadWrite() throws IOException {
209 channel.bytesToRead = LENGTH + 4;
210 List<TestMessage> messages = buffer.read();
211 assertEquals(1, messages.size());
212 assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, 0);
213
214 buffer.write(TM1);
215 assertState(WritePending.OFF, FlushRequired.OFF, LENGTH + 4, LENGTH);
216
217 channel.bytesToRead = LENGTH - 4;
218 messages = buffer.read();
219 assertEquals(1, messages.size());
220 assertState(WritePending.OFF, FlushRequired.OFF, LENGTH * 2, LENGTH);
221 }
222
223 // Fake IO driver loop
224 private static class FakeIOLoop extends IOLoop<TestMessage, TestMessageStream> {
225
226 public FakeIOLoop() throws IOException {
227 super(500);
228 }
229
230 @Override
231 protected TestMessageStream createStream(ByteChannel channel) {
232 return new TestMessageStream(LENGTH, channel, this);
233 }
234
235 @Override
236 protected void processMessages(List<TestMessage> messages,
237 MessageStream<TestMessage> stream) {
238 }
239
240 }
241
242 // Byte channel test fixture
243 private static class TestByteChannel extends SelectableChannel implements ByteChannel {
244
245 private static final int BUFFER_LENGTH = 1024;
246 byte[] bytes = new byte[BUFFER_LENGTH];
247 int bytesToWrite = BUFFER_LENGTH;
248 int bytesToRead = BUFFER_LENGTH;
249 int writtenBytes = 0;
250 int readBytes = 0;
251
252 @Override
253 public int read(ByteBuffer dst) throws IOException {
254 int l = Math.min(dst.remaining(), bytesToRead);
255 if (bytesToRead > 0) {
256 readBytes += l;
257 dst.put(bytes, 0, l);
258 }
259 return l;
260 }
261
262 @Override
263 public int write(ByteBuffer src) throws IOException {
264 int l = Math.min(src.remaining(), bytesToWrite);
265 writtenBytes += l;
266 src.get(bytes, 0, l);
267 return l;
268 }
269
270 @Override
271 public Object blockingLock() {
272 return null;
273 }
274
275 @Override
276 public SelectableChannel configureBlocking(boolean arg0) throws IOException {
277 return null;
278 }
279
280 @Override
281 public boolean isBlocking() {
282 return false;
283 }
284
285 @Override
286 public boolean isRegistered() {
287 return false;
288 }
289
290 @Override
291 public SelectionKey keyFor(Selector arg0) {
292 return null;
293 }
294
295 @Override
296 public SelectorProvider provider() {
297 return null;
298 }
299
300 @Override
301 public SelectionKey register(Selector arg0, int arg1, Object arg2)
302 throws ClosedChannelException {
303 return null;
304 }
305
306 @Override
307 public int validOps() {
308 return 0;
309 }
310
311 @Override
312 protected void implCloseChannel() throws IOException {
313 bytesToRead = -1;
314 }
315
316 }
317
318 // Selection key text fixture
319 private static class TestKey extends SelectionKey {
320
321 private SelectableChannel channel;
322
323 public TestKey(TestByteChannel channel) {
324 this.channel = channel;
325 }
326
327 @Override
328 public void cancel() {
329 }
330
331 @Override
332 public SelectableChannel channel() {
333 return channel;
334 }
335
336 @Override
337 public int interestOps() {
338 return 0;
339 }
340
341 @Override
342 public SelectionKey interestOps(int ops) {
343 return null;
344 }
345
346 @Override
347 public boolean isValid() {
348 return true;
349 }
350
351 @Override
352 public int readyOps() {
353 return 0;
354 }
355
356 @Override
357 public Selector selector() {
358 return null;
359 }
360 }
361
362}