WIP: Partitioned Database based on Raft.
Removed the implementation based on previous Copycat API.

Change-Id: I6b9d67e943e17095f585ae2a2cb6304c248cd686
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
new file mode 100644
index 0000000..cc1936a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
@@ -0,0 +1,69 @@
+package org.onosproject.store.consistent.impl;
+
+
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
+import net.kuujo.copycat.resource.Resource;
+
+/**
+ * Database.
+ */
+public interface Database extends DatabaseProxy<String, byte[]>, Resource<Database> {
+
+  /**
+   * Creates a new database with the default cluster configuration.<p>
+   *
+   * The database will be constructed with the default cluster configuration. The default cluster configuration
+   * searches for two resources on the classpath - {@code cluster} and {cluster-defaults} - in that order. Configuration
+   * options specified in {@code cluster.conf} will override those in {cluster-defaults.conf}.<p>
+   *
+   * Additionally, the database will be constructed with an database configuration that searches the classpath for
+   * three configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and
+   * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
+   * as the map resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
+   * configurations will be loaded according to namespaces as well; for example, `databases.conf`.
+   *
+   * @param name The database name.
+   * @return The database.
+   */
+  static Database create(String name) {
+    return create(name, new ClusterConfig(), new DatabaseConfig());
+  }
+
+  /**
+   * Creates a new database.<p>
+   *
+   * The database will be constructed with an database configuration that searches the classpath for
+   * three configuration files - {@code {name}}, {@code database}, {@code database-defaults}, {@code resource}, and
+   * {@code resource-defaults} - in that order. The first resource is a configuration resource with the same name
+   * as the database resource. If the resource is namespaced - e.g. `databases.my-database.conf` - then resource
+   * configurations will be loaded according to namespaces as well; for example, `databases.conf`.
+   *
+   * @param name The database name.
+   * @param cluster The cluster configuration.
+   * @return The database.
+   */
+  static Database create(String name, ClusterConfig cluster) {
+    return create(name, cluster, new DatabaseConfig());
+  }
+
+  /**
+   * Creates a new database.
+   *
+   * @param name The database name.
+   * @param cluster The cluster configuration.
+   * @param config The database configuration.
+
+   * @return The database.
+   */
+  static Database create(String name, ClusterConfig cluster, DatabaseConfig config) {
+    ClusterCoordinator coordinator =
+            new DefaultClusterCoordinator(new CoordinatorConfig().withName(name).withClusterConfig(cluster));
+    return coordinator.<Database>getResource(name, config.resolve(cluster))
+      .addStartupTask(() -> coordinator.open().thenApply(v -> null))
+      .addShutdownTask(coordinator::close);
+  }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
new file mode 100644
index 0000000..eda1e99
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
@@ -0,0 +1,108 @@
+package org.onosproject.store.consistent.impl;
+
+import com.typesafe.config.ConfigValueFactory;
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
+import net.kuujo.copycat.protocol.Consistency;
+import net.kuujo.copycat.resource.ResourceConfig;
+import net.kuujo.copycat.state.StateLogConfig;
+import net.kuujo.copycat.util.internal.Assert;
+
+import java.util.Map;
+
+/**
+ * Database configuration.
+ *
+ */
+public class DatabaseConfig extends ResourceConfig<DatabaseConfig> {
+  private static final String DATABASE_CONSISTENCY = "consistency";
+
+  private static final String DEFAULT_CONFIGURATION = "database-defaults";
+  private static final String CONFIGURATION = "database";
+
+  public DatabaseConfig() {
+    super(CONFIGURATION, DEFAULT_CONFIGURATION);
+  }
+
+  public DatabaseConfig(Map<String, Object> config) {
+    super(config, CONFIGURATION, DEFAULT_CONFIGURATION);
+  }
+
+  public DatabaseConfig(String resource) {
+    super(resource, CONFIGURATION, DEFAULT_CONFIGURATION);
+  }
+
+  protected DatabaseConfig(DatabaseConfig config) {
+    super(config);
+  }
+
+  @Override
+  public DatabaseConfig copy() {
+    return new DatabaseConfig(this);
+  }
+
+  /**
+   * Sets the database read consistency.
+   *
+   * @param consistency The database read consistency.
+   * @throws java.lang.NullPointerException If the consistency is {@code null}
+   */
+  public void setConsistency(String consistency) {
+    this.config = config.withValue(DATABASE_CONSISTENCY,
+            ConfigValueFactory.fromAnyRef(
+                    Consistency.parse(Assert.isNotNull(consistency, "consistency")).toString()));
+  }
+
+  /**
+   * Sets the database read consistency.
+   *
+   * @param consistency The database read consistency.
+   * @throws java.lang.NullPointerException If the consistency is {@code null}
+   */
+  public void setConsistency(Consistency consistency) {
+    this.config = config.withValue(DATABASE_CONSISTENCY,
+            ConfigValueFactory.fromAnyRef(
+                    Assert.isNotNull(consistency, "consistency").toString()));
+  }
+
+  /**
+   * Returns the database read consistency.
+   *
+   * @return The database read consistency.
+   */
+  public Consistency getConsistency() {
+    return Consistency.parse(config.getString(DATABASE_CONSISTENCY));
+  }
+
+  /**
+   * Sets the database read consistency, returning the configuration for method chaining.
+   *
+   * @param consistency The database read consistency.
+   * @return The database configuration.
+   * @throws java.lang.NullPointerException If the consistency is {@code null}
+   */
+  public DatabaseConfig withConsistency(String consistency) {
+    setConsistency(consistency);
+    return this;
+  }
+
+  /**
+   * Sets the database read consistency, returning the configuration for method chaining.
+   *
+   * @param consistency The database read consistency.
+   * @return The database configuration.
+   * @throws java.lang.NullPointerException If the consistency is {@code null}
+   */
+  public DatabaseConfig withConsistency(Consistency consistency) {
+    setConsistency(consistency);
+    return this;
+  }
+
+  @Override
+  public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
+    return new StateLogConfig(toMap())
+      .resolve(cluster)
+      .withResourceType(DefaultDatabase.class);
+  }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
new file mode 100644
index 0000000..f83e8f8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -0,0 +1,168 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Database proxy.
+ */
+public interface DatabaseProxy<K, V> {
+
+  /**
+   * Gets the table size.
+   *
+   * @param tableName table name
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Integer> size(String tableName);
+
+  /**
+   * Checks whether the table is empty.
+   *
+   * @param tableName table name
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> isEmpty(String tableName);
+
+  /**
+   * Checks whether the table contains a key.
+   *
+   * @param tableName table name
+   * @param key The key to check.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> containsKey(String tableName, K key);
+
+  /**
+   * Checks whether the table contains a value.
+   *
+   * @param tableName table name
+   * @param value The value to check.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> containsValue(String tableName, V value);
+
+  /**
+   * Gets a value from the table.
+   *
+   * @param tableName table name
+   * @param key The key to get.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Versioned<V>> get(String tableName, K key);
+
+  /**
+   * Puts a value in the table.
+   *
+   * @param tableName table name
+   * @param key The key to set.
+   * @param value The value to set.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Versioned<V>> put(String tableName, K key, V value);
+
+  /**
+   * Removes a value from the table.
+   *
+   * @param tableName table name
+   * @param key The key to remove.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Versioned<V>> remove(String tableName, K key);
+
+  /**
+   * Clears the table.
+   *
+   * @param tableName table name
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Void> clear(String tableName);
+
+  /**
+   * Gets a set of keys in the table.
+   *
+   * @param tableName table name
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Set<K>> keySet(String tableName);
+
+  /**
+   * Gets a collection of values in the table.
+   *
+   * @param tableName table name
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Collection<Versioned<V>>> values(String tableName);
+
+  /**
+   * Gets a set of entries in the table.
+   *
+   * @param tableName table name
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet(String tableName);
+
+  /**
+   * Puts a value in the table if the given key does not exist.
+   *
+   * @param tableName table name
+   * @param key The key to set.
+   * @param value The value to set if the given key does not exist.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Versioned<V>> putIfAbsent(String tableName, K key, V value);
+
+  /**
+   * Removes a key and if the existing value for that key matches the specified value.
+   *
+   * @param tableName table name
+   * @param key The key to remove.
+   * @param value The value to remove.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> remove(String tableName, K key, V value);
+
+  /**
+   * Removes a key and if the existing version for that key matches the specified version.
+   *
+   * @param tableName table name
+   * @param key The key to remove.
+   * @param version The expected version.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> remove(String tableName, K key, long version);
+
+  /**
+   * Replaces the entry for the specified key only if currently mapped to the specified value.
+   *
+   * @param tableName table name
+   * @param key The key to replace.
+   * @param oldValue The value to replace.
+   * @param newValue The value with which to replace the given key and value.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> replace(String tableName, K key, V oldValue, V newValue);
+
+  /**
+   * Replaces the entry for the specified key only if currently mapped to the specified version.
+   *
+   * @param tableName table name
+   * @param key The key to update
+   * @param oldVersion existing version in the map for this replace to succeed.
+   * @param newValue The value with which to replace the given key and version.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
+
+  /**
+   * Perform a atomic batch update operation i.e. either all operations in batch succeed or
+   * none do and no state changes are made.
+   *
+   * @param updates list of updates to apply atomically.
+   * @return A completable future to be completed with the result once complete.
+   */
+  CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<K, V>> updates);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
new file mode 100644
index 0000000..89a51e8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -0,0 +1,77 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.kuujo.copycat.state.Command;
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.Query;
+import net.kuujo.copycat.state.StateContext;
+
+/**
+ * Database state.
+ *
+ */
+public interface DatabaseState<K, V> {
+
+  /**
+   * Initializes the database state.
+   *
+   * @param context The map state context.
+   */
+  @Initializer
+  public void init(StateContext<DatabaseState<K, V>> context);
+
+  @Query
+  int size(String tableName);
+
+  @Query
+  boolean isEmpty(String tableName);
+
+  @Query
+  boolean containsKey(String tableName, K key);
+
+  @Query
+  boolean containsValue(String tableName, V value);
+
+  @Query
+  Versioned<V> get(String tableName, K key);
+
+  @Command
+  Versioned<V> put(String tableName, K key, V value);
+
+  @Command
+  Versioned<V> remove(String tableName, K key);
+
+  @Command
+  void clear(String tableName);
+
+  @Query
+  Set<K> keySet(String tableName);
+
+  @Query
+  Collection<Versioned<V>> values(String tableName);
+
+  @Query
+  Set<Entry<K, Versioned<V>>> entrySet(String tableName);
+
+  @Command
+  Versioned<V> putIfAbsent(String tableName, K key, V value);
+
+  @Command
+  boolean remove(String tableName, K key, V value);
+
+  @Command
+  boolean remove(String tableName, K key, long version);
+
+  @Command
+  boolean replace(String tableName, K key, V oldValue, V newValue);
+
+  @Command
+  boolean replace(String tableName, K key, long oldVersion, V newValue);
+
+  @Command
+  boolean batchUpdate(List<UpdateOperation<K, V>> updates);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
new file mode 100644
index 0000000..837f3b4
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -0,0 +1,146 @@
+package org.onosproject.store.consistent.impl;
+
+import net.kuujo.copycat.resource.internal.ResourceContext;
+import net.kuujo.copycat.state.StateMachine;
+import net.kuujo.copycat.resource.internal.AbstractResource;
+import net.kuujo.copycat.state.internal.DefaultStateMachine;
+import net.kuujo.copycat.util.concurrent.Futures;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+/**
+ * Default database.
+ */
+public class DefaultDatabase extends AbstractResource<Database> implements Database {
+  private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
+  private DatabaseProxy<String, byte[]> proxy;
+
+  @SuppressWarnings("unchecked")
+  public DefaultDatabase(ResourceContext context) {
+    super(context);
+    this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
+  }
+
+  /**
+   * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
+   * return the completed future result.
+   *
+   * @param supplier The supplier to call if the database is open.
+   * @param <T> The future result type.
+   * @return A completable future that if this database is closed is immediately failed.
+   */
+  protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
+    if (proxy == null) {
+      return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
+    }
+    return supplier.get();
+  }
+
+  @Override
+  public CompletableFuture<Integer> size(String tableName) {
+    return checkOpen(() -> proxy.size(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> isEmpty(String tableName) {
+    return checkOpen(() -> proxy.isEmpty(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> containsKey(String tableName, String key) {
+    return checkOpen(() -> proxy.containsKey(tableName, key));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
+    return checkOpen(() -> proxy.containsValue(tableName, value));
+  }
+
+  @Override
+  public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
+    return checkOpen(() -> proxy.get(tableName, key));
+  }
+
+  @Override
+  public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+    return checkOpen(() -> proxy.put(tableName, key, value));
+  }
+
+  @Override
+  public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+    return checkOpen(() -> proxy.remove(tableName, key));
+  }
+
+  @Override
+  public CompletableFuture<Void> clear(String tableName) {
+    return checkOpen(() -> proxy.clear(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Set<String>> keySet(String tableName) {
+    return checkOpen(() -> proxy.keySet(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
+    return checkOpen(() -> proxy.values(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
+    return checkOpen(() -> proxy.entrySet(tableName));
+  }
+
+  @Override
+  public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+    return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+    return checkOpen(() -> proxy.remove(tableName, key, value));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+    return checkOpen(() -> proxy.remove(tableName, key, version));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+    return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+    return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
+  }
+
+  @Override
+  public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
+      return checkOpen(() -> proxy.atomicBatchUpdate(updates));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public synchronized CompletableFuture<Database> open() {
+    return runStartupTasks()
+      .thenCompose(v -> stateMachine.open())
+      .thenRun(() -> {
+        this.proxy = stateMachine.createProxy(DatabaseProxy.class);
+      })
+      .thenApply(v -> null);
+  }
+
+  @Override
+  public synchronized CompletableFuture<Void> close() {
+    proxy = null;
+    return stateMachine.close()
+      .thenCompose(v -> runShutdownTasks());
+  }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
new file mode 100644
index 0000000..2b20f53
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -0,0 +1,217 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.StateContext;
+
+/**
+ * Default database state.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
+
+    private Long nextVersion;
+    private Map<String, Map<K, Versioned<V>>> tables;
+
+    @Initializer
+    @Override
+    public void init(StateContext<DatabaseState<K, V>> context) {
+        tables = context.get("tables");
+        if (tables == null) {
+            tables = new HashMap<>();
+            context.put("tables", tables);
+        }
+        nextVersion = context.get("nextVersion");
+        if (nextVersion == null) {
+            nextVersion = new Long(0);
+            context.put("nextVersion", nextVersion);
+        }
+    }
+
+    private Map<K, Versioned<V>> getTableMap(String tableName) {
+        Map<K, Versioned<V>> table = tables.get(tableName);
+        if (table == null) {
+            table = new HashMap<>();
+            tables.put(tableName, table);
+        }
+        return table;
+    }
+
+    @Override
+    public int size(String tableName) {
+      return getTableMap(tableName).size();
+    }
+
+    @Override
+    public boolean isEmpty(String tableName) {
+        return getTableMap(tableName).isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(String tableName, K key) {
+        return getTableMap(tableName).containsKey(key);
+    }
+
+    @Override
+    public boolean containsValue(String tableName, V value) {
+        return getTableMap(tableName).values().stream().anyMatch(v -> checkEquality(v.value(), value));
+    }
+
+    @Override
+    public Versioned<V> get(String tableName, K key) {
+        return getTableMap(tableName).get(key);
+    }
+
+    @Override
+    public Versioned<V> put(String tableName, K key, V value) {
+        return getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion));
+    }
+
+    @Override
+    public Versioned<V> remove(String tableName, K key) {
+        return getTableMap(tableName).remove(key);
+    }
+
+    @Override
+    public void clear(String tableName) {
+        getTableMap(tableName).clear();
+    }
+
+    @Override
+    public Set<K> keySet(String tableName) {
+        return getTableMap(tableName).keySet();
+    }
+
+    @Override
+    public Collection<Versioned<V>> values(String tableName) {
+        return getTableMap(tableName).values();
+    }
+
+    @Override
+    public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
+        return getTableMap(tableName).entrySet();
+    }
+
+    @Override
+    public Versioned<V> putIfAbsent(String tableName, K key, V value) {
+        Versioned<V> existingValue = getTableMap(tableName).get(key);
+        return existingValue != null ? existingValue : put(tableName, key, value);
+    }
+
+    @Override
+    public boolean remove(String tableName, K key, V value) {
+        Versioned<V> existing = getTableMap(tableName).get(key);
+        if (existing != null && existing.value().equals(value)) {
+            getTableMap(tableName).remove(key);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean remove(String tableName, K key, long version) {
+        Versioned<V> existing = getTableMap(tableName).get(key);
+        if (existing != null && existing.version() == version) {
+            remove(tableName, key);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean replace(String tableName, K key, V oldValue, V newValue) {
+        Versioned<V> existing = getTableMap(tableName).get(key);
+        if (existing != null && existing.value().equals(oldValue)) {
+            put(tableName, key, newValue);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean replace(String tableName, K key, long oldVersion, V newValue) {
+        Versioned<V> existing = getTableMap(tableName).get(key);
+        if (existing != null && existing.version() == oldVersion) {
+            put(tableName, key, newValue);
+            return true;
+        }
+        return false;
+    }
+
+    @Override
+    public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
+        if (updates.stream().anyMatch(update -> !checkIfUpdateIsPossible(update))) {
+            return false;
+        } else {
+            updates.stream().forEach(this::doUpdate);
+            return true;
+        }
+    }
+
+    private void doUpdate(UpdateOperation<K, V> update) {
+        String tableName = update.tableName();
+        K key = update.key();
+        switch (update.type()) {
+        case PUT:
+            put(tableName, key, update.value());
+            return;
+        case REMOVE:
+            remove(tableName, key);
+            return;
+        case PUT_IF_ABSENT:
+            putIfAbsent(tableName, key, update.value());
+            return;
+        case PUT_IF_VERSION_MATCH:
+            replace(tableName, key, update.currentValue(), update.value());
+            return;
+        case PUT_IF_VALUE_MATCH:
+            replace(tableName, key, update.currentVersion(), update.value());
+            return;
+        case REMOVE_IF_VERSION_MATCH:
+            remove(tableName, key, update.currentVersion());
+            return;
+        case REMOVE_IF_VALUE_MATCH:
+            remove(tableName, key, update.currentValue());
+            return;
+        default:
+            throw new IllegalStateException("Unsupported type: " + update.type());
+        }
+    }
+
+    private boolean checkIfUpdateIsPossible(UpdateOperation<K, V> update) {
+        Versioned<V> existingEntry = get(update.tableName(), update.key());
+        switch (update.type()) {
+        case PUT:
+        case REMOVE:
+            return true;
+        case PUT_IF_ABSENT:
+            return existingEntry == null;
+        case PUT_IF_VERSION_MATCH:
+            return existingEntry != null && existingEntry.version() == update.currentVersion();
+        case PUT_IF_VALUE_MATCH:
+            return existingEntry != null && existingEntry.value().equals(update.currentValue());
+        case REMOVE_IF_VERSION_MATCH:
+            return existingEntry == null || existingEntry.version() == update.currentVersion();
+        case REMOVE_IF_VALUE_MATCH:
+            return existingEntry == null || existingEntry.value().equals(update.currentValue());
+        default:
+            throw new IllegalStateException("Unsupported type: " + update.type());
+        }
+    }
+
+    private boolean checkEquality(V value1, V value2) {
+        if (value1 instanceof byte[]) {
+            return Arrays.equals((byte[]) value1, (byte[]) value2);
+        }
+        return value1.equals(value2);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
new file mode 100644
index 0000000..d35aca2
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -0,0 +1,208 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+
+/**
+ * A database that partitions the keys across one or more database partitions.
+ */
+public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
+
+    private Partitioner<String> partitioner;
+    private final ClusterCoordinator coordinator;
+    private final Map<String, Database> partitions = Maps.newConcurrentMap();
+
+    protected PartitionedDatabase(ClusterCoordinator coordinator) {
+        this.coordinator = coordinator;
+    }
+
+    @Override
+    public void registerPartition(String name, Database partition) {
+        partitions.put(name, partition);
+    }
+
+    @Override
+    public Map<String, Database> getRegisteredPartitions() {
+        return ImmutableMap.copyOf(partitions);
+    }
+
+    @Override
+    public CompletableFuture<Integer> size(String tableName) {
+        AtomicInteger totalSize = new AtomicInteger(0);
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> totalSize.get());
+    }
+
+    @Override
+    public CompletableFuture<Boolean> isEmpty(String tableName) {
+        return size(tableName).thenApply(size -> size == 0);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsKey(String tableName, String key) {
+        return partitioner.getPartition(tableName, key).containsKey(tableName, key);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
+        AtomicBoolean containsValue = new AtomicBoolean(false);
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> containsValue.get());
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
+        return partitioner.getPartition(tableName, key).get(tableName, key);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+        return partitioner.getPartition(tableName, key).put(tableName, key, value);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+        return partitioner.getPartition(tableName, key).remove(tableName, key);
+    }
+
+    @Override
+    public CompletableFuture<Void> clear(String tableName) {
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.clear(tableName))
+                    .toArray(CompletableFuture[]::new));
+    }
+
+    @Override
+    public CompletableFuture<Set<String>> keySet(String tableName) {
+        Set<String> keySet = Sets.newConcurrentHashSet();
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> keySet);
+    }
+
+    @Override
+    public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
+        List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.values(tableName))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> values);
+    }
+
+    @Override
+    public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
+        Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
+        return CompletableFuture.allOf(partitions
+                    .values()
+                    .stream()
+                    .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
+                    .toArray(CompletableFuture[]::new))
+                .thenApply(v -> entrySet);
+    }
+
+    @Override
+    public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+        return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+        return partitioner.getPartition(tableName, key).remove(tableName, key, value);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+        return partitioner.getPartition(tableName, key).remove(tableName, key, version);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+        return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+        return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
+    }
+
+    @Override
+    public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
+        Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
+        for (UpdateOperation<String, byte[]> update : updates) {
+            Database partition = partitioner.getPartition(update.tableName(), update.key());
+            List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
+            if (partitionUpdates == null) {
+                partitionUpdates = Lists.newArrayList();
+                perPartitionUpdates.put(partition, partitionUpdates);
+            }
+            partitionUpdates.add(update);
+        }
+        if (perPartitionUpdates.size() > 1) {
+            // TODO
+            throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
+        } else {
+            Entry<Database, List<UpdateOperation<String, byte[]>>> only =
+                    perPartitionUpdates.entrySet().iterator().next();
+            return only.getKey().atomicBatchUpdate(only.getValue());
+        }
+    }
+
+    @Override
+    public void setPartitioner(Partitioner<String> partitioner) {
+        this.partitioner = partitioner;
+    }
+
+    @Override
+    public CompletableFuture<PartitionedDatabase> open() {
+        return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
+                                                        .values()
+                                                        .stream()
+                                                        .map(Database::open)
+                                                        .collect(Collectors.toList())
+                                                        .toArray(new CompletableFuture[partitions.size()]))
+                                 .thenApply(v -> this));
+
+    }
+
+    @Override
+    public CompletableFuture<Void> close() {
+        CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
+                .values()
+                .stream()
+                .map(database -> database.close())
+                .collect(Collectors.toList())
+                .toArray(new CompletableFuture[partitions.size()]));
+        CompletableFuture<Void> closeCoordinator = coordinator.close();
+        return closePartitions.thenCompose(v -> closeCoordinator);
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
new file mode 100644
index 0000000..6d375cc
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
@@ -0,0 +1,31 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Partitioned database configuration.
+ */
+public class PartitionedDatabaseConfig {
+    private final Map<String, DatabaseConfig> partitions = new HashMap<>();
+
+    /**
+     * Returns the configuration for all partitions.
+     * @return partition map to configuartion mapping.
+     */
+    public Map<String, DatabaseConfig> partitions() {
+        return Collections.unmodifiableMap(partitions);
+    }
+
+    /**
+     * Adds the specified partition name and configuration.
+     * @param name partition name.
+     * @param config partition config
+     * @return this instance
+     */
+    public PartitionedDatabaseConfig withPartition(String name, DatabaseConfig config) {
+        partitions.put(name, config);
+        return this;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
new file mode 100644
index 0000000..44cd3a1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
@@ -0,0 +1,79 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+import net.kuujo.copycat.CopycatConfig;
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
+import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
+
+public interface PartitionedDatabaseManager {
+    /**
+     * Opens the database.
+     *
+     * @return A completable future to be completed with the result once complete.
+     */
+    CompletableFuture<PartitionedDatabase> open();
+
+    /**
+     * Closes the database.
+     *
+     * @return A completable future to be completed with the result once complete.
+     */
+    CompletableFuture<Void> close();
+
+    /**
+     * Sets the partitioner to use for mapping keys to partitions.
+     *
+     * @param partitioner partitioner
+     */
+    void setPartitioner(Partitioner<String> partitioner);
+
+    /**
+     * Registers a new partition.
+     *
+     * @param partitionName partition name.
+     * @param partition partition.
+     */
+    void registerPartition(String partitionName, Database partition);
+
+    /**
+     * Returns all the registered database partitions.
+     *
+     * @return mapping of all registered database partitions.
+     */
+    Map<String, Database> getRegisteredPartitions();
+
+
+    /**
+     * Creates a new partitioned database.
+     *
+     * @param name The database name.
+     * @param clusterConfig The cluster configuration.
+     * @param partitionedDatabaseConfig The database configuration.
+
+     * @return The database.
+     */
+    public static PartitionedDatabase create(
+            String name,
+            ClusterConfig clusterConfig,
+            PartitionedDatabaseConfig partitionedDatabaseConfig) {
+        CopycatConfig copycatConfig = new CopycatConfig()
+            .withName(name)
+            .withClusterConfig(clusterConfig)
+            .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
+        ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
+        PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
+        partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
+            partitionedDatabase.registerPartition(partitionName ,
+                    coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
+                        .withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
+                        .withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
+        partitionedDatabase.setPartitioner(
+                new SimpleKeyHashPartitioner<>(partitionedDatabase.getRegisteredPartitions()));
+        return partitionedDatabase;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java
new file mode 100644
index 0000000..df8d56e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java
@@ -0,0 +1,17 @@
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Partitioner is responsible for mapping keys to individual database partitions.
+ *
+ * @param <K> key type.
+ */
+public interface Partitioner<K> {
+
+    /**
+     * Returns the database partition.
+     * @param tableName table name
+     * @param key key
+     * @return Database partition
+     */
+    Database getPartition(String tableName, K key);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
new file mode 100644
index 0000000..5410f9f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
@@ -0,0 +1,31 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+/**
+ * A simple Partitioner that uses the key hashCode to map
+ * key to a partition.
+ *
+ * @param <K> key type.
+ */
+public class SimpleKeyHashPartitioner<K> implements Partitioner<K> {
+
+    private final Map<String, Database> partitionMap;
+    private final List<String> sortedPartitionNames;
+
+    public SimpleKeyHashPartitioner(Map<String, Database> partitionMap) {
+        this.partitionMap = ImmutableMap.copyOf(partitionMap);
+        sortedPartitionNames = Lists.newArrayList(this.partitionMap.keySet());
+        Collections.sort(sortedPartitionNames);
+    }
+
+    @Override
+    public Database getPartition(String tableName, K key) {
+        return partitionMap.get(sortedPartitionNames.get(Math.abs(key.hashCode()) % partitionMap.size()));
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
new file mode 100644
index 0000000..9d52a09
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
@@ -0,0 +1,151 @@
+package org.onosproject.store.consistent.impl;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Database update operation.
+ *
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+public class UpdateOperation<K, V> {
+
+    /**
+     * Type of database update operation.
+     */
+    public static enum Type {
+        PUT,
+        PUT_IF_ABSENT,
+        PUT_IF_VERSION_MATCH,
+        PUT_IF_VALUE_MATCH,
+        REMOVE,
+        REMOVE_IF_VERSION_MATCH,
+        REMOVE_IF_VALUE_MATCH,
+    }
+
+    private Type type;
+    private String tableName;
+    private K key;
+    private V value;
+    private V currentValue;
+    private long currentVersion;
+
+    /**
+     * Returns the type of update operation.
+     * @return type of update.
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the tableName being updated.
+     * @return table name.
+     */
+    public String tableName() {
+        return tableName;
+    }
+
+    /**
+     * Returns the item key being updated.
+     * @return item key
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * Returns the new value.
+     * @return item's target value.
+     */
+    public V value() {
+        return value;
+    }
+
+    /**
+     * Returns the expected current value in the database value for the key.
+     * @return current value in database.
+     */
+    public V currentValue() {
+        return currentValue;
+    }
+
+    /**
+     * Returns the expected current version in the database for the key.
+     * @return expected version.
+     */
+    public long currentVersion() {
+        return currentVersion;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("type", type)
+            .add("tableName", tableName)
+            .add("key", key)
+            .add("value", value)
+            .add("currentValue", currentValue)
+            .add("currentVersion", currentVersion)
+            .toString();
+    }
+
+    /**
+     * UpdatOperation builder.
+     *
+     * @param <K> key type.
+     * @param <V> value type.
+     */
+    public static final class Builder<K, V> {
+
+        private UpdateOperation<K, V> operation = new UpdateOperation<>();
+
+        /**
+         * Creates a new builder instance.
+         * @param <K> key type.
+         * @param <V> value type.
+         *
+         * @return builder.
+         */
+        public static <K, V> Builder<K, V> builder() {
+            return new Builder<>();
+        }
+
+        private Builder() {
+        }
+
+        public UpdateOperation<K, V> build() {
+            return operation;
+        }
+
+        public Builder<K, V> withType(Type type) {
+            operation.type = type;
+            return this;
+        }
+
+        public Builder<K, V> withTableName(String tableName) {
+            operation.tableName = tableName;
+            return this;
+        }
+
+        public Builder<K, V> withKey(K key) {
+            operation.key = key;
+            return this;
+        }
+
+        public Builder<K, V> withCurrentValue(V value) {
+            operation.currentValue = value;
+            return this;
+        }
+
+        public Builder<K, V> withValue(V value) {
+            operation.value = value;
+            return this;
+        }
+
+        public Builder<K, V> withCurrentVersion(long version) {
+            operation.currentVersion = version;
+            return this;
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
new file mode 100644
index 0000000..6eb908d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
@@ -0,0 +1,50 @@
+package org.onosproject.store.consistent.impl;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Versioned value.
+ *
+ * @param <V> value type.
+ */
+public class Versioned<V> {
+
+    private final V value;
+    private final long version;
+
+    /**
+     * Constructs a new versioned value.
+     * @param value value
+     * @param version version
+     */
+    public Versioned(V value, long version) {
+        this.value = value;
+        this.version = version;
+    }
+
+    /**
+     * Returns the value.
+     *
+     * @return value.
+     */
+    public V value() {
+        return value;
+    }
+
+    /**
+     * Returns the version.
+     *
+     * @return version
+     */
+    public long version() {
+        return version;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("value", value)
+            .add("version", version)
+            .toString();
+    }
+}