[ONOS-7054] Implement prototype of ISSU protocol

Change-Id: Id543c0de9c97b68f977c824cbc987b35d81beb2d
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index bb071a2..123c3b6 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -16,16 +16,19 @@
 package org.onosproject.store.primitives.impl;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
-import java.util.function.Supplier;
 
 import io.atomix.protocols.raft.RaftServer;
 import io.atomix.protocols.raft.cluster.MemberId;
-import io.atomix.protocols.raft.protocol.RaftServerProtocol;
+import io.atomix.protocols.raft.cluster.RaftMember;
 import io.atomix.protocols.raft.storage.RaftStorage;
 import io.atomix.storage.StorageLevel;
+import org.onosproject.core.Version;
+import org.onosproject.store.cluster.messaging.UnifiedClusterCommunicationService;
 import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
 import org.onosproject.store.service.PartitionInfo;
 import org.onosproject.store.service.Serializer;
@@ -46,23 +49,21 @@
 
     private final MemberId localMemberId;
     private final StoragePartition partition;
-    private final Supplier<RaftServerProtocol> protocol;
-    private final File dataFolder;
+    private final UnifiedClusterCommunicationService clusterCommunicator;
     private RaftServer server;
 
     public StoragePartitionServer(
             StoragePartition partition,
             MemberId localMemberId,
-            Supplier<RaftServerProtocol> protocol,
-            File dataFolder) {
+            UnifiedClusterCommunicationService clusterCommunicator) {
         this.partition = partition;
         this.localMemberId = localMemberId;
-        this.protocol = protocol;
-        this.dataFolder = dataFolder;
+        this.clusterCommunicator = clusterCommunicator;
     }
 
     @Override
     public CompletableFuture<Void> open() {
+        log.info("Starting server for partition {} ({})", partition.getId(), partition.getVersion());
         CompletableFuture<RaftServer> serverOpenFuture;
         if (partition.getMemberIds().contains(localMemberId)) {
             if (server != null && server.isRunning()) {
@@ -77,9 +78,11 @@
         }
         return serverOpenFuture.whenComplete((r, e) -> {
             if (e == null) {
-                log.info("Successfully started server for partition {}", partition.getId());
+                log.info("Successfully started server for partition {} ({})",
+                        partition.getId(), partition.getVersion());
             } else {
-                log.info("Failed to start server for partition {}", partition.getId(), e);
+                log.info("Failed to start server for partition {} ({})",
+                        partition.getId(), partition.getVersion(), e);
             }
         }).thenApply(v -> null);
     }
@@ -97,16 +100,68 @@
         return server.leave();
     }
 
-    private RaftServer buildServer() {
+    /**
+     * Forks the existing partition into a new partition.
+     *
+     * @param version the version from which to fork the server
+     * @return future to be completed once the fork operation is complete
+     */
+    public CompletableFuture<Void> fork(Version version) {
+        log.info("Forking server for partition {} ({}->{})", partition.getId(), version, partition.getVersion());
         RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
-                .withName("partition-" + partition.getId())
-                .withProtocol(protocol.get())
+                .withName(partition.getName(version))
+                .withType(RaftMember.Type.PASSIVE)
+                .withProtocol(new RaftServerCommunicator(
+                        partition.getName(version),
+                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+                        clusterCommunicator))
                 .withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
                 .withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
                 .withStorage(RaftStorage.newBuilder()
                         .withStorageLevel(StorageLevel.MAPPED)
                         .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
-                        .withDirectory(dataFolder)
+                        .withDirectory(partition.getDataFolder())
+                        .withMaxSegmentSize(MAX_SEGMENT_SIZE)
+                        .build());
+        StoragePartition.RAFT_SERVICES.forEach(builder::addService);
+        RaftServer server = builder.build();
+        return server.join(partition.getMemberIds(version))
+                .thenCompose(v -> server.shutdown())
+                .thenCompose(v -> {
+                    // Delete the cluster configuration file from the forked partition.
+                    try {
+                        Files.delete(new File(partition.getDataFolder(), "atomix.conf").toPath());
+                    } catch (IOException e) {
+                        log.error("Failed to delete partition configuration: {}", e);
+                    }
+
+                    // Build and bootstrap a new server.
+                    this.server = buildServer();
+                    return this.server.bootstrap();
+                }).whenComplete((r, e) -> {
+                    if (e == null) {
+                        log.info("Successfully forked server for partition {} ({}->{})",
+                                partition.getId(), version, partition.getVersion());
+                    } else {
+                        log.info("Failed to fork server for partition {} ({}->{})",
+                                partition.getId(), version, partition.getVersion(), e);
+                    }
+                }).thenApply(v -> null);
+    }
+
+    private RaftServer buildServer() {
+        RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
+                .withName(partition.getName())
+                .withProtocol(new RaftServerCommunicator(
+                        partition.getName(),
+                        Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
+                        clusterCommunicator))
+                .withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
+                .withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
+                .withStorage(RaftStorage.newBuilder()
+                        .withStorageLevel(StorageLevel.MAPPED)
+                        .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
+                        .withDirectory(partition.getDataFolder())
                         .withMaxSegmentSize(MAX_SEGMENT_SIZE)
                         .build());
         StoragePartition.RAFT_SERVICES.forEach(builder::addService);
@@ -114,12 +169,13 @@
     }
 
     public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
+        log.info("Joining partition {} ({})", partition.getId(), partition.getVersion());
         server = buildServer();
         return server.join(otherMembers).whenComplete((r, e) -> {
             if (e == null) {
-                log.info("Successfully joined partition {}", partition.getId());
+                log.info("Successfully joined partition {} ({})", partition.getId(), partition.getVersion());
             } else {
-                log.info("Failed to join partition {}", partition.getId(), e);
+                log.info("Failed to join partition {} ({})", partition.getId(), partition.getVersion(), e);
             }
         }).thenApply(v -> null);
     }