Cluster scaling enchancements
- Updated ConfigFileBasedClusterMetadataProvider to handle both file and http protocols.
- Server open logic updated to handle joining an existing cluster.
Change-Id: Idbaa39733c7bf814510c94c4b21e3714b3f97f8f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 6d613c3..7c59079 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -77,7 +77,7 @@
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
- server = buildServer();
+ server = buildServer(partition.getMemberAddresses());
}
serverOpenFuture = server.open();
} else {
@@ -109,12 +109,12 @@
return server.close();
}
- private CopycatServer buildServer() {
+ private CopycatServer buildServer(Collection<Address> clusterMembers) {
ResourceTypeResolver resourceResolver = new ServiceLoaderResourceResolver();
ResourceRegistry registry = new ResourceRegistry();
resourceTypes.forEach(registry::register);
resourceResolver.resolve(registry);
- CopycatServer server = CopycatServer.builder(localAddress, partition.getMemberAddresses())
+ CopycatServer server = CopycatServer.builder(localAddress, clusterMembers)
.withName("partition-" + partition.getId())
.withSerializer(serializer.clone())
.withTransport(transport.get())
@@ -130,6 +130,18 @@
return server;
}
+ public CompletableFuture<Void> join(Collection<Address> otherMembers) {
+ server = buildServer(otherMembers);
+
+ return server.open().whenComplete((r, e) -> {
+ if (e == null) {
+ log.info("Successfully joined partition {}", partition.getId());
+ } else {
+ log.info("Failed to join partition {}", partition.getId(), e);
+ }
+ }).thenApply(v -> null);
+ }
+
@Override
public boolean isOpen() {
return server.isOpen();