ONOS-4218: Fixes for resource store transaction failures
Change-Id: Ie48bb04d7daf6ed7b63c33a3c3c2703496179aa6
diff --git a/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java b/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
index c8151e3..8f7cd1f 100644
--- a/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
+++ b/core/api/src/main/java/org/onosproject/store/primitives/MapUpdate.java
@@ -21,6 +21,8 @@
import java.util.function.Function;
+import org.onlab.util.ByteArraySizeHashPrinter;
+
import com.google.common.base.MoreObjects;
/**
@@ -153,7 +155,7 @@
.add("mapName", mapName)
.add("type", type)
.add("key", key)
- .add("value", value)
+ .add("value", value instanceof byte[] ? new ByteArraySizeHashPrinter((byte[]) value) : value)
.add("currentValue", currentValue)
.add("currentVersion", currentVersion)
.toString();
diff --git a/core/api/src/main/java/org/onosproject/store/service/CommitStatus.java b/core/api/src/main/java/org/onosproject/store/service/CommitStatus.java
new file mode 100644
index 0000000..8e86184
--- /dev/null
+++ b/core/api/src/main/java/org/onosproject/store/service/CommitStatus.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2016 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+/**
+ * Completion status of transaction.
+ */
+public enum CommitStatus {
+ /**
+ * Indicates a successfully completed transaction with all the updates committed.
+ */
+ SUCCESS,
+
+ /**
+ * Indicates a aborted transaction i.e. no updates were committed.
+ */
+ FAILURE
+}
\ No newline at end of file
diff --git a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
index 3027fcf..8f37caf 100644
--- a/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
+++ b/core/api/src/main/java/org/onosproject/store/service/TransactionContext.java
@@ -16,6 +16,8 @@
package org.onosproject.store.service;
+import java.util.concurrent.CompletableFuture;
+
import org.onosproject.store.primitives.TransactionId;
/**
@@ -63,9 +65,9 @@
* Commits a transaction that was previously started thereby making its changes permanent
* and externally visible.
*
- * @return true if this transaction succeeded, otherwise false.
+ * @return A future that will be completed when the operation completes
*/
- boolean commit();
+ CompletableFuture<CommitStatus> commit();
/**
* Aborts any changes made in this transaction context and discarding all locally cached updates.
diff --git a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
index aed07cd..e7c27bc 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
@@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
@@ -41,6 +42,7 @@
import org.onosproject.net.resource.Resources;
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.CommitStatus;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.ConsistentMapException;
import org.onosproject.store.service.Serializer;
@@ -178,18 +180,18 @@
}
}
- boolean success = tx.commit();
- if (success) {
- log.trace("Transaction commit succeeded on registration: resources={}", resources);
- List<ResourceEvent> events = resources.stream()
- .filter(x -> x.parent().isPresent())
- .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
- .collect(Collectors.toList());
- notifyDelegate(events);
- } else {
- log.debug("Transaction commit failed on registration: resources={}", resources);
- }
- return success;
+ return tx.commit().whenComplete((status, error) -> {
+ if (status == CommitStatus.SUCCESS) {
+ log.trace("Transaction commit succeeded on registration: resources={}", resources);
+ List<ResourceEvent> events = resources.stream()
+ .filter(x -> x.parent().isPresent())
+ .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
+ .collect(Collectors.toList());
+ notifyDelegate(events);
+ } else {
+ log.warn("Transaction commit failed on registration", error);
+ }
+ }).join() == CommitStatus.SUCCESS;
}
@Override
@@ -252,17 +254,17 @@
}
}
- boolean success = tx.commit();
- if (success) {
- List<ResourceEvent> events = resources.stream()
- .filter(x -> x.parent().isPresent())
- .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
- .collect(Collectors.toList());
- notifyDelegate(events);
- } else {
- log.warn("Failed to unregister {}: Commit failed.", ids);
- }
- return success;
+ return tx.commit().whenComplete((status, error) -> {
+ if (status == CommitStatus.SUCCESS) {
+ List<ResourceEvent> events = resources.stream()
+ .filter(x -> x.parent().isPresent())
+ .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
+ .collect(Collectors.toList());
+ notifyDelegate(events);
+ } else {
+ log.warn("Failed to unregister {}: Commit failed.", ids, error);
+ }
+ }).join() == CommitStatus.SUCCESS;
}
@Override
@@ -308,7 +310,7 @@
}
}
- return tx.commit();
+ return tx.commit().join() == CommitStatus.SUCCESS;
}
@Override
@@ -348,7 +350,7 @@
}
}
- return tx.commit();
+ return tx.commit().join() == CommitStatus.SUCCESS;
}
// computational complexity: O(1) if the resource is discrete type.
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
index 43cacb3..30a6f26 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/DefaultTransactionContext.java
@@ -27,6 +27,7 @@
import org.onosproject.store.primitives.MapUpdate;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.primitives.resources.impl.CommitResult;
+import org.onosproject.store.service.CommitStatus;
import org.onosproject.store.service.ConsistentMapBuilder;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
@@ -96,22 +97,23 @@
@SuppressWarnings("unchecked")
@Override
- public boolean commit() {
+ public CompletableFuture<CommitStatus> commit() {
// TODO: rework commit implementation to be more intuitive
checkState(isOpen, TX_NOT_OPEN_ERROR);
- CommitResult result = null;
+ CommitStatus status;
try {
List<MapUpdate<String, byte[]>> updates = Lists.newLinkedList();
txMaps.values().forEach(m -> updates.addAll(m.toMapUpdates()));
Transaction transaction = new Transaction(transactionId, updates);
- result = Futures.getUnchecked(transactionCommitter.apply(transaction));
- return result == CommitResult.OK;
+ status = Futures.getUnchecked(transactionCommitter.apply(transaction)) == CommitResult.OK
+ ? CommitStatus.SUCCESS : CommitStatus.FAILURE;
} catch (Exception e) {
abort();
- return false;
+ status = CommitStatus.FAILURE;
} finally {
isOpen = false;
}
+ return CompletableFuture.completedFuture(status);
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java
index 39d24cf..e9e75a5 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/NewDefaultTransactionContext.java
@@ -16,13 +16,16 @@
package org.onosproject.store.primitives.impl;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.onosproject.store.primitives.DistributedPrimitiveCreator;
import org.onosproject.store.primitives.TransactionId;
+import org.onosproject.store.service.CommitStatus;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.TransactionContext;
import org.onosproject.store.service.TransactionalMap;
+import org.onosproject.utils.MeteringAgent;
import com.google.common.collect.Sets;
@@ -36,6 +39,7 @@
private final TransactionId transactionId;
private final TransactionCoordinator transactionCoordinator;
private final Set<TransactionParticipant> txParticipants = Sets.newConcurrentHashSet();
+ private final MeteringAgent monitor;
public NewDefaultTransactionContext(TransactionId transactionId,
DistributedPrimitiveCreator creator,
@@ -43,6 +47,7 @@
this.transactionId = transactionId;
this.creator = creator;
this.transactionCoordinator = transactionCoordinator;
+ this.monitor = new MeteringAgent("transactionContext", "*", true);
}
@Override
@@ -68,9 +73,10 @@
}
@Override
- public boolean commit() {
- transactionCoordinator.commit(transactionId, txParticipants).getNow(null);
- return true;
+ public CompletableFuture<CommitStatus> commit() {
+ final MeteringAgent.Context timer = monitor.startTimer("commit");
+ return transactionCoordinator.commit(transactionId, txParticipants)
+ .whenComplete((r, e) -> timer.stop(e));
}
@Override
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
index 908a35d..1904894 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/TransactionCoordinator.java
@@ -22,6 +22,7 @@
import org.onlab.util.Tools;
import org.onosproject.store.primitives.TransactionId;
import org.onosproject.store.service.AsyncConsistentMap;
+import org.onosproject.store.service.CommitStatus;
/**
* Coordinator for a two-phase commit protocol.
@@ -37,45 +38,47 @@
/**
* Commits a transaction.
*
- * @param transactionId transaction
+ * @param transactionId transaction identifier
* @param transactionParticipants set of transaction participants
* @return future for commit result
*/
- CompletableFuture<Void> commit(TransactionId transactionId, Set<TransactionParticipant> transactionParticipants) {
+ CompletableFuture<CommitStatus> commit(TransactionId transactionId,
+ Set<TransactionParticipant> transactionParticipants) {
if (!transactionParticipants.stream().anyMatch(t -> t.hasPendingUpdates())) {
- return CompletableFuture.completedFuture(null);
+ return CompletableFuture.completedFuture(CommitStatus.SUCCESS);
}
- return transactions.put(transactionId, Transaction.State.PREPARING)
+ CompletableFuture<CommitStatus> status = transactions.put(transactionId, Transaction.State.PREPARING)
.thenCompose(v -> this.doPrepare(transactionParticipants))
.thenCompose(result -> result
? transactions.put(transactionId, Transaction.State.COMMITTING)
.thenCompose(v -> doCommit(transactionParticipants))
- .thenApply(v -> null)
+ .thenApply(v -> CommitStatus.SUCCESS)
: transactions.put(transactionId, Transaction.State.ROLLINGBACK)
.thenCompose(v -> doRollback(transactionParticipants))
- .thenApply(v -> null))
- .thenCompose(v -> transactions.remove(transactionId))
- .thenApply(v -> null);
+ .thenApply(v -> CommitStatus.FAILURE));
+ return status.thenCompose(v -> transactions.remove(transactionId).thenApply(u -> v));
}
private CompletableFuture<Boolean> doPrepare(Set<TransactionParticipant> transactionParticipants) {
- return Tools.allOf(transactionParticipants
- .stream()
- .map(TransactionParticipant::prepare)
- .collect(Collectors.toList()))
+ return Tools.allOf(transactionParticipants.stream()
+ .filter(TransactionParticipant::hasPendingUpdates)
+ .map(TransactionParticipant::prepare)
+ .collect(Collectors.toList()))
.thenApply(list -> list.stream().reduce(Boolean::logicalAnd).orElse(true));
}
private CompletableFuture<Void> doCommit(Set<TransactionParticipant> transactionParticipants) {
return CompletableFuture.allOf(transactionParticipants.stream()
- .map(p -> p.commit())
+ .filter(TransactionParticipant::hasPendingUpdates)
+ .map(TransactionParticipant::commit)
.toArray(CompletableFuture[]::new));
}
private CompletableFuture<Void> doRollback(Set<TransactionParticipant> transactionParticipants) {
return CompletableFuture.allOf(transactionParticipants.stream()
- .map(p -> p.rollback())
+ .filter(TransactionParticipant::hasPendingUpdates)
+ .map(TransactionParticipant::rollback)
.toArray(CompletableFuture[]::new));
}
}
\ No newline at end of file
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
index 41f6e25..bb320e4 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/resources/impl/AtomixConsistentMap.java
@@ -87,13 +87,7 @@
}
private void handleEvent(List<MapEvent<String, byte[]>> events) {
- events.forEach(event -> mapEventListeners.forEach(listener -> {
- try {
- listener.event(event);
- } catch (Exception e) {
- log.warn("Error processing map event", e);
- }
- }));
+ events.forEach(event -> mapEventListeners.forEach(listener -> listener.event(event)));
}
@Override