[ONOS-7088] Distribute storage partitions evenly during upgrades
Change-Id: Id82f86ddedbe6c7de2322717338c5c341177bc9e
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index dbe712d..c9cfe53 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -17,17 +17,21 @@
import java.io.File;
import java.io.IOException;
+import java.nio.file.FileVisitResult;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.cluster.MemberId;
-import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
-import org.onosproject.core.Version;
+import org.onosproject.cluster.Partition;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
import org.onosproject.store.service.PartitionInfo;
@@ -101,23 +105,46 @@
}
/**
+     * Deletes the server's persisted state by recursively removing the partition's data folder.
+     * <p>
+     * Files are removed first and each directory is removed once it has been emptied. Any I/O
+     * failure aborts the walk and is logged together with the partition identifier; it is not
+     * propagated to the caller.
+     */
+    public void delete() {
+        try {
+            Files.walkFileTree(partition.getDataFolder().toPath(), new SimpleFileVisitor<Path>() {
+                @Override
+                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                    Files.delete(file);
+                    return FileVisitResult.CONTINUE;
+                }
+
+                @Override
+                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+                    // A non-null exc means iterating this directory failed part-way through.
+                    // Surface that failure instead of attempting to delete a directory that
+                    // may still contain entries.
+                    if (exc != null) {
+                        throw exc;
+                    }
+                    Files.delete(dir);
+                    return FileVisitResult.CONTINUE;
+                }
+            });
+        } catch (IOException e) {
+            // The exception is passed as the trailing argument so the logger records its stack
+            // trace; the placeholder is filled by the partition identifier.
+            log.error("Failed to delete partition {}", partition.getId(), e);
+        }
+    }
+
+ /**
* Forks the existing partition into a new partition.
*
- * @param version the version from which to fork the server
+ * @param fromPartition the partition from which to fork the server
* @return future to be completed once the fork operation is complete
*/
- public CompletableFuture<Void> fork(Version version) {
- log.info("Forking server for partition {} ({}->{})", partition.getId(), version, partition.getVersion());
+ public CompletableFuture<Void> fork(Partition fromPartition) {
+ log.info("Forking server for partition {} ({}->{})",
+ partition.getId(), fromPartition.getVersion(), partition.getVersion());
RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
- .withName(partition.getName(version))
- .withType(RaftMember.Type.PASSIVE)
+ .withName(String.format("partition-%s", fromPartition.getId()))
.withProtocol(new RaftServerCommunicator(
- partition.getName(version),
+ String.format("partition-%s-%s", fromPartition.getId(), fromPartition.getVersion()),
Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
clusterCommunicator))
.withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
.withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
.withStorage(RaftStorage.newBuilder()
+ .withPrefix(String.format("partition-%s", partition.getId()))
.withStorageLevel(StorageLevel.MAPPED)
.withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
.withDirectory(partition.getDataFolder())
@@ -125,12 +152,27 @@
.build());
StoragePartition.RAFT_SERVICES.forEach(builder::addService);
RaftServer server = builder.build();
- return server.join(partition.getMemberIds(version))
- .thenCompose(v -> server.shutdown())
+
+ // Create a collection of members currently in the source partition.
+ Collection<MemberId> members = fromPartition.getMembers()
+ .stream()
+ .map(id -> MemberId.from(id.id()))
+ .collect(Collectors.toList());
+
+ // If this node is a member of the partition, join the partition. Otherwise, listen to the partition.
+ CompletableFuture<RaftServer> future = members.contains(localMemberId)
+ ? server.bootstrap(members) : server.listen(members);
+
+ // TODO: We should leave the cluster for nodes that aren't normally members to ensure the source
+ // cluster's configuration is kept consistent for rolling back upgrades, but Atomix deletes configuration
+ // files when a node leaves the cluster so we can't do that here.
+ return future.thenCompose(v -> server.shutdown())
.thenCompose(v -> {
// Delete the cluster configuration file from the forked partition.
try {
- Files.delete(new File(partition.getDataFolder(), "atomix.conf").toPath());
+ Files.delete(new File(
+ partition.getDataFolder(),
+ String.format("partition-%s.conf", partition.getId())).toPath());
} catch (IOException e) {
log.error("Failed to delete partition configuration: {}", e);
}
@@ -141,24 +183,25 @@
}).whenComplete((r, e) -> {
if (e == null) {
log.info("Successfully forked server for partition {} ({}->{})",
- partition.getId(), version, partition.getVersion());
+ partition.getId(), fromPartition.getVersion(), partition.getVersion());
} else {
log.info("Failed to fork server for partition {} ({}->{})",
- partition.getId(), version, partition.getVersion(), e);
+ partition.getId(), fromPartition.getVersion(), partition.getVersion(), e);
}
}).thenApply(v -> null);
}
private RaftServer buildServer() {
RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
- .withName(partition.getName())
+ .withName(String.format("partition-%s", partition.getId()))
.withProtocol(new RaftServerCommunicator(
- partition.getName(),
+ String.format("partition-%s-%s", partition.getId(), partition.getVersion()),
Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
clusterCommunicator))
.withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
.withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
.withStorage(RaftStorage.newBuilder()
+ .withPrefix(String.format("partition-%s", partition.getId()))
.withStorageLevel(StorageLevel.MAPPED)
.withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
.withDirectory(partition.getDataFolder())
@@ -169,13 +212,13 @@
}
public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
- log.info("Joining partition {} ({})", partition.getId(), partition.getVersion());
+ log.info("Joining partition {} ({})", partition.getId(), partition.getName());
server = buildServer();
return server.join(otherMembers).whenComplete((r, e) -> {
if (e == null) {
- log.info("Successfully joined partition {} ({})", partition.getId(), partition.getVersion());
+ log.info("Successfully joined partition {} ({})", partition.getId(), partition.getName());
} else {
- log.info("Failed to join partition {} ({})", partition.getId(), partition.getVersion(), e);
+ log.info("Failed to join partition {} ({})", partition.getId(), partition.getName(), e);
}
}).thenApply(v -> null);
}