blob: f7737d237d3efcc7e2466ffdec0a62f56fbab9ab [file] [log] [blame]
Madan Jampani3961b752016-01-13 13:30:58 -08001package org.onlab.netty;
2
3import java.util.Arrays;
4import java.util.concurrent.CompletableFuture;
5import java.util.concurrent.CountDownLatch;
6import java.util.concurrent.ExecutorService;
7import java.util.concurrent.Executors;
8import java.util.concurrent.atomic.AtomicBoolean;
9import java.util.concurrent.atomic.AtomicReference;
10import java.util.function.BiFunction;
11
12import org.junit.After;
13import org.junit.Before;
14import org.junit.Test;
15import org.onlab.packet.IpAddress;
16import org.onosproject.store.cluster.messaging.Endpoint;
17
18import com.google.common.util.concurrent.MoreExecutors;
19import com.google.common.util.concurrent.Uninterruptibles;
20
21import static org.junit.Assert.*;
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080022import static org.onlab.junit.TestTools.findAvailablePort;
Madan Jampani3961b752016-01-13 13:30:58 -080023
24/**
25 * Unit tests for NettyMessaging.
26 */
27public class NettyMessagingTest {
28
29 NettyMessaging netty1;
30 NettyMessaging netty2;
31
32 Endpoint ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5001);
33 Endpoint ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5002);
34 Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5003);
35
36 @Before
37 public void setUp() throws Exception {
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080038 ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
Madan Jampani3961b752016-01-13 13:30:58 -080039 netty1 = new NettyMessaging();
Madan Jampani3961b752016-01-13 13:30:58 -080040 netty1.start(12, ep1);
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080041
42 ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
43 netty2 = new NettyMessaging();
Madan Jampani3961b752016-01-13 13:30:58 -080044 netty2.start(12, ep2);
45 }
46
47 @After
48 public void tearDown() throws Exception {
49 if (netty1 != null) {
50 netty1.stop();
51 }
52
53 if (netty2 != null) {
54 netty2.stop();
55 }
56 }
57
58 @Test
59 public void testSendAsync() {
60 CountDownLatch latch1 = new CountDownLatch(1);
61 CompletableFuture<Void> response = netty1.sendAsync(ep2, "test-subject", "hello world".getBytes());
62 response.whenComplete((r, e) -> {
63 assertNull(e);
64 latch1.countDown();
65 });
66 Uninterruptibles.awaitUninterruptibly(latch1);
67
68 CountDownLatch latch2 = new CountDownLatch(1);
69 response = netty1.sendAsync(invalidEndPoint, "test-subject", "hello world".getBytes());
70 response.whenComplete((r, e) -> {
71 assertNotNull(e);
72 latch2.countDown();
73 });
74 Uninterruptibles.awaitUninterruptibly(latch2);
75 }
76
77 @Test
78 public void testSendAndReceive() {
79 AtomicBoolean handlerInvoked = new AtomicBoolean(false);
80 AtomicReference<byte[]> request = new AtomicReference<>();
81 AtomicReference<Endpoint> sender = new AtomicReference<>();
82
83 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
84 handlerInvoked.set(true);
85 sender.set(ep);
86 request.set(data);
87 return "hello there".getBytes();
88 };
89 netty2.registerHandler("test-subject", handler, MoreExecutors.directExecutor());
90
91 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, "test-subject", "hello world".getBytes());
92 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
93 assertTrue(handlerInvoked.get());
94 assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
95 assertEquals(ep1, sender.get());
96 }
97
98 /*
99 * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
100 * and response completion occurs on the expected thread.
101 */
102 @Test
103 public void testSendAndReceiveWithExecutor() {
104 ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
105 ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
106 AtomicReference<String> handlerThreadName = new AtomicReference<>();
107 AtomicReference<String> completionThreadName = new AtomicReference<>();
108
109 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
110 handlerThreadName.set(Thread.currentThread().getName());
111 return "hello there".getBytes();
112 };
113 netty2.registerHandler("test-subject", handler, handlerExecutor);
114
115 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
116 "test-subject",
117 "hello world".getBytes(),
118 completionExecutor);
119 response.whenComplete((r, e) -> {
120 completionThreadName.set(Thread.currentThread().getName());
121 });
122
123 // Verify that the message was request handling and response completion happens on the correct thread.
124 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
125 assertEquals("completion-thread", completionThreadName.get());
126 assertEquals("handler-thread", handlerThreadName.get());
127 }
128}