blob: d04c77756535988b3d5753e62fc6b43f90f7c218 [file] [log] [blame]
Aaron Kruglikov1b727382016-02-09 16:17:47 -08001package org.onosproject.store.cluster.messaging.impl;
Madan Jampani3961b752016-01-13 13:30:58 -08002
3import java.util.Arrays;
4import java.util.concurrent.CompletableFuture;
5import java.util.concurrent.CountDownLatch;
6import java.util.concurrent.ExecutorService;
7import java.util.concurrent.Executors;
8import java.util.concurrent.atomic.AtomicBoolean;
9import java.util.concurrent.atomic.AtomicReference;
10import java.util.function.BiFunction;
11
Aaron Kruglikov1b727382016-02-09 16:17:47 -080012import com.google.common.collect.Sets;
Madan Jampani3961b752016-01-13 13:30:58 -080013import org.junit.After;
14import org.junit.Before;
15import org.junit.Test;
16import org.onlab.packet.IpAddress;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080017import org.onosproject.cluster.ClusterMetadata;
18import org.onosproject.cluster.ClusterMetadataEventListener;
19import org.onosproject.cluster.ClusterMetadataService;
20import org.onosproject.cluster.ControllerNode;
21import org.onosproject.cluster.NodeId;
22import org.onosproject.net.provider.ProviderId;
Madan Jampani3961b752016-01-13 13:30:58 -080023import org.onosproject.store.cluster.messaging.Endpoint;
24
25import com.google.common.util.concurrent.MoreExecutors;
26import com.google.common.util.concurrent.Uninterruptibles;
27
28import static org.junit.Assert.*;
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080029import static org.onlab.junit.TestTools.findAvailablePort;
Madan Jampani3961b752016-01-13 13:30:58 -080030
31/**
32 * Unit tests for NettyMessaging.
33 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080034public class NettyMessagingManagerTest {
Madan Jampani3961b752016-01-13 13:30:58 -080035
Aaron Kruglikov1b727382016-02-09 16:17:47 -080036 NettyMessagingManager netty1;
37 NettyMessagingManager netty2;
Madan Jampani3961b752016-01-13 13:30:58 -080038
Aaron Kruglikov1b727382016-02-09 16:17:47 -080039 private static final String DUMMY_NAME = "node";
40 private static final String IP_STRING = "127.0.0.1";
41
42 Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
43 Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
44 Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
Madan Jampani3961b752016-01-13 13:30:58 -080045
46 @Before
47 public void setUp() throws Exception {
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080048 ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080049 netty1 = new NettyMessagingManager();
50 netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
51 netty1.activate();
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080052
53 ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080054 netty2 = new NettyMessagingManager();
55 netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
56 netty2.activate();
Madan Jampani3961b752016-01-13 13:30:58 -080057 }
58
59 @After
60 public void tearDown() throws Exception {
61 if (netty1 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -080062 netty1.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -080063 }
64
65 if (netty2 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -080066 netty2.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -080067 }
68 }
69
70 @Test
71 public void testSendAsync() {
72 CountDownLatch latch1 = new CountDownLatch(1);
73 CompletableFuture<Void> response = netty1.sendAsync(ep2, "test-subject", "hello world".getBytes());
74 response.whenComplete((r, e) -> {
75 assertNull(e);
76 latch1.countDown();
77 });
78 Uninterruptibles.awaitUninterruptibly(latch1);
79
80 CountDownLatch latch2 = new CountDownLatch(1);
81 response = netty1.sendAsync(invalidEndPoint, "test-subject", "hello world".getBytes());
82 response.whenComplete((r, e) -> {
83 assertNotNull(e);
84 latch2.countDown();
85 });
86 Uninterruptibles.awaitUninterruptibly(latch2);
87 }
88
89 @Test
90 public void testSendAndReceive() {
91 AtomicBoolean handlerInvoked = new AtomicBoolean(false);
92 AtomicReference<byte[]> request = new AtomicReference<>();
93 AtomicReference<Endpoint> sender = new AtomicReference<>();
94
95 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
96 handlerInvoked.set(true);
97 sender.set(ep);
98 request.set(data);
99 return "hello there".getBytes();
100 };
101 netty2.registerHandler("test-subject", handler, MoreExecutors.directExecutor());
102
103 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, "test-subject", "hello world".getBytes());
104 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
105 assertTrue(handlerInvoked.get());
106 assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
107 assertEquals(ep1, sender.get());
108 }
109
110 /*
111 * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
112 * and response completion occurs on the expected thread.
113 */
114 @Test
115 public void testSendAndReceiveWithExecutor() {
116 ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
117 ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
118 AtomicReference<String> handlerThreadName = new AtomicReference<>();
119 AtomicReference<String> completionThreadName = new AtomicReference<>();
120
121 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
122 handlerThreadName.set(Thread.currentThread().getName());
123 return "hello there".getBytes();
124 };
125 netty2.registerHandler("test-subject", handler, handlerExecutor);
126
127 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800128 "test-subject",
129 "hello world".getBytes(),
130 completionExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800131 response.whenComplete((r, e) -> {
132 completionThreadName.set(Thread.currentThread().getName());
133 });
134
135 // Verify that the message was request handling and response completion happens on the correct thread.
136 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
137 assertEquals("completion-thread", completionThreadName.get());
138 assertEquals("handler-thread", handlerThreadName.get());
139 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800140
141 private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
142 return new ClusterMetadataService() {
143 @Override
144 public ClusterMetadata getClusterMetadata() {
145 return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
146 name, Sets.newHashSet(), Sets.newHashSet());
147 }
148
149 @Override
150 public ControllerNode getLocalNode() {
151 return new ControllerNode() {
152 @Override
153 public NodeId id() {
154 return null;
155 }
156
157 @Override
158 public IpAddress ip() {
159 return IpAddress.valueOf(ipAddress);
160 }
161
162 @Override
163 public int tcpPort() {
164 return ep.port();
165 }
166 };
167 }
168
169 @Override
170 public void addListener(ClusterMetadataEventListener listener) {}
171
172 @Override
173 public void removeListener(ClusterMetadataEventListener listener) {}
174 };
175 }
176}