Added ability to form a cluster via REST API.
Change-Id: Ib71f6b4caed1b1c4b9db78596ee35bf5cab05184
diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java b/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java
index e567540..c3eb3c1 100644
--- a/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java
+++ b/core/api/src/main/java/org/onosproject/cluster/ClusterAdminService.java
@@ -17,12 +17,24 @@
import org.onlab.packet.IpAddress;
+import java.util.Set;
+
/**
* Service for administering the cluster node membership.
*/
public interface ClusterAdminService {
/**
+ * Forms cluster configuration based on the specified set of node
+ * information. This method resets and restarts the controller
+ * instance.
+ *
+ * @param nodes set of nodes that form the cluster
+ * @param ipPrefix IP address prefix, e.g. 10.0.1.*
+ */
+ void formCluster(Set<ControllerNode> nodes, String ipPrefix);
+
+ /**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
diff --git a/core/api/src/main/java/org/onosproject/cluster/ClusterStore.java b/core/api/src/main/java/org/onosproject/cluster/ClusterStore.java
index 4229e6a..cdb03b2 100644
--- a/core/api/src/main/java/org/onosproject/cluster/ClusterStore.java
+++ b/core/api/src/main/java/org/onosproject/cluster/ClusterStore.java
@@ -15,12 +15,12 @@
*/
package org.onosproject.cluster;
-import java.util.Set;
-
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onosproject.store.Store;
+import java.util.Set;
+
/**
* Manages inventory of controller cluster nodes; not intended for direct use.
*/
@@ -65,6 +65,16 @@
DateTime getLastUpdated(NodeId nodeId);
/**
+ * Forms cluster configuration based on the specified set of node
+ * information. Assumes subsequent restart for the new configuration to
+ * take hold.
+ *
+ * @param nodes set of nodes that form the cluster
+ * @param ipPrefix IP address prefix, e.g. 10.0.1.*
+ */
+ void formCluster(Set<ControllerNode> nodes, String ipPrefix);
+
+ /**
* Adds a new controller node to the cluster.
*
* @param nodeId controller node identifier
diff --git a/core/api/src/main/java/org/onosproject/cluster/DefaultControllerNode.java b/core/api/src/main/java/org/onosproject/cluster/DefaultControllerNode.java
index 31185dd..7d701d9 100644
--- a/core/api/src/main/java/org/onosproject/cluster/DefaultControllerNode.java
+++ b/core/api/src/main/java/org/onosproject/cluster/DefaultControllerNode.java
@@ -26,7 +26,7 @@
*/
public class DefaultControllerNode implements ControllerNode {
- private static final int DEFAULT_PORT = 9876;
+ public static final int DEFAULT_PORT = 9876;
private final NodeId id;
private final IpAddress ip;
diff --git a/core/common/src/main/java/org/onosproject/codec/impl/CodecManager.java b/core/common/src/main/java/org/onosproject/codec/impl/CodecManager.java
index 4abb5e6..1f9a20a 100644
--- a/core/common/src/main/java/org/onosproject/codec/impl/CodecManager.java
+++ b/core/common/src/main/java/org/onosproject/codec/impl/CodecManager.java
@@ -15,15 +15,13 @@
*/
package org.onosproject.codec.impl;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
+import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.Ethernet;
+import org.onosproject.cluster.ControllerNode;
import org.onosproject.codec.CodecService;
import org.onosproject.codec.JsonCodec;
import org.onosproject.core.Application;
@@ -50,7 +48,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
/**
* Implementation of the JSON codec brokering service.
@@ -67,6 +67,7 @@
public void activate() {
codecs.clear();
registerCodec(Application.class, new ApplicationCodec());
+ registerCodec(ControllerNode.class, new ControllerNodeCodec());
registerCodec(Annotations.class, new AnnotationsCodec());
registerCodec(Device.class, new DeviceCodec());
registerCodec(Port.class, new PortCodec());
diff --git a/core/common/src/main/java/org/onosproject/codec/impl/ControllerNodeCodec.java b/core/common/src/main/java/org/onosproject/codec/impl/ControllerNodeCodec.java
new file mode 100644
index 0000000..4b06ff0
--- /dev/null
+++ b/core/common/src/main/java/org/onosproject/codec/impl/ControllerNodeCodec.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.codec.impl;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.codec.CodecContext;
+import org.onosproject.codec.JsonCodec;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
+
+/**
+ * Controller node JSON codec.
+ */
+public final class ControllerNodeCodec extends JsonCodec<ControllerNode> {
+
+ @Override
+ public ObjectNode encode(ControllerNode node, CodecContext context) {
+ checkNotNull(node, "Controller node cannot be null");
+ ClusterService service = context.get(ClusterService.class);
+ return context.mapper().createObjectNode()
+ .put("id", node.id().toString())
+ .put("ip", node.ip().toString())
+ .put("tcpPort", node.tcpPort())
+ .put("status", service.getState(node.id()).toString());
+ }
+
+
+ @Override
+ public ControllerNode decode(ObjectNode json, CodecContext context) {
+ checkNotNull(json, "JSON cannot be null");
+ String ip = json.path("ip").asText();
+ return new DefaultControllerNode(new NodeId(json.path("id").asText(ip)),
+ IpAddress.valueOf(ip),
+ json.path("tcpPort").asInt(DEFAULT_PORT));
+ }
+
+
+}
diff --git a/core/net/pom.xml b/core/net/pom.xml
index c47f91d..d8aecf9 100644
--- a/core/net/pom.xml
+++ b/core/net/pom.xml
@@ -82,6 +82,11 @@
<groupId>org.apache.karaf.features</groupId>
<artifactId>org.apache.karaf.features.core</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.karaf.system</groupId>
+ <artifactId>org.apache.karaf.system.core</artifactId>
+ </dependency>
</dependencies>
<build>
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
index d9da6c6..3f7e460 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
@@ -15,18 +15,13 @@
*/
package org.onosproject.cluster.impl;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Set;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.apache.karaf.system.SystemService;
import org.joda.time.DateTime;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterAdminService;
@@ -41,6 +36,12 @@
import org.onosproject.event.EventDeliveryService;
import org.slf4j.Logger;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
/**
* Implementation of the cluster service.
*/
@@ -62,6 +63,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected SystemService systemService;
+
@Activate
public void activate() {
store.setDelegate(delegate);
@@ -105,6 +109,20 @@
}
@Override
+ public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
+ checkNotNull(nodes, "Nodes cannot be null");
+ checkArgument(!nodes.isEmpty(), "Nodes cannot be empty");
+ checkNotNull(ipPrefix, "IP prefix cannot be null");
+ store.formCluster(nodes, ipPrefix);
+ try {
+ log.warn("Shutting down container for cluster reconfiguration!");
+ systemService.reboot("now", SystemService.Swipe.NONE);
+ } catch (Exception e) {
+ log.error("Unable to reboot container", e);
+ }
+ }
+
+ @Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
checkNotNull(nodeId, INSTANCE_ID_NULL);
checkNotNull(ip, "IP address cannot be null");
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionStore.java
index a522a85..e3c521e 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/ClusterDefinitionStore.java
@@ -18,6 +18,8 @@
import static com.google.common.base.Preconditions.checkNotNull;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.Files;
+
import java.io.File;
import java.io.IOException;
@@ -43,8 +45,7 @@
*/
public ClusterDefinition read() throws IOException {
ObjectMapper mapper = new ObjectMapper();
- ClusterDefinition definition = mapper.readValue(file, ClusterDefinition.class);
- return definition;
+ return mapper.readValue(file, ClusterDefinition.class);
}
/**
@@ -55,7 +56,8 @@
public void write(ClusterDefinition definition) throws IOException {
checkNotNull(definition);
// write back to file
- final ObjectMapper mapper = new ObjectMapper();
+ Files.createParentDirs(file);
+ ObjectMapper mapper = new ObjectMapper();
mapper.writeValue(file, definition);
}
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
index 0472cff..f5bd773 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/DistributedClusterStore.java
@@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import com.hazelcast.util.AddressUtil;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -37,6 +38,8 @@
import org.onosproject.cluster.DefaultControllerNode;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.AbstractStore;
+import org.onosproject.store.consistent.impl.DatabaseDefinition;
+import org.onosproject.store.consistent.impl.DatabaseDefinitionStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.slf4j.Logger;
@@ -55,11 +58,13 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static com.hazelcast.util.AddressUtil.matchInterface;
import static java.net.NetworkInterface.getNetworkInterfaces;
import static java.util.Collections.list;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.cluster.DefaultControllerNode.DEFAULT_PORT;
+import static org.onosproject.store.consistent.impl.DatabaseManager.PARTITION_DEFINITION_FILE;
import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@@ -74,17 +79,14 @@
private static final Logger log = getLogger(DistributedClusterStore.class);
+ public static final String CLUSTER_DEFINITION_FILE = "../config/cluster.json";
+ public static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
+
// TODO: make these configurable.
private static final int HEARTBEAT_FD_PORT = 2419;
private static final int HEARTBEAT_INTERVAL_MS = 100;
private static final int PHI_FAILURE_THRESHOLD = 10;
- private static final String CONFIG_DIR = "../config";
- private static final String CLUSTER_DEFINITION_FILE = "cluster.json";
- private static final String HEARTBEAT_MESSAGE = "onos-cluster-heartbeat";
-
- public static final int DEFAULT_PORT = 9876;
-
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
@Override
protected void setupKryoPool() {
@@ -97,6 +99,8 @@
};
private static final String INSTANCE_ID_NULL = "Instance ID cannot be null";
+ private static final byte SITE_LOCAL_BYTE = (byte) 0xC0;
+ private static final String ONOS_NIC = "ONOS_NIC";
private ClusterDefinition clusterDefinition;
@@ -116,7 +120,7 @@
@Activate
public void activate() {
- File clusterDefinitionFile = new File(CONFIG_DIR, CLUSTER_DEFINITION_FILE);
+ File clusterDefinitionFile = new File(CLUSTER_DEFINITION_FILE);
ClusterDefinitionStore clusterDefinitionStore =
new ClusterDefinitionStore(clusterDefinitionFile.getPath());
@@ -129,13 +133,12 @@
seedNodes = ImmutableSet
.copyOf(clusterDefinition.getNodes())
.stream()
- .map(nodeInfo -> new DefaultControllerNode(new NodeId(nodeInfo.getId()),
- IpAddress.valueOf(nodeInfo.getIp()),
- nodeInfo.getTcpPort()))
+ .map(n -> new DefaultControllerNode(new NodeId(n.getId()),
+ IpAddress.valueOf(n.getIp()),
+ n.getTcpPort()))
.collect(Collectors.toSet());
} catch (IOException e) {
- throw new IllegalStateException(
- "Failed to read cluster definition.", e);
+ throw new IllegalStateException("Failed to read cluster definition.", e);
}
seedNodes.forEach(node -> {
@@ -179,26 +182,30 @@
}
/**
- * Returns the site local address if one can be found, loopback otherwise.
+ * Returns the address matching the IP prefix in the ONOS_NIC environment
+ * variable when it is set, or the first site-local address when it is not;
+ * falls back to the loopback address if no such address is found.
*
* @return site-local address in string form
*/
public static String getSiteLocalAddress() {
try {
+ String ipPrefix = System.getenv(ONOS_NIC);
for (NetworkInterface nif : list(getNetworkInterfaces())) {
for (InetAddress address : list(nif.getInetAddresses())) {
- if (address.getAddress()[0] == (byte) 0xC0) {
- return address.toString().substring(1);
+ IpAddress ip = IpAddress.valueOf(address);
+ if (ipPrefix == null && address.isSiteLocalAddress() ||
+ ipPrefix != null && matchInterface(ip.toString(), ipPrefix)) {
+ return ip.toString();
}
}
}
- return InetAddress.getLoopbackAddress().toString().substring(1);
} catch (SocketException e) {
log.error("Unable to get network interfaces", e);
}
- return null;
+ return IpAddress.valueOf(InetAddress.getLoopbackAddress()).toString();
}
@Deactivate
@@ -255,9 +262,6 @@
@Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
- checkNotNull(nodeId, INSTANCE_ID_NULL);
- checkNotNull(ip, "IP address must not be null");
- checkArgument(tcpPort > 5000, "Tcp port must be greater than 5000");
ControllerNode node = new DefaultControllerNode(nodeId, ip, tcpPort);
allNodes.put(node.id(), node);
updateState(nodeId, State.INACTIVE);
@@ -275,6 +279,24 @@
}
}
+ @Override
+ public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
+ try {
+ Set<NodeInfo> infos = Sets.newHashSet();
+ nodes.forEach(n -> infos.add(NodeInfo.from(n.id().toString(),
+ n.ip().toString(),
+ n.tcpPort())));
+
+ ClusterDefinition cdef = ClusterDefinition.from(infos, ipPrefix);
+ new ClusterDefinitionStore(CLUSTER_DEFINITION_FILE).write(cdef);
+
+ DatabaseDefinition ddef = DatabaseDefinition.from(infos);
+ new DatabaseDefinitionStore(PARTITION_DEFINITION_FILE).write(ddef);
+ } catch (IOException e) {
+ log.error("Unable to form cluster", e);
+ }
+ }
+
private void updateState(NodeId nodeId, State newState) {
nodeStates.put(nodeId, newState);
nodeStateLastUpdatedTimes.put(nodeId, DateTime.now());
@@ -387,4 +409,5 @@
public DateTime getLastUpdated(NodeId nodeId) {
return nodeStateLastUpdatedTimes.get(nodeId);
}
+
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
index 55d877d..827fec2 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/cluster/impl/HazelcastClusterStore.java
@@ -24,12 +24,12 @@
import com.hazelcast.core.MemberAttributeEvent;
import com.hazelcast.core.MembershipEvent;
import com.hazelcast.core.MembershipListener;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.joda.time.DateTime;
+import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterEvent;
import org.onosproject.cluster.ClusterStore;
import org.onosproject.cluster.ClusterStoreDelegate;
@@ -39,7 +39,6 @@
import org.onosproject.store.hz.AbsentInvalidatingLoadingCache;
import org.onosproject.store.hz.AbstractHazelcastStore;
import org.onosproject.store.hz.OptionalCacheLoader;
-import org.onlab.packet.IpAddress;
import java.util.Map;
import java.util.Set;
@@ -131,6 +130,11 @@
}
@Override
+ public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
+ throw new UnsupportedOperationException("formCluster not implemented");
+ }
+
+ @Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return addNode(new DefaultControllerNode(nodeId, ip, tcpPort));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinition.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinition.java
index 46754e5..11b56c1 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinition.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinition.java
@@ -15,16 +15,20 @@
*/
package org.onosproject.store.consistent.impl;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.onosproject.store.cluster.impl.NodeInfo;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import org.onosproject.store.cluster.impl.NodeInfo;
-
import static com.google.common.base.Preconditions.checkNotNull;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
/**
* Partitioned database configuration.
*/
@@ -34,11 +38,13 @@
/**
* Creates a new DatabaseDefinition.
+ *
* @param partitions partition map
- * @param nodes set of nodes
+ * @param nodes set of nodes
* @return database definition
*/
- public static DatabaseDefinition from(Map<String, Set<NodeInfo>> partitions, Set<NodeInfo> nodes) {
+ public static DatabaseDefinition from(Map<String, Set<NodeInfo>> partitions,
+ Set<NodeInfo> nodes) {
checkNotNull(partitions);
checkNotNull(nodes);
DatabaseDefinition definition = new DatabaseDefinition();
@@ -48,7 +54,18 @@
}
/**
+ * Creates a new DatabaseDefinition using default partitions.
+ *
+ * @param nodes set of nodes
+ * @return database definition
+ */
+ public static DatabaseDefinition from(Set<NodeInfo> nodes) {
+ return from(generateDefaultPartitions(nodes), nodes);
+ }
+
+ /**
* Returns the map of database partitions.
+ *
* @return db partition map
*/
public Map<String, Set<NodeInfo>> getPartitions() {
@@ -57,9 +74,35 @@
/**
* Returns the set of nodes.
+ *
* @return nodes
*/
public Set<NodeInfo> getNodes() {
return nodes;
}
+
+
+ /**
+ * Generates one default partition per node, each holding up to three consecutive id-sorted nodes (wrapping around).
+ *
+ * @param nodes information about cluster nodes
+ * @return default partition map
+ */
+ private static Map<String, Set<NodeInfo>> generateDefaultPartitions(Set<NodeInfo> nodes) {
+ List<NodeInfo> sorted = new ArrayList<>(nodes);
+ Collections.sort(sorted, (o1, o2) -> o1.getId().compareTo(o2.getId()));
+ Map<String, Set<NodeInfo>> partitions = Maps.newHashMap();
+
+ int length = nodes.size();
+ int count = 3;
+ for (int i = 0; i < length; i++) {
+ Set<NodeInfo> set = new HashSet<>(count);
+ for (int j = 0; j < count; j++) {
+ set.add(sorted.get((i + j) % length));
+ }
+ partitions.put("p" + (i + 1), set);
+ }
+ return partitions;
+ }
+
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java
index e54fe3c..b77667b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseDefinitionStore.java
@@ -20,13 +20,14 @@
import java.io.File;
import java.io.IOException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.io.Files;
/**
* Allows for reading and writing partitioned database definition as a JSON file.
*/
public class DatabaseDefinitionStore {
- private final File definitionfile;
+ private final File file;
/**
* Creates a reader/writer of the database definition file.
@@ -34,7 +35,7 @@
* @param filePath location of the definition file
*/
public DatabaseDefinitionStore(String filePath) {
- definitionfile = new File(checkNotNull(filePath));
+ file = new File(checkNotNull(filePath));
}
/**
@@ -43,7 +44,7 @@
* @param filePath location of the definition file
*/
public DatabaseDefinitionStore(File filePath) {
- definitionfile = checkNotNull(filePath);
+ file = checkNotNull(filePath);
}
/**
@@ -54,8 +55,7 @@
*/
public DatabaseDefinition read() throws IOException {
ObjectMapper mapper = new ObjectMapper();
- DatabaseDefinition definition = mapper.readValue(definitionfile, DatabaseDefinition.class);
- return definition;
+ return mapper.readValue(file, DatabaseDefinition.class);
}
/**
@@ -67,7 +67,8 @@
public void write(DatabaseDefinition definition) throws IOException {
checkNotNull(definition);
// write back to file
- final ObjectMapper mapper = new ObjectMapper();
- mapper.writeValue(definitionfile, definition);
+ Files.createParentDirs(file);
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.writeValue(file, definition);
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index eb5a70f..b939780 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -16,11 +16,9 @@
package org.onosproject.store.consistent.impl;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
-
import net.kuujo.copycat.CopycatConfig;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.Member;
@@ -34,7 +32,6 @@
import net.kuujo.copycat.protocol.Consistency;
import net.kuujo.copycat.protocol.Protocol;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -75,16 +72,18 @@
public class DatabaseManager implements StorageService, StorageAdminService {
private final Logger log = getLogger(getClass());
+
+ public static final int COPYCAT_TCP_PORT = 7238; // 7238 = RAFT
+ public static final String PARTITION_DEFINITION_FILE = "../config/tablets.json";
+ public static final String BASE_PARTITION_NAME = "p0";
+
+ private static final int DATABASE_STARTUP_TIMEOUT_SEC = 60;
+ private static final int RAFT_ELECTION_TIMEOUT = 3000;
+ private static final int RAFT_HEARTBEAT_TIMEOUT = 1500;
+
private ClusterCoordinator coordinator;
private PartitionedDatabase partitionedDatabase;
private Database inMemoryDatabase;
- public static final int COPYCAT_TCP_PORT = 7238; // 7238 = RAFT
- private static final String CONFIG_DIR = "../config";
- private static final String PARTITION_DEFINITION_FILE = "tablets.json";
- private static final int DATABASE_STARTUP_TIMEOUT_SEC = 60;
- public static final String BASE_PARTITION_NAME = "p0";
- private static final int RAFT_ELECTION_TIMEOUT = 3000;
- private static final int RAFT_HEARTBEAT_TIMEOUT = 1500;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -98,15 +97,14 @@
@Activate
public void activate() {
-
// load database configuration
- File file = new File(CONFIG_DIR, PARTITION_DEFINITION_FILE);
- log.info("Loading database definition: {}", file.getAbsolutePath());
+ File databaseDefFile = new File(PARTITION_DEFINITION_FILE);
+ log.info("Loading database definition: {}", databaseDefFile.getAbsolutePath());
Map<String, Set<NodeInfo>> partitionMap;
try {
- DatabaseDefinitionStore databaseDefStore = new DatabaseDefinitionStore(file);
- if (!file.exists()) {
+ DatabaseDefinitionStore databaseDefStore = new DatabaseDefinitionStore(databaseDefFile);
+ if (!databaseDefFile.exists()) {
createDefaultDatabaseDefinition(databaseDefStore);
}
partitionMap = databaseDefStore.read().getPartitions();
@@ -189,10 +187,9 @@
private void createDefaultDatabaseDefinition(DatabaseDefinitionStore store) {
// Assumes IPv4 is returned.
String ip = DistributedClusterStore.getSiteLocalAddress();
- NodeInfo node = NodeInfo.from(ip, ip, DistributedClusterStore.DEFAULT_PORT);
+ NodeInfo node = NodeInfo.from(ip, ip, COPYCAT_TCP_PORT);
try {
- store.write(DatabaseDefinition.from(ImmutableMap.of("p1", ImmutableSet.of(node)),
- ImmutableSet.of(node)));
+ store.write(DatabaseDefinition.from(ImmutableSet.of(node)));
} catch (IOException e) {
log.warn("Unable to write default cluster definition", e);
}
diff --git a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
index 3849555..e0a4b9d 100644
--- a/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
+++ b/core/store/trivial/src/main/java/org/onosproject/store/trivial/impl/SimpleClusterStore.java
@@ -15,10 +15,7 @@
*/
package org.onosproject.store.trivial.impl;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Set;
-
+import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -36,7 +33,9 @@
import org.onosproject.store.AbstractStore;
import org.slf4j.Logger;
-import com.google.common.collect.ImmutableSet;
+import java.util.Set;
+
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Manages inventory of infrastructure devices using trivial in-memory
@@ -94,6 +93,11 @@
}
@Override
+ public void formCluster(Set<ControllerNode> nodes, String ipPrefix) {
+
+ }
+
+ @Override
public ControllerNode addNode(NodeId nodeId, IpAddress ip, int tcpPort) {
return null;
}