Merge remote-tracking branch 'origin/master'
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
index 3ec8c07..302a0c7 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
@@ -233,7 +233,7 @@
         }
 
         @Override
-        protected void connect(SelectionKey key) {
+        protected void connect(SelectionKey key) throws IOException {
             super.connect(key);
             TestMessageStream b = (TestMessageStream) key.attachment();
             Worker w = ((CustomIOLoop) b.loop()).worker;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index 08a182b..5cd9d9e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -30,6 +30,7 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -129,6 +130,7 @@
         if (self == null) {
             self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
             nodes.put(self.id(), self);
+            states.put(self.id(), State.ACTIVE);
         }
     }
 
@@ -219,7 +221,10 @@
     @Override
     public void removeNode(NodeId nodeId) {
         nodes.remove(nodeId);
-        streams.remove(nodeId);
+        TLVMessageStream stream = streams.remove(nodeId);
+        if (stream != null) {
+            stream.close();
+        }
     }
 
     // Listens and accepts inbound connections from other cluster nodes.
@@ -256,12 +261,13 @@
         protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
             TLVMessageStream tlvStream = (TLVMessageStream) stream;
             for (TLVMessage message : messages) {
-                // TODO: add type-based dispatching here...
-                log.info("Got message {}", message.type());
-
-                // FIXME: hack to get going
+                // TODO: add type-based dispatching here... this is just a hack to get going
                 if (message.type() == HELLO_MSG) {
                     processHello(message, tlvStream);
+                } else if (message.type() == ECHO_MSG) {
+                    processEcho(message, tlvStream);
+                } else {
+                    log.info("Deal with other messages");
                 }
             }
         }
@@ -271,7 +277,7 @@
             TLVMessageStream stream = super.acceptStream(channel);
             try {
                 InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
-                log.info("Accepted a new connection from node {}", IpPrefix.valueOf(sa.getAddress().getAddress()));
+                log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
                 stream.write(createHello(self));
 
             } catch (IOException e) {
@@ -285,31 +291,55 @@
             TLVMessageStream stream = super.connectStream(channel);
             DefaultControllerNode node = nodesByChannel.get(channel);
             if (node != null) {
-                log.info("Opened connection to node {}", node.id());
+                log.debug("Opened connection to node {}", node.id());
                 nodesByChannel.remove(channel);
             }
             return stream;
         }
 
         @Override
-        protected void connect(SelectionKey key) {
-            super.connect(key);
-            TLVMessageStream stream = (TLVMessageStream) key.attachment();
-            send(stream, createHello(self));
+        protected void connect(SelectionKey key) throws IOException {
+            try {
+                super.connect(key);
+                TLVMessageStream stream = (TLVMessageStream) key.attachment();
+                send(stream, createHello(self));
+            } catch (IOException e) {
+                if (!Objects.equals(e.getMessage(), "Connection refused")) {
+                    throw e;
+                }
+            }
+        }
+
+        @Override
+        protected void removeStream(MessageStream<TLVMessage> stream) {
+            DefaultControllerNode node = ((TLVMessageStream) stream).node();
+            if (node != null) {
+                log.info("Closed connection to node {}", node.id());
+                states.put(node.id(), State.INACTIVE);
+                streams.remove(node.id());
+            }
+            super.removeStream(stream);
         }
     }
 
-    // FIXME: pure hack for now
+    // Processes a HELLO message from a peer controller node.
     private void processHello(TLVMessage message, TLVMessageStream stream) {
+        // FIXME: pure hack for now; assumes an "id:ip:port" payload and does no validation
         String data = new String(message.data());
-        log.info("Processing hello with data [{}]", data);
-        String[] fields = new String(data).split(":");
+        String[] fields = data.split(":");
         DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]),
-                                                               IpPrefix.valueOf(fields[1]),
+                                                               valueOf(fields[1]),
                                                                Integer.parseInt(fields[2]));
         stream.setNode(node);
         nodes.put(node.id(), node);
         streams.put(node.id(), stream);
+        states.put(node.id(), State.ACTIVE);
+    }
+
+    // Processes an ECHO message from a peer controller node.
+    private void processEcho(TLVMessage message, TLVMessageStream tlvStream) {
+        // TODO: implement heart-beat refresh
+        log.info("Dealing with echoes...");
     }
 
     // Sends message to the specified stream.
@@ -321,6 +351,7 @@
         }
     }
 
+    // Creates a hello message to be sent to a peer controller node.
     private TLVMessage createHello(DefaultControllerNode self) {
         return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes());
     }
@@ -335,7 +366,7 @@
                     try {
                         openConnection(node, findLeastUtilizedLoop());
                     } catch (IOException e) {
-                        log.warn("Unable to connect", e);
+                        log.debug("Unable to connect", e);
                     }
                 }
             }
diff --git a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
index 805b58a..dc3ecaf 100644
--- a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
+++ b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
@@ -93,14 +93,9 @@
      *
      * @param key selection key holding the pending connect operation.
      */
-    protected void connect(SelectionKey key) {
-        try {
-            SocketChannel ch = (SocketChannel) key.channel();
-            ch.finishConnect();
-        } catch (IOException | IllegalStateException e) {
-            log.warn("Unable to complete connection", e);
-        }
-
+    protected void connect(SelectionKey key) throws IOException {
+        SocketChannel ch = (SocketChannel) key.channel();
+        ch.finishConnect();
         if (key.isValid()) {
             key.interestOps(SelectionKey.OP_READ);
         }
@@ -124,7 +119,11 @@
 
             // If there is a pending connect operation, complete it.
             if (key.isConnectable()) {
-                connect(key);
+                try {
+                    connect(key);
+                } catch (IOException | IllegalStateException e) {
+                    log.warn("Unable to complete connection", e);
+                }
             }
 
             // If there is a read operation, slurp as much data as possible.
diff --git a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
index a7416e9..c38f0f5 100644
--- a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
+++ b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
@@ -10,6 +10,7 @@
 import java.nio.channels.SelectionKey;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -262,7 +263,7 @@
                 try {
                     channel.write(outbound);
                 } catch (IOException e) {
-                    if (!closed && !e.getMessage().equals("Broken pipe")) {
+                    if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
                         log.warn("Unable to write data", e);
                         ioError = e;
                     }
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
index bdcc97a..bbeedd0 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
@@ -230,7 +230,7 @@
         }
 
         @Override
-        protected void connect(SelectionKey key) {
+        protected void connect(SelectionKey key) throws IOException {
             super.connect(key);
             TestMessageStream b = (TestMessageStream) key.attachment();
             Worker w = ((CustomIOLoop) b.loop()).worker;