Added Netty based messaging. Updated cluster management to use Netty based messaging
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index d4b7289..e25c964 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -1,6 +1,11 @@
 package org.onlab.onos.store.cluster.impl;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.ImmutableSet;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -14,6 +19,8 @@
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
+import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager;
 import org.onlab.packet.IpPrefix;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -22,6 +29,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import static org.onlab.onos.cluster.ControllerNode.State;
 import static org.onlab.packet.IpPrefix.valueOf;
@@ -40,21 +48,25 @@
     private DefaultControllerNode localNode;
     private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
     private final Map<NodeId, State> states = new ConcurrentHashMap<>();
+    private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
+            .maximumSize(1000)
+            .expireAfterWrite(OnosClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
+            .removalListener(new LivenessCacheRemovalListener()).build();
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
-    private ClusterCommunicationAdminService communicationAdminService;
+    private ClusterCommunicationAdminService clusterCommunicationAdminService;
 
     private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
 
     @Activate
-    public void activate() {
+    public void activate() throws IOException {
         loadClusterDefinition();
         establishSelfIdentity();
 
         // Start-up the comm service and prime it with the loaded nodes.
-        communicationAdminService.startUp(localNode, nodesDelegate);
+        clusterCommunicationAdminService.initialize(localNode, nodesDelegate);
         for (DefaultControllerNode node : nodes.values()) {
-            communicationAdminService.addNode(node);
+            clusterCommunicationAdminService.addNode(node);
         }
         log.info("Started");
     }
@@ -121,15 +133,13 @@
     public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
         DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
         nodes.put(nodeId, node);
-        communicationAdminService.addNode(node);
+        clusterCommunicationAdminService.addNode(node);
         return node;
     }
 
     @Override
     public void removeNode(NodeId nodeId) {
         if (nodeId.equals(localNode.id())) {
-            // We are being ejected from the cluster, so remove all other nodes.
-            communicationAdminService.clearAllNodesAndStreams();
             nodes.clear();
             nodes.put(localNode.id(), localNode);
 
@@ -137,7 +147,7 @@
             // Remove the other node.
             DefaultControllerNode node = nodes.remove(nodeId);
             if (node != null) {
-                communicationAdminService.removeNode(node);
+                clusterCommunicationAdminService.removeNode(node);
             }
         }
     }
@@ -151,6 +161,7 @@
                 node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
             }
             states.put(nodeId, State.ACTIVE);
+            livenessCache.put(nodeId, node);
             return node;
         }
 
@@ -165,4 +176,13 @@
         }
     }
 
+    private class LivenessCacheRemovalListener implements RemovalListener<NodeId, ControllerNode> {
+
+        @Override
+        public void onRemoval(RemovalNotification<NodeId, ControllerNode> entry) {
+            NodeId nodeId = entry.getKey();
+            log.warn("Failed to receive heartbeats from controller: " + nodeId);
+            nodesDelegate.nodeVanished(nodeId);
+        }
+    }
 }