[ONOS-7088] Distribute storage partitions evenly during upgrades
Change-Id: Id82f86ddedbe6c7de2322717338c5c341177bc9e
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ActiveStoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ActiveStoragePartition.java
new file mode 100644
index 0000000..5e9301d
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ActiveStoragePartition.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.io.File;
+import java.util.concurrent.CompletableFuture;
+
+import io.atomix.protocols.raft.cluster.MemberId;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Partition;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+
+/**
+ * Storage partition that is active on the local node.
+ * <p>
+ * Its data lives under {@code PARTITIONS_DIR} and its Raft server is started with
+ * {@code StoragePartitionServer.open()}.
+ */
+public class ActiveStoragePartition extends StoragePartition {
+
+    /**
+     * Creates a storage partition that is active on the local node.
+     *
+     * @param partition           the partition to serve
+     * @param clusterCommunicator the cluster communication service used by the Raft server
+     * @param clusterService      the cluster service, used to resolve the local node
+     */
+    public ActiveStoragePartition(
+            Partition partition,
+            ClusterCommunicationService clusterCommunicator,
+            ClusterService clusterService) {
+        super(partition, clusterCommunicator, clusterService);
+    }
+
+    /**
+     * Returns the partition name, which is simply the partition identifier.
+     *
+     * @return the partition identifier as a string
+     */
+    @Override
+    public String getName() {
+        return partition.getId().toString();
+    }
+
+    /**
+     * Returns the directory holding this partition's Raft storage: {@code PARTITIONS_DIR}
+     * followed by the partition identifier.
+     *
+     * @return the partition data folder
+     */
+    @Override
+    public File getDataFolder() {
+        String path = PARTITIONS_DIR + partition.getId();
+        return new File(path);
+    }
+
+    /**
+     * Opens a Raft server for this partition under the local node's identity. The server is
+     * assigned to {@code this.server} only after it has opened successfully.
+     *
+     * @return future completed once the server is open
+     */
+    @Override
+    protected CompletableFuture<Void> openServer() {
+        MemberId localMember = MemberId.from(localNodeId.id());
+        StoragePartitionServer partitionServer =
+                new StoragePartitionServer(this, localMember, clusterCommunicator);
+        return partitionServer.open().thenRun(() -> this.server = partitionServer);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index 605bf25..e7ed20a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -15,6 +15,9 @@
*/
package org.onosproject.store.primitives.impl;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -52,14 +55,10 @@
import org.onosproject.store.service.WorkQueue;
import org.slf4j.Logger;
-import java.io.File;
import java.util.List;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
-import static org.onosproject.store.primitives.impl.PartitionManager.PARTITIONS_DIR;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -86,18 +85,16 @@
@Activate
public void activate() {
- partition = new StoragePartition(
+ partition = new ActiveStoragePartition(
new DefaultPartition(
PartitionId.SHARED,
+ null,
clusterService.getNodes()
.stream()
.map(ControllerNode::id)
.collect(Collectors.toSet())),
- null,
- null,
clusterCommunicator,
- clusterService,
- new File(PARTITIONS_DIR + "/coordination"));
+ clusterService);
partition.open().join();
primitiveCreator = partition.client();
log.info("Started");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ForkedStoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ForkedStoragePartition.java
new file mode 100644
index 0000000..70b12c3
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ForkedStoragePartition.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.io.File;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import io.atomix.protocols.raft.cluster.MemberId;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Partition;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+
+/**
+ * Storage partition forked from a partition of the previous version.
+ * <p>
+ * If the local node is the only member of this partition, the partition's state is forked from the
+ * source partition; otherwise the local node joins the partition's other members.
+ */
+public class ForkedStoragePartition extends StoragePartition {
+
+    /** Previous-version partition from which this partition's state is forked. */
+    private final Partition source;
+
+    /**
+     * Creates a storage partition forked from a previous-version partition.
+     *
+     * @param partition           the partition to serve
+     * @param source              the previous-version partition to fork from
+     * @param clusterCommunicator the cluster communication service used by the Raft server
+     * @param clusterService      the cluster service, used to resolve the local node
+     * @throws NullPointerException if {@code source} is null
+     */
+    public ForkedStoragePartition(
+            Partition partition,
+            Partition source,
+            ClusterCommunicationService clusterCommunicator,
+            ClusterService clusterService) {
+        super(partition, clusterCommunicator, clusterService);
+        this.source = Objects.requireNonNull(source, "source cannot be null");
+    }
+
+    /**
+     * Returns the partition name, which is simply the partition identifier.
+     *
+     * @return the partition identifier as a string
+     */
+    @Override
+    public String getName() {
+        return partition.getId().toString();
+    }
+
+    /**
+     * Returns the directory holding this partition's Raft storage.
+     *
+     * @return the partition data folder
+     */
+    @Override
+    public File getDataFolder() {
+        return new File(PARTITIONS_DIR + partition.getId());
+    }
+
+    /**
+     * Opens this partition's Raft server, either by forking the source partition (when the local
+     * node is the sole member) or by joining the partition's other members. The server is assigned
+     * to {@code this.server} only once that operation completes successfully.
+     *
+     * @return future completed once the server is running
+     */
+    @Override
+    protected CompletableFuture<Void> openServer() {
+        StoragePartitionServer forkedServer = new StoragePartitionServer(
+                this,
+                MemberId.from(localNodeId.id()),
+                clusterCommunicator);
+
+        CompletableFuture<Void> future;
+        if (partition.getMembers().size() == 1) {
+            // open() only starts a server when the local node is a member, so a single-member
+            // partition is the local node alone: seed its state from the source partition.
+            future = forkedServer.fork(source);
+        } else {
+            future = forkedServer.join(partition.getMembers().stream()
+                    .filter(nodeId -> !nodeId.equals(localNodeId))
+                    .map(nodeId -> MemberId.from(nodeId.id()))
+                    .collect(Collectors.toList()));
+        }
+        return future.thenRun(() -> this.server = forkedServer);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/InactiveStoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/InactiveStoragePartition.java
new file mode 100644
index 0000000..3924cd7
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/InactiveStoragePartition.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+
+import com.google.common.hash.Hashing;
+import io.atomix.protocols.raft.cluster.MemberId;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Partition;
+import org.onosproject.core.Version;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+
+/**
+ * Storage partition that is inactive on the local node.
+ * <p>
+ * Its data lives in the archive directory {@link #INACTIVE_DIR} rather than directly under
+ * {@code PARTITIONS_DIR}.
+ */
+public class InactiveStoragePartition extends StoragePartition {
+
+    /** Archive directory under which inactive partitions keep their data. */
+    static final String INACTIVE_DIR = PARTITIONS_DIR + "archive/";
+
+    /**
+     * Creates a storage partition that is inactive on the local node.
+     *
+     * @param partition           the partition to serve
+     * @param clusterCommunicator the cluster communication service used by the Raft server
+     * @param clusterService      the cluster service, used to resolve the local node
+     */
+    public InactiveStoragePartition(
+            Partition partition,
+            ClusterCommunicationService clusterCommunicator,
+            ClusterService clusterService) {
+        super(partition, clusterCommunicator, clusterService);
+    }
+
+    /**
+     * Returns the partition name. When the partition carries a version, the name is the identifier
+     * followed by a 64-bit value derived from a SHA-256 hash of the version string; otherwise it is
+     * just the identifier.
+     *
+     * @return the partition name
+     */
+    @Override
+    public String getName() {
+        Version version = partition.getVersion();
+        if (version == null) {
+            return partition.getId().toString();
+        }
+        long versionHash = Hashing.sha256()
+                .hashString(version.toString(), StandardCharsets.UTF_8)
+                .asLong();
+        return String.format("%s-%d", partition.getId(), versionHash);
+    }
+
+    /**
+     * Returns the directory holding this partition's Raft storage inside the archive directory.
+     *
+     * @return the partition data folder
+     */
+    @Override
+    public File getDataFolder() {
+        String path = INACTIVE_DIR + partition.getId();
+        return new File(path);
+    }
+
+    /**
+     * Opens a Raft server for this partition under the local node's identity. The server is
+     * assigned to {@code this.server} only after it has opened successfully.
+     *
+     * @return future completed once the server is open
+     */
+    @Override
+    protected CompletableFuture<Void> openServer() {
+        MemberId localMember = MemberId.from(localNodeId.id());
+        StoragePartitionServer partitionServer =
+                new StoragePartitionServer(this, localMember, clusterCommunicator);
+        return partitionServer.open().thenRun(() -> this.server = partitionServer);
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 8d5440d..74fef44 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -17,8 +17,11 @@
package org.onosproject.store.primitives.impl;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
@@ -33,17 +36,22 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataDiff;
import org.onosproject.cluster.ClusterMetadataEvent;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.Member;
+import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionDiff;
import org.onosproject.cluster.PartitionId;
import org.onosproject.core.Version;
-import org.onosproject.core.VersionService;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -53,6 +61,9 @@
import org.onosproject.store.primitives.PartitionService;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.PartitionInfo;
+import org.onosproject.upgrade.Upgrade;
+import org.onosproject.upgrade.UpgradeEvent;
+import org.onosproject.upgrade.UpgradeEventListener;
import org.onosproject.upgrade.UpgradeService;
import org.slf4j.Logger;
@@ -68,9 +79,6 @@
public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
implements PartitionService, PartitionAdminService {
- static final String PARTITIONS_DIR =
- System.getProperty("karaf.data") + "/db/partitions/";
-
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -83,44 +91,53 @@
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UpgradeService upgradeService;
+ protected MembershipService membershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected VersionService versionService;
+ protected UpgradeService upgradeService;
private final Map<PartitionId, StoragePartition> inactivePartitions = Maps.newConcurrentMap();
private final Map<PartitionId, StoragePartition> activePartitions = Maps.newConcurrentMap();
private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
- private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+
+ private final ClusterEventListener clusterListener = new InternalClusterEventListener();
+ private final UpgradeEventListener upgradeListener = new InternalUpgradeEventListener();
+ private final ClusterMetadataEventListener metadataListener = new InternalClusterMetadataListener();
@Activate
public void activate() {
eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
currentClusterMetadata.set(metadataService.getClusterMetadata());
+
+ clusterService.addListener(clusterListener);
+ upgradeService.addListener(upgradeListener);
metadataService.addListener(metadataListener);
// If an upgrade is currently in progress and this node is an upgraded node, initialize upgrade partitions.
CompletableFuture<Void> openFuture;
if (upgradeService.isUpgrading() && upgradeService.isLocalUpgraded()) {
- Version sourceVersion = upgradeService.getState().source();
- Version targetVersion = upgradeService.getState().target();
currentClusterMetadata.get()
.getPartitions()
.forEach(partition -> {
- inactivePartitions.put(partition.getId(), new StoragePartition(
+ // Create a default partition and assign it to inactive partitions. This node will join
+ // inactive partitions to participate in consensus for fault tolerance, but the partitions
+ // won't be accessible via client proxies.
+ inactivePartitions.put(partition.getId(), new InactiveStoragePartition(
partition,
- sourceVersion,
- null,
clusterCommunicator,
- clusterService,
- new File(PARTITIONS_DIR + sourceVersion + "/" + partition.getId())));
- activePartitions.put(partition.getId(), new StoragePartition(
+ clusterService));
+
+ // Create a forked partition and assign it to active partitions. These partitions will be
+ // forked from commit logs for previous version partitions.
+ Partition forkedPartition = computeInitialPartition(
partition,
- targetVersion,
- sourceVersion,
+ upgradeService.getState().target(),
+ getLocalNodes());
+ activePartitions.put(partition.getId(), new ForkedStoragePartition(
+ forkedPartition,
+ partition,
clusterCommunicator,
- clusterService,
- new File(PARTITIONS_DIR + targetVersion + "/" + partition.getId())));
+ clusterService));
});
// We have to fork existing partitions before we can start inactive partition servers to
@@ -132,16 +149,12 @@
.map(StoragePartition::open)
.toArray(CompletableFuture[]::new)));
} else {
- Version version = versionService.version();
currentClusterMetadata.get()
.getPartitions()
- .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
+ .forEach(partition -> activePartitions.put(partition.getId(), new ActiveStoragePartition(
partition,
- version,
- null,
clusterCommunicator,
- clusterService,
- new File(PARTITIONS_DIR + version + "/" + partition.getId()))));
+ clusterService)));
openFuture = CompletableFuture.allOf(activePartitions.values().stream()
.map(StoragePartition::open)
.toArray(CompletableFuture[]::new));
@@ -153,6 +166,8 @@
@Deactivate
public void deactivate() {
+ clusterService.removeListener(clusterListener);
+ upgradeService.removeListener(upgradeListener);
metadataService.removeListener(metadataListener);
eventDispatcher.removeSink(PartitionEvent.class);
@@ -208,6 +223,163 @@
.collect(Collectors.toList());
}
+    /**
+     * Returns the identifiers of the members of this node's local membership group.
+     * <p>
+     * The list is in the order reported by {@link MembershipService#getLocalGroup()} and is not
+     * sorted here; callers that need a deterministic order (such as {@link #computePartition})
+     * sort it themselves. NOTE(review): the local group presumably consists of the nodes running
+     * the same version as this node; confirm against the MembershipService contract.
+     *
+     * @return identifiers of the local group's members
+     */
+    private List<NodeId> getLocalNodes() {
+        return membershipService.getLocalGroup()
+                .members()
+                .stream()
+                .map(Member::nodeId)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Computes the initial forked partition from the given source partition.
+     * <p>
+     * The computed partition may contain one member more than the source partition.
+     *
+     * @param sourcePartition the source partition from which to compute the partition
+     * @param targetVersion   the target partition version
+     * @param members         the nodes available to the partition, in any order
+     * @return the computed forked partition
+     */
+    protected static Partition computeInitialPartition(
+            Partition sourcePartition,
+            Version targetVersion,
+            List<NodeId> members) {
+        return computePartition(sourcePartition, targetVersion, members, 1);
+    }
+
+    /**
+     * Computes the final forked partition from the given source partition.
+     * <p>
+     * The computed partition contains at most as many members as the source partition.
+     *
+     * @param sourcePartition the source partition from which to compute the partition
+     * @param targetVersion   the target partition version
+     * @param members         the nodes available to the partition, in any order
+     * @return the computed forked partition
+     */
+    protected static Partition computeFinalPartition(
+            Partition sourcePartition,
+            Version targetVersion,
+            List<NodeId> members) {
+        return computePartition(sourcePartition, targetVersion, members, 0);
+    }
+
+    /**
+     * Computes a forked partition from the given source partition.
+     * <p>
+     * The result first keeps every available node that already belongs to the source partition,
+     * then adds the other available nodes in sorted order until it holds
+     * {@code sourcePartition.getMembers().size() + delta} members or no candidates remain. Sorting
+     * makes the result identical on every node that computes it from the same inputs.
+     *
+     * @param sourcePartition the source partition from which to compute the partition
+     * @param targetVersion   the target partition version
+     * @param members         the nodes available to the partition, in any order
+     * @param delta           the number of members allowed beyond the source partition's size
+     * @return the computed forked partition
+     */
+    private static Partition computePartition(
+            Partition sourcePartition,
+            Version targetVersion,
+            List<NodeId> members,
+            int delta) {
+        // Sort the candidates so that member selection is deterministic across nodes.
+        List<NodeId> sortedMembers = members.stream()
+                .sorted()
+                .collect(Collectors.toList());
+
+        // Keep the available nodes that already belong to the source partition.
+        List<NodeId> partitionMembers = sortedMembers.stream()
+                .filter(nodeId -> sourcePartition.getMembers().contains(nodeId))
+                .collect(Collectors.toList());
+
+        // Top up with the remaining candidates, in sorted order, until the target size is reached.
+        int targetSize = sourcePartition.getMembers().size() + delta;
+        for (NodeId candidate : sortedMembers) {
+            if (partitionMembers.size() >= targetSize) {
+                break;
+            }
+            if (!partitionMembers.contains(candidate)) {
+                partitionMembers.add(candidate);
+            }
+        }
+
+        return new DefaultPartition(
+                sourcePartition.getId(),
+                targetVersion,
+                partitionMembers);
+    }
+
+    /**
+     * Recomputes the final membership of every active partition from the current local group and
+     * applies it, provided an upgrade is in progress and this node has already been upgraded.
+     * <p>
+     * The target version and the local group are read once, so every partition is computed from
+     * the same snapshot.
+     *
+     * @param nodeId the node that became ready (currently unused; all partitions are recomputed)
+     */
+    private void processInstanceReady(NodeId nodeId) {
+        if (!upgradeService.isUpgrading() || !upgradeService.isLocalUpgraded()) {
+            return;
+        }
+        Version targetVersion = upgradeService.getState().target();
+        List<NodeId> localNodes = getLocalNodes();
+        currentClusterMetadata.get()
+                .getPartitions()
+                .forEach(partition -> {
+                    StoragePartition activePartition = activePartitions.get(partition.getId());
+                    if (activePartition != null) {
+                        Partition newPartition = computeFinalPartition(partition, targetVersion, localNodes);
+                        log.info("Updating storage partition {}: {}", partition, newPartition);
+                        activePartition.onUpdate(newPartition);
+                    }
+                });
+    }
+
+    /**
+     * Deletes all inactive partitions once an upgrade has been committed, then removes the
+     * partition archive directory.
+     * <p>
+     * Failures are logged rather than propagated. The archive directory is removed only after every
+     * inactive partition has been deleted successfully.
+     *
+     * @param upgrade the committed upgrade (currently unused)
+     */
+    private void processUpgradeComplete(Upgrade upgrade) {
+        if (inactivePartitions.isEmpty()) {
+            return;
+        }
+        CompletableFuture<?>[] deletions = inactivePartitions.values()
+                .stream()
+                .map(StoragePartition::delete)
+                .toArray(CompletableFuture[]::new);
+        inactivePartitions.clear();
+        CompletableFuture.allOf(deletions).whenComplete((result, error) -> {
+            if (error != null) {
+                log.error("Failed to delete inactive partitions", error);
+                return;
+            }
+            try {
+                // deleteIfExists: no archived partition may ever have been written on this node.
+                Files.deleteIfExists(new File(InactiveStoragePartition.INACTIVE_DIR).toPath());
+            } catch (IOException e) {
+                log.error("Failed to delete partition archive", e);
+            }
+        });
+    }
+
private void processMetadataUpdate(ClusterMetadata clusterMetadata) {
ClusterMetadataDiff diffExaminer =
new ClusterMetadataDiff(currentClusterMetadata.get(), clusterMetadata);
@@ -218,6 +359,36 @@
.forEach(diff -> activePartitions.get(diff.partitionId()).onUpdate(diff.newValue()));
}
+    /**
+     * Forwards {@code INSTANCE_READY} cluster events to {@link #processInstanceReady(NodeId)}; all
+     * other cluster events are ignored.
+     */
+    private class InternalClusterEventListener implements ClusterEventListener {
+        @Override
+        public void event(ClusterEvent event) {
+            if (event.type() != ClusterEvent.Type.INSTANCE_READY) {
+                return;
+            }
+            NodeId readyNode = event.subject().id();
+            processInstanceReady(readyNode);
+        }
+    }
+
+    /**
+     * Forwards {@code COMMITTED} upgrade events to {@link #processUpgradeComplete(Upgrade)}; all
+     * other upgrade events are ignored.
+     */
+    private class InternalUpgradeEventListener implements UpgradeEventListener {
+        @Override
+        public void event(UpgradeEvent event) {
+            if (event.type() != UpgradeEvent.Type.COMMITTED) {
+                return;
+            }
+            Upgrade committed = event.subject();
+            processUpgradeComplete(committed);
+        }
+    }
+
private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
@Override
public void event(ClusterMetadataEvent event) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 5458edd..c419d98 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -51,18 +51,17 @@
/**
* Storage partition.
*/
-public class StoragePartition implements Managed<StoragePartition> {
+public abstract class StoragePartition implements Managed<StoragePartition> {
- private final AtomicBoolean isOpened = new AtomicBoolean(false);
- private final ClusterCommunicationService clusterCommunicator;
- private final ClusterService clusterService;
- private final Version version;
- private final Version source;
- private final File dataFolder;
- private Partition partition;
- private NodeId localNodeId;
- private StoragePartitionServer server;
- private StoragePartitionClient client;
+ static final String PARTITIONS_DIR =
+ System.getProperty("karaf.data") + "/db/partitions/";
+
+ protected final AtomicBoolean isOpened = new AtomicBoolean(false);
+ protected final ClusterCommunicationService clusterCommunicator;
+ protected Partition partition;
+ protected NodeId localNodeId;
+ protected StoragePartitionServer server;
+ protected StoragePartitionClient client;
public static final Map<String, Supplier<RaftService>> RAFT_SERVICES =
ImmutableMap.<String, Supplier<RaftService>>builder()
@@ -83,18 +82,11 @@
public StoragePartition(
Partition partition,
- Version version,
- Version source,
ClusterCommunicationService clusterCommunicator,
- ClusterService clusterService,
- File dataFolder) {
+ ClusterService clusterService) {
this.partition = partition;
- this.version = version;
- this.source = source;
this.clusterCommunicator = clusterCommunicator;
- this.clusterService = clusterService;
this.localNodeId = clusterService.getLocalNode().id();
- this.dataFolder = dataFolder;
}
/**
@@ -107,12 +99,7 @@
@Override
public CompletableFuture<Void> open() {
- if (source != null) {
- return forkServer(source)
- .thenCompose(v -> openClient())
- .thenAccept(v -> isOpened.set(true))
- .thenApply(v -> null);
- } else if (partition.getMembers().contains(localNodeId)) {
+ if (partition.getMembers().contains(localNodeId)) {
return openServer()
.thenCompose(v -> openClient())
.thenAccept(v -> isOpened.set(true))
@@ -131,22 +118,39 @@
}
/**
+     * Deletes the partition: closes the local server, then the client, and finally removes the
+     * server's on-disk state.
+     *
+     * @return future to be completed once the partition has been deleted
+     */
+    public CompletableFuture<Void> delete() {
+        CompletableFuture<Void> serverClosed = closeServer();
+        return serverClosed
+                .thenCompose(ignored -> closeClient())
+                .thenRun(this::deleteServer);
+    }
+
+    /**
+     * Returns the directory in which this partition's Raft storage is kept.
+     *
+     * @return the partition data folder
+     */
+    public abstract File getDataFolder();
+
+ /**
* Returns the partition name.
*
* @return the partition name
*/
- public String getName() {
- return getName(version);
- }
+ public abstract String getName();
/**
- * Returns the partition name for the given version.
+ * Returns the identifier of the {@link Partition partition} associated with this instance.
*
- * @param version the version for which to return the partition name
- * @return the partition name for the given version
+ * @return partition identifier
*/
- String getName(Version version) {
- return version != null ? String.format("partition-%d-%s", partition.getId().id(), version) : "partition-core";
+ public PartitionId getId() {
+ return partition.getId();
}
/**
@@ -155,24 +155,7 @@
* @return the partition version
*/
public Version getVersion() {
- return version;
- }
-
- /**
- * Returns the partition data folder.
- *
- * @return the partition data folder
- */
- public File getDataFolder() {
- return dataFolder;
- }
-
- /**
- * Returns the identifier of the {@link Partition partition} associated with this instance.
- * @return partition identifier
- */
- public PartitionId getId() {
- return partition.getId();
+ return partition.getVersion();
}
/**
@@ -188,64 +171,14 @@
* @return partition member identifiers
*/
public Collection<MemberId> getMemberIds() {
- return source != null ?
- clusterService.getNodes()
- .stream()
- .filter(node -> {
- Version nodeVersion = clusterService.getVersion(node.id());
- return nodeVersion != null && nodeVersion.equals(version);
- })
- .map(node -> MemberId.from(node.id().id()))
- .collect(Collectors.toList()) :
- Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
- }
-
- Collection<MemberId> getMemberIds(Version version) {
- if (source == null || version.equals(source)) {
- return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
- } else {
- return clusterService.getNodes()
- .stream()
- .filter(node -> {
- Version nodeVersion = clusterService.getVersion(node.id());
- return nodeVersion != null && nodeVersion.equals(version);
- })
- .map(node -> MemberId.from(node.id().id()))
- .collect(Collectors.toList());
- }
+ return Collections2.transform(getMembers(), n -> MemberId.from(n.id()));
}
/**
* Attempts to rejoin the partition.
* @return future that is completed after the operation is complete
*/
- private CompletableFuture<Void> openServer() {
- StoragePartitionServer server = new StoragePartitionServer(
- this,
- MemberId.from(localNodeId.id()),
- clusterCommunicator);
- return server.open().thenRun(() -> this.server = server);
- }
-
- /**
- * Forks the server from the given version.
- *
- * @return future to be completed once the server has been forked
- */
- private CompletableFuture<Void> forkServer(Version version) {
- StoragePartitionServer server = new StoragePartitionServer(
- this,
- MemberId.from(localNodeId.id()),
- clusterCommunicator);
-
- CompletableFuture<Void> future;
- if (getMemberIds().size() == 1) {
- future = server.fork(version);
- } else {
- future = server.join(getMemberIds());
- }
- return future.thenRun(() -> this.server = server);
- }
+ protected abstract CompletableFuture<Void> openServer();
/**
* Attempts to join the partition as a new member.
@@ -267,7 +200,7 @@
client = new StoragePartitionClient(this,
MemberId.from(localNodeId.id()),
new RaftClientCommunicator(
- getName(),
+ String.format("partition-%s-%s", partition.getId(), partition.getVersion()),
Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
clusterCommunicator));
return client.open().thenApply(v -> client);
@@ -278,7 +211,12 @@
* @return future that is completed when the operation completes
*/
public CompletableFuture<Void> leaveCluster() {
- return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
+        // Capture the current server: the field may be reassigned before the asynchronous close
+        // completes, and only the server that was actually closed should have its data deleted.
+        StoragePartitionServer leavingServer = server;
+        return leavingServer != null
+                ? leavingServer.closeAndExit().thenRun(leavingServer::delete)
+                : CompletableFuture.completedFuture(null);
}
@Override
@@ -286,6 +221,26 @@
return isOpened.get();
}
+    /**
+     * Closes the local partition server if one has been started.
+     *
+     * @return future completed once the server has closed, or an already-completed future when
+     *         there is no server
+     */
+    private CompletableFuture<Void> closeServer() {
+        return server == null ? CompletableFuture.completedFuture(null) : server.close();
+    }
+
+    /**
+     * Deletes the local partition server's on-disk state if a server has been started.
+     */
+    private void deleteServer() {
+        if (server == null) {
+            return;
+        }
+        server.delete();
+    }
+
private CompletableFuture<Void> closeClient() {
if (client != null) {
return client.close();
@@ -306,8 +254,7 @@
* Process updates to partitions and handles joining or leaving a partition.
* @param newValue new Partition
*/
- public void onUpdate(Partition newValue) {
-
+ void onUpdate(Partition newValue) {
boolean wasPresent = partition.getMembers().contains(localNodeId);
boolean isPresent = newValue.getMembers().contains(localNodeId);
this.partition = newValue;
@@ -315,7 +262,7 @@
// no action needed
return;
}
- //only need to do action if our membership changed
+ // Only need to do action if our membership changed
if (wasPresent) {
leaveCluster();
} else if (isPresent) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index dbe712d..c9cfe53 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -17,17 +17,21 @@
import java.io.File;
import java.io.IOException;
+import java.nio.file.FileVisitResult;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.cluster.MemberId;
-import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
-import org.onosproject.core.Version;
+import org.onosproject.cluster.Partition;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
import org.onosproject.store.service.PartitionInfo;
@@ -101,23 +105,59 @@
}
/**
+     * Deletes this server's on-disk state by recursively removing the partition data folder.
+     * <p>
+     * Deletion is best-effort: a missing data folder is treated as already deleted, and I/O
+     * failures are logged with their stack trace rather than propagated.
+     */
+    public void delete() {
+        Path dataFolder = partition.getDataFolder().toPath();
+        if (!Files.exists(dataFolder)) {
+            return;
+        }
+        try {
+            Files.walkFileTree(dataFolder, new SimpleFileVisitor<Path>() {
+                @Override
+                public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                    Files.delete(file);
+                    return FileVisitResult.CONTINUE;
+                }
+
+                @Override
+                public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+                    // Surface a failure to list the directory instead of masking it with a
+                    // DirectoryNotEmptyException from the delete below.
+                    if (exc != null) {
+                        throw exc;
+                    }
+                    Files.delete(dir);
+                    return FileVisitResult.CONTINUE;
+                }
+            });
+        } catch (IOException e) {
+            log.error("Failed to delete partition {}", partition.getId(), e);
+        }
+    }
+
+ /**
* Forks the existing partition into a new partition.
*
- * @param version the version from which to fork the server
+ * @param fromPartition the partition from which to fork the server
* @return future to be completed once the fork operation is complete
*/
- public CompletableFuture<Void> fork(Version version) {
- log.info("Forking server for partition {} ({}->{})", partition.getId(), version, partition.getVersion());
+ public CompletableFuture<Void> fork(Partition fromPartition) {
+ log.info("Forking server for partition {} ({}->{})",
+ partition.getId(), fromPartition.getVersion(), partition.getVersion());
RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
- .withName(partition.getName(version))
- .withType(RaftMember.Type.PASSIVE)
+ .withName(String.format("partition-%s", fromPartition.getId()))
.withProtocol(new RaftServerCommunicator(
- partition.getName(version),
+ String.format("partition-%s-%s", fromPartition.getId(), fromPartition.getVersion()),
Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
clusterCommunicator))
.withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
.withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
.withStorage(RaftStorage.newBuilder()
+ .withPrefix(String.format("partition-%s", partition.getId()))
.withStorageLevel(StorageLevel.MAPPED)
.withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
.withDirectory(partition.getDataFolder())
@@ -125,12 +152,27 @@
.build());
StoragePartition.RAFT_SERVICES.forEach(builder::addService);
RaftServer server = builder.build();
- return server.join(partition.getMemberIds(version))
- .thenCompose(v -> server.shutdown())
+
+ // Create a collection of members currently in the source partition.
+ Collection<MemberId> members = fromPartition.getMembers()
+ .stream()
+ .map(id -> MemberId.from(id.id()))
+ .collect(Collectors.toList());
+
+ // If this node is a member of the partition, join the partition. Otherwise, listen to the partition.
+ CompletableFuture<RaftServer> future = members.contains(localMemberId)
+ ? server.bootstrap(members) : server.listen(members);
+
+ // TODO: We should leave the cluster for nodes that aren't normally members to ensure the source
+ // cluster's configuration is kept consistent for rolling back upgrades, but Atomix deletes configuration
+ // files when a node leaves the cluster so we can't do that here.
+ return future.thenCompose(v -> server.shutdown())
.thenCompose(v -> {
// Delete the cluster configuration file from the forked partition.
try {
- Files.delete(new File(partition.getDataFolder(), "atomix.conf").toPath());
+ Files.delete(new File(
+ partition.getDataFolder(),
+ String.format("partition-%s.conf", partition.getId())).toPath());
} catch (IOException e) {
log.error("Failed to delete partition configuration: {}", e);
}
@@ -141,24 +183,25 @@
}).whenComplete((r, e) -> {
if (e == null) {
log.info("Successfully forked server for partition {} ({}->{})",
- partition.getId(), version, partition.getVersion());
+ partition.getId(), fromPartition.getVersion(), partition.getVersion());
} else {
log.info("Failed to fork server for partition {} ({}->{})",
- partition.getId(), version, partition.getVersion(), e);
+ partition.getId(), fromPartition.getVersion(), partition.getVersion(), e);
}
}).thenApply(v -> null);
}
private RaftServer buildServer() {
RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
- .withName(partition.getName())
+ .withName(String.format("partition-%s", partition.getId()))
.withProtocol(new RaftServerCommunicator(
- partition.getName(),
+ String.format("partition-%s-%s", partition.getId(), partition.getVersion()),
Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
clusterCommunicator))
.withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
.withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
.withStorage(RaftStorage.newBuilder()
+ .withPrefix(String.format("partition-%s", partition.getId()))
.withStorageLevel(StorageLevel.MAPPED)
.withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
.withDirectory(partition.getDataFolder())
@@ -169,13 +212,13 @@
}
public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
- log.info("Joining partition {} ({})", partition.getId(), partition.getVersion());
+ log.info("Joining partition {} ({})", partition.getId(), partition.getName());
server = buildServer();
return server.join(otherMembers).whenComplete((r, e) -> {
if (e == null) {
- log.info("Successfully joined partition {} ({})", partition.getId(), partition.getVersion());
+ log.info("Successfully joined partition {} ({})", partition.getId(), partition.getName());
} else {
- log.info("Failed to join partition {} ({})", partition.getId(), partition.getVersion(), e);
+ log.info("Failed to join partition {} ({})", partition.getId(), partition.getName(), e);
}
}).thenApply(v -> null);
}