ONOS-1982: MessagingService is now an OSGi service. It has implementations based on Netty and IOLoop

Change-Id: Ia4c99de18e91be1b49bd1fddd86fe89fb83e859c
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
similarity index 89%
rename from utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java
rename to utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index 0bbb8c1..44b7027 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -37,22 +37,20 @@
 import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
-import org.onlab.packet.IpAddress;
 import org.onosproject.store.cluster.messaging.Endpoint;
 import org.onosproject.store.cluster.messaging.MessagingService;
 import org.slf4j.Logger;
@@ -66,14 +64,15 @@
 /**
  * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
  */
-public class NettyMessagingManager implements MessagingService {
+public class NettyMessaging implements MessagingService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
 
-    private final Endpoint localEp;
-    private final ConcurrentMap<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
+    private Endpoint localEp;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
     private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
             .expireAfterWrite(10, TimeUnit.SECONDS)
@@ -104,7 +103,8 @@
             clientChannelClass = EpollSocketChannel.class;
             return;
         } catch (Throwable e) {
-            log.warn("Failed to initialize native (epoll) transport. Reason: {}. Proceeding with nio.", e.getMessage());
+            log.debug("Failed to initialize native (epoll) transport. "
+                    + "Reason: {}. Proceeding with nio.", e.getMessage());
         }
         clientGroup = new NioEventLoopGroup();
         serverGroup = new NioEventLoopGroup();
@@ -112,43 +112,27 @@
         clientChannelClass = NioSocketChannel.class;
     }
 
-    public NettyMessagingManager(IpAddress ip, int port) {
-        localEp = new Endpoint(ip, port);
-    }
-
-    public NettyMessagingManager() {
-        this(8080);
-    }
-
-    public NettyMessagingManager(int port) {
-        try {
-            localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port);
-        } catch (UnknownHostException e) {
-            // Cannot resolve the local host, something is very wrong. Bailing out.
-            throw new IllegalStateException("Cannot resolve local host", e);
+    public void start(Endpoint localEp) throws Exception {
+        if (started.get()) {
+            log.warn("Already running at local endpoint: {}", localEp);
+            return;
         }
-    }
-
-    public void activate() throws InterruptedException {
+        this.localEp = localEp;
         channels.setLifo(false);
         channels.setTestOnBorrow(true);
         channels.setTestOnReturn(true);
         initEventLoopGroup();
         startAcceptingConnections();
+        started.set(true);
     }
 
-    public void deactivate() throws Exception {
-        channels.close();
-        serverGroup.shutdownGracefully();
-        clientGroup.shutdownGracefully();
-    }
-
-    /**
-     * Returns the local endpoint for this instance.
-     * @return local end point.
-     */
-    public Endpoint localEp() {
-        return localEp;
+    public void stop() throws Exception {
+        if (started.get()) {
+            channels.close();
+            serverGroup.shutdownGracefully();
+            clientGroup.shutdownGracefully();
+            started.set(false);
+        }
     }
 
     @Override
@@ -237,7 +221,13 @@
             .childOption(ChannelOption.SO_KEEPALIVE, true);
 
         // Bind and start to accept incoming connections.
-        b.bind(localEp.port()).sync();
+        b.bind(localEp.port()).sync().addListener(future -> {
+            if (future.isSuccess()) {
+                log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
+            } else {
+                log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
+            }
+        });
     }
 
     private class OnosCommunicationChannelFactory