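ClusterCommunicationManager bugfixes

- Look up peers through ClusterService instead of the local `members` map.
- Register ClusterMessage with ClusterMessageSerializer, and also register
  byte[] and MessageSubject in the Kryo pool.
- Explicitly activate NettyMessagingService in activate().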

Change-Id: I0cb433bf2197c7c745733657607d9e62bb23567d
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 7b05401..3a9fd8d 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -26,6 +26,7 @@
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.ClusterMessageSerializer;
 import org.onlab.onos.store.serializers.KryoPoolUtil;
 import org.onlab.onos.store.serializers.KryoSerializer;
 import org.onlab.util.KryoPool;
@@ -50,8 +51,6 @@
     private ClusterService clusterService;
 
     private ClusterNodesDelegate nodesDelegate;
-    // FIXME: `members` should go away and should be using ClusterService
-    private Map<NodeId, ControllerNode> members = new HashMap<>();
     private final Timer timer = new Timer("onos-controller-heatbeats");
     public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
 
@@ -59,11 +58,14 @@
     private MessagingService messagingService;
 
     private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+        @Override
         protected void setupKryoPool() {
             serializerPool = KryoPool.newBuilder()
                     .register(KryoPoolUtil.API)
-                    .register(ClusterMessage.class)
+                    .register(ClusterMessage.class, new ClusterMessageSerializer())
                     .register(ClusterMembershipEvent.class)
+                    .register(byte[].class)
+                    .register(MessageSubject.class)
                     .build()
                     .populate(1);
         }
@@ -73,7 +75,15 @@
     @Activate
     public void activate() {
         localNode = clusterService.getLocalNode();
-        messagingService = new NettyMessagingService(localNode.tcpPort());
+        NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
+        // FIXME: workaround until it becomes a service.
+        try {
+            netty.activate();
+        } catch (Exception e) {
+            // TODO: decide how to handle activation failure; for now it is only logged and startup continues.
+            log.error("NettyMessagingService#activate", e);
+        }
+        messagingService = netty;
         log.info("Started");
     }
 
@@ -86,7 +96,7 @@
     @Override
     public boolean broadcast(ClusterMessage message) {
         boolean ok = true;
-        for (ControllerNode node : members.values()) {
+        for (ControllerNode node : clusterService.getNodes()) {
             if (!node.equals(localNode)) {
                 ok = unicast(message, node.id()) && ok;
             }
@@ -107,7 +117,7 @@
 
     @Override
     public boolean unicast(ClusterMessage message, NodeId toNodeId) {
-        ControllerNode node = members.get(toNodeId);
+        ControllerNode node = clusterService.getNode(toNodeId);
         checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
         Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
         try {
@@ -137,7 +147,7 @@
 
     @Override
     public void addNode(ControllerNode node) {
-        members.put(node.id(), node);
+        // Intentionally empty: the local members map was removed; nodes are looked up via ClusterService.
     }
 
     @Override
@@ -146,7 +156,7 @@
                 localNode.id(),
                 new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
                 SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
-        members.remove(node.id());
+        // No local bookkeeping here: the members map was removed; nodes are looked up via ClusterService.
     }
 
     // Sends a heart beat to all peers.