Added the ability to register and deregister new connections so that the node status is properly reflected.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index 08a182b..5cd9d9e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -30,6 +30,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -129,6 +130,7 @@
         if (self == null) {
             self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
             nodes.put(self.id(), self);
+            states.put(self.id(), State.ACTIVE);
         }
     }
 
@@ -219,7 +221,10 @@
     @Override
     public void removeNode(NodeId nodeId) {
         nodes.remove(nodeId);
-        streams.remove(nodeId);
+        TLVMessageStream stream = streams.remove(nodeId); // unregister the node's stream, if any
+        if (stream != null) { // presumably null when no stream was registered for nodeId
+            stream.close(); // NOTE(review): the states entry for nodeId is not removed here; confirm
+        }
     }
 
     // Listens and accepts inbound connections from other cluster nodes.
@@ -256,12 +261,13 @@
         protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
             TLVMessageStream tlvStream = (TLVMessageStream) stream;
             for (TLVMessage message : messages) {
-                // TODO: add type-based dispatching here...
-                log.info("Got message {}", message.type());
-
-                // FIXME: hack to get going
+                // TODO: replace this if/else chain with type-based dispatching; it is a hack to get going
                 if (message.type() == HELLO_MSG) {
                     processHello(message, tlvStream);
+                } else if (message.type() == ECHO_MSG) {
+                    processEcho(message, tlvStream);
+                } else {
+                    log.info("Deal with other messages"); // any other type is only logged, not handled
                 }
             }
         }
@@ -271,7 +277,7 @@
             TLVMessageStream stream = super.acceptStream(channel);
             try {
                 InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
-                log.info("Accepted a new connection from node {}", IpPrefix.valueOf(sa.getAddress().getAddress()));
+                log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
                 stream.write(createHello(self));
 
             } catch (IOException e) {
@@ -285,31 +291,55 @@
             TLVMessageStream stream = super.connectStream(channel);
             DefaultControllerNode node = nodesByChannel.get(channel);
             if (node != null) {
-                log.info("Opened connection to node {}", node.id());
+                log.debug("Opened connection to node {}", node.id());
                 nodesByChannel.remove(channel);
             }
             return stream;
         }
 
         @Override
-        protected void connect(SelectionKey key) {
-            super.connect(key);
-            TLVMessageStream stream = (TLVMessageStream) key.attachment();
-            send(stream, createHello(self));
+        protected void connect(SelectionKey key) throws IOException {
+            try {
+                super.connect(key);
+                TLVMessageStream stream = (TLVMessageStream) key.attachment();
+                send(stream, createHello(self)); // introduce this node to the peer right after connecting
+            } catch (IOException e) { // swallow only a refused connection; rethrow anything else
+                if (!Objects.equals(e.getMessage(), "Connection refused")) { // NOTE(review): brittle text match
+                    throw e;
+                }
+            }
+        }
+
+        @Override
+        protected void removeStream(MessageStream<TLVMessage> stream) {
+            DefaultControllerNode node = ((TLVMessageStream) stream).node();
+            if (node != null) { // presumably null until processHello has bound a node to the stream; confirm
+                log.info("Closed connection to node {}", node.id());
+                states.put(node.id(), State.INACTIVE); // record the node as INACTIVE once its stream is gone
+                streams.remove(node.id()); // drop the stream registered under this node id
+            }
+            super.removeStream(stream);
         }
     }
 
-    // FIXME: pure hack for now
+    // Processes a HELLO message from a peer: registers the node and its stream and marks it ACTIVE.
     private void processHello(TLVMessage message, TLVMessageStream stream) {
+        // FIXME: pure hack for now; expects "id:ip:port" text and does no validation of the fields
         String data = new String(message.data());
-        log.info("Processing hello with data [{}]", data);
-        String[] fields = new String(data).split(":");
+        String[] fields = data.split(":"); // NOTE(review): data was decoded with the default charset
         DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]),
-                                                               IpPrefix.valueOf(fields[1]),
+                                                               valueOf(fields[1]),
                                                                Integer.parseInt(fields[2]));
         stream.setNode(node);
         nodes.put(node.id(), node);
         streams.put(node.id(), stream);
+        states.put(node.id(), State.ACTIVE); // a peer that sent a hello is recorded as ACTIVE
+    }
+
+    // Processes an ECHO message from a peer controller node; currently a placeholder that only logs.
+    private void processEcho(TLVMessage message, TLVMessageStream tlvStream) {
+        // TODO: implement heart-beat refresh; neither argument is used yet
+        log.info("Dealing with echoes...");
     }
 
     // Sends message to the specified stream.
@@ -321,6 +351,7 @@
         }
     }
 
+    // Creates a hello message for a peer controller node; the payload is the text "id:ip:tcpPort".
     private TLVMessage createHello(DefaultControllerNode self) {
         return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes());
     }
@@ -335,7 +366,7 @@
                     try {
                         openConnection(node, findLeastUtilizedLoop());
                     } catch (IOException e) {
-                        log.warn("Unable to connect", e);
+                        log.debug("Unable to connect", e);
                     }
                 }
             }