| package org.onlab.onos.store.cluster.impl; |
| |
| import com.google.common.cache.Cache; |
| import com.google.common.cache.CacheBuilder; |
| import com.google.common.cache.RemovalListener; |
| import com.google.common.cache.RemovalNotification; |
| import com.google.common.collect.ImmutableSet; |
| |
| import org.apache.felix.scr.annotations.Activate; |
| import org.apache.felix.scr.annotations.Deactivate; |
| import org.apache.felix.scr.annotations.Reference; |
| import org.apache.felix.scr.annotations.ReferenceCardinality; |
| import org.onlab.onos.cluster.ClusterEvent; |
| import org.onlab.onos.cluster.ClusterStore; |
| import org.onlab.onos.cluster.ClusterStoreDelegate; |
| import org.onlab.onos.cluster.ControllerNode; |
| import org.onlab.onos.cluster.DefaultControllerNode; |
| import org.onlab.onos.cluster.NodeId; |
| import org.onlab.onos.store.AbstractStore; |
| import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService; |
| import org.onlab.packet.IpPrefix; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.TimeUnit; |
| |
| import static org.onlab.onos.cluster.ControllerNode.State; |
| import static org.onlab.packet.IpPrefix.valueOf; |
| |
| /** |
| * Distributed implementation of the cluster nodes store. |
| */ |
| //@Component(immediate = true) |
| //@Service |
| public class DistributedClusterStore |
| extends AbstractStore<ClusterEvent, ClusterStoreDelegate> |
| implements ClusterStore { |
| |
| private final Logger log = LoggerFactory.getLogger(getClass()); |
| |
| private DefaultControllerNode localNode; |
| private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>(); |
| private final Map<NodeId, State> states = new ConcurrentHashMap<>(); |
| private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder() |
| .maximumSize(1000) |
| .expireAfterWrite(/*ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * */3, TimeUnit.MILLISECONDS) |
| .removalListener(new LivenessCacheRemovalListener()).build(); |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| private ClusterCommunicationAdminService clusterCommunicationAdminService; |
| |
| private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate(); |
| |
| @Activate |
| public void activate() throws IOException { |
| loadClusterDefinition(); |
| establishSelfIdentity(); |
| |
| // Start-up the comm service and prime it with the loaded nodes. |
| clusterCommunicationAdminService.initialize(localNode, nodesDelegate); |
| for (DefaultControllerNode node : nodes.values()) { |
| clusterCommunicationAdminService.addNode(node); |
| } |
| log.info("Started"); |
| } |
| |
| @Deactivate |
| public void deactivate() { |
| log.info("Stopped"); |
| } |
| |
| /** |
| * Loads the cluster definition file. |
| */ |
| private void loadClusterDefinition() { |
| ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json"); |
| try { |
| Set<DefaultControllerNode> storedNodes = cds.read(); |
| for (DefaultControllerNode node : storedNodes) { |
| nodes.put(node.id(), node); |
| } |
| } catch (IOException e) { |
| log.error("Unable to read cluster definitions", e); |
| } |
| } |
| |
| /** |
| * Determines who the local controller node is. |
| */ |
| private void establishSelfIdentity() { |
| // Establishes the controller's own identity. |
| IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1")); |
| localNode = nodes.get(new NodeId(ip.toString())); |
| |
| // As a fall-back, let's make sure we at least know who we are. |
| if (localNode == null) { |
| localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip); |
| nodes.put(localNode.id(), localNode); |
| } |
| states.put(localNode.id(), State.ACTIVE); |
| } |
| |
| @Override |
| public ControllerNode getLocalNode() { |
| return localNode; |
| } |
| |
| @Override |
| public Set<ControllerNode> getNodes() { |
| ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder(); |
| return builder.addAll(nodes.values()).build(); |
| } |
| |
| @Override |
| public ControllerNode getNode(NodeId nodeId) { |
| return nodes.get(nodeId); |
| } |
| |
| @Override |
| public State getState(NodeId nodeId) { |
| State state = states.get(nodeId); |
| return state == null ? State.INACTIVE : state; |
| } |
| |
| @Override |
| public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) { |
| DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort); |
| nodes.put(nodeId, node); |
| clusterCommunicationAdminService.addNode(node); |
| return node; |
| } |
| |
| @Override |
| public void removeNode(NodeId nodeId) { |
| if (nodeId.equals(localNode.id())) { |
| nodes.clear(); |
| nodes.put(localNode.id(), localNode); |
| |
| } else { |
| // Remove the other node. |
| DefaultControllerNode node = nodes.remove(nodeId); |
| if (node != null) { |
| clusterCommunicationAdminService.removeNode(node); |
| } |
| } |
| } |
| |
| // Entity to handle back calls from the connection manager. |
| private class InnerNodesDelegate implements ClusterNodesDelegate { |
| @Override |
| public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) { |
| DefaultControllerNode node = nodes.get(nodeId); |
| if (node == null) { |
| node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort); |
| } |
| states.put(nodeId, State.ACTIVE); |
| livenessCache.put(nodeId, node); |
| return node; |
| } |
| |
| @Override |
| public void nodeVanished(NodeId nodeId) { |
| states.put(nodeId, State.INACTIVE); |
| } |
| |
| @Override |
| public void nodeRemoved(NodeId nodeId) { |
| removeNode(nodeId); |
| } |
| } |
| |
| private class LivenessCacheRemovalListener implements RemovalListener<NodeId, ControllerNode> { |
| |
| @Override |
| public void onRemoval(RemovalNotification<NodeId, ControllerNode> entry) { |
| NodeId nodeId = entry.getKey(); |
| log.warn("Failed to receive heartbeats from controller: " + nodeId); |
| nodesDelegate.nodeVanished(nodeId); |
| } |
| } |
| } |