Added Netty-based messaging. Updated cluster management to use Netty-based messaging.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index d4b7289..e25c964 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -1,6 +1,11 @@
package org.onlab.onos.store.cluster.impl;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
import com.google.common.collect.ImmutableSet;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -14,6 +19,8 @@
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
+import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,6 +29,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import static org.onlab.onos.cluster.ControllerNode.State;
import static org.onlab.packet.IpPrefix.valueOf;
@@ -40,21 +48,25 @@
private DefaultControllerNode localNode;
private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
+ private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
+ .maximumSize(1000)
+ .expireAfterWrite(OnosClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
+ .removalListener(new LivenessCacheRemovalListener()).build();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- private ClusterCommunicationAdminService communicationAdminService;
+ private ClusterCommunicationAdminService clusterCommunicationAdminService;
private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
@Activate
- public void activate() {
+ public void activate() throws IOException {
loadClusterDefinition();
establishSelfIdentity();
// Start-up the comm service and prime it with the loaded nodes.
- communicationAdminService.startUp(localNode, nodesDelegate);
+ clusterCommunicationAdminService.initialize(localNode, nodesDelegate);
for (DefaultControllerNode node : nodes.values()) {
- communicationAdminService.addNode(node);
+ clusterCommunicationAdminService.addNode(node);
}
log.info("Started");
}
@@ -121,15 +133,13 @@
public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
nodes.put(nodeId, node);
- communicationAdminService.addNode(node);
+ clusterCommunicationAdminService.addNode(node);
return node;
}
@Override
public void removeNode(NodeId nodeId) {
if (nodeId.equals(localNode.id())) {
- // We are being ejected from the cluster, so remove all other nodes.
- communicationAdminService.clearAllNodesAndStreams();
nodes.clear();
nodes.put(localNode.id(), localNode);
@@ -137,7 +147,7 @@
// Remove the other node.
DefaultControllerNode node = nodes.remove(nodeId);
if (node != null) {
- communicationAdminService.removeNode(node);
+ clusterCommunicationAdminService.removeNode(node);
}
}
}
@@ -151,6 +161,7 @@
node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
}
states.put(nodeId, State.ACTIVE);
+ livenessCache.put(nodeId, node);
return node;
}
@@ -165,4 +176,23 @@
}
}
+ /**
+ * Removal listener for the liveness cache; reports a node as vanished when
+ * its entry is evicted by the cache rather than overwritten or invalidated.
+ */
+ private class LivenessCacheRemovalListener implements RemovalListener<NodeId, ControllerNode> {
+
+ @Override
+ public void onRemoval(RemovalNotification<NodeId, ControllerNode> entry) {
+ // Re-putting a node's entry is reported with cause REPLACED, and an
+ // explicit invalidation with cause EXPLICIT; neither means the node went
+ // silent, so only automatic evictions are treated as a lost node.
+ if (!entry.wasEvicted()) {
+ return;
+ }
+ NodeId nodeId = entry.getKey();
+ log.warn("Failed to receive heartbeats from controller: " + nodeId);
+ nodesDelegate.nodeVanished(nodeId);
+ }
+ }
}