WIP: Partitioned Database based on Raft.
Removed the implementation based on previous Copycat API.
Change-Id: I6b9d67e943e17095f585ae2a2cb6304c248cd686
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
new file mode 100644
index 0000000..cc1936a
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
@@ -0,0 +1,69 @@
+package org.onosproject.store.consistent.impl;
+
+
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+import net.kuujo.copycat.cluster.internal.coordinator.CoordinatorConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
+import net.kuujo.copycat.resource.Resource;
+
+/**
+ * Database resource exposing the {@link DatabaseProxy} operations for string keys and byte-array values.
+ */
+public interface Database extends DatabaseProxy<String, byte[]>, Resource<Database> {
+
+ /**
+ * Creates a database that uses the default cluster and database configurations.<p>
+ *
+ * The default cluster configuration looks up two classpath resources, {@code cluster} and
+ * {@code cluster-defaults}, in that order; options set in {@code cluster.conf} take precedence over those in
+ * {@code cluster-defaults.conf}.<p>
+ *
+ * The default database configuration looks up five classpath resources, in this order: {@code {name}},
+ * {@code database}, {@code database-defaults}, {@code resource} and {@code resource-defaults}. The first one is
+ * a configuration resource named after the database resource itself. If the resource is namespaced - e.g.
+ * {@code databases.my-database.conf} - then configurations are loaded according to namespaces as well; for
+ * example, {@code databases.conf}.
+ *
+ * @param name The database name.
+ * @return The database.
+ */
+ static Database create(String name) {
+ return create(name, new ClusterConfig());
+ }
+
+ /**
+ * Creates a database on the given cluster, using the default database configuration.<p>
+ *
+ * The default database configuration looks up five classpath resources, in this order: {@code {name}},
+ * {@code database}, {@code database-defaults}, {@code resource} and {@code resource-defaults}. The first one is
+ * a configuration resource named after the database resource itself. If the resource is namespaced - e.g.
+ * {@code databases.my-database.conf} - then configurations are loaded according to namespaces as well; for
+ * example, {@code databases.conf}.
+ *
+ * @param name The database name.
+ * @param cluster The cluster configuration.
+ * @return The database.
+ */
+ static Database create(String name, ClusterConfig cluster) {
+ return create(name, cluster, new DatabaseConfig());
+ }
+
+ /**
+ * Creates a database backed by its own cluster coordinator. The coordinator is opened by a startup task and
+ * closed by a shutdown task registered on the returned database.
+ *
+ * @param name The database name; also used as the coordinator name.
+ * @param cluster The cluster configuration.
+ * @param config The database configuration.
+ * @return The database.
+ */
+ static Database create(String name, ClusterConfig cluster, DatabaseConfig config) {
+ CoordinatorConfig coordinatorConfig = new CoordinatorConfig().withName(name).withClusterConfig(cluster);
+ ClusterCoordinator coordinator = new DefaultClusterCoordinator(coordinatorConfig);
+ return coordinator.<Database>getResource(name, config.resolve(cluster))
+ .addStartupTask(() -> coordinator.open().thenApply(v -> null))
+ .addShutdownTask(coordinator::close);
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
new file mode 100644
index 0000000..eda1e99
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
@@ -0,0 +1,108 @@
+package org.onosproject.store.consistent.impl;
+
+import com.typesafe.config.ConfigValueFactory;
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.CoordinatedResourceConfig;
+import net.kuujo.copycat.protocol.Consistency;
+import net.kuujo.copycat.resource.ResourceConfig;
+import net.kuujo.copycat.state.StateLogConfig;
+import net.kuujo.copycat.util.internal.Assert;
+
+import java.util.Map;
+
+/**
+ * Database configuration, backed by the {@code database} and {@code database-defaults} configuration
+ * resources and carrying the read consistency of the database.
+ */
+public class DatabaseConfig extends ResourceConfig<DatabaseConfig> {
+ private static final String DATABASE_CONSISTENCY = "consistency";
+
+ private static final String DEFAULT_CONFIGURATION = "database-defaults";
+ private static final String CONFIGURATION = "database";
+
+ public DatabaseConfig() {
+ super(CONFIGURATION, DEFAULT_CONFIGURATION);
+ }
+
+ public DatabaseConfig(Map<String, Object> config) {
+ super(config, CONFIGURATION, DEFAULT_CONFIGURATION);
+ }
+
+ public DatabaseConfig(String resource) {
+ super(resource, CONFIGURATION, DEFAULT_CONFIGURATION);
+ }
+
+ protected DatabaseConfig(DatabaseConfig config) {
+ super(config);
+ }
+
+ @Override
+ public DatabaseConfig copy() {
+ return new DatabaseConfig(this);
+ }
+
+ /**
+ * Sets the database read consistency from its textual form.
+ *
+ * @param consistency The read consistency name, parsed with {@code Consistency.parse}.
+ * @throws java.lang.NullPointerException If the consistency is {@code null}
+ */
+ public void setConsistency(String consistency) {
+ String name = Assert.isNotNull(consistency, "consistency");
+ String normalized = Consistency.parse(name).toString();
+ this.config = config.withValue(DATABASE_CONSISTENCY, ConfigValueFactory.fromAnyRef(normalized));
+ }
+
+ /**
+ * Sets the database read consistency.
+ *
+ * @param consistency The database read consistency; stored under the {@code consistency} key as a string.
+ * @throws java.lang.NullPointerException If the consistency is {@code null}
+ */
+ public void setConsistency(Consistency consistency) {
+ Consistency checked = Assert.isNotNull(consistency, "consistency");
+ this.config = config.withValue(DATABASE_CONSISTENCY,
+ ConfigValueFactory.fromAnyRef(checked.toString()));
+ }
+
+ /**
+ * Returns the database read consistency.
+ *
+ * @return The read consistency parsed from the configured {@code consistency} string.
+ */
+ public Consistency getConsistency() {
+ return Consistency.parse(config.getString(DATABASE_CONSISTENCY));
+ }
+
+ /**
+ * Fluent variant of {@link #setConsistency(String)}.
+ *
+ * @param consistency The database read consistency.
+ * @return This database configuration, for method chaining.
+ * @throws java.lang.NullPointerException If the consistency is {@code null}
+ */
+ public DatabaseConfig withConsistency(String consistency) {
+ setConsistency(consistency);
+ return this;
+ }
+
+ /**
+ * Fluent variant of {@link #setConsistency(Consistency)}.
+ *
+ * @param consistency The database read consistency.
+ * @return This database configuration, for method chaining.
+ * @throws java.lang.NullPointerException If the consistency is {@code null}
+ */
+ public DatabaseConfig withConsistency(Consistency consistency) {
+ setConsistency(consistency);
+ return this;
+ }
+
+ @Override
+ public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
+ // Resolve as a state log configuration, then set DefaultDatabase as the resource type.
+ StateLogConfig stateLogConfig = new StateLogConfig(toMap());
+ return stateLogConfig.resolve(cluster).withResourceType(DefaultDatabase.class);
+ }
+
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
new file mode 100644
index 0000000..f83e8f8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -0,0 +1,168 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Asynchronous key/value operations on named tables; every result is delivered through a future.
+ */
+public interface DatabaseProxy<K, V> {
+
+ /**
+ * Gets the table size.
+ *
+ * @param tableName table name
+ * @return A future completed with the number of entries in the table.
+ */
+ CompletableFuture<Integer> size(String tableName);
+
+ /**
+ * Checks whether the table is empty.
+ *
+ * @param tableName table name
+ * @return A future completed with {@code true} if the table has no entries.
+ */
+ CompletableFuture<Boolean> isEmpty(String tableName);
+
+ /**
+ * Checks whether the table contains a key.
+ *
+ * @param tableName table name
+ * @param key The key to check.
+ * @return A future completed with {@code true} if the table has an entry for the key.
+ */
+ CompletableFuture<Boolean> containsKey(String tableName, K key);
+
+ /**
+ * Checks whether the table contains a value.
+ *
+ * @param tableName table name
+ * @param value The value to check.
+ * @return A future completed with {@code true} if at least one entry of the table holds the value.
+ */
+ CompletableFuture<Boolean> containsValue(String tableName, V value);
+
+ /**
+ * Gets a value from the table.
+ *
+ * @param tableName table name
+ * @param key The key to get.
+ * @return A future completed with the versioned value stored under the key, if any.
+ */
+ CompletableFuture<Versioned<V>> get(String tableName, K key);
+
+ /**
+ * Puts a value in the table.
+ *
+ * @param tableName table name
+ * @param key The key to set.
+ * @param value The value to set.
+ * @return A future completed with the versioned value previously stored under the key, if any.
+ */
+ CompletableFuture<Versioned<V>> put(String tableName, K key, V value);
+
+ /**
+ * Removes a value from the table.
+ *
+ * @param tableName table name
+ * @param key The key to remove.
+ * @return A future completed with the versioned value that was removed, if any.
+ */
+ CompletableFuture<Versioned<V>> remove(String tableName, K key);
+
+ /**
+ * Clears the table.
+ *
+ * @param tableName table name
+ * @return A future completed once the table has been cleared.
+ */
+ CompletableFuture<Void> clear(String tableName);
+
+ /**
+ * Gets a set of keys in the table.
+ *
+ * @param tableName table name
+ * @return A future completed with the keys of the table.
+ */
+ CompletableFuture<Set<K>> keySet(String tableName);
+
+ /**
+ * Gets a collection of values in the table.
+ *
+ * @param tableName table name
+ * @return A future completed with the versioned values of the table.
+ */
+ CompletableFuture<Collection<Versioned<V>>> values(String tableName);
+
+ /**
+ * Gets a set of entries in the table.
+ *
+ * @param tableName table name
+ * @return A future completed with the key to versioned value entries of the table.
+ */
+ CompletableFuture<Set<Map.Entry<K, Versioned<V>>>> entrySet(String tableName);
+
+ /**
+ * Puts a value in the table if the given key does not exist.
+ *
+ * @param tableName table name
+ * @param key The key to set.
+ * @param value The value to set if the given key does not exist.
+ * @return A future completed with the versioned value already stored under the key, if any.
+ */
+ CompletableFuture<Versioned<V>> putIfAbsent(String tableName, K key, V value);
+
+ /**
+ * Removes a key only if the existing value for that key matches the specified value.
+ *
+ * @param tableName table name
+ * @param key The key to remove.
+ * @param value The value the key is expected to hold.
+ * @return A future completed with {@code true} if the entry was removed.
+ */
+ CompletableFuture<Boolean> remove(String tableName, K key, V value);
+
+ /**
+ * Removes a key only if the existing version for that key matches the specified version.
+ *
+ * @param tableName table name
+ * @param key The key to remove.
+ * @param version The expected version.
+ * @return A future completed with {@code true} if the entry was removed.
+ */
+ CompletableFuture<Boolean> remove(String tableName, K key, long version);
+
+ /**
+ * Replaces the entry for the specified key only if currently mapped to the specified value.
+ *
+ * @param tableName table name
+ * @param key The key to replace.
+ * @param oldValue The value to replace.
+ * @param newValue The value with which to replace the given key and value.
+ * @return A future completed with {@code true} if the value was replaced.
+ */
+ CompletableFuture<Boolean> replace(String tableName, K key, V oldValue, V newValue);
+
+ /**
+ * Replaces the entry for the specified key only if currently mapped to the specified version.
+ *
+ * @param tableName table name
+ * @param key The key to update
+ * @param oldVersion existing version in the map for this replace to succeed.
+ * @param newValue The value with which to replace the given key and version.
+ * @return A future completed with {@code true} if the value was replaced.
+ */
+ CompletableFuture<Boolean> replace(String tableName, K key, long oldVersion, V newValue);
+
+ /**
+ * Performs an atomic batch update operation, i.e. either all operations in the batch succeed or
+ * none do and no state changes are made.
+ *
+ * @param updates list of updates to apply atomically.
+ * @return A future completed with {@code true} if the batch was applied.
+ */
+ CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<K, V>> updates);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
new file mode 100644
index 0000000..89a51e8
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -0,0 +1,77 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.kuujo.copycat.state.Command;
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.Query;
+import net.kuujo.copycat.state.StateContext;
+
+/**
+ * Database state: the table operations, each tagged as a {@code @Query} or a {@code @Command}.
+ * Except for {@code batchUpdate}, every operation shares its name with a {@link DatabaseProxy} operation.
+ */
+public interface DatabaseState<K, V> {
+
+ /**
+ * Initializes the database state.
+ *
+ * @param context The database state context.
+ */
+ @Initializer
+ public void init(StateContext<DatabaseState<K, V>> context);
+
+ @Query
+ int size(String tableName);
+
+ @Query
+ boolean isEmpty(String tableName);
+
+ @Query
+ boolean containsKey(String tableName, K key);
+
+ @Query
+ boolean containsValue(String tableName, V value);
+
+ @Query
+ Versioned<V> get(String tableName, K key);
+
+ @Command
+ Versioned<V> put(String tableName, K key, V value);
+
+ @Command
+ Versioned<V> remove(String tableName, K key);
+
+ @Command
+ void clear(String tableName);
+
+ @Query
+ Set<K> keySet(String tableName);
+
+ @Query
+ Collection<Versioned<V>> values(String tableName);
+
+ @Query
+ Set<Entry<K, Versioned<V>>> entrySet(String tableName);
+
+ @Command
+ Versioned<V> putIfAbsent(String tableName, K key, V value);
+
+ @Command
+ boolean remove(String tableName, K key, V value);
+
+ @Command
+ boolean remove(String tableName, K key, long version);
+
+ @Command
+ boolean replace(String tableName, K key, V oldValue, V newValue);
+
+ @Command
+ boolean replace(String tableName, K key, long oldVersion, V newValue);
+
+ @Command
+ boolean batchUpdate(List<UpdateOperation<K, V>> updates); // NOTE(review): DatabaseProxy has atomicBatchUpdate - confirm
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
new file mode 100644
index 0000000..837f3b4
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -0,0 +1,146 @@
+package org.onosproject.store.consistent.impl;
+
+import net.kuujo.copycat.resource.internal.ResourceContext;
+import net.kuujo.copycat.state.StateMachine;
+import net.kuujo.copycat.resource.internal.AbstractResource;
+import net.kuujo.copycat.state.internal.DefaultStateMachine;
+import net.kuujo.copycat.util.concurrent.Futures;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+/**
+ * Default {@link Database} implementation that forwards every operation to a state machine proxy.
+ */
+public class DefaultDatabase extends AbstractResource<Database> implements Database {
+ private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
+ private volatile DatabaseProxy<String, byte[]> proxy; // null while closed; set in a future callback, hence volatile
+
+ @SuppressWarnings("unchecked")
+ public DefaultDatabase(ResourceContext context) {
+ super(context);
+ this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
+ }
+
+ /**
+ * Returns a failed future if the database is closed (no proxy available); otherwise calls the given
+ * supplier and returns the future it produces.
+ *
+ * @param supplier The supplier to call if the database is open.
+ * @param <T> The future result type.
+ * @return The supplier's future, or a future failed with {@link IllegalStateException} if closed.
+ */
+ protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
+ if (proxy == null) {
+ return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
+ }
+ return supplier.get();
+ }
+
+ @Override
+ public CompletableFuture<Integer> size(String tableName) {
+ return checkOpen(() -> proxy.size(tableName));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty(String tableName) {
+ return checkOpen(() -> proxy.isEmpty(tableName));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsKey(String tableName, String key) {
+ return checkOpen(() -> proxy.containsKey(tableName, key));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
+ return checkOpen(() -> proxy.containsValue(tableName, value));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
+ return checkOpen(() -> proxy.get(tableName, key));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+ return checkOpen(() -> proxy.put(tableName, key, value));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+ return checkOpen(() -> proxy.remove(tableName, key));
+ }
+
+ @Override
+ public CompletableFuture<Void> clear(String tableName) {
+ return checkOpen(() -> proxy.clear(tableName));
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> keySet(String tableName) {
+ return checkOpen(() -> proxy.keySet(tableName));
+ }
+
+ @Override
+ public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
+ return checkOpen(() -> proxy.values(tableName));
+ }
+
+ @Override
+ public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
+ return checkOpen(() -> proxy.entrySet(tableName));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+ return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+ return checkOpen(() -> proxy.remove(tableName, key, value));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+ return checkOpen(() -> proxy.remove(tableName, key, version));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+ return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+ return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
+ return checkOpen(() -> proxy.atomicBatchUpdate(updates));
+ }
+
+ /** Runs the startup tasks, opens the state machine, creates the proxy and completes with this database. */
+ @Override
+ @SuppressWarnings("unchecked")
+ public synchronized CompletableFuture<Database> open() {
+ return runStartupTasks()
+ .thenCompose(v -> stateMachine.open())
+ // The proxy is only assigned once the state machine is open; until then checkOpen() fails every call.
+ .thenRun(() -> this.proxy = stateMachine.createProxy(DatabaseProxy.class))
+ .thenApply(v -> this);
+ }
+
+ @Override
+ public synchronized CompletableFuture<Void> close() {
+ proxy = null; // from here on checkOpen() answers with a failed future
+ return stateMachine.close()
+ .thenCompose(v -> runShutdownTasks());
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
new file mode 100644
index 0000000..2b20f53
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -0,0 +1,217 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import net.kuujo.copycat.state.Initializer;
+import net.kuujo.copycat.state.StateContext;
+
+/**
+ * Default database state: one map of versioned values per table name, registered in the state context.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class DefaultDatabaseState<K, V> implements DatabaseState<K, V> {
+
+ private Long nextVersion; // NOTE(review): ++nextVersion rebinds the field only, not the context entry - confirm
+ private Map<String, Map<K, Versioned<V>>> tables;
+
+ @Initializer
+ @Override
+ public void init(StateContext<DatabaseState<K, V>> context) {
+ tables = context.get("tables");
+ if (tables == null) {
+ tables = new HashMap<>();
+ context.put("tables", tables);
+ }
+ nextVersion = context.get("nextVersion");
+ if (nextVersion == null) {
+ nextVersion = 0L;
+ context.put("nextVersion", nextVersion);
+ }
+ }
+
+ /**
+ * Returns the mutable map backing the given table.
+ * An unknown table is created and registered on first access, including when the access comes from
+ * a read-only query such as {@code size} or {@code get}.
+ */
+ private Map<K, Versioned<V>> getTableMap(String tableName) {
+ return tables.computeIfAbsent(tableName, name -> new HashMap<>());
+ }
+
+ @Override
+ public int size(String tableName) {
+ return getTableMap(tableName).size();
+ }
+
+ @Override
+ public boolean isEmpty(String tableName) {
+ return getTableMap(tableName).isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(String tableName, K key) {
+ return getTableMap(tableName).containsKey(key);
+ }
+
+ @Override
+ public boolean containsValue(String tableName, V value) {
+ return getTableMap(tableName).values().stream().anyMatch(v -> checkEquality(v.value(), value));
+ }
+
+ @Override
+ public Versioned<V> get(String tableName, K key) {
+ return getTableMap(tableName).get(key);
+ }
+
+ @Override
+ public Versioned<V> put(String tableName, K key, V value) {
+ return getTableMap(tableName).put(key, new Versioned<>(value, ++nextVersion)); // new version per write
+ }
+
+ @Override
+ public Versioned<V> remove(String tableName, K key) {
+ return getTableMap(tableName).remove(key);
+ }
+
+ @Override
+ public void clear(String tableName) {
+ getTableMap(tableName).clear();
+ }
+
+ @Override
+ public Set<K> keySet(String tableName) {
+ return getTableMap(tableName).keySet();
+ }
+
+ @Override
+ public Collection<Versioned<V>> values(String tableName) {
+ return getTableMap(tableName).values();
+ }
+
+ @Override
+ public Set<Entry<K, Versioned<V>>> entrySet(String tableName) {
+ return getTableMap(tableName).entrySet();
+ }
+
+ @Override
+ public Versioned<V> putIfAbsent(String tableName, K key, V value) {
+ Versioned<V> existingValue = getTableMap(tableName).get(key);
+ return existingValue != null ? existingValue : put(tableName, key, value);
+ }
+
+ @Override
+ public boolean remove(String tableName, K key, V value) {
+ Versioned<V> existing = getTableMap(tableName).get(key);
+ if (existing != null && checkEquality(existing.value(), value)) {
+ getTableMap(tableName).remove(key);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean remove(String tableName, K key, long version) {
+ Versioned<V> existing = getTableMap(tableName).get(key);
+ if (existing != null && existing.version() == version) {
+ remove(tableName, key);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean replace(String tableName, K key, V oldValue, V newValue) {
+ Versioned<V> existing = getTableMap(tableName).get(key);
+ if (existing != null && checkEquality(existing.value(), oldValue)) {
+ put(tableName, key, newValue);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean replace(String tableName, K key, long oldVersion, V newValue) {
+ Versioned<V> existing = getTableMap(tableName).get(key);
+ if (existing != null && existing.version() == oldVersion) {
+ put(tableName, key, newValue);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean batchUpdate(List<UpdateOperation<K, V>> updates) {
+ // Check every precondition against the current state first, so that a rejected batch changes nothing.
+ if (!updates.stream().allMatch(this::checkIfUpdateIsPossible)) {
+ return false;
+ }
+ updates.forEach(this::doUpdate);
+ return true;
+ }
+
+ private void doUpdate(UpdateOperation<K, V> update) {
+ String tableName = update.tableName();
+ K key = update.key();
+ switch (update.type()) {
+ case PUT:
+ put(tableName, key, update.value());
+ return;
+ case REMOVE:
+ remove(tableName, key);
+ return;
+ case PUT_IF_ABSENT:
+ putIfAbsent(tableName, key, update.value());
+ return;
+ case PUT_IF_VERSION_MATCH:
+ replace(tableName, key, update.currentVersion(), update.value());
+ return;
+ case PUT_IF_VALUE_MATCH:
+ replace(tableName, key, update.currentValue(), update.value());
+ return;
+ case REMOVE_IF_VERSION_MATCH:
+ remove(tableName, key, update.currentVersion());
+ return;
+ case REMOVE_IF_VALUE_MATCH:
+ remove(tableName, key, update.currentValue());
+ return;
+ default:
+ throw new IllegalStateException("Unsupported type: " + update.type());
+ }
+ }
+
+ private boolean checkIfUpdateIsPossible(UpdateOperation<K, V> update) {
+ Versioned<V> existingEntry = get(update.tableName(), update.key());
+ switch (update.type()) {
+ case PUT:
+ case REMOVE:
+ return true;
+ case PUT_IF_ABSENT:
+ return existingEntry == null;
+ case PUT_IF_VERSION_MATCH:
+ return existingEntry != null && existingEntry.version() == update.currentVersion();
+ case PUT_IF_VALUE_MATCH:
+ return existingEntry != null && checkEquality(existingEntry.value(), update.currentValue());
+ case REMOVE_IF_VERSION_MATCH:
+ return existingEntry == null || existingEntry.version() == update.currentVersion();
+ case REMOVE_IF_VALUE_MATCH:
+ return existingEntry == null || checkEquality(existingEntry.value(), update.currentValue());
+ default:
+ throw new IllegalStateException("Unsupported type: " + update.type());
+ }
+ }
+
+ private boolean checkEquality(V value1, V value2) {
+ if (value1 instanceof byte[]) { // arrays inherit identity-based equals(), so compare their contents
+ return Arrays.equals((byte[]) value1, (byte[]) value2);
+ }
+ return value1 == null ? value2 == null : value1.equals(value2);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
new file mode 100644
index 0000000..d35aca2
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -0,0 +1,208 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+
+/**
+ * Database that routes keyed operations to one partition and fans table-wide operations out to all of them.
+ */
+public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
+
+ private Partitioner<String> partitioner; // only assigned by setPartitioner(); keyed operations dereference it
+ private final ClusterCoordinator coordinator;
+ private final Map<String, Database> partitions = Maps.newConcurrentMap();
+
+ protected PartitionedDatabase(ClusterCoordinator coordinator) {
+ this.coordinator = coordinator;
+ }
+
+ @Override
+ public void registerPartition(String name, Database partition) {
+ partitions.put(name, partition);
+ }
+
+ @Override
+ public Map<String, Database> getRegisteredPartitions() {
+ return ImmutableMap.copyOf(partitions);
+ }
+
+ @Override
+ public CompletableFuture<Integer> size(String tableName) {
+ AtomicInteger totalSize = new AtomicInteger(0);
+ return CompletableFuture.allOf(partitions
+ .values()
+ .stream()
+ .map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> totalSize.get());
+ }
+
+ @Override
+ public CompletableFuture<Boolean> isEmpty(String tableName) {
+ return size(tableName).thenApply(size -> size == 0);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsKey(String tableName, String key) {
+ return partitioner.getPartition(tableName, key).containsKey(tableName, key);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
+ AtomicBoolean containsValue = new AtomicBoolean(false);
+ return CompletableFuture.allOf(partitions
+ .values()
+ .stream()
+ .map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> containsValue.get());
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
+ return partitioner.getPartition(tableName, key).get(tableName, key);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+ return partitioner.getPartition(tableName, key).put(tableName, key, value);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+ return partitioner.getPartition(tableName, key).remove(tableName, key);
+ }
+
+ @Override
+ public CompletableFuture<Void> clear(String tableName) {
+ return CompletableFuture.allOf(partitions
+ .values()
+ .stream()
+ .map(p -> p.clear(tableName))
+ .toArray(CompletableFuture[]::new));
+ }
+
+ @Override
+ public CompletableFuture<Set<String>> keySet(String tableName) {
+ Set<String> keySet = Sets.newConcurrentHashSet();
+ return CompletableFuture.allOf(partitions
+ .values()
+ .stream()
+ .map(p -> p.keySet(tableName).thenApply(keySet::addAll))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> keySet);
+ }
+
+ @Override
+ public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
+ List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>(); // filled from several partition callbacks
+ return CompletableFuture.allOf(partitions
+ .values()
+ .stream()
+ .map(p -> p.values(tableName).thenApply(values::addAll))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> values);
+ }
+
+ @Override
+ public CompletableFuture<Set<Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
+ Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
+ return CompletableFuture.allOf(partitions
+ .values()
+ .stream()
+ .map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> entrySet);
+ }
+
+ @Override
+ public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+ return partitioner.getPartition(tableName, key).putIfAbsent(tableName, key, value);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+ return partitioner.getPartition(tableName, key).remove(tableName, key, value);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+ return partitioner.getPartition(tableName, key).remove(tableName, key, version);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+ return partitioner.getPartition(tableName, key).replace(tableName, key, oldValue, newValue);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+ return partitioner.getPartition(tableName, key).replace(tableName, key, oldVersion, newValue);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
+ Map<Database, List<UpdateOperation<String, byte[]>>> perPartitionUpdates = Maps.newHashMap();
+ for (UpdateOperation<String, byte[]> update : updates) {
+ Database partition = partitioner.getPartition(update.tableName(), update.key());
+ List<UpdateOperation<String, byte[]>> partitionUpdates = perPartitionUpdates.get(partition);
+ if (partitionUpdates == null) {
+ partitionUpdates = Lists.newArrayList();
+ perPartitionUpdates.put(partition, partitionUpdates);
+ }
+ partitionUpdates.add(update);
+ }
+ if (perPartitionUpdates.isEmpty()) {
+ return CompletableFuture.completedFuture(true); // an empty batch has nothing to apply
+ } else if (perPartitionUpdates.size() > 1) {
+ // TODO
+ throw new UnsupportedOperationException("Cross partition transactional updates are not supported.");
+ }
+ Entry<Database, List<UpdateOperation<String, byte[]>>> only = perPartitionUpdates.entrySet().iterator().next();
+ return only.getKey().atomicBatchUpdate(only.getValue());
+ }
+
+ @Override
+ public void setPartitioner(Partitioner<String> partitioner) {
+ this.partitioner = partitioner;
+ }
+
+ @Override
+ public CompletableFuture<PartitionedDatabase> open() {
+ return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
+ .values()
+ .stream()
+ .map(Database::open)
+ .collect(Collectors.toList())
+ .toArray(new CompletableFuture[partitions.size()]))
+ .thenApply(v -> this));
+
+ }
+
+ @Override
+ public CompletableFuture<Void> close() {
+ CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
+ .values()
+ .stream()
+ .map(database -> database.close())
+ .collect(Collectors.toList())
+ .toArray(new CompletableFuture[partitions.size()]));
+ CompletableFuture<Void> closeCoordinator = coordinator.close(); // starts now, alongside the partition closes
+ return closePartitions.thenCompose(v -> closeCoordinator);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
new file mode 100644
index 0000000..6d375cc
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
@@ -0,0 +1,31 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration of a partitioned database, kept as one {@link DatabaseConfig} per partition name.
+ */
+public class PartitionedDatabaseConfig {
+    private final Map<String, DatabaseConfig> configsByName = new HashMap<>();
+
+    /**
+     * Returns the configuration for all partitions.
+     * @return unmodifiable view mapping each partition name to its configuration.
+     */
+    public Map<String, DatabaseConfig> partitions() {
+        return Collections.unmodifiableMap(this.configsByName);
+    }
+
+    /**
+     * Adds the specified partition; a configuration already stored under that name is replaced.
+     * @param name partition name.
+     * @param config partition config
+     * @return this instance
+     */
+    public PartitionedDatabaseConfig withPartition(String name, DatabaseConfig config) {
+        this.configsByName.put(name, config);
+        return this;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
new file mode 100644
index 0000000..44cd3a1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
@@ -0,0 +1,79 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+
+import net.kuujo.copycat.CopycatConfig;
+import net.kuujo.copycat.cluster.ClusterConfig;
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
+import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
+
+/**
+ * Manages a partitioned database: its lifecycle, its registered partitions and its key partitioner.
+ */
+public interface PartitionedDatabaseManager {
+    /**
+     * Opens the database.
+     *
+     * @return A future that is completed with the database once opening has finished.
+     */
+    CompletableFuture<PartitionedDatabase> open();
+
+    /**
+     * Closes the database.
+     *
+     * @return A future that is completed once closing has finished.
+     */
+    CompletableFuture<Void> close();
+
+    /**
+     * Sets the partitioner to use for mapping keys to partitions.
+     *
+     * @param partitioner partitioner
+     */
+    void setPartitioner(Partitioner<String> partitioner);
+
+    /**
+     * Registers a new partition.
+     *
+     * @param partitionName partition name.
+     * @param partition partition.
+     */
+    void registerPartition(String partitionName, Database partition);
+
+    /**
+     * Returns all the registered database partitions.
+     *
+     * @return mapping of all registered database partitions.
+     */
+    Map<String, Database> getRegisteredPartitions();
+
+    /**
+     * Creates a new partitioned database whose partitions are resources of one cluster coordinator.
+     *
+     * @param name The database name.
+     * @param clusterConfig The cluster configuration.
+     * @param partitionedDatabaseConfig The database configuration.
+     * @return The database.
+     */
+    static PartitionedDatabase create(String name, ClusterConfig clusterConfig,
+            PartitionedDatabaseConfig partitionedDatabaseConfig) {
+        CopycatConfig copycatConfig = new CopycatConfig()
+                .withName(name)
+                .withClusterConfig(clusterConfig)
+                .withDefaultExecutor(Executors.newSingleThreadExecutor(
+                        new NamedThreadFactory("copycat-coordinator-%d")));
+        ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
+        PartitionedDatabase database = new PartitionedDatabase(coordinator);
+        for (Map.Entry<String, DatabaseConfig> entry : partitionedDatabaseConfig.partitions().entrySet()) {
+            Database partition = coordinator.getResource(entry.getKey(), entry.getValue().resolve(clusterConfig)
+                    .withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
+                    .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
+            database.registerPartition(entry.getKey(), partition);
+        }
+        database.setPartitioner(new SimpleKeyHashPartitioner<>(database.getRegisteredPartitions()));
+        return database;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java
new file mode 100644
index 0000000..df8d56e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Partitioner.java
@@ -0,0 +1,17 @@
+package org.onosproject.store.consistent.impl;
+
+/**
+ * Partitioner is responsible for mapping keys to individual database partitions.
+ *
+ * @param <K> key type.
+ */
+public interface Partitioner<K> {
+
+ /**
+ * Returns the database partition that the given key of the given table is mapped to.
+ * @param tableName name of the table the key belongs to
+ * @param key key to be mapped to a partition
+ * @return Database partition
+ */
+ Database getPartition(String tableName, K key);
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
new file mode 100644
index 0000000..5410f9f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
@@ -0,0 +1,31 @@
+package org.onosproject.store.consistent.impl;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+
+/**
+ * A simple Partitioner that uses only the key hashCode (the table name is ignored) to map a key to a
+ * partition. Partition names are sorted so the mapping does not depend on the supplied map's ordering.
+ *
+ * @param <K> key type.
+ */
+public class SimpleKeyHashPartitioner<K> implements Partitioner<K> {
+    private final Map<String, Database> partitionMap;
+    private final List<String> sortedPartitionNames;
+
+    public SimpleKeyHashPartitioner(Map<String, Database> partitionMap) {
+        this.partitionMap = ImmutableMap.copyOf(partitionMap);
+        sortedPartitionNames = Lists.newArrayList(this.partitionMap.keySet());
+        Collections.sort(sortedPartitionNames);
+    }
+
+    @Override
+    public Database getPartition(String tableName, K key) {
+        // Take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is negative and would break the index.
+        return partitionMap.get(sortedPartitionNames.get(Math.abs(key.hashCode() % partitionMap.size())));
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
new file mode 100644
index 0000000..9d52a09
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/UpdateOperation.java
@@ -0,0 +1,151 @@
+package org.onosproject.store.consistent.impl;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Database update operation.
+ *
+ * @param <K> key type.
+ * @param <V> value type.
+ */
+public class UpdateOperation<K, V> {
+    /** Type of database update operation. */
+    public enum Type {
+        PUT,
+        PUT_IF_ABSENT,
+        PUT_IF_VERSION_MATCH,
+        PUT_IF_VALUE_MATCH,
+        REMOVE,
+        REMOVE_IF_VERSION_MATCH,
+        REMOVE_IF_VALUE_MATCH,
+    }
+
+    private Type type;
+    private String tableName;
+    private K key;
+    private V value;
+    private V currentValue;
+    private long currentVersion;
+
+    /**
+     * Returns the type of update operation.
+     * @return type of update.
+     */
+    public Type type() {
+        return type;
+    }
+
+    /**
+     * Returns the tableName being updated.
+     * @return table name.
+     */
+    public String tableName() {
+        return tableName;
+    }
+
+    /**
+     * Returns the item key being updated.
+     * @return item key
+     */
+    public K key() {
+        return key;
+    }
+
+    /**
+     * Returns the new value.
+     * @return item's target value.
+     */
+    public V value() {
+        return value;
+    }
+
+    /**
+     * Returns the expected current value in the database for the key.
+     * @return current value in database.
+     */
+    public V currentValue() {
+        return currentValue;
+    }
+
+    /**
+     * Returns the expected current version in the database for the key.
+     * @return expected version.
+     */
+    public long currentVersion() {
+        return currentVersion;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(this)
+            .add("type", type)
+            .add("tableName", tableName)
+            .add("key", key)
+            .add("value", value)
+            .add("currentValue", currentValue)
+            .add("currentVersion", currentVersion)
+            .toString();
+    }
+
+    /**
+     * UpdateOperation builder. Each {@code with*} method sets one field and returns this builder.
+     *
+     * @param <K> key type.
+     * @param <V> value type.
+     */
+    public static final class Builder<K, V> {
+        private final UpdateOperation<K, V> operation = new UpdateOperation<>();
+
+        /**
+         * Creates a new builder instance.
+         * @param <K> key type.
+         * @param <V> value type.
+         * @return builder.
+         */
+        public static <K, V> Builder<K, V> builder() {
+            return new Builder<>();
+        }
+
+        private Builder() {
+        }
+
+        /** Returns a new operation holding the values set so far; it is detached from this builder. */
+        public UpdateOperation<K, V> build() {
+            // Copy: returning the shared instance would let later with* calls alter an already built operation.
+            return new Builder<K, V>()
+                    .withType(operation.type).withTableName(operation.tableName).withKey(operation.key)
+                    .withValue(operation.value).withCurrentValue(operation.currentValue)
+                    .withCurrentVersion(operation.currentVersion).operation;
+        }
+
+        public Builder<K, V> withType(Type type) {
+            operation.type = type;
+            return this;
+        }
+
+        public Builder<K, V> withTableName(String tableName) {
+            operation.tableName = tableName;
+            return this;
+        }
+
+        public Builder<K, V> withKey(K key) {
+            operation.key = key;
+            return this;
+        }
+
+        public Builder<K, V> withCurrentValue(V value) {
+            operation.currentValue = value;
+            return this;
+        }
+
+        public Builder<K, V> withValue(V value) {
+            operation.value = value;
+            return this;
+        }
+
+        public Builder<K, V> withCurrentVersion(long version) {
+            operation.currentVersion = version;
+            return this;
+        }
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
new file mode 100644
index 0000000..6eb908d
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Versioned.java
@@ -0,0 +1,50 @@
+package org.onosproject.store.consistent.impl;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Versioned value.
+ *
+ * @param <V> value type.
+ */
+public class Versioned<V> {
+
+ private final V value;
+ private final long version;
+
+ /**
+ * Constructs a new versioned value.
+ * @param value value
+ * @param version version
+ */
+ public Versioned(V value, long version) {
+ this.value = value;
+ this.version = version;
+ }
+
+ /**
+ * Returns the value.
+ *
+ * @return value.
+ */
+ public V value() {
+ return value;
+ }
+
+ /**
+ * Returns the version.
+ *
+ * @return version
+ */
+ public long version() {
+ return version;
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("value", value)
+ .add("version", version)
+ .toString();
+ }
+}