Added ability to form a cluster via REST API.
Change-Id: Ib71f6b4caed1b1c4b9db78596ee35bf5cab05184
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index 0472cff..f5bd773 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.hazelcast.util.AddressUtil;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -37,6 +38,8 @@
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.AbstractStore;
+import org.onosproject.store.consistent.impl.DatabaseDefinition;
+import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
@@ -55,11 +58,13 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.hazelcast.util.AddressUtil.matchInterface;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
+import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@@ -74,17 +79,14 @@
private static final Logger log = getLogger(DistributedClusterStore.class);
+ public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
+ public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
+
// TODO: make these configurable.
private static final int HEARTBEAT_FD_PORT = 2419;
private static final int HEARTBEAT_INTERVAL_MS = 100;
private static final int PHI_FAILURE_THRESHOLD = 10;
- private static final String CONFIG_DIR = "../config";
- private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
- private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
-
- public static final int DEFAULT_PORT = 9876;
-
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
@@ -97,6 +99,8 @@
};
private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
+ private static final byte SITE_LOCAL_BYTE = (byte) 0xC0;
+ private static final String ONOS_NIC = "ONOS_NIC";
private ClusterDefinition clusterDefinition;
@@ -116,7 +120,7 @@
@Activate
public void activate() {
- File clusterDefinitionFile = new File(CONFIG_DIR, CLUSTER_DEFINITION_FILE);
+ File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
ClusterDefinitionStore clusterDefinitionStore =
new ClusterDefinitionStore(clusterDefinitionFile.getPath());
@@ -129,13 +133,12 @@
seedNodes = ImmutableSet
.copyOf(clusterDefinition.getNodes())
.stream()
- .map(nodeInfo -> new DefaultControllerNode(new NodeId(nodeInfo.getId()),
- IpAddress.valueOf(nodeInfo.getIp()),
- nodeInfo.getTcpPort()))
+ .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
+ IpAddress.valueOf(n.getIp()),
+ n.getTcpPort()))
.collect(Collectors.toSet());
} catch (IOException e) {
- throw new IllegalStateException(
- "Failed to read cluster definition.", e);
+ throw new IllegalStateException("Failed to read cluster definition.", e);
}
seedNodes.forEach(node -> {
@@ -179,26 +182,30 @@
}
/**
- * Returns the site local address if one can be found, loopback otherwise.
+ * Returns the first address matching the IP prefix given in the ONOS_NIC
+ * environment variable if it is set, or else the first site-local address;
+ * falls back to the loopback address if none qualifies or the lookup fails.
*
* @return site-local address in string form
*/
public static String getSiteLocalAddress() {
try {
+ String ipPrefix = System.getenv(ONOS_NIC);
for (NetworkInterface nif : list(getNetworkInterfaces())) {
for (InetAddress address : list(nif.getInetAddresses())) {
- if (address.getAddress()[0] == (byte) 0xC0) {
- return address.toString().substring(1);
+ IpAddress ip = IpAddress.valueOf(address);
+ if (ipPrefix == null && address.isSiteLocalAddress() ||
+ ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
+ return ip.toString();
}
}
}
- return InetAddress.getLoopbackAddress().toString().substring(1);
} catch (SocketException e) {
log.error("Unable to get network interfaces", e);
}
- return null;
+ return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
}
@Deactivate
@@ -255,9 +262,6 @@
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- checkNotNull(ip, "IP address must not be null");
- checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
allNodes.put(node.id(), node);
updateState(nodeId, State.INACTIVE);
@@ -275,6 +279,24 @@
}
}
+ @Override
+ public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
+ try {
+ Set<NodeInfo> infos = Sets.newHashSet();
+ nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
+ n.ip().toString(),
+ n.tcpPort())));
+
+ ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
+ new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
+
+ DatabaseDefinition ddef = DatabaseDefinition.from(infos);
+ new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
+ } catch (IOException e) {
+ log.error("Unable to form cluster", e);
+ }
+ }
+
private void updateState(NodeId nodeId, State newState) {
nodeStates.put(nodeId, newState);
nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
@@ -387,4 +409,5 @@
public DateTime getLastUpdated(NodeId nodeId) {
return nodeStateLastUpdatedTimes.get(nodeId);
}
+
}