blob: 9c59d65247585dcd8a0db1fdb3f43b52a2fb54f9 [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.store.cluster.messaging.impl;

import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.HybridLogicalClockService;
import org.onosproject.core.HybridLogicalTime;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.cluster.messaging.Endpoint;

import java.net.ConnectException;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.onlab.junit.TestTools.findAvailablePort;

55/**
56 * Unit tests for NettyMessaging.
57 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080058public class NettyMessagingManagerTest {
Madan Jampani3961b752016-01-13 13:30:58 -080059
Madan Jampani05833872016-07-12 23:01:39 -070060 HybridLogicalClockService testClockService = new HybridLogicalClockService() {
61 AtomicLong counter = new AtomicLong();
62 @Override
63 public HybridLogicalTime timeNow() {
64 return new HybridLogicalTime(counter.incrementAndGet(), 0);
65 }
66
67 @Override
68 public void recordEventTime(HybridLogicalTime time) {
69 }
70 };
71
Aaron Kruglikov1b727382016-02-09 16:17:47 -080072 NettyMessagingManager netty1;
73 NettyMessagingManager netty2;
Madan Jampani3961b752016-01-13 13:30:58 -080074
Aaron Kruglikov1b727382016-02-09 16:17:47 -080075 private static final String DUMMY_NAME = "node";
76 private static final String IP_STRING = "127.0.0.1";
77
78 Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
79 Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
80 Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
Madan Jampani3961b752016-01-13 13:30:58 -080081
82 @Before
83 public void setUp() throws Exception {
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080084 ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080085 netty1 = new NettyMessagingManager();
86 netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
Madan Jampani05833872016-07-12 23:01:39 -070087 netty1.clockService = testClockService;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080088 netty1.activate();
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080089
90 ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080091 netty2 = new NettyMessagingManager();
92 netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
Madan Jampani05833872016-07-12 23:01:39 -070093 netty2.clockService = testClockService;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080094 netty2.activate();
Madan Jampani3961b752016-01-13 13:30:58 -080095 }
96
Madan Jampani0a70f732016-07-20 11:30:59 -070097 /**
98 * Returns a random String to be used as a test subject.
99 * @return string
100 */
101 private String nextSubject() {
102 return UUID.randomUUID().toString();
103 }
104
Madan Jampani3961b752016-01-13 13:30:58 -0800105 @After
106 public void tearDown() throws Exception {
107 if (netty1 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800108 netty1.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -0800109 }
110
111 if (netty2 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800112 netty2.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -0800113 }
114 }
115
116 @Test
117 public void testSendAsync() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700118 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800119 CountDownLatch latch1 = new CountDownLatch(1);
Madan Jampani0a70f732016-07-20 11:30:59 -0700120 CompletableFuture<Void> response = netty1.sendAsync(ep2, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800121 response.whenComplete((r, e) -> {
122 assertNull(e);
123 latch1.countDown();
124 });
125 Uninterruptibles.awaitUninterruptibly(latch1);
126
127 CountDownLatch latch2 = new CountDownLatch(1);
Madan Jampani0a70f732016-07-20 11:30:59 -0700128 response = netty1.sendAsync(invalidEndPoint, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800129 response.whenComplete((r, e) -> {
130 assertNotNull(e);
Jordan Halterman2f7a5d02017-05-03 16:56:53 -0700131 assertTrue(e instanceof ConnectException);
Madan Jampani3961b752016-01-13 13:30:58 -0800132 latch2.countDown();
133 });
134 Uninterruptibles.awaitUninterruptibly(latch2);
135 }
136
137 @Test
Brian O'Connor63298802016-09-29 18:21:40 -0700138 @Ignore // FIXME disabled on 9/29/16 due to random failures
Madan Jampani3961b752016-01-13 13:30:58 -0800139 public void testSendAndReceive() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700140 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800141 AtomicBoolean handlerInvoked = new AtomicBoolean(false);
142 AtomicReference<byte[]> request = new AtomicReference<>();
143 AtomicReference<Endpoint> sender = new AtomicReference<>();
144
145 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
146 handlerInvoked.set(true);
147 sender.set(ep);
148 request.set(data);
149 return "hello there".getBytes();
150 };
Madan Jampani0a70f732016-07-20 11:30:59 -0700151 netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
Madan Jampani3961b752016-01-13 13:30:58 -0800152
Madan Jampani0a70f732016-07-20 11:30:59 -0700153 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800154 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
155 assertTrue(handlerInvoked.get());
156 assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
157 assertEquals(ep1, sender.get());
158 }
159
160 /*
161 * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
162 * and response completion occurs on the expected thread.
163 */
164 @Test
Jonathan Hart6f4d5862016-08-17 09:18:01 -0700165 @Ignore
Madan Jampani3961b752016-01-13 13:30:58 -0800166 public void testSendAndReceiveWithExecutor() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700167 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800168 ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
169 ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
170 AtomicReference<String> handlerThreadName = new AtomicReference<>();
171 AtomicReference<String> completionThreadName = new AtomicReference<>();
172
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700173 final CountDownLatch latch = new CountDownLatch(1);
174
Madan Jampani3961b752016-01-13 13:30:58 -0800175 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
176 handlerThreadName.set(Thread.currentThread().getName());
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700177 try {
178 latch.await();
179 } catch (InterruptedException e1) {
180 Thread.currentThread().interrupt();
181 fail("InterruptedException");
182 }
Madan Jampani3961b752016-01-13 13:30:58 -0800183 return "hello there".getBytes();
184 };
Madan Jampani0a70f732016-07-20 11:30:59 -0700185 netty2.registerHandler(subject, handler, handlerExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800186
187 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
Madan Jampani0a70f732016-07-20 11:30:59 -0700188 subject,
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800189 "hello world".getBytes(),
190 completionExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800191 response.whenComplete((r, e) -> {
192 completionThreadName.set(Thread.currentThread().getName());
193 });
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700194 latch.countDown();
Madan Jampani3961b752016-01-13 13:30:58 -0800195
196 // Verify that the message was request handling and response completion happens on the correct thread.
197 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
198 assertEquals("completion-thread", completionThreadName.get());
199 assertEquals("handler-thread", handlerThreadName.get());
200 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800201
202 private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
203 return new ClusterMetadataService() {
204 @Override
205 public ClusterMetadata getClusterMetadata() {
206 return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
207 name, Sets.newHashSet(), Sets.newHashSet());
208 }
209
210 @Override
211 public ControllerNode getLocalNode() {
212 return new ControllerNode() {
213 @Override
214 public NodeId id() {
215 return null;
216 }
217
218 @Override
219 public IpAddress ip() {
220 return IpAddress.valueOf(ipAddress);
221 }
222
223 @Override
224 public int tcpPort() {
225 return ep.port();
226 }
227 };
228 }
229
230 @Override
231 public void addListener(ClusterMetadataEventListener listener) {}
232
233 @Override
234 public void removeListener(ClusterMetadataEventListener listener) {}
235 };
236 }
Jonathan Hart6f4d5862016-08-17 09:18:01 -0700237}