blob: 5708e7768a79897bf7dbdb59445f91183db36e2e [file] [log] [blame]
tom73094832014-09-29 13:47:08 -07001package org.onlab.onos.store.cluster.impl;
2
Madan Jampani890bc352014-10-01 22:35:29 -07003import com.google.common.cache.Cache;
4import com.google.common.cache.CacheBuilder;
5import com.google.common.cache.RemovalListener;
6import com.google.common.cache.RemovalNotification;
tom73094832014-09-29 13:47:08 -07007import com.google.common.collect.ImmutableSet;
Madan Jampani890bc352014-10-01 22:35:29 -07008
tom73094832014-09-29 13:47:08 -07009import org.apache.felix.scr.annotations.Activate;
tom73094832014-09-29 13:47:08 -070010import org.apache.felix.scr.annotations.Deactivate;
tom1d416c52014-09-29 20:55:24 -070011import org.apache.felix.scr.annotations.Reference;
12import org.apache.felix.scr.annotations.ReferenceCardinality;
tom73094832014-09-29 13:47:08 -070013import org.onlab.onos.cluster.ClusterEvent;
14import org.onlab.onos.cluster.ClusterStore;
15import org.onlab.onos.cluster.ClusterStoreDelegate;
16import org.onlab.onos.cluster.ControllerNode;
17import org.onlab.onos.cluster.DefaultControllerNode;
18import org.onlab.onos.cluster.NodeId;
19import org.onlab.onos.store.AbstractStore;
Madan Jampani890bc352014-10-01 22:35:29 -070020import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
tom73094832014-09-29 13:47:08 -070021import org.onlab.packet.IpPrefix;
22import org.slf4j.Logger;
23import org.slf4j.LoggerFactory;
24
25import java.io.IOException;
tom73094832014-09-29 13:47:08 -070026import java.util.Map;
27import java.util.Set;
tom73094832014-09-29 13:47:08 -070028import java.util.concurrent.ConcurrentHashMap;
Madan Jampani890bc352014-10-01 22:35:29 -070029import java.util.concurrent.TimeUnit;
tom73094832014-09-29 13:47:08 -070030
tom73094832014-09-29 13:47:08 -070031import static org.onlab.onos.cluster.ControllerNode.State;
32import static org.onlab.packet.IpPrefix.valueOf;
tom73094832014-09-29 13:47:08 -070033
34/**
35 * Distributed implementation of the cluster nodes store.
36 */
Yuta HIGUCHI3215ebd2014-10-07 14:24:37 -070037//@Component(immediate = true)
38//@Service
tom73094832014-09-29 13:47:08 -070039public class DistributedClusterStore
40 extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
41 implements ClusterStore {
42
tom73094832014-09-29 13:47:08 -070043 private final Logger log = LoggerFactory.getLogger(getClass());
44
tom1d416c52014-09-29 20:55:24 -070045 private DefaultControllerNode localNode;
tom73094832014-09-29 13:47:08 -070046 private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
47 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
Madan Jampani890bc352014-10-01 22:35:29 -070048 private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
49 .maximumSize(1000)
Yuta HIGUCHIbbfc96a2014-10-13 18:05:44 -070050 .expireAfterWrite(/*ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * */3, TimeUnit.MILLISECONDS)
Madan Jampani890bc352014-10-01 22:35:29 -070051 .removalListener(new LivenessCacheRemovalListener()).build();
tom73094832014-09-29 13:47:08 -070052
tom1d416c52014-09-29 20:55:24 -070053 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Madan Jampani890bc352014-10-01 22:35:29 -070054 private ClusterCommunicationAdminService clusterCommunicationAdminService;
tom73094832014-09-29 13:47:08 -070055
tom1d416c52014-09-29 20:55:24 -070056 private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
tom73094832014-09-29 13:47:08 -070057
58 @Activate
Madan Jampani890bc352014-10-01 22:35:29 -070059 public void activate() throws IOException {
tom73094832014-09-29 13:47:08 -070060 loadClusterDefinition();
tom1d416c52014-09-29 20:55:24 -070061 establishSelfIdentity();
tom81583142014-09-30 01:40:29 -070062
63 // Start-up the comm service and prime it with the loaded nodes.
Madan Jampani890bc352014-10-01 22:35:29 -070064 clusterCommunicationAdminService.initialize(localNode, nodesDelegate);
tom81583142014-09-30 01:40:29 -070065 for (DefaultControllerNode node : nodes.values()) {
Madan Jampani890bc352014-10-01 22:35:29 -070066 clusterCommunicationAdminService.addNode(node);
tom81583142014-09-30 01:40:29 -070067 }
tom73094832014-09-29 13:47:08 -070068 log.info("Started");
69 }
70
71 @Deactivate
72 public void deactivate() {
tom73094832014-09-29 13:47:08 -070073 log.info("Stopped");
74 }
75
tom1d416c52014-09-29 20:55:24 -070076 /**
77 * Loads the cluster definition file.
78 */
tom73094832014-09-29 13:47:08 -070079 private void loadClusterDefinition() {
tom1d416c52014-09-29 20:55:24 -070080 ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
tom73094832014-09-29 13:47:08 -070081 try {
tom1d416c52014-09-29 20:55:24 -070082 Set<DefaultControllerNode> storedNodes = cds.read();
83 for (DefaultControllerNode node : storedNodes) {
84 nodes.put(node.id(), node);
tom73094832014-09-29 13:47:08 -070085 }
86 } catch (IOException e) {
tom1d416c52014-09-29 20:55:24 -070087 log.error("Unable to read cluster definitions", e);
tom73094832014-09-29 13:47:08 -070088 }
89 }
90
91 /**
tom1d416c52014-09-29 20:55:24 -070092 * Determines who the local controller node is.
tom73094832014-09-29 13:47:08 -070093 */
tom1d416c52014-09-29 20:55:24 -070094 private void establishSelfIdentity() {
95 // Establishes the controller's own identity.
96 IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
97 localNode = nodes.get(new NodeId(ip.toString()));
tom73094832014-09-29 13:47:08 -070098
tom1d416c52014-09-29 20:55:24 -070099 // As a fall-back, let's make sure we at least know who we are.
100 if (localNode == null) {
101 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
102 nodes.put(localNode.id(), localNode);
tom1d416c52014-09-29 20:55:24 -0700103 }
tom81583142014-09-30 01:40:29 -0700104 states.put(localNode.id(), State.ACTIVE);
tom73094832014-09-29 13:47:08 -0700105 }
106
107 @Override
108 public ControllerNode getLocalNode() {
tom1d416c52014-09-29 20:55:24 -0700109 return localNode;
tom73094832014-09-29 13:47:08 -0700110 }
111
112 @Override
113 public Set<ControllerNode> getNodes() {
114 ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
115 return builder.addAll(nodes.values()).build();
116 }
117
118 @Override
119 public ControllerNode getNode(NodeId nodeId) {
120 return nodes.get(nodeId);
121 }
122
123 @Override
124 public State getState(NodeId nodeId) {
125 State state = states.get(nodeId);
126 return state == null ? State.INACTIVE : state;
127 }
128
129 @Override
130 public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
131 DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
132 nodes.put(nodeId, node);
Madan Jampani890bc352014-10-01 22:35:29 -0700133 clusterCommunicationAdminService.addNode(node);
tom73094832014-09-29 13:47:08 -0700134 return node;
135 }
136
137 @Override
138 public void removeNode(NodeId nodeId) {
tomd33e6402014-09-30 03:14:43 -0700139 if (nodeId.equals(localNode.id())) {
tomd33e6402014-09-30 03:14:43 -0700140 nodes.clear();
tom28e1fa22014-09-30 10:38:21 -0700141 nodes.put(localNode.id(), localNode);
142
tomd33e6402014-09-30 03:14:43 -0700143 } else {
144 // Remove the other node.
145 DefaultControllerNode node = nodes.remove(nodeId);
146 if (node != null) {
Madan Jampani890bc352014-10-01 22:35:29 -0700147 clusterCommunicationAdminService.removeNode(node);
tomd33e6402014-09-30 03:14:43 -0700148 }
tom5a8779c2014-09-29 14:48:43 -0700149 }
tom73094832014-09-29 13:47:08 -0700150 }
151
tom1d416c52014-09-29 20:55:24 -0700152 // Entity to handle back calls from the connection manager.
153 private class InnerNodesDelegate implements ClusterNodesDelegate {
154 @Override
tom81583142014-09-30 01:40:29 -0700155 public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
156 DefaultControllerNode node = nodes.get(nodeId);
157 if (node == null) {
158 node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
159 }
160 states.put(nodeId, State.ACTIVE);
Madan Jampani890bc352014-10-01 22:35:29 -0700161 livenessCache.put(nodeId, node);
tom81583142014-09-30 01:40:29 -0700162 return node;
tom73094832014-09-29 13:47:08 -0700163 }
tom28e1fa22014-09-30 10:38:21 -0700164
tom73094832014-09-29 13:47:08 -0700165 @Override
tom81583142014-09-30 01:40:29 -0700166 public void nodeVanished(NodeId nodeId) {
167 states.put(nodeId, State.INACTIVE);
tom73094832014-09-29 13:47:08 -0700168 }
tomd33e6402014-09-30 03:14:43 -0700169
170 @Override
171 public void nodeRemoved(NodeId nodeId) {
172 removeNode(nodeId);
173 }
tom73094832014-09-29 13:47:08 -0700174 }
tom81583142014-09-30 01:40:29 -0700175
Madan Jampani890bc352014-10-01 22:35:29 -0700176 private class LivenessCacheRemovalListener implements RemovalListener<NodeId, ControllerNode> {
177
178 @Override
179 public void onRemoval(RemovalNotification<NodeId, ControllerNode> entry) {
180 NodeId nodeId = entry.getKey();
181 log.warn("Failed to receive heartbeats from controller: " + nodeId);
182 nodesDelegate.nodeVanished(nodeId);
183 }
184 }
tom73094832014-09-29 13:47:08 -0700185}