ONOS-1982: MessagingService is now an OSGi service, with implementations based on Netty and IOLoop

Change-Id: Ia4c99de18e91be1b49bd1fddd86fe89fb83e859c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index 149a858..ec297e2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -17,17 +17,17 @@
 
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.hazelcast.util.AddressUtil;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
 import org.joda.time.DateTime;
-import org.onlab.netty.NettyMessagingManager;
 import org.onlab.packet.IpAddress;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterDefinitionService;
 import org.onosproject.cluster.ClusterEvent;
 import org.onosproject.cluster.ClusterStore;
 import org.onosproject.cluster.ClusterStoreDelegate;
@@ -37,18 +37,12 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.cluster.messaging.Endpoint;
-import org.onosproject.store.consistent.impl.DatabaseDefinition;
-import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
+import org.onosproject.store.cluster.messaging.MessagingService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.serializers.KryoSerializer;
 import org.slf4j.Logger;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.InetAddress;
-import java.net.NetworkInterface;
-import java.net.SocketException;
-import java.util.Enumeration;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
@@ -59,12 +53,7 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static com.hazelcast.util.AddressUtil.matchInterface;
-import static java.net.NetworkInterface.getNetworkInterfaces;
-import static java.util.Collections.list;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
-import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
 import static org.slf4j.LoggerFactory.getLogger;
 
 @Component(immediate = true)
@@ -79,11 +68,9 @@
 
     private static final Logger log = getLogger(DistributedClusterStore.class);
 
-    public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
     public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
 
     // TODO: make these configurable.
-    private static final int HEARTBEAT_FD_PORT = 2419;
     private static final int HEARTBEAT_INTERVAL_MS = 100;
     private static final int PHI_FAILURE_THRESHOLD = 10;
 
@@ -99,16 +86,10 @@
     };
 
     private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
-    private static final byte SITE_LOCAL_BYTE = (byte) 0xC0;
-    private static final String ONOS_NIC = "ONOS_NIC";
 
-    private ClusterDefinition clusterDefinition;
-
-    private Set<ControllerNode> seedNodes;
     private final Map<NodeId, ControllerNode> allNodes = Maps.newConcurrentMap();
     private final Map<NodeId, State> nodeStates = Maps.newConcurrentMap();
     private final Map<NodeId, DateTime> nodeStateLastUpdatedTimes = Maps.newConcurrentMap();
-    private NettyMessagingManager messagingService;
     private ScheduledExecutorService heartBeatSender = Executors.newSingleThreadScheduledExecutor(
             groupedThreads("onos/cluster/membership", "heartbeat-sender"));
     private ExecutorService heartBeatMessageHandler = Executors.newSingleThreadExecutor(
@@ -118,45 +99,16 @@
 
     private ControllerNode localNode;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterDefinitionService clusterDefinitionService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MessagingService messagingService;
+
     @Activate
     public void activate() {
-        File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
-        ClusterDefinitionStore clusterDefinitionStore =
-                new ClusterDefinitionStore(clusterDefinitionFile.getPath());
+        localNode = clusterDefinitionService.localNode();
 
-        if (!clusterDefinitionFile.exists()) {
-            createDefaultClusterDefinition(clusterDefinitionStore);
-        }
-
-        try {
-            clusterDefinition = clusterDefinitionStore.read();
-            seedNodes = ImmutableSet
-                    .copyOf(clusterDefinition.getNodes())
-                    .stream()
-                    .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
-                                                        IpAddress.valueOf(n.getIp()),
-                                                        n.getTcpPort()))
-                    .collect(Collectors.toSet());
-        } catch (IOException e) {
-            throw new IllegalStateException("Failed to read cluster definition.", e);
-        }
-
-        seedNodes.forEach(node -> {
-            allNodes.put(node.id(), node);
-            updateState(node.id(), State.INACTIVE);
-        });
-
-        establishSelfIdentity();
-
-        messagingService = new NettyMessagingManager(HEARTBEAT_FD_PORT);
-        try {
-            messagingService.activate();
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IllegalStateException(
-                    "Failed to cleanly initialize membership and"
-                            + " failure detector communication channel.", e);
-        }
         messagingService.registerHandler(HEARTBEAT_MESSAGE,
                                          new HeartbeatMessageHandler(), heartBeatMessageHandler);
 
@@ -165,56 +117,15 @@
         heartBeatSender.scheduleWithFixedDelay(this::heartbeat, 0,
                                                HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS);
 
+        addNode(localNode);
+        updateState(localNode.id(), State.ACTIVE);
+
         log.info("Started");
     }
 
-    private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
-        // Assumes IPv4 is returned.
-        String ip = DistributedClusterStore.getSiteLocalAddress();
-        String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
-        NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
-        try {
-            store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
-        } catch (IOException e) {
-            log.warn("Unable to write default cluster definition", e);
-        }
-    }
-
-    /**
-     * Returns the address that matches the IP prefix given in ONOS_NIC
-     * environment variable if one was specified, or the first site local
-     * address if one can be found or the loopback address otherwise.
-     *
-     * @return site-local address in string form
-     */
-    public static String getSiteLocalAddress() {
-        try {
-            String ipPrefix = System.getenv(ONOS_NIC);
-            for (NetworkInterface nif : list(getNetworkInterfaces())) {
-                for (InetAddress address : list(nif.getInetAddresses())) {
-                    IpAddress ip = IpAddress.valueOf(address);
-                    if (ipPrefix == null && address.isSiteLocalAddress() ||
-                            ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
-                        return ip.toString();
-                    }
-                }
-            }
-
-        } catch (SocketException e) {
-            log.error("Unable to get network interfaces", e);
-        }
-
-        return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
-    }
-
     @Deactivate
     public void deactivate() {
-        try {
-            messagingService.deactivate();
-        } catch (Exception e) {
-            log.trace("Failed to cleanly shutdown cluster membership messaging", e);
-        }
-
+        messagingService.unregisterHandler(HEARTBEAT_MESSAGE);
         heartBeatSender.shutdownNow();
         heartBeatMessageHandler.shutdownNow();
 
@@ -262,9 +173,7 @@
     @Override
     public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
         ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
-        allNodes.put(node.id(), node);
-        updateState(nodeId, State.INACTIVE);
-        notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
+        addNode(node);
         return node;
     }
 
@@ -278,22 +187,10 @@
         }
     }
 
-    @Override
-    public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
-        try {
-            Set<NodeInfo> infos = Sets.newHashSet();
-            nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
-                                                       n.ip().toString(),
-                                                       n.tcpPort())));
-
-            ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
-            new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
-
-            DatabaseDefinition ddef = DatabaseDefinition.from(infos);
-            new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
-        } catch (IOException e) {
-            log.error("Unable to form cluster", e);
-        }
+    private void addNode(ControllerNode node) {
+        allNodes.put(node.id(), node);
+        updateState(node.id(), State.INACTIVE);
+        notifyDelegate(new ClusterEvent(ClusterEvent.Type.INSTANCE_ADDED, node));
     }
 
     private void updateState(NodeId nodeId, State newState) {
@@ -301,18 +198,6 @@
         nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
     }
 
-    private void establishSelfIdentity() {
-        try {
-            IpAddress ip = findLocalIp();
-            localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
-            allNodes.put(localNode.id(), localNode);
-            updateState(localNode.id(), State.ACTIVE);
-            log.info("Local Node: {}", localNode);
-        } catch (SocketException e) {
-            throw new IllegalStateException("Cannot determine local IP", e);
-        }
-    }
-
     private void heartbeat() {
         try {
             Set<ControllerNode> peers = allNodes.values()
@@ -351,7 +236,7 @@
     }
 
     private void heartbeatToPeer(byte[] messagePayload, ControllerNode peer) {
-        Endpoint remoteEp = new Endpoint(peer.ip(), HEARTBEAT_FD_PORT);
+        Endpoint remoteEp = new Endpoint(peer.ip(), peer.tcpPort());
         try {
             messagingService.sendAsync(remoteEp, HEARTBEAT_MESSAGE, messagePayload);
         } catch (IOException e) {
@@ -359,22 +244,6 @@
         }
     }
 
-    private IpAddress findLocalIp() throws SocketException {
-        Enumeration<NetworkInterface> interfaces =
-                NetworkInterface.getNetworkInterfaces();
-        while (interfaces.hasMoreElements()) {
-            NetworkInterface iface = interfaces.nextElement();
-            Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
-            while (inetAddresses.hasMoreElements()) {
-                IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
-                if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
-                    return ip;
-                }
-            }
-        }
-        throw new IllegalStateException("Unable to determine local ip");
-    }
-
     private class HeartbeatMessageHandler implements Consumer<byte[]> {
         @Override
         public void accept(byte[] message) {