blob: 9c8a016fad3af7b4d32ac6a9bb51d63129d9b04a [file] [log] [blame]
Brian O'Connor7cbbbb72016-04-09 02:13:23 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani3961b752016-01-13 13:30:58 -080017
Aaron Kruglikov1b727382016-02-09 16:17:47 -080018import com.google.common.collect.Sets;
Jonathan Hart6f4d5862016-08-17 09:18:01 -070019import com.google.common.util.concurrent.MoreExecutors;
20import com.google.common.util.concurrent.Uninterruptibles;
Madan Jampani3961b752016-01-13 13:30:58 -080021import org.junit.After;
22import org.junit.Before;
Jonathan Hart6f4d5862016-08-17 09:18:01 -070023import org.junit.Ignore;
Madan Jampani3961b752016-01-13 13:30:58 -080024import org.junit.Test;
25import org.onlab.packet.IpAddress;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080026import org.onosproject.cluster.ClusterMetadata;
27import org.onosproject.cluster.ClusterMetadataEventListener;
28import org.onosproject.cluster.ClusterMetadataService;
29import org.onosproject.cluster.ControllerNode;
30import org.onosproject.cluster.NodeId;
Madan Jampani05833872016-07-12 23:01:39 -070031import org.onosproject.core.HybridLogicalClockService;
32import org.onosproject.core.HybridLogicalTime;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080033import org.onosproject.net.provider.ProviderId;
Madan Jampani3961b752016-01-13 13:30:58 -080034import org.onosproject.store.cluster.messaging.Endpoint;
35
Jonathan Hart6f4d5862016-08-17 09:18:01 -070036import java.util.Arrays;
37import java.util.UUID;
38import java.util.concurrent.CompletableFuture;
39import java.util.concurrent.CountDownLatch;
40import java.util.concurrent.ExecutorService;
41import java.util.concurrent.Executors;
42import java.util.concurrent.atomic.AtomicBoolean;
43import java.util.concurrent.atomic.AtomicLong;
44import java.util.concurrent.atomic.AtomicReference;
45import java.util.function.BiFunction;
Madan Jampani3961b752016-01-13 13:30:58 -080046
Jonathan Hart6f4d5862016-08-17 09:18:01 -070047import static org.junit.Assert.assertEquals;
48import static org.junit.Assert.assertNotNull;
49import static org.junit.Assert.assertNull;
50import static org.junit.Assert.assertTrue;
51import static org.junit.Assert.fail;
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080052import static org.onlab.junit.TestTools.findAvailablePort;
Madan Jampani3961b752016-01-13 13:30:58 -080053
54/**
55 * Unit tests for NettyMessaging.
56 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080057public class NettyMessagingManagerTest {
Madan Jampani3961b752016-01-13 13:30:58 -080058
Madan Jampani05833872016-07-12 23:01:39 -070059 HybridLogicalClockService testClockService = new HybridLogicalClockService() {
60 AtomicLong counter = new AtomicLong();
61 @Override
62 public HybridLogicalTime timeNow() {
63 return new HybridLogicalTime(counter.incrementAndGet(), 0);
64 }
65
66 @Override
67 public void recordEventTime(HybridLogicalTime time) {
68 }
69 };
70
Aaron Kruglikov1b727382016-02-09 16:17:47 -080071 NettyMessagingManager netty1;
72 NettyMessagingManager netty2;
Madan Jampani3961b752016-01-13 13:30:58 -080073
Aaron Kruglikov1b727382016-02-09 16:17:47 -080074 private static final String DUMMY_NAME = "node";
75 private static final String IP_STRING = "127.0.0.1";
76
77 Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
78 Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
79 Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
Madan Jampani3961b752016-01-13 13:30:58 -080080
81 @Before
82 public void setUp() throws Exception {
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080083 ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080084 netty1 = new NettyMessagingManager();
85 netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
Madan Jampani05833872016-07-12 23:01:39 -070086 netty1.clockService = testClockService;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080087 netty1.activate();
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080088
89 ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080090 netty2 = new NettyMessagingManager();
91 netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
Madan Jampani05833872016-07-12 23:01:39 -070092 netty2.clockService = testClockService;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080093 netty2.activate();
Madan Jampani3961b752016-01-13 13:30:58 -080094 }
95
Madan Jampani0a70f732016-07-20 11:30:59 -070096 /**
97 * Returns a random String to be used as a test subject.
98 * @return string
99 */
100 private String nextSubject() {
101 return UUID.randomUUID().toString();
102 }
103
Madan Jampani3961b752016-01-13 13:30:58 -0800104 @After
105 public void tearDown() throws Exception {
106 if (netty1 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800107 netty1.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -0800108 }
109
110 if (netty2 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800111 netty2.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -0800112 }
113 }
114
115 @Test
116 public void testSendAsync() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700117 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800118 CountDownLatch latch1 = new CountDownLatch(1);
Madan Jampani0a70f732016-07-20 11:30:59 -0700119 CompletableFuture<Void> response = netty1.sendAsync(ep2, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800120 response.whenComplete((r, e) -> {
121 assertNull(e);
122 latch1.countDown();
123 });
124 Uninterruptibles.awaitUninterruptibly(latch1);
125
126 CountDownLatch latch2 = new CountDownLatch(1);
Madan Jampani0a70f732016-07-20 11:30:59 -0700127 response = netty1.sendAsync(invalidEndPoint, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800128 response.whenComplete((r, e) -> {
129 assertNotNull(e);
130 latch2.countDown();
131 });
132 Uninterruptibles.awaitUninterruptibly(latch2);
133 }
134
135 @Test
136 public void testSendAndReceive() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700137 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800138 AtomicBoolean handlerInvoked = new AtomicBoolean(false);
139 AtomicReference<byte[]> request = new AtomicReference<>();
140 AtomicReference<Endpoint> sender = new AtomicReference<>();
141
142 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
143 handlerInvoked.set(true);
144 sender.set(ep);
145 request.set(data);
146 return "hello there".getBytes();
147 };
Madan Jampani0a70f732016-07-20 11:30:59 -0700148 netty2.registerHandler(subject, handler, MoreExecutors.directExecutor());
Madan Jampani3961b752016-01-13 13:30:58 -0800149
Madan Jampani0a70f732016-07-20 11:30:59 -0700150 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, subject, "hello world".getBytes());
Madan Jampani3961b752016-01-13 13:30:58 -0800151 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
152 assertTrue(handlerInvoked.get());
153 assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
154 assertEquals(ep1, sender.get());
155 }
156
157 /*
158 * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
159 * and response completion occurs on the expected thread.
160 */
161 @Test
Jonathan Hart6f4d5862016-08-17 09:18:01 -0700162 @Ignore
Madan Jampani3961b752016-01-13 13:30:58 -0800163 public void testSendAndReceiveWithExecutor() {
Madan Jampani0a70f732016-07-20 11:30:59 -0700164 String subject = nextSubject();
Madan Jampani3961b752016-01-13 13:30:58 -0800165 ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
166 ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
167 AtomicReference<String> handlerThreadName = new AtomicReference<>();
168 AtomicReference<String> completionThreadName = new AtomicReference<>();
169
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700170 final CountDownLatch latch = new CountDownLatch(1);
171
Madan Jampani3961b752016-01-13 13:30:58 -0800172 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
173 handlerThreadName.set(Thread.currentThread().getName());
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700174 try {
175 latch.await();
176 } catch (InterruptedException e1) {
177 Thread.currentThread().interrupt();
178 fail("InterruptedException");
179 }
Madan Jampani3961b752016-01-13 13:30:58 -0800180 return "hello there".getBytes();
181 };
Madan Jampani0a70f732016-07-20 11:30:59 -0700182 netty2.registerHandler(subject, handler, handlerExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800183
184 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
Madan Jampani0a70f732016-07-20 11:30:59 -0700185 subject,
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800186 "hello world".getBytes(),
187 completionExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800188 response.whenComplete((r, e) -> {
189 completionThreadName.set(Thread.currentThread().getName());
190 });
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700191 latch.countDown();
Madan Jampani3961b752016-01-13 13:30:58 -0800192
193 // Verify that the message was request handling and response completion happens on the correct thread.
194 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
195 assertEquals("completion-thread", completionThreadName.get());
196 assertEquals("handler-thread", handlerThreadName.get());
197 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800198
199 private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
200 return new ClusterMetadataService() {
201 @Override
202 public ClusterMetadata getClusterMetadata() {
203 return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
204 name, Sets.newHashSet(), Sets.newHashSet());
205 }
206
207 @Override
208 public ControllerNode getLocalNode() {
209 return new ControllerNode() {
210 @Override
211 public NodeId id() {
212 return null;
213 }
214
215 @Override
216 public IpAddress ip() {
217 return IpAddress.valueOf(ipAddress);
218 }
219
220 @Override
221 public int tcpPort() {
222 return ep.port();
223 }
224 };
225 }
226
227 @Override
228 public void addListener(ClusterMetadataEventListener listener) {}
229
230 @Override
231 public void removeListener(ClusterMetadataEventListener listener) {}
232 };
233 }
Jonathan Hart6f4d5862016-08-17 09:18:01 -0700234}