blob: dfb2f67624b7f7541ca297731db3e2f1bdc7c8cc [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani3961b752016-01-13 13:30:58 -080017
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.HybridLogicalClockService;
import org.onosproject.core.HybridLogicalTime;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.cluster.messaging.Endpoint;

import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.onlab.junit.TestTools.findAvailablePort;
Madan Jampani3961b752016-01-13 13:30:58 -080057
58/**
59 * Unit tests for NettyMessaging.
60 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080061public class NettyMessagingManagerTest {
Madan Jampani3961b752016-01-13 13:30:58 -080062
Madan Jampani05833872016-07-12 23:01:39 -070063 HybridLogicalClockService testClockService = new HybridLogicalClockService() {
64 AtomicLong counter = new AtomicLong();
65 @Override
66 public HybridLogicalTime timeNow() {
67 return new HybridLogicalTime(counter.incrementAndGet(), 0);
68 }
69
70 @Override
71 public void recordEventTime(HybridLogicalTime time) {
72 }
73 };
74
Aaron Kruglikov1b727382016-02-09 16:17:47 -080075 NettyMessagingManager netty1;
76 NettyMessagingManager netty2;
Madan Jampani3961b752016-01-13 13:30:58 -080077
Aaron Kruglikov1b727382016-02-09 16:17:47 -080078 private static final String DUMMY_NAME = "node";
79 private static final String IP_STRING = "127.0.0.1";
80
81 Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
82 Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
83 Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
Madan Jampani3961b752016-01-13 13:30:58 -080084
85 @Before
86 public void setUp() throws Exception {
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080087 ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080088 netty1 = new NettyMessagingManager();
89 netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
Madan Jampani05833872016-07-12 23:01:39 -070090 netty1.clockService = testClockService;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080091 netty1.activate();
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080092
93 ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080094 netty2 = new NettyMessagingManager();
95 netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
Madan Jampani05833872016-07-12 23:01:39 -070096 netty2.clockService = testClockService;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080097 netty2.activate();
Madan Jampani3961b752016-01-13 13:30:58 -080098 }
99
Madan Jampani0a70f732016-07-20 11:30:59 -0700100 /**
101 * Returns a random String to be used as a test subject.
102 * @return string
103 */
104 private String nextSubject() {
105 return UUID.randomUUID().toString();
106 }
107
Madan Jampani3961b752016-01-13 13:30:58 -0800108 @After
109 public void tearDown() throws Exception {
110 if (netty1 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800111 netty1.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -0800112 }
113
114 if (netty2 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800115 netty2.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -0800116 }
117 }
118
119 @Test
120 public void testSendAsync() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700121 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800122 CountDownLatch latch1 = new CountDownLatch(1);
Madan Jampani0a70f732016-07-20 11:30:59 -0700123 CompletableFuture<Void> response = netty1.sendAsync(ep2, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800124 response.whenComplete((r, e) -> {
125 assertNull(e);
126 latch1.countDown();
127 });
128 Uninterruptibles.awaitUninterruptibly(latch1);
129
130 CountDownLatch latch2 = new CountDownLatch(1);
Madan Jampani0a70f732016-07-20 11:30:59 -0700131 response = netty1.sendAsync(invalidEndPoint, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800132 response.whenComplete((r, e) -> {
133 assertNotNull(e);
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700134 assertTrue(e instanceof ConnectException);
Madan Jampani3961b752016-01-13 13:30:58 -0800135 latch2.countDown();
136 });
137 Uninterruptibles.awaitUninterruptibly(latch2);
138 }
139
140 @Test
Brian O'Connor63298802016-09-29 18:21:40 -0700141 @Ignore // FIXME disabled on 9/29/16 due to random failures
Madan Jampani3961b752016-01-13 13:30:58 -0800142 public void testSendAndReceive() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700143 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800144 AtomicBoolean handlerInvoked = new AtomicBoolean(false);
145 AtomicReference<byte[]> request = new AtomicReference<>();
146 AtomicReference<Endpoint> sender = new AtomicReference<>();
147
148 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
149 handlerInvoked.set(true);
150 sender.set(ep);
151 request.set(data);
152 return "hello there".getBytes();
153 };
Madan Jampani0a70f732016-07-20 11:30:59 -0700154 netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
Madan Jampani3961b752016-01-13 13:30:58 -0800155
Madan Jampani0a70f732016-07-20 11:30:59 -0700156 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800157 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
158 assertTrue(handlerInvoked.get());
159 assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
160 assertEquals(ep1, sender.get());
161 }
162
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700163 @Test
Jordan Haltermanef92f192017-12-21 11:59:38 -0800164 public void testDefaultTimeout() {
Jordan Haltermanf7c7f6f2017-05-05 03:02:34 -0700165 String subject = nextSubject();
166 BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> new CompletableFuture<>();
167 netty2.registerHandler(subject, handler);
168
169 try {
170 netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
171 fail();
172 } catch (CompletionException e) {
173 assertTrue(e.getCause() instanceof TimeoutException);
174 }
175 }
176
Jordan Haltermanef92f192017-12-21 11:59:38 -0800177 @Test
178 public void testDynamicTimeout() {
179 String subject = nextSubject();
180 AtomicInteger counter = new AtomicInteger();
181 BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = (ep, payload) -> {
182 if (counter.incrementAndGet() <= 50) {
183 return CompletableFuture.completedFuture(new byte[0]);
184 } else {
185 return new CompletableFuture<>();
186 }
187 };
188 netty2.registerHandler(subject, handler);
189
190 for (int i = 0; i < 50; i++) {
191 netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
192 }
193 try {
194 netty1.sendAndReceive(ep2, subject, "hello world".getBytes()).join();
195 fail();
196 } catch (CompletionException e) {
197 assertTrue(e.getCause() instanceof TimeoutException);
198 }
199 }
200
Madan Jampani3961b752016-01-13 13:30:58 -0800201 /*
202 * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
203 * and response completion occurs on the expected thread.
204 */
205 @Test
Jonathan Hart6f4d5862016-08-17 09:18:01 -0700206 @Ignore
Madan Jampani3961b752016-01-13 13:30:58 -0800207 public void testSendAndReceiveWithExecutor() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700208 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800209 ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
210 ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
211 AtomicReference<String> handlerThreadName = new AtomicReference<>();
212 AtomicReference<String> completionThreadName = new AtomicReference<>();
213
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700214 final CountDownLatch latch = new CountDownLatch(1);
215
Madan Jampani3961b752016-01-13 13:30:58 -0800216 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
217 handlerThreadName.set(Thread.currentThread().getName());
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700218 try {
219 latch.await();
220 } catch (InterruptedException e1) {
221 Thread.currentThread().interrupt();
222 fail("InterruptedException");
223 }
Madan Jampani3961b752016-01-13 13:30:58 -0800224 return "hello there".getBytes();
225 };
Madan Jampani0a70f732016-07-20 11:30:59 -0700226 netty2.registerHandler(subject, handler, handlerExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800227
228 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
Madan Jampani0a70f732016-07-20 11:30:59 -0700229 subject,
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800230 "hello world".getBytes(),
231 completionExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800232 response.whenComplete((r, e) -> {
233 completionThreadName.set(Thread.currentThread().getName());
234 });
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700235 latch.countDown();
Madan Jampani3961b752016-01-13 13:30:58 -0800236
237 // Verify that the message was request handling and response completion happens on the correct thread.
238 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
239 assertEquals("completion-thread", completionThreadName.get());
240 assertEquals("handler-thread", handlerThreadName.get());
241 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800242
243 private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
244 return new ClusterMetadataService() {
245 @Override
246 public ClusterMetadata getClusterMetadata() {
247 return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
Jordan Halterman00e92da2018-05-22 23:05:52 -0700248 name, getLocalNode(), Sets.newHashSet());
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800249 }
250
251 @Override
252 public ControllerNode getLocalNode() {
253 return new ControllerNode() {
254 @Override
255 public NodeId id() {
256 return null;
257 }
258
259 @Override
260 public IpAddress ip() {
261 return IpAddress.valueOf(ipAddress);
262 }
263
264 @Override
265 public int tcpPort() {
266 return ep.port();
267 }
268 };
269 }
270
271 @Override
272 public void addListener(ClusterMetadataEventListener listener) {}
273
274 @Override
275 public void removeListener(ClusterMetadataEventListener listener) {}
276 };
277 }
Jonathan Hart6f4d5862016-08-17 09:18:01 -0700278}