blob: 44cd3a19acec09654c5083ce251f0752ff506d4c [file] [log] [blame]
Madan Jampani94c23532015-02-05 17:40:01 -08001package org.onosproject.store.consistent.impl;
2
3import java.util.Map;
4import java.util.concurrent.CompletableFuture;
5import java.util.concurrent.Executors;
6
7import net.kuujo.copycat.CopycatConfig;
8import net.kuujo.copycat.cluster.ClusterConfig;
9import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
10import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
11import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
12
13public interface PartitionedDatabaseManager {
14 /**
15 * Opens the database.
16 *
17 * @return A completable future to be completed with the result once complete.
18 */
19 CompletableFuture<PartitionedDatabase> open();
20
21 /**
22 * Closes the database.
23 *
24 * @return A completable future to be completed with the result once complete.
25 */
26 CompletableFuture<Void> close();
27
28 /**
29 * Sets the partitioner to use for mapping keys to partitions.
30 *
31 * @param partitioner partitioner
32 */
33 void setPartitioner(Partitioner<String> partitioner);
34
35 /**
36 * Registers a new partition.
37 *
38 * @param partitionName partition name.
39 * @param partition partition.
40 */
41 void registerPartition(String partitionName, Database partition);
42
43 /**
44 * Returns all the registered database partitions.
45 *
46 * @return mapping of all registered database partitions.
47 */
48 Map<String, Database> getRegisteredPartitions();
49
50
51 /**
52 * Creates a new partitioned database.
53 *
54 * @param name The database name.
55 * @param clusterConfig The cluster configuration.
56 * @param partitionedDatabaseConfig The database configuration.
57
58 * @return The database.
59 */
60 public static PartitionedDatabase create(
61 String name,
62 ClusterConfig clusterConfig,
63 PartitionedDatabaseConfig partitionedDatabaseConfig) {
64 CopycatConfig copycatConfig = new CopycatConfig()
65 .withName(name)
66 .withClusterConfig(clusterConfig)
67 .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
68 ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
69 PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
70 partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
71 partitionedDatabase.registerPartition(partitionName ,
72 coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
73 .withDefaultSerializer(copycatConfig.getDefaultSerializer().copy())
74 .withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
75 partitionedDatabase.setPartitioner(
76 new SimpleKeyHashPartitioner<>(partitionedDatabase.getRegisteredPartitions()));
77 return partitionedDatabase;
78 }
79}