Integrated Kryo serializers with the communications manager and IO loop stuff.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
index 8acc8fe..bac32e9 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
@@ -13,6 +13,7 @@
 import org.onlab.onos.cluster.NodeId;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
 import org.onlab.onos.store.cluster.messaging.HelloMessage;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
@@ -83,9 +84,11 @@
 
     private final Timer timer = new Timer("onos-comm-initiator");
     private final TimerTask connectionCustodian = new ConnectionCustodian();
+    private GoodbyeSubscriber goodbyeSubscriber = new GoodbyeSubscriber();
 
     @Activate
     public void activate() {
+        addSubscriber(MessageSubject.GOODBYE, goodbyeSubscriber);
         log.info("Activated but waiting for delegate");
     }
 
@@ -102,9 +105,20 @@
     }
 
     @Override
+    public boolean send(ClusterMessage message) {
+        boolean ok = true;
+        for (DefaultControllerNode node : nodes) {
+            if (!node.equals(localNode)) {
+                ok = send(message, node.id()) && ok;
+            }
+        }
+        return ok;
+    }
+
+    @Override
     public boolean send(ClusterMessage message, NodeId toNodeId) {
         ClusterMessageStream stream = streams.get(toNodeId);
-        if (stream != null) {
+        if (stream != null && !toNodeId.equals(localNode.id())) {
             try {
                 stream.write(message);
                 return true;
@@ -140,6 +154,7 @@
 
     @Override
     public void removeNode(DefaultControllerNode node) {
+        send(new GoodbyeMessage(node.id()));
         nodes.remove(node);
         ClusterMessageStream stream = streams.remove(node.id());
         if (stream != null) {
@@ -159,6 +174,16 @@
         log.info("Started");
     }
 
+    @Override
+    public void clearAllNodesAndStreams() {
+        nodes.clear();
+        send(new GoodbyeMessage(localNode.id()));
+        for (ClusterMessageStream stream : streams.values()) {
+            stream.close();
+        }
+        streams.clear();
+    }
+
     /**
      * Dispatches the specified message to all subscribers to its subject.
      *
@@ -304,4 +329,11 @@
         }
     }
 
+    private class GoodbyeSubscriber implements MessageSubscriber {
+        @Override
+        public void receive(ClusterMessage message, NodeId fromNodeId) {
+            log.info("Received goodbye message from {}", fromNodeId);
+            nodesDelegate.nodeRemoved(fromNodeId);
+        }
+    }
 }