[ONOS-7088] Distribute storage partitions evenly during upgrades
Change-Id: Id82f86ddedbe6c7de2322717338c5c341177bc9e
diff --git a/core/api/src/main/java/org/onosproject/cluster/DefaultPartition.java b/core/api/src/main/java/org/onosproject/cluster/DefaultPartition.java
index 83ab5b9..d0f2f3d 100644
--- a/core/api/src/main/java/org/onosproject/cluster/DefaultPartition.java
+++ b/core/api/src/main/java/org/onosproject/cluster/DefaultPartition.java
@@ -23,6 +23,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+import org.onosproject.core.Version;
/**
* Default {@link Partition} implementation.
@@ -30,6 +31,7 @@
public class DefaultPartition implements Partition {
private final PartitionId id;
+ private final Version version;
private final Collection<NodeId> members;
/**
@@ -37,6 +39,7 @@
*/
protected DefaultPartition() {
+ // Leaves every field (id, version, members) null.
+ // NOTE(review): presumably only intended for serializers/subclasses - confirm.
id = null;
+ version = null;
members = null;
}
@@ -44,10 +47,12 @@
* Constructs a partition.
*
* @param id partition identifier
+ * @param version partition version; may be {@code null}
* @param members partition member nodes
*/
- public DefaultPartition(PartitionId id, Collection<NodeId> members) {
+ public DefaultPartition(PartitionId id, Version version, Collection<NodeId> members) {
this.id = checkNotNull(id);
+ // Unlike id, the version is stored without a null check, so callers may pass null.
+ this.version = version;
this.members = ImmutableSet.copyOf(members);
}
@@ -58,23 +63,30 @@
*/
public DefaultPartition(Partition other) {
this.id = checkNotNull(other.getId());
+ // The version is copied unchecked: the (id, version, members) constructor accepts a null version,
+ // so rejecting null here would make such partitions impossible to copy.
+ this.version = other.getVersion();
this.members = ImmutableSet.copyOf(other.getMembers());
}
@Override
public PartitionId getId() {
- return this.id;
+ return id;
+ }
+
+ @Override
+ public Version getVersion() {
+ // May be null: the (id, version, members) constructor stores the version without validation.
+ return version;
}
@Override
public Collection<NodeId> getMembers() {
- return this.members;
+ return members;
}
@Override
public String toString() {
+ // Renders as SimpleClassName{id=..., version=..., members=...}; a null version is printed as
+ // "version=null" because omitNullValues() is not requested on the helper.
return MoreObjects.toStringHelper(getClass())
.add("id", id)
+ .add("version", version)
.add("members", members)
.toString();
}
diff --git a/core/api/src/main/java/org/onosproject/cluster/Partition.java b/core/api/src/main/java/org/onosproject/cluster/Partition.java
index c73357d..ebff406 100644
--- a/core/api/src/main/java/org/onosproject/cluster/Partition.java
+++ b/core/api/src/main/java/org/onosproject/cluster/Partition.java
@@ -17,6 +17,8 @@
import java.util.Collection;
+import org.onosproject.core.Version;
+
/**
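- * A partition or shard is a group of controller nodes that are work together to maintain state.
- * A ONOS cluster is typically made of of one or partitions over which the the data is partitioned.
+ * A partition or shard is a group of controller nodes that work together to maintain state.
+ * An ONOS cluster is typically made up of one or more partitions over which the data is partitioned.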
@@ -30,6 +32,13 @@
PartitionId getId();
/**
+ * Returns the partition version.
+ *
+ * @return the partition version, or {@code null} if no version was assigned
+ */
+ Version getVersion();
+
+ /**
* Returns the controller nodes that are members of this partition.
* @return collection of controller node identifiers
*/
diff --git a/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataDiffTest.java b/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataDiffTest.java
index a8ae78f..f97bd6a 100644
--- a/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataDiffTest.java
+++ b/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataDiffTest.java
@@ -24,6 +24,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
+import org.onosproject.core.Version;
/**
* Unit tests for ClusterMetadataDiff.
@@ -35,7 +36,7 @@
PartitionId pid1 = PartitionId.from(1);
NodeId nid1 = NodeId.nodeId("10.0.0.1");
ControllerNode n1 = new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
- Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
+ Partition p1 = new DefaultPartition(pid1, Version.version("1.0.0"), ImmutableSet.of(nid1));
ClusterMetadata md1 = new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
ClusterMetadataDiff diff = new ClusterMetadataDiff(md1, md1);
assertTrue(diff.nodesAdded().isEmpty());
@@ -53,8 +54,8 @@
NodeId nid2 = NodeId.nodeId("10.0.0.2");
ControllerNode n1 = new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
ControllerNode n2 = new DefaultControllerNode(nid2, IpAddress.valueOf("10.0.0.2"), 9876);
- Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
- Partition p12 = new DefaultPartition(pid1, ImmutableSet.of(nid1, nid2));
+ Partition p1 = new DefaultPartition(pid1, Version.version("1.0.0"), ImmutableSet.of(nid1));
+ Partition p12 = new DefaultPartition(pid1, Version.version("1.0.0"), ImmutableSet.of(nid1, nid2));
ClusterMetadata md1 = new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
ClusterMetadata md12 = new ClusterMetadata("foo", ImmutableSet.of(n1, n2), ImmutableSet.of(p12));
ClusterMetadataDiff diff = new ClusterMetadataDiff(md1, md12);
@@ -77,8 +78,8 @@
NodeId nid2 = NodeId.nodeId("10.0.0.2");
ControllerNode n1 = new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
ControllerNode n2 = new DefaultControllerNode(nid2, IpAddress.valueOf("10.0.0.2"), 9876);
- Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
- Partition p12 = new DefaultPartition(pid1, ImmutableSet.of(nid1, nid2));
+ Partition p1 = new DefaultPartition(pid1, Version.version("1.0.0"), ImmutableSet.of(nid1));
+ Partition p12 = new DefaultPartition(pid1, Version.version("1.0.0"), ImmutableSet.of(nid1, nid2));
ClusterMetadata md1 = new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
ClusterMetadata md12 = new ClusterMetadata("foo", ImmutableSet.of(n1, n2), ImmutableSet.of(p12));
ClusterMetadataDiff diff = new ClusterMetadataDiff(md12, md1);
diff --git a/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataEventTest.java b/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataEventTest.java
index 2648d36..036839a 100644
--- a/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataEventTest.java
+++ b/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataEventTest.java
@@ -19,6 +19,7 @@
import com.google.common.testing.EqualsTester;
import org.junit.Test;
import org.onlab.packet.IpAddress;
+import org.onosproject.core.Version;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -37,9 +38,9 @@
new DefaultControllerNode(nid1, IpAddress.valueOf("10.0.0.1"), 9876);
private final ControllerNode n2 =
new DefaultControllerNode(nid2, IpAddress.valueOf("10.0.0.2"), 9876);
- private final Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
- private final Partition p2 = new DefaultPartition(pid2, ImmutableSet.of(nid1, nid2));
- private final Partition p3 = new DefaultPartition(pid2, ImmutableSet.of(nid2));
+ private final Partition p1 = new DefaultPartition(pid1, Version.version("1.0.0"), ImmutableSet.of(nid1));
+ private final Partition p2 = new DefaultPartition(pid2, Version.version("1.0.0"), ImmutableSet.of(nid1, nid2));
+ private final Partition p3 = new DefaultPartition(pid2, Version.version("1.0.0"), ImmutableSet.of(nid2));
private final ClusterMetadata metadata1 =
new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
private final ClusterMetadata metadata2 =
diff --git a/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataServiceAdapter.java b/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataServiceAdapter.java
index 3904cbc..443ee29 100644
--- a/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataServiceAdapter.java
+++ b/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataServiceAdapter.java
@@ -18,6 +18,7 @@
import org.onlab.packet.IpAddress;
import com.google.common.collect.Sets;
+import org.onosproject.core.Version;
/**
* Test adapter for the ClusterMetadata service.
@@ -28,7 +29,7 @@
public ClusterMetadata getClusterMetadata() {
final NodeId nid = new NodeId("test-node");
final IpAddress addr = IpAddress.valueOf(0);
- final Partition p = new DefaultPartition(PartitionId.from(1), Sets.newHashSet(nid));
+ final Partition p = new DefaultPartition(PartitionId.from(1), Version.version("1.0.0"), Sets.newHashSet(nid));
return new ClusterMetadata("test-cluster",
Sets.newHashSet(new DefaultControllerNode(nid, addr)),
Sets.newHashSet(p));
diff --git a/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataTest.java b/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataTest.java
index 9d85ae9..3d86d93 100644
--- a/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataTest.java
+++ b/core/api/src/test/java/org/onosproject/cluster/ClusterMetadataTest.java
@@ -19,6 +19,7 @@
import com.google.common.testing.EqualsTester;
import org.junit.Test;
import org.onlab.packet.IpAddress;
+import org.onosproject.core.Version;
import org.onosproject.net.provider.ProviderId;
import static org.hamcrest.Matchers.contains;
@@ -41,8 +42,8 @@
private final ControllerNode n2 =
new DefaultControllerNode(nid2, IpAddress.valueOf("10.0.0.2"), 9876);
- private final Partition p1 = new DefaultPartition(pid1, ImmutableSet.of(nid1));
- private final Partition p2 = new DefaultPartition(pid2, ImmutableSet.of(nid1, nid2));
+ private final Partition p1 = new DefaultPartition(pid1, Version.version("1.0.0"), ImmutableSet.of(nid1));
+ private final Partition p2 = new DefaultPartition(pid2, Version.version("1.0.0"), ImmutableSet.of(nid1, nid2));
private final ClusterMetadata metadata1 =
new ClusterMetadata("foo", ImmutableSet.of(n1), ImmutableSet.of(p1));
diff --git a/core/api/src/test/java/org/onosproject/cluster/DefaultPartitionTest.java b/core/api/src/test/java/org/onosproject/cluster/DefaultPartitionTest.java
index be711a7..105c94e 100644
--- a/core/api/src/test/java/org/onosproject/cluster/DefaultPartitionTest.java
+++ b/core/api/src/test/java/org/onosproject/cluster/DefaultPartitionTest.java
@@ -21,6 +21,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.testing.EqualsTester;
+import org.onosproject.core.Version;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
@@ -43,13 +44,13 @@
PartitionId pid2 = new PartitionId(2);
PartitionId pid3 = new PartitionId(3);
- DefaultPartition partition1 = new DefaultPartition(pid1, ImmutableSet.of(id1));
- DefaultPartition sameAsPartition1 = new DefaultPartition(pid1, ImmutableSet.of(id1));
+ DefaultPartition partition1 = new DefaultPartition(pid1, Version.version("1.0.0"), ImmutableSet.of(id1));
+ DefaultPartition sameAsPartition1 = new DefaultPartition(pid1, Version.version("1.0.0"), ImmutableSet.of(id1));
- DefaultPartition partition2 = new DefaultPartition(pid2, ImmutableSet.of(id2));
+ DefaultPartition partition2 = new DefaultPartition(pid2, Version.version("1.0.0"), ImmutableSet.of(id2));
DefaultPartition copyOfPartition2 = new DefaultPartition(partition2);
- DefaultPartition partition3 = new DefaultPartition(pid3, ImmutableSet.of(id1, id2, id3));
+ DefaultPartition partition3 = new DefaultPartition(pid3, Version.version("1.0.0"), ImmutableSet.of(id1, id2, id3));
/**
* Checks that the default partition implementation is an immutable
diff --git a/core/api/src/test/java/org/onosproject/store/primitives/PartitionEventTest.java b/core/api/src/test/java/org/onosproject/store/primitives/PartitionEventTest.java
index 0aec80a..abcc659 100644
--- a/core/api/src/test/java/org/onosproject/store/primitives/PartitionEventTest.java
+++ b/core/api/src/test/java/org/onosproject/store/primitives/PartitionEventTest.java
@@ -22,6 +22,7 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.Version;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
@@ -41,9 +42,9 @@
private final Partition p1 =
- new DefaultPartition(pid1, ImmutableSet.of(nid1));
+ new DefaultPartition(pid1, Version.version("1.0.0"), ImmutableSet.of(nid1));
private final Partition p2 =
- new DefaultPartition(pid2, ImmutableSet.of(nid1, nid2));
+ new DefaultPartition(pid2, Version.version("1.0.0"), ImmutableSet.of(nid1, nid2));
private final PartitionEvent event1 =
new PartitionEvent(PartitionEvent.Type.UPDATED, p1, time);
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
index 92a3894..95a9b19 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterManager.java
@@ -52,6 +52,7 @@
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
import org.onosproject.core.Version;
+import org.onosproject.core.VersionService;
import org.onosproject.event.AbstractListenerManager;
import org.slf4j.Logger;
@@ -88,6 +89,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected SystemService systemService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VersionService versionService;
+
private final AtomicReference<ClusterMetadata> currentMetadata = new AtomicReference<>();
private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
@@ -196,7 +200,7 @@
}
}
- private static Set<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes, int partitionSize) {
+ private Set<Partition> buildDefaultPartitions(Collection<ControllerNode> nodes, int partitionSize) {
List<ControllerNode> sorted = new ArrayList<>(nodes);
Collections.sort(sorted, (o1, o2) -> o1.id().toString().compareTo(o2.id().toString()));
Set<Partition> partitions = Sets.newHashSet();
@@ -209,7 +213,7 @@
for (int j = 0; j < count; j++) {
set.add(sorted.get((i + j) % length).id());
}
- partitions.add(new DefaultPartition(PartitionId.from((index + 1)), set));
+ partitions.add(new DefaultPartition(PartitionId.from((index + 1)), versionService.version(), set));
}
return partitions;
}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/DefaultClusterMetadataProvider.java b/core/net/src/main/java/org/onosproject/cluster/impl/DefaultClusterMetadataProvider.java
index b47f2cd..83fe78b 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/DefaultClusterMetadataProvider.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/DefaultClusterMetadataProvider.java
@@ -41,6 +41,7 @@
import org.onosproject.cluster.NodeId;
import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.VersionService;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
@@ -58,6 +59,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataProviderRegistry providerRegistry;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected VersionService versionService;
+
private static final String ONOS_IP = "ONOS_IP";
private static final String ONOS_INTERFACE = "ONOS_INTERFACE";
private static final String ONOS_ALLOW_IPV6 = "ONOS_ALLOW_IPV6";
@@ -72,7 +76,10 @@
ControllerNode localNode =
new DefaultControllerNode(new NodeId(localIp), IpAddress.valueOf(localIp), DEFAULT_ONOS_PORT);
// partition 1
- Partition partition = new DefaultPartition(PartitionId.from(1), ImmutableSet.of(localNode.id()));
+ Partition partition = new DefaultPartition(
+ PartitionId.from(1),
+ versionService.version(),
+ ImmutableSet.of(localNode.id()));
ClusterMetadata metadata = new ClusterMetadata(PROVIDER_ID,
"default",
ImmutableSet.of(localNode),
diff --git a/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
index 7a83724..ab1b1c3 100644
--- a/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
+++ b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
@@ -49,7 +49,10 @@
import org.slf4j.Logger;
import static org.onosproject.security.AppGuard.checkPermission;
-import static org.onosproject.security.AppPermission.Type.*;
+import static org.onosproject.security.AppPermission.Type.CLUSTER_EVENT;
+import static org.onosproject.security.AppPermission.Type.UPGRADE_EVENT;
+import static org.onosproject.security.AppPermission.Type.UPGRADE_READ;
+import static org.onosproject.security.AppPermission.Type.UPGRADE_WRITE;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -86,6 +89,8 @@
@Activate
public void activate() {
+ eventDispatcher.addSink(UpgradeEvent.class, listenerRegistry);
+
state = coordinationService.<Upgrade>atomicValueBuilder()
.withName("onos-upgrade-state")
.withSerializer(Serializer.using(KryoNamespaces.API))
@@ -138,6 +143,7 @@
@Deactivate
public void deactivate() {
+ eventDispatcher.removeSink(UpgradeEvent.class);
state.removeListener(stateListener);
clusterService.removeListener(clusterListener);
log.info("Stopped");
diff --git a/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java b/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
index 78127ee..4df9d46 100644
--- a/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
@@ -35,6 +35,7 @@
import org.onosproject.cluster.MembershipGroup;
import org.onosproject.cluster.MembershipServiceAdapter;
import org.onosproject.cluster.NodeId;
+import org.onosproject.common.event.impl.TestEventDispatcher;
import org.onosproject.core.Version;
import org.onosproject.core.VersionServiceAdapter;
import org.onosproject.store.service.AsyncAtomicValue;
@@ -49,6 +50,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.onosproject.net.NetTestTools.injectEventDispatcher;
/**
* Upgrade manager test.
@@ -66,6 +68,7 @@
@SuppressWarnings("unchecked")
private UpgradeManager createUpgradeManager(Version version, Upgrade state, List<Version> versions) {
UpgradeManager upgradeManager = new UpgradeManager();
+ injectEventDispatcher(upgradeManager, new TestEventDispatcher());
upgradeManager.membershipService = new MembershipServiceAdapter() {
@Override
public MembershipGroup getLocalGroup() {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ActiveStoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ActiveStoragePartition.java
new file mode 100644
index 0000000..5e9301d
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ActiveStoragePartition.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.io.File;
+import java.util.concurrent.CompletableFuture;
+
+import io.atomix.protocols.raft.cluster.MemberId;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Partition;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+
+/**
+ * {@link StoragePartition} variant for a partition that is active on the local node.
+ * <p>
+ * Both the partition name and the data folder are derived from the partition identifier alone.
+ */
+public class ActiveStoragePartition extends StoragePartition {
+
+    /**
+     * Creates an active storage partition; every argument is handed to the superclass unchanged.
+     *
+     * @param partition partition backing this storage partition
+     * @param clusterCommunicator cluster communication service
+     * @param clusterService cluster service
+     */
+    public ActiveStoragePartition(
+            Partition partition,
+            ClusterCommunicationService clusterCommunicator,
+            ClusterService clusterService) {
+        super(partition, clusterCommunicator, clusterService);
+    }
+
+    /**
+     * Returns the string form of the partition identifier.
+     */
+    @Override
+    public String getName() {
+        return partition.getId().toString();
+    }
+
+    /**
+     * Returns the folder whose path is {@code PARTITIONS_DIR} followed by the partition identifier.
+     */
+    @Override
+    public File getDataFolder() {
+        String folderPath = PARTITIONS_DIR + partition.getId();
+        return new File(folderPath);
+    }
+
+    /**
+     * Opens a partition server for the member identified by {@code localNodeId} and, once the
+     * open completes, stores that server in the {@code server} field.
+     */
+    @Override
+    protected CompletableFuture<Void> openServer() {
+        MemberId localMemberId = MemberId.from(localNodeId.id());
+        StoragePartitionServer partitionServer =
+                new StoragePartitionServer(this, localMemberId, clusterCommunicator);
+        return partitionServer.open().thenRun(() -> {
+            this.server = partitionServer;
+        });
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
index 605bf25..e7ed20a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CoordinationManager.java
@@ -15,6 +15,9 @@
*/
package org.onosproject.store.primitives.impl;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -52,14 +55,10 @@
import org.onosproject.store.service.WorkQueue;
import org.slf4j.Logger;
-import java.io.File;
import java.util.List;
-import java.util.function.Supplier;
-import java.util.stream.Collectors;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.STORAGE_WRITE;
-import static org.onosproject.store.primitives.impl.PartitionManager.PARTITIONS_DIR;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -86,18 +85,16 @@
@Activate
public void activate() {
- partition = new StoragePartition(
+ partition = new ActiveStoragePartition(
new DefaultPartition(
PartitionId.SHARED,
+ null,
clusterService.getNodes()
.stream()
.map(ControllerNode::id)
.collect(Collectors.toSet())),
- null,
- null,
clusterCommunicator,
- clusterService,
- new File(PARTITIONS_DIR + "/coordination"));
+ clusterService);
partition.open().join();
primitiveCreator = partition.client();
log.info("Started");
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ForkedStoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ForkedStoragePartition.java
new file mode 100644
index 0000000..70b12c3
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/ForkedStoragePartition.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.io.File;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import io.atomix.protocols.raft.cluster.MemberId;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Partition;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+
+/**
+ * {@link StoragePartition} that is forked from a partition of a previous version.
+ * <p>
+ * The name and data folder are derived from the identifier of the forked partition; the source
+ * partition is only consulted when the server is started via a fork.
+ */
+public class ForkedStoragePartition extends StoragePartition {
+    final Partition source;
+
+    /**
+     * Creates a forked storage partition.
+     *
+     * @param partition partition backing this storage partition
+     * @param source partition passed to the server's fork operation
+     * @param clusterCommunicator cluster communication service
+     * @param clusterService cluster service
+     */
+    public ForkedStoragePartition(
+            Partition partition,
+            Partition source,
+            ClusterCommunicationService clusterCommunicator,
+            ClusterService clusterService) {
+        super(partition, clusterCommunicator, clusterService);
+        this.source = source;
+    }
+
+    /**
+     * Returns the string form of the partition identifier.
+     */
+    @Override
+    public String getName() {
+        return partition.getId().toString();
+    }
+
+    /**
+     * Returns the folder whose path is {@code PARTITIONS_DIR} followed by the partition identifier.
+     */
+    @Override
+    public File getDataFolder() {
+        String folderPath = PARTITIONS_DIR + partition.getId();
+        return new File(folderPath);
+    }
+
+    /**
+     * Starts a partition server for the member identified by {@code localNodeId}.
+     * <p>
+     * When the partition has exactly one member the server is forked from {@code source};
+     * otherwise it joins through all members other than {@code localNodeId}. The server is stored
+     * in the {@code server} field once the chosen operation completes.
+     */
+    @Override
+    protected CompletableFuture<Void> openServer() {
+        MemberId localMemberId = MemberId.from(localNodeId.id());
+        StoragePartitionServer partitionServer =
+                new StoragePartitionServer(this, localMemberId, clusterCommunicator);
+
+        CompletableFuture<Void> started;
+        if (partition.getMembers().size() != 1) {
+            // Join through every listed member except the local node.
+            started = partitionServer.join(partition.getMembers().stream()
+                    .filter(member -> !member.equals(localNodeId))
+                    .map(member -> MemberId.from(member.id()))
+                    .collect(Collectors.toList()));
+        } else {
+            started = partitionServer.fork(source);
+        }
+        return started.thenRun(() -> {
+            this.server = partitionServer;
+        });
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/InactiveStoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/InactiveStoragePartition.java
new file mode 100644
index 0000000..3924cd7
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/InactiveStoragePartition.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+
+import com.google.common.hash.Hashing;
+import io.atomix.protocols.raft.cluster.MemberId;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.Partition;
+import org.onosproject.core.Version;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+
+/**
+ * {@link StoragePartition} variant for a partition that is inactive on the local node.
+ * <p>
+ * Data is kept under {@link #INACTIVE_DIR}, and the partition name carries a hash of the
+ * partition version when one is present.
+ */
+public class InactiveStoragePartition extends StoragePartition {
+    /** Base directory for inactive partition data: {@code PARTITIONS_DIR} followed by {@code archive/}. */
+    static final String INACTIVE_DIR = PARTITIONS_DIR + "archive/";
+
+    /**
+     * Creates an inactive storage partition; every argument is handed to the superclass unchanged.
+     *
+     * @param partition partition backing this storage partition
+     * @param clusterCommunicator cluster communication service
+     * @param clusterService cluster service
+     */
+    public InactiveStoragePartition(
+            Partition partition,
+            ClusterCommunicationService clusterCommunicator,
+            ClusterService clusterService) {
+        super(partition, clusterCommunicator, clusterService);
+    }
+
+    /**
+     * Returns the partition name.
+     * <p>
+     * Without a version this is the string form of the partition identifier. With a version it is
+     * {@code <id>-<hash>}, where the hash is the SHA-256 of the version string reduced to a
+     * signed {@code long} and printed in decimal, so it may carry a minus sign.
+     */
+    @Override
+    public String getName() {
+        Version version = partition.getVersion();
+        if (version == null) {
+            return partition.getId().toString();
+        }
+        long versionHash = Hashing.sha256()
+                .hashString(version.toString(), StandardCharsets.UTF_8)
+                .asLong();
+        return String.format("%s-%d", partition.getId(), versionHash);
+    }
+
+    /**
+     * Returns the folder whose path is {@link #INACTIVE_DIR} followed by the partition identifier;
+     * the version does not take part in the path.
+     */
+    @Override
+    public File getDataFolder() {
+        String folderPath = INACTIVE_DIR + partition.getId();
+        return new File(folderPath);
+    }
+
+    /**
+     * Opens a partition server for the member identified by {@code localNodeId} and, once the
+     * open completes, stores that server in the {@code server} field.
+     */
+    @Override
+    protected CompletableFuture<Void> openServer() {
+        MemberId localMemberId = MemberId.from(localNodeId.id());
+        StoragePartitionServer partitionServer =
+                new StoragePartitionServer(this, localMemberId, clusterCommunicator);
+        return partitionServer.open().thenRun(() -> {
+            this.server = partitionServer;
+        });
+    }
+}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
index 8d5440d..74fef44 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionManager.java
@@ -17,8 +17,11 @@
package org.onosproject.store.primitives.impl;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
@@ -33,17 +36,22 @@
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
import org.onosproject.cluster.ClusterMetadata;
import org.onosproject.cluster.ClusterMetadataDiff;
import org.onosproject.cluster.ClusterMetadataEvent;
import org.onosproject.cluster.ClusterMetadataEventListener;
import org.onosproject.cluster.ClusterMetadataService;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.Member;
+import org.onosproject.cluster.MembershipService;
import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.Partition;
import org.onosproject.cluster.PartitionDiff;
import org.onosproject.cluster.PartitionId;
import org.onosproject.core.Version;
-import org.onosproject.core.VersionService;
import org.onosproject.event.AbstractListenerManager;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
@@ -53,6 +61,9 @@
import org.onosproject.store.primitives.PartitionService;
import org.onosproject.store.service.PartitionClientInfo;
import org.onosproject.store.service.PartitionInfo;
+import org.onosproject.upgrade.Upgrade;
+import org.onosproject.upgrade.UpgradeEvent;
+import org.onosproject.upgrade.UpgradeEventListener;
import org.onosproject.upgrade.UpgradeService;
import org.slf4j.Logger;
@@ -68,9 +79,6 @@
public class PartitionManager extends AbstractListenerManager<PartitionEvent, PartitionEventListener>
implements PartitionService, PartitionAdminService {
- static final String PARTITIONS_DIR =
- System.getProperty("karaf.data") + "/db/partitions/";
-
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -83,44 +91,53 @@
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected UpgradeService upgradeService;
+ protected MembershipService membershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected VersionService versionService;
+ protected UpgradeService upgradeService;
private final Map<PartitionId, StoragePartition> inactivePartitions = Maps.newConcurrentMap();
private final Map<PartitionId, StoragePartition> activePartitions = Maps.newConcurrentMap();
private final AtomicReference<ClusterMetadata> currentClusterMetadata = new AtomicReference<>();
- private final InternalClusterMetadataListener metadataListener = new InternalClusterMetadataListener();
+
+ private final ClusterEventListener clusterListener = new InternalClusterEventListener();
+ private final UpgradeEventListener upgradeListener = new InternalUpgradeEventListener();
+ private final ClusterMetadataEventListener metadataListener = new InternalClusterMetadataListener();
@Activate
public void activate() {
eventDispatcher.addSink(PartitionEvent.class, listenerRegistry);
currentClusterMetadata.set(metadataService.getClusterMetadata());
+
+ clusterService.addListener(clusterListener);
+ upgradeService.addListener(upgradeListener);
metadataService.addListener(metadataListener);
// If an upgrade is currently in progress and this node is an upgraded node, initialize upgrade partitions.
CompletableFuture<Void> openFuture;
if (upgradeService.isUpgrading() && upgradeService.isLocalUpgraded()) {
- Version sourceVersion = upgradeService.getState().source();
- Version targetVersion = upgradeService.getState().target();
currentClusterMetadata.get()
.getPartitions()
.forEach(partition -> {
- inactivePartitions.put(partition.getId(), new StoragePartition(
+ // Create a default partition and assign it to inactive partitions. This node will join
+ // inactive partitions to participate in consensus for fault tolerance, but the partitions
+ // won't be accessible via client proxies.
+ inactivePartitions.put(partition.getId(), new InactiveStoragePartition(
partition,
- sourceVersion,
- null,
clusterCommunicator,
- clusterService,
- new File(PARTITIONS_DIR + sourceVersion + "/" + partition.getId())));
- activePartitions.put(partition.getId(), new StoragePartition(
+ clusterService));
+
+ // Create a forked partition and assign it to active partitions. These partitions will be
+ // forked from commit logs for previous version partitions.
+ Partition forkedPartition = computeInitialPartition(
partition,
- targetVersion,
- sourceVersion,
+ upgradeService.getState().target(),
+ getLocalNodes());
+ activePartitions.put(partition.getId(), new ForkedStoragePartition(
+ forkedPartition,
+ partition,
clusterCommunicator,
- clusterService,
- new File(PARTITIONS_DIR + targetVersion + "/" + partition.getId())));
+ clusterService));
});
// We have to fork existing partitions before we can start inactive partition servers to
@@ -132,16 +149,12 @@
.map(StoragePartition::open)
.toArray(CompletableFuture[]::new)));
} else {
- Version version = versionService.version();
currentClusterMetadata.get()
.getPartitions()
- .forEach(partition -> activePartitions.put(partition.getId(), new StoragePartition(
+ .forEach(partition -> activePartitions.put(partition.getId(), new ActiveStoragePartition(
partition,
- version,
- null,
clusterCommunicator,
- clusterService,
- new File(PARTITIONS_DIR + version + "/" + partition.getId()))));
+ clusterService)));
openFuture = CompletableFuture.allOf(activePartitions.values().stream()
.map(StoragePartition::open)
.toArray(CompletableFuture[]::new));
@@ -153,6 +166,8 @@
@Deactivate
public void deactivate() {
+ clusterService.removeListener(clusterListener);
+ upgradeService.removeListener(upgradeListener);
metadataService.removeListener(metadataListener);
eventDispatcher.removeSink(PartitionEvent.class);
@@ -208,6 +223,132 @@
.collect(Collectors.toList());
}
+ /**
+ * Returns the identifiers of the members of the local membership group.
+ *
+ * @return node identifiers in the order the group's members are streamed; no sorting is applied here
+ */
+ private List<NodeId> getLocalNodes() {
+ return membershipService.getLocalGroup()
+ .members()
+ .stream()
+ .map(Member::nodeId)
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Computes an initial forked partition, allowing one member more than the source partition contains.
+ *
+ * @param sourcePartition the source partition from which to compute the partition
+ * @param targetVersion the target partition version
+ * @param members the list of members available to the partition
+ * @return the computed forked partition
+ */
+ protected static Partition computeInitialPartition(
+ Partition sourcePartition,
+ Version targetVersion,
+ List<NodeId> members) {
+ return computePartition(sourcePartition, targetVersion, members, 1);
+ }
+
+ /**
+ * Computes a final forked partition, capped at the member count of the source partition.
+ *
+ * @param sourcePartition the source partition from which to compute the partition
+ * @param targetVersion the target partition version
+ * @param members the list of members available to the partition
+ * @return the computed forked partition
+ */
+ protected static Partition computeFinalPartition(
+ Partition sourcePartition,
+ Version targetVersion,
+ List<NodeId> members) {
+ return computePartition(sourcePartition, targetVersion, members, 0);
+ }
+
+ /**
+ * Computes a forked partition from the given source partition.
+ *
+ * @param sourcePartition the source partition from which to compute the partition
+ * @param targetVersion the target partition version
+ * @param members the list of members available to the partition
+ * @param delta the number of members the result may contain beyond the source partition's member count
+ * @return the computed forked partition
+ */
+ private static Partition computePartition(
+ Partition sourcePartition,
+ Version targetVersion,
+ List<NodeId> members,
+ int delta) {
+ // Sort the available members by their natural ordering so that the selection below does not
+ // depend on the order of the supplied list.
+ List<NodeId> sortedMembers = members.stream()
+ .sorted()
+ .collect(Collectors.toList());
+
+ // Start with the available members that already belong to the source partition, keeping them
+ // in sorted order.
+ List<NodeId> partitionMembers = sortedMembers.stream()
+ .filter(nodeId -> sourcePartition.getMembers().contains(nodeId))
+ .collect(Collectors.toList());
+
+ // If the partition is still short of the target size, top it up with the remaining available
+ // members in sorted order; stop early once no unused member is left.
+ int totalMembers = sourcePartition.getMembers().size() + delta;
+ if (partitionMembers.size() < totalMembers) {
+ for (int i = partitionMembers.size(); i < totalMembers; i++) {
+ Optional<NodeId> nextMember = sortedMembers.stream()
+ .filter(nodeId -> !partitionMembers.contains(nodeId))
+ .findFirst();
+ if (nextMember.isPresent()) {
+ partitionMembers.add(nextMember.get());
+ } else {
+ break;
+ }
+ }
+ }
+
+ return new DefaultPartition(
+ sourcePartition.getId(),
+ targetVersion,
+ partitionMembers);
+ }
+
+ private void processInstanceReady(NodeId nodeId) {
+ // Membership is only recomputed on an upgraded node while an upgrade is in progress.
+ if (!upgradeService.isUpgrading() || !upgradeService.isLocalUpgraded()) {
+ return;
+ }
+ for (Partition partition : currentClusterMetadata.get().getPartitions()) {
+ StoragePartition storagePartition = activePartitions.get(partition.getId());
+ if (storagePartition == null) {
+ continue;
+ }
+ // Recompute the final member list from the nodes currently in the local membership group.
+ Partition updated = computeFinalPartition(
+ partition, upgradeService.getState().target(), getLocalNodes());
+ log.info("Updating storage partition {}: {}", partition, updated);
+ storagePartition.onUpdate(updated);
+ }
+ }
+
+ private void processUpgradeComplete(Upgrade upgrade) {
+ if (!inactivePartitions.isEmpty()) {
+ // Delete every inactive partition, then remove the inactive partition directory itself.
+ List<CompletableFuture<Void>> futures = inactivePartitions.values().stream()
+ .map(StoragePartition::delete)
+ .collect(Collectors.toList());
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])).thenRun(() -> {
+ try {
+ Files.delete(new File(InactiveStoragePartition.INACTIVE_DIR).toPath());
+ } catch (IOException e) {
+ log.error("Failed to delete partition archive", e);
+ }
+ });
+ inactivePartitions.clear();
+ }
+ }
+
private void processMetadataUpdate(ClusterMetadata clusterMetadata) {
ClusterMetadataDiff diffExaminer =
new ClusterMetadataDiff(currentClusterMetadata.get(), clusterMetadata);
@@ -218,6 +359,24 @@
.forEach(diff -> activePartitions.get(diff.partitionId()).onUpdate(diff.newValue()));
}
+ private class InternalClusterEventListener implements ClusterEventListener {
+ @Override
+ public void event(ClusterEvent event) {
+ if (event.type() == ClusterEvent.Type.INSTANCE_READY) { // all other cluster event types are ignored
+ processInstanceReady(event.subject().id());
+ }
+ }
+ }
+
+ private class InternalUpgradeEventListener implements UpgradeEventListener {
+ @Override
+ public void event(UpgradeEvent event) {
+ if (event.type() == UpgradeEvent.Type.COMMITTED) { // all other upgrade event types are ignored
+ processUpgradeComplete(event.subject());
+ }
+ }
+ }
+
private class InternalClusterMetadataListener implements ClusterMetadataEventListener {
@Override
public void event(ClusterMetadataEvent event) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 5458edd..c419d98 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -51,18 +51,17 @@
/**
* Storage partition.
*/
-public class StoragePartition implements Managed<StoragePartition> {
+public abstract class StoragePartition implements Managed<StoragePartition> {
- private final AtomicBoolean isOpened = new AtomicBoolean(false);
- private final ClusterCommunicationService clusterCommunicator;
- private final ClusterService clusterService;
- private final Version version;
- private final Version source;
- private final File dataFolder;
- private Partition partition;
- private NodeId localNodeId;
- private StoragePartitionServer server;
- private StoragePartitionClient client;
+ static final String PARTITIONS_DIR =
+ System.getProperty("karaf.data") + "/db/partitions/";
+
+ protected final AtomicBoolean isOpened = new AtomicBoolean(false);
+ protected final ClusterCommunicationService clusterCommunicator;
+ protected Partition partition;
+ protected NodeId localNodeId;
+ protected StoragePartitionServer server;
+ protected StoragePartitionClient client;
public static final Map<String, Supplier<RaftService>> RAFT_SERVICES =
ImmutableMap.<String, Supplier<RaftService>>builder()
@@ -83,18 +82,11 @@
public StoragePartition(
Partition partition,
- Version version,
- Version source,
ClusterCommunicationService clusterCommunicator,
- ClusterService clusterService,
- File dataFolder) {
+ ClusterService clusterService) {
this.partition = partition;
- this.version = version;
- this.source = source;
this.clusterCommunicator = clusterCommunicator;
- this.clusterService = clusterService;
this.localNodeId = clusterService.getLocalNode().id();
- this.dataFolder = dataFolder;
}
/**
@@ -107,12 +99,7 @@
@Override
public CompletableFuture<Void> open() {
- if (source != null) {
- return forkServer(source)
- .thenCompose(v -> openClient())
- .thenAccept(v -> isOpened.set(true))
- .thenApply(v -> null);
- } else if (partition.getMembers().contains(localNodeId)) {
+ if (partition.getMembers().contains(localNodeId)) {
return openServer()
.thenCompose(v -> openClient())
.thenAccept(v -> isOpened.set(true))
@@ -131,22 +118,35 @@
}
/**
+ * Deletes the partition by closing the server, then closing the client, then deleting the server.
+ *
+ * @return future to be completed once the partition has been deleted
+ */
+ public CompletableFuture<Void> delete() {
+ return closeServer().thenCompose(v -> closeClient()).thenRun(() -> deleteServer());
+ }
+
+ /**
+ * Returns the partition data folder; the location is supplied by each concrete partition type.
+ *
+ * @return the partition data folder
+ */
+ public abstract File getDataFolder();
+
+ /**
* Returns the partition name.
*
* @return the partition name
*/
- public String getName() {
- return getName(version);
- }
+ public abstract String getName();
/**
- * Returns the partition name for the given version.
+ * Returns the identifier of the {@link Partition partition} associated with this instance.
*
- * @param version the version for which to return the partition name
- * @return the partition name for the given version
+ * @return partition identifier
*/
- String getName(Version version) {
- return version != null ? String.format("partition-%d-%s", partition.getId().id(), version) : "partition-core";
+ public PartitionId getId() {
+ return partition.getId();
}
/**
@@ -155,24 +155,7 @@
* @return the partition version
*/
public Version getVersion() {
- return version;
- }
-
- /**
- * Returns the partition data folder.
- *
- * @return the partition data folder
- */
- public File getDataFolder() {
- return dataFolder;
- }
-
- /**
- * Returns the identifier of the {@link Partition partition} associated with this instance.
- * @return partition identifier
- */
- public PartitionId getId() {
- return partition.getId();
+ return partition.getVersion();
}
/**
@@ -188,64 +171,14 @@
* @return partition member identifiers
*/
public Collection<MemberId> getMemberIds() {
- return source != null ?
- clusterService.getNodes()
- .stream()
- .filter(node -> {
- Version nodeVersion = clusterService.getVersion(node.id());
- return nodeVersion != null && nodeVersion.equals(version);
- })
- .map(node -> MemberId.from(node.id().id()))
- .collect(Collectors.toList()) :
- Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
- }
-
- Collection<MemberId> getMemberIds(Version version) {
- if (source == null || version.equals(source)) {
- return Collections2.transform(partition.getMembers(), n -> MemberId.from(n.id()));
- } else {
- return clusterService.getNodes()
- .stream()
- .filter(node -> {
- Version nodeVersion = clusterService.getVersion(node.id());
- return nodeVersion != null && nodeVersion.equals(version);
- })
- .map(node -> MemberId.from(node.id().id()))
- .collect(Collectors.toList());
- }
+ return Collections2.transform(getMembers(), n -> MemberId.from(n.id()));
}
/**
* Attempts to rejoin the partition.
* @return future that is completed after the operation is complete
*/
- private CompletableFuture<Void> openServer() {
- StoragePartitionServer server = new StoragePartitionServer(
- this,
- MemberId.from(localNodeId.id()),
- clusterCommunicator);
- return server.open().thenRun(() -> this.server = server);
- }
-
- /**
- * Forks the server from the given version.
- *
- * @return future to be completed once the server has been forked
- */
- private CompletableFuture<Void> forkServer(Version version) {
- StoragePartitionServer server = new StoragePartitionServer(
- this,
- MemberId.from(localNodeId.id()),
- clusterCommunicator);
-
- CompletableFuture<Void> future;
- if (getMemberIds().size() == 1) {
- future = server.fork(version);
- } else {
- future = server.join(getMemberIds());
- }
- return future.thenRun(() -> this.server = server);
- }
+ protected abstract CompletableFuture<Void> openServer();
/**
* Attempts to join the partition as a new member.
@@ -267,7 +200,7 @@
client = new StoragePartitionClient(this,
MemberId.from(localNodeId.id()),
new RaftClientCommunicator(
- getName(),
+ String.format("partition-%s-%s", partition.getId(), partition.getVersion()),
Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
clusterCommunicator));
return client.open().thenApply(v -> client);
@@ -278,7 +211,9 @@
* @return future that is completed when the operation completes
*/
public CompletableFuture<Void> leaveCluster() {
- return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
+ return server != null
+ ? server.closeAndExit().thenRun(() -> server.delete())
+ : CompletableFuture.completedFuture(null);
}
@Override
@@ -286,6 +221,19 @@
return isOpened.get();
}
+ private CompletableFuture<Void> closeServer() {
+ // With no server instance there is nothing to close, so report immediate completion.
+ return server == null
+ ? CompletableFuture.completedFuture(null)
+ : server.close();
+ }
+
+ private void deleteServer() {
+ if (server != null) { // nothing to delete when this partition holds no server instance
+ server.delete();
+ }
+ }
+
private CompletableFuture<Void> closeClient() {
if (client != null) {
return client.close();
@@ -306,8 +254,7 @@
* Process updates to partitions and handles joining or leaving a partition.
* @param newValue new Partition
*/
- public void onUpdate(Partition newValue) {
-
+ void onUpdate(Partition newValue) {
boolean wasPresent = partition.getMembers().contains(localNodeId);
boolean isPresent = newValue.getMembers().contains(localNodeId);
this.partition = newValue;
@@ -315,7 +262,7 @@
// no action needed
return;
}
- //only need to do action if our membership changed
+ // Only need to do action if our membership changed
if (wasPresent) {
leaveCluster();
} else if (isPresent) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index dbe712d..c9cfe53 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -17,17 +17,21 @@
import java.io.File;
import java.io.IOException;
+import java.nio.file.FileVisitResult;
import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
import io.atomix.protocols.raft.RaftServer;
import io.atomix.protocols.raft.cluster.MemberId;
-import io.atomix.protocols.raft.cluster.RaftMember;
import io.atomix.protocols.raft.storage.RaftStorage;
import io.atomix.storage.StorageLevel;
-import org.onosproject.core.Version;
+import org.onosproject.cluster.Partition;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.primitives.resources.impl.AtomixSerializerAdapter;
import org.onosproject.store.service.PartitionInfo;
@@ -101,23 +105,46 @@
}
/**
+ * Deletes the server by recursively removing the partition data folder and everything in it.
+ */
+ public void delete() {
+ try {
+ Files.walkFileTree(partition.getDataFolder().toPath(), new SimpleFileVisitor<Path>() {
+ @Override
+ public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+ Files.delete(file);
+ return FileVisitResult.CONTINUE;
+ }
+ @Override
+ public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
+ Files.delete(dir);
+ return FileVisitResult.CONTINUE;
+ }
+ });
+ } catch (IOException e) {
+ log.error("Failed to delete partition {}", partition.getId(), e);
+ }
+ }
+
+ /**
* Forks the existing partition into a new partition.
*
- * @param version the version from which to fork the server
+ * @param fromPartition the partition from which to fork the server
* @return future to be completed once the fork operation is complete
*/
- public CompletableFuture<Void> fork(Version version) {
- log.info("Forking server for partition {} ({}->{})", partition.getId(), version, partition.getVersion());
+ public CompletableFuture<Void> fork(Partition fromPartition) {
+ log.info("Forking server for partition {} ({}->{})",
+ partition.getId(), fromPartition.getVersion(), partition.getVersion());
RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
- .withName(partition.getName(version))
- .withType(RaftMember.Type.PASSIVE)
+ .withName(String.format("partition-%s", fromPartition.getId()))
.withProtocol(new RaftServerCommunicator(
- partition.getName(version),
+ String.format("partition-%s-%s", fromPartition.getId(), fromPartition.getVersion()),
Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
clusterCommunicator))
.withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
.withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
.withStorage(RaftStorage.newBuilder()
+ .withPrefix(String.format("partition-%s", partition.getId()))
.withStorageLevel(StorageLevel.MAPPED)
.withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
.withDirectory(partition.getDataFolder())
@@ -125,12 +152,27 @@
.build());
StoragePartition.RAFT_SERVICES.forEach(builder::addService);
RaftServer server = builder.build();
- return server.join(partition.getMemberIds(version))
- .thenCompose(v -> server.shutdown())
+
+ // Create a collection of members currently in the source partition.
+ Collection<MemberId> members = fromPartition.getMembers()
+ .stream()
+ .map(id -> MemberId.from(id.id()))
+ .collect(Collectors.toList());
+
+ // If this node is a member of the partition, join the partition. Otherwise, listen to the partition.
+ CompletableFuture<RaftServer> future = members.contains(localMemberId)
+ ? server.bootstrap(members) : server.listen(members);
+
+ // TODO: We should leave the cluster for nodes that aren't normally members to ensure the source
+ // cluster's configuration is kept consistent for rolling back upgrades, but Atomix deletes configuration
+ // files when a node leaves the cluster so we can't do that here.
+ return future.thenCompose(v -> server.shutdown())
.thenCompose(v -> {
// Delete the cluster configuration file from the forked partition.
try {
- Files.delete(new File(partition.getDataFolder(), "atomix.conf").toPath());
+ Files.delete(new File(
+ partition.getDataFolder(),
+ String.format("partition-%s.conf", partition.getId())).toPath());
} catch (IOException e) {
log.error("Failed to delete partition configuration: {}", e);
}
@@ -141,24 +183,25 @@
}).whenComplete((r, e) -> {
if (e == null) {
log.info("Successfully forked server for partition {} ({}->{})",
- partition.getId(), version, partition.getVersion());
+ partition.getId(), fromPartition.getVersion(), partition.getVersion());
} else {
log.info("Failed to fork server for partition {} ({}->{})",
- partition.getId(), version, partition.getVersion(), e);
+ partition.getId(), fromPartition.getVersion(), partition.getVersion(), e);
}
}).thenApply(v -> null);
}
private RaftServer buildServer() {
RaftServer.Builder builder = RaftServer.newBuilder(localMemberId)
- .withName(partition.getName())
+ .withName(String.format("partition-%s", partition.getId()))
.withProtocol(new RaftServerCommunicator(
- partition.getName(),
+ String.format("partition-%s-%s", partition.getId(), partition.getVersion()),
Serializer.using(StorageNamespaces.RAFT_PROTOCOL),
clusterCommunicator))
.withElectionTimeout(Duration.ofMillis(ELECTION_TIMEOUT_MILLIS))
.withHeartbeatInterval(Duration.ofMillis(HEARTBEAT_INTERVAL_MILLIS))
.withStorage(RaftStorage.newBuilder()
+ .withPrefix(String.format("partition-%s", partition.getId()))
.withStorageLevel(StorageLevel.MAPPED)
.withSerializer(new AtomixSerializerAdapter(Serializer.using(StorageNamespaces.RAFT_STORAGE)))
.withDirectory(partition.getDataFolder())
@@ -169,13 +212,13 @@
}
public CompletableFuture<Void> join(Collection<MemberId> otherMembers) {
- log.info("Joining partition {} ({})", partition.getId(), partition.getVersion());
+ log.info("Joining partition {} ({})", partition.getId(), partition.getName());
server = buildServer();
return server.join(otherMembers).whenComplete((r, e) -> {
if (e == null) {
- log.info("Successfully joined partition {} ({})", partition.getId(), partition.getVersion());
+ log.info("Successfully joined partition {} ({})", partition.getId(), partition.getName());
} else {
- log.info("Failed to join partition {} ({})", partition.getId(), partition.getVersion(), e);
+ log.info("Failed to join partition {} ({})", partition.getId(), partition.getName(), e);
}
}).thenApply(v -> null);
}
diff --git a/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/PartitionManagerTest.java b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/PartitionManagerTest.java
new file mode 100644
index 0000000..c93b10f
--- /dev/null
+++ b/core/store/primitives/src/test/java/org/onosproject/store/primitives/impl/PartitionManagerTest.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.junit.Test;
+import org.onosproject.cluster.DefaultPartition;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.cluster.Partition;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.core.Version;
+
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests the partition membership computation performed by {@link PartitionManager}.
+ */
+public class PartitionManagerTest {
+
+ @Test
+ public void testComputeInitialIncompletePartition() throws Exception {
+ // Only two of the three source members are available.
+ List<NodeId> sourceNodes = Arrays.asList(
+ NodeId.nodeId("1"),
+ NodeId.nodeId("2"),
+ NodeId.nodeId("3"));
+ Partition source = new DefaultPartition(PartitionId.from(1), Version.version("1.0.0"), sourceNodes);
+ List<NodeId> available = Arrays.asList(
+ NodeId.nodeId("1"),
+ NodeId.nodeId("2"));
+ Partition forked = PartitionManager.computeInitialPartition(
+ source, Version.version("1.0.1"), available);
+ // Both available nodes, and nothing else, make up the forked partition.
+ assertTrue(forked.getMembers().size() == 2);
+ assertTrue(forked.getMembers().contains(NodeId.nodeId("1")));
+ assertTrue(forked.getMembers().contains(NodeId.nodeId("2")));
+ }
+
+ @Test
+ public void testComputeInitialCompletePartition() throws Exception {
+ // Every source member is available, along with two nodes outside the source partition.
+ List<NodeId> sourceNodes = Arrays.asList(
+ NodeId.nodeId("3"),
+ NodeId.nodeId("4"),
+ NodeId.nodeId("5"));
+ Partition source = new DefaultPartition(PartitionId.from(1), Version.version("1.0.0"), sourceNodes);
+ List<NodeId> available = Arrays.asList(
+ NodeId.nodeId("1"),
+ NodeId.nodeId("2"),
+ NodeId.nodeId("3"),
+ NodeId.nodeId("4"),
+ NodeId.nodeId("5"));
+ Partition forked = PartitionManager.computeInitialPartition(
+ source, Version.version("1.0.1"), available);
+ // The three source members plus the first outside node in sorted order are selected.
+ assertTrue(forked.getMembers().size() == 4);
+ assertTrue(forked.getMembers().contains(NodeId.nodeId("1")));
+ assertTrue(forked.getMembers().contains(NodeId.nodeId("3")));
+ assertTrue(forked.getMembers().contains(NodeId.nodeId("4")));
+ assertTrue(forked.getMembers().contains(NodeId.nodeId("5")));
+ }
+
+ @Test
+ public void testComputeFinalIncompletePartition() throws Exception {
+ // Only two of the three source members are available.
+ List<NodeId> sourceNodes = Arrays.asList(
+ NodeId.nodeId("1"),
+ NodeId.nodeId("2"),
+ NodeId.nodeId("3"));
+ Partition source = new DefaultPartition(PartitionId.from(1), Version.version("1.0.0"), sourceNodes);
+ List<NodeId> available = Arrays.asList(
+ NodeId.nodeId("1"),
+ NodeId.nodeId("2"));
+ Partition forked = PartitionManager.computeFinalPartition(
+ source, Version.version("1.0.1"), available);
+ // Both available nodes, and nothing else, make up the forked partition.
+ assertTrue(forked.getMembers().size() == 2);
+ assertTrue(forked.getMembers().contains(NodeId.nodeId("1")));
+ assertTrue(forked.getMembers().contains(NodeId.nodeId("2")));
+ }
+
+ @Test
+ public void testComputeFinalCompletePartition() throws Exception {
+ // Every source member is available, along with two nodes outside the source partition.
+ List<NodeId> sourceNodes = Arrays.asList(
+ NodeId.nodeId("3"),
+ NodeId.nodeId("4"),
+ NodeId.nodeId("5"));
+ Partition source = new DefaultPartition(PartitionId.from(1), Version.version("1.0.0"), sourceNodes);
+ List<NodeId> available = Arrays.asList(
+ NodeId.nodeId("1"),
+ NodeId.nodeId("2"),
+ NodeId.nodeId("3"),
+ NodeId.nodeId("4"),
+ NodeId.nodeId("5"));
+ Partition forked = PartitionManager.computeFinalPartition(
+ source, Version.version("1.0.1"), available);
+ // With no extra slot, exactly the three source members are selected.
+ assertTrue(forked.getMembers().size() == 3);
+ assertTrue(forked.getMembers().contains(NodeId.nodeId("3")));
+ assertTrue(forked.getMembers().contains(NodeId.nodeId("4")));
+ assertTrue(forked.getMembers().contains(NodeId.nodeId("5")));
+ }
+
+}