blob: de76d0dd6ac32c03b4f7cdaf2a57ba737830f387 [file] [log] [blame]
Jordan Haltermane9c37092017-03-21 11:16:14 -07001/*
2 * Copyright 2017-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.store.primitives.impl;
17
Jordan Haltermane9c37092017-03-21 11:16:14 -070018import java.time.Duration;
19import java.util.Map;
20import java.util.UUID;
21import java.util.concurrent.CompletableFuture;
22import java.util.concurrent.ConcurrentHashMap;
23import java.util.concurrent.CountDownLatch;
24import java.util.concurrent.Executor;
25import java.util.concurrent.TimeUnit;
26import java.util.function.BiConsumer;
27import java.util.function.BiFunction;
28
Jordan Haltermanfda46f92017-04-14 10:49:44 -070029import com.google.common.collect.Lists;
30import io.atomix.catalyst.concurrent.SingleThreadContext;
31import io.atomix.catalyst.concurrent.ThreadContext;
32import io.atomix.catalyst.transport.Address;
33import io.atomix.catalyst.transport.Client;
34import io.atomix.catalyst.transport.Server;
35import io.atomix.catalyst.transport.Transport;
36import io.atomix.copycat.protocol.ConnectRequest;
37import io.atomix.copycat.protocol.ConnectResponse;
38import io.atomix.copycat.protocol.Response;
39import org.junit.After;
40import org.junit.Before;
41import org.junit.Test;
42import org.onlab.packet.IpAddress;
43import org.onlab.util.Tools;
44import org.onosproject.cluster.PartitionId;
45import org.onosproject.store.cluster.messaging.Endpoint;
46import org.onosproject.store.cluster.messaging.MessagingService;
47
Jordan Haltermane9c37092017-03-21 11:16:14 -070048import static org.junit.Assert.assertEquals;
49import static org.junit.Assert.assertNotNull;
50import static org.junit.Assert.fail;
51import static org.onlab.junit.TestTools.findAvailablePort;
52
53/**
54 * Copycat transport test.
55 */
56public class CopycatTransportTest {
57
58 private static final String IP_STRING = "127.0.0.1";
59
60 private Endpoint endpoint1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
61 private Endpoint endpoint2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
62
63 private MessagingService service1;
64 private MessagingService service2;
65
66 private Transport clientTransport;
67 private ThreadContext clientContext;
68
69 private Transport serverTransport;
70 private ThreadContext serverContext;
71
72 @Before
73 public void setUp() throws Exception {
74 Map<Endpoint, TestMessagingService> services = new ConcurrentHashMap<>();
75
76 endpoint1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
77 service1 = new TestMessagingService(endpoint1, services);
78 clientTransport = new CopycatTransport(PartitionId.from(1), service1);
79 clientContext = new SingleThreadContext("client-test-%d", CatalystSerializers.getSerializer());
80
81 endpoint2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
82 service2 = new TestMessagingService(endpoint2, services);
83 serverTransport = new CopycatTransport(PartitionId.from(1), service2);
84 serverContext = new SingleThreadContext("server-test-%d", CatalystSerializers.getSerializer());
85 }
86
87 @After
88 public void tearDown() throws Exception {
89 if (clientContext != null) {
90 clientContext.close();
91 }
92 if (serverContext != null) {
93 serverContext.close();
94 }
95 }
96
97 /**
98 * Tests sending a message from the client side of a Copycat connection to the server side.
99 */
100 @Test
101 public void testCopycatClientConnectionSend() throws Exception {
102 Client client = clientTransport.client();
103 Server server = serverTransport.server();
104
105 CountDownLatch latch = new CountDownLatch(4);
106 CountDownLatch listenLatch = new CountDownLatch(1);
107 CountDownLatch handlerLatch = new CountDownLatch(1);
108 serverContext.executor().execute(() -> {
109 server.listen(new Address(IP_STRING, endpoint2.port()), connection -> {
110 serverContext.checkThread();
111 latch.countDown();
112 connection.handler(ConnectRequest.class, request -> {
113 serverContext.checkThread();
114 latch.countDown();
115 return CompletableFuture.completedFuture(ConnectResponse.builder()
116 .withStatus(Response.Status.OK)
117 .withLeader(new Address(IP_STRING, endpoint2.port()))
118 .withMembers(Lists.newArrayList(new Address(IP_STRING, endpoint2.port())))
119 .build());
120 });
121 handlerLatch.countDown();
122 }).thenRun(listenLatch::countDown);
123 });
124
125 listenLatch.await(5, TimeUnit.SECONDS);
126
127 clientContext.executor().execute(() -> {
128 client.connect(new Address(IP_STRING, endpoint2.port())).thenAccept(connection -> {
129 clientContext.checkThread();
130 latch.countDown();
131 try {
132 handlerLatch.await(5, TimeUnit.SECONDS);
133 } catch (InterruptedException e) {
134 fail();
135 }
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700136 connection.<ConnectRequest, ConnectResponse>sendAndReceive(ConnectRequest.builder()
Jordan Haltermane9c37092017-03-21 11:16:14 -0700137 .withClientId(UUID.randomUUID().toString())
138 .build())
139 .thenAccept(response -> {
140 clientContext.checkThread();
141 assertNotNull(response);
142 assertEquals(Response.Status.OK, response.status());
143 latch.countDown();
144 });
145 });
146 });
147
148 latch.await(5, TimeUnit.SECONDS);
149 assertEquals(0, latch.getCount());
150 }
151
152 /**
153 * Tests sending a message from the server side of a Copycat connection to the client side.
154 */
155 @Test
156 public void testCopycatServerConnectionSend() throws Exception {
157 Client client = clientTransport.client();
158 Server server = serverTransport.server();
159
160 CountDownLatch latch = new CountDownLatch(4);
161 CountDownLatch listenLatch = new CountDownLatch(1);
162 serverContext.executor().execute(() -> {
163 server.listen(new Address(IP_STRING, endpoint2.port()), connection -> {
164 serverContext.checkThread();
165 latch.countDown();
166 serverContext.schedule(Duration.ofMillis(100), () -> {
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700167 connection.<ConnectRequest, ConnectResponse>sendAndReceive(ConnectRequest.builder()
168 .withClientId("foo")
Jordan Haltermane9c37092017-03-21 11:16:14 -0700169 .build())
170 .thenAccept(response -> {
171 serverContext.checkThread();
172 assertEquals(Response.Status.OK, response.status());
Jordan Haltermane9c37092017-03-21 11:16:14 -0700173 latch.countDown();
174 });
175 });
176 }).thenRun(listenLatch::countDown);
177 });
178
179 listenLatch.await(5, TimeUnit.SECONDS);
180
181 clientContext.executor().execute(() -> {
182 client.connect(new Address(IP_STRING, endpoint2.port())).thenAccept(connection -> {
183 clientContext.checkThread();
184 latch.countDown();
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700185 connection.handler(ConnectRequest.class, request -> {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700186 clientContext.checkThread();
187 latch.countDown();
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700188 assertEquals("foo", request.client());
189 return CompletableFuture.completedFuture(ConnectResponse.builder()
Jordan Haltermane9c37092017-03-21 11:16:14 -0700190 .withStatus(Response.Status.OK)
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700191 .withLeader(new Address(IP_STRING, endpoint2.port()))
192 .withMembers(Lists.newArrayList(new Address(IP_STRING, endpoint2.port())))
Jordan Haltermane9c37092017-03-21 11:16:14 -0700193 .build());
194 });
195 });
196 });
197
198 latch.await(5, TimeUnit.SECONDS);
199 assertEquals(0, latch.getCount());
200 }
201
202 /**
203 * Tests closing the server side of a Copycat connection.
204 */
205 @Test
206 public void testCopycatClientConnectionClose() throws Exception {
207 Client client = clientTransport.client();
208 Server server = serverTransport.server();
209
210 CountDownLatch latch = new CountDownLatch(5);
211 CountDownLatch listenLatch = new CountDownLatch(1);
212 serverContext.executor().execute(() -> {
213 server.listen(new Address(IP_STRING, endpoint2.port()), connection -> {
214 serverContext.checkThread();
215 latch.countDown();
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700216 connection.onClose(c -> {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700217 serverContext.checkThread();
218 latch.countDown();
219 });
220 }).thenRun(listenLatch::countDown);
221 });
222
223 listenLatch.await(5, TimeUnit.SECONDS);
224
225 clientContext.executor().execute(() -> {
226 client.connect(new Address(IP_STRING, endpoint2.port())).thenAccept(connection -> {
227 clientContext.checkThread();
228 latch.countDown();
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700229 connection.onClose(c -> {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700230 clientContext.checkThread();
231 latch.countDown();
232 });
233 clientContext.schedule(Duration.ofMillis(100), () -> {
234 connection.close().whenComplete((result, error) -> {
235 clientContext.checkThread();
236 latch.countDown();
237 });
238 });
239 });
240 });
241
242 latch.await(5, TimeUnit.SECONDS);
243 assertEquals(0, latch.getCount());
244 }
245
246 /**
247 * Tests closing the server side of a Copycat connection.
248 */
249 @Test
250 public void testCopycatServerConnectionClose() throws Exception {
251 Client client = clientTransport.client();
252 Server server = serverTransport.server();
253
254 CountDownLatch latch = new CountDownLatch(5);
255 CountDownLatch listenLatch = new CountDownLatch(1);
256 serverContext.executor().execute(() -> {
257 server.listen(new Address(IP_STRING, endpoint2.port()), connection -> {
258 serverContext.checkThread();
259 latch.countDown();
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700260 connection.onClose(c -> {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700261 latch.countDown();
262 });
263 serverContext.schedule(Duration.ofMillis(100), () -> {
264 connection.close().whenComplete((result, error) -> {
265 serverContext.checkThread();
266 latch.countDown();
267 });
268 });
269 }).thenRun(listenLatch::countDown);
270 });
271
272 listenLatch.await(5, TimeUnit.SECONDS);
273
274 clientContext.executor().execute(() -> {
275 client.connect(new Address(IP_STRING, endpoint2.port())).thenAccept(connection -> {
276 clientContext.checkThread();
277 latch.countDown();
Jordan Haltermanfda46f92017-04-14 10:49:44 -0700278 connection.onClose(c -> {
Jordan Haltermane9c37092017-03-21 11:16:14 -0700279 latch.countDown();
280 });
281 });
282 });
283
284 latch.await(5, TimeUnit.SECONDS);
285 assertEquals(0, latch.getCount());
286 }
287
288 /**
289 * Custom implementation of {@code MessagingService} used for testing. Really, this should
290 * be mocked but suffices for now.
291 */
292 public static final class TestMessagingService implements MessagingService {
293 private final Endpoint endpoint;
294 private final Map<Endpoint, TestMessagingService> services;
295 private final Map<String, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>>> handlers =
296 new ConcurrentHashMap<>();
297
298 TestMessagingService(Endpoint endpoint, Map<Endpoint, TestMessagingService> services) {
299 this.endpoint = endpoint;
300 this.services = services;
301 services.put(endpoint, this);
302 }
303
304 private CompletableFuture<byte[]> handle(Endpoint ep, String type, byte[] message, Executor executor) {
305 BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = handlers.get(type);
306 if (handler == null) {
307 return Tools.exceptionalFuture(new IllegalStateException());
308 }
309 return handler.apply(ep, message).thenApplyAsync(r -> r, executor);
310 }
311
312 @Override
313 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
314 // Unused for testing
315 return null;
316 }
317
318 @Override
319 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
320 // Unused for testing
321 return null;
322 }
323
324 @Override
325 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
326 TestMessagingService service = services.get(ep);
327 if (service == null) {
328 return Tools.exceptionalFuture(new IllegalStateException());
329 }
330 return service.handle(endpoint, type, payload, executor);
331 }
332
333 @Override
334 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
335 // Unused for testing
336 }
337
338 @Override
339 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
340 // Unused for testing
341 }
342
343 @Override
344 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
345 handlers.put(type, handler);
346 }
347
348 @Override
349 public void unregisterHandler(String type) {
350 handlers.remove(type);
351 }
352 }
353
354}