blob: e25c9640127890437c0d92415b93126f188ea69b [file] [log] [blame]
tom73094832014-09-29 13:47:08 -07001package org.onlab.onos.store.cluster.impl;
2
Madan Jampani890bc352014-10-01 22:35:29 -07003import com.google.common.cache.Cache;
4import com.google.common.cache.CacheBuilder;
5import com.google.common.cache.RemovalListener;
6import com.google.common.cache.RemovalNotification;
tom73094832014-09-29 13:47:08 -07007import com.google.common.collect.ImmutableSet;
Madan Jampani890bc352014-10-01 22:35:29 -07008
tom73094832014-09-29 13:47:08 -07009import org.apache.felix.scr.annotations.Activate;
10import org.apache.felix.scr.annotations.Component;
11import org.apache.felix.scr.annotations.Deactivate;
tom1d416c52014-09-29 20:55:24 -070012import org.apache.felix.scr.annotations.Reference;
13import org.apache.felix.scr.annotations.ReferenceCardinality;
tom73094832014-09-29 13:47:08 -070014import org.apache.felix.scr.annotations.Service;
tom73094832014-09-29 13:47:08 -070015import org.onlab.onos.cluster.ClusterEvent;
16import org.onlab.onos.cluster.ClusterStore;
17import org.onlab.onos.cluster.ClusterStoreDelegate;
18import org.onlab.onos.cluster.ControllerNode;
19import org.onlab.onos.cluster.DefaultControllerNode;
20import org.onlab.onos.cluster.NodeId;
21import org.onlab.onos.store.AbstractStore;
Madan Jampani890bc352014-10-01 22:35:29 -070022import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
23import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager;
tom73094832014-09-29 13:47:08 -070024import org.onlab.packet.IpPrefix;
25import org.slf4j.Logger;
26import org.slf4j.LoggerFactory;
27
import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
tom73094832014-09-29 13:47:08 -070033
tom73094832014-09-29 13:47:08 -070034import static org.onlab.onos.cluster.ControllerNode.State;
35import static org.onlab.packet.IpPrefix.valueOf;
tom73094832014-09-29 13:47:08 -070036
37/**
38 * Distributed implementation of the cluster nodes store.
39 */
40@Component(immediate = true)
41@Service
42public class DistributedClusterStore
43 extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
44 implements ClusterStore {
45
tom73094832014-09-29 13:47:08 -070046 private final Logger log = LoggerFactory.getLogger(getClass());
47
tom1d416c52014-09-29 20:55:24 -070048 private DefaultControllerNode localNode;
tom73094832014-09-29 13:47:08 -070049 private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
50 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
Madan Jampani890bc352014-10-01 22:35:29 -070051 private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
52 .maximumSize(1000)
53 .expireAfterWrite(OnosClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
54 .removalListener(new LivenessCacheRemovalListener()).build();
tom73094832014-09-29 13:47:08 -070055
tom1d416c52014-09-29 20:55:24 -070056 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani890bc352014-10-01 22:35:29 -070057 private ClusterCommunicationAdminService clusterCommunicationAdminService;
tom73094832014-09-29 13:47:08 -070058
tom1d416c52014-09-29 20:55:24 -070059 private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
tom73094832014-09-29 13:47:08 -070060
61 @Activate
Madan Jampani890bc352014-10-01 22:35:29 -070062 public void activate() throws IOException {
tom73094832014-09-29 13:47:08 -070063 loadClusterDefinition();
tom1d416c52014-09-29 20:55:24 -070064 establishSelfIdentity();
tom81583142014-09-30 01:40:29 -070065
66 // Start-up the comm service and prime it with the loaded nodes.
Madan Jampani890bc352014-10-01 22:35:29 -070067 clusterCommunicationAdminService.initialize(localNode, nodesDelegate);
tom81583142014-09-30 01:40:29 -070068 for (DefaultControllerNode node : nodes.values()) {
Madan Jampani890bc352014-10-01 22:35:29 -070069 clusterCommunicationAdminService.addNode(node);
tom81583142014-09-30 01:40:29 -070070 }
tom73094832014-09-29 13:47:08 -070071 log.info("Started");
72 }
73
74 @Deactivate
75 public void deactivate() {
tom73094832014-09-29 13:47:08 -070076 log.info("Stopped");
77 }
78
tom1d416c52014-09-29 20:55:24 -070079 /**
80 * Loads the cluster definition file.
81 */
tom73094832014-09-29 13:47:08 -070082 private void loadClusterDefinition() {
tom1d416c52014-09-29 20:55:24 -070083 ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
tom73094832014-09-29 13:47:08 -070084 try {
tom1d416c52014-09-29 20:55:24 -070085 Set<DefaultControllerNode> storedNodes = cds.read();
86 for (DefaultControllerNode node : storedNodes) {
87 nodes.put(node.id(), node);
tom73094832014-09-29 13:47:08 -070088 }
89 } catch (IOException e) {
tom1d416c52014-09-29 20:55:24 -070090 log.error("Unable to read cluster definitions", e);
tom73094832014-09-29 13:47:08 -070091 }
92 }
93
94 /**
tom1d416c52014-09-29 20:55:24 -070095 * Determines who the local controller node is.
tom73094832014-09-29 13:47:08 -070096 */
tom1d416c52014-09-29 20:55:24 -070097 private void establishSelfIdentity() {
98 // Establishes the controller's own identity.
99 IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
100 localNode = nodes.get(new NodeId(ip.toString()));
tom73094832014-09-29 13:47:08 -0700101
tom1d416c52014-09-29 20:55:24 -0700102 // As a fall-back, let's make sure we at least know who we are.
103 if (localNode == null) {
104 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
105 nodes.put(localNode.id(), localNode);
tom1d416c52014-09-29 20:55:24 -0700106 }
tom81583142014-09-30 01:40:29 -0700107 states.put(localNode.id(), State.ACTIVE);
tom73094832014-09-29 13:47:08 -0700108 }
109
110 @Override
111 public ControllerNode getLocalNode() {
tom1d416c52014-09-29 20:55:24 -0700112 return localNode;
tom73094832014-09-29 13:47:08 -0700113 }
114
115 @Override
116 public Set<ControllerNode> getNodes() {
117 ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
118 return builder.addAll(nodes.values()).build();
119 }
120
121 @Override
122 public ControllerNode getNode(NodeId nodeId) {
123 return nodes.get(nodeId);
124 }
125
126 @Override
127 public State getState(NodeId nodeId) {
128 State state = states.get(nodeId);
129 return state == null ? State.INACTIVE : state;
130 }
131
132 @Override
133 public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
134 DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
135 nodes.put(nodeId, node);
Madan Jampani890bc352014-10-01 22:35:29 -0700136 clusterCommunicationAdminService.addNode(node);
tom73094832014-09-29 13:47:08 -0700137 return node;
138 }
139
140 @Override
141 public void removeNode(NodeId nodeId) {
tomd33e6402014-09-30 03:14:43 -0700142 if (nodeId.equals(localNode.id())) {
tomd33e6402014-09-30 03:14:43 -0700143 nodes.clear();
tom28e1fa22014-09-30 10:38:21 -0700144 nodes.put(localNode.id(), localNode);
145
tomd33e6402014-09-30 03:14:43 -0700146 } else {
147 // Remove the other node.
148 DefaultControllerNode node = nodes.remove(nodeId);
149 if (node != null) {
Madan Jampani890bc352014-10-01 22:35:29 -0700150 clusterCommunicationAdminService.removeNode(node);
tomd33e6402014-09-30 03:14:43 -0700151 }
tom5a8779c2014-09-29 14:48:43 -0700152 }
tom73094832014-09-29 13:47:08 -0700153 }
154
tom1d416c52014-09-29 20:55:24 -0700155 // Entity to handle back calls from the connection manager.
156 private class InnerNodesDelegate implements ClusterNodesDelegate {
157 @Override
tom81583142014-09-30 01:40:29 -0700158 public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
159 DefaultControllerNode node = nodes.get(nodeId);
160 if (node == null) {
161 node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
162 }
163 states.put(nodeId, State.ACTIVE);
Madan Jampani890bc352014-10-01 22:35:29 -0700164 livenessCache.put(nodeId, node);
tom81583142014-09-30 01:40:29 -0700165 return node;
tom73094832014-09-29 13:47:08 -0700166 }
tom28e1fa22014-09-30 10:38:21 -0700167
tom73094832014-09-29 13:47:08 -0700168 @Override
tom81583142014-09-30 01:40:29 -0700169 public void nodeVanished(NodeId nodeId) {
170 states.put(nodeId, State.INACTIVE);
tom73094832014-09-29 13:47:08 -0700171 }
tomd33e6402014-09-30 03:14:43 -0700172
173 @Override
174 public void nodeRemoved(NodeId nodeId) {
175 removeNode(nodeId);
176 }
tom73094832014-09-29 13:47:08 -0700177 }
tom81583142014-09-30 01:40:29 -0700178
Madan Jampani890bc352014-10-01 22:35:29 -0700179 private class LivenessCacheRemovalListener implements RemovalListener<NodeId, ControllerNode> {
180
181 @Override
182 public void onRemoval(RemovalNotification<NodeId, ControllerNode> entry) {
183 NodeId nodeId = entry.getKey();
184 log.warn("Failed to receive heartbeats from controller: " + nodeId);
185 nodesDelegate.nodeVanished(nodeId);
186 }
187 }
tom73094832014-09-29 13:47:08 -0700188}