Further simplified the store & connection manager relationship.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index ae04226..025804a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -14,7 +14,6 @@
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,20 +42,20 @@
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private CommunicationsDelegate commsDelegate;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private SerializationService serializationService;
+ private ClusterCommunicationAdminService communicationAdminService;
private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
- private ConnectionManager connectionManager;
@Activate
public void activate() {
loadClusterDefinition();
establishSelfIdentity();
- connectionManager = new ConnectionManager(localNode, nodesDelegate,
- commsDelegate, serializationService);
+
+ // Start-up the comm service and prime it with the loaded nodes.
+ communicationAdminService.startUp(localNode, nodesDelegate);
+ for (DefaultControllerNode node : nodes.values()) {
+ communicationAdminService.addNode(node);
+ }
log.info("Started");
}
@@ -92,8 +91,8 @@
if (localNode == null) {
localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
nodes.put(localNode.id(), localNode);
- states.put(localNode.id(), State.ACTIVE);
}
+ states.put(localNode.id(), State.ACTIVE);
}
@Override
@@ -122,7 +121,7 @@
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
- connectionManager.addNode(node);
+ communicationAdminService.addNode(node);
return node;
}
@@ -130,21 +129,25 @@
public void removeNode(NodeId nodeId) {
DefaultControllerNode node = nodes.remove(nodeId);
if (node != null) {
- connectionManager.removeNode(node);
+ communicationAdminService.removeNode(node);
}
}
// Entity to handle back calls from the connection manager.
private class InnerNodesDelegate implements ClusterNodesDelegate {
@Override
- public void nodeDetected(DefaultControllerNode node) {
- nodes.put(node.id(), node);
- states.put(node.id(), State.ACTIVE);
+ public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
+ DefaultControllerNode node = nodes.get(nodeId);
+ if (node == null) {
+ node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
+ }
+ states.put(nodeId, State.ACTIVE);
+ return node;
}
-
@Override
- public void nodeVanished(DefaultControllerNode node) {
- states.put(node.id(), State.INACTIVE);
+ public void nodeVanished(NodeId nodeId) {
+ states.put(nodeId, State.INACTIVE);
}
}
+
}