Cluster scaling enchancements
	- Updated ConfigFileBasedClusterMetadataProvider to handle both file and http protocols.
	- Server open logic updated to handle joining an existing cluster.

Change-Id: Idbaa39733c7bf814510c94c4b21e3714b3f97f8f
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
index 5cdc5a9..89e1e82 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartition.java
@@ -23,8 +23,10 @@
 import java.io.File;
 import java.util.Collection;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.ControllerNode;
@@ -83,7 +85,9 @@
 
     @Override
     public CompletableFuture<Void> open() {
-        openServer();
+        if (partition.getMembers().contains(localNodeId)) {
+            openServer();
+        }
         return openClient().thenAccept(v -> isOpened.set(true))
                            .thenApply(v -> null);
     }
@@ -120,6 +124,10 @@
         return Collections2.transform(partition.getMembers(), this::toAddress);
     }
 
+    /**
+     * Attempts to rejoin the partition.
+     * @return future that is completed after the operation is complete
+     */
     private CompletableFuture<Void> openServer() {
         if (!partition.getMembers().contains(localNodeId) || server != null) {
             return CompletableFuture.completedFuture(null);
@@ -135,6 +143,26 @@
         return server.open().thenRun(() -> this.server = server);
     }
 
+    /**
+     * Attempts to join the partition as a new member.
+     * @return future that is completed after the operation is complete
+     */
+    private CompletableFuture<Void> joinCluster() {
+        Set<NodeId> otherMembers = partition.getMembers()
+                 .stream()
+                 .filter(nodeId -> !nodeId.equals(localNodeId))
+                 .collect(Collectors.toSet());
+        StoragePartitionServer server = new StoragePartitionServer(toAddress(localNodeId),
+                this,
+                serializer,
+                () -> new CopycatTransport(CopycatTransport.Mode.SERVER,
+                                     partition.getId(),
+                                     messagingService),
+                RESOURCE_TYPES,
+                logFolder);
+        return server.join(Collections2.transform(otherMembers, this::toAddress)).thenRun(() -> this.server = server);
+    }
+
     private CompletableFuture<StoragePartitionClient> openClient() {
         client = new StoragePartitionClient(this,
                 serializer,
@@ -149,7 +177,7 @@
      * Closes the partition server if it was previously opened.
      * @return future that is completed when the operation completes
      */
-    public CompletableFuture<Void> closeServer() {
+    public CompletableFuture<Void> leaveCluster() {
         return server != null ? server.closeAndExit() : CompletableFuture.completedFuture(null);
     }
 
@@ -181,15 +209,21 @@
      * @return partition info
      */
     public Optional<PartitionInfo> info() {
-        return server != null ? Optional.of(server.info()) : Optional.empty();
+        return server != null && !server.isClosed() ? Optional.of(server.info()) : Optional.empty();
     }
 
-    public void onUpdate(Partition partition) {
-        this.partition = partition;
+    public void onUpdate(Partition newValue) {
+        if (partition.getMembers().contains(localNodeId) && newValue.getMembers().contains(localNodeId)) {
+            return;
+        }
+        if (!partition.getMembers().contains(localNodeId) && !newValue.getMembers().contains(localNodeId)) {
+            return;
+        }
+        this.partition = newValue;
         if (partition.getMembers().contains(localNodeId)) {
-            openServer();
+            joinCluster();
         } else if (!partition.getMembers().contains(localNodeId)) {
-            closeServer();
+            leaveCluster();
         }
     }
 }