Upgrade to Atomix 3.0-rc5
* Upgrade Raft primitives to Atomix 3.0
* Replace cluster store and messaging implementations with Atomix cluster management/messaging
* Add test scripts for installing/starting Atomix cluster
* Replace core primitives with Atomix primitives.
Change-Id: I7623653c81292a34f21b01f5f38ca11b5ef15cad
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
index c1b5de1..8086206 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
@@ -15,17 +15,19 @@
*/
package org.onosproject.cluster.impl;
-import com.fasterxml.jackson.core.JsonGenerator;
-import com.fasterxml.jackson.core.JsonParser;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.DeserializationContext;
-import com.fasterxml.jackson.databind.JsonDeserializer;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.JsonSerializer;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializerProvider;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.google.common.collect.Sets;
import com.google.common.io.Files;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -37,26 +39,14 @@
import org.onosproject.cluster.ClusterMetadataProvider;
import org.onosproject.cluster.ClusterMetadataProviderRegistry;
import org.onosproject.cluster.ClusterMetadataProviderService;
-import org.onosproject.cluster.ControllerNode;
import org.onosproject.cluster.DefaultControllerNode;
-import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.Node;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-
import static com.google.common.base.Preconditions.checkState;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
@@ -70,11 +60,6 @@
private final Logger log = getLogger(getClass());
- // constants for filed names (used in serialization)
- private static final String ID = "id";
- private static final String PORT = "port";
- private static final String IP = "ip";
-
private static final String CONFIG_DIR = "../config";
private static final String CONFIG_FILE_NAME = "cluster.json";
private static final File CONFIG_FILE = new File(CONFIG_DIR, CONFIG_FILE_NAME);
@@ -94,16 +79,6 @@
@Activate
public void activate() {
mapper = new ObjectMapper();
- SimpleModule module = new SimpleModule();
- module.addSerializer(NodeId.class, new NodeIdSerializer());
- module.addDeserializer(NodeId.class, new NodeIdDeserializer());
- module.addSerializer(ControllerNode.class, new ControllerNodeSerializer());
- module.addDeserializer(ControllerNode.class, new ControllerNodeDeserializer());
- module.addSerializer(Partition.class, new PartitionSerializer());
- module.addDeserializer(Partition.class, new PartitionDeserializer());
- module.addSerializer(PartitionId.class, new PartitionIdSerializer());
- module.addDeserializer(PartitionId.class, new PartitionIdDeserializer());
- mapper.registerModule(module);
providerService = providerRegistry.register(this);
metadataUrl = System.getProperty("onos.cluster.metadata.uri", "file://" + CONFIG_DIR + "/" + CONFIG_FILE);
configFileChangeDetector.scheduleWithFixedDelay(() -> watchUrl(metadataUrl), 100, 500, TimeUnit.MILLISECONDS);
@@ -133,12 +108,31 @@
}
}
+ private ClusterMetadataPrototype toPrototype(ClusterMetadata metadata) {
+ ClusterMetadataPrototype prototype = new ClusterMetadataPrototype();
+ prototype.setName(metadata.getName());
+ prototype.setCluster(metadata.getNodes()
+ .stream()
+ .map(this::toPrototype)
+ .collect(Collectors.toSet()));
+ return prototype;
+ }
+
+ private NodePrototype toPrototype(Node node) {
+ NodePrototype prototype = new NodePrototype();
+ prototype.setId(node.id().id());
+ prototype.setIp(node.ip().toString());
+ prototype.setPort(node.tcpPort());
+ return prototype;
+ }
+
@Override
public void setClusterMetadata(ClusterMetadata metadata) {
try {
File configFile = new File(metadataUrl.replaceFirst("file://", ""));
Files.createParentDirs(configFile);
- mapper.writeValue(configFile, metadata);
+ ClusterMetadataPrototype metadataPrototype = toPrototype(metadata);
+ mapper.writeValue(configFile, metadataPrototype);
cachedMetadata.set(fetchMetadata(metadataUrl));
providerService.clusterMetadataChanged(new Versioned<>(metadata, configFile.lastModified()));
} catch (IOException e) {
@@ -202,12 +196,12 @@
private Versioned<ClusterMetadata> fetchMetadata(String metadataUrl) {
try {
URL url = new URL(metadataUrl);
- ClusterMetadata metadata = null;
+ ClusterMetadataPrototype metadata = null;
long version = 0;
if ("file".equals(url.getProtocol())) {
File file = new File(metadataUrl.replaceFirst("file://", ""));
version = file.lastModified();
- metadata = mapper.readValue(new FileInputStream(file), ClusterMetadata.class);
+ metadata = mapper.readValue(new FileInputStream(file), ClusterMetadataPrototype.class);
} else if ("http".equals(url.getProtocol())) {
try {
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
@@ -219,7 +213,7 @@
return null;
}
version = conn.getLastModified();
- metadata = mapper.readValue(conn.getInputStream(), ClusterMetadata.class);
+ metadata = mapper.readValue(conn.getInputStream(), ClusterMetadataPrototype.class);
} catch (IOException e) {
log.warn("Could not reach metadata URL {}. Retrying...", url);
return null;
@@ -231,105 +225,35 @@
throw new NullPointerException();
}
- // If the configured partitions are empty then return a null metadata to indicate that the configuration
- // needs to be polled until the partitions are populated.
- if (metadata.getPartitions().isEmpty() || metadata.getPartitions().stream()
- .map(partition -> partition.getMembers().size())
- .reduce(Math::min)
- .orElse(0) == 0) {
- return null;
- }
- return new Versioned<>(new ClusterMetadata(PROVIDER_ID,
- metadata.getName(),
- Sets.newHashSet(metadata.getNodes()),
- Sets.newHashSet(metadata.getPartitions())),
- version);
+ return new Versioned<>(new ClusterMetadata(
+ PROVIDER_ID,
+ metadata.getName(),
+ metadata.getNode() != null ?
+ new DefaultControllerNode(
+ metadata.getNode().getId() != null
+ ? NodeId.nodeId(metadata.getNode().getId())
+ : metadata.getNode().getIp() != null
+ ? NodeId.nodeId(IpAddress.valueOf(metadata.getNode().getIp()).toString())
+ : NodeId.nodeId(UUID.randomUUID().toString()),
+ metadata.getNode().getIp() != null
+ ? IpAddress.valueOf(metadata.getNode().getIp())
+ : null,
+ metadata.getNode().getPort() != null
+ ? metadata.getNode().getPort()
+ : DefaultControllerNode.DEFAULT_PORT) : null,
+ metadata.getCluster()
+ .stream()
+ .map(node -> new DefaultControllerNode(
+ NodeId.nodeId(node.getId()),
+ IpAddress.valueOf(node.getIp()),
+ node.getPort() != null ? node.getPort() : 5679))
+ .collect(Collectors.toSet())),
+ version);
} catch (IOException e) {
throw new IllegalArgumentException(e);
}
}
- private static class PartitionSerializer extends JsonSerializer<Partition> {
- @Override
- public void serialize(Partition partition, JsonGenerator jgen, SerializerProvider serializerProvider)
- throws IOException, JsonProcessingException {
- jgen.writeStartObject();
- jgen.writeNumberField("id", partition.getId().asInt());
- jgen.writeArrayFieldStart("members");
- for (NodeId nodeId : partition.getMembers()) {
- jgen.writeString(nodeId.id());
- }
- jgen.writeEndArray();
- jgen.writeEndObject();
- }
- }
-
- private static class PartitionDeserializer extends JsonDeserializer<Partition> {
- @Override
- public Partition deserialize(JsonParser jp, DeserializationContext ctxt)
- throws IOException, JsonProcessingException {
- return jp.readValueAs(DefaultPartition.class);
- }
- }
-
- private static class PartitionIdSerializer extends JsonSerializer<PartitionId> {
- @Override
- public void serialize(PartitionId partitionId, JsonGenerator jgen, SerializerProvider provider)
- throws IOException, JsonProcessingException {
- jgen.writeNumber(partitionId.asInt());
- }
- }
-
- private class PartitionIdDeserializer extends JsonDeserializer<PartitionId> {
- @Override
- public PartitionId deserialize(JsonParser jp, DeserializationContext ctxt)
- throws IOException, JsonProcessingException {
- JsonNode node = jp.getCodec().readTree(jp);
- return new PartitionId(node.asInt());
- }
- }
-
- private static class ControllerNodeSerializer extends JsonSerializer<ControllerNode> {
- @Override
- public void serialize(ControllerNode node, JsonGenerator jgen, SerializerProvider provider)
- throws IOException, JsonProcessingException {
- jgen.writeStartObject();
- jgen.writeStringField(ID, node.id().toString());
- jgen.writeStringField(IP, node.ip().toString());
- jgen.writeNumberField(PORT, node.tcpPort());
- jgen.writeEndObject();
- }
- }
-
- private static class ControllerNodeDeserializer extends JsonDeserializer<ControllerNode> {
- @Override
- public ControllerNode deserialize(JsonParser jp, DeserializationContext ctxt)
- throws IOException, JsonProcessingException {
- JsonNode node = jp.getCodec().readTree(jp);
- NodeId nodeId = new NodeId(node.get(ID).textValue());
- IpAddress ip = IpAddress.valueOf(node.get(IP).textValue());
- int port = node.get(PORT).asInt();
- return new DefaultControllerNode(nodeId, ip, port);
- }
- }
-
- private static class NodeIdSerializer extends JsonSerializer<NodeId> {
- @Override
- public void serialize(NodeId nodeId, JsonGenerator jgen, SerializerProvider provider)
- throws IOException, JsonProcessingException {
- jgen.writeString(nodeId.toString());
- }
- }
-
- private class NodeIdDeserializer extends JsonDeserializer<NodeId> {
- @Override
- public NodeId deserialize(JsonParser jp, DeserializationContext ctxt)
- throws IOException, JsonProcessingException {
- JsonNode node = jp.getCodec().readTree(jp);
- return new NodeId(node.asText());
- }
- }
-
/**
* Monitors the metadata url for any updates and notifies providerService accordingly.
*/
@@ -350,4 +274,64 @@
log.error("Unable to parse metadata : ", e);
}
}
+
+ private static class ClusterMetadataPrototype {
+ private String name;
+ private NodePrototype node;
+ private Set<NodePrototype> cluster;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public NodePrototype getNode() {
+ return node;
+ }
+
+ public void setNode(NodePrototype node) {
+ this.node = node;
+ }
+
+ public Set<NodePrototype> getCluster() {
+ return cluster;
+ }
+
+ public void setCluster(Set<NodePrototype> cluster) {
+ this.cluster = cluster;
+ }
+ }
+
+ private static class NodePrototype {
+ private String id;
+ private String ip;
+ private Integer port;
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ public String getIp() {
+ return ip;
+ }
+
+ public void setIp(String ip) {
+ this.ip = ip;
+ }
+
+ public Integer getPort() {
+ return port;
+ }
+
+ public void setPort(Integer port) {
+ this.port = port;
+ }
+ }
}
\ No newline at end of file