blob: 7086c1eb35c266715eef26b4b584d9d3c99414a8 [file] [log] [blame]
Madan Jampani3961b752016-01-13 13:30:58 -08001package org.onlab.netty;
2
3import java.util.Arrays;
4import java.util.concurrent.CompletableFuture;
5import java.util.concurrent.CountDownLatch;
6import java.util.concurrent.ExecutorService;
7import java.util.concurrent.Executors;
8import java.util.concurrent.atomic.AtomicBoolean;
9import java.util.concurrent.atomic.AtomicReference;
10import java.util.function.BiFunction;
11
12import org.junit.After;
13import org.junit.Before;
14import org.junit.Test;
15import org.onlab.packet.IpAddress;
16import org.onosproject.store.cluster.messaging.Endpoint;
17
18import com.google.common.util.concurrent.MoreExecutors;
19import com.google.common.util.concurrent.Uninterruptibles;
20
21import static org.junit.Assert.*;
22
23/**
24 * Unit tests for NettyMessaging.
25 */
26public class NettyMessagingTest {
27
28 NettyMessaging netty1;
29 NettyMessaging netty2;
30
31 Endpoint ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5001);
32 Endpoint ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5002);
33 Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf("127.0.0.1"), 5003);
34
35 @Before
36 public void setUp() throws Exception {
37 netty1 = new NettyMessaging();
38 netty2 = new NettyMessaging();
39
40 netty1.start(12, ep1);
41 netty2.start(12, ep2);
42 }
43
44 @After
45 public void tearDown() throws Exception {
46 if (netty1 != null) {
47 netty1.stop();
48 }
49
50 if (netty2 != null) {
51 netty2.stop();
52 }
53 }
54
55 @Test
56 public void testSendAsync() {
57 CountDownLatch latch1 = new CountDownLatch(1);
58 CompletableFuture<Void> response = netty1.sendAsync(ep2, "test-subject", "hello world".getBytes());
59 response.whenComplete((r, e) -> {
60 assertNull(e);
61 latch1.countDown();
62 });
63 Uninterruptibles.awaitUninterruptibly(latch1);
64
65 CountDownLatch latch2 = new CountDownLatch(1);
66 response = netty1.sendAsync(invalidEndPoint, "test-subject", "hello world".getBytes());
67 response.whenComplete((r, e) -> {
68 assertNotNull(e);
69 latch2.countDown();
70 });
71 Uninterruptibles.awaitUninterruptibly(latch2);
72 }
73
74 @Test
75 public void testSendAndReceive() {
76 AtomicBoolean handlerInvoked = new AtomicBoolean(false);
77 AtomicReference<byte[]> request = new AtomicReference<>();
78 AtomicReference<Endpoint> sender = new AtomicReference<>();
79
80 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
81 handlerInvoked.set(true);
82 sender.set(ep);
83 request.set(data);
84 return "hello there".getBytes();
85 };
86 netty2.registerHandler("test-subject", handler, MoreExecutors.directExecutor());
87
88 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, "test-subject", "hello world".getBytes());
89 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
90 assertTrue(handlerInvoked.get());
91 assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
92 assertEquals(ep1, sender.get());
93 }
94
95 /*
96 * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
97 * and response completion occurs on the expected thread.
98 */
99 @Test
100 public void testSendAndReceiveWithExecutor() {
101 ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
102 ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
103 AtomicReference<String> handlerThreadName = new AtomicReference<>();
104 AtomicReference<String> completionThreadName = new AtomicReference<>();
105
106 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
107 handlerThreadName.set(Thread.currentThread().getName());
108 return "hello there".getBytes();
109 };
110 netty2.registerHandler("test-subject", handler, handlerExecutor);
111
112 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
113 "test-subject",
114 "hello world".getBytes(),
115 completionExecutor);
116 response.whenComplete((r, e) -> {
117 completionThreadName.set(Thread.currentThread().getName());
118 });
119
120 // Verify that the message was request handling and response completion happens on the correct thread.
121 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
122 assertEquals("completion-thread", completionThreadName.get());
123 assertEquals("handler-thread", handlerThreadName.get());
124 }
125}