blob: 21b4d704beb976867b25fe6235a7fd0abfe3d04a [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.store.primitives.impl;
17
18import com.google.common.collect.Lists;
19import io.atomix.catalyst.concurrent.SingleThreadContext;
20import io.atomix.catalyst.concurrent.ThreadContext;
21import io.atomix.catalyst.transport.Address;
22import io.atomix.catalyst.transport.Client;
23import io.atomix.catalyst.transport.Server;
24import io.atomix.catalyst.transport.Transport;
25import io.atomix.copycat.protocol.ConnectRequest;
26import io.atomix.copycat.protocol.ConnectResponse;
27import io.atomix.copycat.protocol.PublishRequest;
28import io.atomix.copycat.protocol.PublishResponse;
29import io.atomix.copycat.protocol.Response;
30import org.junit.After;
31import org.junit.Before;
32import org.junit.Test;
33import org.onlab.packet.IpAddress;
34import org.onlab.util.Tools;
35import org.onosproject.cluster.PartitionId;
36import org.onosproject.store.cluster.messaging.Endpoint;
37import org.onosproject.store.cluster.messaging.MessagingService;
38
39import java.time.Duration;
40import java.util.Map;
41import java.util.UUID;
42import java.util.concurrent.CompletableFuture;
43import java.util.concurrent.ConcurrentHashMap;
44import java.util.concurrent.CountDownLatch;
45import java.util.concurrent.Executor;
46import java.util.concurrent.TimeUnit;
47import java.util.function.BiConsumer;
48import java.util.function.BiFunction;
49
50import static org.junit.Assert.assertEquals;
51import static org.junit.Assert.assertNotNull;
52import static org.junit.Assert.fail;
53import static org.onlab.junit.TestTools.findAvailablePort;
54
55/**
56 * Copycat transport test.
57 */
58public class CopycatTransportTest {
59
60 private static final String IP_STRING = "127.0.0.1";
61
62 private Endpoint endpoint1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
63 private Endpoint endpoint2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
64
65 private MessagingService service1;
66 private MessagingService service2;
67
68 private Transport clientTransport;
69 private ThreadContext clientContext;
70
71 private Transport serverTransport;
72 private ThreadContext serverContext;
73
74 @Before
75 public void setUp() throws Exception {
76 Map<Endpoint, TestMessagingService> services = new ConcurrentHashMap<>();
77
78 endpoint1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
79 service1 = new TestMessagingService(endpoint1, services);
80 clientTransport = new CopycatTransport(PartitionId.from(1), service1);
81 clientContext = new SingleThreadContext("client-test-%d", CatalystSerializers.getSerializer());
82
83 endpoint2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
84 service2 = new TestMessagingService(endpoint2, services);
85 serverTransport = new CopycatTransport(PartitionId.from(1), service2);
86 serverContext = new SingleThreadContext("server-test-%d", CatalystSerializers.getSerializer());
87 }
88
89 @After
90 public void tearDown() throws Exception {
91 if (clientContext != null) {
92 clientContext.close();
93 }
94 if (serverContext != null) {
95 serverContext.close();
96 }
97 }
98
99 /**
100 * Tests sending a message from the client side of a Copycat connection to the server side.
101 */
102 @Test
103 public void testCopycatClientConnectionSend() throws Exception {
104 Client client = clientTransport.client();
105 Server server = serverTransport.server();
106
107 CountDownLatch latch = new CountDownLatch(4);
108 CountDownLatch listenLatch = new CountDownLatch(1);
109 CountDownLatch handlerLatch = new CountDownLatch(1);
110 serverContext.executor().execute(() -> {
111 server.listen(new Address(IP_STRING, endpoint2.port()), connection -> {
112 serverContext.checkThread();
113 latch.countDown();
114 connection.handler(ConnectRequest.class, request -> {
115 serverContext.checkThread();
116 latch.countDown();
117 return CompletableFuture.completedFuture(ConnectResponse.builder()
118 .withStatus(Response.Status.OK)
119 .withLeader(new Address(IP_STRING, endpoint2.port()))
120 .withMembers(Lists.newArrayList(new Address(IP_STRING, endpoint2.port())))
121 .build());
122 });
123 handlerLatch.countDown();
124 }).thenRun(listenLatch::countDown);
125 });
126
127 listenLatch.await(5, TimeUnit.SECONDS);
128
129 clientContext.executor().execute(() -> {
130 client.connect(new Address(IP_STRING, endpoint2.port())).thenAccept(connection -> {
131 clientContext.checkThread();
132 latch.countDown();
133 try {
134 handlerLatch.await(5, TimeUnit.SECONDS);
135 } catch (InterruptedException e) {
136 fail();
137 }
138 connection.<ConnectRequest, ConnectResponse>send(ConnectRequest.builder()
139 .withClientId(UUID.randomUUID().toString())
140 .build())
141 .thenAccept(response -> {
142 clientContext.checkThread();
143 assertNotNull(response);
144 assertEquals(Response.Status.OK, response.status());
145 latch.countDown();
146 });
147 });
148 });
149
150 latch.await(5, TimeUnit.SECONDS);
151 assertEquals(0, latch.getCount());
152 }
153
154 /**
155 * Tests sending a message from the server side of a Copycat connection to the client side.
156 */
157 @Test
158 public void testCopycatServerConnectionSend() throws Exception {
159 Client client = clientTransport.client();
160 Server server = serverTransport.server();
161
162 CountDownLatch latch = new CountDownLatch(4);
163 CountDownLatch listenLatch = new CountDownLatch(1);
164 serverContext.executor().execute(() -> {
165 server.listen(new Address(IP_STRING, endpoint2.port()), connection -> {
166 serverContext.checkThread();
167 latch.countDown();
168 serverContext.schedule(Duration.ofMillis(100), () -> {
169 connection.<PublishRequest, PublishResponse>send(PublishRequest.builder()
170 .withSession(1)
171 .withEventIndex(3)
172 .withPreviousIndex(2)
173 .build())
174 .thenAccept(response -> {
175 serverContext.checkThread();
176 assertEquals(Response.Status.OK, response.status());
177 assertEquals(1, response.index());
178 latch.countDown();
179 });
180 });
181 }).thenRun(listenLatch::countDown);
182 });
183
184 listenLatch.await(5, TimeUnit.SECONDS);
185
186 clientContext.executor().execute(() -> {
187 client.connect(new Address(IP_STRING, endpoint2.port())).thenAccept(connection -> {
188 clientContext.checkThread();
189 latch.countDown();
190 connection.handler(PublishRequest.class, request -> {
191 clientContext.checkThread();
192 latch.countDown();
193 assertEquals(1, request.session());
194 assertEquals(3, request.eventIndex());
195 assertEquals(2, request.previousIndex());
196 return CompletableFuture.completedFuture(PublishResponse.builder()
197 .withStatus(Response.Status.OK)
198 .withIndex(1)
199 .build());
200 });
201 });
202 });
203
204 latch.await(5, TimeUnit.SECONDS);
205 assertEquals(0, latch.getCount());
206 }
207
208 /**
209 * Tests closing the server side of a Copycat connection.
210 */
211 @Test
212 public void testCopycatClientConnectionClose() throws Exception {
213 Client client = clientTransport.client();
214 Server server = serverTransport.server();
215
216 CountDownLatch latch = new CountDownLatch(5);
217 CountDownLatch listenLatch = new CountDownLatch(1);
218 serverContext.executor().execute(() -> {
219 server.listen(new Address(IP_STRING, endpoint2.port()), connection -> {
220 serverContext.checkThread();
221 latch.countDown();
222 connection.closeListener(c -> {
223 serverContext.checkThread();
224 latch.countDown();
225 });
226 }).thenRun(listenLatch::countDown);
227 });
228
229 listenLatch.await(5, TimeUnit.SECONDS);
230
231 clientContext.executor().execute(() -> {
232 client.connect(new Address(IP_STRING, endpoint2.port())).thenAccept(connection -> {
233 clientContext.checkThread();
234 latch.countDown();
235 connection.closeListener(c -> {
236 clientContext.checkThread();
237 latch.countDown();
238 });
239 clientContext.schedule(Duration.ofMillis(100), () -> {
240 connection.close().whenComplete((result, error) -> {
241 clientContext.checkThread();
242 latch.countDown();
243 });
244 });
245 });
246 });
247
248 latch.await(5, TimeUnit.SECONDS);
249 assertEquals(0, latch.getCount());
250 }
251
252 /**
253 * Tests closing the server side of a Copycat connection.
254 */
255 @Test
256 public void testCopycatServerConnectionClose() throws Exception {
257 Client client = clientTransport.client();
258 Server server = serverTransport.server();
259
260 CountDownLatch latch = new CountDownLatch(5);
261 CountDownLatch listenLatch = new CountDownLatch(1);
262 serverContext.executor().execute(() -> {
263 server.listen(new Address(IP_STRING, endpoint2.port()), connection -> {
264 serverContext.checkThread();
265 latch.countDown();
266 connection.closeListener(c -> {
267 latch.countDown();
268 });
269 serverContext.schedule(Duration.ofMillis(100), () -> {
270 connection.close().whenComplete((result, error) -> {
271 serverContext.checkThread();
272 latch.countDown();
273 });
274 });
275 }).thenRun(listenLatch::countDown);
276 });
277
278 listenLatch.await(5, TimeUnit.SECONDS);
279
280 clientContext.executor().execute(() -> {
281 client.connect(new Address(IP_STRING, endpoint2.port())).thenAccept(connection -> {
282 clientContext.checkThread();
283 latch.countDown();
284 connection.closeListener(c -> {
285 latch.countDown();
286 });
287 });
288 });
289
290 latch.await(5, TimeUnit.SECONDS);
291 assertEquals(0, latch.getCount());
292 }
293
294 /**
295 * Custom implementation of {@code MessagingService} used for testing. Really, this should
296 * be mocked but suffices for now.
297 */
298 public static final class TestMessagingService implements MessagingService {
299 private final Endpoint endpoint;
300 private final Map<Endpoint, TestMessagingService> services;
301 private final Map<String, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>>> handlers =
302 new ConcurrentHashMap<>();
303
304 TestMessagingService(Endpoint endpoint, Map<Endpoint, TestMessagingService> services) {
305 this.endpoint = endpoint;
306 this.services = services;
307 services.put(endpoint, this);
308 }
309
310 private CompletableFuture<byte[]> handle(Endpoint ep, String type, byte[] message, Executor executor) {
311 BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler = handlers.get(type);
312 if (handler == null) {
313 return Tools.exceptionalFuture(new IllegalStateException());
314 }
315 return handler.apply(ep, message).thenApplyAsync(r -> r, executor);
316 }
317
318 @Override
319 public CompletableFuture<Void> sendAsync(Endpoint ep, String type, byte[] payload) {
320 // Unused for testing
321 return null;
322 }
323
324 @Override
325 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload) {
326 // Unused for testing
327 return null;
328 }
329
330 @Override
331 public CompletableFuture<byte[]> sendAndReceive(Endpoint ep, String type, byte[] payload, Executor executor) {
332 TestMessagingService service = services.get(ep);
333 if (service == null) {
334 return Tools.exceptionalFuture(new IllegalStateException());
335 }
336 return service.handle(endpoint, type, payload, executor);
337 }
338
339 @Override
340 public void registerHandler(String type, BiConsumer<Endpoint, byte[]> handler, Executor executor) {
341 // Unused for testing
342 }
343
344 @Override
345 public void registerHandler(String type, BiFunction<Endpoint, byte[], byte[]> handler, Executor executor) {
346 // Unused for testing
347 }
348
349 @Override
350 public void registerHandler(String type, BiFunction<Endpoint, byte[], CompletableFuture<byte[]>> handler) {
351 handlers.put(type, handler);
352 }
353
354 @Override
355 public void unregisterHandler(String type) {
356 handlers.remove(type);
357 }
358 }
359
360}