Support for watching changes to static cluster metadata file
Change-Id: I5f9f89997288ca9a33a9e41f7520b875aceeffbe
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
index 825aa0a..5016811 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
@@ -15,11 +15,20 @@
*/
package org.onosproject.cluster.impl;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.io.File;
import java.io.IOException;
+import java.nio.file.FileSystems;
+import java.nio.file.Path;
+import java.nio.file.StandardWatchEventKinds;
+import java.nio.file.WatchEvent;
+import java.nio.file.WatchKey;
+import java.nio.file.WatchService;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.felix.scr.annotations.Activate;
@@ -71,13 +80,17 @@
private static final String PORT = "port";
private static final String IP = "ip";
- private static final File CONFIG_FILE = new File("../config/cluster.json");
+ private static final String CONFIG_DIR = "../config";
+ private static final String CONFIG_FILE_NAME = "cluster.json";
+ private static final File CONFIG_FILE = new File(CONFIG_DIR, CONFIG_FILE_NAME);
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterMetadataProviderRegistry providerRegistry;
private static final ProviderId PROVIDER_ID = new ProviderId("config", "none");
- private AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>();
+ private final AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>();
+ private final ExecutorService configFileChangeDetector =
+ Executors.newSingleThreadExecutor(groupedThreads("onos/cluster/metadata/config-watcher", ""));
private ObjectMapper mapper;
private ClusterMetadataProviderService providerService;
@@ -95,11 +108,20 @@
module.addDeserializer(PartitionId.class, new PartitionIdDeserializer());
mapper.registerModule(module);
providerService = providerRegistry.register(this);
+ configFileChangeDetector.execute(() -> {
+ try {
+ watchConfigFile();
+ } catch (IOException e) {
+ log.warn("Failure in setting up a watch for config "
+ + "file updates. updates to {} will be ignored", CONFIG_FILE, e);
+ }
+ });
log.info("Started");
}
@Deactivate
public void deactivate() {
+ configFileChangeDetector.shutdown();
providerRegistry.unregister(this);
log.info("Stopped");
}
@@ -114,7 +136,7 @@
checkState(isAvailable());
synchronized (this) {
if (cachedMetadata.get() == null) {
- loadMetadata();
+ cachedMetadata.set(fetchMetadata());
}
return cachedMetadata.get();
}
@@ -151,20 +173,20 @@
return CONFIG_FILE.exists();
}
- private void loadMetadata() {
+ private Versioned<ClusterMetadata> fetchMetadata() {
ClusterMetadata metadata = null;
long version = 0;
try {
metadata = mapper.readValue(CONFIG_FILE, ClusterMetadata.class);
- version = metadata.hashCode();
+ version = CONFIG_FILE.lastModified();
} catch (IOException e) {
Throwables.propagate(e);
}
- cachedMetadata.set(new Versioned<>(new ClusterMetadata(PROVIDER_ID,
- metadata.getName(),
- Sets.newHashSet(metadata.getNodes()),
- Sets.newHashSet(metadata.getPartitions())),
- version));
+ return new Versioned<>(new ClusterMetadata(PROVIDER_ID,
+ metadata.getName(),
+ Sets.newHashSet(metadata.getNodes()),
+ Sets.newHashSet(metadata.getPartitions())),
+ version);
}
private static class PartitionDeserializer extends JsonDeserializer<Partition> {
@@ -232,4 +254,34 @@
return new NodeId(node.asText());
}
}
+
+ /**
+ * Monitors the config file for any updates and notifies providerService accordingly.
+ * @throws IOException
+ */
+ private void watchConfigFile() throws IOException {
+ WatchService watcher = FileSystems.getDefault().newWatchService();
+ Path configFilePath = FileSystems.getDefault().getPath(CONFIG_DIR);
+ configFilePath.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY);
+ while (true) {
+ try {
+ final WatchKey watchKey = watcher.take();
+ for (WatchEvent<?> event : watchKey.pollEvents()) {
+ final Path changed = (Path) event.context();
+ log.info("{} was updated", changed);
+ // TODO: Fix concurrency issues
+ Versioned<ClusterMetadata> latestMetadata = fetchMetadata();
+ cachedMetadata.set(latestMetadata);
+ providerService.clusterMetadataChanged(latestMetadata);
+ }
+ if (!watchKey.reset()) {
+ log.debug("WatchKey has been unregistered");
+ break;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
}
\ No newline at end of file