Merge remote-tracking branch 'origin/master'
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
index 3ec8c07..302a0c7 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/IOLoopTestClient.java
@@ -233,7 +233,7 @@
}
@Override
- protected void connect(SelectionKey key) {
+ protected void connect(SelectionKey key) throws IOException {
super.connect(key);
TestMessageStream b = (TestMessageStream) key.attachment();
Worker w = ((CustomIOLoop) b.loop()).worker;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index 08a182b..5cd9d9e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -129,6 +130,7 @@
if (self == null) {
self = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(self.id(), self);
+ states.put(self.id(), State.ACTIVE);
}
}
@@ -219,7 +221,10 @@
@Override
public void removeNode(NodeId nodeId) {
nodes.remove(nodeId);
- streams.remove(nodeId);
+ TLVMessageStream stream = streams.remove(nodeId);
+ if (stream != null) {
+ stream.close();
+ }
}
// Listens and accepts inbound connections from other cluster nodes.
@@ -256,12 +261,13 @@
protected void processMessages(List<TLVMessage> messages, MessageStream<TLVMessage> stream) {
TLVMessageStream tlvStream = (TLVMessageStream) stream;
for (TLVMessage message : messages) {
- // TODO: add type-based dispatching here...
- log.info("Got message {}", message.type());
-
- // FIXME: hack to get going
+ // TODO: add type-based dispatching here... this is just a hack to get going
if (message.type() == HELLO_MSG) {
processHello(message, tlvStream);
+ } else if (message.type() == ECHO_MSG) {
+ processEcho(message, tlvStream);
+ } else {
+ log.info("Deal with other messages");
}
}
}
@@ -271,7 +277,7 @@
TLVMessageStream stream = super.acceptStream(channel);
try {
InetSocketAddress sa = (InetSocketAddress) channel.getRemoteAddress();
- log.info("Accepted a new connection from node {}", IpPrefix.valueOf(sa.getAddress().getAddress()));
+ log.info("Accepted connection from node {}", valueOf(sa.getAddress().getAddress()));
stream.write(createHello(self));
} catch (IOException e) {
@@ -285,31 +291,55 @@
TLVMessageStream stream = super.connectStream(channel);
DefaultControllerNode node = nodesByChannel.get(channel);
if (node != null) {
- log.info("Opened connection to node {}", node.id());
+ log.debug("Opened connection to node {}", node.id());
nodesByChannel.remove(channel);
}
return stream;
}
@Override
- protected void connect(SelectionKey key) {
- super.connect(key);
- TLVMessageStream stream = (TLVMessageStream) key.attachment();
- send(stream, createHello(self));
+ protected void connect(SelectionKey key) throws IOException {
+ try {
+ super.connect(key);
+ TLVMessageStream stream = (TLVMessageStream) key.attachment();
+ send(stream, createHello(self));
+ } catch (IOException e) {
+ if (!Objects.equals(e.getMessage(), "Connection refused")) {
+ throw e;
+ }
+ }
+ }
+
+ @Override
+ protected void removeStream(MessageStream<TLVMessage> stream) {
+ DefaultControllerNode node = ((TLVMessageStream) stream).node();
+ if (node != null) {
+ log.info("Closed connection to node {}", node.id());
+ states.put(node.id(), State.INACTIVE);
+ streams.remove(node.id());
+ }
+ super.removeStream(stream);
}
}
- // FIXME: pure hack for now
+ // Processes a HELLO message from a peer controller node.
private void processHello(TLVMessage message, TLVMessageStream stream) {
+ // FIXME: pure hack for now
String data = new String(message.data());
- log.info("Processing hello with data [{}]", data);
- String[] fields = new String(data).split(":");
+ String[] fields = data.split(":");
DefaultControllerNode node = new DefaultControllerNode(new NodeId(fields[0]),
- IpPrefix.valueOf(fields[1]),
+ valueOf(fields[1]),
Integer.parseInt(fields[2]));
stream.setNode(node);
nodes.put(node.id(), node);
streams.put(node.id(), stream);
+ states.put(node.id(), State.ACTIVE);
+ }
+
+ // Processes an ECHO message from a peer controller node.
+ private void processEcho(TLVMessage message, TLVMessageStream tlvStream) {
+ // TODO: implement heart-beat refresh
+ log.info("Dealing with echoes...");
}
// Sends message to the specified stream.
@@ -321,6 +351,7 @@
}
}
+ // Creates a hello message to be sent to a peer controller node.
private TLVMessage createHello(DefaultControllerNode self) {
return new TLVMessage(HELLO_MSG, (self.id() + ":" + self.ip() + ":" + self.tcpPort()).getBytes());
}
@@ -335,7 +366,7 @@
try {
openConnection(node, findLeastUtilizedLoop());
} catch (IOException e) {
- log.warn("Unable to connect", e);
+ log.debug("Unable to connect", e);
}
}
}
diff --git a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
index 805b58a..dc3ecaf 100644
--- a/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
+++ b/utils/nio/src/main/java/org/onlab/nio/IOLoop.java
@@ -93,14 +93,9 @@
*
* @param key selection key holding the pending connect operation.
*/
- protected void connect(SelectionKey key) {
- try {
- SocketChannel ch = (SocketChannel) key.channel();
- ch.finishConnect();
- } catch (IOException | IllegalStateException e) {
- log.warn("Unable to complete connection", e);
- }
-
+ protected void connect(SelectionKey key) throws IOException {
+ SocketChannel ch = (SocketChannel) key.channel();
+ ch.finishConnect();
if (key.isValid()) {
key.interestOps(SelectionKey.OP_READ);
}
@@ -124,7 +119,11 @@
// If there is a pending connect operation, complete it.
if (key.isConnectable()) {
- connect(key);
+ try {
+ connect(key);
+ } catch (IOException | IllegalStateException e) {
+ log.warn("Unable to complete connection", e);
+ }
}
// If there is a read operation, slurp as much data as possible.
diff --git a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
index a7416e9..c38f0f5 100644
--- a/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
+++ b/utils/nio/src/main/java/org/onlab/nio/MessageStream.java
@@ -10,6 +10,7 @@
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -262,7 +263,7 @@
try {
channel.write(outbound);
} catch (IOException e) {
- if (!closed && !e.getMessage().equals("Broken pipe")) {
+ if (!closed && !Objects.equals(e.getMessage(), "Broken pipe")) {
log.warn("Unable to write data", e);
ioError = e;
}
diff --git a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
index bdcc97a..bbeedd0 100644
--- a/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
+++ b/utils/nio/src/test/java/org/onlab/nio/IOLoopTestClient.java
@@ -230,7 +230,7 @@
}
@Override
- protected void connect(SelectionKey key) {
+ protected void connect(SelectionKey key) throws IOException {
super.connect(key);
TestMessageStream b = (TestMessageStream) key.attachment();
Worker w = ((CustomIOLoop) b.loop()).worker;