blob: 118c67abc9beccc277cc5de7dec0d4d8c370118c [file] [log] [blame]
/*
* Copyright 2016-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.store.cluster.messaging.impl;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.cluster.messaging.Endpoint;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Uninterruptibles;
import static org.junit.Assert.*;
import static org.onlab.junit.TestTools.findAvailablePort;
/**
* Unit tests for NettyMessaging.
*/
public class NettyMessagingManagerTest {
NettyMessagingManager netty1;
NettyMessagingManager netty2;
private static final String DUMMY_NAME = "node";
private static final String IP_STRING = "127.0.0.1";
Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
@Before
public void setUp() throws Exception {
ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
netty1 = new NettyMessagingManager();
netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
netty1.activate();
ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
netty2 = new NettyMessagingManager();
netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
netty2.activate();
}
@After
public void tearDown() throws Exception {
if (netty1 != null) {
netty1.deactivate();
}
if (netty2 != null) {
netty2.deactivate();
}
}
@Test
public void testSendAsync() {
CountDownLatch latch1 = new CountDownLatch(1);
CompletableFuture<Void> response = netty1.sendAsync(ep2, "test-subject", "hello world".getBytes());
response.whenComplete((r, e) -> {
assertNull(e);
latch1.countDown();
});
Uninterruptibles.awaitUninterruptibly(latch1);
CountDownLatch latch2 = new CountDownLatch(1);
response = netty1.sendAsync(invalidEndPoint, "test-subject", "hello world".getBytes());
response.whenComplete((r, e) -> {
assertNotNull(e);
latch2.countDown();
});
Uninterruptibles.awaitUninterruptibly(latch2);
}
@Test
public void testSendAndReceive() {
AtomicBoolean handlerInvoked = new AtomicBoolean(false);
AtomicReference<byte[]> request = new AtomicReference<>();
AtomicReference<Endpoint> sender = new AtomicReference<>();
BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
handlerInvoked.set(true);
sender.set(ep);
request.set(data);
return "hello there".getBytes();
};
netty2.registerHandler("test-subject", handler, MoreExecutors.directExecutor());
CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, "test-subject", "hello world".getBytes());
assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
assertTrue(handlerInvoked.get());
assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
assertEquals(ep1, sender.get());
}
/*
* Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
* and response completion occurs on the expected thread.
*/
@Test
public void testSendAndReceiveWithExecutor() {
ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
AtomicReference<String> handlerThreadName = new AtomicReference<>();
AtomicReference<String> completionThreadName = new AtomicReference<>();
final CountDownLatch latch = new CountDownLatch(1);
BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
handlerThreadName.set(Thread.currentThread().getName());
try {
latch.await();
} catch (InterruptedException e1) {
Thread.currentThread().interrupt();
fail("InterruptedException");
}
return "hello there".getBytes();
};
netty2.registerHandler("test-subject", handler, handlerExecutor);
CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
"test-subject",
"hello world".getBytes(),
completionExecutor);
response.whenComplete((r, e) -> {
completionThreadName.set(Thread.currentThread().getName());
});
latch.countDown();
// Verify that the message was request handling and response completion happens on the correct thread.
assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
assertEquals("completion-thread", completionThreadName.get());
assertEquals("handler-thread", handlerThreadName.get());
}
private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
return new ClusterMetadataService() {
@Override
public ClusterMetadata getClusterMetadata() {
return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
name, Sets.newHashSet(), Sets.newHashSet());
}
@Override
public ControllerNode getLocalNode() {
return new ControllerNode() {
@Override
public NodeId id() {
return null;
}
@Override
public IpAddress ip() {
return IpAddress.valueOf(ipAddress);
}
@Override
public int tcpPort() {
return ep.port();
}
};
}
@Override
public void addListener(ClusterMetadataEventListener listener) {}
@Override
public void removeListener(ClusterMetadataEventListener listener) {}
};
}
}