blob: cbcefd46c43d34c62c072308c15c2ef6824bd0c0 [file] [log] [blame]
Brian O'Connor7cbbbb72016-04-09 02:13:23 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani3961b752016-01-13 13:30:58 -080017
18import java.util.Arrays;
Madan Jampani0a70f732016-07-20 11:30:59 -070019import java.util.UUID;
Madan Jampani3961b752016-01-13 13:30:58 -080020import java.util.concurrent.CompletableFuture;
21import java.util.concurrent.CountDownLatch;
22import java.util.concurrent.ExecutorService;
23import java.util.concurrent.Executors;
24import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani05833872016-07-12 23:01:39 -070025import java.util.concurrent.atomic.AtomicLong;
Madan Jampani3961b752016-01-13 13:30:58 -080026import java.util.concurrent.atomic.AtomicReference;
27import java.util.function.BiFunction;
28
Aaron Kruglikov1b727382016-02-09 16:17:47 -080029import com.google.common.collect.Sets;
Madan Jampani05833872016-07-12 23:01:39 -070030
Madan Jampani3961b752016-01-13 13:30:58 -080031import org.junit.After;
32import org.junit.Before;
33import org.junit.Test;
34import org.onlab.packet.IpAddress;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080035import org.onosproject.cluster.ClusterMetadata;
36import org.onosproject.cluster.ClusterMetadataEventListener;
37import org.onosproject.cluster.ClusterMetadataService;
38import org.onosproject.cluster.ControllerNode;
39import org.onosproject.cluster.NodeId;
Madan Jampani05833872016-07-12 23:01:39 -070040import org.onosproject.core.HybridLogicalClockService;
41import org.onosproject.core.HybridLogicalTime;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080042import org.onosproject.net.provider.ProviderId;
Madan Jampani3961b752016-01-13 13:30:58 -080043import org.onosproject.store.cluster.messaging.Endpoint;
44
45import com.google.common.util.concurrent.MoreExecutors;
46import com.google.common.util.concurrent.Uninterruptibles;
47
48import static org.junit.Assert.*;
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080049import static org.onlab.junit.TestTools.findAvailablePort;
Madan Jampani3961b752016-01-13 13:30:58 -080050
51/**
52 * Unit tests for NettyMessaging.
53 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080054public class NettyMessagingManagerTest {
Madan Jampani3961b752016-01-13 13:30:58 -080055
Madan Jampani05833872016-07-12 23:01:39 -070056 HybridLogicalClockService testClockService = new HybridLogicalClockService() {
57 AtomicLong counter = new AtomicLong();
58 @Override
59 public HybridLogicalTime timeNow() {
60 return new HybridLogicalTime(counter.incrementAndGet(), 0);
61 }
62
63 @Override
64 public void recordEventTime(HybridLogicalTime time) {
65 }
66 };
67
Aaron Kruglikov1b727382016-02-09 16:17:47 -080068 NettyMessagingManager netty1;
69 NettyMessagingManager netty2;
Madan Jampani3961b752016-01-13 13:30:58 -080070
Aaron Kruglikov1b727382016-02-09 16:17:47 -080071 private static final String DUMMY_NAME = "node";
72 private static final String IP_STRING = "127.0.0.1";
73
74 Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
75 Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
76 Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
Madan Jampani3961b752016-01-13 13:30:58 -080077
78 @Before
79 public void setUp() throws Exception {
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080080 ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080081 netty1 = new NettyMessagingManager();
82 netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
Madan Jampani05833872016-07-12 23:01:39 -070083 netty1.clockService = testClockService;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080084 netty1.activate();
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080085
86 ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080087 netty2 = new NettyMessagingManager();
88 netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
Madan Jampani05833872016-07-12 23:01:39 -070089 netty2.clockService = testClockService;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080090 netty2.activate();
Madan Jampani3961b752016-01-13 13:30:58 -080091 }
92
Madan Jampani0a70f732016-07-20 11:30:59 -070093 /**
94 * Returns a random String to be used as a test subject.
95 * @return string
96 */
97 private String nextSubject() {
98 return UUID.randomUUID().toString();
99 }
100
Madan Jampani3961b752016-01-13 13:30:58 -0800101 @After
102 public void tearDown() throws Exception {
103 if (netty1 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800104 netty1.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -0800105 }
106
107 if (netty2 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800108 netty2.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -0800109 }
110 }
111
112 @Test
113 public void testSendAsync() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700114 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800115 CountDownLatch latch1 = new CountDownLatch(1);
Madan Jampani0a70f732016-07-20 11:30:59 -0700116 CompletableFuture<Void> response = netty1.sendAsync(ep2, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800117 response.whenComplete((r, e) -> {
118 assertNull(e);
119 latch1.countDown();
120 });
121 Uninterruptibles.awaitUninterruptibly(latch1);
122
123 CountDownLatch latch2 = new CountDownLatch(1);
Madan Jampani0a70f732016-07-20 11:30:59 -0700124 response = netty1.sendAsync(invalidEndPoint, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800125 response.whenComplete((r, e) -> {
126 assertNotNull(e);
127 latch2.countDown();
128 });
129 Uninterruptibles.awaitUninterruptibly(latch2);
130 }
131
132 @Test
133 public void testSendAndReceive() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700134 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800135 AtomicBoolean handlerInvoked = new AtomicBoolean(false);
136 AtomicReference<byte[]> request = new AtomicReference<>();
137 AtomicReference<Endpoint> sender = new AtomicReference<>();
138
139 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
140 handlerInvoked.set(true);
141 sender.set(ep);
142 request.set(data);
143 return "hello there".getBytes();
144 };
Madan Jampani0a70f732016-07-20 11:30:59 -0700145 netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
Madan Jampani3961b752016-01-13 13:30:58 -0800146
Madan Jampani0a70f732016-07-20 11:30:59 -0700147 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800148 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
149 assertTrue(handlerInvoked.get());
150 assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
151 assertEquals(ep1, sender.get());
152 }
153
154 /*
155 * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
156 * and response completion occurs on the expected thread.
157 */
158 @Test
159 public void testSendAndReceiveWithExecutor() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700160 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800161 ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
162 ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
163 AtomicReference<String> handlerThreadName = new AtomicReference<>();
164 AtomicReference<String> completionThreadName = new AtomicReference<>();
165
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700166 final CountDownLatch latch = new CountDownLatch(1);
167
Madan Jampani3961b752016-01-13 13:30:58 -0800168 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
169 handlerThreadName.set(Thread.currentThread().getName());
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700170 try {
171 latch.await();
172 } catch (InterruptedException e1) {
173 Thread.currentThread().interrupt();
174 fail("InterruptedException");
175 }
Madan Jampani3961b752016-01-13 13:30:58 -0800176 return "hello there".getBytes();
177 };
Madan Jampani0a70f732016-07-20 11:30:59 -0700178 netty2.registerHandler(subject, handler, handlerExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800179
180 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
Madan Jampani0a70f732016-07-20 11:30:59 -0700181 subject,
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800182 "hello world".getBytes(),
183 completionExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800184 response.whenComplete((r, e) -> {
185 completionThreadName.set(Thread.currentThread().getName());
186 });
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700187 latch.countDown();
Madan Jampani3961b752016-01-13 13:30:58 -0800188
189 // Verify that the message was request handling and response completion happens on the correct thread.
190 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
191 assertEquals("completion-thread", completionThreadName.get());
192 assertEquals("handler-thread", handlerThreadName.get());
193 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800194
195 private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
196 return new ClusterMetadataService() {
197 @Override
198 public ClusterMetadata getClusterMetadata() {
199 return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
200 name, Sets.newHashSet(), Sets.newHashSet());
201 }
202
203 @Override
204 public ControllerNode getLocalNode() {
205 return new ControllerNode() {
206 @Override
207 public NodeId id() {
208 return null;
209 }
210
211 @Override
212 public IpAddress ip() {
213 return IpAddress.valueOf(ipAddress);
214 }
215
216 @Override
217 public int tcpPort() {
218 return ep.port();
219 }
220 };
221 }
222
223 @Override
224 public void addListener(ClusterMetadataEventListener listener) {}
225
226 @Override
227 public void removeListener(ClusterMetadataEventListener listener) {}
228 };
229 }
230}