Cluster scaling enhancements
- Updated ConfigFileBasedClusterMetadataProvider to handle both file and http protocols.
- Server open logic updated to handle joining an existing cluster.
Change-Id: Idbaa39733c7bf814510c94c4b21e3714b3f97f8f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 5cdc5a9..89e1e82 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -23,8 +23,10 @@
import java.io.File;
import java.util.Collection;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
@@ -83,7 +85,9 @@
@Override
public CompletableFuture<Void> open() {
- openServer();
+ if (partition.getMembers().contains(localNodeId)) { // only a node listed in the partition's members opens a server
+ openServer();
+ } // the client below is opened whether or not the local node is a member
return openClient().thenAccept(v -> isOpened.set(true))
.thenApply(v -> null);
}
@@ -120,6 +124,10 @@
return Collections2.transform(partition.getMembers(), this::toAddress);
}
+ /**
+ * Attempts to rejoin the partition.
+ * @return future that is completed after the operation is complete
+ */
private CompletableFuture<Void> openServer() {
if (!partition.getMembers().contains(localNodeId) || server != null) {
return CompletableFuture.completedFuture(null);
@@ -135,6 +143,26 @@
return server.open().thenRun(() -> this.server = server);
}
+ /**
+  * Joins the partition as a new member: creates a server for the local node
+  * and joins it using the addresses of every other partition member.
+  * @return future that is completed once the join has finished and the server reference is stored
+  */
+ private CompletableFuture<Void> joinCluster() {
+     // Every member of the partition except the local node.
+     Set<NodeId> peers = partition.getMembers()
+             .stream()
+             .filter(nodeId -> !nodeId.equals(localNodeId))
+             .collect(Collectors.toSet());
+     StoragePartitionServer joiningServer = new StoragePartitionServer(
+             toAddress(localNodeId),
+             this,
+             serializer,
+             () -> new CopycatTransport(CopycatTransport.Mode.SERVER, partition.getId(), messagingService),
+             RESOURCE_TYPES,
+             logFolder);
+     // The server field is assigned only after the join future completes normally.
+     return joiningServer
+             .join(Collections2.transform(peers, this::toAddress))
+             .thenRun(() -> this.server = joiningServer);
+ }
+
private CompletableFuture<StoragePartitionClient> openClient() {
client = new StoragePartitionClient(this,
serializer,
@@ -149,7 +177,7 @@
* Closes the partition server if it was previously opened.
* @return future that is completed when the operation completes
*/
- public CompletableFuture<Void> closeServer() {
+ public CompletableFuture<Void> leaveCluster() {
return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
}
@@ -181,15 +209,21 @@
* @return partition info
*/
public Optional<PartitionInfo> info() {
- return server != null ? Optional.of(server.info()) : Optional.empty();
+ return server != null && !server.isClosed() ? Optional.of(server.info()) : Optional.empty();
}
- public void onUpdate(Partition partition) {
- this.partition = partition;
+ /**
+  * Records an updated partition definition and, when the local node's membership
+  * changed, joins or leaves the partition accordingly.
+  * @param newValue updated partition definition
+  */
+ public void onUpdate(Partition newValue) {
+     boolean wasMember = partition.getMembers().contains(localNodeId);
+     boolean isMember = newValue.getMembers().contains(localNodeId);
+     // Always keep the latest definition, even when local membership is unchanged,
+     // so member lists derived from this.partition do not go stale.
+     this.partition = newValue;
+     if (wasMember == isMember) {
+         // Local membership did not change: nothing to join or leave.
+         return;
+     }
if (partition.getMembers().contains(localNodeId)) {
- openServer();
+ joinCluster();
} else if (!partition.getMembers().contains(localNodeId)) {
- closeServer();
+ leaveCluster();
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 6d613c3..7c59079 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -77,7 +77,7 @@
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
- server = buildServer();
+ server = buildServer(partition.getMemberAddresses());
}
serverOpenFuture = server.open();
} else {
@@ -109,12 +109,12 @@
return server.close();
}
- private CopycatServer buildServer() {
+ private CopycatServer buildServer(Collection<Address> clusterMembers) {
ResourceTypeResolver resourceResolver = new ServiceLoaderResourceResolver();
ResourceRegistry registry = new ResourceRegistry();
resourceTypes.forEach(registry::register);
resourceResolver.resolve(registry);
- CopycatServer server = CopycatServer.builder(localAddress, partition.getMemberAddresses())
+ CopycatServer server = CopycatServer.builder(localAddress, clusterMembers)
.withName("partition-" + partition.getId())
.withSerializer(serializer.clone())
.withTransport(transport.get())
@@ -130,6 +130,18 @@
return server;
}
+ /**
+  * Builds a server configured with the given member addresses and opens it,
+  * logging whether the attempt succeeded.
+  * @param otherMembers addresses handed to the server builder as the cluster members
+  * @return future that is completed when the open attempt completes; it completes
+  *         exceptionally if opening the server fails
+  */
+ public CompletableFuture<Void> join(Collection<Address> otherMembers) {
+     server = buildServer(otherMembers);
+
+     return server.open().whenComplete((r, e) -> {
+         if (e == null) {
+             log.info("Successfully joined partition {}", partition.getId());
+         } else {
+             // A failed join carries an exception: report it at error level, not info.
+             log.error("Failed to join partition {}", partition.getId(), e);
+         }
+     }).thenApply(v -> null);
+ }
+
@Override
public boolean isOpen() {
return server.isOpen();