ONOS-4218: Fixes for resource store transaction failures
Change-Id: Ie48bb04d7daf6ed7b63c33a3c3c2703496179aa6
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
index 43cacb3..30a6f26 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
@@ -27,6 +27,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
+import org.onosproject.store.service.CommitStatus;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
@@ -96,22 +97,23 @@
@SuppressWarnings("unchecked")
@Override
- public boolean commit() {
+ public CompletableFuture<CommitStatus> commit() {
// TODO: rework commit implementation to be more intuitive
checkState(isOpen, TX_NOT_OPEN_ERROR);
- CommitResult result = null;
+ CommitStatus status;
try {
List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
txMaps.values().forEach(m -> updates.addAll(m.toMapUpdates()));
Transaction transaction = new Transaction(transactionId, updates);
- result = Futures.getUnchecked(transactionCommitter.apply(transaction));
- return result == CommitResult.OK;
+ status = Futures.getUnchecked(transactionCommitter.apply(transaction)) == CommitResult.OK
+ ? CommitStatus.SUCCESS : CommitStatus.FAILURE;
} catch (Exception e) {
abort();
- return false;
+ status = CommitStatus.FAILURE;
} finally {
isOpen = false;
}
+ return CompletableFuture.completedFuture(status);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java
index 39d24cf..e9e75a5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java
@@ -16,13 +16,16 @@
package org.onosproject.store.primitives.impl;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.CommitStatus;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
+import org.onosproject.utils.MeteringAgent;
import com.google.common.collect.Sets;
@@ -36,6 +39,7 @@
private final TransactionId transactionId;
private final TransactionCoordinator transactionCoordinator;
private final Set<TransactionParticipant> txParticipants = Sets.newConcurrentHashSet();
+ private final MeteringAgent monitor;
public NewDefaultTransactionContext(TransactionId transactionId,
DistributedPrimitiveCreator creator,
@@ -43,6 +47,7 @@
this.transactionId = transactionId;
this.creator = creator;
this.transactionCoordinator = transactionCoordinator;
+ this.monitor = new MeteringAgent("transactionContext", "*", true);
}
@Override
@@ -68,9 +73,10 @@
}
@Override
- public boolean commit() {
- transactionCoordinator.commit(transactionId, txParticipants).getNow(null);
- return true;
+ public CompletableFuture<CommitStatus> commit() {
+ final MeteringAgent.Context timer = monitor.startTimer("commit");
+ return transactionCoordinator.commit(transactionId, txParticipants)
+ .whenComplete((r, e) -> timer.stop(e));
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
index 908a35d..1904894 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
@@ -22,6 +22,7 @@
import org.onlab.util.Tools;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.CommitStatus;
/**
* Coordinator for a two-phase commit protocol.
@@ -37,45 +38,47 @@
/**
* Commits a transaction.
*
- * @param transactionId transaction
+ * @param transactionId transaction identifier
* @param transactionParticipants set of transaction participants
* @return future for commit result
*/
- CompletableFuture<Void> commit(TransactionId transactionId, Set<TransactionParticipant> transactionParticipants) {
+ CompletableFuture<CommitStatus> commit(TransactionId transactionId,
+ Set<TransactionParticipant> transactionParticipants) {
if (!transactionParticipants.stream().anyMatch(t -> t.hasPendingUpdates())) {
- return CompletableFuture.completedFuture(null);
+ return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
}
- return transactions.put(transactionId, Transaction.State.PREPARING)
+ CompletableFuture<CommitStatus> status = transactions.put(transactionId, Transaction.State.PREPARING)
.thenCompose(v -> this.doPrepare(transactionParticipants))
.thenCompose(result -> result
? transactions.put(transactionId, Transaction.State.COMMITTING)
.thenCompose(v -> doCommit(transactionParticipants))
- .thenApply(v -> null)
+ .thenApply(v -> CommitStatus.SUCCESS)
: transactions.put(transactionId, Transaction.State.ROLLINGBACK)
.thenCompose(v -> doRollback(transactionParticipants))
- .thenApply(v -> null))
- .thenCompose(v -> transactions.remove(transactionId))
- .thenApply(v -> null);
+ .thenApply(v -> CommitStatus.FAILURE));
+ return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
}
private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) {
- return Tools.allOf(transactionParticipants
- .stream()
- .map(TransactionParticipant::prepare)
- .collect(Collectors.toList()))
+ return Tools.allOf(transactionParticipants.stream()
+ .filter(TransactionParticipant::hasPendingUpdates)
+ .map(TransactionParticipant::prepare)
+ .collect(Collectors.toList()))
.thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
}
private CompletableFuture<Void> doCommit(Set<TransactionParticipant> transactionParticipants) {
return CompletableFuture.allOf(transactionParticipants.stream()
- .map(p -> p.commit())
+ .filter(TransactionParticipant::hasPendingUpdates)
+ .map(TransactionParticipant::commit)
.toArray(CompletableFuture[]::new));
}
private CompletableFuture<Void> doRollback(Set<TransactionParticipant> transactionParticipants) {
return CompletableFuture.allOf(transactionParticipants.stream()
- .map(p -> p.rollback())
+ .filter(TransactionParticipant::hasPendingUpdates)
+ .map(TransactionParticipant::rollback)
.toArray(CompletableFuture[]::new));
}
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 41f6e25..bb320e4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -87,13 +87,7 @@
}
private void handleEvent(List<MapEvent<String, byte[]>> events) {
- events.forEach(event -> mapEventListeners.forEach(listener -> {
- try {
- listener.event(event);
- } catch (Exception e) {
- log.warn("Error processing map event", e);
- }
- }));
+ events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
}
@Override