Migrating netty messaging into netty messaging manager.

Change-Id: I971db195c9dc155cdf76850f0427ef9b9210113c
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
new file mode 100644
index 0000000..d04c777
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManagerTest.java
@@ -0,0 +1,176 @@
+package org.onosproject.store.cluster.messaging.impl;
+
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiFunction;
+
+import com.google.common.collect.Sets;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterMetadata;
+import org.onosproject.cluster.ClusterMetadataEventListener;
+import org.onosproject.cluster.ClusterMetadataService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.cluster.messaging.Endpoint;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.Uninterruptibles;
+
+import static org.junit.Assert.*;
+import static org.onlab.junit.TestTools.findAvailablePort;
+
+/**
+ * Unit tests for NettyMessaging.
+ */
+public class NettyMessagingManagerTest {
+
+    NettyMessagingManager netty1;
+    NettyMessagingManager netty2;
+
+    private static final String DUMMY_NAME = "node";
+    private static final String IP_STRING = "127.0.0.1";
+
+    Endpoint ep1 = new Endpoint(IpAddress.valueOf(IP_STRING), 5001);
+    Endpoint ep2 = new Endpoint(IpAddress.valueOf(IP_STRING), 5002);
+    Endpoint invalidEndPoint = new Endpoint(IpAddress.valueOf(IP_STRING), 5003);
+
+    @Before
+    public void setUp() throws Exception {
+        ep1 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5001));
+        netty1 = new NettyMessagingManager();
+        netty1.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep1);
+        netty1.activate();
+
+        ep2 = new Endpoint(IpAddress.valueOf("127.0.0.1"), findAvailablePort(5003));
+        netty2 = new NettyMessagingManager();
+        netty2.clusterMetadataService = dummyMetadataService(DUMMY_NAME, IP_STRING, ep2);
+        netty2.activate();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (netty1 != null) {
+            netty1.deactivate();
+        }
+
+        if (netty2 != null) {
+            netty2.deactivate();
+        }
+    }
+
+    @Test
+    public void testSendAsync() {
+        CountDownLatch latch1 = new CountDownLatch(1);
+        CompletableFuture<Void> response = netty1.sendAsync(ep2, "test-subject", "hello world".getBytes());
+        response.whenComplete((r, e) -> {
+            assertNull(e);
+            latch1.countDown();
+        });
+        Uninterruptibles.awaitUninterruptibly(latch1);
+
+        CountDownLatch latch2 = new CountDownLatch(1);
+        response = netty1.sendAsync(invalidEndPoint, "test-subject", "hello world".getBytes());
+        response.whenComplete((r, e) -> {
+            assertNotNull(e);
+            latch2.countDown();
+        });
+        Uninterruptibles.awaitUninterruptibly(latch2);
+    }
+
+    @Test
+    public void testSendAndReceive() {
+        AtomicBoolean handlerInvoked = new AtomicBoolean(false);
+        AtomicReference<byte[]> request = new AtomicReference<>();
+        AtomicReference<Endpoint> sender = new AtomicReference<>();
+
+        BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
+            handlerInvoked.set(true);
+            sender.set(ep);
+            request.set(data);
+            return "hello there".getBytes();
+        };
+        netty2.registerHandler("test-subject", handler, MoreExecutors.directExecutor());
+
+        CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2, "test-subject", "hello world".getBytes());
+        assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
+        assertTrue(handlerInvoked.get());
+        assertTrue(Arrays.equals(request.get(), "hello world".getBytes()));
+        assertEquals(ep1, sender.get());
+    }
+
+    /*
+     * Supplies executors when registering a handler and calling sendAndReceive and verifies the request handling
+     * and response completion occurs on the expected thread.
+     */
+    @Test
+    public void testSendAndReceiveWithExecutor() {
+        ExecutorService completionExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "completion-thread"));
+        ExecutorService handlerExecutor = Executors.newSingleThreadExecutor(r -> new Thread(r, "handler-thread"));
+        AtomicReference<String> handlerThreadName = new AtomicReference<>();
+        AtomicReference<String> completionThreadName = new AtomicReference<>();
+
+        BiFunction<Endpoint, byte[], byte[]> handler = (ep, data) -> {
+            handlerThreadName.set(Thread.currentThread().getName());
+            return "hello there".getBytes();
+        };
+        netty2.registerHandler("test-subject", handler, handlerExecutor);
+
+        CompletableFuture<byte[]> response = netty1.sendAndReceive(ep2,
+                                                                   "test-subject",
+                                                                   "hello world".getBytes(),
+                                                                   completionExecutor);
+        response.whenComplete((r, e) -> {
+            completionThreadName.set(Thread.currentThread().getName());
+        });
+
+        // Verify that the message was request handling and response completion happens on the correct thread.
+        assertTrue(Arrays.equals("hello there".getBytes(), response.join()));
+        assertEquals("completion-thread", completionThreadName.get());
+        assertEquals("handler-thread", handlerThreadName.get());
+    }
+
+    private ClusterMetadataService dummyMetadataService(String name, String ipAddress, Endpoint ep) {
+        return new ClusterMetadataService() {
+            @Override
+            public ClusterMetadata getClusterMetadata() {
+                return new ClusterMetadata(new ProviderId(DUMMY_NAME, DUMMY_NAME),
+                                           name, Sets.newHashSet(), Sets.newHashSet());
+            }
+
+            @Override
+            public ControllerNode getLocalNode() {
+                return new ControllerNode() {
+                    @Override
+                    public NodeId id() {
+                        return null;
+                    }
+
+                    @Override
+                    public IpAddress ip() {
+                        return IpAddress.valueOf(ipAddress);
+                    }
+
+                    @Override
+                    public int tcpPort() {
+                        return ep.port();
+                    }
+                };
+            }
+
+            @Override
+            public void addListener(ClusterMetadataEventListener listener) {}
+
+            @Override
+            public void removeListener(ClusterMetadataEventListener listener) {}
+        };
+    }
+}
\ No newline at end of file