Add ability to form a cluster via the REST API.
Change-Id: Ib71f6b4caed1b1c4b9db78596ee35bf5cab05184
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionStore.java
index a522a85..e3c521e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionStore.java
@@ -18,6 +18,8 @@
import static com.google.common.base.Preconditions.checkNotNull;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.Files;
+
import java.io.File;
import java.io.IOException;
@@ -43,8 +45,7 @@
*/
public ClusterDefinition read() throws IOException {
ObjectMapper mapper = new ObjectMapper();
- ClusterDefinition definition = mapper.readValue(file, ClusterDefinition.class);
- return definition;
+ return mapper.readValue(file, ClusterDefinition.class);
}
/**
@@ -55,7 +56,8 @@
public void write(ClusterDefinition definition) throws IOException {
checkNotNull(definition);
// write back to file
- final ObjectMapper mapper = new ObjectMapper();
+ Files.createParentDirs(file);
+ ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(file, definition);
}
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index 0472cff..f5bd773 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.hazelcast.util.AddressUtil;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -37,6 +38,8 @@
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.AbstractStore;
+import org.onosproject.store.consistent.impl.DatabaseDefinition;
+import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
@@ -55,11 +58,13 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.hazelcast.util.AddressUtil.matchInterface;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
+import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@@ -74,17 +79,14 @@
private static final Logger log = getLogger(DistributedClusterStore.class);
+ public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
+ public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
+
// TODO: make these configurable.
private static final int HEARTBEAT_FD_PORT = 2419;
private static final int HEARTBEAT_INTERVAL_MS = 100;
private static final int PHI_FAILURE_THRESHOLD = 10;
- private static final String CONFIG_DIR = "../config";
- private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
- private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
-
- public static final int DEFAULT_PORT = 9876;
-
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
@@ -97,6 +99,8 @@
};
private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
+ private static final byte SITE_LOCAL_BYTE = (byte) 0xC0;
+ private static final String ONOS_NIC = "ONOS_NIC";
private ClusterDefinition clusterDefinition;
@@ -116,7 +120,7 @@
@Activate
public void activate() {
- File clusterDefinitionFile = new File(CONFIG_DIR, CLUSTER_DEFINITION_FILE);
+ File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
ClusterDefinitionStore clusterDefinitionStore =
new ClusterDefinitionStore(clusterDefinitionFile.getPath());
@@ -129,13 +133,12 @@
seedNodes = ImmutableSet
.copyOf(clusterDefinition.getNodes())
.stream()
- .map(nodeInfo -> new DefaultControllerNode(new NodeId(nodeInfo.getId()),
- IpAddress.valueOf(nodeInfo.getIp()),
- nodeInfo.getTcpPort()))
+ .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
+ IpAddress.valueOf(n.getIp()),
+ n.getTcpPort()))
.collect(Collectors.toSet());
} catch (IOException e) {
- throw new IllegalStateException(
- "Failed to read cluster definition.", e);
+ throw new IllegalStateException("Failed to read cluster definition.", e);
}
seedNodes.forEach(node -> {
@@ -179,26 +182,30 @@
}
/**
- * Returns the site local address if one can be found, loopback otherwise.
+ * Returns the address that matches the IP prefix given in ONOS_NIC
+ * environment variable if one was specified, or the first site local
+ * address if one can be found or the loopback address otherwise.
*
* @return site-local address in string form
*/
public static String getSiteLocalAddress() {
try {
+ String ipPrefix = System.getenv(ONOS_NIC);
for (NetworkInterface nif : list(getNetworkInterfaces())) {
for (InetAddress address : list(nif.getInetAddresses())) {
- if (address.getAddress()[0] == (byte) 0xC0) {
- return address.toString().substring(1);
+ IpAddress ip = IpAddress.valueOf(address);
+ if (ipPrefix == null && address.isSiteLocalAddress() ||
+ ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
+ return ip.toString();
}
}
}
- return InetAddress.getLoopbackAddress().toString().substring(1);
} catch (SocketException e) {
log.error("Unable to get network interfaces", e);
}
- return null;
+ return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
}
@Deactivate
@@ -255,9 +262,6 @@
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- checkNotNull(ip, "IP address must not be null");
- checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
allNodes.put(node.id(), node);
updateState(nodeId, State.INACTIVE);
@@ -275,6 +279,24 @@
}
}
+ @Override
+ public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
+ try {
+ Set<NodeInfo> infos = Sets.newHashSet();
+ nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
+ n.ip().toString(),
+ n.tcpPort())));
+
+ ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
+ new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
+
+ DatabaseDefinition ddef = DatabaseDefinition.from(infos);
+ new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
+ } catch (IOException e) {
+ log.error("Unable to form cluster", e);
+ }
+ }
+
private void updateState(NodeId nodeId, State newState) {
nodeStates.put(nodeId, newState);
nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
@@ -387,4 +409,5 @@
public DateTime getLastUpdated(NodeId nodeId) {
return nodeStateLastUpdatedTimes.get(nodeId);
}
+
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
index 55d877d..827fec2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
@@ -24,12 +24,12 @@
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
+import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
@@ -39,7 +39,6 @@
import org.onosproject.store.hz.AbsentInvalidatingLoadingCache;
import org.onosproject.store.hz.AbstractHazelcastStore;
import org.onosproject.store.hz.OptionalCacheLoader;
-import org.onlab.packet.IpAddress;
import java.util.Map;
import java.util.Set;
@@ -131,6 +130,11 @@
}
@Override
+ public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
+ throw new UnsupportedOperationException("formCluster not implemented");
+ }
+
+ @Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinition.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinition.java
index 46754e5..11b56c1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinition.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinition.java
@@ -15,16 +15,20 @@
*/
package org.onosproject.store.consistent.impl;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.onosproject.store.cluster.impl.NodeInfo;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.onosproject.store.cluster.impl.NodeInfo;
-
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
/**
* Partitioned database configuration.
*/
@@ -34,11 +38,13 @@
/**
* Creates a new DatabaseDefinition.
+ *
* @param partitions partition map
- * @param nodes set of nodes
+ * @param nodes set of nodes
* @return database definition
*/
- public static DatabaseDefinition from(Map<String, Set<NodeInfo>> partitions, Set<NodeInfo> nodes) {
+ public static DatabaseDefinition from(Map<String, Set<NodeInfo>> partitions,
+ Set<NodeInfo> nodes) {
checkNotNull(partitions);
checkNotNull(nodes);
DatabaseDefinition definition = new DatabaseDefinition();
@@ -48,7 +54,18 @@
}
/**
+ * Creates a new DatabaseDefinition using default partitions.
+ *
+ * @param nodes set of nodes
+ * @return database definition
+ */
+ public static DatabaseDefinition from(Set<NodeInfo> nodes) {
+ return from(generateDefaultPartitions(nodes), nodes);
+ }
+
+ /**
* Returns the map of database partitions.
+ *
* @return db partition map
*/
public Map<String, Set<NodeInfo>> getPartitions() {
@@ -57,9 +74,35 @@
/**
* Returns the set of nodes.
+ *
* @return nodes
*/
public Set<NodeInfo> getNodes() {
return nodes;
}
+
+
+ /**
+ * Generates set of default partitions using permutations of the nodes.
+ *
+ * @param nodes information about cluster nodes
+ * @return default partition map
+ */
+ private static Map<String, Set<NodeInfo>> generateDefaultPartitions(Set<NodeInfo> nodes) {
+ List<NodeInfo> sorted = new ArrayList<>(nodes);
+ Collections.sort(sorted, (o1, o2) -> o1.getId().compareTo(o2.getId()));
+ Map<String, Set<NodeInfo>> partitions = Maps.newHashMap();
+
+ int length = nodes.size();
+ int count = 3;
+ for (int i = 0; i < length; i++) {
+ Set<NodeInfo> set = new HashSet<>(count);
+ for (int j = 0; j < count; j++) {
+ set.add(sorted.get((i + j) % length));
+ }
+ partitions.put("p" + (i + 1), set);
+ }
+ return partitions;
+ }
+
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java
index e54fe3c..b77667b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java
@@ -20,13 +20,14 @@
import java.io.File;
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.Files;
/**
* Allows for reading and writing partitioned database definition as a JSON file.
*/
public class DatabaseDefinitionStore {
- private final File definitionfile;
+ private final File file;
/**
* Creates a reader/writer of the database definition file.
@@ -34,7 +35,7 @@
* @param filePath location of the definition file
*/
public DatabaseDefinitionStore(String filePath) {
- definitionfile = new File(checkNotNull(filePath));
+ file = new File(checkNotNull(filePath));
}
/**
@@ -43,7 +44,7 @@
* @param filePath location of the definition file
*/
public DatabaseDefinitionStore(File filePath) {
- definitionfile = checkNotNull(filePath);
+ file = checkNotNull(filePath);
}
/**
@@ -54,8 +55,7 @@
*/
public DatabaseDefinition read() throws IOException {
ObjectMapper mapper = new ObjectMapper();
- DatabaseDefinition definition = mapper.readValue(definitionfile, DatabaseDefinition.class);
- return definition;
+ return mapper.readValue(file, DatabaseDefinition.class);
}
/**
@@ -67,7 +67,8 @@
public void write(DatabaseDefinition definition) throws IOException {
checkNotNull(definition);
// write back to file
- final ObjectMapper mapper = new ObjectMapper();
- mapper.writeValue(definitionfile, definition);
+ Files.createParentDirs(file);
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.writeValue(file, definition);
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index eb5a70f..b939780 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -16,11 +16,9 @@
package org.onosproject.store.consistent.impl;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import net.kuujo.copycat.CopycatConfig;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.Member;
@@ -34,7 +32,6 @@
import net.kuujo.copycat.protocol.Consistency;
import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -75,16 +72,18 @@
public class DatabaseManager implements StorageService, StorageAdminService {
private final Logger log = getLogger(getClass());
+
+ public static final int COPYCAT_TCP_PORT = 7238; // 7238 = RAFT
+ public static final String PARTITION_DEFINITION_FILE = "../config/tablets.json";
+ public static final String BASE_PARTITION_NAME = "p0";
+
+ private static final int DATABASE_STARTUP_TIMEOUT_SEC = 60;
+ private static final int RAFT_ELECTION_TIMEOUT = 3000;
+ private static final int RAFT_HEARTBEAT_TIMEOUT = 1500;
+
private ClusterCoordinator coordinator;
private PartitionedDatabase partitionedDatabase;
private Database inMemoryDatabase;
- public static final int COPYCAT_TCP_PORT = 7238; // 7238 = RAFT
- private static final String CONFIG_DIR = "../config";
- private static final String PARTITION_DEFINITION_FILE = "tablets.json";
- private static final int DATABASE_STARTUP_TIMEOUT_SEC = 60;
- public static final String BASE_PARTITION_NAME = "p0";
- private static final int RAFT_ELECTION_TIMEOUT = 3000;
- private static final int RAFT_HEARTBEAT_TIMEOUT = 1500;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -98,15 +97,14 @@
@Activate
public void activate() {
-
// load database configuration
- File file = new File(CONFIG_DIR, PARTITION_DEFINITION_FILE);
- log.info("Loading database definition: {}", file.getAbsolutePath());
+ File databaseDefFile = new File(PARTITION_DEFINITION_FILE);
+ log.info("Loading database definition: {}", databaseDefFile.getAbsolutePath());
Map<String, Set<NodeInfo>> partitionMap;
try {
- DatabaseDefinitionStore databaseDefStore = new DatabaseDefinitionStore(file);
- if (!file.exists()) {
+ DatabaseDefinitionStore databaseDefStore = new DatabaseDefinitionStore(databaseDefFile);
+ if (!databaseDefFile.exists()) {
createDefaultDatabaseDefinition(databaseDefStore);
}
partitionMap = databaseDefStore.read().getPartitions();
@@ -189,10 +187,9 @@
private void createDefaultDatabaseDefinition(DatabaseDefinitionStore store) {
// Assumes IPv4 is returned.
String ip = DistributedClusterStore.getSiteLocalAddress();
- NodeInfo node = NodeInfo.from(ip, ip, DistributedClusterStore.DEFAULT_PORT);
+ NodeInfo node = NodeInfo.from(ip, ip, COPYCAT_TCP_PORT);
try {
- store.write(DatabaseDefinition.from(ImmutableMap.of("p1", ImmutableSet.of(node)),
- ImmutableSet.of(node)));
+ store.write(DatabaseDefinition.from(ImmutableSet.of(node)));
} catch (IOException e) {
log.warn("Unable to write default cluster definition", e);
}