blob: 81f00ded873325b663dc0a6067055b707b67d27c [file] [log] [blame]
package org.onosproject.store.consistent.impl;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import net.kuujo.copycat.CopycatConfig;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
/**
* Manages a PartitionedDatabase.
*/
public interface PartitionedDatabaseManager {
/**
* Opens the database.
*
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<PartitionedDatabase> open();
/**
* Closes the database.
*
* @return A completable future to be completed with the result once complete.
*/
CompletableFuture<Void> close();
/**
* Sets the partitioner to use for mapping keys to partitions.
*
* @param partitioner partitioner
*/
void setPartitioner(Partitioner<String> partitioner);
/**
* Registers a new partition.
*
* @param partitionName partition name.
* @param partition partition.
*/
void registerPartition(String partitionName, Database partition);
/**
* Returns all the registered database partitions.
*
* @return mapping of all registered database partitions.
*/
Map<String, Database> getRegisteredPartitions();
/**
* Creates a new partitioned database.
*
* @param name The database name.
* @param clusterConfig The cluster configuration.
* @param partitionedDatabaseConfig The database configuration.
* @return The database.
*/
public static PartitionedDatabase create(
String name,
ClusterConfig clusterConfig,
PartitionedDatabaseConfig partitionedDatabaseConfig) {
CopycatConfig copycatConfig = new CopycatConfig()
.withName(name)
.withClusterConfig(clusterConfig)
.withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
partitionedDatabase.registerPartition(partitionName ,
coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
.withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
.withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
partitionedDatabase.setPartitioner(
new SimpleKeyHashPartitioner(partitionedDatabase.getRegisteredPartitions()));
return partitionedDatabase;
}
}