blob: 025804ae8f34b5723f89fba5233f5eedebf30e4d [file] [log] [blame]
tom73094832014-09-29 13:47:08 -07001package org.onlab.onos.store.cluster.impl;
2
3import com.google.common.collect.ImmutableSet;
4import org.apache.felix.scr.annotations.Activate;
5import org.apache.felix.scr.annotations.Component;
6import org.apache.felix.scr.annotations.Deactivate;
tom1d416c52014-09-29 20:55:24 -07007import org.apache.felix.scr.annotations.Reference;
8import org.apache.felix.scr.annotations.ReferenceCardinality;
tom73094832014-09-29 13:47:08 -07009import org.apache.felix.scr.annotations.Service;
tom73094832014-09-29 13:47:08 -070010import org.onlab.onos.cluster.ClusterEvent;
11import org.onlab.onos.cluster.ClusterStore;
12import org.onlab.onos.cluster.ClusterStoreDelegate;
13import org.onlab.onos.cluster.ControllerNode;
14import org.onlab.onos.cluster.DefaultControllerNode;
15import org.onlab.onos.cluster.NodeId;
16import org.onlab.onos.store.AbstractStore;
17import org.onlab.packet.IpPrefix;
18import org.slf4j.Logger;
19import org.slf4j.LoggerFactory;
20
21import java.io.IOException;
tom73094832014-09-29 13:47:08 -070022import java.util.Map;
23import java.util.Set;
tom73094832014-09-29 13:47:08 -070024import java.util.concurrent.ConcurrentHashMap;
tom73094832014-09-29 13:47:08 -070025
tom73094832014-09-29 13:47:08 -070026import static org.onlab.onos.cluster.ControllerNode.State;
27import static org.onlab.packet.IpPrefix.valueOf;
tom73094832014-09-29 13:47:08 -070028
29/**
30 * Distributed implementation of the cluster nodes store.
31 */
32@Component(immediate = true)
33@Service
34public class DistributedClusterStore
35 extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
36 implements ClusterStore {
37
tom73094832014-09-29 13:47:08 -070038 private final Logger log = LoggerFactory.getLogger(getClass());
39
tom1d416c52014-09-29 20:55:24 -070040 private DefaultControllerNode localNode;
tom73094832014-09-29 13:47:08 -070041 private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
42 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
43
tom1d416c52014-09-29 20:55:24 -070044 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
tom81583142014-09-30 01:40:29 -070045 private ClusterCommunicationAdminService communicationAdminService;
tom73094832014-09-29 13:47:08 -070046
tom1d416c52014-09-29 20:55:24 -070047 private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
tom73094832014-09-29 13:47:08 -070048
49 @Activate
50 public void activate() {
51 loadClusterDefinition();
tom1d416c52014-09-29 20:55:24 -070052 establishSelfIdentity();
tom81583142014-09-30 01:40:29 -070053
54 // Start-up the comm service and prime it with the loaded nodes.
55 communicationAdminService.startUp(localNode, nodesDelegate);
56 for (DefaultControllerNode node : nodes.values()) {
57 communicationAdminService.addNode(node);
58 }
tom73094832014-09-29 13:47:08 -070059 log.info("Started");
60 }
61
62 @Deactivate
63 public void deactivate() {
tom73094832014-09-29 13:47:08 -070064 log.info("Stopped");
65 }
66
tom1d416c52014-09-29 20:55:24 -070067 /**
68 * Loads the cluster definition file.
69 */
tom73094832014-09-29 13:47:08 -070070 private void loadClusterDefinition() {
tom1d416c52014-09-29 20:55:24 -070071 ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
tom73094832014-09-29 13:47:08 -070072 try {
tom1d416c52014-09-29 20:55:24 -070073 Set<DefaultControllerNode> storedNodes = cds.read();
74 for (DefaultControllerNode node : storedNodes) {
75 nodes.put(node.id(), node);
tom73094832014-09-29 13:47:08 -070076 }
77 } catch (IOException e) {
tom1d416c52014-09-29 20:55:24 -070078 log.error("Unable to read cluster definitions", e);
tom73094832014-09-29 13:47:08 -070079 }
80 }
81
82 /**
tom1d416c52014-09-29 20:55:24 -070083 * Determines who the local controller node is.
tom73094832014-09-29 13:47:08 -070084 */
tom1d416c52014-09-29 20:55:24 -070085 private void establishSelfIdentity() {
86 // Establishes the controller's own identity.
87 IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
88 localNode = nodes.get(new NodeId(ip.toString()));
tom73094832014-09-29 13:47:08 -070089
tom1d416c52014-09-29 20:55:24 -070090 // As a fall-back, let's make sure we at least know who we are.
91 if (localNode == null) {
92 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
93 nodes.put(localNode.id(), localNode);
tom1d416c52014-09-29 20:55:24 -070094 }
tom81583142014-09-30 01:40:29 -070095 states.put(localNode.id(), State.ACTIVE);
tom73094832014-09-29 13:47:08 -070096 }
97
98 @Override
99 public ControllerNode getLocalNode() {
tom1d416c52014-09-29 20:55:24 -0700100 return localNode;
tom73094832014-09-29 13:47:08 -0700101 }
102
103 @Override
104 public Set<ControllerNode> getNodes() {
105 ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
106 return builder.addAll(nodes.values()).build();
107 }
108
109 @Override
110 public ControllerNode getNode(NodeId nodeId) {
111 return nodes.get(nodeId);
112 }
113
114 @Override
115 public State getState(NodeId nodeId) {
116 State state = states.get(nodeId);
117 return state == null ? State.INACTIVE : state;
118 }
119
120 @Override
121 public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
122 DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
123 nodes.put(nodeId, node);
tom81583142014-09-30 01:40:29 -0700124 communicationAdminService.addNode(node);
tom73094832014-09-29 13:47:08 -0700125 return node;
126 }
127
128 @Override
129 public void removeNode(NodeId nodeId) {
tom1d416c52014-09-29 20:55:24 -0700130 DefaultControllerNode node = nodes.remove(nodeId);
131 if (node != null) {
tom81583142014-09-30 01:40:29 -0700132 communicationAdminService.removeNode(node);
tom5a8779c2014-09-29 14:48:43 -0700133 }
tom73094832014-09-29 13:47:08 -0700134 }
135
tom1d416c52014-09-29 20:55:24 -0700136 // Entity to handle back calls from the connection manager.
137 private class InnerNodesDelegate implements ClusterNodesDelegate {
138 @Override
tom81583142014-09-30 01:40:29 -0700139 public DefaultControllerNode nodeDetected(NodeId nodeId, IpPrefix ip, int tcpPort) {
140 DefaultControllerNode node = nodes.get(nodeId);
141 if (node == null) {
142 node = (DefaultControllerNode) addNode(nodeId, ip, tcpPort);
143 }
144 states.put(nodeId, State.ACTIVE);
145 return node;
tom73094832014-09-29 13:47:08 -0700146 }
tom73094832014-09-29 13:47:08 -0700147 @Override
tom81583142014-09-30 01:40:29 -0700148 public void nodeVanished(NodeId nodeId) {
149 states.put(nodeId, State.INACTIVE);
tom73094832014-09-29 13:47:08 -0700150 }
151 }
tom81583142014-09-30 01:40:29 -0700152
tom73094832014-09-29 13:47:08 -0700153}