Cluster scaling enchancements
- Updated ConfigFileBasedClusterMetadataProvider to handle both file and http protocols.
- Server open logic updated to handle joining an existing cluster.
Change-Id: Idbaa39733c7bf814510c94c4b21e3714b3f97f8f
diff --git a/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java b/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
index 74375dd..dfcc1f3 100644
--- a/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
@@ -43,6 +43,9 @@
* @param partitionInfo partition descriptions
*/
private void displayPartitions(List<PartitionInfo> partitionInfo) {
+ if (partitionInfo.isEmpty()) {
+ return;
+ }
print("----------------------------------------------------------");
print(FMT, "Name", "Term", "Members", "");
print("----------------------------------------------------------");
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java
index 49a24db..f655fcc 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java
@@ -19,10 +19,10 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.net.InetAddress;
+import java.net.MalformedURLException;
import java.net.NetworkInterface;
import java.net.SocketException;
-import java.net.URI;
-import java.net.URISyntaxException;
+import java.net.URL;
import java.util.Collection;
import java.util.Enumeration;
@@ -47,8 +47,6 @@
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
-import com.google.common.base.Throwables;
-
/**
* Implementation of ClusterMetadataService.
*/
@@ -126,11 +124,15 @@
* @return primary cluster metadata provider
*/
private ClusterMetadataProvider getPrimaryProvider() {
+ String metadataUri = System.getProperty("onos.cluster.metadata.uri");
try {
- URI uri = new URI(System.getProperty("onos.cluster.metadata.uri", "config:///cluster.json"));
- return getProvider(uri.getScheme());
- } catch (URISyntaxException e) {
- Throwables.propagate(e);
+ String protocol = metadataUri == null ? null : new URL(metadataUri).getProtocol();
+ if (protocol != null && (!protocol.equals("file") && !protocol.equals("http"))) {
+ return getProvider(protocol);
+ }
+ // file provider supports both "file" and "http" uris
+ return getProvider("file");
+ } catch (MalformedURLException e) {
return null;
}
}
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
index 5016811..267396c 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
@@ -19,16 +19,14 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Path;
-import java.nio.file.StandardWatchEventKinds;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
+import java.net.URL;
+import java.net.URLConnection;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.felix.scr.annotations.Activate;
@@ -87,11 +85,12 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataProviderRegistry providerRegistry;
- private static final ProviderId PROVIDER_ID = new ProviderId("config", "none");
+ private static final ProviderId PROVIDER_ID = new ProviderId("file", "none");
private final AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>();
- private final ExecutorService configFileChangeDetector =
- Executors.newSingleThreadExecutor(groupedThreads("onos/cluster/metadata/config-watcher", ""));
+ private final ScheduledExecutorService configFileChangeDetector =
+ Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/cluster/metadata/config-watcher", ""));
+ private String metadataUrl;
private ObjectMapper mapper;
private ClusterMetadataProviderService providerService;
@@ -108,14 +107,8 @@
module.addDeserializer(PartitionId.class, new PartitionIdDeserializer());
mapper.registerModule(module);
providerService = providerRegistry.register(this);
- configFileChangeDetector.execute(() -> {
- try {
- watchConfigFile();
- } catch (IOException e) {
- log.warn("Failure in setting up a watch for config "
- + "file updates. updates to {} will be ignored", CONFIG_FILE, e);
- }
- });
+ metadataUrl = System.getProperty("onos.cluster.metadata.uri", "file://" + CONFIG_DIR + "/" + CONFIG_FILE);
+ configFileChangeDetector.scheduleWithFixedDelay(() -> watchUrl(metadataUrl), 100, 500, TimeUnit.MILLISECONDS);
log.info("Started");
}
@@ -136,7 +129,7 @@
checkState(isAvailable());
synchronized (this) {
if (cachedMetadata.get() == null) {
- cachedMetadata.set(fetchMetadata());
+ cachedMetadata.set(fetchMetadata(metadataUrl));
}
return cachedMetadata.get();
}
@@ -170,23 +163,45 @@
@Override
public boolean isAvailable() {
- return CONFIG_FILE.exists();
+ try {
+ URL url = new URL(metadataUrl);
+ if (url.getProtocol().equals("file")) {
+ File file = new File(metadataUrl.replaceFirst("file://", ""));
+ return file.exists();
+ } else if (url.getProtocol().equals("http")) {
+ url.openStream();
+ return true;
+ } else {
+ // Unsupported protocol
+ return false;
+ }
+ } catch (Exception e) {
+ return false;
+ }
}
- private Versioned<ClusterMetadata> fetchMetadata() {
- ClusterMetadata metadata = null;
- long version = 0;
+ private Versioned<ClusterMetadata> fetchMetadata(String metadataUrl) {
try {
- metadata = mapper.readValue(CONFIG_FILE, ClusterMetadata.class);
- version = CONFIG_FILE.lastModified();
+ URL url = new URL(metadataUrl);
+ ClusterMetadata metadata = null;
+ long version = 0;
+ if (url.getProtocol().equals("file")) {
+ File file = new File(metadataUrl.replaceFirst("file://", ""));
+ version = file.lastModified();
+ metadata = mapper.readValue(new FileInputStream(file), ClusterMetadata.class);
+ } else if (url.getProtocol().equals("http")) {
+ URLConnection conn = url.openConnection();
+ version = conn.getLastModified();
+ metadata = mapper.readValue(conn.getInputStream(), ClusterMetadata.class);
+ }
+ return new Versioned<>(new ClusterMetadata(PROVIDER_ID,
+ metadata.getName(),
+ Sets.newHashSet(metadata.getNodes()),
+ Sets.newHashSet(metadata.getPartitions())),
+ version);
} catch (IOException e) {
- Throwables.propagate(e);
+ throw Throwables.propagate(e);
}
- return new Versioned<>(new ClusterMetadata(PROVIDER_ID,
- metadata.getName(),
- Sets.newHashSet(metadata.getNodes()),
- Sets.newHashSet(metadata.getPartitions())),
- version);
}
private static class PartitionDeserializer extends JsonDeserializer<Partition> {
@@ -256,32 +271,16 @@
}
/**
- * Monitors the config file for any updates and notifies providerService accordingly.
+ * Monitors the metadata url for any updates and notifies providerService accordingly.
* @throws IOException
*/
- private void watchConfigFile() throws IOException {
- WatchService watcher = FileSystems.getDefault().newWatchService();
- Path configFilePath = FileSystems.getDefault().getPath(CONFIG_DIR);
- configFilePath.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY);
- while (true) {
- try {
- final WatchKey watchKey = watcher.take();
- for (WatchEvent<?> event : watchKey.pollEvents()) {
- final Path changed = (Path) event.context();
- log.info("{} was updated", changed);
- // TODO: Fix concurrency issues
- Versioned<ClusterMetadata> latestMetadata = fetchMetadata();
- cachedMetadata.set(latestMetadata);
- providerService.clusterMetadataChanged(latestMetadata);
- }
- if (!watchKey.reset()) {
- log.debug("WatchKey has been unregistered");
- break;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
- }
+ private void watchUrl(String metadataUrl) {
+ // TODO: We are merely polling the url.
+ // This can be easily addressed for files. For http urls we need to move to a push style protocol.
+ Versioned<ClusterMetadata> latestMetadata = fetchMetadata(metadataUrl);
+ if (cachedMetadata.get() != null && cachedMetadata.get().version() < latestMetadata.version()) {
+ cachedMetadata.set(latestMetadata);
+ providerService.clusterMetadataChanged(latestMetadata);
}
}
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 5cdc5a9..89e1e82 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -23,8 +23,10 @@
import java.io.File;
import java.util.Collection;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.ControllerNode;
@@ -83,7 +85,9 @@
@Override
public CompletableFuture<Void> open() {
- openServer();
+ if (partition.getMembers().contains(localNodeId)) {
+ openServer();
+ }
return openClient().thenAccept(v -> isOpened.set(true))
.thenApply(v -> null);
}
@@ -120,6 +124,10 @@
return Collections2.transform(partition.getMembers(), this::toAddress);
}
+ /**
+ * Attempts to rejoin the partition.
+ * @return future that is completed after the operation is complete
+ */
private CompletableFuture<Void> openServer() {
if (!partition.getMembers().contains(localNodeId) || server != null) {
return CompletableFuture.completedFuture(null);
@@ -135,6 +143,26 @@
return server.open().thenRun(() -> this.server = server);
}
+ /**
+ * Attempts to join the partition as a new member.
+ * @return future that is completed after the operation is complete
+ */
+ private CompletableFuture<Void> joinCluster() {
+ Set<NodeId> otherMembers = partition.getMembers()
+ .stream()
+ .filter(nodeId -> !nodeId.equals(localNodeId))
+ .collect(Collectors.toSet());
+ StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
+ this,
+ serializer,
+ () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
+ partition.getId(),
+ messagingService),
+ RESOURCE_TYPES,
+ logFolder);
+ return server.join(Collections2.transform(otherMembers, this::toAddress)).thenRun(() -> this.server = server);
+ }
+
private CompletableFuture<StoragePartitionClient> openClient() {
client = new StoragePartitionClient(this,
serializer,
@@ -149,7 +177,7 @@
* Closes the partition server if it was previously opened.
* @return future that is completed when the operation completes
*/
- public CompletableFuture<Void> closeServer() {
+ public CompletableFuture<Void> leaveCluster() {
return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
}
@@ -181,15 +209,21 @@
* @return partition info
*/
public Optional<PartitionInfo> info() {
- return server != null ? Optional.of(server.info()) : Optional.empty();
+ return server != null && !server.isClosed() ? Optional.of(server.info()) : Optional.empty();
}
- public void onUpdate(Partition partition) {
- this.partition = partition;
+ public void onUpdate(Partition newValue) {
+ if (partition.getMembers().contains(localNodeId) && newValue.getMembers().contains(localNodeId)) {
+ return;
+ }
+ if (!partition.getMembers().contains(localNodeId) && !newValue.getMembers().contains(localNodeId)) {
+ return;
+ }
+ this.partition = newValue;
if (partition.getMembers().contains(localNodeId)) {
- openServer();
+ joinCluster();
} else if (!partition.getMembers().contains(localNodeId)) {
- closeServer();
+ leaveCluster();
}
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 6d613c3..7c59079 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -77,7 +77,7 @@
return CompletableFuture.completedFuture(null);
}
synchronized (this) {
- server = buildServer();
+ server = buildServer(partition.getMemberAddresses());
}
serverOpenFuture = server.open();
} else {
@@ -109,12 +109,12 @@
return server.close();
}
- private CopycatServer buildServer() {
+ private CopycatServer buildServer(Collection<Address> clusterMembers) {
ResourceTypeResolver resourceResolver = new ServiceLoaderResourceResolver();
ResourceRegistry registry = new ResourceRegistry();
resourceTypes.forEach(registry::register);
resourceResolver.resolve(registry);
- CopycatServer server = CopycatServer.builder(localAddress, partition.getMemberAddresses())
+ CopycatServer server = CopycatServer.builder(localAddress, clusterMembers)
.withName("partition-" + partition.getId())
.withSerializer(serializer.clone())
.withTransport(transport.get())
@@ -130,6 +130,18 @@
return server;
}
+ public CompletableFuture<Void> join(Collection<Address> otherMembers) {
+ server = buildServer(otherMembers);
+
+ return server.open().whenComplete((r, e) -> {
+ if (e == null) {
+ log.info("Successfully joined partition {}", partition.getId());
+ } else {
+ log.info("Failed to join partition {}", partition.getId(), e);
+ }
+ }).thenApply(v -> null);
+ }
+
@Override
public boolean isOpen() {
return server.isOpen();