Support a inmemory p0 partition encompassing all nodes in the cluster. This will be used by leadership manager and other usecases
that need strong consistency for coordination and not durable storage
Change-Id: I8e590e46d82a3d43cae3157a04be820bb7e1b175
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 24ec92b..cb4ddfc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -25,58 +25,63 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Versioned;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
-
+import net.kuujo.copycat.Task;
+import net.kuujo.copycat.cluster.Cluster;
import static com.google.common.base.Preconditions.checkState;
/**
* A database that partitions the keys across one or more database partitions.
*/
-public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
+public class PartitionedDatabase implements Database {
- private Partitioner<String> partitioner;
- private final ClusterCoordinator coordinator;
- private final Map<String, Database> partitions = Maps.newConcurrentMap();
+ private final String name;
+ private final Partitioner<String> partitioner;
+ private final List<Database> partitions;
private final AtomicBoolean isOpen = new AtomicBoolean(false);
- private static final String DB_NOT_OPEN = "Database is not open";
+ private static final String DB_NOT_OPEN = "Partitioned Database is not open";
- protected PartitionedDatabase(ClusterCoordinator coordinator) {
- this.coordinator = coordinator;
+ public PartitionedDatabase(
+ String name,
+ Collection<Database> partitions) {
+ this.name = name;
+ this.partitions = partitions
+ .stream()
+ .sorted((db1, db2) -> db1.name().compareTo(db2.name()))
+ .collect(Collectors.toList());
+ this.partitioner = new SimpleKeyHashPartitioner(this.partitions);
+ }
+
+ /**
+ * Returns the databases for individual partitions.
+ * @return list of database partitions
+ */
+ public List<Database> getPartitions() {
+ return partitions;
}
/**
* Returns true if the database is open.
* @return true if open, false otherwise
*/
+ @Override
public boolean isOpen() {
return isOpen.get();
}
@Override
- public void registerPartition(String name, Database partition) {
- partitions.put(name, partition);
- }
-
- @Override
- public Map<String, Database> getRegisteredPartitions() {
- return ImmutableMap.copyOf(partitions);
- }
-
- @Override
public CompletableFuture<Integer> size(String tableName) {
checkState(isOpen.get(), DB_NOT_OPEN);
AtomicInteger totalSize = new AtomicInteger(0);
return CompletableFuture.allOf(partitions
- .values()
.stream()
.map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
.toArray(CompletableFuture[]::new))
@@ -100,7 +105,6 @@
checkState(isOpen.get(), DB_NOT_OPEN);
AtomicBoolean containsValue = new AtomicBoolean(false);
return CompletableFuture.allOf(partitions
- .values()
.stream()
.map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
.toArray(CompletableFuture[]::new))
@@ -129,7 +133,6 @@
public CompletableFuture<Void> clear(String tableName) {
checkState(isOpen.get(), DB_NOT_OPEN);
return CompletableFuture.allOf(partitions
- .values()
.stream()
.map(p -> p.clear(tableName))
.toArray(CompletableFuture[]::new));
@@ -140,7 +143,6 @@
checkState(isOpen.get(), DB_NOT_OPEN);
Set<String> keySet = Sets.newConcurrentHashSet();
return CompletableFuture.allOf(partitions
- .values()
.stream()
.map(p -> p.keySet(tableName).thenApply(keySet::addAll))
.toArray(CompletableFuture[]::new))
@@ -152,7 +154,6 @@
checkState(isOpen.get(), DB_NOT_OPEN);
List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
return CompletableFuture.allOf(partitions
- .values()
.stream()
.map(p -> p.values(tableName).thenApply(values::addAll))
.toArray(CompletableFuture[]::new))
@@ -164,7 +165,6 @@
checkState(isOpen.get(), DB_NOT_OPEN);
Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
return CompletableFuture.allOf(partitions
- .values()
.stream()
.map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
.toArray(CompletableFuture[]::new))
@@ -225,32 +225,47 @@
}
@Override
- public void setPartitioner(Partitioner<String> partitioner) {
- this.partitioner = partitioner;
- }
-
- @Override
- public CompletableFuture<PartitionedDatabase> open() {
- return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
- .values()
- .stream()
- .map(Database::open)
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> {
- isOpen.set(true);
- return this; }));
-
+ public CompletableFuture<Database> open() {
+ return CompletableFuture.allOf(partitions
+ .stream()
+ .map(Database::open)
+ .toArray(CompletableFuture[]::new))
+ .thenApply(v -> {
+ isOpen.set(true);
+ return this; });
}
@Override
public CompletableFuture<Void> close() {
checkState(isOpen.get(), DB_NOT_OPEN);
- CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
- .values()
+ return CompletableFuture.allOf(partitions
.stream()
.map(database -> database.close())
.toArray(CompletableFuture[]::new));
- CompletableFuture<Void> closeCoordinator = coordinator.close();
- return closePartitions.thenCompose(v -> closeCoordinator);
}
-}
+
+ @Override
+ public boolean isClosed() {
+ return !isOpen.get();
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public Cluster cluster() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Database addStartupTask(Task<CompletableFuture<Void>> task) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file