blob: 118c67abc9beccc277cc5de7dec0d4d8c370118c [file] [log] [blame]
Brian O'Connor7cbbbb72016-04-09 02:13:23 -07001/*
2 * Copyright 2016-present Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080016package org.onosproject.store.cluster.messaging.impl;
Madan Jampani3961b752016-01-13 13:30:58 -080017
18import java.util.Arrays;
19import java.util.concurrent.CompletableFuture;
20import java.util.concurrent.CountDownLatch;
21import java.util.concurrent.ExecutorService;
22import java.util.concurrent.Executors;
23import java.util.concurrent.atomic.AtomicBoolean;
24import java.util.concurrent.atomic.AtomicReference;
25import java.util.function.BiFunction;
26
Aaron Kruglikov1b727382016-02-09 16:17:47 -080027import com.google.common.collect.Sets;
Madan Jampani3961b752016-01-13 13:30:58 -080028import org.junit.After;
29import org.junit.Before;
30import org.junit.Test;
31import org.onlab.packet.IpAddress;
Aaron Kruglikov1b727382016-02-09 16:17:47 -080032import org.onosproject.cluster.ClusterMetadata;
33import org.onosproject.cluster.ClusterMetadataEventListener;
34import org.onosproject.cluster.ClusterMetadataService;
35import org.onosproject.cluster.ControllerNode;
36import org.onosproject.cluster.NodeId;
37import org.onosproject.net.provider.ProviderId;
Madan Jampani3961b752016-01-13 13:30:58 -080038import org.onosproject.store.cluster.messaging.Endpoint;
39
40import com.google.common.util.concurrent.MoreExecutors;
41import com.google.common.util.concurrent.Uninterruptibles;
42
43import static org.junit.Assert.*;
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080044import static org.onlab.junit.TestTools.findAvailablePort;
Madan Jampani3961b752016-01-13 13:30:58 -080045
46/**
47 * Unit tests for NettyMessaging.
48 */
Aaron Kruglikov1b727382016-02-09 16:17:47 -080049public class NettyMessagingManagerTest {
Madan Jampani3961b752016-01-13 13:30:58 -080050
Aaron Kruglikov1b727382016-02-09 16:17:47 -080051 NettyMessagingManager netty1;
52 NettyMessagingManager netty2;
Madan Jampani3961b752016-01-13 13:30:58 -080053
Aaron Kruglikov1b727382016-02-09 16:17:47 -080054 private static final String DUMMY_NAME = "node";
55 private static final String IP_STRING = "127.0.0.1";
56
57 Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
58 Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
59 Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
Madan Jampani3961b752016-01-13 13:30:58 -080060
61 @Before
62 public void setUp() throws Exception {
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080063 ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080064 netty1 = new NettyMessagingManager();
65 netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
66 netty1.activate();
HIGUCHI Yuta97f7e472016-01-22 10:03:48 -080067
68 ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
Aaron Kruglikov1b727382016-02-09 16:17:47 -080069 netty2 = new NettyMessagingManager();
70 netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
71 netty2.activate();
Madan Jampani3961b752016-01-13 13:30:58 -080072 }
73
74 @After
75 public void tearDown() throws Exception {
76 if (netty1 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -080077 netty1.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -080078 }
79
80 if (netty2 != null) {
Aaron Kruglikov1b727382016-02-09 16:17:47 -080081 netty2.deactivate();
Madan Jampani3961b752016-01-13 13:30:58 -080082 }
83 }
84
85 @Test
86 public void testSendAsync() {
87 CountDownLatch latch1 = new CountDownLatch(1);
88 CompletableFuture<Void> response = netty1.sendAsync(ep2, "test-subject", "hello world".getBytes());
89 response.whenComplete((r, e) -> {
90 assertNull(e);
91 latch1.countDown();
92 });
93 Uninterruptibles.awaitUninterruptibly(latch1);
94
95 CountDownLatch latch2 = new CountDownLatch(1);
96 response = netty1.sendAsync(invalidEndPoint, "test-subject", "hello world".getBytes());
97 response.whenComplete((r, e) -> {
98 assertNotNull(e);
99 latch2.countDown();
100 });
101 Uninterruptibles.awaitUninterruptibly(latch2);
102 }
103
104 @Test
105 public void testSendAndReceive() {
106 AtomicBoolean handlerInvoked = new AtomicBoolean(false);
107 AtomicReference<byte[]> request = new AtomicReference<>();
108 AtomicReference<Endpoint> sender = new AtomicReference<>();
109
110 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
111 handlerInvoked.set(true);
112 sender.set(ep);
113 request.set(data);
114 return "hello there".getBytes();
115 };
116 netty2.registerHandler("test-subject", handler, MoreExecutors.directExecutor());
117
118 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, "test-subject", "hello world".getBytes());
119 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
120 assertTrue(handlerInvoked.get());
121 assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
122 assertEquals(ep1, sender.get());
123 }
124
125 /*
126 * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
127 * and response completion occurs on the expected thread.
128 */
129 @Test
130 public void testSendAndReceiveWithExecutor() {
131 ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
132 ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
133 AtomicReference<String> handlerThreadName = new AtomicReference<>();
134 AtomicReference<String> completionThreadName = new AtomicReference<>();
135
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700136 final CountDownLatch latch = new CountDownLatch(1);
137
Madan Jampani3961b752016-01-13 13:30:58 -0800138 BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
139 handlerThreadName.set(Thread.currentThread().getName());
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700140 try {
141 latch.await();
142 } catch (InterruptedException e1) {
143 Thread.currentThread().interrupt();
144 fail("InterruptedException");
145 }
Madan Jampani3961b752016-01-13 13:30:58 -0800146 return "hello there".getBytes();
147 };
148 netty2.registerHandler("test-subject", handler, handlerExecutor);
149
150 CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800151 "test-subject",
152 "hello world".getBytes(),
153 completionExecutor);
Madan Jampani3961b752016-01-13 13:30:58 -0800154 response.whenComplete((r, e) -> {
155 completionThreadName.set(Thread.currentThread().getName());
156 });
HIGUCHI Yutaf093cf22016-05-25 22:25:18 -0700157 latch.countDown();
Madan Jampani3961b752016-01-13 13:30:58 -0800158
159 // Verify that the message was request handling and response completion happens on the correct thread.
160 assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
161 assertEquals("completion-thread", completionThreadName.get());
162 assertEquals("handler-thread", handlerThreadName.get());
163 }
Aaron Kruglikov1b727382016-02-09 16:17:47 -0800164
165 private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
166 return new ClusterMetadataService() {
167 @Override
168 public ClusterMetadata getClusterMetadata() {
169 return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
170 name, Sets.newHashSet(), Sets.newHashSet());
171 }
172
173 @Override
174 public ControllerNode getLocalNode() {
175 return new ControllerNode() {
176 @Override
177 public NodeId id() {
178 return null;
179 }
180
181 @Override
182 public IpAddress ip() {
183 return IpAddress.valueOf(ipAddress);
184 }
185
186 @Override
187 public int tcpPort() {
188 return ep.port();
189 }
190 };
191 }
192
193 @Override
194 public void addListener(ClusterMetadataEventListener listener) {}
195
196 @Override
197 public void removeListener(ClusterMetadataEventListener listener) {}
198 };
199 }
200}