blob: 4913a06cf4f98e5448d8c299c8df2b06fd6e0d23 [file] [log] [blame]
Brian O'Connor7cbbbb72016-04-09 02:13:23 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani3961b752016-01-13 13:30:58 -080017
18import java.util.Arrays;
19import java.util.concurrent.CompletableFuture;
20import java.util.concurrent.CountDownLatch;
21import java.util.concurrent.ExecutorService;
22import java.util.concurrent.Executors;
23import java.util.concurrent.atomic.AtomicBoolean;
Madan Jampani05833872016-07-12 23:01:39 -070024import java.util.concurrent.atomic.AtomicLong;
Madan Jampani3961b752016-01-13 13:30:58 -080025import java.util.concurrent.atomic.AtomicReference;
26import java.util.function.BiFunction;
27
Aaron Kruglikov1b727382016-02-09 16:17:47 -080028import com.google.common.collect.Sets;
Madan Jampani05833872016-07-12 23:01:39 -070029
Madan Jampani3961b752016-01-13 13:30:58 -080030import org.junit.After;
31import org.junit.Before;
32import org.junit.Test;
33import org.onlab.packet.IpAddress;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080034import org.onosproject.cluster.ClusterMetadata;
35import org.onosproject.cluster.ClusterMetadataEventListener;
36import org.onosproject.cluster.ClusterMetadataService;
37import org.onosproject.cluster.ControllerNode;
38import org.onosproject.cluster.NodeId;
Madan Jampani05833872016-07-12 23:01:39 -070039import org.onosproject.core.HybridLogicalClockService;
40import org.onosproject.core.HybridLogicalTime;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080041import org.onosproject.net.provider.ProviderId;
Madan Jampani3961b752016-01-13 13:30:58 -080042import org.onosproject.store.cluster.messaging.Endpoint;
43
44import com.google.common.util.concurrent.MoreExecutors;
45import com.google.common.util.concurrent.Uninterruptibles;
46
47import static org.junit.Assert.*;
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080048import static org.onlab.junit.TestTools.findAvailablePort;
Madan Jampani3961b752016-01-13 13:30:58 -080049
50/**
51 * Unit tests for NettyMessaging.
52 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080053public class NettyMessagingManagerTest {
Madan Jampani3961b752016-01-13 13:30:58 -080054
Madan Jampani05833872016-07-12 23:01:39 -070055 HybridLogicalClockService testClockService = new HybridLogicalClockService() {
56 AtomicLong counter = new AtomicLong();
57 @Override
58 public HybridLogicalTime timeNow() {
59 return new HybridLogicalTime(counter.incrementAndGet(), 0);
60 }
61
62 @Override
63 public void recordEventTime(HybridLogicalTime time) {
64 }
65 };
66
Aaron Kruglikov1b727382016-02-09 16:17:47 -080067 NettyMessagingManager netty1;
68 NettyMessagingManager netty2;
Madan Jampani3961b752016-01-13 13:30:58 -080069
Aaron Kruglikov1b727382016-02-09 16:17:47 -080070 private static final String DUMMY_NAME = "node";
71 private static final String IP_STRING = "127.0.0.1";
72
73 Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
74 Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
75 Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
Madan Jampani3961b752016-01-13 13:30:58 -080076
77 @Before
78 public void setUp() throws Exception {
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080079 ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080080 netty1 = new NettyMessagingManager();
81 netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
Madan Jampani05833872016-07-12 23:01:39 -070082 netty1.clockService = testClockService;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080083 netty1.activate();
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080084
85 ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080086 netty2 = new NettyMessagingManager();
87 netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
Madan Jampani05833872016-07-12 23:01:39 -070088 netty2.clockService = testClockService;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080089 netty2.activate();
Madan Jampani3961b752016-01-13 13:30:58 -080090 }
91
92 @After
93 public void tearDown() throws Exception {
94 if (netty1 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -080095 netty1.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -080096 }
97
98 if (netty2 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -080099 netty2.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -0800100 }
101 }
102
103 @Test
104 public void testSendAsync() {
105 CountDownLatch latch1 = new CountDownLatch(1);
106 CompletableFuture<Void> response = netty1.sendAsync(ep2, "test-subject", "hello world".getBytes());
107 response.whenComplete((r, e) -> {
108 assertNull(e);
109 latch1.countDown();
110 });
111 Uninterruptibles.awaitUninterruptibly(latch1);
112
113 CountDownLatch latch2 = new CountDownLatch(1);
114 response = netty1.sendAsync(invalidEndPoint, "test-subject", "hello world".getBytes());
115 response.whenComplete((r, e) -> {
116 assertNotNull(e);
117 latch2.countDown();
118 });
119 Uninterruptibles.awaitUninterruptibly(latch2);
120 }
121
122 @Test
123 public void testSendAndReceive() {
124 AtomicBoolean handlerInvoked = new AtomicBoolean(false);
125 AtomicReference<byte[]> request = new AtomicReference<>();
126 AtomicReference<Endpoint> sender = new AtomicReference<>();
127
128 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
129 handlerInvoked.set(true);
130 sender.set(ep);
131 request.set(data);
132 return "hello there".getBytes();
133 };
134 netty2.registerHandler("test-subject", handler, MoreExecutors.directExecutor());
135
136 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, "test-subject", "hello world".getBytes());
137 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
138 assertTrue(handlerInvoked.get());
139 assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
140 assertEquals(ep1, sender.get());
141 }
142
143 /*
144 * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
145 * and response completion occurs on the expected thread.
146 */
147 @Test
148 public void testSendAndReceiveWithExecutor() {
149 ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
150 ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
151 AtomicReference<String> handlerThreadName = new AtomicReference<>();
152 AtomicReference<String> completionThreadName = new AtomicReference<>();
153
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700154 final CountDownLatch latch = new CountDownLatch(1);
155
Madan Jampani3961b752016-01-13 13:30:58 -0800156 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
157 handlerThreadName.set(Thread.currentThread().getName());
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700158 try {
159 latch.await();
160 } catch (InterruptedException e1) {
161 Thread.currentThread().interrupt();
162 fail("InterruptedException");
163 }
Madan Jampani3961b752016-01-13 13:30:58 -0800164 return "hello there".getBytes();
165 };
166 netty2.registerHandler("test-subject", handler, handlerExecutor);
167
168 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800169 "test-subject",
170 "hello world".getBytes(),
171 completionExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800172 response.whenComplete((r, e) -> {
173 completionThreadName.set(Thread.currentThread().getName());
174 });
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700175 latch.countDown();
Madan Jampani3961b752016-01-13 13:30:58 -0800176
177 // Verify that the message was request handling and response completion happens on the correct thread.
178 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
179 assertEquals("completion-thread", completionThreadName.get());
180 assertEquals("handler-thread", handlerThreadName.get());
181 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800182
183 private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
184 return new ClusterMetadataService() {
185 @Override
186 public ClusterMetadata getClusterMetadata() {
187 return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
188 name, Sets.newHashSet(), Sets.newHashSet());
189 }
190
191 @Override
192 public ControllerNode getLocalNode() {
193 return new ControllerNode() {
194 @Override
195 public NodeId id() {
196 return null;
197 }
198
199 @Override
200 public IpAddress ip() {
201 return IpAddress.valueOf(ipAddress);
202 }
203
204 @Override
205 public int tcpPort() {
206 return ep.port();
207 }
208 };
209 }
210
211 @Override
212 public void addListener(ClusterMetadataEventListener listener) {}
213
214 @Override
215 public void removeListener(ClusterMetadataEventListener listener) {}
216 };
217 }
218}