Cluster scaling enchancements
- Updated ConfigFileBasedClusterMetadataProvider to handle both file and http protocols.
- Server open logic updated to handle joining an existing cluster.
Change-Id: Idbaa39733c7bf814510c94c4b21e3714b3f97f8f
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
index 5016811..267396c 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
@@ -19,16 +19,14 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Path;
-import java.nio.file.StandardWatchEventKinds;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
+import java.net.URL;
+import java.net.URLConnection;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.felix.scr.annotations.Activate;
@@ -87,11 +85,12 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataProviderRegistry providerRegistry;
- private static final ProviderId PROVIDER_ID = new ProviderId("config", "none");
+ private static final ProviderId PROVIDER_ID = new ProviderId("file", "none");
private final AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>();
- private final ExecutorService configFileChangeDetector =
- Executors.newSingleThreadExecutor(groupedThreads("onos/cluster/metadata/config-watcher", ""));
+ private final ScheduledExecutorService configFileChangeDetector =
+ Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/cluster/metadata/config-watcher", ""));
+ private String metadataUrl;
private ObjectMapper mapper;
private ClusterMetadataProviderService providerService;
@@ -108,14 +107,8 @@
module.addDeserializer(PartitionId.class, new PartitionIdDeserializer());
mapper.registerModule(module);
providerService = providerRegistry.register(this);
- configFileChangeDetector.execute(() -> {
- try {
- watchConfigFile();
- } catch (IOException e) {
- log.warn("Failure in setting up a watch for config "
- + "file updates. updates to {} will be ignored", CONFIG_FILE, e);
- }
- });
+ metadataUrl = System.getProperty("onos.cluster.metadata.uri", "file://" + CONFIG_DIR + "/" + CONFIG_FILE);
+ configFileChangeDetector.scheduleWithFixedDelay(() -> watchUrl(metadataUrl), 100, 500, TimeUnit.MILLISECONDS);
log.info("Started");
}
@@ -136,7 +129,7 @@
checkState(isAvailable());
synchronized (this) {
if (cachedMetadata.get() == null) {
- cachedMetadata.set(fetchMetadata());
+ cachedMetadata.set(fetchMetadata(metadataUrl));
}
return cachedMetadata.get();
}
@@ -170,23 +163,45 @@
@Override
public boolean isAvailable() {
- return CONFIG_FILE.exists();
+ try {
+ URL url = new URL(metadataUrl);
+ if (url.getProtocol().equals("file")) {
+ File file = new File(metadataUrl.replaceFirst("file://", ""));
+ return file.exists();
+ } else if (url.getProtocol().equals("http")) {
+ url.openStream();
+ return true;
+ } else {
+ // Unsupported protocol
+ return false;
+ }
+ } catch (Exception e) {
+ return false;
+ }
}
- private Versioned<ClusterMetadata> fetchMetadata() {
- ClusterMetadata metadata = null;
- long version = 0;
+ private Versioned<ClusterMetadata> fetchMetadata(String metadataUrl) {
try {
- metadata = mapper.readValue(CONFIG_FILE, ClusterMetadata.class);
- version = CONFIG_FILE.lastModified();
+ URL url = new URL(metadataUrl);
+ ClusterMetadata metadata = null;
+ long version = 0;
+ if (url.getProtocol().equals("file")) {
+ File file = new File(metadataUrl.replaceFirst("file://", ""));
+ version = file.lastModified();
+ metadata = mapper.readValue(new FileInputStream(file), ClusterMetadata.class);
+ } else if (url.getProtocol().equals("http")) {
+ URLConnection conn = url.openConnection();
+ version = conn.getLastModified();
+ metadata = mapper.readValue(conn.getInputStream(), ClusterMetadata.class);
+ }
+ return new Versioned<>(new ClusterMetadata(PROVIDER_ID,
+ metadata.getName(),
+ Sets.newHashSet(metadata.getNodes()),
+ Sets.newHashSet(metadata.getPartitions())),
+ version);
} catch (IOException e) {
- Throwables.propagate(e);
+ throw Throwables.propagate(e);
}
- return new Versioned<>(new ClusterMetadata(PROVIDER_ID,
- metadata.getName(),
- Sets.newHashSet(metadata.getNodes()),
- Sets.newHashSet(metadata.getPartitions())),
- version);
}
private static class PartitionDeserializer extends JsonDeserializer<Partition> {
@@ -256,32 +271,16 @@
}
/**
- * Monitors the config file for any updates and notifies providerService accordingly.
+ * Monitors the metadata url for any updates and notifies providerService accordingly.
* @throws IOException
*/
- private void watchConfigFile() throws IOException {
- WatchService watcher = FileSystems.getDefault().newWatchService();
- Path configFilePath = FileSystems.getDefault().getPath(CONFIG_DIR);
- configFilePath.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY);
- while (true) {
- try {
- final WatchKey watchKey = watcher.take();
- for (WatchEvent<?> event : watchKey.pollEvents()) {
- final Path changed = (Path) event.context();
- log.info("{} was updated", changed);
- // TODO: Fix concurrency issues
- Versioned<ClusterMetadata> latestMetadata = fetchMetadata();
- cachedMetadata.set(latestMetadata);
- providerService.clusterMetadataChanged(latestMetadata);
- }
- if (!watchKey.reset()) {
- log.debug("WatchKey has been unregistered");
- break;
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- break;
- }
+ private void watchUrl(String metadataUrl) {
+ // TODO: We are merely polling the url.
+ // This can be easily addressed for files. For http urls we need to move to a push style protocol.
+ Versioned<ClusterMetadata> latestMetadata = fetchMetadata(metadataUrl);
+ if (cachedMetadata.get() != null && cachedMetadata.get().version() < latestMetadata.version()) {
+ cachedMetadata.set(latestMetadata);
+ providerService.clusterMetadataChanged(latestMetadata);
}
}
}
\ No newline at end of file