Support for a distributed queue primitive.
Change-Id: I13abb93ec1703105ff0137e137738483a5b6a143
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
index a222627..c7c98c5 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseManager.java
@@ -16,6 +16,7 @@
package org.onosproject.store.consistent.impl;
+import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -46,6 +47,7 @@
import static org.onlab.util.Tools.groupedThreads;
import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.IdGenerator;
import org.onosproject.store.cluster.impl.ClusterDefinitionManager;
import org.onosproject.store.cluster.impl.NodeInfo;
@@ -55,6 +57,7 @@
import org.onosproject.store.service.AtomicCounterBuilder;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.ConsistentMapException;
+import org.onosproject.store.service.DistributedQueueBuilder;
import org.onosproject.store.service.EventuallyConsistentMapBuilder;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapInfo;
@@ -98,16 +101,21 @@
private static final int RAFT_ELECTION_TIMEOUT_MILLIS = 3000;
private static final int DATABASE_OPERATION_TIMEOUT_MILLIS = 5000;
+ protected static final MessageSubject QUEUE_UPDATED_TOPIC = new MessageSubject("distributed-queue-updated");
+
private ClusterCoordinator coordinator;
protected PartitionedDatabase partitionedDatabase;
protected Database inMemoryDatabase;
+ protected NodeId localNodeId;
private TransactionManager transactionManager;
private final IdGenerator transactionIdGenerator = () -> RandomUtils.nextLong();
private ExecutorService eventDispatcher;
+ private ExecutorService queuePollExecutor;
- private final Set<DefaultAsyncConsistentMap> maps = Sets.newCopyOnWriteArraySet();
+ private final Map<String, DefaultAsyncConsistentMap> maps = Maps.newConcurrentMap();
+ private final Map<String, DefaultDistributedQueue> queues = Maps.newConcurrentMap();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClusterService clusterService;
@@ -121,6 +129,7 @@
@Activate
public void activate() {
+ localNodeId = clusterService.getLocalNode().id();
// load database configuration
File databaseDefFile = new File(PARTITION_DEFINITION_FILE);
log.info("Loading database definition: {}", databaseDefFile.getAbsolutePath());
@@ -201,6 +210,19 @@
eventDispatcher = Executors.newSingleThreadExecutor(
groupedThreads("onos/store/manager", "map-event-dispatcher"));
+
+ queuePollExecutor = Executors.newFixedThreadPool(4,
+ groupedThreads("onos/store/manager", "queue-poll-handler"));
+
+ clusterCommunicator.<String>addSubscriber(QUEUE_UPDATED_TOPIC,
+ data -> new String(data, Charsets.UTF_8),
+ name -> {
+ DefaultDistributedQueue q = queues.get(name);
+ if (q != null) {
+ q.tryPoll();
+ }
+ },
+ queuePollExecutor);
log.info("Started");
}
@@ -226,8 +248,10 @@
log.info("Successfully closed databases.");
}
});
- maps.forEach(map -> clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name())));
+ clusterCommunicator.removeSubscriber(QUEUE_UPDATED_TOPIC);
+ maps.values().forEach(this::unregisterMap);
eventDispatcher.shutdown();
+ queuePollExecutor.shutdown();
log.info("Stopped");
}
@@ -318,6 +342,18 @@
return new DefaultDistributedSetBuilder<>(this);
}
+
+    /**
+     * Creates a builder for distributed queues that is bound to this manager.
+     *
+     * @param <E> queue entry type
+     * @return a new queue builder
+     */
+    @Override
+    public <E> DistributedQueueBuilder<E> queueBuilder() {
+        return new DefaultDistributedQueueBuilder<>(this);
+    }
+
@Override
public AtomicCounterBuilder atomicCounterBuilder() {
return new DefaultAtomicCounterBuilder(inMemoryDatabase, partitionedDatabase);
@@ -386,8 +416,8 @@
}
protected <K, V> void registerMap(DefaultAsyncConsistentMap<K, V> map) {
- // TODO: Support different local instances of the same map.
- if (!maps.add(map)) {
+ // TODO: Support multiple local instances of the same map.
+ if (maps.putIfAbsent(map.name(), map) != null) {
throw new IllegalStateException("Map by name " + map.name() + " already exists");
}
@@ -397,6 +427,35 @@
eventDispatcher);
}
+    /**
+     * Unregisters a map and removes the subscriber for its update notifications.
+     *
+     * @param map map to unregister
+     * @param <K> key type
+     * @param <V> value type
+     */
+    protected <K, V> void unregisterMap(DefaultAsyncConsistentMap<K, V> map) {
+        // Conditional remove: an instance that was never registered (e.g. rejected as a
+        // duplicate name) must not tear down the subscriber of the map that is registered.
+        if (maps.remove(map.name(), map)) {
+            clusterCommunicator.removeSubscriber(mapUpdatesSubject(map.name()));
+        }
+    }
+
+    /**
+     * Registers a queue under its name so that it can be looked up by that name.
+     *
+     * @param queue queue to register
+     * @param <E> queue entry type
+     * @throws IllegalStateException if a queue with the same name is already registered
+     */
+    protected <E> void registerQueue(DefaultDistributedQueue<E> queue) {
+        // TODO: Support multiple local instances of the same queue.
+        if (queues.putIfAbsent(queue.name(), queue) != null) {
+            throw new IllegalStateException("Queue by name " + queue.name() + " already exists");
+        }
+    }
+
protected static MessageSubject mapUpdatesSubject(String mapName) {
return new MessageSubject(mapName + "-map-updates");
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
index 815b142..518b526 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseProxy.java
@@ -21,6 +21,7 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
@@ -249,6 +250,36 @@
CompletableFuture<Long> counterGet(String counterName);
/**
+ * Returns the number of entries in the queue.
+ * @param queueName queue name
+ * @return queue size
+ */
+ CompletableFuture<Long> queueSize(String queueName);
+
+ /**
+ * Inserts an entry into the queue.
+ * @param queueName queue name
+ * @param entry queue entry
+ * @return set of nodes to notify about the queue update
+ */
+ CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry);
+
+ /**
+ * Removes an entry from the queue if the queue is non-empty.
+ * @param queueName queue name
+ * @param nodeId node to notify once an entry becomes available; recorded only if the queue is empty
+ * @return entry. Can be null if queue is empty
+ */
+ CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId);
+
+ /**
+ * Returns but does not remove an entry from the queue.
+ * @param queueName queue name
+ * @return entry. Can be null if queue is empty
+ */
+ CompletableFuture<byte[]> queuePeek(String queueName);
+
+ /**
* Prepare and commit the specified transaction.
*
* @param transaction transaction to commit (after preparation)
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
index 9b6a322..db28826 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseSerializer.java
@@ -21,6 +21,7 @@
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.serializers.KryoSerializer;
import org.onosproject.store.service.DatabaseUpdate;
@@ -78,6 +79,7 @@
.register(Result.Status.class)
.register(DefaultTransaction.class)
.register(Transaction.State.class)
+ .register(NodeId.class)
.build();
private static final KryoSerializer SERIALIZER = new KryoSerializer() {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
index 73eacdd..0af7f42 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DatabaseState.java
@@ -21,6 +21,7 @@
import java.util.Map.Entry;
import java.util.Set;
+import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
@@ -113,6 +114,18 @@
Long counterGetAndAdd(String counterName, long delta);
@Query
+ Long queueSize(String queueName);
+
+ @Query
+ byte[] queuePeek(String queueName);
+
+ @Command
+ byte[] queuePop(String queueName, NodeId requestor);
+
+ @Command
+ Set<NodeId> queuePush(String queueName, byte[] entry);
+
+ @Query
Long counterGet(String counterName);
@Command
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
index 52a1b20..5d0ca83 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabase.java
@@ -28,6 +28,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
+import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
@@ -187,6 +188,34 @@
}
@Override
+    public CompletableFuture<Long> queueSize(String queueName) {
+        // Hand the proxy call to checkOpen instead of invoking the proxy directly.
+        Supplier<CompletableFuture<Long>> request = () -> proxy.queueSize(queueName);
+        return checkOpen(request);
+    }
+
+    @Override
+    public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
+        // The proxy's result is the set of nodes to notify about the queue update.
+        Supplier<CompletableFuture<Set<NodeId>>> request = () -> proxy.queuePush(queueName, entry);
+        return checkOpen(request);
+    }
+
+    @Override
+    public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
+        // nodeId is forwarded to the proxy unchanged.
+        Supplier<CompletableFuture<byte[]>> request = () -> proxy.queuePop(queueName, nodeId);
+        return checkOpen(request);
+    }
+
+    @Override
+    public CompletableFuture<byte[]> queuePeek(String queueName) {
+        // Same delegation as queuePop, but through the proxy's non-removing peek.
+        Supplier<CompletableFuture<byte[]>> request = () -> proxy.queuePeek(queueName);
+        return checkOpen(request);
+    }
+
+    @Override
public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
return checkOpen(() -> proxy.prepareAndCommit(transaction));
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
index 7edeb44..f1bba25 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDatabaseState.java
@@ -19,13 +19,16 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
@@ -46,6 +49,8 @@
private Long nextVersion;
private Map<String, AtomicLong> counters;
private Map<String, Map<String, Versioned<byte[]>>> tables;
+ private Map<String, Queue<byte[]>> queues;
+ private Map<String, Set<NodeId>> queueUpdateNotificationTargets;
/**
* This locks map has a structure similar to the "tables" map above and
@@ -77,6 +82,16 @@
locks = Maps.newConcurrentMap();
context.put("locks", locks);
}
+ queues = context.get("queues");
+ if (queues == null) {
+ queues = Maps.newConcurrentMap();
+ context.put("queues", queues);
+ }
+ queueUpdateNotificationTargets = context.get("queueUpdateNotificationTargets");
+ if (queueUpdateNotificationTargets == null) {
+ queueUpdateNotificationTargets = Maps.newConcurrentMap();
+ context.put("queueUpdateNotificationTargets", queueUpdateNotificationTargets);
+ }
nextVersion = context.get("nextVersion");
if (nextVersion == null) {
nextVersion = new Long(0);
@@ -288,6 +303,41 @@
}
@Override
+    public Long queueSize(String queueName) {
+        // getQueue creates the queue on first access, so an unknown name reports size 0.
+        return Long.valueOf(getQueue(queueName).size());
+    }
+
+    @Override
+    public byte[] queuePeek(String queueName) {
+        // peek() leaves the head entry in place and yields null when the queue is empty.
+        return getQueue(queueName).peek();
+    }
+
+    @Override
+    public byte[] queuePop(String queueName, NodeId requestor) {
+        Queue<byte[]> queue = getQueue(queueName);
+        if (queue.isEmpty() && requestor != null) {
+            // Nothing to hand out: remember the requestor so that the next queuePush
+            // returns it among the nodes to notify.
+            getQueueUpdateNotificationTargets(queueName).add(requestor);
+        }
+        // poll() instead of remove(): an empty queue with a null requestor must yield
+        // null rather than throw NoSuchElementException.
+        return queue.poll();
+    }
+
+    @Override
+    public Set<NodeId> queuePush(String queueName, byte[] entry) {
+        getQueue(queueName).add(entry);
+        Set<NodeId> waiting = getQueueUpdateNotificationTargets(queueName);
+        // Snapshot the waiting nodes before clearing, so each registration is reported once.
+        Set<NodeId> notifyList = ImmutableSet.copyOf(waiting);
+        waiting.clear();
+        return notifyList;
+    }
+
+    @Override
public boolean prepareAndCommit(Transaction transaction) {
if (prepare(transaction)) {
return commit(transaction);
@@ -335,6 +380,26 @@
return counters.computeIfAbsent(counterName, name -> new AtomicLong(0));
}
+    /**
+     * Looks up the queue with the given name, creating an empty one on first access.
+     *
+     * @param queueName queue name
+     * @return entries of that queue
+     */
+    private Queue<byte[]> getQueue(String queueName) {
+        return queues.computeIfAbsent(queueName, unused -> new LinkedList<>());
+    }
+
+    /**
+     * Looks up the nodes to notify about updates to the given queue, creating the set on first access.
+     *
+     * @param queueName queue name
+     * @return mutable set of nodes to notify
+     */
+    private Set<NodeId> getQueueUpdateNotificationTargets(String queueName) {
+        return queueUpdateNotificationTargets.computeIfAbsent(queueName, unused -> new HashSet<>());
+    }
+
private boolean isUpdatePossible(DatabaseUpdate update) {
Versioned<byte[]> existingEntry = get(update.tableName(), update.key());
switch (update.type()) {
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
new file mode 100644
index 0000000..0bcbdc4
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueue.java
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.Serializer;
+
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+
+/**
+ * DistributedQueue implementation that provides FIFO ordering semantics.
+ * <p>
+ * A {@link #pop()} on an empty queue returns a future that stays pending until
+ * {@link #tryPoll()} manages to pop an entry on its behalf.
+ *
+ * @param <E> queue entry type
+ */
+public class DefaultDistributedQueue<E> implements DistributedQueue<E> {
+
+    private final String name;
+    private final Database database;
+    private final Serializer serializer;
+    private final NodeId localNodeId;
+    // Added to from pop() completion callbacks and drained by tryPoll(), which may run on
+    // different threads, so a concurrent set is required. CompletableFuture does not
+    // override equals/hashCode, hence membership is still by identity.
+    private final Set<CompletableFuture<E>> pendingFutures = Sets.newConcurrentHashSet();
+    private final Consumer<Set<NodeId>> notifyConsumers;
+
+    private static final String ERROR_NULL_ENTRY = "Null entries are not allowed";
+
+    /**
+     * Creates a queue backed by the given database.
+     *
+     * @param name queue name
+     * @param database database holding the queue entries
+     * @param serializer serializer for queue entries
+     * @param localNodeId node identifier passed along with every pop request
+     * @param notifyConsumers callback invoked with the set of nodes returned by a push
+     */
+    public DefaultDistributedQueue(String name,
+                                   Database database,
+                                   Serializer serializer,
+                                   NodeId localNodeId,
+                                   Consumer<Set<NodeId>> notifyConsumers) {
+        this.name = checkNotNull(name, "queue name cannot be null");
+        this.database = checkNotNull(database, "database cannot be null");
+        this.serializer = checkNotNull(serializer, "serializer cannot be null");
+        this.localNodeId = localNodeId;
+        this.notifyConsumers = notifyConsumers;
+    }
+
+    @Override
+    public long size() {
+        return Futures.getUnchecked(database.queueSize(name));
+    }
+
+    @Override
+    public void push(E entry) {
+        checkNotNull(entry, ERROR_NULL_ENTRY);
+        // Blocks until the entry is stored and the returned nodes were passed to the notifier.
+        Futures.getUnchecked(database.queuePush(name, serializer.encode(entry))
+                                     .thenAccept(notifyConsumers));
+    }
+
+    @Override
+    public CompletableFuture<E> pop() {
+        return database.queuePop(name, localNodeId)
+                       .thenCompose(v -> {
+                           if (v != null) {
+                               return CompletableFuture.completedFuture(serializer.decode(v));
+                           } else {
+                               // Queue was empty: park a future for tryPoll() to complete.
+                               CompletableFuture<E> newPendingFuture = new CompletableFuture<>();
+                               pendingFutures.add(newPendingFuture);
+                               return newPendingFuture;
+                           }
+                       });
+    }
+
+    @Override
+    public E peek() {
+        return Futures.getUnchecked(database.queuePeek(name)
+                                            .thenApply(v -> v != null ? serializer.decode(v) : null));
+    }
+
+    /**
+     * Returns the name of this queue.
+     *
+     * @return queue name
+     */
+    public String name() {
+        return name;
+    }
+
+    /**
+     * Pops entries for pending {@link #pop()} futures until the queue runs empty or no
+     * future is left waiting.
+     * <p>
+     * Synchronized so that overlapping notifications cannot pop an entry for a future
+     * that another thread is about to complete, which would drop that entry.
+     */
+    protected synchronized void tryPoll() {
+        Set<CompletableFuture<E>> completedFutures = Sets.newHashSet();
+        for (CompletableFuture<E> future : pendingFutures) {
+            if (future.isDone()) {
+                // Completed or cancelled elsewhere; popping for it would discard the entry.
+                completedFutures.add(future);
+                continue;
+            }
+            E entry = Futures.getUnchecked(database.queuePop(name, localNodeId)
+                                                   .thenApply(v -> v != null ? serializer.decode(v) : null));
+            if (entry != null) {
+                future.complete(entry);
+                completedFutures.add(future);
+            } else {
+                break;
+            }
+        }
+        pendingFutures.removeAll(completedFutures);
+    }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java
new file mode 100644
index 0000000..6095fab
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/DefaultDistributedQueueBuilder.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.consistent.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Set;
+import java.util.function.Consumer;
+
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.service.DistributedQueue;
+import org.onosproject.store.service.DistributedQueueBuilder;
+import org.onosproject.store.service.Serializer;
+
+import com.google.common.base.Charsets;
+
+/**
+ * Default implementation of a {@code DistributedQueueBuilder}.
+ *
+ * @param <E> queue entry type
+ */
+public class DefaultDistributedQueueBuilder<E> implements DistributedQueueBuilder<E> {
+
+    private Serializer serializer;
+    private String name;
+    private boolean persistenceEnabled = true;
+    private final DatabaseManager databaseManager;
+
+    /**
+     * Creates a builder whose queues are registered with the given manager.
+     *
+     * @param databaseManager manager supplying the databases, the local node id and the
+     *                        cluster communicator used by the queues this builder creates
+     */
+    public DefaultDistributedQueueBuilder(DatabaseManager databaseManager) {
+        this.databaseManager = databaseManager;
+    }
+
+    @Override
+    public DistributedQueueBuilder<E> withName(String name) {
+        checkArgument(name != null && !name.isEmpty(), "name cannot be null or empty");
+        this.name = name;
+        return this;
+    }
+
+    @Override
+    public DistributedQueueBuilder<E> withSerializer(Serializer serializer) {
+        checkArgument(serializer != null, "serializer cannot be null");
+        this.serializer = serializer;
+        return this;
+    }
+
+    @Override
+    public DistributedQueueBuilder<E> withPersistenceDisabled() {
+        persistenceEnabled = false;
+        return this;
+    }
+
+    // build() requires both a name and a serializer.
+    private boolean validInputs() {
+        return name != null && serializer != null;
+    }
+
+    @Override
+    public DistributedQueue<E> build() {
+        checkState(validInputs(), "name and serializer must be set before build()");
+        // Snapshot the name: the lambda below would otherwise read the mutable field each time
+        // it runs and pick up a later withName() call made on this builder.
+        final String queueName = name;
+        Consumer<Set<NodeId>> notifyOthers = nodes -> databaseManager.clusterCommunicator.multicast(
+                queueName,
+                DatabaseManager.QUEUE_UPDATED_TOPIC,
+                s -> s.getBytes(Charsets.UTF_8),
+                nodes);
+        DefaultDistributedQueue<E> queue = new DefaultDistributedQueue<>(
+                queueName,
+                // Persistent queues use the partitioned database, others the in-memory one.
+                persistenceEnabled ? databaseManager.partitionedDatabase : databaseManager.inMemoryDatabase,
+                serializer,
+                databaseManager.localNodeId,
+                notifyOthers);
+        databaseManager.registerQueue(queue);
+        return queue;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
index 7b279dc..083ca86 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/consistent/impl/PartitionedDatabase.java
@@ -27,6 +27,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import org.onosproject.cluster.NodeId;
import org.onosproject.store.service.DatabaseUpdate;
import org.onosproject.store.service.Transaction;
import org.onosproject.store.service.Versioned;
@@ -276,6 +277,39 @@
return partitioner.getPartition(counterName, counterName).counterGetAndAdd(counterName, delta);
}
+
+    @Override
+    public CompletableFuture<Long> queueSize(String queueName) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        // The queue name is passed for both getPartition arguments.
+        Database partition = partitioner.getPartition(queueName, queueName);
+        return partition.queueSize(queueName);
+    }
+
+    @Override
+    public CompletableFuture<Set<NodeId>> queuePush(String queueName, byte[] entry) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        // Same partition lookup as queueSize: queue name for both arguments.
+        Database partition = partitioner.getPartition(queueName, queueName);
+        return partition.queuePush(queueName, entry);
+    }
+
+    @Override
+    public CompletableFuture<byte[]> queuePop(String queueName, NodeId nodeId) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        // nodeId is handed to the selected partition unchanged.
+        Database partition = partitioner.getPartition(queueName, queueName);
+        return partition.queuePop(queueName, nodeId);
+    }
+
+    @Override
+    public CompletableFuture<byte[]> queuePeek(String queueName) {
+        checkState(isOpen.get(), DB_NOT_OPEN);
+        // Same partition lookup as queuePop, but the entry is only peeked at.
+        Database partition = partitioner.getPartition(queueName, queueName);
+        return partition.queuePeek(queueName);
+    }
+
@Override
public CompletableFuture<Boolean> prepareAndCommit(Transaction transaction) {
Map<Database, Transaction> subTransactions = createSubTransactions(transaction);