Support an in-memory p0 partition encompassing all nodes in the cluster. This will be used by the leadership manager and other use cases
that need strong consistency for coordination but do not need durable storage
Change-Id: I8e590e46d82a3d43cae3157a04be820bb7e1b175
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
index 675d094..91cfa4b 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/Database.java
@@ -81,5 +81,4 @@
.addStartupTask(() -> coordinator.open().thenApply(v -> null))
.addShutdownTask(coordinator::close);
}
-
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
index 3f8b235..bd774b9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseConfig.java
@@ -36,6 +36,8 @@
private static final String DEFAULT_CONFIGURATION = "database-defaults";
private static final String CONFIGURATION = "database";
+ private String name;
+
public DatabaseConfig() {
super(CONFIGURATION, DEFAULT_CONFIGURATION);
}
@@ -114,6 +116,37 @@
return this;
}
+ /**
+ * Returns the database name.
+ *
+ * @return The database name
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Sets the database name, returning the configuration for method chaining.
+ *
+ * @param name The database name
+ * @return The database configuration
+ * @throws java.lang.NullPointerException If the name is {@code null}
+ */
+ public DatabaseConfig withName(String name) {
+ setName(Assert.isNotNull(name, "name"));
+ return this;
+ }
+
+ /**
+ * Sets the database name.
+ *
+ * @param name The database name
+ * @throws java.lang.NullPointerException If the name is {@code null}
+ */
+ public void setName(String name) {
+ this.name = Assert.isNotNull(name, "name");
+ }
+
@Override
public CoordinatedResourceConfig resolve(ClusterConfig cluster) {
return new StateLogConfig(toMap())
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index 4ef317f..eaeecfd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -16,12 +16,23 @@
package org.onosproject.store.consistent.impl;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+
+import net.kuujo.copycat.CopycatConfig;
import net.kuujo.copycat.cluster.ClusterConfig;
import net.kuujo.copycat.cluster.Member;
+import net.kuujo.copycat.cluster.Member.Type;
+import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
+import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
+import net.kuujo.copycat.log.BufferedLog;
import net.kuujo.copycat.log.FileLog;
+import net.kuujo.copycat.log.Log;
import net.kuujo.copycat.netty.NettyTcpProtocol;
import net.kuujo.copycat.protocol.Consistency;
+import net.kuujo.copycat.protocol.Protocol;
+import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -32,11 +43,9 @@
import org.onosproject.store.cluster.impl.NodeInfo;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.ecmap.EventuallyConsistentMapBuilderImpl;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.PartitionInfo;
-import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageAdminService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TransactionContext;
@@ -47,7 +56,9 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -61,13 +72,16 @@
public class DatabaseManager implements StorageService, StorageAdminService {
private final Logger log = getLogger(getClass());
+ private ClusterCoordinator coordinator;
private PartitionedDatabase partitionedDatabase;
+ private Database inMemoryDatabase;
public static final int COPYCAT_TCP_PORT = 7238; // 7238 = RAFT
private static final String CONFIG_DIR = "../config";
private static final String PARTITION_DEFINITION_FILE = "tablets.json";
private static final int DATABASE_STARTUP_TIMEOUT_SEC = 60;
-
- private final PartitionedDatabaseConfig databaseConfig = new PartitionedDatabaseConfig();
+ public static final String BASE_PARTITION_NAME = "p0";
+ private static final int RAFT_ELECTION_TIMEOUT = 3000;
+ private static final int RAFT_HEARTBEAT_TIMEOUT = 1500;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -82,8 +96,6 @@
@Activate
public void activate() {
- final String logDir = System.getProperty("karaf.data", "./data");
-
// load database configuration
File file = new File(CONFIG_DIR, PARTITION_DEFINITION_FILE);
log.info("Loading database definition: {}", file.getAbsolutePath());
@@ -107,47 +119,56 @@
String localNodeUri = nodeToUri(NodeInfo.of(clusterService.getLocalNode()));
ClusterConfig clusterConfig = new ClusterConfig()
- .withProtocol(new NettyTcpProtocol()
- .withSsl(false)
- .withConnectTimeout(60000)
- .withAcceptBacklog(1024)
- .withTrafficClass(-1)
- .withSoLinger(-1)
- .withReceiveBufferSize(32768)
- .withSendBufferSize(8192)
- .withThreads(1))
- .withElectionTimeout(3000)
- .withHeartbeatInterval(1500)
+ .withProtocol(newNettyProtocol())
+ .withElectionTimeout(RAFT_ELECTION_TIMEOUT)
+ .withHeartbeatInterval(RAFT_HEARTBEAT_TIMEOUT)
.withMembers(activeNodeUris)
.withLocalMember(localNodeUri);
- partitionMap.forEach((name, nodes) -> {
- Set<String> replicas = nodes.stream().map(this::nodeToUri).collect(Collectors.toSet());
- DatabaseConfig partitionConfig = new DatabaseConfig()
- .withElectionTimeout(3000)
- .withHeartbeatInterval(1500)
- .withConsistency(Consistency.STRONG)
- .withLog(new FileLog()
- .withDirectory(logDir)
- .withSegmentSize(1073741824) // 1GB
- .withFlushOnWrite(true)
- .withSegmentInterval(Long.MAX_VALUE))
- .withDefaultSerializer(new DatabaseSerializer())
- .withReplicas(replicas);
- databaseConfig.addPartition(name, partitionConfig);
- });
+ CopycatConfig copycatConfig = new CopycatConfig()
+ .withName("onos")
+ .withClusterConfig(clusterConfig)
+ .withDefaultSerializer(new DatabaseSerializer())
+ .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
- partitionedDatabase = PartitionedDatabaseManager.create("onos-store", clusterConfig, databaseConfig);
+ coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
+
+ DatabaseConfig inMemoryDatabaseConfig =
+ newDatabaseConfig(BASE_PARTITION_NAME, newInMemoryLog(), activeNodeUris);
+ inMemoryDatabase = coordinator
+ .getResource(inMemoryDatabaseConfig.getName(), inMemoryDatabaseConfig.resolve(clusterConfig)
+ .withSerializer(copycatConfig.getDefaultSerializer())
+ .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
+
+ List<Database> partitions = partitionMap.entrySet()
+ .stream()
+ .map(entry -> {
+ String[] replicas = entry.getValue().stream().map(this::nodeToUri).toArray(String[]::new);
+ return newDatabaseConfig(entry.getKey(), newPersistentLog(), replicas);
+ })
+ .map(config -> {
+ Database db = coordinator.getResource(config.getName(), config.resolve(clusterConfig)
+ .withSerializer(copycatConfig.getDefaultSerializer())
+ .withDefaultExecutor(copycatConfig.getDefaultExecutor()));
+ return db;
+ })
+ .collect(Collectors.toList());
+
+ partitionedDatabase = new PartitionedDatabase("onos-store", partitions);
CountDownLatch latch = new CountDownLatch(1);
- partitionedDatabase.open().whenComplete((db, error) -> {
- if (error != null) {
- log.warn("Failed to open database.", error);
- } else {
- latch.countDown();
- log.info("Successfully opened database.");
- }
- });
+
+ coordinator.open()
+ .thenCompose(v -> CompletableFuture.allOf(inMemoryDatabase.open(), partitionedDatabase.open())
+ .whenComplete((db, error) -> {
+ if (error != null) {
+ log.warn("Failed to create databases.", error);
+ } else {
+ latch.countDown();
+ log.info("Successfully created databases.");
+ }
+ }));
+
try {
if (!latch.await(DATABASE_STARTUP_TIMEOUT_SEC, TimeUnit.SECONDS)) {
log.warn("Timed out waiting for database to initialize.");
@@ -161,52 +182,87 @@
@Deactivate
public void deactivate() {
- partitionedDatabase.close().whenComplete((result, error) -> {
- if (error != null) {
- log.warn("Failed to cleanly close database.", error);
- } else {
- log.info("Successfully closed database.");
- }
- });
+ CompletableFuture.allOf(inMemoryDatabase.close(), partitionedDatabase.close())
+ .thenCompose(v -> coordinator.close())
+ .whenComplete((result, error) -> {
+ if (error != null) {
+ log.warn("Failed to cleanly close databases.", error);
+ } else {
+ log.info("Successfully closed databases.");
+ }
+ });
log.info("Stopped");
}
@Override
- public <K, V> ConsistentMap<K , V> createConsistentMap(String name, Serializer serializer) {
- return new DefaultConsistentMap<>(name, partitionedDatabase, serializer);
- }
-
- @Override
- public <K, V> AsyncConsistentMap<K , V> createAsyncConsistentMap(String name, Serializer serializer) {
- return new DefaultAsyncConsistentMap<>(name, partitionedDatabase, serializer);
- }
-
- @Override
public TransactionContext createTransactionContext() {
return new DefaultTransactionContext(partitionedDatabase);
}
@Override
public List<PartitionInfo> getPartitionInfo() {
- return partitionedDatabase.getRegisteredPartitions()
- .values()
+ return Lists.asList(
+ inMemoryDatabase,
+ partitionedDatabase.getPartitions().toArray(new Database[]{}))
.stream()
- .map(db -> toPartitionInfo(db, databaseConfig.partitions().get(db.name())))
+ .map(DatabaseManager::toPartitionInfo)
.collect(Collectors.toList());
}
+ private Protocol newNettyProtocol() {
+ return new NettyTcpProtocol()
+ .withSsl(false)
+ .withConnectTimeout(60000)
+ .withAcceptBacklog(1024)
+ .withTrafficClass(-1)
+ .withSoLinger(-1)
+ .withReceiveBufferSize(32768)
+ .withSendBufferSize(8192)
+ .withThreads(1);
+ }
+
+ private Log newPersistentLog() {
+ String logDir = System.getProperty("karaf.data", "./data");
+ return new FileLog()
+ .withDirectory(logDir)
+ .withSegmentSize(1073741824) // 1GB
+ .withFlushOnWrite(true)
+ .withSegmentInterval(Long.MAX_VALUE);
+ }
+
+ private Log newInMemoryLog() {
+ return new BufferedLog()
+ .withFlushOnWrite(false)
+ .withFlushInterval(Long.MAX_VALUE)
+ .withSegmentSize(10485760) // 10MB
+ .withSegmentInterval(Long.MAX_VALUE);
+ }
+
+ private DatabaseConfig newDatabaseConfig(String name, Log log, String[] replicas) {
+ return new DatabaseConfig()
+ .withName(name)
+ .withElectionTimeout(RAFT_ELECTION_TIMEOUT)
+ .withHeartbeatInterval(RAFT_HEARTBEAT_TIMEOUT)
+ .withConsistency(Consistency.STRONG)
+ .withLog(log)
+ .withDefaultSerializer(new DatabaseSerializer())
+ .withReplicas(replicas);
+ }
+
/**
* Maps a Raft Database object to a PartitionInfo object.
*
* @param database database containing input data
* @return PartitionInfo object
*/
- private static PartitionInfo toPartitionInfo(Database database, DatabaseConfig dbConfig) {
+ private static PartitionInfo toPartitionInfo(Database database) {
return new PartitionInfo(database.name(),
database.cluster().term(),
- database.cluster().members().stream()
+ database.cluster().members()
+ .stream()
+ .filter(member -> Type.ACTIVE.equals(member.type()))
.map(Member::uri)
- .filter(uri -> dbConfig.getReplicas().contains(uri))
+ .sorted()
.collect(Collectors.toList()),
database.cluster().leader() != null ?
database.cluster().leader().uri() : null);
@@ -219,4 +275,8 @@
clusterCommunicator);
}
-}
+ @Override
+ public <K, V> ConsistentMapBuilder<K, V> consistentMapBuilder() {
+ return new DefaultConsistentMapBuilder<>(inMemoryDatabase, partitionedDatabase);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java
index cd0525e..a76a4e3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabasePartitioner.java
@@ -18,10 +18,9 @@
import static com.google.common.base.Preconditions.checkState;
-import java.util.Map;
-
+import java.util.List;
import com.google.common.base.Charsets;
-import com.google.common.collect.ImmutableSortedMap;
+import com.google.common.collect.ImmutableList;
import com.google.common.hash.Hashing;
/**
@@ -32,11 +31,11 @@
*/
public abstract class DatabasePartitioner implements Partitioner<String> {
// Database partitions sorted by their partition name.
- protected final Database[] sortedPartitions;
+ protected final List<Database> partitions;
- public DatabasePartitioner(Map<String, Database> partitionMap) {
- checkState(partitionMap != null && !partitionMap.isEmpty(), "Partition map cannot be null or empty");
- sortedPartitions = ImmutableSortedMap.<String, Database>copyOf(partitionMap).values().toArray(new Database[]{});
+ public DatabasePartitioner(List<Database> partitions) {
+ checkState(partitions != null && !partitions.isEmpty(), "Partitions cannot be null or empty");
+ this.partitions = ImmutableList.copyOf(partitions);
}
protected int hash(String key) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
index d9876fd..c86a6ea 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultAsyncConsistentMap.java
@@ -45,7 +45,7 @@
public class DefaultAsyncConsistentMap<K, V> implements AsyncConsistentMap<K, V> {
private final String name;
- private final DatabaseProxy<String, byte[]> proxy;
+ private final Database database;
private final Serializer serializer;
private static final String ERROR_NULL_KEY = "Key cannot be null";
@@ -66,39 +66,39 @@
}
public DefaultAsyncConsistentMap(String name,
- DatabaseProxy<String, byte[]> proxy,
+ Database database,
Serializer serializer) {
this.name = checkNotNull(name, "map name cannot be null");
- this.proxy = checkNotNull(proxy, "database proxy cannot be null");
+ this.database = checkNotNull(database, "database cannot be null");
this.serializer = checkNotNull(serializer, "serializer cannot be null");
}
@Override
public CompletableFuture<Integer> size() {
- return proxy.size(name);
+ return database.size(name);
}
@Override
public CompletableFuture<Boolean> isEmpty() {
- return proxy.isEmpty(name);
+ return database.isEmpty(name);
}
@Override
public CompletableFuture<Boolean> containsKey(K key) {
checkNotNull(key, ERROR_NULL_KEY);
- return proxy.containsKey(name, keyCache.getUnchecked(key));
+ return database.containsKey(name, keyCache.getUnchecked(key));
}
@Override
public CompletableFuture<Boolean> containsValue(V value) {
checkNotNull(value, ERROR_NULL_VALUE);
- return proxy.containsValue(name, serializer.encode(value));
+ return database.containsValue(name, serializer.encode(value));
}
@Override
public CompletableFuture<Versioned<V>> get(K key) {
checkNotNull(key, ERROR_NULL_KEY);
- return proxy.get(name, keyCache.getUnchecked(key))
+ return database.get(name, keyCache.getUnchecked(key))
.thenApply(v -> v != null
? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@@ -107,7 +107,7 @@
public CompletableFuture<Versioned<V>> put(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- return proxy.put(name, keyCache.getUnchecked(key), serializer.encode(value))
+ return database.put(name, keyCache.getUnchecked(key), serializer.encode(value))
.thenApply(v -> v != null
? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@@ -115,19 +115,19 @@
@Override
public CompletableFuture<Versioned<V>> remove(K key) {
checkNotNull(key, ERROR_NULL_KEY);
- return proxy.remove(name, keyCache.getUnchecked(key))
+ return database.remove(name, keyCache.getUnchecked(key))
.thenApply(v -> v != null
? new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
}
@Override
public CompletableFuture<Void> clear() {
- return proxy.clear(name);
+ return database.clear(name);
}
@Override
public CompletableFuture<Set<K>> keySet() {
- return proxy.keySet(name)
+ return database.keySet(name)
.thenApply(s -> s
.stream()
.map(this::dK)
@@ -136,7 +136,7 @@
@Override
public CompletableFuture<Collection<Versioned<V>>> values() {
- return proxy.values(name).thenApply(c -> c
+ return database.values(name).thenApply(c -> c
.stream()
.map(v -> new Versioned<V>(serializer.decode(v.value()), v.version(), v.creationTime()))
.collect(Collectors.toList()));
@@ -144,7 +144,7 @@
@Override
public CompletableFuture<Set<Entry<K, Versioned<V>>>> entrySet() {
- return proxy.entrySet(name).thenApply(s -> s
+ return database.entrySet(name).thenApply(s -> s
.stream()
.map(this::fromRawEntry)
.collect(Collectors.toSet()));
@@ -154,7 +154,7 @@
public CompletableFuture<Versioned<V>> putIfAbsent(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- return proxy.putIfAbsent(
+ return database.putIfAbsent(
name, keyCache.getUnchecked(key), serializer.encode(value)).thenApply(v ->
v != null ?
new Versioned<>(serializer.decode(v.value()), v.version(), v.creationTime()) : null);
@@ -164,13 +164,13 @@
public CompletableFuture<Boolean> remove(K key, V value) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(value, ERROR_NULL_VALUE);
- return proxy.remove(name, keyCache.getUnchecked(key), serializer.encode(value));
+ return database.remove(name, keyCache.getUnchecked(key), serializer.encode(value));
}
@Override
public CompletableFuture<Boolean> remove(K key, long version) {
checkNotNull(key, ERROR_NULL_KEY);
- return proxy.remove(name, keyCache.getUnchecked(key), version);
+ return database.remove(name, keyCache.getUnchecked(key), version);
}
@@ -179,14 +179,14 @@
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(newValue, ERROR_NULL_VALUE);
byte[] existing = oldValue != null ? serializer.encode(oldValue) : null;
- return proxy.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue));
+ return database.replace(name, keyCache.getUnchecked(key), existing, serializer.encode(newValue));
}
@Override
public CompletableFuture<Boolean> replace(K key, long oldVersion, V newValue) {
checkNotNull(key, ERROR_NULL_KEY);
checkNotNull(newValue, ERROR_NULL_VALUE);
- return proxy.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue));
+ return database.replace(name, keyCache.getUnchecked(key), oldVersion, serializer.encode(newValue));
}
private Map.Entry<K, Versioned<V>> fromRawEntry(Map.Entry<String, Versioned<byte[]>> e) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
index 123615c..046aafb 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMap.java
@@ -44,9 +44,9 @@
private final AsyncConsistentMap<K, V> asyncMap;
public DefaultConsistentMap(String name,
- DatabaseProxy<String, byte[]> proxy,
+ Database database,
Serializer serializer) {
- asyncMap = new DefaultAsyncConsistentMap<>(name, proxy, serializer);
+ asyncMap = new DefaultAsyncConsistentMap<>(name, database, serializer);
}
@Override
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
new file mode 100644
index 0000000..ba56e59
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultConsistentMapBuilder.java
@@ -0,0 +1,71 @@
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.ConsistentMapBuilder;
+import org.onosproject.store.service.Serializer;
+
+/**
+ * Default Consistent Map builder.
+ *
+ * @param <K> type for map key
+ * @param <V> type for map value
+ */
+public class DefaultConsistentMapBuilder<K, V> implements ConsistentMapBuilder<K, V> {
+
+ private Serializer serializer;
+ private String name;
+ private boolean partitionsEnabled = true;
+ private final Database partitionedDatabase;
+ private final Database inMemoryDatabase;
+
+ public DefaultConsistentMapBuilder(Database inMemoryDatabase, Database partitionedDatabase) {
+ this.inMemoryDatabase = inMemoryDatabase;
+ this.partitionedDatabase = partitionedDatabase;
+ }
+
+ @Override
+ public ConsistentMapBuilder<K, V> withName(String name) {
+ checkArgument(name != null && !name.isEmpty());
+ this.name = name;
+ return this;
+ }
+
+ @Override
+ public ConsistentMapBuilder<K, V> withSerializer(Serializer serializer) {
+ checkArgument(serializer != null);
+ this.serializer = serializer;
+ return this;
+ }
+
+ @Override
+ public ConsistentMapBuilder<K, V> withPartitionsDisabled() {
+ partitionsEnabled = false;
+ return this;
+ }
+
+ private boolean validInputs() {
+ return name != null && serializer != null;
+ }
+
+ @Override
+ public ConsistentMap<K, V> build() {
+ checkState(validInputs());
+ return new DefaultConsistentMap<>(
+ name,
+ partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
+ serializer);
+ }
+
+ @Override
+ public AsyncConsistentMap<K, V> buildAsyncMap() {
+ checkState(validInputs());
+ return new DefaultAsyncConsistentMap<>(
+ name,
+ partitionsEnabled ? partitionedDatabase : inMemoryDatabase,
+ serializer);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 88832e3..9ffd1e8 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -36,130 +36,143 @@
* Default database.
*/
public class DefaultDatabase extends AbstractResource<Database> implements Database {
- private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
- private DatabaseProxy<String, byte[]> proxy;
+ private final StateMachine<DatabaseState<String, byte[]>> stateMachine;
+ private DatabaseProxy<String, byte[]> proxy;
- @SuppressWarnings("unchecked")
- public DefaultDatabase(ResourceContext context) {
- super(context);
- this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
- }
-
- /**
- * If the database is closed, returning a failed CompletableFuture. Otherwise, calls the given supplier to
- * return the completed future result.
- *
- * @param supplier The supplier to call if the database is open.
- * @param <T> The future result type.
- * @return A completable future that if this database is closed is immediately failed.
- */
- protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
- if (proxy == null) {
- return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
+ @SuppressWarnings("unchecked")
+ public DefaultDatabase(ResourceContext context) {
+ super(context);
+ this.stateMachine = new DefaultStateMachine(context, DatabaseState.class, DefaultDatabaseState.class);
}
- return supplier.get();
- }
- @Override
- public CompletableFuture<Integer> size(String tableName) {
- return checkOpen(() -> proxy.size(tableName));
- }
+ /**
+ * Returns a failed CompletableFuture if the database is closed. Otherwise, calls the given supplier and
+ * returns the future it supplies.
+ *
+ * @param supplier The supplier to call if the database is open.
+ * @param <T> The future result type.
+ * @return The supplier's future, or an immediately failed future if this database is closed.
+ */
+ protected <T> CompletableFuture<T> checkOpen(Supplier<CompletableFuture<T>> supplier) {
+ if (proxy == null) {
+ return Futures.exceptionalFuture(new IllegalStateException("Database closed"));
+ }
+ return supplier.get();
+ }
- @Override
- public CompletableFuture<Boolean> isEmpty(String tableName) {
- return checkOpen(() -> proxy.isEmpty(tableName));
- }
+ @Override
+ public CompletableFuture<Integer> size(String tableName) {
+ return checkOpen(() -> proxy.size(tableName));
+ }
- @Override
- public CompletableFuture<Boolean> containsKey(String tableName, String key) {
- return checkOpen(() -> proxy.containsKey(tableName, key));
- }
+ @Override
+ public CompletableFuture<Boolean> isEmpty(String tableName) {
+ return checkOpen(() -> proxy.isEmpty(tableName));
+ }
- @Override
- public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
- return checkOpen(() -> proxy.containsValue(tableName, value));
- }
+ @Override
+ public CompletableFuture<Boolean> containsKey(String tableName, String key) {
+ return checkOpen(() -> proxy.containsKey(tableName, key));
+ }
- @Override
- public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
- return checkOpen(() -> proxy.get(tableName, key));
- }
+ @Override
+ public CompletableFuture<Boolean> containsValue(String tableName, byte[] value) {
+ return checkOpen(() -> proxy.containsValue(tableName, value));
+ }
- @Override
- public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
- return checkOpen(() -> proxy.put(tableName, key, value));
- }
+ @Override
+ public CompletableFuture<Versioned<byte[]>> get(String tableName, String key) {
+ return checkOpen(() -> proxy.get(tableName, key));
+ }
- @Override
- public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
- return checkOpen(() -> proxy.remove(tableName, key));
- }
+ @Override
+ public CompletableFuture<Versioned<byte[]>> put(String tableName, String key, byte[] value) {
+ return checkOpen(() -> proxy.put(tableName, key, value));
+ }
- @Override
- public CompletableFuture<Void> clear(String tableName) {
- return checkOpen(() -> proxy.clear(tableName));
- }
+ @Override
+ public CompletableFuture<Versioned<byte[]>> remove(String tableName, String key) {
+ return checkOpen(() -> proxy.remove(tableName, key));
+ }
- @Override
- public CompletableFuture<Set<String>> keySet(String tableName) {
- return checkOpen(() -> proxy.keySet(tableName));
- }
+ @Override
+ public CompletableFuture<Void> clear(String tableName) {
+ return checkOpen(() -> proxy.clear(tableName));
+ }
- @Override
- public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
- return checkOpen(() -> proxy.values(tableName));
- }
+ @Override
+ public CompletableFuture<Set<String>> keySet(String tableName) {
+ return checkOpen(() -> proxy.keySet(tableName));
+ }
- @Override
- public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
- return checkOpen(() -> proxy.entrySet(tableName));
- }
+ @Override
+ public CompletableFuture<Collection<Versioned<byte[]>>> values(String tableName) {
+ return checkOpen(() -> proxy.values(tableName));
+ }
- @Override
- public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
- return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
- }
+ @Override
+ public CompletableFuture<Set<Map.Entry<String, Versioned<byte[]>>>> entrySet(String tableName) {
+ return checkOpen(() -> proxy.entrySet(tableName));
+ }
- @Override
- public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
- return checkOpen(() -> proxy.remove(tableName, key, value));
- }
+ @Override
+ public CompletableFuture<Versioned<byte[]>> putIfAbsent(String tableName, String key, byte[] value) {
+ return checkOpen(() -> proxy.putIfAbsent(tableName, key, value));
+ }
- @Override
- public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
- return checkOpen(() -> proxy.remove(tableName, key, version));
- }
+ @Override
+ public CompletableFuture<Boolean> remove(String tableName, String key, byte[] value) {
+ return checkOpen(() -> proxy.remove(tableName, key, value));
+ }
- @Override
- public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
- return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
- }
+ @Override
+ public CompletableFuture<Boolean> remove(String tableName, String key, long version) {
+ return checkOpen(() -> proxy.remove(tableName, key, version));
+ }
- @Override
- public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
- return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
- }
+ @Override
+ public CompletableFuture<Boolean> replace(String tableName, String key, byte[] oldValue, byte[] newValue) {
+ return checkOpen(() -> proxy.replace(tableName, key, oldValue, newValue));
+ }
- @Override
- public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
- return checkOpen(() -> proxy.atomicBatchUpdate(updates));
- }
+ @Override
+ public CompletableFuture<Boolean> replace(String tableName, String key, long oldVersion, byte[] newValue) {
+ return checkOpen(() -> proxy.replace(tableName, key, oldVersion, newValue));
+ }
- @Override
- @SuppressWarnings("unchecked")
- public synchronized CompletableFuture<Database> open() {
- return runStartupTasks()
- .thenCompose(v -> stateMachine.open())
- .thenRun(() -> {
- this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
- })
- .thenApply(v -> null);
- }
+ @Override
+ public CompletableFuture<Boolean> atomicBatchUpdate(List<UpdateOperation<String, byte[]>> updates) {
+ return checkOpen(() -> proxy.atomicBatchUpdate(updates));
+ }
- @Override
- public synchronized CompletableFuture<Void> close() {
- proxy = null;
- return stateMachine.close()
- .thenCompose(v -> runShutdownTasks());
- }
-}
+ @Override
+ @SuppressWarnings("unchecked")
+ public synchronized CompletableFuture<Database> open() {
+ return runStartupTasks()
+ .thenCompose(v -> stateMachine.open())
+ .thenRun(() -> {
+ this.proxy = stateMachine.createProxy(DatabaseProxy.class, this.getClass().getClassLoader());
+ })
+ .thenApply(v -> null);
+ }
+
+ @Override
+ public synchronized CompletableFuture<Void> close() {
+ proxy = null;
+ return stateMachine.close()
+ .thenCompose(v -> runShutdownTasks());
+ }
+
+ @Override
+ public int hashCode() {
+ return name().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other instanceof Database) {
+ return name().equals(((Database) other).name());
+ }
+ return false;
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
index 9f8c5bd..d21543a 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultTransactionContext.java
@@ -42,12 +42,12 @@
private final Map<String, DefaultTransactionalMap> txMaps = Maps.newHashMap();
private boolean isOpen = false;
- DatabaseProxy<String, byte[]> databaseProxy;
+ private final Database database;
private static final String TX_NOT_OPEN_ERROR = "Transaction is not open";
private static final int TRANSACTION_TIMEOUT_MILLIS = 2000;
- DefaultTransactionContext(DatabaseProxy<String, byte[]> proxy) {
- this.databaseProxy = proxy;
+ /**
+ * Creates a transaction context on top of the given database.
+ *
+ * @param database database used to build the backing maps and to apply
+ * the batched updates; must not be null
+ */
+ DefaultTransactionContext(Database database) {
+ this.database = checkNotNull(database, "Database must not be null");
}
@Override
@@ -63,7 +63,7 @@
checkNotNull(serializer, "serializer is null");
checkState(isOpen, TX_NOT_OPEN_ERROR);
if (!txMaps.containsKey(mapName)) {
- ConsistentMap<K, V> backingMap = new DefaultConsistentMap<>(mapName, databaseProxy, serializer);
+ ConsistentMap<K, V> backingMap = new DefaultConsistentMap<>(mapName, database, serializer);
DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<>(mapName, backingMap, this, serializer);
txMaps.put(mapName, txMap);
}
@@ -83,7 +83,7 @@
allUpdates.addAll(m.prepareDatabaseUpdates());
});
- if (!complete(databaseProxy.atomicBatchUpdate(allUpdates))) {
+ if (!complete(database.atomicBatchUpdate(allUpdates))) {
throw new TransactionException.OptimisticConcurrencyFailure();
}
} finally {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
index 5e5fa06..571e309 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DistributedLeadershipManager.java
@@ -101,20 +101,21 @@
@Activate
public void activate() {
- lockMap = storageService.createConsistentMap("onos-leader-locks", new Serializer() {
- KryoNamespace kryo = new KryoNamespace.Builder()
- .register(KryoNamespaces.API).build();
+ lockMap = storageService.<String, NodeId>consistentMapBuilder()
+ .withName("onos-leader-locks")
+ .withSerializer(new Serializer() {
+ KryoNamespace kryo = new KryoNamespace.Builder().register(KryoNamespaces.API).build();
+ @Override
+ public <T> byte[] encode(T object) {
+ return kryo.serialize(object);
+ }
- @Override
- public <T> byte[] encode(T object) {
- return kryo.serialize(object);
- }
-
- @Override
- public <T> T decode(byte[] bytes) {
- return kryo.deserialize(bytes);
- }
- });
+ @Override
+ public <T> T decode(byte[] bytes) {
+ return kryo.deserialize(bytes);
+ }
+ })
+ .withPartitionsDisabled().build();
localNodeId = clusterService.getLocalNode().id();
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 24ec92b..cb4ddfc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -25,58 +25,63 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.onosproject.store.service.UpdateOperation;
import org.onosproject.store.service.Versioned;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
-
+import net.kuujo.copycat.Task;
+import net.kuujo.copycat.cluster.Cluster;
import static com.google.common.base.Preconditions.checkState;
/**
* A database that partitions the keys across one or more database partitions.
*/
-public class PartitionedDatabase implements DatabaseProxy<String, byte[]>, PartitionedDatabaseManager {
+public class PartitionedDatabase implements Database {
- private Partitioner<String> partitioner;
- private final ClusterCoordinator coordinator;
- private final Map<String, Database> partitions = Maps.newConcurrentMap();
+ private final String name;
+ private final Partitioner<String> partitioner;
+ private final List<Database> partitions;
private final AtomicBoolean isOpen = new AtomicBoolean(false);
- private static final String DB_NOT_OPEN = "Database is not open";
+ private static final String DB_NOT_OPEN = "Partitioned Database is not open";
- protected PartitionedDatabase(ClusterCoordinator coordinator) {
- this.coordinator = coordinator;
+ /**
+ * Creates a partitioned database over the given partitions.
+ *
+ * @param name database name
+ * @param partitions databases backing the individual partitions
+ */
+ public PartitionedDatabase(
+ String name,
+ Collection<Database> partitions) {
+ this.name = name;
+ // Keep the partitions ordered by name; presumably this makes the
+ // key-to-partition mapping independent of the input order (TODO confirm).
+ List<Database> orderedPartitions = partitions
+ .stream()
+ .sorted((left, right) -> left.name().compareTo(right.name()))
+ .collect(Collectors.toList());
+ this.partitions = orderedPartitions;
+ this.partitioner = new SimpleKeyHashPartitioner(orderedPartitions);
+ }
+
+ /**
+ * Returns the databases for individual partitions.
+ * The list is the one built by the constructor (sorted by partition name)
+ * and is returned as is, without copying.
+ *
+ * @return list of database partitions
+ */
+ public List<Database> getPartitions() {
+ return partitions;
}
/**
* Returns true if the database is open.
* @return true if open, false otherwise
*/
+ @Override
public boolean isOpen() {
return isOpen.get();
}
@Override
- public void registerPartition(String name, Database partition) {
- partitions.put(name, partition);
- }
-
- @Override
- public Map<String, Database> getRegisteredPartitions() {
- return ImmutableMap.copyOf(partitions);
- }
-
- @Override
public CompletableFuture<Integer> size(String tableName) {
checkState(isOpen.get(), DB_NOT_OPEN);
AtomicInteger totalSize = new AtomicInteger(0);
return CompletableFuture.allOf(partitions
- .values()
.stream()
.map(p -> p.size(tableName).thenApply(totalSize::addAndGet))
.toArray(CompletableFuture[]::new))
@@ -100,7 +105,6 @@
checkState(isOpen.get(), DB_NOT_OPEN);
AtomicBoolean containsValue = new AtomicBoolean(false);
return CompletableFuture.allOf(partitions
- .values()
.stream()
.map(p -> p.containsValue(tableName, value).thenApply(v -> containsValue.compareAndSet(false, v)))
.toArray(CompletableFuture[]::new))
@@ -129,7 +133,6 @@
public CompletableFuture<Void> clear(String tableName) {
checkState(isOpen.get(), DB_NOT_OPEN);
return CompletableFuture.allOf(partitions
- .values()
.stream()
.map(p -> p.clear(tableName))
.toArray(CompletableFuture[]::new));
@@ -140,7 +143,6 @@
checkState(isOpen.get(), DB_NOT_OPEN);
Set<String> keySet = Sets.newConcurrentHashSet();
return CompletableFuture.allOf(partitions
- .values()
.stream()
.map(p -> p.keySet(tableName).thenApply(keySet::addAll))
.toArray(CompletableFuture[]::new))
@@ -152,7 +154,6 @@
checkState(isOpen.get(), DB_NOT_OPEN);
List<Versioned<byte[]>> values = new CopyOnWriteArrayList<>();
return CompletableFuture.allOf(partitions
- .values()
.stream()
.map(p -> p.values(tableName).thenApply(values::addAll))
.toArray(CompletableFuture[]::new))
@@ -164,7 +165,6 @@
checkState(isOpen.get(), DB_NOT_OPEN);
Set<Entry<String, Versioned<byte[]>>> entrySet = Sets.newConcurrentHashSet();
return CompletableFuture.allOf(partitions
- .values()
.stream()
.map(p -> p.entrySet(tableName).thenApply(entrySet::addAll))
.toArray(CompletableFuture[]::new))
@@ -225,32 +225,47 @@
}
@Override
- public void setPartitioner(Partitioner<String> partitioner) {
- this.partitioner = partitioner;
- }
-
- @Override
- public CompletableFuture<PartitionedDatabase> open() {
- return coordinator.open().thenCompose(c -> CompletableFuture.allOf(partitions
- .values()
- .stream()
- .map(Database::open)
- .toArray(CompletableFuture[]::new))
- .thenApply(v -> {
- isOpen.set(true);
- return this; }));
-
+ public CompletableFuture<Database> open() {
+ // Open every partition and wait for all of them before flagging this database as open.
+ CompletableFuture<?>[] opening = partitions
+ .stream()
+ .map(Database::open)
+ .toArray(CompletableFuture[]::new);
+ return CompletableFuture.allOf(opening)
+ .thenApply(allOpened -> {
+ isOpen.set(true);
+ return this;
+ });
}
+ /**
+ * Closes every partition and then marks this database as closed.
+ *
+ * @return future completed once all partitions have closed
+ * @throws IllegalStateException if the database is not open
+ */
@Override
public CompletableFuture<Void> close() {
checkState(isOpen.get(), DB_NOT_OPEN);
- CompletableFuture<Void> closePartitions = CompletableFuture.allOf(partitions
- .values()
+ return CompletableFuture.allOf(partitions
.stream()
.map(database -> database.close())
- .toArray(CompletableFuture[]::new));
- CompletableFuture<Void> closeCoordinator = coordinator.close();
- return closePartitions.thenCompose(v -> closeCoordinator);
+ .toArray(CompletableFuture[]::new))
+ // Reset the flag; otherwise isOpen() keeps returning true and isClosed() false after a close.
+ .thenRun(() -> isOpen.set(false));
}
-}
+
+ /**
+ * Returns the negation of the open flag, so this is also true before
+ * {@code open()} has completed for the first time.
+ *
+ * @return true if the open flag is not set
+ */
+ @Override
+ public boolean isClosed() {
+ return !isOpen.get();
+ }
+
+ /**
+ * Returns the name given to this partitioned database at construction.
+ *
+ * @return database name
+ */
+ @Override
+ public String name() {
+ return name;
+ }
+
+ /**
+ * Not supported by the partitioned database.
+ *
+ * @return never returns normally
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public Cluster cluster() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Not supported by the partitioned database.
+ *
+ * @param task startup task (ignored)
+ * @return never returns normally
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public Database addStartupTask(Task<CompletableFuture<Void>> task) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Not supported by the partitioned database.
+ *
+ * @param task shutdown task (ignored)
+ * @return never returns normally
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public Database addShutdownTask(Task<CompletableFuture<Void>> task) {
+ throw new UnsupportedOperationException();
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
deleted file mode 100644
index 3dad8e0..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseConfig.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Partitioned database configuration.
- */
-public class PartitionedDatabaseConfig {
- private final Map<String, DatabaseConfig> partitions = new HashMap<>();
-
- /**
- * Returns the configuration for all partitions.
- * @return partition map to configuartion mapping.
- */
- public Map<String, DatabaseConfig> partitions() {
- return Collections.unmodifiableMap(partitions);
- }
-
- /**
- * Adds the specified partition name and configuration.
- * @param name partition name.
- * @param config partition config
- * @return this instance
- */
- public PartitionedDatabaseConfig addPartition(String name, DatabaseConfig config) {
- partitions.put(name, config);
- return this;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
deleted file mode 100644
index 3ebce09..0000000
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabaseManager.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Copyright 2015 Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.consistent.impl;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-
-import net.kuujo.copycat.CopycatConfig;
-import net.kuujo.copycat.cluster.ClusterConfig;
-import net.kuujo.copycat.cluster.internal.coordinator.ClusterCoordinator;
-import net.kuujo.copycat.cluster.internal.coordinator.DefaultClusterCoordinator;
-import net.kuujo.copycat.util.concurrent.NamedThreadFactory;
-
-/**
- * Manages a PartitionedDatabase.
- */
-public interface PartitionedDatabaseManager {
- /**
- * Opens the database.
- *
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<PartitionedDatabase> open();
-
- /**
- * Closes the database.
- *
- * @return A completable future to be completed with the result once complete.
- */
- CompletableFuture<Void> close();
-
- /**
- * Sets the partitioner to use for mapping keys to partitions.
- *
- * @param partitioner partitioner
- */
- void setPartitioner(Partitioner<String> partitioner);
-
- /**
- * Registers a new partition.
- *
- * @param partitionName partition name.
- * @param partition partition.
- */
- void registerPartition(String partitionName, Database partition);
-
- /**
- * Returns all the registered database partitions.
- *
- * @return mapping of all registered database partitions.
- */
- Map<String, Database> getRegisteredPartitions();
-
-
- /**
- * Creates a new partitioned database.
- *
- * @param name The database name.
- * @param clusterConfig The cluster configuration.
- * @param partitionedDatabaseConfig The database configuration.
-
- * @return The database.
- */
- public static PartitionedDatabase create(
- String name,
- ClusterConfig clusterConfig,
- PartitionedDatabaseConfig partitionedDatabaseConfig) {
- CopycatConfig copycatConfig = new CopycatConfig()
- .withName(name)
- .withClusterConfig(clusterConfig)
- .withDefaultSerializer(new DatabaseSerializer())
- .withDefaultExecutor(Executors.newSingleThreadExecutor(new NamedThreadFactory("copycat-coordinator-%d")));
- ClusterCoordinator coordinator = new DefaultClusterCoordinator(copycatConfig.resolve());
- PartitionedDatabase partitionedDatabase = new PartitionedDatabase(coordinator);
- partitionedDatabaseConfig.partitions().forEach((partitionName, partitionConfig) ->
- partitionedDatabase.registerPartition(partitionName ,
- coordinator.getResource(partitionName, partitionConfig.resolve(clusterConfig)
- .withSerializer(copycatConfig.getDefaultSerializer())
- .withDefaultExecutor(copycatConfig.getDefaultExecutor()))));
- partitionedDatabase.setPartitioner(
- new SimpleKeyHashPartitioner(partitionedDatabase.getRegisteredPartitions()));
- return partitionedDatabase;
- }
-}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
index 2162f9a..4475bc9 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleKeyHashPartitioner.java
@@ -16,7 +16,7 @@
package org.onosproject.store.consistent.impl;
-import java.util.Map;
+import java.util.List;
/**
* A simple Partitioner for mapping keys to database partitions.
@@ -27,12 +27,12 @@
*/
public class SimpleKeyHashPartitioner extends DatabasePartitioner {
- public SimpleKeyHashPartitioner(Map<String, Database> partitionMap) {
- super(partitionMap);
+ /**
+ * Creates a key-hash partitioner over the given partitions.
+ *
+ * @param partitions database partitions, passed on to the superclass
+ */
+ public SimpleKeyHashPartitioner(List<Database> partitions) {
+ super(partitions);
}
+ /**
+ * Selects the partition for an entry from the hash of its key.
+ *
+ * @param tableName table name (not used for the selection)
+ * @param key key whose hash selects the partition
+ * @return partition at index {@code hash(key)} modulo the partition count
+ */
@Override
public Database getPartition(String tableName, String key) {
- return sortedPartitions[hash(key) % sortedPartitions.length];
+ // floorMod keeps the index within [0, size) even if hash() were to return a
+ // negative value, where the % operator would yield a negative index.
+ return partitions.get(Math.floorMod(hash(key), partitions.size()));
}
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
index 1adb921..adc5477 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/SimpleTableHashPartitioner.java
@@ -16,7 +16,7 @@
package org.onosproject.store.consistent.impl;
-import java.util.Map;
+import java.util.List;
/**
* A simple Partitioner that uses the table name hash to
@@ -28,12 +28,12 @@
*/
public class SimpleTableHashPartitioner extends DatabasePartitioner {
- public SimpleTableHashPartitioner(Map<String, Database> partitionMap) {
- super(partitionMap);
+ /**
+ * Creates a table-hash partitioner over the given partitions.
+ *
+ * @param partitions database partitions, passed on to the superclass
+ */
+ public SimpleTableHashPartitioner(List<Database> partitions) {
+ super(partitions);
}
+ /**
+ * Selects the partition for an entry from the hash of its table name.
+ *
+ * @param tableName table name whose hash selects the partition
+ * @param key key (not used for the selection)
+ * @return partition at index {@code hash(tableName)} modulo the partition count
+ */
@Override
public Database getPartition(String tableName, String key) {
- return sortedPartitions[hash(tableName) % sortedPartitions.length];
+ // floorMod keeps the index within [0, size) even if hash() were to return a
+ // negative value, where the % operator would yield a negative index.
+ return partitions.get(Math.floorMod(hash(tableName), partitions.size()));
}
}
\ No newline at end of file