[ONOS-7088] Distribute storage partitions evenly during upgrades

Change-Id: Id82f86ddedbe6c7de2322717338c5c341177bc9e
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index dbe712d..c9cfe53 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -17,17 +17,21 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.FileVisitResult;
 import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import io.atomix.protocols.raft.RaftServer;
 import io.atomix.protocols.raft.cluster.MemberId;
-import io.atomix.protocols.raft.cluster.RaftMember;
 import io.atomix.protocols.raft.storage.RaftStorage;
 import io.atomix.storage.StorageLevel;
-import org.onosproject.core.Version;
+import org.onosproject.cluster.Partition;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
 import org.onosproject.store.service.PartitionInfo;
@@ -101,23 +105,46 @@
     }
 
     /**
+     * Deletes the server.
+     */
+    public void delete() {
+        try {
+            Files.walkFileTree(partition.getDataFolder().toPath(), new SimpleFileVisitor<Path>() {
+                @Override
+                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                    Files.delete(file);
+                    return FileVisitResult.CONTINUE;
+                }
+                @Override
+                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+                    Files.delete(dir);
+                    return FileVisitResult.CONTINUE;
+                }
+            });
+        } catch (IOException e) {
+            log.error("Failed to delete partition: {}", e);
+        }
+    }
+
+    /**
      * Forks the existing partition into a new partition.
      *
-     * @param version the version from which to fork the server
+     * @param fromPartition the partition from which to fork the server
      * @return future to be completed once the fork operation is complete
      */
-    public CompletableFuture<Void> fork(Version version) {
-        log.info("Forking server for partition {} ({}->{})", partition.getId(), version, partition.getVersion());
+    public CompletableFuture<Void> fork(Partition fromPartition) {
+        log.info("Forking server for partition {} ({}->{})",
+                partition.getId(), fromPartition.getVersion(), partition.getVersion());
         RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
-                .withName(partition.getName(version))
-                .withType(RaftMember.Type.PASSIVE)
+                .withName(String.format("partition-%s", fromPartition.getId()))
                 .withProtocol(new RaftServerCommunicator(
-                        partition.getName(version),
+                        String.format("partition-%s-%s", fromPartition.getId(), fromPartition.getVersion()),
                         Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
                         clusterCommunicator))
                 .withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
                 .withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
                 .withStorage(RaftStorage.newBuilder()
+                        .withPrefix(String.format("partition-%s", partition.getId()))
                         .withStorageLevel(StorageLevel.MAPPED)
                         .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
                         .withDirectory(partition.getDataFolder())
@@ -125,12 +152,27 @@
                         .build());
         StoragePartition.RAFT_SERVICES.forEach(builder::addService);
         RaftServer server = builder.build();
-        return server.join(partition.getMemberIds(version))
-                .thenCompose(v -> server.shutdown())
+
+        // Create a collection of members currently in the source partition.
+        Collection<MemberId> members = fromPartition.getMembers()
+                .stream()
+                .map(id -> MemberId.from(id.id()))
+                .collect(Collectors.toList());
+
+        // If this node is a member of the partition, join the partition. Otherwise, listen to the partition.
+        CompletableFuture<RaftServer> future = members.contains(localMemberId)
+                ? server.bootstrap(members) : server.listen(members);
+
+        // TODO: We should leave the cluster for nodes that aren't normally members to ensure the source
+        // cluster's configuration is kept consistent for rolling back upgrades, but Atomix deletes configuration
+        // files when a node leaves the cluster so we can't do that here.
+        return future.thenCompose(v -> server.shutdown())
                 .thenCompose(v -> {
                     // Delete the cluster configuration file from the forked partition.
                     try {
-                        Files.delete(new File(partition.getDataFolder(), "atomix.conf").toPath());
+                        Files.delete(new File(
+                                partition.getDataFolder(),
+                                String.format("partition-%s.conf", partition.getId())).toPath());
                     } catch (IOException e) {
                         log.error("Failed to delete partition configuration: {}", e);
                     }
@@ -141,24 +183,25 @@
                 }).whenComplete((r, e) -> {
                     if (e == null) {
                         log.info("Successfully forked server for partition {} ({}->{})",
-                                partition.getId(), version, partition.getVersion());
+                                partition.getId(), fromPartition.getVersion(), partition.getVersion());
                     } else {
                         log.info("Failed to fork server for partition {} ({}->{})",
-                                partition.getId(), version, partition.getVersion(), e);
+                                partition.getId(), fromPartition.getVersion(), partition.getVersion(), e);
                     }
                 }).thenApply(v -> null);
     }
 
     private RaftServer buildServer() {
         RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
-                .withName(partition.getName())
+                .withName(String.format("partition-%s", partition.getId()))
                 .withProtocol(new RaftServerCommunicator(
-                        partition.getName(),
+                        String.format("partition-%s-%s", partition.getId(), partition.getVersion()),
                         Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
                         clusterCommunicator))
                 .withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
                 .withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
                 .withStorage(RaftStorage.newBuilder()
+                        .withPrefix(String.format("partition-%s", partition.getId()))
                         .withStorageLevel(StorageLevel.MAPPED)
                         .withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
                         .withDirectory(partition.getDataFolder())
@@ -169,13 +212,13 @@
     }
 
     public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
-        log.info("Joining partition {} ({})", partition.getId(), partition.getVersion());
+        log.info("Joining partition {} ({})", partition.getId(), partition.getName());
         server = buildServer();
         return server.join(otherMembers).whenComplete((r, e) -> {
             if (e == null) {
-                log.info("Successfully joined partition {} ({})", partition.getId(), partition.getVersion());
+                log.info("Successfully joined partition {} ({})", partition.getId(), partition.getName());
             } else {
-                log.info("Failed to join partition {} ({})", partition.getId(), partition.getVersion(), e);
+                log.info("Failed to join partition {} ({})", partition.getId(), partition.getName(), e);
             }
         }).thenApply(v -> null);
     }