blob: 5e64a39c6ce6dd07e184f0af8bc0f12f9598ad95 [file] [log] [blame]
tom73094832014-09-29 13:47:08 -07001package org.onlab.onos.store.cluster.impl;
2
Madan Jampani890bc352014-10-01 22:35:29 -07003import com.google.common.cache.Cache;
4import com.google.common.cache.CacheBuilder;
5import com.google.common.cache.RemovalListener;
6import com.google.common.cache.RemovalNotification;
tom73094832014-09-29 13:47:08 -07007import com.google.common.collect.ImmutableSet;
Madan Jampani890bc352014-10-01 22:35:29 -07008
tom73094832014-09-29 13:47:08 -07009import org.apache.felix.scr.annotations.Activate;
tom73094832014-09-29 13:47:08 -070010import org.apache.felix.scr.annotations.Deactivate;
tom1d416c52014-09-29 20:55:24 -070011import org.apache.felix.scr.annotations.Reference;
12import org.apache.felix.scr.annotations.ReferenceCardinality;
tom73094832014-09-29 13:47:08 -070013import org.onlab.onos.cluster.ClusterEvent;
14import org.onlab.onos.cluster.ClusterStore;
15import org.onlab.onos.cluster.ClusterStoreDelegate;
16import org.onlab.onos.cluster.ControllerNode;
17import org.onlab.onos.cluster.DefaultControllerNode;
18import org.onlab.onos.cluster.NodeId;
19import org.onlab.onos.store.AbstractStore;
Madan Jampani890bc352014-10-01 22:35:29 -070020import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
Madan Jampani3b0dfd52014-10-02 16:48:13 -070021import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
tom73094832014-09-29 13:47:08 -070022import org.onlab.packet.IpPrefix;
23import org.slf4j.Logger;
24import org.slf4j.LoggerFactory;
25
26import java.io.IOException;
tom73094832014-09-29 13:47:08 -070027import java.util.Map;
28import java.util.Set;
tom73094832014-09-29 13:47:08 -070029import java.util.concurrent.ConcurrentHashMap;
Madan Jampani890bc352014-10-01 22:35:29 -070030import java.util.concurrent.TimeUnit;
tom73094832014-09-29 13:47:08 -070031
tom73094832014-09-29 13:47:08 -070032import static org.onlab.onos.cluster.ControllerNode.State;
33import static org.onlab.packet.IpPrefix.valueOf;
tom73094832014-09-29 13:47:08 -070034
35/**
36 * Distributed implementation of the cluster nodes store.
37 */
Yuta HIGUCHI3215ebd2014-10-07 14:24:37 -070038//@Component(immediate = true)
39//@Service
tom73094832014-09-29 13:47:08 -070040public class DistributedClusterStore
41 extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
42 implements ClusterStore {
43
tom73094832014-09-29 13:47:08 -070044 private final Logger log = LoggerFactory.getLogger(getClass());
45
tom1d416c52014-09-29 20:55:24 -070046 private DefaultControllerNode localNode;
tom73094832014-09-29 13:47:08 -070047 private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
48 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
Madan Jampani890bc352014-10-01 22:35:29 -070049 private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
50 .maximumSize(1000)
Madan Jampani3b0dfd52014-10-02 16:48:13 -070051 .expireAfterWrite(ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
Madan Jampani890bc352014-10-01 22:35:29 -070052 .removalListener(new LivenessCacheRemovalListener()).build();
tom73094832014-09-29 13:47:08 -070053
tom1d416c52014-09-29 20:55:24 -070054 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani890bc352014-10-01 22:35:29 -070055 private ClusterCommunicationAdminService clusterCommunicationAdminService;
tom73094832014-09-29 13:47:08 -070056
tom1d416c52014-09-29 20:55:24 -070057 private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
tom73094832014-09-29 13:47:08 -070058
59 @Activate
Madan Jampani890bc352014-10-01 22:35:29 -070060 public void activate() throws IOException {
tom73094832014-09-29 13:47:08 -070061 loadClusterDefinition();
tom1d416c52014-09-29 20:55:24 -070062 establishSelfIdentity();
tom81583142014-09-30 01:40:29 -070063
64 // Start-up the comm service and prime it with the loaded nodes.
Madan Jampani890bc352014-10-01 22:35:29 -070065 clusterCommunicationAdminService.initialize(localNode, nodesDelegate);
tom81583142014-09-30 01:40:29 -070066 for (DefaultControllerNode node : nodes.values()) {
Madan Jampani890bc352014-10-01 22:35:29 -070067 clusterCommunicationAdminService.addNode(node);
tom81583142014-09-30 01:40:29 -070068 }
tom73094832014-09-29 13:47:08 -070069 log.info("Started");
70 }
71
72 @Deactivate
73 public void deactivate() {
tom73094832014-09-29 13:47:08 -070074 log.info("Stopped");
75 }
76
tom1d416c52014-09-29 20:55:24 -070077 /**
78 * Loads the cluster definition file.
79 */
tom73094832014-09-29 13:47:08 -070080 private void loadClusterDefinition() {
tom1d416c52014-09-29 20:55:24 -070081 ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
tom73094832014-09-29 13:47:08 -070082 try {
tom1d416c52014-09-29 20:55:24 -070083 Set<DefaultControllerNode> storedNodes = cds.read();
84 for (DefaultControllerNode node : storedNodes) {
85 nodes.put(node.id(), node);
tom73094832014-09-29 13:47:08 -070086 }
87 } catch (IOException e) {
tom1d416c52014-09-29 20:55:24 -070088 log.error("Unable to read cluster definitions", e);
tom73094832014-09-29 13:47:08 -070089 }
90 }
91
92 /**
tom1d416c52014-09-29 20:55:24 -070093 * Determines who the local controller node is.
tom73094832014-09-29 13:47:08 -070094 */
tom1d416c52014-09-29 20:55:24 -070095 private void establishSelfIdentity() {
96 // Establishes the controller's own identity.
97 IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
98 localNode = nodes.get(new NodeId(ip.toString()));
tom73094832014-09-29 13:47:08 -070099
tom1d416c52014-09-29 20:55:24 -0700100 // As a fall-back, let's make sure we at least know who we are.
101 if (localNode == null) {
102 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
103 nodes.put(localNode.id(), localNode);
tom1d416c52014-09-29 20:55:24 -0700104 }
tom81583142014-09-30 01:40:29 -0700105 states.put(localNode.id(), State.ACTIVE);
tom73094832014-09-29 13:47:08 -0700106 }
107
108 @Override
109 public ControllerNode getLocalNode() {
tom1d416c52014-09-29 20:55:24 -0700110 return localNode;
tom73094832014-09-29 13:47:08 -0700111 }
112
113 @Override
114 public Set<ControllerNode> getNodes() {
115 ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
116 return builder.addAll(nodes.values()).build();
117 }
118
119 @Override
120 public ControllerNode getNode(NodeId nodeId) {
121 return nodes.get(nodeId);
122 }
123
124 @Override
125 public State getState(NodeId nodeId) {
126 State state = states.get(nodeId);
127 return state == null ? State.INACTIVE : state;
128 }
129
130 @Override
131 public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
132 DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
133 nodes.put(nodeId, node);
Madan Jampani890bc352014-10-01 22:35:29 -0700134 clusterCommunicationAdminService.addNode(node);
tom73094832014-09-29 13:47:08 -0700135 return node;
136 }
137
138 @Override
139 public void removeNode(NodeId nodeId) {
tomd33e6402014-09-30 03:14:43 -0700140 if (nodeId.equals(localNode.id())) {
tomd33e6402014-09-30 03:14:43 -0700141 nodes.clear();
tom28e1fa22014-09-30 10:38:21 -0700142 nodes.put(localNode.id(), localNode);
143
tomd33e6402014-09-30 03:14:43 -0700144 } else {
145 // Remove the other node.
146 DefaultControllerNode node = nodes.remove(nodeId);
147 if (node != null) {
Madan Jampani890bc352014-10-01 22:35:29 -0700148 clusterCommunicationAdminService.removeNode(node);
tomd33e6402014-09-30 03:14:43 -0700149 }
tom5a8779c2014-09-29 14:48:43 -0700150 }
tom73094832014-09-29 13:47:08 -0700151 }
152
tom1d416c52014-09-29 20:55:24 -0700153 // Entity to handle back calls from the connection manager.
154 private class InnerNodesDelegate implements ClusterNodesDelegate {
155 @Override
tom81583142014-09-30 01:40:29 -0700156 public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
157 DefaultControllerNode node = nodes.get(nodeId);
158 if (node == null) {
159 node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
160 }
161 states.put(nodeId, State.ACTIVE);
Madan Jampani890bc352014-10-01 22:35:29 -0700162 livenessCache.put(nodeId, node);
tom81583142014-09-30 01:40:29 -0700163 return node;
tom73094832014-09-29 13:47:08 -0700164 }
tom28e1fa22014-09-30 10:38:21 -0700165
tom73094832014-09-29 13:47:08 -0700166 @Override
tom81583142014-09-30 01:40:29 -0700167 public void nodeVanished(NodeId nodeId) {
168 states.put(nodeId, State.INACTIVE);
tom73094832014-09-29 13:47:08 -0700169 }
tomd33e6402014-09-30 03:14:43 -0700170
171 @Override
172 public void nodeRemoved(NodeId nodeId) {
173 removeNode(nodeId);
174 }
tom73094832014-09-29 13:47:08 -0700175 }
tom81583142014-09-30 01:40:29 -0700176
Madan Jampani890bc352014-10-01 22:35:29 -0700177 private class LivenessCacheRemovalListener implements RemovalListener<NodeId, ControllerNode> {
178
179 @Override
180 public void onRemoval(RemovalNotification<NodeId, ControllerNode> entry) {
181 NodeId nodeId = entry.getKey();
182 log.warn("Failed to receive heartbeats from controller: " + nodeId);
183 nodesDelegate.nodeVanished(nodeId);
184 }
185 }
tom73094832014-09-29 13:47:08 -0700186}