Support a inmemory p0 partition encompassing all nodes in the cluster. This will be used by leadership manager and other usecases
that need strong consistency for coordination and not durable storage

Change-Id: I8e590e46d82a3d43cae3157a04be820bb7e1b175
diff --git a/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java b/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
index 3acd21f..8b3d166 100644
--- a/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
+++ b/cli/src/main/java/org/onosproject/cli/net/PartitionsListCommand.java
@@ -42,7 +42,9 @@
      * @param partitionInfo partition descriptions
      */
     private void displayPartitions(List<PartitionInfo> partitionInfo) {
+        print("----------------------------------------------------------");
         print(FMT, "Name", "Term", "Members", "");
+        print("----------------------------------------------------------");
 
         for (PartitionInfo info : partitionInfo) {
             boolean first = true;
@@ -56,6 +58,9 @@
                             member.equals(info.leader()) ? "*" : "");
                 }
             }
+            if (!first) {
+                print("----------------------------------------------------------");
+            }
         }
     }
 
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
index e0ed4bc..6d19259 100644
--- a/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMap.java
@@ -17,8 +17,8 @@
 package org.onosproject.store.service;
 
 import java.util.Collection;
-import java.util.Set;
 import java.util.Map.Entry;
+import java.util.Set;
 
 /**
  * A distributed, strongly consistent map.
diff --git a/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java
new file mode 100644
index 0000000..6a02f34
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/ConsistentMapBuilder.java
@@ -0,0 +1,75 @@
+package org.onosproject.store.service;
+
+
+/**
+ * Builder for consistent maps.
+ *
+ * @param <K> type for map key
+ * @param <V> type for map value
+ */
+public interface ConsistentMapBuilder<K, V> {
+
+    /**
+     * Sets the name of the map.
+     * <p>
+     * Each consistent map is identified by a unique map name.
+     * </p>
+     * <p>
+     * Note: This is a mandatory parameter.
+     * </p>
+     *
+     * @param name name of the consistent map
+     * @return this ConsistentMapBuilder
+     */
+    public ConsistentMapBuilder<K, V> withName(String name);
+
+    /**
+     * Sets a serializer that can be used to serialize
+     * both the keys and values inserted into the map. The serializer
+     * builder should be pre-populated with any classes that will be
+     * put into the map.
+     * <p>
+     * Note: This is a mandatory parameter.
+     * </p>
+     *
+     * @param serializer serializer
+     * @return this ConsistentMapBuilder
+     */
+    public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer);
+
+    /**
+     * Disables distribution of map entries across multiple database partitions.
+     * <p>
+     * When partitioning is disabled, the returned map will have a single partition
+     * that spans the entire cluster. Furthermore, the changes made to the map are
+     * ephemeral and do not survive a full cluster restart.
+     * </p>
+     * <p>
+     * Disabling partitions is more appropriate when the returned map is used for
+     * coordination activities such as leader election and not for long term data persistence.
+     * </p>
+     * <p>
+     * Note: By default partitions are enabled and entries in the map are durable.
+     * </p>
+     * @return this ConsistentMapBuilder
+     */
+    public ConsistentMapBuilder<K, V> withPartitionsDisabled();
+
+    /**
+     * Builds an consistent map based on the configuration options
+     * supplied to this builder.
+     *
+     * @return new consistent map
+     * @throws java.lang.RuntimeException if a mandatory parameter is missing
+     */
+    public ConsistentMap<K, V> build();
+
+    /**
+     * Builds an async consistent map based on the configuration options
+     * supplied to this builder.
+     *
+     * @return new async consistent map
+     * @throws java.lang.RuntimeException if a mandatory parameter is missing
+     */
+    public AsyncConsistentMap<K, V> buildAsyncMap();
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/StorageService.java b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
index a59e376..c8a394d 100644
--- a/core/api/src/main/java/org/onosproject/store/service/StorageService.java
+++ b/core/api/src/main/java/org/onosproject/store/service/StorageService.java
@@ -29,28 +29,6 @@
 public interface StorageService {
 
     /**
-     * Creates a ConsistentMap.
-     *
-     * @param name map name
-     * @param serializer serializer to use for serializing keys and values
-     * @return consistent map.
-     * @param <K> key type
-     * @param <V> value type
-     */
-    <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer);
-
-    /**
-     * Creates a AsyncConsistentMap.
-     *
-     * @param name map name
-     * @param serializer serializer to use for serializing keys and values
-     * @return async consistent map
-     * @param <K> key type
-     * @param <V> value type
-     */
-    <K, V> AsyncConsistentMap<K , V> createAsyncConsistentMap(String name, Serializer serializer);
-
-    /**
      * Creates a new transaction context.
      *
      * @return transaction context
@@ -66,4 +44,12 @@
      */
     <K, V> EventuallyConsistentMapBuilder<K, V> eventuallyConsistentMapBuilder();
 
-}
+    /**
+     * Creates a new EventuallyConsistentMapBuilder.
+     *
+     * @param <K> key type
+     * @param <V> value type
+     * @return builder for an eventually consistent map
+     */
+    <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder();
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
index 675d094..91cfa4b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
@@ -81,5 +81,4 @@
       .addStartupTask(() -> coordinator.open().thenApply(v -> null))
       .addShutdownTask(coordinator::close);
   }
-
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
index 3f8b235..bd774b9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
@@ -36,6 +36,8 @@
   private static final String DEFAULT_CONFIGURATION = "database-defaults";
   private static final String CONFIGURATION = "database";
 
+  private String name;
+
   public DatabaseConfig() {
     super(CONFIGURATION, DEFAULT_CONFIGURATION);
   }
@@ -114,6 +116,37 @@
     return this;
   }
 
+  /**
+   * Returns the database name.
+   *
+   * @return The database name
+   */
+  public String getName() {
+      return name;
+  }
+
+  /**
+   * Sets the database name, returning the configuration for method chaining.
+   *
+   * @param name The database name
+   * @return The database configuration
+   * @throws java.lang.NullPointerException If the name is {@code null}
+   */
+  public DatabaseConfig withName(String name) {
+      setName(Assert.isNotNull(name, "name"));
+      return this;
+  }
+
+  /**
+   * Sets the database name.
+   *
+   * @param name The database name
+   * @throws java.lang.NullPointerException If the name is {@code null}
+   */
+  public void setName(String name) {
+      this.name = Assert.isNotNull(name, "name");
+  }
+
   @Override
   public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
     return new StateLogConfig(toMap())
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 4ef317f..eaeecfd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -16,12 +16,23 @@
 
 package org.onosproject.store.consistent.impl;
 
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+
+import net.kuujo.copycat.CopycatConfig;
 import net.kuujo.copycat.cluster.ClusterConfig;
 import net.kuujo.copycat.cluster.Member;
+import net.kuujo.copycat.cluster.Member.Type;
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
+import net.kuujo.copycat.log.BufferedLog;
 import net.kuujo.copycat.log.FileLog;
+import net.kuujo.copycat.log.Log;
 import net.kuujo.copycat.netty.NettyTcpProtocol;
 import net.kuujo.copycat.protocol.Consistency;
+import net.kuujo.copycat.protocol.Protocol;
+import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -32,11 +43,9 @@
 import org.onosproject.store.cluster.impl.NodeInfo;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
 import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
 import org.onosproject.store.service.EventuallyConsistentMapBuilder;
 import org.onosproject.store.service.PartitionInfo;
-import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageAdminService;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.TransactionContext;
@@ -47,7 +56,9 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -61,13 +72,16 @@
 public class DatabaseManager implements StorageService, StorageAdminService {
 
     private final Logger log = getLogger(getClass());
+    private ClusterCoordinator coordinator;
     private PartitionedDatabase partitionedDatabase;
+    private Database inMemoryDatabase;
     public static final int COPYCAT_TCP_PORT = 7238; //  7238 = RAFT
     private static final String CONFIG_DIR = "../config";
     private static final String PARTITION_DEFINITION_FILE = "tablets.json";
     private static final int DATABASE_STARTUP_TIMEOUT_SEC = 60;
-
-    private final PartitionedDatabaseConfig databaseConfig = new PartitionedDatabaseConfig();
+    public static final String BASE_PARTITION_NAME = "p0";
+    private static final int RAFT_ELECTION_TIMEOUT = 3000;
+    private static final int RAFT_HEARTBEAT_TIMEOUT = 1500;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ClusterService clusterService;
@@ -82,8 +96,6 @@
     @Activate
     public void activate() {
 
-        final String logDir = System.getProperty("karaf.data", "./data");
-
         // load database configuration
         File file = new File(CONFIG_DIR, PARTITION_DEFINITION_FILE);
         log.info("Loading database definition: {}", file.getAbsolutePath());
@@ -107,47 +119,56 @@
         String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode()));
 
         ClusterConfig clusterConfig = new ClusterConfig()
-            .withProtocol(new NettyTcpProtocol()
-                    .withSsl(false)
-                    .withConnectTimeout(60000)
-                    .withAcceptBacklog(1024)
-                    .withTrafficClass(-1)
-                    .withSoLinger(-1)
-                    .withReceiveBufferSize(32768)
-                    .withSendBufferSize(8192)
-                    .withThreads(1))
-            .withElectionTimeout(3000)
-            .withHeartbeatInterval(1500)
+            .withProtocol(newNettyProtocol())
+            .withElectionTimeout(RAFT_ELECTION_TIMEOUT)
+            .withHeartbeatInterval(RAFT_HEARTBEAT_TIMEOUT)
             .withMembers(activeNodeUris)
             .withLocalMember(localNodeUri);
 
-        partitionMap.forEach((name, nodes) -> {
-            Set<String> replicas = nodes.stream().map(this::nodeToUri).collect(Collectors.toSet());
-            DatabaseConfig partitionConfig = new DatabaseConfig()
-                            .withElectionTimeout(3000)
-                            .withHeartbeatInterval(1500)
-                            .withConsistency(Consistency.STRONG)
-                            .withLog(new FileLog()
-                                    .withDirectory(logDir)
-                                    .withSegmentSize(1073741824) // 1GB
-                                    .withFlushOnWrite(true)
-                                    .withSegmentInterval(Long.MAX_VALUE))
-                            .withDefaultSerializer(new DatabaseSerializer())
-                            .withReplicas(replicas);
-            databaseConfig.addPartition(name, partitionConfig);
-        });
+        CopycatConfig copycatConfig = new CopycatConfig()
+            .withName("onos")
+            .withClusterConfig(clusterConfig)
+            .withDefaultSerializer(new DatabaseSerializer())
+            .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
 
-        partitionedDatabase = PartitionedDatabaseManager.create("onos-store", clusterConfig, databaseConfig);
+        coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
+
+        DatabaseConfig inMemoryDatabaseConfig =
+                newDatabaseConfig(BASE_PARTITION_NAME, newInMemoryLog(), activeNodeUris);
+        inMemoryDatabase = coordinator
+                .getResource(inMemoryDatabaseConfig.getName(), inMemoryDatabaseConfig.resolve(clusterConfig)
+                .withSerializer(copycatConfig.getDefaultSerializer())
+                .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
+
+        List<Database> partitions = partitionMap.entrySet()
+            .stream()
+            .map(entry -> {
+                String[] replicas = entry.getValue().stream().map(this::nodeToUri).toArray(String[]::new);
+                return newDatabaseConfig(entry.getKey(), newPersistentLog(), replicas);
+                })
+            .map(config -> {
+                Database db = coordinator.getResource(config.getName(), config.resolve(clusterConfig)
+                        .withSerializer(copycatConfig.getDefaultSerializer())
+                        .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
+                return db;
+            })
+            .collect(Collectors.toList());
+
+        partitionedDatabase = new PartitionedDatabase("onos-store", partitions);
 
         CountDownLatch latch = new CountDownLatch(1);
-        partitionedDatabase.open().whenComplete((db, error) -> {
-            if (error != null) {
-                log.warn("Failed to open database.", error);
-            } else {
-                latch.countDown();
-                log.info("Successfully opened database.");
-            }
-        });
+
+        coordinator.open()
+            .thenCompose(v -> CompletableFuture.allOf(inMemoryDatabase.open(), partitionedDatabase.open())
+            .whenComplete((db, error) -> {
+                if (error != null) {
+                    log.warn("Failed to create databases.", error);
+                } else {
+                    latch.countDown();
+                    log.info("Successfully created databases.");
+                }
+            }));
+
         try {
             if (!latch.await(DATABASE_STARTUP_TIMEOUT_SEC, TimeUnit.SECONDS)) {
                 log.warn("Timed out waiting for database to initialize.");
@@ -161,52 +182,87 @@
 
     @Deactivate
     public void deactivate() {
-        partitionedDatabase.close().whenComplete((result, error) -> {
-            if (error != null) {
-                log.warn("Failed to cleanly close database.", error);
-            } else {
-                log.info("Successfully closed database.");
-            }
-        });
+        CompletableFuture.allOf(inMemoryDatabase.close(), partitionedDatabase.close())
+            .thenCompose(v -> coordinator.close())
+            .whenComplete((result, error) -> {
+                if (error != null) {
+                    log.warn("Failed to cleanly close databases.", error);
+                } else {
+                    log.info("Successfully closed databases.");
+                }
+            });
         log.info("Stopped");
     }
 
     @Override
-    public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
-        return new DefaultConsistentMap<>(name, partitionedDatabase, serializer);
-    }
-
-    @Override
-    public <K, V> AsyncConsistentMap<K , V> createAsyncConsistentMap(String name, Serializer serializer) {
-        return new DefaultAsyncConsistentMap<>(name, partitionedDatabase, serializer);
-    }
-
-    @Override
     public TransactionContext createTransactionContext() {
         return new DefaultTransactionContext(partitionedDatabase);
     }
 
     @Override
     public List<PartitionInfo> getPartitionInfo() {
-        return partitionedDatabase.getRegisteredPartitions()
-                .values()
+        return Lists.asList(
+                    inMemoryDatabase,
+                    partitionedDatabase.getPartitions().toArray(new Database[]{}))
                 .stream()
-                .map(db -> toPartitionInfo(db, databaseConfig.partitions().get(db.name())))
+                .map(DatabaseManager::toPartitionInfo)
                 .collect(Collectors.toList());
     }
 
+    private Protocol newNettyProtocol() {
+        return new NettyTcpProtocol()
+            .withSsl(false)
+            .withConnectTimeout(60000)
+            .withAcceptBacklog(1024)
+            .withTrafficClass(-1)
+            .withSoLinger(-1)
+            .withReceiveBufferSize(32768)
+            .withSendBufferSize(8192)
+            .withThreads(1);
+    }
+
+    private Log newPersistentLog() {
+        String logDir = System.getProperty("karaf.data", "./data");
+        return new FileLog()
+            .withDirectory(logDir)
+            .withSegmentSize(1073741824) // 1GB
+            .withFlushOnWrite(true)
+            .withSegmentInterval(Long.MAX_VALUE);
+    }
+
+    private Log newInMemoryLog() {
+        return new BufferedLog()
+            .withFlushOnWrite(false)
+            .withFlushInterval(Long.MAX_VALUE)
+            .withSegmentSize(10485760) // 10MB
+            .withSegmentInterval(Long.MAX_VALUE);
+    }
+
+    private DatabaseConfig newDatabaseConfig(String name, Log log, String[] replicas) {
+        return new DatabaseConfig()
+            .withName(name)
+            .withElectionTimeout(RAFT_ELECTION_TIMEOUT)
+            .withHeartbeatInterval(RAFT_HEARTBEAT_TIMEOUT)
+            .withConsistency(Consistency.STRONG)
+            .withLog(log)
+            .withDefaultSerializer(new DatabaseSerializer())
+            .withReplicas(replicas);
+    }
+
     /**
      * Maps a Raft Database object to a PartitionInfo object.
      *
      * @param database database containing input data
      * @return PartitionInfo object
      */
-    private static PartitionInfo toPartitionInfo(Database database, DatabaseConfig dbConfig) {
+    private static PartitionInfo toPartitionInfo(Database database) {
         return new PartitionInfo(database.name(),
                           database.cluster().term(),
-                          database.cluster().members().stream()
+                          database.cluster().members()
+                                  .stream()
+                                  .filter(member -> Type.ACTIVE.equals(member.type()))
                                   .map(Member::uri)
-                                  .filter(uri -> dbConfig.getReplicas().contains(uri))
+                                  .sorted()
                                   .collect(Collectors.toList()),
                           database.cluster().leader() != null ?
                                   database.cluster().leader().uri() : null);
@@ -219,4 +275,8 @@
                                                         clusterCommunicator);
     }
 
-}
+    @Override
+    public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+        return new DefaultConsistentMapBuilder<>(inMemoryDatabase, partitionedDatabase);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java
index cd0525e..a76a4e3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java
@@ -18,10 +18,9 @@
 
 import static com.google.common.base.Preconditions.checkState;
 
-import java.util.Map;
-
+import java.util.List;
 import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableList;
 import com.google.common.hash.Hashing;
 
 /**
@@ -32,11 +31,11 @@
  */
 public abstract class DatabasePartitioner implements Partitioner<String> {
     // Database partitions sorted by their partition name.
-    protected final Database[] sortedPartitions;
+    protected final List<Database> partitions;
 
-    public DatabasePartitioner(Map<String, Database> partitionMap) {
-        checkState(partitionMap != null && !partitionMap.isEmpty(), "Partition map cannot be null or empty");
-        sortedPartitions = ImmutableSortedMap.<String, Database>copyOf(partitionMap).values().toArray(new Database[]{});
+    public DatabasePartitioner(List<Database> partitions) {
+        checkState(partitions != null && !partitions.isEmpty(), "Partitions cannot be null or empty");
+        this.partitions = ImmutableList.copyOf(partitions);
     }
 
     protected int hash(String key) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index d9876fd..c86a6ea 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -45,7 +45,7 @@
 public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
 
     private final String name;
-    private final DatabaseProxy<String, byte[]> proxy;
+    private final Database database;
     private final Serializer serializer;
 
     private static final String ERROR_NULL_KEY = "Key cannot be null";
@@ -66,39 +66,39 @@
     }
 
     public DefaultAsyncConsistentMap(String name,
-            DatabaseProxy<String, byte[]> proxy,
+            Database database,
             Serializer serializer) {
         this.name = checkNotNull(name, "map name cannot be null");
-        this.proxy = checkNotNull(proxy, "database proxy cannot be null");
+        this.database = checkNotNull(database, "database cannot be null");
         this.serializer = checkNotNull(serializer, "serializer cannot be null");
     }
 
     @Override
     public CompletableFuture<Integer> size() {
-        return proxy.size(name);
+        return database.size(name);
     }
 
     @Override
     public CompletableFuture<Boolean> isEmpty() {
-        return proxy.isEmpty(name);
+        return database.isEmpty(name);
     }
 
     @Override
     public CompletableFuture<Boolean> containsKey(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
-        return proxy.containsKey(name, keyCache.getUnchecked(key));
+        return database.containsKey(name, keyCache.getUnchecked(key));
     }
 
     @Override
     public CompletableFuture<Boolean> containsValue(V value) {
         checkNotNull(value, ERROR_NULL_VALUE);
-        return proxy.containsValue(name, serializer.encode(value));
+        return database.containsValue(name, serializer.encode(value));
     }
 
     @Override
     public CompletableFuture<Versioned<V>> get(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
-        return proxy.get(name, keyCache.getUnchecked(key))
+        return database.get(name, keyCache.getUnchecked(key))
             .thenApply(v -> v != null
             ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
     }
@@ -107,7 +107,7 @@
     public CompletableFuture<Versioned<V>> put(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        return proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value))
+        return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
                 .thenApply(v -> v != null
                 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
     }
@@ -115,19 +115,19 @@
     @Override
     public CompletableFuture<Versioned<V>> remove(K key) {
         checkNotNull(key, ERROR_NULL_KEY);
-        return proxy.remove(name, keyCache.getUnchecked(key))
+        return database.remove(name, keyCache.getUnchecked(key))
                 .thenApply(v -> v != null
                 ? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
     }
 
     @Override
     public CompletableFuture<Void> clear() {
-        return proxy.clear(name);
+        return database.clear(name);
     }
 
     @Override
     public CompletableFuture<Set<K>> keySet() {
-        return proxy.keySet(name)
+        return database.keySet(name)
                 .thenApply(s -> s
                 .stream()
                 .map(this::dK)
@@ -136,7 +136,7 @@
 
     @Override
     public CompletableFuture<Collection<Versioned<V>>> values() {
-        return proxy.values(name).thenApply(c -> c
+        return database.values(name).thenApply(c -> c
             .stream()
             .map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime()))
             .collect(Collectors.toList()));
@@ -144,7 +144,7 @@
 
     @Override
     public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
-        return proxy.entrySet(name).thenApply(s -> s
+        return database.entrySet(name).thenApply(s -> s
                 .stream()
                 .map(this::fromRawEntry)
                 .collect(Collectors.toSet()));
@@ -154,7 +154,7 @@
     public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        return proxy.putIfAbsent(
+        return database.putIfAbsent(
                 name, keyCache.getUnchecked(key), serializer.encode(value)).thenApply(v ->
                 v != null ?
                 new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
@@ -164,13 +164,13 @@
     public CompletableFuture<Boolean> remove(K key, V value) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(value, ERROR_NULL_VALUE);
-        return proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value));
+        return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value));
     }
 
     @Override
     public CompletableFuture<Boolean> remove(K key, long version) {
         checkNotNull(key, ERROR_NULL_KEY);
-        return proxy.remove(name, keyCache.getUnchecked(key), version);
+        return database.remove(name, keyCache.getUnchecked(key), version);
 
     }
 
@@ -179,14 +179,14 @@
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(newValue, ERROR_NULL_VALUE);
         byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
-        return proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue));
+        return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue));
     }
 
     @Override
     public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
         checkNotNull(key, ERROR_NULL_KEY);
         checkNotNull(newValue, ERROR_NULL_VALUE);
-        return proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue));
+        return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue));
     }
 
     private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
index 123615c..046aafb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
@@ -44,9 +44,9 @@
     private final AsyncConsistentMap<K, V> asyncMap;
 
     public DefaultConsistentMap(String name,
-            DatabaseProxy<String, byte[]> proxy,
+            Database database,
             Serializer serializer) {
-        asyncMap = new DefaultAsyncConsistentMap<>(name, proxy, serializer);
+        asyncMap = new DefaultAsyncConsistentMap<>(name, database, serializer);
     }
 
     @Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
new file mode 100644
index 0000000..ba56e59
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
@@ -0,0 +1,71 @@
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.Serializer;
+
+/**
+ * Default Consistent Map builder.
+ *
+ * @param <K> type for map key
+ * @param <V> type for map value
+ */
+public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K, V> {
+
+    private Serializer serializer;
+    private String name;
+    private boolean partitionsEnabled = true;
+    private final Database partitionedDatabase;
+    private final Database inMemoryDatabase;
+
+    public DefaultConsistentMapBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
+        this.inMemoryDatabase = inMemoryDatabase;
+        this.partitionedDatabase = partitionedDatabase;
+    }
+
+    @Override
+    public ConsistentMapBuilder<K, V> withName(String name) {
+        checkArgument(name != null && !name.isEmpty());
+        this.name = name;
+        return this;
+    }
+
+    @Override
+    public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer) {
+        checkArgument(serializer != null);
+        this.serializer = serializer;
+        return this;
+    }
+
+    @Override
+    public ConsistentMapBuilder<K, V> withPartitionsDisabled() {
+        partitionsEnabled = false;
+        return this;
+    }
+
+    private boolean validInputs() {
+        return name != null && serializer != null;
+    }
+
+    @Override
+    public ConsistentMap<K, V> build() {
+        checkState(validInputs());
+        return new DefaultConsistentMap<>(
+                name,
+                partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
+                serializer);
+    }
+
+    @Override
+    public AsyncConsistentMap<K, V> buildAsyncMap() {
+        checkState(validInputs());
+        return new DefaultAsyncConsistentMap<>(
+                name,
+                partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
+                serializer);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 88832e3..9ffd1e8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -36,130 +36,143 @@
  * Default database.
  */
 public class DefaultDatabase extends AbstractResource<Database> implements Database {
-  private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
-  private DatabaseProxy<String, byte[]> proxy;
+    private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
+    private DatabaseProxy<String, byte[]> proxy;
 
-  @SuppressWarnings("unchecked")
-  public DefaultDatabase(ResourceContext context) {
-    super(context);
-    this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
-  }
-
-  /**
-   * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
-   * return the completed future result.
-   *
-   * @param supplier The supplier to call if the database is open.
-   * @param <T> The future result type.
-   * @return A completable future that if this database is closed is immediately failed.
-   */
-  protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
-    if (proxy == null) {
-      return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
+    @SuppressWarnings("unchecked")
+    public DefaultDatabase(ResourceContext context) {
+        super(context);
+        this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
     }
-    return supplier.get();
-  }
 
-  @Override
-  public CompletableFuture<Integer> size(String tableName) {
-    return checkOpen(() -> proxy.size(tableName));
-  }
+    /**
+     * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
+     * return the completed future result.
+     *
+     * @param supplier The supplier to call if the database is open.
+     * @param <T> The future result type.
+     * @return A completable future that if this database is closed is immediately failed.
+     */
+    protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
+        if (proxy == null) {
+            return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
+        }
+        return supplier.get();
+    }
 
-  @Override
-  public CompletableFuture<Boolean> isEmpty(String tableName) {
-    return checkOpen(() -> proxy.isEmpty(tableName));
-  }
+    @Override
+    public CompletableFuture<Integer> size(String tableName) {
+        return checkOpen(() -> proxy.size(tableName));
+    }
 
-  @Override
-  public CompletableFuture<Boolean> containsKey(String tableName, String key) {
-    return checkOpen(() -> proxy.containsKey(tableName, key));
-  }
+    @Override
+    public CompletableFuture<Boolean> isEmpty(String tableName) {
+        return checkOpen(() -> proxy.isEmpty(tableName));
+    }
 
-  @Override
-  public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
-    return checkOpen(() -> proxy.containsValue(tableName, value));
-  }
+    @Override
+    public CompletableFuture<Boolean> containsKey(String tableName, String key) {
+        return checkOpen(() -> proxy.containsKey(tableName, key));
+    }
 
-  @Override
-  public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
-    return checkOpen(() -> proxy.get(tableName, key));
-  }
+    @Override
+    public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
+        return checkOpen(() -> proxy.containsValue(tableName, value));
+    }
 
-  @Override
-  public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
-    return checkOpen(() -> proxy.put(tableName, key, value));
-  }
+    @Override
+    public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
+        return checkOpen(() -> proxy.get(tableName, key));
+    }
 
-  @Override
-  public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
-    return checkOpen(() -> proxy.remove(tableName, key));
-  }
+    @Override
+    public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+        return checkOpen(() -> proxy.put(tableName, key, value));
+    }
 
-  @Override
-  public CompletableFuture<Void> clear(String tableName) {
-    return checkOpen(() -> proxy.clear(tableName));
-  }
+    @Override
+    public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+        return checkOpen(() -> proxy.remove(tableName, key));
+    }
 
-  @Override
-  public CompletableFuture<Set<String>> keySet(String tableName) {
-    return checkOpen(() -> proxy.keySet(tableName));
-  }
+    @Override
+    public CompletableFuture<Void> clear(String tableName) {
+        return checkOpen(() -> proxy.clear(tableName));
+    }
 
-  @Override
-  public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
-    return checkOpen(() -> proxy.values(tableName));
-  }
+    @Override
+    public CompletableFuture<Set<String>> keySet(String tableName) {
+        return checkOpen(() -> proxy.keySet(tableName));
+    }
 
-  @Override
-  public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
-    return checkOpen(() -> proxy.entrySet(tableName));
-  }
+    @Override
+    public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
+        return checkOpen(() -> proxy.values(tableName));
+    }
 
-  @Override
-  public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
-    return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
-  }
+    @Override
+    public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
+        return checkOpen(() -> proxy.entrySet(tableName));
+    }
 
-  @Override
-  public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
-    return checkOpen(() -> proxy.remove(tableName, key, value));
-  }
+    @Override
+    public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+        return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
+    }
 
-  @Override
-  public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
-    return checkOpen(() -> proxy.remove(tableName, key, version));
-  }
+    @Override
+    public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+        return checkOpen(() -> proxy.remove(tableName, key, value));
+    }
 
-  @Override
-  public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
-    return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
-  }
+    @Override
+    public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+        return checkOpen(() -> proxy.remove(tableName, key, version));
+    }
 
-  @Override
-  public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
-    return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
-  }
+    @Override
+    public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+        return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
+    }
 
-  @Override
-  public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
-      return checkOpen(() -> proxy.atomicBatchUpdate(updates));
-  }
+    @Override
+    public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+        return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
+    }
 
-  @Override
-  @SuppressWarnings("unchecked")
-  public synchronized CompletableFuture<Database> open() {
-    return runStartupTasks()
-      .thenCompose(v -> stateMachine.open())
-      .thenRun(() -> {
-        this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
-      })
-      .thenApply(v -> null);
-  }
+    @Override
+    public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
+        return checkOpen(() -> proxy.atomicBatchUpdate(updates));
+    }
 
-  @Override
-  public synchronized CompletableFuture<Void> close() {
-    proxy = null;
-    return stateMachine.close()
-      .thenCompose(v -> runShutdownTasks());
-  }
-}
+    @Override
+    @SuppressWarnings("unchecked")
+    public synchronized CompletableFuture<Database> open() {
+        return runStartupTasks()
+                .thenCompose(v -> stateMachine.open())
+                .thenRun(() -> {
+                    this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
+                })
+                .thenApply(v -> null);
+    }
+
+    @Override
+    public synchronized CompletableFuture<Void> close() {
+        proxy = null;
+        return stateMachine.close()
+                .thenCompose(v -> runShutdownTasks());
+    }
+
+    @Override
+    public int hashCode() {
+        return name().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other instanceof Database) {
+            return name().equals(((Database) other).name());
+        }
+        return false;
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
index 9f8c5bd..d21543a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
@@ -42,12 +42,12 @@
 
     private final Map<String, DefaultTransactionalMap> txMaps = Maps.newHashMap();
     private boolean isOpen = false;
-    DatabaseProxy<String, byte[]> databaseProxy;
+    private final Database database;
     private static final String TX_NOT_OPEN_ERROR = "Transaction is not open";
     private static final int TRANSACTION_TIMEOUT_MILLIS = 2000;
 
-    DefaultTransactionContext(DatabaseProxy<String, byte[]> proxy) {
-        this.databaseProxy = proxy;
+    DefaultTransactionContext(Database database) {
+        this.database = checkNotNull(database, "Database must not be null");
     }
 
     @Override
@@ -63,7 +63,7 @@
         checkNotNull(serializer, "serializer is null");
         checkState(isOpen, TX_NOT_OPEN_ERROR);
         if (!txMaps.containsKey(mapName)) {
-            ConsistentMap<K, V> backingMap = new DefaultConsistentMap<>(mapName, databaseProxy, serializer);
+            ConsistentMap<K, V> backingMap = new DefaultConsistentMap<>(mapName, database, serializer);
             DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<>(mapName, backingMap, this, serializer);
             txMaps.put(mapName, txMap);
         }
@@ -83,7 +83,7 @@
                     allUpdates.addAll(m.prepareDatabaseUpdates());
                 });
 
-            if (!complete(databaseProxy.atomicBatchUpdate(allUpdates))) {
+            if (!complete(database.atomicBatchUpdate(allUpdates))) {
                 throw new TransactionException.OptimisticConcurrencyFailure();
             }
         } finally {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 5e5fa06..571e309 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -101,20 +101,21 @@
 
     @Activate
     public void activate() {
-        lockMap = storageService.createConsistentMap("onos-leader-locks", new Serializer() {
-            KryoNamespace kryo = new KryoNamespace.Builder()
-                        .register(KryoNamespaces.API).build();
+        lockMap = storageService.<String, NodeId>consistentMapBuilder()
+                    .withName("onos-leader-locks")
+                    .withSerializer(new Serializer() {
+                        KryoNamespace kryo = new KryoNamespace.Builder().register(KryoNamespaces.API).build();
+                        @Override
+                        public <T> byte[] encode(T object) {
+                            return kryo.serialize(object);
+                        }
 
-            @Override
-            public <T> byte[] encode(T object) {
-                return kryo.serialize(object);
-            }
-
-            @Override
-            public <T> T decode(byte[] bytes) {
-                return kryo.deserialize(bytes);
-            }
-        });
+                        @Override
+                        public <T> T decode(byte[] bytes) {
+                            return kryo.deserialize(bytes);
+                        }
+                    })
+                    .withPartitionsDisabled().build();
 
         localNodeId = clusterService.getLocalNode().id();
 
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 24ec92b..cb4ddfc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -25,58 +25,63 @@
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.onosproject.store.service.UpdateOperation;
 import org.onosproject.store.service.Versioned;
 
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
-import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
-
+import net.kuujo.copycat.Task;
+import net.kuujo.copycat.cluster.Cluster;
 import static com.google.common.base.Preconditions.checkState;
 
 /**
  * A database that partitions the keys across one or more database partitions.
  */
-public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
+public class PartitionedDatabase implements Database {
 
-    private Partitioner<String> partitioner;
-    private final ClusterCoordinator coordinator;
-    private final Map<String, Database> partitions = Maps.newConcurrentMap();
+    private final String name;
+    private final Partitioner<String> partitioner;
+    private final List<Database> partitions;
     private final AtomicBoolean isOpen = new AtomicBoolean(false);
-    private static final String DB_NOT_OPEN = "Database is not open";
+    private static final String DB_NOT_OPEN = "Partitioned Database is not open";
 
-    protected PartitionedDatabase(ClusterCoordinator coordinator) {
-        this.coordinator = coordinator;
+    public PartitionedDatabase(
+            String name,
+            Collection<Database> partitions) {
+        this.name = name;
+        this.partitions = partitions
+                .stream()
+                .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
+                .collect(Collectors.toList());
+        this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
+    }
+
+    /**
+     * Returns the databases for individual partitions.
+     * @return list of database partitions
+     */
+    public List<Database> getPartitions() {
+        return partitions;
     }
 
     /**
      * Returns true if the database is open.
      * @return true if open, false otherwise
      */
+    @Override
     public boolean isOpen() {
         return isOpen.get();
     }
 
     @Override
-    public void registerPartition(String name, Database partition) {
-        partitions.put(name, partition);
-    }
-
-    @Override
-    public Map<String, Database> getRegisteredPartitions() {
-        return ImmutableMap.copyOf(partitions);
-    }
-
-    @Override
     public CompletableFuture<Integer> size(String tableName) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         AtomicInteger totalSize = new AtomicInteger(0);
         return CompletableFuture.allOf(partitions
-                    .values()
                     .stream()
                     .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
                     .toArray(CompletableFuture[]::new))
@@ -100,7 +105,6 @@
         checkState(isOpen.get(), DB_NOT_OPEN);
         AtomicBoolean containsValue = new AtomicBoolean(false);
         return CompletableFuture.allOf(partitions
-                    .values()
                     .stream()
                     .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
                     .toArray(CompletableFuture[]::new))
@@ -129,7 +133,6 @@
     public CompletableFuture<Void> clear(String tableName) {
         checkState(isOpen.get(), DB_NOT_OPEN);
         return CompletableFuture.allOf(partitions
-                    .values()
                     .stream()
                     .map(p -> p.clear(tableName))
                     .toArray(CompletableFuture[]::new));
@@ -140,7 +143,6 @@
         checkState(isOpen.get(), DB_NOT_OPEN);
         Set<String> keySet = Sets.newConcurrentHashSet();
         return CompletableFuture.allOf(partitions
-                    .values()
                     .stream()
                     .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
                     .toArray(CompletableFuture[]::new))
@@ -152,7 +154,6 @@
         checkState(isOpen.get(), DB_NOT_OPEN);
         List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
         return CompletableFuture.allOf(partitions
-                    .values()
                     .stream()
                     .map(p -> p.values(tableName).thenApply(values::addAll))
                     .toArray(CompletableFuture[]::new))
@@ -164,7 +165,6 @@
         checkState(isOpen.get(), DB_NOT_OPEN);
         Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
         return CompletableFuture.allOf(partitions
-                    .values()
                     .stream()
                     .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
                     .toArray(CompletableFuture[]::new))
@@ -225,32 +225,47 @@
     }
 
     @Override
-    public void setPartitioner(Partitioner<String> partitioner) {
-        this.partitioner = partitioner;
-    }
-
-    @Override
-    public CompletableFuture<PartitionedDatabase> open() {
-        return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
-                                                        .values()
-                                                        .stream()
-                                                        .map(Database::open)
-                                                        .toArray(CompletableFuture[]::new))
-                                 .thenApply(v -> {
-                                     isOpen.set(true);
-                                     return this; }));
-
+    public CompletableFuture<Database> open() {
+        return CompletableFuture.allOf(partitions
+                    .stream()
+                    .map(Database::open)
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> {
+                    isOpen.set(true);
+                    return this; });
     }
 
     @Override
     public CompletableFuture<Void> close() {
         checkState(isOpen.get(), DB_NOT_OPEN);
-        CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
-                .values()
+        return CompletableFuture.allOf(partitions
                 .stream()
                 .map(database -> database.close())
                 .toArray(CompletableFuture[]::new));
-        CompletableFuture<Void> closeCoordinator = coordinator.close();
-        return closePartitions.thenCompose(v -> closeCoordinator);
     }
-}
+
+    @Override
+    public boolean isClosed() {
+        return !isOpen.get();
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public Cluster cluster() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Database addStartupTask(Task<CompletableFuture<Void>> task) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
+        throw new UnsupportedOperationException();
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
deleted file mode 100644
index 3dad8e0..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Partitioned database configuration.
- */
-public class PartitionedDatabaseConfig {
-    private final Map<String, DatabaseConfig> partitions = new HashMap<>();
-
-    /**
-     * Returns the configuration for all partitions.
-     * @return partition map to configuartion mapping.
-     */
-    public Map<String, DatabaseConfig> partitions() {
-        return Collections.unmodifiableMap(partitions);
-    }
-
-    /**
-     * Adds the specified partition name and configuration.
-     * @param name partition name.
-     * @param config partition config
-     * @return this instance
-     */
-    public PartitionedDatabaseConfig addPartition(String name, DatabaseConfig config) {
-        partitions.put(name, config);
-        return this;
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
deleted file mode 100644
index 3ebce09..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-
-import net.kuujo.copycat.CopycatConfig;
-import net.kuujo.copycat.cluster.ClusterConfig;
-import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
-import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
-import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
-
-/**
- * Manages a PartitionedDatabase.
- */
-public interface PartitionedDatabaseManager {
-    /**
-     * Opens the database.
-     *
-     * @return A completable future to be completed with the result once complete.
-     */
-    CompletableFuture<PartitionedDatabase> open();
-
-    /**
-     * Closes the database.
-     *
-     * @return A completable future to be completed with the result once complete.
-     */
-    CompletableFuture<Void> close();
-
-    /**
-     * Sets the partitioner to use for mapping keys to partitions.
-     *
-     * @param partitioner partitioner
-     */
-    void setPartitioner(Partitioner<String> partitioner);
-
-    /**
-     * Registers a new partition.
-     *
-     * @param partitionName partition name.
-     * @param partition partition.
-     */
-    void registerPartition(String partitionName, Database partition);
-
-    /**
-     * Returns all the registered database partitions.
-     *
-     * @return mapping of all registered database partitions.
-     */
-    Map<String, Database> getRegisteredPartitions();
-
-
-    /**
-     * Creates a new partitioned database.
-     *
-     * @param name The database name.
-     * @param clusterConfig The cluster configuration.
-     * @param partitionedDatabaseConfig The database configuration.
-
-     * @return The database.
-     */
-    public static PartitionedDatabase create(
-            String name,
-            ClusterConfig clusterConfig,
-            PartitionedDatabaseConfig partitionedDatabaseConfig) {
-        CopycatConfig copycatConfig = new CopycatConfig()
-            .withName(name)
-            .withClusterConfig(clusterConfig)
-            .withDefaultSerializer(new DatabaseSerializer())
-            .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
-        ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
-        PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
-        partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
-            partitionedDatabase.registerPartition(partitionName ,
-                    coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
-                        .withSerializer(copycatConfig.getDefaultSerializer())
-                        .withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
-        partitionedDatabase.setPartitioner(
-                new SimpleKeyHashPartitioner(partitionedDatabase.getRegisteredPartitions()));
-        return partitionedDatabase;
-    }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
index 2162f9a..4475bc9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
@@ -16,7 +16,7 @@
 
 package org.onosproject.store.consistent.impl;
 
-import java.util.Map;
+import java.util.List;
 
 /**
  * A simple Partitioner for mapping keys to database partitions.
@@ -27,12 +27,12 @@
  */
 public class SimpleKeyHashPartitioner extends DatabasePartitioner {
 
-    public SimpleKeyHashPartitioner(Map<String, Database> partitionMap) {
-        super(partitionMap);
+    public SimpleKeyHashPartitioner(List<Database> partitions) {
+        super(partitions);
     }
 
     @Override
     public Database getPartition(String tableName, String key) {
-        return sortedPartitions[hash(key) % sortedPartitions.length];
+        return partitions.get(hash(key) % partitions.size());
     }
 }
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
index 1adb921..adc5477 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
@@ -16,7 +16,7 @@
 
 package org.onosproject.store.consistent.impl;
 
-import java.util.Map;
+import java.util.List;
 
 /**
  * A simple Partitioner that uses the table name hash to
@@ -28,12 +28,12 @@
  */
 public class SimpleTableHashPartitioner extends DatabasePartitioner {
 
-    public SimpleTableHashPartitioner(Map<String, Database> partitionMap) {
-        super(partitionMap);
+    public SimpleTableHashPartitioner(List<Database> partitions) {
+        super(partitions);
     }
 
     @Override
     public Database getPartition(String tableName, String key) {
-        return sortedPartitions[hash(tableName) % sortedPartitions.length];
+        return partitions.get(hash(tableName) % partitions.size());
     }
 }
\ No newline at end of file