[ONOS-6342] Refactor transaction architecture to support a shared cache for transactional primitives
Change-Id: I2a17965100895f5aa4d2202028047bb980c11d26
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
index 645215c..d676ab1 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/CatalystSerializers.java
@@ -50,7 +50,7 @@
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTreeEvent;
import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Task;
import org.onosproject.store.service.Versioned;
import org.onosproject.store.service.WorkQueueStats;
@@ -97,7 +97,7 @@
serializer.register(TransactionId.class, factory);
serializer.register(MapUpdate.class, factory);
serializer.register(MapUpdate.Type.class, factory);
- serializer.register(MapTransaction.class, factory);
+ serializer.register(TransactionLog.class, factory);
serializer.register(Versioned.class, factory);
serializer.register(MapEvent.class, factory);
serializer.register(Task.class, factory);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
index fccf48e..2f3fe11 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
@@ -15,11 +15,9 @@
*/
package org.onosproject.store.primitives.impl;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.CommitStatus;
import org.onosproject.store.service.Serializer;
@@ -27,7 +25,7 @@
import org.onosproject.store.service.TransactionalMap;
import org.onosproject.utils.MeteringAgent;
-import com.google.common.collect.Sets;
+import static com.google.common.base.MoreObjects.toStringHelper;
/**
* Default implementation of transaction context.
@@ -35,17 +33,12 @@
public class DefaultTransactionContext implements TransactionContext {
private final AtomicBoolean isOpen = new AtomicBoolean(false);
- private final DistributedPrimitiveCreator creator;
private final TransactionId transactionId;
private final TransactionCoordinator transactionCoordinator;
- private final Set<TransactionParticipant> txParticipants = Sets.newConcurrentHashSet();
private final MeteringAgent monitor;
- public DefaultTransactionContext(TransactionId transactionId,
- DistributedPrimitiveCreator creator,
- TransactionCoordinator transactionCoordinator) {
+ public DefaultTransactionContext(TransactionId transactionId, TransactionCoordinator transactionCoordinator) {
this.transactionId = transactionId;
- this.creator = creator;
this.transactionCoordinator = transactionCoordinator;
this.monitor = new MeteringAgent("transactionContext", "*", true);
}
@@ -75,8 +68,7 @@
@Override
public CompletableFuture<CommitStatus> commit() {
final MeteringAgent.Context timer = monitor.startTimer("commit");
- return transactionCoordinator.commit(transactionId, txParticipants)
- .whenComplete((r, e) -> timer.stop(e));
+ return transactionCoordinator.commit().whenComplete((r, e) -> timer.stop(e));
}
@Override
@@ -85,14 +77,14 @@
}
@Override
- public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName,
- Serializer serializer) {
- // FIXME: Do not create duplicates.
- DefaultTransactionalMap<K, V> txMap = new DefaultTransactionalMap<K, V>(mapName,
- DistributedPrimitives.newMeteredMap(creator.<K, V>newAsyncConsistentMap(mapName, serializer)),
- this,
- serializer);
- txParticipants.add(txMap);
- return txMap;
+ public <K, V> TransactionalMap<K, V> getTransactionalMap(String mapName, Serializer serializer) {
+ return transactionCoordinator.getTransactionalMap(mapName, serializer);
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("transactionId", transactionId)
+ .toString();
}
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java
index c599306..12f36a0 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContextBuilder.java
@@ -15,31 +15,26 @@
*/
package org.onosproject.store.primitives.impl;
-import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionContextBuilder;
+
/**
* Default Transaction Context Builder.
*/
public class DefaultTransactionContextBuilder extends TransactionContextBuilder {
private final TransactionId transactionId;
- private final DistributedPrimitiveCreator primitiveCreator;
- private final TransactionCoordinator transactionCoordinator;
+ private final TransactionManager transactionManager;
- public DefaultTransactionContextBuilder(TransactionId transactionId,
- DistributedPrimitiveCreator primitiveCreator,
- TransactionCoordinator transactionCoordinator) {
+ public DefaultTransactionContextBuilder(TransactionId transactionId, TransactionManager transactionManager) {
this.transactionId = transactionId;
- this.primitiveCreator = primitiveCreator;
- this.transactionCoordinator = transactionCoordinator;
+ this.transactionManager = transactionManager;
}
@Override
public TransactionContext build() {
return new DefaultTransactionContext(transactionId,
- primitiveCreator,
- transactionCoordinator);
+ new TransactionCoordinator(transactionId, transactionManager));
}
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
deleted file mode 100644
index d138c9b..0000000
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMap.java
+++ /dev/null
@@ -1,294 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Laboratory
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.store.primitives.impl;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import org.apache.commons.lang3.tuple.Pair;
-import org.onlab.util.HexString;
-import org.onosproject.store.primitives.MapUpdate;
-import org.onosproject.store.service.AsyncConsistentMap;
-import org.onosproject.store.service.ConsistentMap;
-import org.onosproject.store.service.MapTransaction;
-import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.TransactionContext;
-import org.onosproject.store.service.TransactionalMap;
-import org.onosproject.store.service.Versioned;
-
-import static com.google.common.base.Preconditions.*;
-
-import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-/**
- * Default Transactional Map implementation that provides a repeatable reads
- * transaction isolation level.
- *
- * @param <K> key type
- * @param <V> value type.
- */
-public class DefaultTransactionalMap<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
-
- private final TransactionContext txContext;
- private static final String TX_CLOSED_ERROR = "Transaction is closed";
- private final AsyncConsistentMap<K, V> backingMap;
- private final ConsistentMap<K, V> backingConsistentMap;
- private final String name;
- private final Serializer serializer;
- private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
- private final Map<K, V> writeCache = Maps.newConcurrentMap();
- private final Set<K> deleteSet = Sets.newConcurrentHashSet();
-
- private static final String ERROR_NULL_VALUE = "Null values are not allowed";
- private static final String ERROR_NULL_KEY = "Null key is not allowed";
-
- private final LoadingCache<K, String> keyCache = CacheBuilder.newBuilder()
- .softValues()
- .build(new CacheLoader<K, String>() {
-
- @Override
- public String load(K key) {
- return HexString.toHexString(serializer.encode(key));
- }
- });
-
- protected K dK(String key) {
- return serializer.decode(HexString.fromHexString(key));
- }
-
- public DefaultTransactionalMap(
- String name,
- AsyncConsistentMap<K, V> backingMap,
- TransactionContext txContext,
- Serializer serializer) {
- this.name = name;
- this.backingMap = backingMap;
- this.backingConsistentMap = backingMap.asConsistentMap();
- this.txContext = txContext;
- this.serializer = serializer;
- }
-
- @Override
- public V get(K key) {
- checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- checkNotNull(key, ERROR_NULL_KEY);
- if (deleteSet.contains(key)) {
- return null;
- }
- V latest = writeCache.get(key);
- if (latest != null) {
- return latest;
- } else {
- Versioned<V> v = readCache.computeIfAbsent(key, k -> backingConsistentMap.get(k));
- return v != null ? v.value() : null;
- }
- }
-
- @Override
- public boolean containsKey(K key) {
- return get(key) != null;
- }
-
- @Override
- public V put(K key, V value) {
- checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- checkNotNull(value, ERROR_NULL_VALUE);
-
- V latest = get(key);
- writeCache.put(key, value);
- deleteSet.remove(key);
- return latest;
- }
-
- @Override
- public V remove(K key) {
- checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- V latest = get(key);
- if (latest != null) {
- writeCache.remove(key);
- deleteSet.add(key);
- }
- return latest;
- }
-
- @Override
- public boolean remove(K key, V value) {
- checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- checkNotNull(value, ERROR_NULL_VALUE);
- V latest = get(key);
- if (Objects.equal(value, latest)) {
- remove(key);
- return true;
- }
- return false;
- }
-
- @Override
- public boolean replace(K key, V oldValue, V newValue) {
- checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- checkNotNull(oldValue, ERROR_NULL_VALUE);
- checkNotNull(newValue, ERROR_NULL_VALUE);
- V latest = get(key);
- if (Objects.equal(oldValue, latest)) {
- put(key, newValue);
- return true;
- }
- return false;
- }
-
- @Override
- public V putIfAbsent(K key, V value) {
- checkState(txContext.isOpen(), TX_CLOSED_ERROR);
- checkNotNull(value, ERROR_NULL_VALUE);
- V latest = get(key);
- if (latest == null) {
- put(key, value);
- }
- return latest;
- }
-
- @Override
- public CompletableFuture<Boolean> prepare() {
- return backingMap.prepare(new MapTransaction<>(txContext.transactionId(), updates()));
- }
-
- @Override
- public CompletableFuture<Void> commit() {
- return backingMap.commit(txContext.transactionId());
- }
-
- @Override
- public CompletableFuture<Void> rollback() {
- return backingMap.rollback(txContext.transactionId());
- }
-
- @Override
- public CompletableFuture<Boolean> prepareAndCommit() {
- return backingMap.prepareAndCommit(new MapTransaction<>(txContext.transactionId(), updates()));
- }
-
- @Override
- public int totalUpdates() {
- return updates().size();
- }
-
- @Override
- public boolean hasPendingUpdates() {
- return updatesStream().findAny().isPresent();
- }
-
- protected Stream<MapUpdate<K, V>> updatesStream() {
- return Stream.concat(
- // 1st stream: delete ops
- deleteSet.stream()
- .map(key -> Pair.of(key, readCache.get(key)))
- .filter(e -> e.getValue() != null)
- .map(e -> MapUpdate.<K, V>newBuilder()
- .withMapName(name)
- .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
- .withKey(e.getKey())
- .withCurrentVersion(e.getValue().version())
- .build()),
- // 2nd stream: write ops
- writeCache.entrySet().stream()
- .map(e -> {
- Versioned<V> original = readCache.get(e.getKey());
- if (original == null) {
- return MapUpdate.<K, V>newBuilder()
- .withMapName(name)
- .withType(MapUpdate.Type.PUT_IF_ABSENT)
- .withKey(e.getKey())
- .withValue(e.getValue())
- .build();
- } else {
- return MapUpdate.<K, V>newBuilder()
- .withMapName(name)
- .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
- .withKey(e.getKey())
- .withCurrentVersion(original.version())
- .withValue(e.getValue())
- .build();
- }
- }));
- }
-
- protected List<MapUpdate<K, V>> updates() {
- return updatesStream().collect(Collectors.toList());
- }
-
- protected List<MapUpdate<String, byte[]>> toMapUpdates() {
- List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
- deleteSet.forEach(key -> {
- Versioned<V> original = readCache.get(key);
- if (original != null) {
- updates.add(MapUpdate.<String, byte[]>newBuilder()
- .withMapName(name)
- .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
- .withKey(keyCache.getUnchecked(key))
- .withCurrentVersion(original.version())
- .build());
- }
- });
- writeCache.forEach((key, value) -> {
- Versioned<V> original = readCache.get(key);
- if (original == null) {
- updates.add(MapUpdate.<String, byte[]>newBuilder()
- .withMapName(name)
- .withType(MapUpdate.Type.PUT_IF_ABSENT)
- .withKey(keyCache.getUnchecked(key))
- .withValue(serializer.encode(value))
- .build());
- } else {
- updates.add(MapUpdate.<String, byte[]>newBuilder()
- .withMapName(name)
- .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
- .withKey(keyCache.getUnchecked(key))
- .withCurrentVersion(original.version())
- .withValue(serializer.encode(value))
- .build());
- }
- });
- return updates;
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(this)
- .add("backingMap", backingMap)
- .add("updates", updates())
- .toString();
- }
-
- /**
- * Discards all changes made to this transactional map.
- */
- protected void abort() {
- readCache.clear();
- writeCache.clear();
- deleteSet.clear();
- }
-}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
new file mode 100644
index 0000000..9acc377
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionalMapParticipant.java
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Map;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.lang3.tuple.Pair;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Versioned;
+
+/**
+ * Repeatable read based map participant.
+ */
+public class DefaultTransactionalMapParticipant<K, V> extends TransactionalMapParticipant<K, V> {
+ private final Map<K, Versioned<V>> readCache = Maps.newConcurrentMap();
+
+ public DefaultTransactionalMapParticipant(
+ ConsistentMap<K, V> backingMap, Transaction<MapUpdate<K, V>> transaction) {
+ super(backingMap, transaction);
+ }
+
+ @Override
+ protected V read(K key) {
+ Versioned<V> value = readCache.computeIfAbsent(key, backingMap::get);
+ return value != null ? value.value() : null;
+ }
+
+ @Override
+ protected Stream<MapUpdate<K, V>> records() {
+ return Stream.concat(deleteStream(), writeStream());
+ }
+
+ /**
+ * Returns a transaction record stream for deleted keys.
+ */
+ private Stream<MapUpdate<K, V>> deleteStream() {
+ return deleteSet.stream()
+ .map(key -> Pair.of(key, readCache.get(key)))
+ .filter(e -> e.getValue() != null)
+ .map(e -> MapUpdate.<K, V>newBuilder()
+ .withType(MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
+ .withKey(e.getKey())
+ .withCurrentVersion(e.getValue().version())
+ .build());
+ }
+
+ /**
+ * Returns a transaction record stream for updated keys.
+ */
+ private Stream<MapUpdate<K, V>> writeStream() {
+ return writeCache.entrySet().stream().map(entry -> {
+ Versioned<V> original = readCache.get(entry.getKey());
+ if (original == null) {
+ return MapUpdate.<K, V>newBuilder()
+ .withType(MapUpdate.Type.PUT_IF_ABSENT)
+ .withKey(entry.getKey())
+ .withValue(entry.getValue())
+ .build();
+ } else {
+ return MapUpdate.<K, V>newBuilder()
+ .withType(MapUpdate.Type.PUT_IF_VERSION_MATCH)
+ .withKey(entry.getKey())
+ .withCurrentVersion(original.version())
+ .withValue(entry.getValue())
+ .build();
+ }
+ });
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
index 2f4a665..c0b323a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentMap.java
@@ -29,10 +29,12 @@
import java.util.function.Predicate;
import org.onosproject.core.ApplicationId;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
@@ -170,8 +172,18 @@
}
@Override
- public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
- return delegateMap.prepare(transaction);
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ return delegateMap.begin(transactionId);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
+ return delegateMap.prepare(transactionLog);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
+ return delegateMap.prepareAndCommit(transactionLog);
}
@Override
@@ -185,11 +197,6 @@
}
@Override
- public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
- return delegateMap.prepareAndCommit(transaction);
- }
-
- @Override
public void addStatusChangeListener(Consumer<Status> listener) {
delegateMap.addStatusChangeListener(listener);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
index 48c0b85..f474f7b 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DelegatingAsyncConsistentTreeMap.java
@@ -16,10 +16,12 @@
package org.onosproject.store.primitives.impl;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
@@ -253,9 +255,18 @@
}
@Override
- public CompletableFuture<Boolean> prepare(
- MapTransaction<String, V> transaction) {
- return delegateMap.prepare(transaction);
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ return delegateMap.begin(transactionId);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, V>> transactionLog) {
+ return delegateMap.prepare(transactionLog);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, V>> transactionLog) {
+ return delegateMap.prepareAndCommit(transactionLog);
}
@Override
@@ -269,12 +280,6 @@
}
@Override
- public CompletableFuture<Boolean> prepareAndCommit(
- MapTransaction<String, V> transaction) {
- return delegateMap.prepareAndCommit(transaction);
- }
-
- @Override
public boolean equals(Object other) {
if (other instanceof DelegatingAsyncConsistentTreeMap) {
DelegatingAsyncConsistentTreeMap<V> that =
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
index f8844fb..ccffead 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/MeteredAsyncConsistentMap.java
@@ -26,11 +26,13 @@
import java.util.function.Function;
import java.util.function.Predicate;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import com.google.common.base.Throwables;
@@ -63,6 +65,7 @@
private static final String ENTRY_SET = "entrySet";
private static final String REPLACE = "replace";
private static final String COMPUTE_IF_ABSENT = "computeIfAbsent";
+ private static final String BEGIN = "begin";
private static final String PREPARE = "prepare";
private static final String COMMIT = "commit";
private static final String ROLLBACK = "rollback";
@@ -249,10 +252,17 @@
}
@Override
- public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ final MeteringAgent.Context timer = monitor.startTimer(BEGIN);
+ return super.begin(transactionId)
+ .whenComplete((r, e) -> timer.stop(e));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
final MeteringAgent.Context timer = monitor.startTimer(PREPARE);
- return super.prepare(transaction)
- .whenComplete((r, e) -> timer.stop(e));
+ return super.prepare(transactionLog)
+ .whenComplete((r, e) -> timer.stop(e));
}
@Override
@@ -270,10 +280,10 @@
}
@Override
- public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
final MeteringAgent.Context timer = monitor.startTimer(PREPARE_AND_COMMIT);
- return super.prepareAndCommit(transaction)
- .whenComplete((r, e) -> timer.stop(e));
+ return super.prepareAndCommit(transactionLog)
+ .whenComplete((r, e) -> timer.stop(e));
}
private class InternalMeteredMapEventListener implements MapEventListener<K, V> {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
index 9adcb33..bf77105 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedAsyncConsistentMap.java
@@ -18,7 +18,6 @@
import static com.google.common.base.Preconditions.checkNotNull;
import java.util.Collection;
-import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -37,12 +36,12 @@
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
/**
@@ -200,54 +199,28 @@
}
@Override
- public CompletableFuture<Boolean> prepare(MapTransaction<K, V> transaction) {
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ throw new UnsupportedOperationException();
+ }
- Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
- transaction.updates().forEach(update -> {
- AsyncConsistentMap<K, V> map = getMap(update.key());
- updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
- });
- Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
- Maps.transformValues(updatesGroupedByMap,
- list -> new MapTransaction<>(transaction.transactionId(), list));
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K, V>> transactionLog) {
+ throw new UnsupportedOperationException();
+ }
- return Tools.allOf(transactionsByMap.entrySet()
- .stream()
- .map(e -> e.getKey().prepare(e.getValue()))
- .collect(Collectors.toList()))
- .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K, V>> transactionLog) {
+ throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
- return CompletableFuture.allOf(getMaps().stream()
- .map(e -> e.commit(transactionId))
- .toArray(CompletableFuture[]::new));
+ throw new UnsupportedOperationException();
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
- return CompletableFuture.allOf(getMaps().stream()
- .map(e -> e.rollback(transactionId))
- .toArray(CompletableFuture[]::new));
- }
-
- @Override
- public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K, V> transaction) {
- Map<AsyncConsistentMap<K, V>, List<MapUpdate<K, V>>> updatesGroupedByMap = Maps.newIdentityHashMap();
- transaction.updates().forEach(update -> {
- AsyncConsistentMap<K, V> map = getMap(update.key());
- updatesGroupedByMap.computeIfAbsent(map, k -> Lists.newLinkedList()).add(update);
- });
- Map<AsyncConsistentMap<K, V>, MapTransaction<K, V>> transactionsByMap =
- Maps.transformValues(updatesGroupedByMap,
- list -> new MapTransaction<>(transaction.transactionId(), list));
-
- return Tools.allOf(transactionsByMap.entrySet()
- .stream()
- .map(e -> e.getKey().prepareAndCommit(e.getValue()))
- .collect(Collectors.toList()))
- .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+ throw new UnsupportedOperationException();
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedTransactionalMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedTransactionalMap.java
new file mode 100644
index 0000000..90b3524
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/PartitionedTransactionalMap.java
@@ -0,0 +1,99 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Map;
+
+import com.google.common.base.MoreObjects;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.service.TransactionalMap;
+
+/**
+ * Partitioned transactional map.
+ */
+public class PartitionedTransactionalMap<K, V> implements TransactionalMap<K, V> {
+ protected final Map<PartitionId, TransactionalMapParticipant<K, V>> partitions;
+ protected final Hasher<K> hasher;
+
+ public PartitionedTransactionalMap(
+ Map<PartitionId, TransactionalMapParticipant<K, V>> partitions, Hasher<K> hasher) {
+ this.partitions = partitions;
+ this.hasher = hasher;
+ }
+
+ /**
+ * Returns the collection of map partitions.
+ *
+ * @return a collection of map partitions
+ */
+ @SuppressWarnings("unchecked")
+ Collection<TransactionParticipant> participants() {
+ return (Collection) partitions.values();
+ }
+
+ /**
+ * Returns the partition for the given key.
+ *
+ * @param key the key for which to return the partition
+ * @return the partition for the given key
+ */
+ private TransactionalMap<K, V> partition(K key) {
+ return partitions.get(hasher.hash(key));
+ }
+
+ @Override
+ public V get(K key) {
+ return partition(key).get(key);
+ }
+
+ @Override
+ public boolean containsKey(K key) {
+ return partition(key).containsKey(key);
+ }
+
+ @Override
+ public V put(K key, V value) {
+ return partition(key).put(key, value);
+ }
+
+ @Override
+ public V remove(K key) {
+ return partition(key).remove(key);
+ }
+
+ @Override
+ public V putIfAbsent(K key, V value) {
+ return partition(key).putIfAbsent(key, value);
+ }
+
+ @Override
+ public boolean remove(K key, V value) {
+ return partition(key).remove(key, value);
+ }
+
+ @Override
+ public boolean replace(K key, V oldValue, V newValue) {
+ return partition(key).replace(key, oldValue, newValue);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("partitions", partitions.values())
+ .toString();
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
index 48d9b7e..1fc821d 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StorageManager.java
@@ -42,7 +42,6 @@
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.AsyncAtomicValue;
-import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.AsyncConsistentMultimap;
import org.onosproject.store.service.AsyncConsistentTreeMap;
@@ -69,7 +68,6 @@
import org.slf4j.Logger;
import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.Futures;
/**
* Implementation for {@code StorageService} and {@code StorageAdminService}.
@@ -98,8 +96,7 @@
private final Supplier<TransactionId> transactionIdGenerator =
() -> TransactionId.from(UUID.randomUUID().toString());
private DistributedPrimitiveCreator federatedPrimitiveCreator;
- private AsyncConsistentMap<TransactionId, Transaction.State> transactions;
- private TransactionCoordinator transactionCoordinator;
+ private TransactionManager transactionManager;
@Activate
public void activate() {
@@ -108,13 +105,7 @@
.filter(id -> !id.equals(PartitionId.from(0)))
.forEach(id -> partitionMap.put(id, partitionService.getDistributedPrimitiveCreator(id)));
federatedPrimitiveCreator = new FederatedDistributedPrimitiveCreator(partitionMap);
- transactions = this.<TransactionId, Transaction.State>consistentMapBuilder()
- .withName("onos-transactions")
- .withSerializer(Serializer.using(KryoNamespaces.API,
- Transaction.class,
- Transaction.State.class))
- .buildAsyncMap();
- transactionCoordinator = new TransactionCoordinator(transactions);
+ transactionManager = new TransactionManager(this, partitionService);
log.info("Started");
}
@@ -187,9 +178,7 @@
@Override
public TransactionContextBuilder transactionContextBuilder() {
checkPermission(STORAGE_WRITE);
- return new DefaultTransactionContextBuilder(transactionIdGenerator.get(),
- federatedPrimitiveCreator,
- transactionCoordinator);
+ return new DefaultTransactionContextBuilder(transactionIdGenerator.get(), transactionManager);
}
@Override
@@ -259,7 +248,7 @@
@Override
public Collection<TransactionId> getPendingTransactions() {
- return Futures.getUnchecked(transactions.keySet());
+ return transactionManager.getPendingTransactions();
}
private List<MapInfo> listMapInfo(DistributedPrimitiveCreator creator) {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
index be5f05a..82b3919 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/Transaction.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-present Open Networking Laboratory
+ * Copyright 2017-present Open Networking Laboratory
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,88 +16,263 @@
package org.onosproject.store.primitives.impl;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
-import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Transactional;
+import org.onosproject.store.service.Version;
+import org.onosproject.store.service.TransactionContext;
+import org.onosproject.store.service.TransactionException;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkState;
/**
- * An immutable transaction object.
+ * Manages a transaction within the context of a single primitive.
+ * <p>
+ * The {@code Transaction} object is used to manage the transaction for a single partition primitive that implements
+ * the {@link Transactional} interface. It's used as a proxy for {@link TransactionContext}s to manage the transaction
+ * as it relates to a single piece of atomic state.
*/
-public class Transaction {
+public class Transaction<T> {
+ /**
+ * Transaction state.
+ * <p>
+ * The transaction state is used to indicate the phase within which the transaction is currently running.
+ */
enum State {
+
/**
- * Indicates a new transaction that is about to be prepared. All transactions
- * start their life in this state.
+ * Active transaction state.
+ * <p>
+ * The {@code ACTIVE} state represents a transaction in progress. Active transactions may or may not affect
+ * concurrently running transactions depending on the transaction's isolation level.
+ */
+ ACTIVE,
+
+ /**
+ * Preparing transaction state.
+ * <p>
+ * Once a transaction commitment begins, it enters the {@code PREPARING} phase of the two-phase commit protocol.
*/
PREPARING,
/**
- * Indicates a transaction that is successfully prepared i.e. all participants voted to commit
+ * Prepared transaction state.
+ * <p>
+ * Once the first phase of the two-phase commit protocol is complete, the transaction's state is set to
+ * {@code PREPARED}.
*/
PREPARED,
/**
- * Indicates a transaction that is about to be committed.
+ * Committing transaction state.
+ * <p>
+ * The {@code COMMITTING} state represents a transaction within the second phase of the two-phase commit
+ * protocol.
*/
COMMITTING,
/**
- * Indicates a transaction that has successfully committed.
+ * Committed transaction state.
+ * <p>
+ * Once the second phase of the two-phase commit protocol is complete, the transaction's state is set to
+ * {@code COMMITTED}.
*/
COMMITTED,
/**
- * Indicates a transaction that is about to be rolled back.
+ * Rolling back transaction state.
+ * <p>
+ * In the event of a two-phase lock failure, when the transaction is rolled back it will enter the
+ * {@code ROLLING_BACK} state while the rollback is in progress.
*/
- ROLLINGBACK,
+ ROLLING_BACK,
/**
- * Indicates a transaction that has been rolled back and all locks are released.
+ * Rolled back transaction state.
+ * <p>
+ * Once a transaction has been rolled back, it will enter the {@code ROLLED_BACK} state.
*/
- ROLLEDBACK
+ ROLLED_BACK,
}
- private final TransactionId transactionId;
- private final List<MapUpdate<String, byte[]>> updates;
- private final State state;
+ private static final String TX_OPEN_ERROR = "transaction already open";
+ private static final String TX_CLOSED_ERROR = "transaction not open";
+ private static final String TX_INACTIVE_ERROR = "transaction is not active";
+ private static final String TX_UNPREPARED_ERROR = "transaction has not been prepared";
- public Transaction(TransactionId transactionId, List<MapUpdate<String, byte[]>> updates) {
- this(transactionId, updates, State.PREPARING);
- }
+ protected final TransactionId transactionId;
+ protected final Transactional<T> transactionalObject;
+ private final AtomicBoolean open = new AtomicBoolean();
+ private volatile State state = State.ACTIVE;
- private Transaction(TransactionId transactionId,
- List<MapUpdate<String, byte[]>> updates,
- State state) {
+ public Transaction(TransactionId transactionId, Transactional<T> transactionalObject) {
this.transactionId = transactionId;
- this.updates = ImmutableList.copyOf(updates);
- this.state = state;
+ this.transactionalObject = transactionalObject;
}
- public TransactionId id() {
+ /**
+ * Returns the transaction identifier.
+ *
+ * @return the transaction identifier
+ */
+ public TransactionId transactionId() {
return transactionId;
}
- public List<MapUpdate<String, byte[]>> updates() {
- return updates;
- }
-
+ /**
+ * Returns the current transaction state.
+ *
+ * @return the current transaction state
+ */
public State state() {
return state;
}
- public Transaction transition(State newState) {
- return new Transaction(transactionId, updates, newState);
+ /**
+ * Returns a boolean indicating whether the transaction is open.
+ *
+ * @return indicates whether the transaction is open
+ */
+ public boolean isOpen() {
+ return open.get();
+ }
+
+ /**
+ * Opens the transaction, throwing an {@link IllegalStateException} if it's already open.
+ */
+ protected void open() {
+ if (!open.compareAndSet(false, true)) {
+ throw new IllegalStateException(TX_OPEN_ERROR);
+ }
+ }
+
+ /**
+ * Checks that the transaction is open and throws an {@link IllegalStateException} if not.
+ */
+ protected void checkOpen() {
+ checkState(isOpen(), TX_CLOSED_ERROR);
+ }
+
+ /**
+ * Checks that the transaction state is {@code ACTIVE} and throws an {@link IllegalStateException} if not.
+ */
+ protected void checkActive() {
+ checkState(state == State.ACTIVE, TX_INACTIVE_ERROR);
+ }
+
+ /**
+ * Checks that the transaction state is {@code PREPARED} and throws an {@link IllegalStateException} if not.
+ */
+ protected void checkPrepared() {
+ checkState(state == State.PREPARED, TX_UNPREPARED_ERROR);
+ }
+
+ /**
+ * Updates the transaction state.
+ *
+ * @param state the updated transaction state
+ */
+ protected void setState(State state) {
+ this.state = state;
+ }
+
+ /**
+ * Begins the transaction.
+ * <p>
+ * Locks are acquired when the transaction is begun to prevent concurrent transactions from operating on the shared
+ * resource to which this transaction relates.
+ *
+ * @return a completable future to be completed once the transaction has been started
+ */
+ public CompletableFuture<Version> begin() {
+ open();
+ return transactionalObject.begin(transactionId);
+ }
+
+ /**
+ * Prepares the transaction.
+ * <p>
+ * When preparing the transaction, the given list of updates for the shared resource will be prepared, and
+ * concurrent modification checks will be performed. The returned future may be completed with a
+ * {@link TransactionException} if a concurrent modification is detected for an isolation level that does
+ * not allow such modifications.
+ *
+ * @param updates the transaction updates
+ * @return a completable future to be completed once the transaction has been prepared
+ */
+ public CompletableFuture<Boolean> prepare(List<T> updates) {
+ checkOpen();
+ checkActive();
+ setState(State.PREPARING);
+ return transactionalObject.prepare(new TransactionLog<T>(transactionId, updates))
+ .thenApply(succeeded -> {
+ setState(State.PREPARED);
+ return succeeded;
+ });
+ }
+
+ /**
+ * Prepares and commits the transaction in a single atomic operation.
+ * <p>
+ * Both the prepare and commit phases of the protocol must be executed within a single atomic operation. This method
+ * is used to optimize committing transactions that operate only on a single partition within a single primitive.
+ *
+ * @param updates the transaction updates
+ * @return a completable future to be completed once the transaction has been prepared
+ */
+ public CompletableFuture<Boolean> prepareAndCommit(List<T> updates) {
+ checkOpen();
+ checkActive();
+ setState(State.PREPARING);
+ return transactionalObject.prepareAndCommit(new TransactionLog<T>(transactionId, updates))
+ .thenApply(succeeded -> {
+ setState(State.COMMITTED);
+ return succeeded;
+ });
+ }
+
+ /**
+ * Commits the transaction.
+ * <p>
+ * Performs the second phase of the two-phase commit protocol, committing the previously
+ * {@link #prepare(List) prepared} updates.
+ *
+ * @return a completable future to be completed once the transaction has been committed
+ */
+ public CompletableFuture<Void> commit() {
+ checkOpen();
+ checkPrepared();
+ setState(State.COMMITTING);
+ return transactionalObject.commit(transactionId).thenRun(() -> {
+ setState(State.COMMITTED);
+ });
+ }
+
+ /**
+ * Rolls back the transaction.
+ * <p>
+ * Rolls back the first phase of the two-phase commit protocol, cancelling prepared updates.
+ *
+ * @return a completable future to be completed once the transaction has been rolled back
+ */
+ public CompletableFuture<Void> rollback() {
+ checkOpen();
+ checkPrepared();
+ setState(State.ROLLING_BACK);
+ return transactionalObject.rollback(transactionId).thenRun(() -> {
+ setState(State.ROLLED_BACK);
+ });
}
@Override
public String toString() {
- return MoreObjects.toStringHelper(getClass())
+ return toStringHelper(this)
.add("transactionId", transactionId)
- .add("updates", updates)
.add("state", state)
.toString();
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
index 160ecf9..9826266 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
@@ -19,78 +19,123 @@
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import com.google.common.collect.Sets;
import org.onlab.util.Tools;
import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.CommitStatus;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.TransactionalMap;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
/**
- * Coordinator for a two-phase commit protocol.
+ * Transaction coordinator.
*/
public class TransactionCoordinator {
+ protected final TransactionId transactionId;
+ protected final TransactionManager transactionManager;
+ protected final Set<TransactionParticipant> transactionParticipants = Sets.newConcurrentHashSet();
- private final AsyncConsistentMap<TransactionId, Transaction.State> transactions;
-
- public TransactionCoordinator(AsyncConsistentMap<TransactionId, Transaction.State> transactions) {
- this.transactions = transactions;
+ public TransactionCoordinator(TransactionId transactionId, TransactionManager transactionManager) {
+ this.transactionId = transactionId;
+ this.transactionManager = transactionManager;
}
/**
- * Commits a transaction.
+ * Returns a transactional map for this transaction.
*
- * @param transactionId transaction identifier
- * @param transactionParticipants set of transaction participants
- * @return future for commit result
+ * @param name the transactional map name
+ * @param serializer the serializer
+ * @param <K> key type
+ * @param <V> value type
+ * @return a transactional map for this transaction
*/
- CompletableFuture<CommitStatus> commit(TransactionId transactionId,
- Set<TransactionParticipant> transactionParticipants) {
- int totalUpdates = transactionParticipants.stream()
- .map(TransactionParticipant::totalUpdates)
- .reduce(Math::addExact)
- .orElse(0);
+ public <K, V> TransactionalMap<K, V> getTransactionalMap(String name, Serializer serializer) {
+ PartitionedTransactionalMap<K, V> map = transactionManager.getTransactionalMap(name, serializer, this);
+ transactionParticipants.addAll(map.participants());
+ return map;
+ }
- if (totalUpdates == 0) {
+ /**
+ * Commits the transaction.
+ *
+ * @return the transaction commit status
+ */
+ public CompletableFuture<CommitStatus> commit() {
+ long totalParticipants = transactionParticipants.stream()
+ .filter(TransactionParticipant::hasPendingUpdates)
+ .count();
+
+ if (totalParticipants == 0) {
return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
- } else if (totalUpdates == 1) {
+ } else if (totalParticipants == 1) {
return transactionParticipants.stream()
- .filter(p -> p.totalUpdates() == 1)
- .findFirst()
- .get()
- .prepareAndCommit()
- .thenApply(v -> v ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
+ .filter(TransactionParticipant::hasPendingUpdates)
+ .findFirst()
+ .get()
+ .prepareAndCommit()
+ .thenApply(v -> v ? CommitStatus.SUCCESS : CommitStatus.FAILURE);
} else {
- CompletableFuture<CommitStatus> status = transactions.put(transactionId, Transaction.State.PREPARING)
- .thenCompose(v -> this.doPrepare(transactionParticipants))
+ Set<TransactionParticipant> transactionParticipants = this.transactionParticipants.stream()
+ .filter(TransactionParticipant::hasPendingUpdates)
+ .collect(Collectors.toSet());
+
+ CompletableFuture<CommitStatus> status = transactionManager.updateState(
+ transactionId, Transaction.State.PREPARING)
+ .thenCompose(v -> prepare(transactionParticipants))
.thenCompose(result -> result
- ? transactions.put(transactionId, Transaction.State.COMMITTING)
- .thenCompose(v -> doCommit(transactionParticipants))
- .thenApply(v -> CommitStatus.SUCCESS)
- : transactions.put(transactionId, Transaction.State.ROLLINGBACK)
- .thenCompose(v -> doRollback(transactionParticipants))
- .thenApply(v -> CommitStatus.FAILURE));
- return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
+ ? transactionManager.updateState(transactionId, Transaction.State.COMMITTING)
+ .thenCompose(v -> commit(transactionParticipants))
+ .thenApply(v -> CommitStatus.SUCCESS)
+ : transactionManager.updateState(transactionId, Transaction.State.ROLLING_BACK)
+ .thenCompose(v -> rollback(transactionParticipants))
+ .thenApply(v -> CommitStatus.FAILURE));
+ return status.thenCompose(v -> transactionManager.remove(transactionId).thenApply(u -> v));
}
}
- private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) {
+ /**
+ * Performs the prepare phase of the two-phase commit protocol for the given transaction participants.
+ *
+ * @param transactionParticipants the transaction participants for which to prepare the transaction
+ * @return a completable future indicating whether <em>all</em> prepares succeeded
+ */
+ protected CompletableFuture<Boolean> prepare(Set<TransactionParticipant> transactionParticipants) {
return Tools.allOf(transactionParticipants.stream()
- .filter(TransactionParticipant::hasPendingUpdates)
- .map(TransactionParticipant::prepare)
- .collect(Collectors.toList()))
- .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
+ .map(TransactionParticipant::prepare)
+ .collect(Collectors.toList()))
+ .thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
}
- private CompletableFuture<Void> doCommit(Set<TransactionParticipant> transactionParticipants) {
+ /**
+ * Performs the commit phase of the two-phase commit protocol for the given transaction participants.
+ *
+ * @param transactionParticipants the transaction participants for which to commit the transaction
+ * @return a completable future to be completed once the commits are complete
+ */
+ protected CompletableFuture<Void> commit(Set<TransactionParticipant> transactionParticipants) {
return CompletableFuture.allOf(transactionParticipants.stream()
- .filter(TransactionParticipant::hasPendingUpdates)
- .map(TransactionParticipant::commit)
- .toArray(CompletableFuture[]::new));
+ .map(TransactionParticipant::commit)
+ .toArray(CompletableFuture[]::new));
}
- private CompletableFuture<Void> doRollback(Set<TransactionParticipant> transactionParticipants) {
+ /**
+ * Rolls back transactions for the given participants.
+ *
+ * @param transactionParticipants the transaction participants for which to roll back the transaction
+ * @return a completable future to be completed once the rollbacks are complete
+ */
+ protected CompletableFuture<Void> rollback(Set<TransactionParticipant> transactionParticipants) {
return CompletableFuture.allOf(transactionParticipants.stream()
- .filter(TransactionParticipant::hasPendingUpdates)
- .map(TransactionParticipant::rollback)
- .toArray(CompletableFuture[]::new));
+ .map(TransactionParticipant::rollback)
+ .toArray(CompletableFuture[]::new));
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("transactionId", transactionId)
+ .add("participants", transactionParticipants)
+ .toString();
}
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
new file mode 100644
index 0000000..5c246de
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionManager.java
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.primitives.impl;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.hash.Hashing;
+import com.google.common.util.concurrent.Futures;
+import org.onosproject.cluster.PartitionId;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.primitives.PartitionService;
+import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.TransactionException;
+
+/**
+ * Transaction manager for managing state shared across multiple transactions.
+ */
+public class TransactionManager {
+ private static final int DEFAULT_CACHE_SIZE = 100;
+
+ private final PartitionService partitionService;
+ private final List<PartitionId> sortedPartitions;
+ private final AsyncConsistentMap<TransactionId, Transaction.State> transactions;
+ private final int cacheSize;
+ private final Map<PartitionId, Cache<String, AsyncConsistentMap>> partitionCache = Maps.newConcurrentMap();
+
+ public TransactionManager(StorageService storageService, PartitionService partitionService) {
+ this(storageService, partitionService, DEFAULT_CACHE_SIZE);
+ }
+
+ public TransactionManager(StorageService storageService, PartitionService partitionService, int cacheSize) {
+ this.partitionService = partitionService;
+ this.cacheSize = cacheSize;
+ this.transactions = storageService.<TransactionId, Transaction.State>consistentMapBuilder()
+ .withName("onos-transactions")
+ .withSerializer(Serializer.using(KryoNamespaces.API,
+ Transaction.class,
+ Transaction.State.class))
+ .buildAsyncMap();
+ this.sortedPartitions = Lists.newArrayList(partitionService.getAllPartitionIds());
+ Collections.sort(sortedPartitions);
+ }
+
+ /**
+ * Returns the collection of currently pending transactions.
+ *
+ * @return a collection of currently pending transactions
+ */
+ public Collection<TransactionId> getPendingTransactions() {
+ return Futures.getUnchecked(transactions.keySet());
+ }
+
+ /**
+ * Returns a partitioned transactional map for use within a transaction context.
+ * <p>
+ * The transaction coordinator will return a map that takes advantage of caching that's shared across transaction
+ * contexts.
+ *
+ * @param name the map name
+ * @param serializer the map serializer
+ * @param transactionCoordinator the transaction coordinator for which the map is being created
+ * @param <K> key type
+ * @param <V> value type
+ * @return a partitioned transactional map
+ */
+ <K, V> PartitionedTransactionalMap<K, V> getTransactionalMap(
+ String name,
+ Serializer serializer,
+ TransactionCoordinator transactionCoordinator) {
+ Map<PartitionId, TransactionalMapParticipant<K, V>> partitions = new HashMap<>();
+ for (PartitionId partitionId : partitionService.getAllPartitionIds()) {
+ partitions.put(partitionId, getTransactionalMapPartition(
+ name, partitionId, serializer, transactionCoordinator));
+ }
+
+ Hasher<K> hasher = key -> {
+ int hashCode = Hashing.sha256().hashBytes(serializer.encode(key)).asInt();
+ return sortedPartitions.get(Math.abs(hashCode) % sortedPartitions.size());
+ };
+ return new PartitionedTransactionalMap<>(partitions, hasher);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <K, V> TransactionalMapParticipant<K, V> getTransactionalMapPartition(
+ String mapName,
+ PartitionId partitionId,
+ Serializer serializer,
+ TransactionCoordinator transactionCoordinator) {
+ Cache<String, AsyncConsistentMap> mapCache = partitionCache.computeIfAbsent(partitionId, p ->
+ CacheBuilder.newBuilder().maximumSize(cacheSize / partitionService.getNumberOfPartitions()).build());
+ try {
+ AsyncConsistentMap<K, V> baseMap = partitionService.getDistributedPrimitiveCreator(partitionId)
+ .newAsyncConsistentMap(mapName, serializer);
+ AsyncConsistentMap<K, V> asyncMap = mapCache.get(mapName, () ->
+ DistributedPrimitives.newCachingMap(baseMap));
+
+ Transaction<MapUpdate<K, V>> transaction = new Transaction<>(
+ transactionCoordinator.transactionId,
+ baseMap);
+ return new DefaultTransactionalMapParticipant<>(asyncMap.asConsistentMap(), transaction);
+ } catch (ExecutionException e) {
+ throw new TransactionException(e);
+ }
+ }
+
+ /**
+ * Updates the state of a transaction in the transaction registry.
+ *
+ * @param transactionId the transaction identifier
+ * @param state the state of the transaction
+ * @return a completable future to be completed once the transaction state has been updated in the registry
+ */
+ CompletableFuture<Void> updateState(TransactionId transactionId, Transaction.State state) {
+ return transactions.put(transactionId, state).thenApply(v -> null);
+ }
+
+ /**
+ * Removes the given transaction from the transaction registry.
+ *
+ * @param transactionId the transaction identifier
+ * @return a completable future to be completed once the transaction state has been removed from the registry
+ */
+ CompletableFuture<Void> remove(TransactionId transactionId) {
+ return transactions.remove(transactionId).thenApply(v -> null);
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
index 0780bf6..f97d448 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionParticipant.java
@@ -23,40 +23,38 @@
public interface TransactionParticipant {
/**
- * Returns if this participant has updates that need to be committed.
- * @return {@code true} if yes; {@code false} otherwise
+ * Returns a boolean indicating whether the participant has pending updates.
+ *
+ * @return indicates whether the participant has pending updates
*/
- default boolean hasPendingUpdates() {
- return totalUpdates() > 0;
- }
-
- /**
- * Returns the number of updates that need to committed for this participant.
- * @return update count.
- */
- int totalUpdates();
-
- /**
- * Executes the prepare and commit steps in a single go.
- * @return {@code true} is successful i.e updates are committed; {@code false} otherwise
- */
- CompletableFuture<Boolean> prepareAndCommit();
+ boolean hasPendingUpdates();
/**
* Executes the prepare phase.
+ *
* @return {@code true} is successful; {@code false} otherwise
*/
CompletableFuture<Boolean> prepare();
/**
* Attempts to execute the commit phase for previously prepared transaction.
+ *
* @return future that is completed when the operation completes
*/
CompletableFuture<Void> commit();
/**
+ * Executes the prepare and commit phases atomically.
+ *
+ * @return {@code true} is successful; {@code false} otherwise
+ */
+ CompletableFuture<Boolean> prepareAndCommit();
+
+ /**
* Attempts to execute the rollback phase for previously prepared transaction.
+ *
* @return future that is completed when the operation completes
*/
CompletableFuture<Void> rollback();
+
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
new file mode 100644
index 0000000..4429a1b
--- /dev/null
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionalMapParticipant.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2017-present Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.store.primitives.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.base.Objects;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.onosproject.store.primitives.MapUpdate;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedPrimitive;
+import org.onosproject.store.service.TransactionException;
+import org.onosproject.store.service.TransactionalMap;
+import org.onosproject.store.service.Version;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Base class for participants within a single {@link TransactionalMap}.
+ * <p>
+ * This class provides the basic functionality required by transactional map participants and provides methods
+ * for defining operations specific to individual isolation levels.
+ *
+ * @param <K> key type
+ * @param <V> value type.
+ */
+public abstract class TransactionalMapParticipant<K, V> implements TransactionalMap<K, V>, TransactionParticipant {
+ private static final String TX_CLOSED_ERROR = "Transaction is closed";
+ private static final String ERROR_NULL_VALUE = "Null values are not allowed";
+ private static final String ERROR_NULL_KEY = "Null key is not allowed";
+
+ protected final ConsistentMap<K, V> backingMap;
+ protected final Transaction<MapUpdate<K, V>> transaction;
+ protected final Map<K, V> writeCache = Maps.newConcurrentMap();
+ protected final Set<K> deleteSet = Sets.newConcurrentHashSet();
+ protected final List<MapUpdate<K, V>> log = new ArrayList<>();
+ protected volatile Version lock;
+
+ protected TransactionalMapParticipant(
+ ConsistentMap<K, V> backingMap,
+ Transaction<MapUpdate<K, V>> transaction) {
+ this.backingMap = backingMap;
+ this.transaction = transaction;
+ }
+
+ /**
+ * Starts the transaction for this partition when a read occurs.
+ * <p>
+ * Acquiring a pessimistic lock at the start of the transaction ensures that underlying cached maps have been
+ * synchronized prior to a read.
+ */
+ private void beginTransaction() {
+ if (!transaction.isOpen()) {
+ try {
+ lock = transaction.begin()
+ .get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new TransactionException.Interrupted();
+ } catch (TimeoutException e) {
+ throw new TransactionException.Timeout();
+ } catch (ExecutionException e) {
+ Throwables.propagateIfPossible(e.getCause());
+ throw new TransactionException(e.getCause());
+ }
+ }
+ }
+
+ @Override
+ public V get(K key) {
+ // Start the transaction for this primitive/partition if necessary.
+ beginTransaction();
+
+ checkState(transaction.isOpen(), TX_CLOSED_ERROR);
+ checkNotNull(key, ERROR_NULL_KEY);
+
+ if (deleteSet.contains(key)) {
+ return null;
+ }
+
+ V latest = writeCache.get(key);
+ if (latest != null) {
+ return latest;
+ } else {
+ return read(key);
+ }
+ }
+
+ /**
+ * Executes a get operation based on the transaction isolation level.
+ *
+ * @param key the key to look up
+ * @return the value
+ */
+ protected abstract V read(K key);
+
+ @Override
+ public boolean containsKey(K key) {
+ return get(key) != null;
+ }
+
+ @Override
+ public V put(K key, V value) {
+ checkNotNull(value, ERROR_NULL_VALUE);
+
+ V latest = get(key);
+ writeCache.put(key, value);
+ deleteSet.remove(key);
+ return latest;
+ }
+
+ @Override
+ public V remove(K key) {
+ V latest = get(key);
+ if (latest != null) {
+ writeCache.remove(key);
+ deleteSet.add(key);
+ }
+ return latest;
+ }
+
+ @Override
+ public boolean remove(K key, V value) {
+ checkNotNull(value, ERROR_NULL_VALUE);
+
+ V latest = get(key);
+ if (Objects.equal(value, latest)) {
+ remove(key);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean replace(K key, V oldValue, V newValue) {
+ checkNotNull(oldValue, ERROR_NULL_VALUE);
+ checkNotNull(newValue, ERROR_NULL_VALUE);
+
+ V latest = get(key);
+ if (Objects.equal(oldValue, latest)) {
+ put(key, newValue);
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public V putIfAbsent(K key, V value) {
+ checkNotNull(value, ERROR_NULL_VALUE);
+
+ V latest = get(key);
+ if (latest == null) {
+ put(key, value);
+ }
+ return latest;
+ }
+
+ @Override
+ public boolean hasPendingUpdates() {
+ return records().findAny().isPresent();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare() {
+ return transaction.prepare(log());
+ }
+
+ @Override
+ public CompletableFuture<Void> commit() {
+ return transaction.commit();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit() {
+ return transaction.prepareAndCommit(log());
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback() {
+ return transaction.rollback();
+ }
+
+ /**
+ * Returns a list of updates performed within this map partition.
+ *
+ * @return a list of map updates
+ */
+ protected List<MapUpdate<K, V>> log() {
+ return records().collect(Collectors.toList());
+ }
+
+ /**
+ * Returns a stream of updates performed within this map partition.
+ *
+ * @return a stream of map updates
+ */
+ protected abstract Stream<MapUpdate<K, V>> records();
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(this)
+ .add("backingMap", backingMap)
+ .add("updates", log())
+ .toString();
+ }
+}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
index 2afb5df..c2f167a 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentMap.java
@@ -29,11 +29,13 @@
import java.util.stream.Collectors;
import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.Maps;
@@ -266,9 +268,27 @@
}
@Override
- public CompletableFuture<Boolean> prepare(MapTransaction<K1, V1> transaction) {
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
try {
- return backingMap.prepare(transaction.map(keyEncoder, valueEncoder));
+ return backingMap.begin(transactionId);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<K1, V1>> transactionLog) {
+ try {
+ return backingMap.prepare(transactionLog.map(record -> record.map(keyEncoder, valueEncoder)));
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<K1, V1>> transactionLog) {
+ try {
+ return backingMap.prepareAndCommit(transactionLog.map(record -> record.map(keyEncoder, valueEncoder)));
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
@@ -276,18 +296,17 @@
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
- return backingMap.commit(transactionId);
+ try {
+ return backingMap.commit(transactionId);
+ } catch (Exception e) {
+ return Tools.exceptionalFuture(e);
+ }
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
- return backingMap.rollback(transactionId);
- }
-
- @Override
- public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<K1, V1> transaction) {
try {
- return backingMap.prepareAndCommit(transaction.map(keyEncoder, valueEncoder));
+ return backingMap.rollback(transactionId);
} catch (Exception e) {
return Tools.exceptionalFuture(e);
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
index 747008f..2f0683e 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TranscodingAsyncConsistentTreeMap.java
@@ -18,11 +18,13 @@
import com.google.common.collect.Maps;
import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
@@ -361,27 +363,30 @@
}
@Override
- public CompletableFuture<Boolean> prepare(
- MapTransaction<String, V1> transaction) {
- throw new UnsupportedOperationException("This operation is not yet " +
- "supported.");
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ throw new UnsupportedOperationException("This operation is not yet supported.");
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, V1>> transactionLog) {
+ throw new UnsupportedOperationException("This operation is not yet supported.");
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, V1>> transactionLog) {
+ throw new UnsupportedOperationException("This operation is not yet supported.");
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
- throw new UnsupportedOperationException("This operation is not yet " +
- "supported."); }
+ throw new UnsupportedOperationException("This operation is not yet supported.");
+ }
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
- throw new UnsupportedOperationException("This operation is not yet " +
- "supported."); }
+ throw new UnsupportedOperationException("This operation is not yet supported.");
+ }
- @Override
- public CompletableFuture<Boolean> prepareAndCommit(
- MapTransaction<String, V1> transaction) {
- throw new UnsupportedOperationException("This operation is not yet " +
- "supported."); }
private class InternalBackingMapEventListener
implements MapEventListener<String, V2> {
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index afa67f5..8a13c01 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -36,6 +36,7 @@
import org.onlab.util.Match;
import org.onlab.util.Tools;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Clear;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.ContainsKey;
@@ -47,6 +48,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionBegin;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
@@ -58,7 +60,8 @@
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import com.google.common.collect.ImmutableSet;
@@ -294,8 +297,22 @@
}
@Override
- public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]> transaction) {
- return client.submit(new TransactionPrepare(transaction)).thenApply(v -> v == PrepareResult.OK);
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ return client.submit(new TransactionBegin()).thenApply(Version::new);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(
+ TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+ return client.submit(new TransactionPrepare(transactionLog))
+ .thenApply(v -> v == PrepareResult.OK);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(
+ TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+ return client.submit(new TransactionPrepareAndCommit(transactionLog))
+ .thenApply(v -> v == PrepareResult.OK);
}
@Override
@@ -305,13 +322,7 @@
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
- return client.submit(new TransactionRollback(transactionId))
- .thenApply(v -> null);
- }
-
- @Override
- public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String, byte[]> transaction) {
- return client.submit(new TransactionPrepareAndCommit(transaction)).thenApply(v -> v == PrepareResult.OK);
+ return client.submit(new TransactionRollback(transactionId)).thenApply(v -> null);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
index 20b13f1..0c5dd76 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapCommands.java
@@ -30,8 +30,9 @@
import java.util.Set;
import org.onlab.util.Match;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Versioned;
import com.google.common.base.MoreObjects;
@@ -202,39 +203,49 @@
}
/**
+ * Transaction begin query.
+ */
+ public static class TransactionBegin extends MapQuery<Long> {
+ @Override
+ public ConsistencyLevel consistency() {
+ return ConsistencyLevel.LINEARIZABLE;
+ }
+ }
+
+ /**
* Map prepare command.
*/
@SuppressWarnings("serial")
public static class TransactionPrepare extends MapCommand<PrepareResult> {
- private MapTransaction<String, byte[]> mapTransaction;
+ private TransactionLog<MapUpdate<String, byte[]>> transactionLog;
public TransactionPrepare() {
}
- public TransactionPrepare(MapTransaction<String, byte[]> mapTransaction) {
- this.mapTransaction = mapTransaction;
+ public TransactionPrepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+ this.transactionLog = transactionLog;
}
- public MapTransaction<String, byte[]> transaction() {
- return mapTransaction;
+ public TransactionLog<MapUpdate<String, byte[]>> transactionLog() {
+ return transactionLog;
}
@Override
public void writeObject(BufferOutput<?> buffer, Serializer serializer) {
super.writeObject(buffer, serializer);
- serializer.writeObject(mapTransaction, buffer);
+ serializer.writeObject(transactionLog, buffer);
}
@Override
public void readObject(BufferInput<?> buffer, Serializer serializer) {
super.readObject(buffer, serializer);
- mapTransaction = serializer.readObject(buffer);
+ transactionLog = serializer.readObject(buffer);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("mapTransaction", mapTransaction)
+ .add("transactionLog", transactionLog)
.toString();
}
}
@@ -247,8 +258,8 @@
public TransactionPrepareAndCommit() {
}
- public TransactionPrepareAndCommit(MapTransaction<String, byte[]> mapTransaction) {
- super(mapTransaction);
+ public TransactionPrepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+ super(transactionLog);
}
@Override
@@ -592,6 +603,7 @@
registry.register(Size.class, -769);
registry.register(Listen.class, -770);
registry.register(Unlisten.class, -771);
+ registry.register(TransactionBegin.class, -777);
registry.register(TransactionPrepare.class, -772);
registry.register(TransactionCommit.class, -773);
registry.register(TransactionRollback.class, -774);
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
index 3a99b02..e844b8f 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMapState.java
@@ -52,6 +52,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.KeySet;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Listen;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Size;
+import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionBegin;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionCommit;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepare;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.TransactionPrepareAndCommit;
@@ -60,7 +61,7 @@
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.UpdateAndGet;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentMapCommands.Values;
import org.onosproject.store.service.MapEvent;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
import org.onosproject.store.service.Versioned;
import org.slf4j.Logger;
@@ -110,6 +111,7 @@
// Commands
executor.register(UpdateAndGet.class, this::updateAndGet);
executor.register(AtomixConsistentMapCommands.Clear.class, this::clear);
+ executor.register(TransactionBegin.class, this::begin);
executor.register(TransactionPrepare.class, this::prepare);
executor.register(TransactionCommit.class, this::commit);
executor.register(TransactionRollback.class, this::rollback);
@@ -374,6 +376,20 @@
}
/**
+ * Handles a begin commit.
+ *
+ * @param commit transaction begin commit
+ * @return transaction state version
+ */
+ protected long begin(Commit<? extends TransactionBegin> commit) {
+ try {
+ return commit.index();
+ } finally {
+ commit.close();
+ }
+ }
+
+ /**
* Handles an prepare and commit commit.
*
* @param commit transaction prepare and commit commit
@@ -382,7 +398,7 @@
protected PrepareResult prepareAndCommit(Commit<? extends TransactionPrepareAndCommit> commit) {
PrepareResult prepareResult = prepare(commit);
if (prepareResult == PrepareResult.OK) {
- commitInternal(commit.operation().transaction().transactionId(), commit.index());
+ commitInternal(commit.operation().transactionLog().transactionId(), commit.index());
}
return prepareResult;
}
@@ -396,8 +412,8 @@
protected PrepareResult prepare(Commit<? extends TransactionPrepare> commit) {
boolean ok = false;
try {
- MapTransaction<String, byte[]> transaction = commit.operation().transaction();
- for (MapUpdate<String, byte[]> update : transaction.updates()) {
+ TransactionLog<MapUpdate<String, byte[]>> transaction = commit.operation().transactionLog();
+ for (MapUpdate<String, byte[]> update : transaction.records()) {
String key = update.key();
if (preparedKeys.contains(key)) {
return PrepareResult.CONCURRENT_TRANSACTION;
@@ -416,7 +432,7 @@
// No violations detected. Add to pendingTransactions and mark
// modified keys as locked for updates.
pendingTransactions.put(transaction.transactionId(), commit);
- transaction.updates().forEach(u -> preparedKeys.add(u.key()));
+ transaction.records().forEach(u -> preparedKeys.add(u.key()));
ok = true;
return PrepareResult.OK;
} catch (Exception e) {
@@ -453,16 +469,16 @@
if (prepareCommit == null) {
return CommitResult.UNKNOWN_TRANSACTION_ID;
}
- MapTransaction<String, byte[]> transaction = prepareCommit.operation().transaction();
+ TransactionLog<MapUpdate<String, byte[]>> transaction = prepareCommit.operation().transactionLog();
long totalReferencesToCommit = transaction
- .updates()
+ .records()
.stream()
.filter(update -> update.type() != MapUpdate.Type.REMOVE_IF_VERSION_MATCH)
.count();
CountDownCompleter<Commit<? extends TransactionPrepare>> completer =
new CountDownCompleter<>(prepareCommit, totalReferencesToCommit, Commit::close);
List<MapEvent<String, byte[]>> eventsToPublish = Lists.newArrayList();
- for (MapUpdate<String, byte[]> update : transaction.updates()) {
+ for (MapUpdate<String, byte[]> update : transaction.records()) {
String key = update.key();
checkState(preparedKeys.remove(key), "key is not prepared");
MapEntryValue previousValue = mapEntries.remove(key);
@@ -496,8 +512,8 @@
return RollbackResult.UNKNOWN_TRANSACTION_ID;
} else {
prepareCommit.operation()
- .transaction()
- .updates()
+ .transactionLog()
+ .records()
.forEach(u -> preparedKeys.remove(u.key()));
prepareCommit.close();
return RollbackResult.OK;
@@ -648,7 +664,7 @@
@Override
public byte[] value() {
- MapTransaction<String, byte[]> transaction = completer.object().operation().transaction();
+ TransactionLog<MapUpdate<String, byte[]>> transaction = completer.object().operation().transactionLog();
return valueForKey(key, transaction);
}
@@ -662,8 +678,8 @@
completer.countDown();
}
- private byte[] valueForKey(String key, MapTransaction<String, byte[]> transaction) {
- MapUpdate<String, byte[]> update = transaction.updates()
+ private byte[] valueForKey(String key, TransactionLog<MapUpdate<String, byte[]>> transaction) {
+ MapUpdate<String, byte[]> update = transaction.records()
.stream()
.filter(u -> u.key().equals(key))
.findFirst()
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java
index 3dd34fd..bf462c9 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentTreeMap.java
@@ -21,6 +21,7 @@
import io.atomix.resource.AbstractResource;
import io.atomix.resource.ResourceTypeInfo;
import org.onlab.util.Match;
+import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FirstKey;
import org.onosproject.store.primitives.resources.impl.AtomixConsistentTreeMapCommands.FloorEntry;
@@ -30,7 +31,8 @@
import org.onosproject.store.service.AsyncConsistentTreeMap;
import org.onosproject.store.service.MapEvent;
import org.onosproject.store.service.MapEventListener;
-import org.onosproject.store.service.MapTransaction;
+import org.onosproject.store.service.TransactionLog;
+import org.onosproject.store.service.Version;
import org.onosproject.store.service.Versioned;
import java.util.Collection;
@@ -368,40 +370,38 @@
@Override
public CompletableFuture<NavigableSet<String>> navigableKeySet() {
- throw new UnsupportedOperationException("This operation is not yet " +
- "supported.");
+ throw new UnsupportedOperationException("This operation is not yet supported.");
}
@Override
public CompletableFuture<NavigableMap<String, byte[]>> subMap(
String upperKey, String lowerKey, boolean inclusiveUpper,
boolean inclusiveLower) {
- throw new UnsupportedOperationException("This operation is not yet " +
- "supported."); }
-
- @Override
- public CompletableFuture<Boolean> prepareAndCommit(MapTransaction<String,
- byte[]> transaction) {
- throw new UnsupportedOperationException("This operation is not yet " +
- "supported.");
+ throw new UnsupportedOperationException("This operation is not yet supported.");
}
@Override
- public CompletableFuture<Boolean> prepare(MapTransaction<String, byte[]>
- transaction) {
- throw new UnsupportedOperationException("This operation is not yet " +
- "supported.");
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ throw new UnsupportedOperationException("This operation is not yet supported.");
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+ throw new UnsupportedOperationException("This operation is not yet supported.");
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<MapUpdate<String, byte[]>> transactionLog) {
+ throw new UnsupportedOperationException("This operation is not yet supported.");
}
@Override
public CompletableFuture<Void> commit(TransactionId transactionId) {
- throw new UnsupportedOperationException("This operation is not yet " +
- "supported.");
+ throw new UnsupportedOperationException("This operation is not yet supported.");
}
@Override
public CompletableFuture<Void> rollback(TransactionId transactionId) {
- throw new UnsupportedOperationException("This operation is not yet " +
- "supported.");
+ throw new UnsupportedOperationException("This operation is not yet supported.");
}
}