[ONOS-7546] Simplify partitioning scheme for ISSU
Change-Id: I417e16bd7b97ba8e77ed5ce4f6224487ed13a1c2
diff --git a/core/api/src/main/java/org/onosproject/cluster/PartitionId.java b/core/api/src/main/java/org/onosproject/cluster/PartitionId.java
index 39852de..f8b65e7 100644
--- a/core/api/src/main/java/org/onosproject/cluster/PartitionId.java
+++ b/core/api/src/main/java/org/onosproject/cluster/PartitionId.java
@@ -25,11 +25,6 @@
public class PartitionId extends Identifier<Integer> implements Comparable<PartitionId> {
/**
- * The {@code PartitionId} for the shared coordination partition.
- */
- public static final PartitionId SHARED = PartitionId.from(0);
-
- /**
* Creates a partition identifier from an integer.
*
* @param id input integer
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ActiveStoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ActiveStoragePartition.java
deleted file mode 100644
index 5e9301d..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ActiveStoragePartition.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.io.File;
-import java.util.concurrent.CompletableFuture;
-
-import io.atomix.protocols.raft.cluster.MemberId;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Partition;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-
-/**
- * Storage partition active on the local node.
- */
-public class ActiveStoragePartition extends StoragePartition {
- public ActiveStoragePartition(
- Partition partition,
- ClusterCommunicationService clusterCommunicator,
- ClusterService clusterService) {
- super(partition, clusterCommunicator, clusterService);
- }
-
- @Override
- public String getName() {
- return partition.getId().toString();
- }
-
- @Override
- public File getDataFolder() {
- return new File(PARTITIONS_DIR + partition.getId());
- }
-
- @Override
- protected CompletableFuture<Void> openServer() {
- StoragePartitionServer server = new StoragePartitionServer(
- this,
- MemberId.from(localNodeId.id()),
- clusterCommunicator);
- return server.open().thenRun(() -> this.server = server);
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index e7ed20a..fcb90f4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -85,9 +85,9 @@
@Activate
public void activate() {
- partition = new ActiveStoragePartition(
+ partition = new StoragePartition(
new DefaultPartition(
- PartitionId.SHARED,
+ PartitionId.from(0),
null,
clusterService.getNodes()
.stream()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ForkedStoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ForkedStoragePartition.java
deleted file mode 100644
index 70b12c3..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ForkedStoragePartition.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.io.File;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-
-import io.atomix.protocols.raft.cluster.MemberId;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Partition;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-
-/**
- * Storage partition forked from a previous version partition.
- */
-public class ForkedStoragePartition extends StoragePartition {
- final Partition source;
-
- public ForkedStoragePartition(
- Partition partition,
- Partition source,
- ClusterCommunicationService clusterCommunicator,
- ClusterService clusterService) {
- super(partition, clusterCommunicator, clusterService);
- this.source = source;
- }
-
- @Override
- public String getName() {
- return partition.getId().toString();
- }
-
- @Override
- public File getDataFolder() {
- return new File(PARTITIONS_DIR + partition.getId());
- }
-
- @Override
- protected CompletableFuture<Void> openServer() {
- StoragePartitionServer server = new StoragePartitionServer(
- this,
- MemberId.from(localNodeId.id()),
- clusterCommunicator);
-
- CompletableFuture<Void> future;
- if (partition.getMembers().size() == 1) {
- future = server.fork(source);
- } else {
- future = server.join(partition.getMembers().stream()
- .filter(nodeId -> !nodeId.equals(localNodeId))
- .map(nodeId -> MemberId.from(nodeId.id()))
- .collect(Collectors.toList()));
- }
- return future.thenRun(() -> this.server = server);
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/InactiveStoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/InactiveStoragePartition.java
deleted file mode 100644
index 3924cd7..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/InactiveStoragePartition.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.io.File;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CompletableFuture;
-
-import com.google.common.hash.Hashing;
-import io.atomix.protocols.raft.cluster.MemberId;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.Partition;
-import org.onosproject.core.Version;
-import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
-
-/**
- * Storage partition inactive on the local node.
- */
-public class InactiveStoragePartition extends StoragePartition {
- static final String INACTIVE_DIR = PARTITIONS_DIR + "archive/";
-
- public InactiveStoragePartition(
- Partition partition,
- ClusterCommunicationService clusterCommunicator,
- ClusterService clusterService) {
- super(partition, clusterCommunicator, clusterService);
- }
-
- @Override
- public String getName() {
- Version version = partition.getVersion();
- if (version != null) {
- long hashCode = Hashing.sha256().hashString(version.toString(), StandardCharsets.UTF_8).asLong();
- return String.format("%s-%d", partition.getId(), hashCode);
- }
- return partition.getId().toString();
- }
-
- @Override
- public File getDataFolder() {
- return new File(INACTIVE_DIR + partition.getId());
- }
-
- @Override
- protected CompletableFuture<Void> openServer() {
- StoragePartitionServer server = new StoragePartitionServer(
- this,
- MemberId.from(localNodeId.id()),
- clusterCommunicator);
- return server.open().thenRun(() -> this.server = server);
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 5f22d15..28cc69e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -16,12 +16,8 @@
package org.onosproject.store.primitives.impl;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
@@ -36,22 +32,15 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
-import org.onosproject.cluster.ClusterEvent;
-import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataDiff;
import org.onosproject.cluster.ClusterMetadataEvent;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.DefaultPartition;
-import org.onosproject.cluster.Member;
-import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionDiff;
import org.onosproject.cluster.PartitionId;
-import org.onosproject.core.Version;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -61,10 +50,6 @@
import org.onosproject.store.primitives.PartitionService;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.PartitionInfo;
-import org.onosproject.upgrade.Upgrade;
-import org.onosproject.upgrade.UpgradeEvent;
-import org.onosproject.upgrade.UpgradeEventListener;
-import org.onosproject.upgrade.UpgradeService;
import org.slf4j.Logger;
import static org.onosproject.security.AppGuard.checkPermission;
@@ -90,18 +75,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected MembershipService membershipService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UpgradeService upgradeService;
-
- private final Map<PartitionId, StoragePartition> inactivePartitions = Maps.newConcurrentMap();
- private final Map<PartitionId, StoragePartition> activePartitions = Maps.newConcurrentMap();
+ private final Map<PartitionId, StoragePartition> partitions = Maps.newConcurrentMap();
private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
- private final ClusterEventListener clusterListener = new InternalClusterEventListener();
- private final UpgradeEventListener upgradeListener = new InternalUpgradeEventListener();
private final ClusterMetadataEventListener metadataListener = new InternalClusterMetadataListener();
@Activate
@@ -109,101 +85,55 @@
eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
currentClusterMetadata.set(metadataService.getClusterMetadata());
- clusterService.addListener(clusterListener);
- upgradeService.addListener(upgradeListener);
metadataService.addListener(metadataListener);
- // If an upgrade is currently in progress and this node is an upgraded node, initialize upgrade partitions.
- CompletableFuture<Void> openFuture;
- if (upgradeService.isUpgrading() && upgradeService.isLocalUpgraded()) {
- currentClusterMetadata.get()
- .getPartitions()
- .forEach(partition -> {
- // Create a default partition and assign it to inactive partitions. This node will join
- // inactive partitions to participate in consensus for fault tolerance, but the partitions
- // won't be accessible via client proxies.
- inactivePartitions.put(partition.getId(), new InactiveStoragePartition(
- partition,
- clusterCommunicator,
- clusterService));
-
- // Create a forked partition and assign it to active partitions. These partitions will be
- // forked from commit logs for previous version partitions.
- Partition forkedPartition = computeInitialPartition(
- partition,
- upgradeService.getState().target(),
- getLocalNodes());
- activePartitions.put(partition.getId(), new ForkedStoragePartition(
- forkedPartition,
- partition,
- clusterCommunicator,
- clusterService));
- });
-
- // We have to fork existing partitions before we can start inactive partition servers to
- // avoid duplicate message handlers when both servers are running.
- openFuture = CompletableFuture.allOf(activePartitions.values().stream()
- .map(StoragePartition::open)
- .toArray(CompletableFuture[]::new))
- .thenCompose(v -> CompletableFuture.allOf(inactivePartitions.values().stream()
- .map(StoragePartition::open)
- .toArray(CompletableFuture[]::new)));
- } else {
- currentClusterMetadata.get()
- .getPartitions()
- .forEach(partition -> activePartitions.put(partition.getId(), new ActiveStoragePartition(
- partition,
- clusterCommunicator,
- clusterService)));
- openFuture = CompletableFuture.allOf(activePartitions.values().stream()
- .map(StoragePartition::open)
- .toArray(CompletableFuture[]::new));
- }
-
- openFuture.join();
+ currentClusterMetadata.get()
+ .getPartitions()
+ .forEach(partition -> partitions.put(partition.getId(), new StoragePartition(
+ partition,
+ clusterCommunicator,
+ clusterService)));
+ CompletableFuture.allOf(partitions.values().stream()
+ .map(StoragePartition::open)
+ .toArray(CompletableFuture[]::new))
+ .join();
log.info("Started");
}
@Deactivate
public void deactivate() {
- clusterService.removeListener(clusterListener);
- upgradeService.removeListener(upgradeListener);
metadataService.removeListener(metadataListener);
eventDispatcher.removeSink(PartitionEvent.class);
- CompletableFuture<Void> closeFuture = CompletableFuture.allOf(
- CompletableFuture.allOf(inactivePartitions.values().stream()
- .map(StoragePartition::close)
- .toArray(CompletableFuture[]::new)),
- CompletableFuture.allOf(activePartitions.values().stream()
- .map(StoragePartition::close)
- .toArray(CompletableFuture[]::new)));
- closeFuture.join();
+ CompletableFuture.allOf(partitions.values().stream()
+ .map(StoragePartition::close)
+ .toArray(CompletableFuture[]::new))
+ .join();
log.info("Stopped");
}
@Override
public int getNumberOfPartitions() {
checkPermission(PARTITION_READ);
- return activePartitions.size();
+ return partitions.size();
}
@Override
public Set<PartitionId> getAllPartitionIds() {
checkPermission(PARTITION_READ);
- return activePartitions.keySet();
+ return partitions.keySet();
}
@Override
public DistributedPrimitiveCreator getDistributedPrimitiveCreator(PartitionId partitionId) {
checkPermission(PARTITION_READ);
- return activePartitions.get(partitionId).client();
+ return partitions.get(partitionId).client();
}
@Override
public Set<NodeId> getConfiguredMembers(PartitionId partitionId) {
checkPermission(PARTITION_READ);
- StoragePartition partition = activePartitions.get(partitionId);
+ StoragePartition partition = partitions.get(partitionId);
return ImmutableSet.copyOf(partition.getMembers());
}
@@ -217,138 +147,12 @@
@Override
public List<PartitionInfo> partitionInfo() {
- return activePartitions.values()
+ return partitions.values()
.stream()
.flatMap(x -> Tools.stream(x.info()))
.collect(Collectors.toList());
}
- /**
- * Returns a list of nodes sorted by time ordered oldest to newest.
- *
- * @return a list of nodes sorted by time
- */
- private List<NodeId> getLocalNodes() {
- return membershipService.getLocalGroup()
- .members()
- .stream()
- .map(Member::nodeId)
- .collect(Collectors.toList());
- }
-
- /**
- * Computes an initial forked partition from the given source partition.
- *
- * @param sourcePartition the source partition from which to compute the partition
- * @param targetVersion the target partition version
- * @param members the set of members available to the partition
- * @return the computed forked partition
- */
- protected static Partition computeInitialPartition(
- Partition sourcePartition,
- Version targetVersion,
- List<NodeId> members) {
- return computePartition(sourcePartition, targetVersion, members, 1);
- }
-
- /**
- * Computes a final forked partition from the given source partition.
- *
- * @param sourcePartition the source partition from which to compute the partition
- * @param targetVersion the target partition version
- * @param members the set of members available to the partition
- * @return the computed forked partition
- */
- protected static Partition computeFinalPartition(
- Partition sourcePartition,
- Version targetVersion,
- List<NodeId> members) {
- return computePartition(sourcePartition, targetVersion, members, 0);
- }
-
- /**
- * Computes a forked partition from the given source partition.
- *
- * @param sourcePartition the source partition from which to compute the partition
- * @param targetVersion the target partition version
- * @param members the set of members available to the partition
- * @param delta the number of additional members to preserve outside the partition
- * @return the computed forked partition
- */
- private static Partition computePartition(
- Partition sourcePartition,
- Version targetVersion,
- List<NodeId> members,
- int delta) {
- // Create a collection of members of the forked/isolated partition. Initial membership
- // will include up to n upgraded nodes until all n nodes in the partition have been upgraded.
- List<NodeId> sortedMembers = members.stream()
- .sorted()
- .collect(Collectors.toList());
-
- // Create a list of members of the partition that have been upgraded according to the
- // version isolated cluster membership.
- List<NodeId> partitionMembers = sortedMembers.stream()
- .filter(nodeId -> sourcePartition.getMembers().contains(nodeId))
- .collect(Collectors.toList());
-
- // If additional members need to be added to the partition to make up a full member list,
- // add members in sorted order to create deterministic rebalancing of nodes.
- int totalMembers = sourcePartition.getMembers().size() + delta;
- if (partitionMembers.size() < totalMembers) {
- for (int i = partitionMembers.size(); i < totalMembers; i++) {
- Optional<NodeId> nextMember = sortedMembers.stream()
- .filter(nodeId -> !partitionMembers.contains(nodeId))
- .findFirst();
- if (nextMember.isPresent()) {
- partitionMembers.add(nextMember.get());
- } else {
- break;
- }
- }
- }
-
- return new DefaultPartition(
- sourcePartition.getId(),
- targetVersion,
- partitionMembers);
- }
-
- private void processInstanceReady(NodeId nodeId) {
- if (upgradeService.isUpgrading() && upgradeService.isLocalUpgraded()) {
- currentClusterMetadata.get()
- .getPartitions()
- .forEach(partition -> {
- StoragePartition activePartition = activePartitions.get(partition.getId());
- if (activePartition != null) {
- Partition newPartition = computeFinalPartition(
- partition,
- upgradeService.getState().target(),
- getLocalNodes());
- log.info("Updating storage partition {}: {}", partition, newPartition);
- activePartition.onUpdate(newPartition);
- }
- });
- }
- }
-
- private void processUpgradeComplete(Upgrade upgrade) {
- if (!inactivePartitions.isEmpty()) {
- List<CompletableFuture<Void>> futures = inactivePartitions.values()
- .stream()
- .map(StoragePartition::delete)
- .collect(Collectors.toList());
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenRun(() -> {
- try {
- Files.delete(new File(InactiveStoragePartition.INACTIVE_DIR).toPath());
- } catch (IOException e) {
- log.error("Failed to delete partition archive");
- }
- });
- inactivePartitions.clear();
- }
- }
-
private void processMetadataUpdate(ClusterMetadata clusterMetadata) {
ClusterMetadataDiff diffExaminer =
new ClusterMetadataDiff(currentClusterMetadata.get(), clusterMetadata);
@@ -356,28 +160,10 @@
.values()
.stream()
.filter(PartitionDiff::hasChanged)
- .forEach(diff -> activePartitions.get(diff.partitionId()).onUpdate(diff.newValue()));
+ .forEach(diff -> partitions.get(diff.partitionId()).onUpdate(diff.newValue()));
currentClusterMetadata.set(clusterMetadata);
}
- private class InternalClusterEventListener implements ClusterEventListener {
- @Override
- public void event(ClusterEvent event) {
- if (event.type() == ClusterEvent.Type.INSTANCE_READY) {
- processInstanceReady(event.subject().id());
- }
- }
- }
-
- private class InternalUpgradeEventListener implements UpgradeEventListener {
- @Override
- public void event(UpgradeEvent event) {
- if (event.type() == UpgradeEvent.Type.COMMITTED) {
- processUpgradeComplete(event.subject());
- }
- }
- }
-
private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
@Override
public void event(ClusterMetadataEvent event) {
@@ -387,7 +173,7 @@
@Override
public List<PartitionClientInfo> partitionClientInfo() {
- return activePartitions.values()
+ return partitions.values()
.stream()
.map(StoragePartition::client)
.map(StoragePartitionClient::clientInfo)
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index de3229f..693cf4a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -15,6 +15,13 @@
*/
package org.onosproject.store.primitives.impl;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -63,13 +70,6 @@
import org.onosproject.store.service.WorkQueueStats;
import org.slf4j.Logger;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
-
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
import static org.slf4j.LoggerFactory.getLogger;
@@ -112,7 +112,6 @@
public void activate() {
Map<PartitionId, DistributedPrimitiveCreator> partitionMap = Maps.newHashMap();
partitionService.getAllPartitionIds().stream()
- .filter(id -> !id.equals(PartitionId.SHARED))
.forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap, BUCKETS);
transactionManager = new TransactionManager(this, partitionService, BUCKETS);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 9458ec5..2a29114 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -52,7 +52,7 @@
/**
* Storage partition.
*/
-public abstract class StoragePartition implements Managed<StoragePartition> {
+public class StoragePartition implements Managed<StoragePartition> {
static final String PARTITIONS_DIR =
System.getProperty("karaf.data") + "/db/partitions/";
@@ -133,14 +133,18 @@
*
* @return the partition data folder
*/
- public abstract File getDataFolder();
+ public File getDataFolder() {
+ return new File(PARTITIONS_DIR + partition.getId());
+ }
/**
* Returns the partition name.
*
* @return the partition name
*/
- public abstract String getName();
+ public String getName() {
+ return partition.getId().toString();
+ }
/**
* Returns the identifier of the {@link Partition partition} associated with this instance.
@@ -180,7 +184,13 @@
* Attempts to rejoin the partition.
* @return future that is completed after the operation is complete
*/
- protected abstract CompletableFuture<Void> openServer();
+ protected CompletableFuture<Void> openServer() {
+ StoragePartitionServer server = new StoragePartitionServer(
+ this,
+ MemberId.from(localNodeId.id()),
+ clusterCommunicator);
+ return server.open().thenRun(() -> this.server = server);
+ }
/**
* Attempts to join the partition as a new member.
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/PartitionManagerTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/PartitionManagerTest.java
deleted file mode 100644
index c93b10f..0000000
--- a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/PartitionManagerTest.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Copyright 2017-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.store.primitives.impl;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.junit.Test;
-import org.onosproject.cluster.DefaultPartition;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.Partition;
-import org.onosproject.cluster.PartitionId;
-import org.onosproject.core.Version;
-
-import static org.junit.Assert.assertTrue;
-
-/**
- * Partition manager test.
- */
-public class PartitionManagerTest {
-
- @Test
- public void testComputeInitialIncompletePartition() throws Exception {
- Partition sourcePartition = new DefaultPartition(
- PartitionId.from(1),
- Version.version("1.0.0"),
- Arrays.asList(
- NodeId.nodeId("1"),
- NodeId.nodeId("2"),
- NodeId.nodeId("3")));
- Version targetVersion = Version.version("1.0.1");
- List<NodeId> members = Arrays.asList(
- NodeId.nodeId("1"),
- NodeId.nodeId("2"));
- Partition forkedPartition = PartitionManager.computeInitialPartition(sourcePartition, targetVersion, members);
- assertTrue(forkedPartition.getMembers().size() == 2);
- assertTrue(forkedPartition.getMembers().contains(NodeId.nodeId("1")));
- assertTrue(forkedPartition.getMembers().contains(NodeId.nodeId("2")));
- }
-
- @Test
- public void testComputeInitialCompletePartition() throws Exception {
- Partition sourcePartition = new DefaultPartition(
- PartitionId.from(1),
- Version.version("1.0.0"),
- Arrays.asList(
- NodeId.nodeId("3"),
- NodeId.nodeId("4"),
- NodeId.nodeId("5")));
- Version targetVersion = Version.version("1.0.1");
- List<NodeId> members = Arrays.asList(
- NodeId.nodeId("1"),
- NodeId.nodeId("2"),
- NodeId.nodeId("3"),
- NodeId.nodeId("4"),
- NodeId.nodeId("5"));
- Partition forkedPartition = PartitionManager.computeInitialPartition(sourcePartition, targetVersion, members);
- assertTrue(forkedPartition.getMembers().size() == 4);
- assertTrue(forkedPartition.getMembers().contains(NodeId.nodeId("1")));
- assertTrue(forkedPartition.getMembers().contains(NodeId.nodeId("3")));
- assertTrue(forkedPartition.getMembers().contains(NodeId.nodeId("4")));
- assertTrue(forkedPartition.getMembers().contains(NodeId.nodeId("5")));
- }
-
- @Test
- public void testComputeFinalIncompletePartition() throws Exception {
- Partition sourcePartition = new DefaultPartition(
- PartitionId.from(1),
- Version.version("1.0.0"),
- Arrays.asList(
- NodeId.nodeId("1"),
- NodeId.nodeId("2"),
- NodeId.nodeId("3")));
- Version targetVersion = Version.version("1.0.1");
- List<NodeId> members = Arrays.asList(
- NodeId.nodeId("1"),
- NodeId.nodeId("2"));
- Partition forkedPartition = PartitionManager.computeFinalPartition(sourcePartition, targetVersion, members);
- assertTrue(forkedPartition.getMembers().size() == 2);
- assertTrue(forkedPartition.getMembers().contains(NodeId.nodeId("1")));
- assertTrue(forkedPartition.getMembers().contains(NodeId.nodeId("2")));
- }
-
- @Test
- public void testComputeFinalCompletePartition() throws Exception {
- Partition sourcePartition = new DefaultPartition(
- PartitionId.from(1),
- Version.version("1.0.0"),
- Arrays.asList(
- NodeId.nodeId("3"),
- NodeId.nodeId("4"),
- NodeId.nodeId("5")));
- Version targetVersion = Version.version("1.0.1");
- List<NodeId> members = Arrays.asList(
- NodeId.nodeId("1"),
- NodeId.nodeId("2"),
- NodeId.nodeId("3"),
- NodeId.nodeId("4"),
- NodeId.nodeId("5"));
- Partition forkedPartition = PartitionManager.computeFinalPartition(sourcePartition, targetVersion, members);
- assertTrue(forkedPartition.getMembers().size() == 3);
- assertTrue(forkedPartition.getMembers().contains(NodeId.nodeId("3")));
- assertTrue(forkedPartition.getMembers().contains(NodeId.nodeId("4")));
- assertTrue(forkedPartition.getMembers().contains(NodeId.nodeId("5")));
- }
-
-}
diff --git a/tools/test/scenarios/upgrade-rollback.xml b/tools/test/scenarios/upgrade-rollback.xml
index 13f4403..a8c2919 100644
--- a/tools/test/scenarios/upgrade-rollback.xml
+++ b/tools/test/scenarios/upgrade-rollback.xml
@@ -20,16 +20,8 @@
<group name="Upgrade-Rollback" requires="Distributed-Primitives-Setup">
<step name="Push-Bits" exec="onos-push-bits-through-proxy" if="${OCT}"/>
- <step name="Initialize-ECM"
- exec="onos ${OC1} ec-map-test foo put a b"/>
-
- <step name="Initialize-CM"
- exec="onos ${OC1} map-test foo put a b"
- requires="Initialize-ECM"/>
-
<step name="Initialize-Upgrade"
- exec="onos ${OC1} issu init"
- requires="Initialize-ECM"/>
+ exec="onos ${OC1} issu init"/>
<group name="Phase-1">
<sequential var="${OCMI#}"
@@ -67,52 +59,12 @@
<step name="Phase-One-Distributed-Primitives-Check-Apps-${#}"
exec="onos-check-apps ${OCMI#} distributedprimitives includes"
requires="Phase-One-Wait-for-Start-${#}"/>
-
- <step name="Phase-One-Check-ECM-${#}"
- exec="onos-execute-expect ${OCMI#} ec-map-test foo get a --retry 5 --expect b"
- requires="Phase-One-Distributed-Primitives-Check-Apps-${#}"/>
-
- <step name="Phase-One-Check-CM-${#}"
- exec="onos-execute-expect ${OCMI#} map-test foo get a --retry 5 --expect b"
- requires="Phase-One-Check-ECM-${#}"/>
</sequential>
</group>
- <group name="Change-ECM-values" requires="Phase-1">
- <step name="Change-Majority-ECM"
- exec="onos ${OCMA1} ec-map-test foo put a d"/>
-
- <step name="Change-Majority-CM"
- exec="onos ${OCMA1} map-test foo put a d"/>
-
- <step name="Change-Minority-ECM"
- exec="onos ${OCMI1} ec-map-test foo put a c"/>
-
- <step name="Change-Minority-CM"
- exec="onos ${OCMI1} map-test foo put a c"/>
- </group>
-
- <group name="Check-ECM-values" requires="Change-ECM-values">
- <parallel var="${OCMI#}">
- <step name="Parallel-Check-Minority-ECM-${#}"
- exec="onos-execute-expect ${OCMI#} ec-map-test foo get a --retry 5 --expect c"/>
-
- <step name="Parallel-Check-Minority-CM-${#}"
- exec="onos-execute-expect ${OCMI#} map-test foo get a --retry 5 --expect c"/>
- </parallel>
-
- <parallel var="${OCMA#}">
- <step name="Parallel-Check-Majority-ECM-${#}"
- exec="onos-execute-expect ${OCMA#} ec-map-test foo get a --retry 5 --expect d"/>
-
- <step name="Parallel-Check-Majority-CM-${#}"
- exec="onos-execute-expect ${OCMA#} map-test foo get a --retry 5 --expect d"/>
- </parallel>
- </group>
-
<step name="Run-Upgrade"
exec="onos ${OC1} issu upgrade"
- requires="Check-ECM-values"/>
+ requires="Phase-1"/>
<step name="Run-Rollback"
exec="onos ${OC1} issu rollback"
@@ -154,14 +106,6 @@
<step name="Phase-Two-Distributed-Primitives-Check-Apps-${#}"
exec="onos-check-apps ${OCMA#} distributedprimitives includes"
requires="Phase-Two-Wait-for-Start-${#}"/>
-
- <step name="Phase-Two-Check-ECM-${#}"
- exec="onos-execute-expect ${OC#} ec-map-test foo get a --retry 5 --expect d"
- requires="Phase-Two-Distributed-Primitives-Check-Apps-${#}"/>
-
- <step name="Phase-Two-Check-CM-${#}"
- exec="onos-execute-expect ${OC#} map-test foo get a --retry 5 --expect d"
- requires="Phase-Two-Check-ECM-${#}"/>
</sequential>
</group>
diff --git a/tools/test/scenarios/upgrade.xml b/tools/test/scenarios/upgrade.xml
index 0890ded..e947b47 100644
--- a/tools/test/scenarios/upgrade.xml
+++ b/tools/test/scenarios/upgrade.xml
@@ -20,16 +20,8 @@
<group name="Upgrade" requires="Distributed-Primitives-Setup">
<step name="Push-Bits" exec="onos-push-bits-through-proxy" if="${OCT}"/>
- <step name="Initialize-ECM"
- exec="onos ${OC1} ec-map-test foo put a b"/>
-
- <step name="Initialize-CM"
- exec="onos ${OC1} map-test foo put a b"
- requires="Initialize-ECM"/>
-
<step name="Initialize-Upgrade"
- exec="onos ${OC1} issu init"
- requires="Initialize-CM"/>
+ exec="onos ${OC1} issu init"/>
<group name="Phase-1">
<sequential var="${OCMI#}"
@@ -67,52 +59,12 @@
<step name="Phase-One-Distributed-Primitives-Check-Apps-${#}"
exec="onos-check-apps ${OCMI#} distributedprimitives includes"
requires="Phase-One-Wait-for-Start-${#}"/>
-
- <step name="Phase-One-Check-ECM-${#}"
- exec="onos-execute-expect ${OCMI#} ec-map-test foo get a --retry 5 --expect b"
- requires="Phase-One-Distributed-Primitives-Check-Apps-${#}"/>
-
- <step name="Phase-One-Check-CM-${#}"
- exec="onos-execute-expect ${OCMI#} map-test foo get a --retry 5 --expect b"
- requires="Phase-One-Check-ECM-${#}"/>
</sequential>
</group>
- <group name="Change-ECM-values" requires="Phase-1">
- <step name="Change-Majority-ECM"
- exec="onos ${OCMA1} ec-map-test foo put a d"/>
-
- <step name="Change-Majority-CM"
- exec="onos ${OCMA1} map-test foo put a d"/>
-
- <step name="Change-Minority-ECM"
- exec="onos ${OCMI1} ec-map-test foo put a c"/>
-
- <step name="Change-Minority-CM"
- exec="onos ${OCMI1} map-test foo put a c"/>
- </group>
-
- <group name="Check-ECM-values" requires="Change-ECM-values">
- <parallel var="${OCMI#}">
- <step name="Parallel-Check-Minority-ECM-${#}"
- exec="onos-execute-expect ${OCMI#} ec-map-test foo get a --retry 5 --expect c"/>
-
- <step name="Parallel-Check-Minority-CM-${#}"
- exec="onos-execute-expect ${OCMI#} map-test foo get a --retry 5 --expect c"/>
- </parallel>
-
- <parallel var="${OCMA#}">
- <step name="Parallel-Check-Majority-ECM-${#}"
- exec="onos-execute-expect ${OCMA#} ec-map-test foo get a --retry 5 --expect d"/>
-
- <step name="Parallel-Check-Majority-CM-${#}"
- exec="onos-execute-expect ${OCMA#} map-test foo get a --retry 5 --expect d"/>
- </parallel>
- </group>
-
<step name="Run-Upgrade"
exec="onos ${OC1} issu upgrade"
- requires="Check-ECM-values"/>
+ requires="Phase-1"/>
<group name="Phase-2" requires="Run-Upgrade">
<sequential var="${OCMA#}"
@@ -150,14 +102,6 @@
<step name="Phase-Two-Distributed-Primitives-Check-Apps-${#}"
exec="onos-check-apps ${OCMA#} distributedprimitives includes"
requires="Phase-Two-Wait-for-Start-${#}"/>
-
- <step name="Phase-Two-Check-ECM-${#}"
- exec="onos-execute-expect ${OC#} ec-map-test foo get a --retry 5 --expect c"
- requires="Phase-Two-Distributed-Primitives-Check-Apps-${#}"/>
-
- <step name="Phase-Two-Check-CM-${#}"
- exec="onos-execute-expect ${OC#} map-test foo get a --retry 5 --expect c"
- requires="Phase-Two-Check-ECM-${#}"/>
</sequential>
</group>