Support an in-memory p0 partition encompassing all nodes in the cluster. This will be used by the leadership manager and other use cases
that need strong consistency for coordination but not durable storage.

Change-Id: I8e590e46d82a3d43cae3157a04be820bb7e1b175
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 24ec92b..cb4ddfc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -25,58 +25,63 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.onosproject.store.service.UpdateOperation;
 import org.onosproject.store.service.Versioned;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
-
+import net.kuujo.copycat.Task;
+import net.kuujo.copycat.cluster.Cluster;
 import static com.google.common.base.Preconditions.checkState;
 
 /**
  * A database that partitions the keys across one or more database partitions.
  */
-public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
+public class PartitionedDatabase implements Database {
 
-    private Partitioner<String> partitioner;
-    private final ClusterCoordinator coordinator;
-    private final Map<String, Database> partitions = Maps.newConcurrentMap();
+    private final String name;
+    private final Partitioner<String> partitioner;
+    private final List<Database> partitions;
     private final AtomicBoolean isOpen = new AtomicBoolean(false);
-    private static final String DB_NOT_OPEN = "Database is not open";
+    private static final String DB_NOT_OPEN = "Partitioned Database is not open";
 
-    protected PartitionedDatabase(ClusterCoordinator coordinator) {
-        this.coordinator = coordinator;
+    public PartitionedDatabase(
+            String name,
+            Collection<Database> partitions) {
+        this.name = name;
+        this.partitions = partitions
+                .stream()
+                .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
+                .collect(Collectors.toList());
+        this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
+    }
+
+    /**
+     * Returns the databases for individual partitions.
+     * @return list of database partitions
+     */
+    public List<Database> getPartitions() {
+        return partitions;
     }
 
     /**
      * Returns true if the database is open.
      * @return true if open, false otherwise
      */
+    @Override
     public boolean isOpen() {
         return isOpen.get();
     }
 
     @Override
-    public void registerPartition(String name, Database partition) {
-        partitions.put(name, partition);
-    }
-
-    @Override
-    public Map<String, Database> getRegisteredPartitions() {
-        return ImmutableMap.copyOf(partitions);
-    }
-
-    @Override
     public CompletableFuture<Integer> size(String tableName) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         AtomicInteger totalSize = new AtomicInteger(0);
         return CompletableFuture.allOf(partitions
-                    .values()
                     .stream()
                     .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
                     .toArray(CompletableFuture[]::new))
@@ -100,7 +105,6 @@
         checkState(isOpen.get(), DB_NOT_OPEN);
         AtomicBoolean containsValue = new AtomicBoolean(false);
         return CompletableFuture.allOf(partitions
-                    .values()
                     .stream()
                     .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
                     .toArray(CompletableFuture[]::new))
@@ -129,7 +133,6 @@
     public CompletableFuture<Void> clear(String tableName) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return CompletableFuture.allOf(partitions
-                    .values()
                     .stream()
                     .map(p -> p.clear(tableName))
                     .toArray(CompletableFuture[]::new));
@@ -140,7 +143,6 @@
         checkState(isOpen.get(), DB_NOT_OPEN);
         Set<String> keySet = Sets.newConcurrentHashSet();
         return CompletableFuture.allOf(partitions
-                    .values()
                     .stream()
                     .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
                     .toArray(CompletableFuture[]::new))
@@ -152,7 +154,6 @@
         checkState(isOpen.get(), DB_NOT_OPEN);
         List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
         return CompletableFuture.allOf(partitions
-                    .values()
                     .stream()
                     .map(p -> p.values(tableName).thenApply(values::addAll))
                     .toArray(CompletableFuture[]::new))
@@ -164,7 +165,6 @@
         checkState(isOpen.get(), DB_NOT_OPEN);
         Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
         return CompletableFuture.allOf(partitions
-                    .values()
                     .stream()
                     .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
                     .toArray(CompletableFuture[]::new))
@@ -225,32 +225,47 @@
     }
 
     @Override
-    public void setPartitioner(Partitioner<String> partitioner) {
-        this.partitioner = partitioner;
-    }
-
-    @Override
-    public CompletableFuture<PartitionedDatabase> open() {
-        return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
-                                                        .values()
-                                                        .stream()
-                                                        .map(Database::open)
-                                                        .toArray(CompletableFuture[]::new))
-                                 .thenApply(v -> {
-                                     isOpen.set(true);
-                                     return this; }));
-
+    public CompletableFuture<Database> open() {
+        return CompletableFuture.allOf(partitions
+                    .stream()
+                    .map(Database::open)
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> {
+                    isOpen.set(true);
+                    return this; });
     }
 
     @Override
     public CompletableFuture<Void> close() {
         checkState(isOpen.get(), DB_NOT_OPEN);
-        CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
-                .values()
+        return CompletableFuture.allOf(partitions
                 .stream()
                 .map(database -> database.close())
                 .toArray(CompletableFuture[]::new));
-        CompletableFuture<Void> closeCoordinator = coordinator.close();
-        return closePartitions.thenCompose(v -> closeCoordinator);
     }
-}
+
+    @Override
+    public boolean isClosed() {
+        return !isOpen.get();
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Cluster cluster() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Database addStartupTask(Task<CompletableFuture<Void>> task) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
+        throw new UnsupportedOperationException();
+    }
+}
\ No newline at end of file