blob: 4e28e3c24fc79439baa62dd039d74a05c459476c [file] [log] [blame]
Madan Jampaniafeebbd2015-05-19 15:26:01 -07001package org.onosproject.store.cluster.impl;
2
Jonathan Hart052a6c52015-07-16 14:10:10 -07003import com.google.common.collect.ImmutableSet;
4import com.google.common.collect.Sets;
Madan Jampaniafeebbd2015-05-19 15:26:01 -07005import org.apache.felix.scr.annotations.Activate;
6import org.apache.felix.scr.annotations.Component;
7import org.apache.felix.scr.annotations.Deactivate;
8import org.apache.felix.scr.annotations.Service;
9import org.onlab.packet.IpAddress;
10import org.onosproject.cluster.ClusterDefinitionService;
11import org.onosproject.cluster.ControllerNode;
12import org.onosproject.cluster.DefaultControllerNode;
13import org.onosproject.cluster.NodeId;
14import org.onosproject.store.consistent.impl.DatabaseDefinition;
15import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
16import org.slf4j.Logger;
17
Jonathan Hart052a6c52015-07-16 14:10:10 -070018import java.io.File;
19import java.io.IOException;
20import java.net.InetAddress;
21import java.net.NetworkInterface;
22import java.net.SocketException;
23import java.util.Enumeration;
24import java.util.Set;
25import java.util.stream.Collectors;
26
27import static java.net.NetworkInterface.getNetworkInterfaces;
28import static java.util.Collections.list;
29import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
30import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
31import static org.slf4j.LoggerFactory.getLogger;
Madan Jampaniafeebbd2015-05-19 15:26:01 -070032
33/**
34 * Implementation of ClusterDefinitionService.
35 */
36@Component(immediate = true)
37@Service
38public class ClusterDefinitionManager implements ClusterDefinitionService {
39
40 public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
41 private static final String ONOS_NIC = "ONOS_NIC";
42 private static final Logger log = getLogger(ClusterDefinitionManager.class);
43 private ControllerNode localNode;
44 private Set<ControllerNode> seedNodes;
45
46 @Activate
47 public void activate() {
48 File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
49 ClusterDefinitionStore clusterDefinitionStore =
50 new ClusterDefinitionStore(clusterDefinitionFile.getPath());
51
52 if (!clusterDefinitionFile.exists()) {
53 createDefaultClusterDefinition(clusterDefinitionStore);
54 }
55
56 try {
57 ClusterDefinition clusterDefinition = clusterDefinitionStore.read();
58 establishSelfIdentity(clusterDefinition);
59 seedNodes = ImmutableSet
60 .copyOf(clusterDefinition.getNodes())
61 .stream()
62 .filter(n -> !localNode.id().equals(new NodeId(n.getId())))
63 .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
64 IpAddress.valueOf(n.getIp()),
65 n.getTcpPort()))
66 .collect(Collectors.toSet());
67 } catch (IOException e) {
68 throw new IllegalStateException("Failed to read cluster definition.", e);
69 }
70
71 log.info("Started");
72 }
73
74 @Deactivate
75 public void deactivate() {
76 log.info("Stopped");
77 }
78
79 @Override
80 public ControllerNode localNode() {
81 return localNode;
82 }
83
84 @Override
85 public Set<ControllerNode> seedNodes() {
86 return seedNodes;
87 }
88
89 @Override
90 public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
91 try {
92 Set<NodeInfo> infos = Sets.newHashSet();
93 nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
94 n.ip().toString(),
95 n.tcpPort())));
96
97 ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
98 new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
99
100 DatabaseDefinition ddef = DatabaseDefinition.from(infos);
101 new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
102 } catch (IOException e) {
103 log.error("Unable to form cluster", e);
104 }
105 }
106
107 private IpAddress findLocalIp(ClusterDefinition clusterDefinition) throws SocketException {
108 Enumeration<NetworkInterface> interfaces =
109 NetworkInterface.getNetworkInterfaces();
110 while (interfaces.hasMoreElements()) {
111 NetworkInterface iface = interfaces.nextElement();
112 Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
113 while (inetAddresses.hasMoreElements()) {
114 IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
Jonathan Hart052a6c52015-07-16 14:10:10 -0700115 if (clusterDefinition.getNodes().stream()
116 .map(NodeInfo::getIp)
117 .map(IpAddress::valueOf)
118 .anyMatch(nodeIp -> ip.equals(nodeIp))) {
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700119 return ip;
120 }
121 }
122 }
123 throw new IllegalStateException("Unable to determine local ip");
124 }
125
126 private void establishSelfIdentity(ClusterDefinition clusterDefinition) {
127 try {
128 IpAddress ip = findLocalIp(clusterDefinition);
129 localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
130 } catch (SocketException e) {
131 throw new IllegalStateException("Cannot determine local IP", e);
132 }
133 }
134
135 private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
136 // Assumes IPv4 is returned.
137 String ip = getSiteLocalAddress();
138 String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");
139 NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
140 try {
141 store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
142 } catch (IOException e) {
143 log.warn("Unable to write default cluster definition", e);
144 }
145 }
146
147 /**
148 * Returns the address that matches the IP prefix given in ONOS_NIC
149 * environment variable if one was specified, or the first site local
150 * address if one can be found or the loopback address otherwise.
151 *
152 * @return site-local address in string form
153 */
154 public static String getSiteLocalAddress() {
155 try {
156 String ipPrefix = System.getenv(ONOS_NIC);
157 for (NetworkInterface nif : list(getNetworkInterfaces())) {
158 for (InetAddress address : list(nif.getInetAddresses())) {
159 IpAddress ip = IpAddress.valueOf(address);
160 if (ipPrefix == null && address.isSiteLocalAddress() ||
161 ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
162 return ip.toString();
163 }
164 }
165 }
166 } catch (SocketException e) {
167 log.error("Unable to get network interfaces", e);
168 }
169
170 return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
171 }
Thomas Vachuskaba082b82015-05-20 13:47:38 -0700172
173 // Indicates whether the specified interface address matches the given prefix.
174 // FIXME: Add a facility to IpPrefix to make this more robust
175 private static boolean matchInterface(String ip, String ipPrefix) {
176 String s = ipPrefix.replaceAll("\\.\\*", "");
177 return ip.startsWith(s);
178 }
Madan Jampaniafeebbd2015-05-19 15:26:01 -0700179}