[ONOS-7054] Implement prototype of ISSU protocol
Change-Id: Id543c0de9c97b68f977c824cbc987b35d81beb2d
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index bb071a2..123c3b6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -16,16 +16,19 @@
package org.onosproject.store.primitives.impl;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.cluster.MemberId;
-import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
+import org.onosproject.core.Version;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
import org.onosproject.store.service.PartitionInfo;
import org.onosproject.store.service.Serializer;
@@ -46,23 +49,21 @@
private final MemberId localMemberId;
private final StoragePartition partition;
- private final Supplier<RaftServerProtocol> protocol;
- private final File dataFolder;
+ private final UnifiedClusterCommunicationService clusterCommunicator;
private RaftServer server;
public StoragePartitionServer(
StoragePartition partition,
MemberId localMemberId,
- Supplier<RaftServerProtocol> protocol,
- File dataFolder) {
+ UnifiedClusterCommunicationService clusterCommunicator) {
this.partition = partition;
this.localMemberId = localMemberId;
- this.protocol = protocol;
- this.dataFolder = dataFolder;
+ this.clusterCommunicator = clusterCommunicator;
}
@Override
public CompletableFuture<Void> open() {
+ log.info("Starting server for partition {} ({})", partition.getId(), partition.getVersion());
CompletableFuture<RaftServer> serverOpenFuture;
if (partition.getMemberIds().contains(localMemberId)) {
if (server != null && server.isRunning()) {
@@ -77,9 +78,11 @@
}
return serverOpenFuture.whenComplete((r, e) -> {
if (e == null) {
- log.info("Successfully started server for partition {}", partition.getId());
+ log.info("Successfully started server for partition {} ({})",
+ partition.getId(), partition.getVersion());
} else {
- log.info("Failed to start server for partition {}", partition.getId(), e);
+ log.info("Failed to start server for partition {} ({})",
+ partition.getId(), partition.getVersion(), e);
}
}).thenApply(v -> null);
}
@@ -97,16 +100,68 @@
return server.leave();
}
- private RaftServer buildServer() {
+    /**
+     * Forks the existing partition into a new partition: joins the members of {@code version} as a PASSIVE
+     * server, shuts that server down, deletes its cluster configuration file and bootstraps a new server.
+     *
+     * @param version the version from which to fork the server
+     * @return future to be completed once the fork operation is complete
+     */
+    public CompletableFuture<Void> fork(Version version) {
+        log.info("Forking server for partition {} ({}->{})", partition.getId(), version, partition.getVersion());
RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
-                .withName("partition-" + partition.getId())
-                .withProtocol(protocol.get())
+                .withName(partition.getName(version))
+                .withType(RaftMember.Type.PASSIVE)
+                .withProtocol(new RaftServerCommunicator(
+                        partition.getName(version),
+                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+                        clusterCommunicator))
.withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
.withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
.withStorage(RaftStorage.newBuilder()
.withStorageLevel(StorageLevel.MAPPED)
.withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
-                        .withDirectory(dataFolder)
+                        .withDirectory(partition.getDataFolder())
+                        .withMaxSegmentSize(MAX_SEGMENT_SIZE)
+                        .build());
+        StoragePartition.RAFT_SERVICES.forEach(builder::addService);
+        RaftServer server = builder.build();
+        return server.join(partition.getMemberIds(version))
+                .thenCompose(v -> server.shutdown())
+                .thenCompose(v -> {
+                    // Delete the cluster configuration file from the forked partition.
+                    try {
+                        Files.delete(new File(partition.getDataFolder(), "atomix.conf").toPath());
+                    } catch (IOException e) {
+                        // Pass the exception as the throwable (no placeholder) so the stack trace is logged.
+                        log.error("Failed to delete partition configuration", e);
+                    }
+
+                    // Build and bootstrap a new server.
+                    this.server = buildServer();
+                    return this.server.bootstrap();
+                }).whenComplete((r, e) -> {
+                    if (e == null) {
+                        log.info("Successfully forked server for partition {} ({}->{})",
+                                partition.getId(), version, partition.getVersion());
+                    } else {
+                        log.info("Failed to fork server for partition {} ({}->{})",
+                                partition.getId(), version, partition.getVersion(), e);
+                    }
+                }).thenApply(v -> null);
+    }
+
+ private RaftServer buildServer() {
+ RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
+ .withName(partition.getName())
+ .withProtocol(new RaftServerCommunicator(
+ partition.getName(),
+ Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+ clusterCommunicator))
+ .withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
+ .withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
+ .withStorage(RaftStorage.newBuilder()
+ .withStorageLevel(StorageLevel.MAPPED)
+ .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
+ .withDirectory(partition.getDataFolder())
.withMaxSegmentSize(MAX_SEGMENT_SIZE)
.build());
StoragePartition.RAFT_SERVICES.forEach(builder::addService);
@@ -114,12 +169,13 @@
}
public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
+ log.info("Joining partition {} ({})", partition.getId(), partition.getVersion());
server = buildServer();
return server.join(otherMembers).whenComplete((r, e) -> {
if (e == null) {
- log.info("Successfully joined partition {}", partition.getId());
+ log.info("Successfully joined partition {} ({})", partition.getId(), partition.getVersion());
} else {
- log.info("Failed to join partition {}", partition.getId(), e);
+ log.info("Failed to join partition {} ({})", partition.getId(), partition.getVersion(), e);
}
}).thenApply(v -> null);
}