blob: ae04226a3511571d22cbc7b7b27fb70db28039ca [file] [log] [blame]
tom73094832014-09-29 13:47:08 -07001package org.onlab.onos.store.cluster.impl;
2
3import com.google.common.collect.ImmutableSet;
4import org.apache.felix.scr.annotations.Activate;
5import org.apache.felix.scr.annotations.Component;
6import org.apache.felix.scr.annotations.Deactivate;
tom1d416c52014-09-29 20:55:24 -07007import org.apache.felix.scr.annotations.Reference;
8import org.apache.felix.scr.annotations.ReferenceCardinality;
tom73094832014-09-29 13:47:08 -07009import org.apache.felix.scr.annotations.Service;
tom73094832014-09-29 13:47:08 -070010import org.onlab.onos.cluster.ClusterEvent;
11import org.onlab.onos.cluster.ClusterStore;
12import org.onlab.onos.cluster.ClusterStoreDelegate;
13import org.onlab.onos.cluster.ControllerNode;
14import org.onlab.onos.cluster.DefaultControllerNode;
15import org.onlab.onos.cluster.NodeId;
16import org.onlab.onos.store.AbstractStore;
tom1d416c52014-09-29 20:55:24 -070017import org.onlab.onos.store.cluster.messaging.SerializationService;
tom73094832014-09-29 13:47:08 -070018import org.onlab.packet.IpPrefix;
19import org.slf4j.Logger;
20import org.slf4j.LoggerFactory;
21
22import java.io.IOException;
tom73094832014-09-29 13:47:08 -070023import java.util.Map;
24import java.util.Set;
tom73094832014-09-29 13:47:08 -070025import java.util.concurrent.ConcurrentHashMap;
tom73094832014-09-29 13:47:08 -070026
tom73094832014-09-29 13:47:08 -070027import static org.onlab.onos.cluster.ControllerNode.State;
28import static org.onlab.packet.IpPrefix.valueOf;
tom73094832014-09-29 13:47:08 -070029
30/**
31 * Distributed implementation of the cluster nodes store.
32 */
33@Component(immediate = true)
34@Service
35public class DistributedClusterStore
36 extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
37 implements ClusterStore {
38
tom73094832014-09-29 13:47:08 -070039 private final Logger log = LoggerFactory.getLogger(getClass());
40
tom1d416c52014-09-29 20:55:24 -070041 private DefaultControllerNode localNode;
tom73094832014-09-29 13:47:08 -070042 private final Map<NodeId, DefaultControllerNode> nodes = new ConcurrentHashMap<>();
43 private final Map<NodeId, State> states = new ConcurrentHashMap<>();
44
tom1d416c52014-09-29 20:55:24 -070045 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
46 private CommunicationsDelegate commsDelegate;
tom73094832014-09-29 13:47:08 -070047
tom1d416c52014-09-29 20:55:24 -070048 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
49 private SerializationService serializationService;
tom73094832014-09-29 13:47:08 -070050
tom1d416c52014-09-29 20:55:24 -070051 private final ClusterNodesDelegate nodesDelegate = new InnerNodesDelegate();
52 private ConnectionManager connectionManager;
tom73094832014-09-29 13:47:08 -070053
54 @Activate
55 public void activate() {
56 loadClusterDefinition();
tom1d416c52014-09-29 20:55:24 -070057 establishSelfIdentity();
58 connectionManager = new ConnectionManager(localNode, nodesDelegate,
59 commsDelegate, serializationService);
tom73094832014-09-29 13:47:08 -070060 log.info("Started");
61 }
62
63 @Deactivate
64 public void deactivate() {
tom73094832014-09-29 13:47:08 -070065 log.info("Stopped");
66 }
67
tom1d416c52014-09-29 20:55:24 -070068 /**
69 * Loads the cluster definition file.
70 */
tom73094832014-09-29 13:47:08 -070071 private void loadClusterDefinition() {
tom1d416c52014-09-29 20:55:24 -070072 ClusterDefinitionStore cds = new ClusterDefinitionStore("../config/cluster.json");
tom73094832014-09-29 13:47:08 -070073 try {
tom1d416c52014-09-29 20:55:24 -070074 Set<DefaultControllerNode> storedNodes = cds.read();
75 for (DefaultControllerNode node : storedNodes) {
76 nodes.put(node.id(), node);
tom73094832014-09-29 13:47:08 -070077 }
78 } catch (IOException e) {
tom1d416c52014-09-29 20:55:24 -070079 log.error("Unable to read cluster definitions", e);
tom73094832014-09-29 13:47:08 -070080 }
81 }
82
83 /**
tom1d416c52014-09-29 20:55:24 -070084 * Determines who the local controller node is.
tom73094832014-09-29 13:47:08 -070085 */
tom1d416c52014-09-29 20:55:24 -070086 private void establishSelfIdentity() {
87 // Establishes the controller's own identity.
88 IpPrefix ip = valueOf(System.getProperty("onos.ip", "127.0.1.1"));
89 localNode = nodes.get(new NodeId(ip.toString()));
tom73094832014-09-29 13:47:08 -070090
tom1d416c52014-09-29 20:55:24 -070091 // As a fall-back, let's make sure we at least know who we are.
92 if (localNode == null) {
93 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
94 nodes.put(localNode.id(), localNode);
95 states.put(localNode.id(), State.ACTIVE);
96 }
tom73094832014-09-29 13:47:08 -070097 }
98
99 @Override
100 public ControllerNode getLocalNode() {
tom1d416c52014-09-29 20:55:24 -0700101 return localNode;
tom73094832014-09-29 13:47:08 -0700102 }
103
104 @Override
105 public Set<ControllerNode> getNodes() {
106 ImmutableSet.Builder<ControllerNode> builder = ImmutableSet.builder();
107 return builder.addAll(nodes.values()).build();
108 }
109
110 @Override
111 public ControllerNode getNode(NodeId nodeId) {
112 return nodes.get(nodeId);
113 }
114
115 @Override
116 public State getState(NodeId nodeId) {
117 State state = states.get(nodeId);
118 return state == null ? State.INACTIVE : state;
119 }
120
121 @Override
122 public ControllerNode addNode(NodeId nodeId, IpPrefix ip, int tcpPort) {
123 DefaultControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
124 nodes.put(nodeId, node);
tom1d416c52014-09-29 20:55:24 -0700125 connectionManager.addNode(node);
tom73094832014-09-29 13:47:08 -0700126 return node;
127 }
128
129 @Override
130 public void removeNode(NodeId nodeId) {
tom1d416c52014-09-29 20:55:24 -0700131 DefaultControllerNode node = nodes.remove(nodeId);
132 if (node != null) {
133 connectionManager.removeNode(node);
tom5a8779c2014-09-29 14:48:43 -0700134 }
tom73094832014-09-29 13:47:08 -0700135 }
136
tom1d416c52014-09-29 20:55:24 -0700137 // Entity to handle back calls from the connection manager.
138 private class InnerNodesDelegate implements ClusterNodesDelegate {
139 @Override
140 public void nodeDetected(DefaultControllerNode node) {
141 nodes.put(node.id(), node);
142 states.put(node.id(), State.ACTIVE);
tom73094832014-09-29 13:47:08 -0700143 }
144
145 @Override
tom1d416c52014-09-29 20:55:24 -0700146 public void nodeVanished(DefaultControllerNode node) {
147 states.put(node.id(), State.INACTIVE);
tom73094832014-09-29 13:47:08 -0700148 }
149 }
tom73094832014-09-29 13:47:08 -0700150}