blob: 4122888094e661629137568556d3c299f8f0136f [file] [log] [blame]
/*
 * Copyright 2016-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani3961b752016-01-13 13:30:58 -080017
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.HybridLogicalClockService;
import org.onosproject.core.HybridLogicalTime;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.cluster.messaging.Endpoint;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.onlab.junit.TestTools.findAvailablePort;
Madan Jampani3961b752016-01-13 13:30:58 -080053
54/**
55 * Unit tests for NettyMessaging.
56 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080057public class NettyMessagingManagerTest {
Madan Jampani3961b752016-01-13 13:30:58 -080058
Madan Jampani05833872016-07-12 23:01:39 -070059 HybridLogicalClockService testClockService = new HybridLogicalClockService() {
60 AtomicLong counter = new AtomicLong();
61 @Override
62 public HybridLogicalTime timeNow() {
63 return new HybridLogicalTime(counter.incrementAndGet(), 0);
64 }
65
66 @Override
67 public void recordEventTime(HybridLogicalTime time) {
68 }
69 };
70
Aaron Kruglikov1b727382016-02-09 16:17:47 -080071 NettyMessagingManager netty1;
72 NettyMessagingManager netty2;
Madan Jampani3961b752016-01-13 13:30:58 -080073
Aaron Kruglikov1b727382016-02-09 16:17:47 -080074 private static final String DUMMY_NAME = "node";
75 private static final String IP_STRING = "127.0.0.1";
76
77 Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
78 Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
79 Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
Madan Jampani3961b752016-01-13 13:30:58 -080080
81 @Before
82 public void setUp() throws Exception {
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080083 ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080084 netty1 = new NettyMessagingManager();
85 netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
Madan Jampani05833872016-07-12 23:01:39 -070086 netty1.clockService = testClockService;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080087 netty1.activate();
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080088
89 ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080090 netty2 = new NettyMessagingManager();
91 netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
Madan Jampani05833872016-07-12 23:01:39 -070092 netty2.clockService = testClockService;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080093 netty2.activate();
Madan Jampani3961b752016-01-13 13:30:58 -080094 }
95
Madan Jampani0a70f732016-07-20 11:30:59 -070096 /**
97 * Returns a random String to be used as a test subject.
98 * @return string
99 */
100 private String nextSubject() {
101 return UUID.randomUUID().toString();
102 }
103
Madan Jampani3961b752016-01-13 13:30:58 -0800104 @After
105 public void tearDown() throws Exception {
106 if (netty1 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800107 netty1.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -0800108 }
109
110 if (netty2 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800111 netty2.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -0800112 }
113 }
114
115 @Test
116 public void testSendAsync() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700117 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800118 CountDownLatch latch1 = new CountDownLatch(1);
Madan Jampani0a70f732016-07-20 11:30:59 -0700119 CompletableFuture<Void> response = netty1.sendAsync(ep2, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800120 response.whenComplete((r, e) -> {
121 assertNull(e);
122 latch1.countDown();
123 });
124 Uninterruptibles.awaitUninterruptibly(latch1);
125
126 CountDownLatch latch2 = new CountDownLatch(1);
Madan Jampani0a70f732016-07-20 11:30:59 -0700127 response = netty1.sendAsync(invalidEndPoint, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800128 response.whenComplete((r, e) -> {
129 assertNotNull(e);
130 latch2.countDown();
131 });
132 Uninterruptibles.awaitUninterruptibly(latch2);
133 }
134
135 @Test
Brian O'Connor63298802016-09-29 18:21:40 -0700136 @Ignore // FIXME disabled on 9/29/16 due to random failures
Madan Jampani3961b752016-01-13 13:30:58 -0800137 public void testSendAndReceive() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700138 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800139 AtomicBoolean handlerInvoked = new AtomicBoolean(false);
140 AtomicReference<byte[]> request = new AtomicReference<>();
141 AtomicReference<Endpoint> sender = new AtomicReference<>();
142
143 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
144 handlerInvoked.set(true);
145 sender.set(ep);
146 request.set(data);
147 return "hello there".getBytes();
148 };
Madan Jampani0a70f732016-07-20 11:30:59 -0700149 netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
Madan Jampani3961b752016-01-13 13:30:58 -0800150
Madan Jampani0a70f732016-07-20 11:30:59 -0700151 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800152 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
153 assertTrue(handlerInvoked.get());
154 assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
155 assertEquals(ep1, sender.get());
156 }
157
158 /*
159 * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
160 * and response completion occurs on the expected thread.
161 */
162 @Test
Jonathan Hart6f4d5862016-08-17 09:18:01 -0700163 @Ignore
Madan Jampani3961b752016-01-13 13:30:58 -0800164 public void testSendAndReceiveWithExecutor() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700165 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800166 ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
167 ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
168 AtomicReference<String> handlerThreadName = new AtomicReference<>();
169 AtomicReference<String> completionThreadName = new AtomicReference<>();
170
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700171 final CountDownLatch latch = new CountDownLatch(1);
172
Madan Jampani3961b752016-01-13 13:30:58 -0800173 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
174 handlerThreadName.set(Thread.currentThread().getName());
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700175 try {
176 latch.await();
177 } catch (InterruptedException e1) {
178 Thread.currentThread().interrupt();
179 fail("InterruptedException");
180 }
Madan Jampani3961b752016-01-13 13:30:58 -0800181 return "hello there".getBytes();
182 };
Madan Jampani0a70f732016-07-20 11:30:59 -0700183 netty2.registerHandler(subject, handler, handlerExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800184
185 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
Madan Jampani0a70f732016-07-20 11:30:59 -0700186 subject,
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800187 "hello world".getBytes(),
188 completionExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800189 response.whenComplete((r, e) -> {
190 completionThreadName.set(Thread.currentThread().getName());
191 });
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700192 latch.countDown();
Madan Jampani3961b752016-01-13 13:30:58 -0800193
194 // Verify that the message was request handling and response completion happens on the correct thread.
195 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
196 assertEquals("completion-thread", completionThreadName.get());
197 assertEquals("handler-thread", handlerThreadName.get());
198 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800199
200 private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
201 return new ClusterMetadataService() {
202 @Override
203 public ClusterMetadata getClusterMetadata() {
204 return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
205 name, Sets.newHashSet(), Sets.newHashSet());
206 }
207
208 @Override
209 public ControllerNode getLocalNode() {
210 return new ControllerNode() {
211 @Override
212 public NodeId id() {
213 return null;
214 }
215
216 @Override
217 public IpAddress ip() {
218 return IpAddress.valueOf(ipAddress);
219 }
220
221 @Override
222 public int tcpPort() {
223 return ep.port();
224 }
225 };
226 }
227
228 @Override
229 public void addListener(ClusterMetadataEventListener listener) {}
230
231 @Override
232 public void removeListener(ClusterMetadataEventListener listener) {}
233 };
234 }
Jonathan Hart6f4d5862016-08-17 09:18:01 -0700235}