Merge remote-tracking branch 'origin/master'
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
index bac32e9..2e2887c 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManager.java
@@ -12,11 +12,13 @@
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMembershipMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
-import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
+import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
import org.onlab.onos.store.cluster.messaging.MessageSubscriber;
+import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
@@ -84,16 +86,20 @@
private final Timer timer = new Timer("onos-comm-initiator");
private final TimerTask connectionCustodian = new ConnectionCustodian();
- private GoodbyeSubscriber goodbyeSubscriber = new GoodbyeSubscriber();
+ private MembershipSubscriber membershipSubscriber = new MembershipSubscriber();
@Activate
public void activate() {
- addSubscriber(MessageSubject.GOODBYE, goodbyeSubscriber);
+ addSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
+ addSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
log.info("Activated but waiting for delegate");
}
@Deactivate
public void deactivate() {
+ removeSubscriber(MessageSubject.NEW_MEMBER, membershipSubscriber);
+ removeSubscriber(MessageSubject.LEAVING_MEMBER, membershipSubscriber);
+
connectionCustodian.cancel();
if (connectionListener != null) {
connectionListener.shutdown();
@@ -154,7 +160,7 @@
@Override
public void removeNode(DefaultControllerNode node) {
- send(new GoodbyeMessage(node.id()));
+ send(new LeavingMemberMessage(node.id()));
nodes.remove(node);
ClusterMessageStream stream = streams.remove(node.id());
if (stream != null) {
@@ -177,7 +183,7 @@
@Override
public void clearAllNodesAndStreams() {
nodes.clear();
- send(new GoodbyeMessage(localNode.id()));
+ send(new LeavingMemberMessage(localNode.id()));
for (ClusterMessageStream stream : streams.values()) {
stream.close();
}
@@ -187,7 +193,7 @@
/**
* Dispatches the specified message to all subscribers to its subject.
*
- * @param message message to dispatch
+ * @param message message to dispatch
* @param fromNodeId node from which the message was received
*/
void dispatch(ClusterMessage message, NodeId fromNodeId) {
@@ -200,7 +206,7 @@
}
/**
- * Removes the stream associated with the specified node.
+ * Adds the stream associated with the specified node.
*
* @param nodeId newly detected cluster node id
* @param ip node IP listen address
@@ -212,6 +218,7 @@
DefaultControllerNode node = nodesDelegate.nodeDetected(nodeId, ip, tcpPort);
stream.setNode(node);
streams.put(node.id(), stream);
+ send(new NewMemberMessage(node.id(), node.ip(), node.tcpPort()));
return node;
}
@@ -329,11 +336,19 @@
}
}
- private class GoodbyeSubscriber implements MessageSubscriber {
+ private class MembershipSubscriber implements MessageSubscriber {
@Override
public void receive(ClusterMessage message, NodeId fromNodeId) {
- log.info("Received goodbye message from {}", fromNodeId);
- nodesDelegate.nodeRemoved(fromNodeId);
+ MessageSubject subject = message.subject();
+ ClusterMembershipMessage cmm = (ClusterMembershipMessage) message;
+ if (message.subject() == MessageSubject.NEW_MEMBER) {
+ log.info("Node {} arrived", cmm.nodeId());
+ nodesDelegate.nodeDetected(cmm.nodeId(), cmm.ipAddress(), cmm.tcpPort());
+
+ } else if (subject == MessageSubject.LEAVING_MEMBER) {
+ log.info("Node {} is leaving", cmm.nodeId());
+ nodesDelegate.nodeRemoved(cmm.nodeId());
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index 647690e..d4b7289 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -128,10 +128,11 @@
@Override
public void removeNode(NodeId nodeId) {
if (nodeId.equals(localNode.id())) {
- // FIXME: this is still broken
// We are being ejected from the cluster, so remove all other nodes.
communicationAdminService.clearAllNodesAndStreams();
nodes.clear();
+ nodes.put(localNode.id(), localNode);
+
} else {
// Remove the other node.
DefaultControllerNode node = nodes.remove(nodeId);
@@ -152,6 +153,7 @@
states.put(nodeId, State.ACTIVE);
return node;
}
+
@Override
public void nodeVanished(NodeId nodeId) {
states.put(nodeId, State.INACTIVE);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
index 260d2b2..c6ebca9 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
@@ -23,9 +23,10 @@
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.EchoMessage;
-import org.onlab.onos.store.cluster.messaging.GoodbyeMessage;
+import org.onlab.onos.store.cluster.messaging.LeavingMemberMessage;
import org.onlab.onos.store.cluster.messaging.HelloMessage;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.NewMemberMessage;
import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.onos.store.serializers.ConnectPointSerializer;
import org.onlab.onos.store.serializers.DefaultLinkSerializer;
@@ -97,7 +98,8 @@
MessageSubject.class,
HelloMessage.class,
- GoodbyeMessage.class,
+ NewMemberMessage.class,
+ LeavingMemberMessage.class,
EchoMessage.class
)
.register(IpPrefix.class, new IpPrefixSerializer())
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMembershipMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMembershipMessage.java
new file mode 100644
index 0000000..ea00185
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMembershipMessage.java
@@ -0,0 +1,66 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.packet.IpPrefix;
+
+/**
+ * Base for cluster membership messages.
+ */
+public abstract class ClusterMembershipMessage extends ClusterMessage {
+
+ private NodeId nodeId;
+ private IpPrefix ipAddress;
+ private int tcpPort;
+
+ // For serialization
+ protected ClusterMembershipMessage() {
+ super(MessageSubject.HELLO);
+ nodeId = null;
+ ipAddress = null;
+ tcpPort = 0;
+ }
+
+ /**
+ * Creates a new membership message for the specified end-point data.
+ *
+ * @param subject message subject
+ * @param nodeId sending node identification
+ * @param ipAddress sending node IP address
+ * @param tcpPort sending node TCP port
+ */
+ protected ClusterMembershipMessage(MessageSubject subject, NodeId nodeId,
+ IpPrefix ipAddress, int tcpPort) {
+ super(subject);
+ this.nodeId = nodeId;
+ this.ipAddress = ipAddress;
+ this.tcpPort = tcpPort;
+ }
+
+ /**
+ * Returns the sending node identifier.
+ *
+ * @return node identifier
+ */
+ public NodeId nodeId() {
+ return nodeId;
+ }
+
+ /**
+ * Returns the sending node IP address.
+ *
+ * @return node IP address
+ */
+ public IpPrefix ipAddress() {
+ return ipAddress;
+ }
+
+ /**
+ * Returns the sending node TCP listen port.
+ *
+ * @return TCP listen port
+ */
+ public int tcpPort() {
+ return tcpPort;
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/GoodbyeMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/GoodbyeMessage.java
deleted file mode 100644
index e9326f3..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/GoodbyeMessage.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package org.onlab.onos.store.cluster.messaging;
-
-import org.onlab.onos.cluster.NodeId;
-
-/**
- * Goodbye message that nodes use to leave the cluster for good.
- */
-public class GoodbyeMessage extends ClusterMessage {
-
- private NodeId nodeId;
-
- // For serialization
- private GoodbyeMessage() {
- super(MessageSubject.GOODBYE);
- nodeId = null;
- }
-
- /**
- * Creates a new goodbye message.
- *
- * @param nodeId sending node identification
- */
- public GoodbyeMessage(NodeId nodeId) {
- super(MessageSubject.HELLO);
- this.nodeId = nodeId;
- }
-
- /**
- * Returns the sending node identifer.
- *
- * @return node identifier
- */
- public NodeId nodeId() {
- return nodeId;
- }
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
index 923e21e..d692e4e 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/HelloMessage.java
@@ -6,18 +6,10 @@
/**
* Hello message that nodes use to greet each other.
*/
-public class HelloMessage extends ClusterMessage {
-
- private NodeId nodeId;
- private IpPrefix ipAddress;
- private int tcpPort;
+public class HelloMessage extends ClusterMembershipMessage {
// For serialization
private HelloMessage() {
- super(MessageSubject.HELLO);
- nodeId = null;
- ipAddress = null;
- tcpPort = 0;
}
/**
@@ -28,37 +20,7 @@
* @param tcpPort sending node TCP port
*/
public HelloMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
- super(MessageSubject.HELLO);
- this.nodeId = nodeId;
- this.ipAddress = ipAddress;
- this.tcpPort = tcpPort;
- }
-
- /**
- * Returns the sending node identifer.
- *
- * @return node identifier
- */
- public NodeId nodeId() {
- return nodeId;
- }
-
- /**
- * Returns the sending node IP address.
- *
- * @return node IP address
- */
- public IpPrefix ipAddress() {
- return ipAddress;
- }
-
- /**
- * Returns the sending node TCP listen port.
- *
- * @return TCP listen port
- */
- public int tcpPort() {
- return tcpPort;
+ super(MessageSubject.HELLO, nodeId, ipAddress, tcpPort);
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/LeavingMemberMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/LeavingMemberMessage.java
new file mode 100644
index 0000000..59686b8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/LeavingMemberMessage.java
@@ -0,0 +1,24 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+
+/**
+ * Announcement message that nodes use to gossip about team departures.
+ */
+public class LeavingMemberMessage extends ClusterMembershipMessage {
+
+ // For serialization
+ private LeavingMemberMessage() {
+ super();
+ }
+
+ /**
+ * Creates a new message announcing a member's departure.
+ *
+ * @param nodeId identifier of the node leaving the cluster
+ */
+ public LeavingMemberMessage(NodeId nodeId) {
+ super(MessageSubject.LEAVING_MEMBER, nodeId, null, 0);
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
index bf86c5b..c7badf2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -8,8 +8,11 @@
/** Represents a first greeting message. */
HELLO,
- /** Signifies node's intent to leave the cluster. */
- GOODBYE,
+ /** Signifies announcement about new member. */
+ NEW_MEMBER,
+
+ /** Signifies announcement about leaving member. */
+ LEAVING_MEMBER,
/** Signifies a heart-beat message. */
ECHO
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/NewMemberMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/NewMemberMessage.java
new file mode 100644
index 0000000..53bc282
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/NewMemberMessage.java
@@ -0,0 +1,26 @@
+package org.onlab.onos.store.cluster.messaging;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.packet.IpPrefix;
+
+/**
+ * Announcement message that nodes use to gossip about new arrivals.
+ */
+public class NewMemberMessage extends ClusterMembershipMessage {
+
+ // For serialization
+ private NewMemberMessage() {
+ }
+
+ /**
+ * Creates a new gossip message for the specified end-point data.
+ *
+ * @param nodeId newly detected node identification
+ * @param ipAddress newly detected node IP address
+ * @param tcpPort newly detected node TCP port
+ */
+ public NewMemberMessage(NodeId nodeId, IpPrefix ipAddress, int tcpPort) {
+ super(MessageSubject.NEW_MEMBER, nodeId, ipAddress, tcpPort);
+ }
+
+}