ONOS-1982: MessagingService is now a OSGi service. Has implementations based on Netty and IOLoop

Change-Id: Ia4c99de18e91be1b49bd1fddd86fe89fb83e859c
diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterDefinitionService.java b/core/api/src/main/java/org/onosproject/cluster/ClusterDefinitionService.java
new file mode 100644
index 0000000..dbe5f71
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/cluster/ClusterDefinitionService.java
@@ -0,0 +1,32 @@
+package org.onosproject.cluster;
+
+import java.util.Set;
+
+/**
+ * Service for obtaining the static definition of a controller cluster.
+ */
+public interface ClusterDefinitionService {
+
+    /**
+     * Returns the local controller node.
+     * @return local controller node
+     */
+    ControllerNode localNode();
+
+    /**
+     * Returns the set of seed nodes that should be used for discovering other members
+     * of the cluster.
+     * @return set of seed controller nodes
+     */
+    Set<ControllerNode> seedNodes();
+
+    /**
+     * Forms cluster configuration based on the specified set of node
+     * information. Assumes subsequent restart for the new configuration to
+     * take hold.
+     *
+     * @param nodes    set of nodes that form the cluster
+     * @param ipPrefix IP address prefix, e.g. 10.0.1.*
+     */
+    void formCluster(Set<ControllerNode> nodes, String ipPrefix);
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterStore.java b/core/api/src/main/java/org/onosproject/cluster/ClusterStore.java
index 68798fd..0481d51 100644
--- a/core/api/src/main/java/org/onosproject/cluster/ClusterStore.java
+++ b/core/api/src/main/java/org/onosproject/cluster/ClusterStore.java
@@ -65,16 +65,6 @@
     DateTime getLastUpdated(NodeId nodeId);
 
     /**
-     * Forms cluster configuration based on the specified set of node
-     * information. Assumes subsequent restart for the new configuration to
-     * take hold.
-     *
-     * @param nodes    set of nodes that form the cluster
-     * @param ipPrefix IP address prefix, e.g. 10.0.1.*
-     */
-    void formCluster(Set<ControllerNode> nodes, String ipPrefix);
-
-    /**
      * Adds a new controller node to the cluster.
      *
      * @param nodeId  controller node identifier
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
index c8ffe88..980d40f 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
@@ -25,6 +25,7 @@
 import org.joda.time.DateTime;
 import org.onlab.packet.IpAddress;
 import org.onosproject.cluster.ClusterAdminService;
+import org.onosproject.cluster.ClusterDefinitionService;
 import org.onosproject.cluster.ClusterEvent;
 import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ClusterService;
@@ -58,6 +59,9 @@
             listenerRegistry = new ListenerRegistry<>();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterDefinitionService clusterDefinitionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterStore store;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -70,6 +74,8 @@
     public void activate() {
         store.setDelegate(delegate);
         eventDispatcher.addSink(ClusterEvent.class, listenerRegistry);
+        clusterDefinitionService.seedNodes()
+                                .forEach(node -> store.addNode(node.id(), node.ip(), node.tcpPort()));
         log.info("Started");
     }
 
@@ -113,7 +119,7 @@
         checkNotNull(nodes, "Nodes cannot be null");
         checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
         checkNotNull(ipPrefix, "IP prefix cannot be null");
-        store.formCluster(nodes, ipPrefix);
+        clusterDefinitionService.formCluster(nodes, ipPrefix);
         try {
             log.warn("Shutting down container for cluster reconfiguration!");
             systemService.reboot("now", SystemService.Swipe.NONE);
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java
new file mode 100644
index 0000000..4d12723
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionManager.java
@@ -0,0 +1,172 @@
+package org.onosproject.store.cluster.impl;
+
+import static com.hazelcast.util.AddressUtil.matchInterface;
+import static java.net.NetworkInterface.getNetworkInterfaces;
+import static java.util.Collections.list;
+import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
+import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Enumeration;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterDefinitionService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.consistent.impl.DatabaseDefinition;
+import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
+import org.slf4j.Logger;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+import com.hazelcast.util.AddressUtil;
+
+/**
+ * Implementation of ClusterDefinitionService.
+ */
+@Component(immediate = true)
+@Service
+public class ClusterDefinitionManager implements ClusterDefinitionService {
+
+    public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
+    private static final String ONOS_NIC = "ONOS_NIC";
+    private static final Logger log = getLogger(ClusterDefinitionManager.class);
+    private ControllerNode localNode;
+    private Set<ControllerNode> seedNodes;
+
+    @Activate
+    public void activate() {
+        File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
+        ClusterDefinitionStore clusterDefinitionStore =
+                new ClusterDefinitionStore(clusterDefinitionFile.getPath());
+
+        if (!clusterDefinitionFile.exists()) {
+            createDefaultClusterDefinition(clusterDefinitionStore);
+        }
+
+        try {
+            ClusterDefinition clusterDefinition = clusterDefinitionStore.read();
+            establishSelfIdentity(clusterDefinition);
+            seedNodes = ImmutableSet
+                    .copyOf(clusterDefinition.getNodes())
+                    .stream()
+                    .filter(n -> !localNode.id().equals(new NodeId(n.getId())))
+                    .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
+                                                        IpAddress.valueOf(n.getIp()),
+                                                        n.getTcpPort()))
+                    .collect(Collectors.toSet());
+        } catch (IOException e) {
+            throw new IllegalStateException("Failed to read cluster definition.", e);
+        }
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    @Override
+    public ControllerNode localNode() {
+        return localNode;
+    }
+
+    @Override
+    public Set<ControllerNode> seedNodes() {
+        return seedNodes;
+    }
+
+    @Override
+    public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
+        try {
+            Set<NodeInfo> infos = Sets.newHashSet();
+            nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
+                                                       n.ip().toString(),
+                                                       n.tcpPort())));
+
+            ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
+            new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
+
+            DatabaseDefinition ddef = DatabaseDefinition.from(infos);
+            new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
+        } catch (IOException e) {
+            log.error("Unable to form cluster", e);
+        }
+    }
+
+    private IpAddress findLocalIp(ClusterDefinition clusterDefinition) throws SocketException {
+        Enumeration<NetworkInterface> interfaces =
+                NetworkInterface.getNetworkInterfaces();
+        while (interfaces.hasMoreElements()) {
+            NetworkInterface iface = interfaces.nextElement();
+            Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
+            while (inetAddresses.hasMoreElements()) {
+                IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
+                if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
+                    return ip;
+                }
+            }
+        }
+        throw new IllegalStateException("Unable to determine local ip");
+    }
+
+    private void establishSelfIdentity(ClusterDefinition clusterDefinition) {
+        try {
+            IpAddress ip = findLocalIp(clusterDefinition);
+            localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
+        } catch (SocketException e) {
+            throw new IllegalStateException("Cannot determine local IP", e);
+        }
+    }
+
+    private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
+        // Assumes IPv4 is returned.
+        String ip = getSiteLocalAddress();
+        String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
+        NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
+        try {
+            store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
+        } catch (IOException e) {
+            log.warn("Unable to write default cluster definition", e);
+        }
+    }
+
+    /**
+     * Returns the address that matches the IP prefix given in ONOS_NIC
+     * environment variable if one was specified, or the first site local
+     * address if one can be found or the loopback address otherwise.
+     *
+     * @return site-local address in string form
+     */
+    public static String getSiteLocalAddress() {
+        try {
+            String ipPrefix = System.getenv(ONOS_NIC);
+            for (NetworkInterface nif : list(getNetworkInterfaces())) {
+                for (InetAddress address : list(nif.getInetAddresses())) {
+                    IpAddress ip = IpAddress.valueOf(address);
+                    if (ipPrefix == null && address.isSiteLocalAddress() ||
+                            ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
+                        return ip.toString();
+                    }
+                }
+            }
+        } catch (SocketException e) {
+            log.error("Unable to get network interfaces", e);
+        }
+
+        return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index 149a858..ec297e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -17,17 +17,17 @@
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.hazelcast.util.AddressUtil;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.joda.time.DateTime;
-import org.onlab.netty.NettyMessagingManager;
 import org.onlab.packet.IpAddress;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterDefinitionService;
 import org.onosproject.cluster.ClusterEvent;
 import org.onosproject.cluster.ClusterStore;
 import org.onosproject.cluster.ClusterStoreDelegate;
@@ -37,18 +37,12 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.consistent.impl.DatabaseDefinition;
-import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
+import org.onosproject.store.cluster.messaging.MessagingService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.slf4j.Logger;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Enumeration;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -59,12 +53,7 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.hazelcast.util.AddressUtil.matchInterface;
-import static java.net.NetworkInterface.getNetworkInterfaces;
-import static java.util.Collections.list;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
-import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
 import static org.slf4j.LoggerFactory.getLogger;
 
 @Component(immediate = true)
@@ -79,11 +68,9 @@
 
     private static final Logger log = getLogger(DistributedClusterStore.class);
 
-    public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
     public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
 
     // TODO: make these configurable.
-    private static final int HEARTBEAT_FD_PORT = 2419;
     private static final int HEARTBEAT_INTERVAL_MS = 100;
     private static final int PHI_FAILURE_THRESHOLD = 10;
 
@@ -99,16 +86,10 @@
     };
 
     private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
-    private static final byte SITE_LOCAL_BYTE = (byte) 0xC0;
-    private static final String ONOS_NIC = "ONOS_NIC";
 
-    private ClusterDefinition clusterDefinition;
-
-    private Set<ControllerNode> seedNodes;
     private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
     private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
     private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
-    private NettyMessagingManager messagingService;
     private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
             groupedThreads("onos/cluster/membership", "heartbeat-sender"));
     private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
@@ -118,45 +99,16 @@
 
     private ControllerNode localNode;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterDefinitionService clusterDefinitionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MessagingService messagingService;
+
     @Activate
     public void activate() {
-        File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
-        ClusterDefinitionStore clusterDefinitionStore =
-                new ClusterDefinitionStore(clusterDefinitionFile.getPath());
+        localNode = clusterDefinitionService.localNode();
 
-        if (!clusterDefinitionFile.exists()) {
-            createDefaultClusterDefinition(clusterDefinitionStore);
-        }
-
-        try {
-            clusterDefinition = clusterDefinitionStore.read();
-            seedNodes = ImmutableSet
-                    .copyOf(clusterDefinition.getNodes())
-                    .stream()
-                    .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
-                                                        IpAddress.valueOf(n.getIp()),
-                                                        n.getTcpPort()))
-                    .collect(Collectors.toSet());
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to read cluster definition.", e);
-        }
-
-        seedNodes.forEach(node -> {
-            allNodes.put(node.id(), node);
-            updateState(node.id(), State.INACTIVE);
-        });
-
-        establishSelfIdentity();
-
-        messagingService = new NettyMessagingManager(HEARTBEAT_FD_PORT);
-        try {
-            messagingService.activate();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IllegalStateException(
-                    "Failed to cleanly initialize membership and"
-                            + " failure detector communication channel.", e);
-        }
         messagingService.registerHandler(HEARTBEAT_MESSAGE,
                                          new HeartbeatMessageHandler(), heartBeatMessageHandler);
 
@@ -165,56 +117,15 @@
         heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
                                                HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
 
+        addNode(localNode);
+        updateState(localNode.id(), State.ACTIVE);
+
         log.info("Started");
     }
 
-    private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
-        // Assumes IPv4 is returned.
-        String ip = DistributedClusterStore.getSiteLocalAddress();
-        String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
-        NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
-        try {
-            store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
-        } catch (IOException e) {
-            log.warn("Unable to write default cluster definition", e);
-        }
-    }
-
-    /**
-     * Returns the address that matches the IP prefix given in ONOS_NIC
-     * environment variable if one was specified, or the first site local
-     * address if one can be found or the loopback address otherwise.
-     *
-     * @return site-local address in string form
-     */
-    public static String getSiteLocalAddress() {
-        try {
-            String ipPrefix = System.getenv(ONOS_NIC);
-            for (NetworkInterface nif : list(getNetworkInterfaces())) {
-                for (InetAddress address : list(nif.getInetAddresses())) {
-                    IpAddress ip = IpAddress.valueOf(address);
-                    if (ipPrefix == null && address.isSiteLocalAddress() ||
-                            ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
-                        return ip.toString();
-                    }
-                }
-            }
-
-        } catch (SocketException e) {
-            log.error("Unable to get network interfaces", e);
-        }
-
-        return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
-    }
-
     @Deactivate
     public void deactivate() {
-        try {
-            messagingService.deactivate();
-        } catch (Exception e) {
-            log.trace("Failed to cleanly shutdown cluster membership messaging", e);
-        }
-
+        messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
         heartBeatSender.shutdownNow();
         heartBeatMessageHandler.shutdownNow();
 
@@ -262,9 +173,7 @@
     @Override
     public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
         ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
-        allNodes.put(node.id(), node);
-        updateState(nodeId, State.INACTIVE);
-        notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
+        addNode(node);
         return node;
     }
 
@@ -278,22 +187,10 @@
         }
     }
 
-    @Override
-    public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
-        try {
-            Set<NodeInfo> infos = Sets.newHashSet();
-            nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
-                                                       n.ip().toString(),
-                                                       n.tcpPort())));
-
-            ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
-            new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
-
-            DatabaseDefinition ddef = DatabaseDefinition.from(infos);
-            new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
-        } catch (IOException e) {
-            log.error("Unable to form cluster", e);
-        }
+    private void addNode(ControllerNode node) {
+        allNodes.put(node.id(), node);
+        updateState(node.id(), State.INACTIVE);
+        notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
     }
 
     private void updateState(NodeId nodeId, State newState) {
@@ -301,18 +198,6 @@
         nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
     }
 
-    private void establishSelfIdentity() {
-        try {
-            IpAddress ip = findLocalIp();
-            localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
-            allNodes.put(localNode.id(), localNode);
-            updateState(localNode.id(), State.ACTIVE);
-            log.info("Local Node: {}", localNode);
-        } catch (SocketException e) {
-            throw new IllegalStateException("Cannot determine local IP", e);
-        }
-    }
-
     private void heartbeat() {
         try {
             Set<ControllerNode> peers = allNodes.values()
@@ -351,7 +236,7 @@
     }
 
     private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
-        Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
+        Endpoint remoteEp = new Endpoint(peer.ip(), peer.tcpPort());
         try {
             messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
         } catch (IOException e) {
@@ -359,22 +244,6 @@
         }
     }
 
-    private IpAddress findLocalIp() throws SocketException {
-        Enumeration<NetworkInterface> interfaces =
-                NetworkInterface.getNetworkInterfaces();
-        while (interfaces.hasMoreElements()) {
-            NetworkInterface iface = interfaces.nextElement();
-            Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
-            while (inetAddresses.hasMoreElements()) {
-                IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
-                if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
-                    return ip;
-                }
-            }
-        }
-        throw new IllegalStateException("Unable to determine local ip");
-    }
-
     private class HeartbeatMessageHandler implements Consumer<byte[]> {
         @Override
         public void accept(byte[] message) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
index e88e2d2..c7be0db 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
@@ -130,11 +130,6 @@
     }
 
     @Override
-    public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
-        throw new UnsupportedOperationException("formCluster not implemented");
-    }
-
-    @Override
     public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
         return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
     }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 1a4512d..7475819 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -21,8 +21,6 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
-import org.onlab.netty.NettyMessagingManager;
-import org.onlab.nio.service.IOLoopMessagingManager;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.NodeId;
@@ -60,47 +58,16 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     private ClusterService clusterService;
 
-    // TODO: This probably should not be a OSGi service.
-    private MessagingService messagingService;
-
-    private final boolean useNetty = true;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MessagingService messagingService;
 
     @Activate
     public void activate() {
-        ControllerNode localNode = clusterService.getLocalNode();
-        if (useNetty) {
-            NettyMessagingManager netty = new NettyMessagingManager(localNode.ip(), localNode.tcpPort());
-            try {
-                netty.activate();
-                messagingService = netty;
-            } catch (Exception e) {
-                log.error("NettyMessagingService#activate", e);
-            }
-        } else {
-            IOLoopMessagingManager ioLoop = new IOLoopMessagingManager(localNode.ip(), localNode.tcpPort());
-            try {
-                ioLoop.activate();
-                messagingService = ioLoop;
-            } catch (Exception e) {
-                log.error("IOLoopMessagingService#activate", e);
-            }
-        }
-        log.info("Started on {}:{}", localNode.ip(), localNode.tcpPort());
+        log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
-        // TODO: cleanup messageingService if needed.
-        // FIXME: workaround until it becomes a service.
-        try {
-            if (useNetty) {
-                ((NettyMessagingManager) messagingService).deactivate();
-            } else {
-                ((IOLoopMessagingManager) messagingService).deactivate();
-            }
-        } catch (Exception e) {
-            log.error("MessagingService#deactivate", e);
-        }
         log.info("Stopped");
     }
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/IOLoopMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/IOLoopMessagingManager.java
new file mode 100644
index 0000000..9e52c3e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/IOLoopMessagingManager.java
@@ -0,0 +1,40 @@
+package org.onosproject.store.cluster.messaging.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.nio.service.IOLoopMessaging;
+import org.onosproject.cluster.ClusterDefinitionService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * IOLoop based MessagingService.
+ */
+@Component(immediate = true, enabled = false)
+@Service
+public class IOLoopMessagingManager extends IOLoopMessaging {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterDefinitionService clusterDefinitionService;
+
+    @Activate
+    public void activate() throws Exception {
+        ControllerNode localNode = clusterDefinitionService.localNode();
+        super.start(new Endpoint(localNode.ip(), localNode.tcpPort()));
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() throws Exception {
+        super.stop();
+        log.info("Stopped");
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
new file mode 100644
index 0000000..4777fdb
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/messaging/impl/NettyMessagingManager.java
@@ -0,0 +1,40 @@
+package org.onosproject.store.cluster.messaging.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.netty.NettyMessaging;
+import org.onosproject.cluster.ClusterDefinitionService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.store.cluster.messaging.Endpoint;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Netty based MessagingService.
+ */
+@Component(immediate = true, enabled = true)
+@Service
+public class NettyMessagingManager extends NettyMessaging {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterDefinitionService clusterDefinitionService;
+
+    @Activate
+    public void activate() throws Exception {
+        ControllerNode localNode = clusterDefinitionService.localNode();
+        super.start(new Endpoint(localNode.ip(), localNode.tcpPort()));
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() throws Exception {
+        super.stop();
+        log.info("Stopped");
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 71d1961..c06ebdf 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -45,7 +45,7 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.core.IdGenerator;
-import org.onosproject.store.cluster.impl.DistributedClusterStore;
+import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
 import org.onosproject.store.cluster.impl.NodeInfo;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
@@ -193,7 +193,7 @@
 
     private void createDefaultDatabaseDefinition(DatabaseDefinitionStore store) {
         // Assumes IPv4 is returned.
-        String ip = DistributedClusterStore.getSiteLocalAddress();
+        String ip = ClusterDefinitionManager.getSiteLocalAddress();
         NodeInfo node = NodeInfo.from(ip, ip, COPYCAT_TCP_PORT);
         try {
             store.write(DatabaseDefinition.from(ImmutableSet.of(node)));
diff --git a/core/store/dist/src/main/java/org/onosproject/store/hz/StoreManager.java b/core/store/dist/src/main/java/org/onosproject/store/hz/StoreManager.java
index dc159af..5cb5757 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/hz/StoreManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/hz/StoreManager.java
@@ -26,7 +26,7 @@
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Service;
-import org.onosproject.store.cluster.impl.DistributedClusterStore;
+import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,7 +66,7 @@
     }
 
     private void createDefaultHazelcastFile(File hazelcastFile) {
-        String ip = DistributedClusterStore.getSiteLocalAddress();
+        String ip = ClusterDefinitionManager.getSiteLocalAddress();
         String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
         InputStream his = getClass().getResourceAsStream("/hazelcast.xml");
         try {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
index 292330c..43af1b1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mastership/impl/ConsistentDeviceMastershipStore.java
@@ -140,7 +140,7 @@
         localNodeId = clusterService.getLocalNode().id();
         leadershipService.addListener(leadershipEventListener);
 
-        log.info("Started.");
+        log.info("Started");
     }
 
     @Deactivate
@@ -151,7 +151,7 @@
         transferExecutor.shutdown();
         leadershipService.removeListener(leadershipEventListener);
 
-        log.info("Stoppped.");
+        log.info("Stopped");
     }
 
     @Override
diff --git a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
index 1a106ba..0dcc6a1 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/cluster/messaging/impl/ClusterCommunicationManagerTest.java
@@ -22,7 +22,6 @@
 import org.onosproject.cluster.DefaultControllerNode;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.cluster.impl.ClusterNodesDelegate;
-import org.onlab.netty.NettyMessagingManager;
 import org.onlab.packet.IpAddress;
 
 import java.util.concurrent.CountDownLatch;
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
index ffc67e3..8fde858 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
@@ -93,11 +93,6 @@
     }
 
     @Override
-    public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
-
-    }
-
-    @Override
     public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
         return null;
     }
diff --git a/utils/netty/pom.xml b/utils/netty/pom.xml
index 9e5677c..f54c93f 100644
--- a/utils/netty/pom.xml
+++ b/utils/netty/pom.xml
@@ -80,5 +80,4 @@
           <version>${netty4.version}</version>
         </dependency>
     </dependencies>
-
 </project>
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
similarity index 89%
rename from utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java
rename to utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
index 0bbb8c1..44b7027 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingManager.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessaging.java
@@ -37,22 +37,20 @@
 import io.netty.channel.socket.nio.NioSocketChannel;
 
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
-import org.onlab.packet.IpAddress;
 import org.onosproject.store.cluster.messaging.Endpoint;
 import org.onosproject.store.cluster.messaging.MessagingService;
 import org.slf4j.Logger;
@@ -66,14 +64,15 @@
 /**
  * Implementation of MessagingService based on <a href="http://netty.io/">Netty</a> framework.
  */
-public class NettyMessagingManager implements MessagingService {
+public class NettyMessaging implements MessagingService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGING_REQUEST_REPLY";
 
-    private final Endpoint localEp;
-    private final ConcurrentMap<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
+    private Endpoint localEp;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private final Map<String, Consumer<InternalMessage>> handlers = new ConcurrentHashMap<>();
     private final AtomicLong messageIdGenerator = new AtomicLong(0);
     private final Cache<Long, CompletableFuture<byte[]>> responseFutures = CacheBuilder.newBuilder()
             .expireAfterWrite(10, TimeUnit.SECONDS)
@@ -104,7 +103,8 @@
             clientChannelClass = EpollSocketChannel.class;
             return;
         } catch (Throwable e) {
-            log.warn("Failed to initialize native (epoll) transport. Reason: {}. Proceeding with nio.", e.getMessage());
+            log.debug("Failed to initialize native (epoll) transport. "
+                    + "Reason: {}. Proceeding with nio.", e.getMessage());
         }
         clientGroup = new NioEventLoopGroup();
         serverGroup = new NioEventLoopGroup();
@@ -112,43 +112,27 @@
         clientChannelClass = NioSocketChannel.class;
     }
 
-    public NettyMessagingManager(IpAddress ip, int port) {
-        localEp = new Endpoint(ip, port);
-    }
-
-    public NettyMessagingManager() {
-        this(8080);
-    }
-
-    public NettyMessagingManager(int port) {
-        try {
-            localEp = new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), port);
-        } catch (UnknownHostException e) {
-            // Cannot resolve the local host, something is very wrong. Bailing out.
-            throw new IllegalStateException("Cannot resolve local host", e);
+    public void start(Endpoint localEp) throws Exception {
+        if (started.get()) {
+            log.warn("Already running at local endpoint: {}", localEp);
+            return;
         }
-    }
-
-    public void activate() throws InterruptedException {
+        this.localEp = localEp;
         channels.setLifo(false);
         channels.setTestOnBorrow(true);
         channels.setTestOnReturn(true);
         initEventLoopGroup();
         startAcceptingConnections();
+        started.set(true);
     }
 
-    public void deactivate() throws Exception {
-        channels.close();
-        serverGroup.shutdownGracefully();
-        clientGroup.shutdownGracefully();
-    }
-
-    /**
-     * Returns the local endpoint for this instance.
-     * @return local end point.
-     */
-    public Endpoint localEp() {
-        return localEp;
+    public void stop() throws Exception {
+        if (started.get()) {
+            channels.close();
+            serverGroup.shutdownGracefully();
+            clientGroup.shutdownGracefully();
+            started.set(false);
+        }
     }
 
     @Override
@@ -237,7 +221,13 @@
             .childOption(ChannelOption.SO_KEEPALIVE, true);
 
         // Bind and start to accept incoming connections.
-        b.bind(localEp.port()).sync();
+        b.bind(localEp.port()).sync().addListener(future -> {
+            if (future.isSuccess()) {
+                log.info("{} accepting incoming connections on port {}", localEp.host(), localEp.port());
+            } else {
+                log.warn("{} failed to bind to port {}", localEp.host(), localEp.port(), future.cause());
+            }
+        });
     }
 
     private class OnosCommunicationChannelFactory
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
deleted file mode 100644
index 53a36e3..0000000
--- a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2014-2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onlab.netty;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import java.net.InetAddress;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
-
-import org.apache.commons.lang3.RandomUtils;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.onlab.packet.IpAddress;
-import org.onosproject.store.cluster.messaging.Endpoint;
-
-import com.google.common.util.concurrent.MoreExecutors;
-
-/**
- * Simple ping-pong test that exercises NettyMessagingService.
- */
-public class PingPongTest {
-
-    @Ignore("Turning off fragile test")
-    @Test
-    public void testPingPong() throws Exception {
-        NettyMessagingManager pinger = new NettyMessagingManager(8085);
-        NettyMessagingManager ponger = new NettyMessagingManager(9086);
-        try {
-            pinger.activate();
-            ponger.activate();
-            ponger.registerHandler("echo", Function.identity(), MoreExecutors.directExecutor());
-            byte[] payload = RandomUtils.nextBytes(100);
-            CompletableFuture<byte[]> responseFuture =
-                    pinger.sendAndReceive(
-                            new Endpoint(IpAddress.valueOf(InetAddress.getLocalHost()), 9086), "echo", payload);
-            assertArrayEquals(payload, responseFuture.get(10000, TimeUnit.MILLISECONDS));
-        } finally {
-            pinger.deactivate();
-            ponger.deactivate();
-        }
-    }
-}
diff --git a/utils/nio/pom.xml b/utils/nio/pom.xml
index ce38b35..5ea39b7 100644
--- a/utils/nio/pom.xml
+++ b/utils/nio/pom.xml
@@ -55,5 +55,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-
 </project>
diff --git a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingManager.java b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
similarity index 92%
rename from utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingManager.java
rename to utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
index c183523..70f2f76 100644
--- a/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessagingManager.java
+++ b/utils/nio/src/main/java/org/onlab/nio/service/IOLoopMessaging.java
@@ -31,6 +31,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -39,7 +40,6 @@
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.onlab.nio.AcceptorLoop;
 import org.onlab.nio.SelectorLoop;
-import org.onlab.packet.IpAddress;
 import org.onosproject.store.cluster.messaging.Endpoint;
 import org.onosproject.store.cluster.messaging.MessagingService;
 import org.slf4j.Logger;
@@ -54,13 +54,12 @@
 /**
  * MessagingService implementation based on IOLoop.
  */
-public class IOLoopMessagingManager implements MessagingService {
+public class IOLoopMessaging implements MessagingService {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private static final String REPLY_MESSAGE_TYPE = "ONOS_REQUEST_REPLY";
 
-    static final int PORT = 9876;
     static final long TIMEOUT = 1000;
 
     static final boolean SO_NO_DELAY = false;
@@ -79,7 +78,8 @@
 
     private int lastWorker = -1;
 
-    private final Endpoint localEp;
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    private Endpoint localEp;
 
     private GenericKeyedObjectPool<Endpoint, DefaultMessageStream> streams =
             new GenericKeyedObjectPool<>(new DefaultMessageStreamFactory());
@@ -97,34 +97,17 @@
             })
             .build();
 
-
-    public IOLoopMessagingManager(int port) {
-        this(new Endpoint(IpAddress.valueOf("127.0.0.1"), port));
-    }
-
-    public IOLoopMessagingManager(IpAddress ip, int port) {
-        this(new Endpoint(ip, port));
-    }
-
-    public IOLoopMessagingManager(Endpoint localEp) {
-        this.localEp = localEp;
-    }
-
-    /**
-     * Returns the local endpoint.
-     *
-     * @return local endpoint
-     */
-    public Endpoint localEp() {
-        return localEp;
-    }
-
     /**
      * Activates IO Loops.
      *
      * @throws IOException is activation fails
      */
-    public void activate() throws IOException {
+    public void start(Endpoint localEp) throws IOException {
+        if (started.get()) {
+            log.warn("IOMessaging is already running at {}", localEp);
+            return;
+        }
+        this.localEp = localEp;
         streams.setLifo(false);
         this.acceptorLoop = new DefaultAcceptorLoop(new InetSocketAddress(localEp.host().toString(), localEp.port()));
 
@@ -136,16 +119,20 @@
         acceptorThreadPool.execute(acceptorLoop);
         ioLoops.forEach(loop -> loop.awaitStart(TIMEOUT));
         acceptorLoop.awaitStart(TIMEOUT);
+        started.set(true);
     }
 
     /**
      * Shuts down IO loops.
      */
-    public void deactivate() {
-        ioLoops.forEach(SelectorLoop::shutdown);
-        acceptorLoop.shutdown();
-        ioThreadPool.shutdown();
-        acceptorThreadPool.shutdown();
+    public void stop() {
+        if (started.get()) {
+            ioLoops.forEach(SelectorLoop::shutdown);
+            acceptorLoop.shutdown();
+            ioThreadPool.shutdown();
+            acceptorThreadPool.shutdown();
+            started.set(false);
+        }
     }