// blob: 4d127235cfe2c125e8a42d9186dc607846f3c0b0
package org.onosproject.store.cluster.impl;
import static com.hazelcast.util.AddressUtil.matchInterface;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Enumeration;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterDefinitionService;
import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.consistent.impl.DatabaseDefinition;
import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.hazelcast.util.AddressUtil;
/**
 * Implementation of ClusterDefinitionService.
 * <p>
 * On activation the cluster definition is read from
 * {@link #CLUSTER_DEFINITION_FILE}; if that file does not exist, a default
 * single-node definition based on {@link #getSiteLocalAddress()} is written
 * first. The local node is then identified by matching the addresses of the
 * local network interfaces against the IP prefix stored in the definition,
 * and every other node in the definition becomes a seed node.
 */
@Component(immediate = true)
@Service
public class ClusterDefinitionManager implements ClusterDefinitionService {

    /** Location of the cluster definition file (relative path). */
    public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";

    /** Name of the environment variable that may carry an IP prefix. */
    private static final String ONOS_NIC = "ONOS_NIC";

    private static final Logger log = getLogger(ClusterDefinitionManager.class);

    // Both fields are assigned in activate() and are null before it has run.
    private ControllerNode localNode;
    private Set<ControllerNode> seedNodes;

    /**
     * Loads the cluster definition, creating a default one if the definition
     * file is missing, and derives the local node and the seed nodes from it.
     *
     * @throws IllegalStateException if the cluster definition cannot be read
     *                               or the local IP cannot be determined
     */
    @Activate
    public void activate() {
        File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
        ClusterDefinitionStore clusterDefinitionStore =
                new ClusterDefinitionStore(clusterDefinitionFile.getPath());

        if (!clusterDefinitionFile.exists()) {
            createDefaultClusterDefinition(clusterDefinitionStore);
        }

        try {
            ClusterDefinition clusterDefinition = clusterDefinitionStore.read();

            // The local node must be known before the seeds are computed,
            // because the filter below excludes it by node id.
            establishSelfIdentity(clusterDefinition);

            Set<ControllerNode> seeds = ImmutableSet
                    .copyOf(clusterDefinition.getNodes())
                    .stream()
                    .filter(n -> !localNode.id().equals(new NodeId(n.getId())))
                    .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
                                                        IpAddress.valueOf(n.getIp()),
                                                        n.getTcpPort()))
                    .collect(Collectors.toSet());

            // Store an immutable snapshot so that callers of seedNodes()
            // cannot modify this component's internal state.
            seedNodes = ImmutableSet.copyOf(seeds);
        } catch (IOException e) {
            throw new IllegalStateException("Failed to read cluster definition.", e);
        }

        log.info("Started");
    }

    /**
     * Logs that the component has stopped; no other cleanup is performed.
     */
    @Deactivate
    public void deactivate() {
        log.info("Stopped");
    }

    /**
     * Returns the local controller node established during activation.
     *
     * @return local controller node; null if activation has not completed
     */
    @Override
    public ControllerNode localNode() {
        return localNode;
    }

    /**
     * Returns the nodes of the cluster definition other than the local node.
     *
     * @return immutable set of seed nodes; null if activation has not completed
     */
    @Override
    public Set<ControllerNode> seedNodes() {
        return seedNodes;
    }

    /**
     * Writes a new cluster definition and a matching database (partition)
     * definition for the given nodes.
     * <p>
     * An I/O failure is logged and not propagated to the caller. The cluster
     * definition is written before the partition definition, so a failure of
     * the second write does not undo the first.
     *
     * @param nodes    controller nodes that make up the cluster
     * @param ipPrefix IP prefix recorded in the cluster definition
     */
    @Override
    public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
        try {
            Set<NodeInfo> infos = Sets.newHashSet();
            for (ControllerNode node : nodes) {
                infos.add(NodeInfo.from(node.id().toString(),
                                        node.ip().toString(),
                                        node.tcpPort()));
            }

            ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
            new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);

            DatabaseDefinition ddef = DatabaseDefinition.from(infos);
            new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
        } catch (IOException e) {
            log.error("Unable to form cluster", e);
        }
    }

    /**
     * Finds the first address of any local network interface that matches
     * the IP prefix of the given cluster definition.
     *
     * @param clusterDefinition cluster definition supplying the IP prefix
     * @return first matching local IP address
     * @throws SocketException       if the network interfaces cannot be queried
     * @throws IllegalStateException if no local address matches the prefix
     */
    private IpAddress findLocalIp(ClusterDefinition clusterDefinition) throws SocketException {
        Enumeration<NetworkInterface> interfaces =
                NetworkInterface.getNetworkInterfaces();
        while (interfaces.hasMoreElements()) {
            NetworkInterface iface = interfaces.nextElement();
            Enumeration<InetAddress> inetAddresses = iface.getInetAddresses();
            while (inetAddresses.hasMoreElements()) {
                IpAddress ip = IpAddress.valueOf(inetAddresses.nextElement());
                if (AddressUtil.matchInterface(ip.toString(), clusterDefinition.getIpPrefix())) {
                    return ip;
                }
            }
        }
        throw new IllegalStateException("Unable to determine local ip");
    }

    /**
     * Sets the local node from the local IP address that matches the cluster
     * definition's IP prefix. The string form of that address is used as the
     * node identifier.
     *
     * @param clusterDefinition cluster definition supplying the IP prefix
     * @throws IllegalStateException if the local IP cannot be determined
     */
    private void establishSelfIdentity(ClusterDefinition clusterDefinition) {
        try {
            IpAddress ip = findLocalIp(clusterDefinition);
            localNode = new DefaultControllerNode(new NodeId(ip.toString()), ip);
        } catch (SocketException e) {
            throw new IllegalStateException("Cannot determine local IP", e);
        }
    }

    /**
     * Writes a default cluster definition that contains only this node.
     * <p>
     * A write failure is logged as a warning and not propagated.
     *
     * @param store store used to persist the default definition
     */
    private void createDefaultClusterDefinition(ClusterDefinitionStore store) {
        // Assumes IPv4 is returned: the last dotted segment is replaced by a
        // wildcard, e.g. 10.0.1.5 becomes 10.0.1.*. A string that does not
        // end in ".<digits>" is left unchanged by the replacement.
        String ip = getSiteLocalAddress();
        String ipPrefix = ip.replaceFirst("\\.[0-9]*$", ".*");

        // The address doubles as the node identifier.
        NodeInfo node = NodeInfo.from(ip, ip, DEFAULT_PORT);
        try {
            store.write(ClusterDefinition.from(ImmutableSet.of(node), ipPrefix));
        } catch (IOException e) {
            log.warn("Unable to write default cluster definition", e);
        }
    }

    /**
     * Returns the address that matches the IP prefix given in ONOS_NIC
     * environment variable if one was specified, or the first site local
     * address if one can be found or the loopback address otherwise.
     * <p>
     * The loopback address is also returned when the network interfaces
     * cannot be queried; that failure is logged.
     *
     * @return site-local address in string form
     */
    public static String getSiteLocalAddress() {
        try {
            String ipPrefix = System.getenv(ONOS_NIC);
            for (NetworkInterface nif : list(getNetworkInterfaces())) {
                for (InetAddress address : list(nif.getInetAddresses())) {
                    IpAddress ip = IpAddress.valueOf(address);
                    // Without a configured prefix accept any site-local
                    // address; with one, accept only addresses matching it.
                    boolean accepted = (ipPrefix == null)
                            ? address.isSiteLocalAddress()
                            : matchInterface(ip.toString(), ipPrefix);
                    if (accepted) {
                        return ip.toString();
                    }
                }
            }
        } catch (SocketException e) {
            log.error("Unable to get network interfaces", e);
        }

        return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
    }
}