Cluster scaling enchancements
	- Updated ConfigFileBasedClusterMetadataProvider to handle both file and http protocols.
	- Server open logic updated to handle joining an existing cluster.

Change-Id: Idbaa39733c7bf814510c94c4b21e3714b3f97f8f
diff --git a/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java b/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
index 74375dd..dfcc1f3 100644
--- a/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
@@ -43,6 +43,9 @@
      * @param partitionInfo partition descriptions
      */
     private void displayPartitions(List<PartitionInfo> partitionInfo) {
+        if (partitionInfo.isEmpty()) {
+            return;
+        }
         print("----------------------------------------------------------");
         print(FMT, "Name", "Term", "Members", "");
         print("----------------------------------------------------------");
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java
index 49a24db..f655fcc 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ClusterMetadataManager.java
@@ -19,10 +19,10 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.net.InetAddress;
+import java.net.MalformedURLException;
 import java.net.NetworkInterface;
 import java.net.SocketException;
-import java.net.URI;
-import java.net.URISyntaxException;
+import java.net.URL;
 import java.util.Collection;
 import java.util.Enumeration;
 
@@ -47,8 +47,6 @@
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 
-import com.google.common.base.Throwables;
-
 /**
  * Implementation of ClusterMetadataService.
  */
@@ -126,11 +124,15 @@
      * @return primary cluster metadata provider
      */
     private ClusterMetadataProvider getPrimaryProvider() {
+        String metadataUri = System.getProperty("onos.cluster.metadata.uri");
         try {
-            URI uri = new URI(System.getProperty("onos.cluster.metadata.uri", "config:///cluster.json"));
-            return getProvider(uri.getScheme());
-        } catch (URISyntaxException e) {
-            Throwables.propagate(e);
+            String protocol = metadataUri == null ? null : new URL(metadataUri).getProtocol();
+            if (protocol != null && (!protocol.equals("file") && !protocol.equals("http"))) {
+                return getProvider(protocol);
+            }
+            // file provider supports both "file" and "http" uris
+            return getProvider("file");
+        } catch (MalformedURLException e) {
             return null;
         }
     }
diff --git a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
index 5016811..267396c 100644
--- a/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
+++ b/core/net/src/main/java/org/onosproject/cluster/impl/ConfigFileBasedClusterMetadataProvider.java
@@ -19,16 +19,14 @@
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.file.FileSystems;
-import java.nio.file.Path;
-import java.nio.file.StandardWatchEventKinds;
-import java.nio.file.WatchEvent;
-import java.nio.file.WatchKey;
-import java.nio.file.WatchService;
+import java.net.URL;
+import java.net.URLConnection;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.felix.scr.annotations.Activate;
@@ -87,11 +85,12 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterMetadataProviderRegistry providerRegistry;
 
-    private static final ProviderId PROVIDER_ID = new ProviderId("config", "none");
+    private static final ProviderId PROVIDER_ID = new ProviderId("file", "none");
     private final AtomicReference<Versioned<ClusterMetadata>> cachedMetadata = new AtomicReference<>();
-    private final ExecutorService configFileChangeDetector =
-            Executors.newSingleThreadExecutor(groupedThreads("onos/cluster/metadata/config-watcher", ""));
+    private final ScheduledExecutorService configFileChangeDetector =
+            Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/cluster/metadata/config-watcher", ""));
 
+    private String metadataUrl;
     private ObjectMapper mapper;
     private ClusterMetadataProviderService providerService;
 
@@ -108,14 +107,8 @@
         module.addDeserializer(PartitionId.class, new PartitionIdDeserializer());
         mapper.registerModule(module);
         providerService = providerRegistry.register(this);
-        configFileChangeDetector.execute(() -> {
-            try {
-                watchConfigFile();
-            } catch (IOException e) {
-                log.warn("Failure in setting up a watch for config "
-                        + "file updates. updates to {} will be ignored", CONFIG_FILE, e);
-            }
-        });
+        metadataUrl = System.getProperty("onos.cluster.metadata.uri", "file://" + CONFIG_DIR + "/" + CONFIG_FILE);
+        configFileChangeDetector.scheduleWithFixedDelay(() -> watchUrl(metadataUrl), 100, 500, TimeUnit.MILLISECONDS);
         log.info("Started");
     }
 
@@ -136,7 +129,7 @@
         checkState(isAvailable());
         synchronized (this) {
             if (cachedMetadata.get() == null) {
-                cachedMetadata.set(fetchMetadata());
+                cachedMetadata.set(fetchMetadata(metadataUrl));
             }
             return cachedMetadata.get();
         }
@@ -170,23 +163,45 @@
 
     @Override
     public boolean isAvailable() {
-        return CONFIG_FILE.exists();
+        try {
+            URL url = new URL(metadataUrl);
+            if (url.getProtocol().equals("file")) {
+                File file = new File(metadataUrl.replaceFirst("file://", ""));
+                return file.exists();
+            } else if (url.getProtocol().equals("http")) {
+                url.openStream();
+                return true;
+            } else {
+                // Unsupported protocol
+                return false;
+            }
+        } catch (Exception e) {
+            return false;
+        }
     }
 
-    private Versioned<ClusterMetadata> fetchMetadata() {
-        ClusterMetadata metadata = null;
-        long version = 0;
+    private Versioned<ClusterMetadata> fetchMetadata(String metadataUrl) {
         try {
-            metadata = mapper.readValue(CONFIG_FILE, ClusterMetadata.class);
-            version = CONFIG_FILE.lastModified();
+            URL url = new URL(metadataUrl);
+            ClusterMetadata metadata = null;
+            long version = 0;
+            if (url.getProtocol().equals("file")) {
+                File file = new File(metadataUrl.replaceFirst("file://", ""));
+                version = file.lastModified();
+                metadata = mapper.readValue(new FileInputStream(file), ClusterMetadata.class);
+            } else if (url.getProtocol().equals("http")) {
+                URLConnection conn = url.openConnection();
+                version = conn.getLastModified();
+                metadata = mapper.readValue(conn.getInputStream(), ClusterMetadata.class);
+            }
+            return new Versioned<>(new ClusterMetadata(PROVIDER_ID,
+                                                       metadata.getName(),
+                                                       Sets.newHashSet(metadata.getNodes()),
+                                                       Sets.newHashSet(metadata.getPartitions())),
+                                   version);
         } catch (IOException e) {
-            Throwables.propagate(e);
+            throw Throwables.propagate(e);
         }
-        return new Versioned<>(new ClusterMetadata(PROVIDER_ID,
-                                                   metadata.getName(),
-                                                   Sets.newHashSet(metadata.getNodes()),
-                                                   Sets.newHashSet(metadata.getPartitions())),
-                               version);
     }
 
     private static class PartitionDeserializer extends JsonDeserializer<Partition> {
@@ -256,32 +271,16 @@
     }
 
     /**
-     * Monitors the config file for any updates and notifies providerService accordingly.
+     * Monitors the metadata url for any updates and notifies providerService accordingly.
      * @throws IOException
      */
-    private void watchConfigFile() throws IOException {
-        WatchService watcher = FileSystems.getDefault().newWatchService();
-        Path configFilePath = FileSystems.getDefault().getPath(CONFIG_DIR);
-        configFilePath.register(watcher, StandardWatchEventKinds.ENTRY_MODIFY);
-        while (true) {
-            try {
-                final WatchKey watchKey = watcher.take();
-                for (WatchEvent<?> event : watchKey.pollEvents()) {
-                    final Path changed = (Path) event.context();
-                    log.info("{} was updated", changed);
-                    // TODO: Fix concurrency issues
-                    Versioned<ClusterMetadata> latestMetadata = fetchMetadata();
-                    cachedMetadata.set(latestMetadata);
-                    providerService.clusterMetadataChanged(latestMetadata);
-                }
-                if (!watchKey.reset()) {
-                    log.debug("WatchKey has been unregistered");
-                    break;
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                break;
-            }
+    private void watchUrl(String metadataUrl) {
+        // TODO: We are merely polling the url.
+        // This can be easily addressed for files. For http urls we need to move to a push style protocol.
+        Versioned<ClusterMetadata> latestMetadata = fetchMetadata(metadataUrl);
+        if (cachedMetadata.get() != null && cachedMetadata.get().version() < latestMetadata.version()) {
+            cachedMetadata.set(latestMetadata);
+            providerService.clusterMetadataChanged(latestMetadata);
         }
     }
 }
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 5cdc5a9..89e1e82 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -23,8 +23,10 @@
 import java.io.File;
 import java.util.Collection;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
@@ -83,7 +85,9 @@
 
     @Override
     public CompletableFuture<Void> open() {
-        openServer();
+        if (partition.getMembers().contains(localNodeId)) {
+            openServer();
+        }
         return openClient().thenAccept(v -> isOpened.set(true))
                            .thenApply(v -> null);
     }
@@ -120,6 +124,10 @@
         return Collections2.transform(partition.getMembers(), this::toAddress);
     }
 
+    /**
+     * Attempts to rejoin the partition.
+     * @return future that is completed after the operation is complete
+     */
     private CompletableFuture<Void> openServer() {
         if (!partition.getMembers().contains(localNodeId) || server != null) {
             return CompletableFuture.completedFuture(null);
@@ -135,6 +143,26 @@
         return server.open().thenRun(() -> this.server = server);
     }
 
+    /**
+     * Attempts to join the partition as a new member.
+     * @return future that is completed after the operation is complete
+     */
+    private CompletableFuture<Void> joinCluster() {
+        Set<NodeId> otherMembers = partition.getMembers()
+                 .stream()
+                 .filter(nodeId -> !nodeId.equals(localNodeId))
+                 .collect(Collectors.toSet());
+        StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
+                this,
+                serializer,
+                () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
+                                     partition.getId(),
+                                     messagingService),
+                RESOURCE_TYPES,
+                logFolder);
+        return server.join(Collections2.transform(otherMembers, this::toAddress)).thenRun(() -> this.server = server);
+    }
+
     private CompletableFuture<StoragePartitionClient> openClient() {
         client = new StoragePartitionClient(this,
                 serializer,
@@ -149,7 +177,7 @@
      * Closes the partition server if it was previously opened.
      * @return future that is completed when the operation completes
      */
-    public CompletableFuture<Void> closeServer() {
+    public CompletableFuture<Void> leaveCluster() {
         return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
     }
 
@@ -181,15 +209,21 @@
      * @return partition info
      */
     public Optional<PartitionInfo> info() {
-        return server != null ? Optional.of(server.info()) : Optional.empty();
+        return server != null && !server.isClosed() ? Optional.of(server.info()) : Optional.empty();
     }
 
-    public void onUpdate(Partition partition) {
-        this.partition = partition;
+    public void onUpdate(Partition newValue) {
+        if (partition.getMembers().contains(localNodeId) && newValue.getMembers().contains(localNodeId)) {
+            return;
+        }
+        if (!partition.getMembers().contains(localNodeId) && !newValue.getMembers().contains(localNodeId)) {
+            return;
+        }
+        this.partition = newValue;
         if (partition.getMembers().contains(localNodeId)) {
-            openServer();
+            joinCluster();
         } else if (!partition.getMembers().contains(localNodeId)) {
-            closeServer();
+            leaveCluster();
         }
     }
 }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
index 6d613c3..7c59079 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionServer.java
@@ -77,7 +77,7 @@
                 return CompletableFuture.completedFuture(null);
             }
             synchronized (this) {
-                server = buildServer();
+                server = buildServer(partition.getMemberAddresses());
             }
             serverOpenFuture = server.open();
         } else {
@@ -109,12 +109,12 @@
         return server.close();
     }
 
-    private CopycatServer buildServer() {
+    private CopycatServer buildServer(Collection<Address> clusterMembers) {
         ResourceTypeResolver resourceResolver = new ServiceLoaderResourceResolver();
         ResourceRegistry registry = new ResourceRegistry();
         resourceTypes.forEach(registry::register);
         resourceResolver.resolve(registry);
-        CopycatServer server = CopycatServer.builder(localAddress, partition.getMemberAddresses())
+        CopycatServer server = CopycatServer.builder(localAddress, clusterMembers)
                 .withName("partition-" + partition.getId())
                 .withSerializer(serializer.clone())
                 .withTransport(transport.get())
@@ -130,6 +130,18 @@
         return server;
     }
 
+    public CompletableFuture<Void> join(Collection<Address> otherMembers) {
+        server = buildServer(otherMembers);
+
+        return server.open().whenComplete((r, e) -> {
+            if (e == null) {
+                log.info("Successfully joined partition {}", partition.getId());
+            } else {
+                log.info("Failed to join partition {}", partition.getId(), e);
+            }
+        }).thenApply(v -> null);
+    }
+
     @Override
     public boolean isOpen() {
         return server.isOpen();