ONOS-1982: MessagingService is now an OSGi service. Has implementations based on Netty and IOLoop

Change-Id: Ia4c99de18e91be1b49bd1fddd86fe89fb83e859c
diff --git a/utils/netty/pom.xml b/utils/netty/pom.xml
index 9e5677c..f54c93f 100644
--- a/utils/netty/pom.xml
+++ b/utils/netty/pom.xml
@@ -80,5 +80,4 @@
           <version>${netty4.version}</version>
         </dependency>
     </dependencies>
-
 </project>
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
similarity index 89%
rename from utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java
rename to utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index 0bbb8c1..44b7027 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -37,22 +37,20 @@
 import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
-import org.onlab.packet.IpAddress;
 import org.onosproject.store.cluster.messaging.Endpoint;
 import org.onosproject.store.cluster.messaging.MessagingService;
 import org.slf4j.Logger;
@@ -66,14 +64,15 @@
 /**
  * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
  */
-public class NettyMessagingManager implements MessagingService {
+public class NettyMessaging implements MessagingService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
 
-    private final Endpoint localEp;
-    private final ConcurrentMap<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
+    private Endpoint localEp;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
     private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
             .expireAfterWrite(10, TimeUnit.SECONDS)
@@ -104,7 +103,8 @@
             clientChannelClass = EpollSocketChannel.class;
             return;
         } catch (Throwable e) {
-            log.warn("Failed to initialize native (epoll) transport. Reason: {}. Proceeding with nio.", e.getMessage());
+            log.debug("Failed to initialize native (epoll) transport. "
+                    + "Reason: {}. Proceeding with nio.", e.getMessage());
         }
         clientGroup = new NioEventLoopGroup();
         serverGroup = new NioEventLoopGroup();
@@ -112,43 +112,27 @@
         clientChannelClass = NioSocketChannel.class;
     }
 
-    public NettyMessagingManager(IpAddress ip, int port) {
-        localEp = new Endpoint(ip, port);
-    }
-
-    public NettyMessagingManager() {
-        this(8080);
-    }
-
-    public NettyMessagingManager(int port) {
-        try {
-            localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port);
-        } catch (UnknownHostException e) {
-            // Cannot resolve the local host, something is very wrong. Bailing out.
-            throw new IllegalStateException("Cannot resolve local host", e);
+    public void start(Endpoint localEp) throws Exception {
+        if (started.get()) {
+            log.warn("Already running at local endpoint: {}", localEp);
+            return;
         }
-    }
-
-    public void activate() throws InterruptedException {
+        this.localEp = localEp;
         channels.setLifo(false);
         channels.setTestOnBorrow(true);
         channels.setTestOnReturn(true);
         initEventLoopGroup();
         startAcceptingConnections();
+        started.set(true);
     }
 
-    public void deactivate() throws Exception {
-        channels.close();
-        serverGroup.shutdownGracefully();
-        clientGroup.shutdownGracefully();
-    }
-
-    /**
-     * Returns the local endpoint for this instance.
-     * @return local end point.
-     */
-    public Endpoint localEp() {
-        return localEp;
+    public void stop() throws Exception {
+        if (started.get()) {
+            channels.close();
+            serverGroup.shutdownGracefully();
+            clientGroup.shutdownGracefully();
+            started.set(false);
+        }
     }
 
     @Override
@@ -237,7 +221,13 @@
             .childOption(ChannelOption.SO_KEEPALIVE, true);
 
         // Bind and start to accept incoming connections.
-        b.bind(localEp.port()).sync();
+        b.bind(localEp.port()).sync().addListener(future -> {
+            if (future.isSuccess()) {
+                log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
+            } else {
+                log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
+            }
+        });
     }
 
     private class OnosCommunicationChannelFactory
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
deleted file mode 100644
index 53a36e3..0000000
--- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import java.net.InetAddress;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-import org.apache.commons.lang3.RandomUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.onlab.packet.IpAddress;
-import org.onosproject.store.cluster.messaging.Endpoint;
-
-import com.google.common.util.concurrent.MoreExecutors;
-
-/**
- * Simple ping-pong test that exercises NettyMessagingService.
- */
-public class PingPongTest {
-
-    @Ignore("Turning off fragile test")
-    @Test
-    public void testPingPong() throws Exception {
-        NettyMessagingManager pinger = new NettyMessagingManager(8085);
-        NettyMessagingManager ponger = new NettyMessagingManager(9086);
-        try {
-            pinger.activate();
-            ponger.activate();
-            ponger.registerHandler("echo", Function.identity(), MoreExecutors.directExecutor());
-            byte[] payload = RandomUtils.nextBytes(100);
-            CompletableFuture<byte[]> responseFuture =
-                    pinger.sendAndReceive(
-                            new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), 9086), "echo", payload);
-            assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));
-        } finally {
-            pinger.deactivate();
-            ponger.deactivate();
-        }
-    }
-}
diff --git a/utils/nio/pom.xml b/utils/nio/pom.xml
index ce38b35..5ea39b7 100644
--- a/utils/nio/pom.xml
+++ b/utils/nio/pom.xml
@@ -55,5 +55,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-
 </project>
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingManager.java b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
similarity index 92%
rename from utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingManager.java
rename to utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
index c183523..70f2f76 100644
--- a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingManager.java
+++ b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -39,7 +40,6 @@
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.onlab.nio.AcceptorLoop;
 import org.onlab.nio.SelectorLoop;
-import org.onlab.packet.IpAddress;
 import org.onosproject.store.cluster.messaging.Endpoint;
 import org.onosproject.store.cluster.messaging.MessagingService;
 import org.slf4j.Logger;
@@ -54,13 +54,12 @@
 /**
  * MessagingService implementation based on IOLoop.
  */
-public class IOLoopMessagingManager implements MessagingService {
+public class IOLoopMessaging implements MessagingService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
 
-    static final int PORT = 9876;
     static final long TIMEOUT = 1000;
 
     static final boolean SO_NO_DELAY = false;
@@ -79,7 +78,8 @@
 
     private int lastWorker = -1;
 
-    private final Endpoint localEp;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private Endpoint localEp;
 
     private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
             new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
@@ -97,34 +97,17 @@
             })
             .build();
 
-
-    public IOLoopMessagingManager(int port) {
-        this(new Endpoint(IpAddress.valueOf("127.0.0.1"), port));
-    }
-
-    public IOLoopMessagingManager(IpAddress ip, int port) {
-        this(new Endpoint(ip, port));
-    }
-
-    public IOLoopMessagingManager(Endpoint localEp) {
-        this.localEp = localEp;
-    }
-
-    /**
-     * Returns the local endpoint.
-     *
-     * @return local endpoint
-     */
-    public Endpoint localEp() {
-        return localEp;
-    }
-
     /**
      * Activates IO Loops.
      *
      * @throws IOException is activation fails
      */
-    public void activate() throws IOException {
+    public void start(Endpoint localEp) throws IOException {
+        if (started.get()) {
+            log.warn("IOMessaging is already running at {}", localEp);
+            return;
+        }
+        this.localEp = localEp;
         streams.setLifo(false);
         this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));
 
@@ -136,16 +119,20 @@
         acceptorThreadPool.execute(acceptorLoop);
         ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
         acceptorLoop.awaitStart(TIMEOUT);
+        started.set(true);
     }
 
     /**
      * Shuts down IO loops.
      */
-    public void deactivate() {
-        ioLoops.forEach(SelectorLoop::shutdown);
-        acceptorLoop.shutdown();
-        ioThreadPool.shutdown();
-        acceptorThreadPool.shutdown();
+    public void stop() {
+        if (started.get()) {
+            ioLoops.forEach(SelectorLoop::shutdown);
+            acceptorLoop.shutdown();
+            ioThreadPool.shutdown();
+            acceptorThreadPool.shutdown();
+            started.set(false);
+        }
     }