[ONOS-6460] Retry failed resource register/unregister/allocate/release transactions until success in ResourceStore
Change-Id: I51661691d4152dddc1b6ea56eae94f85d63a1634
diff --git a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
index 9d040f5..6695ac3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
@@ -22,6 +22,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -51,6 +54,7 @@
import org.onosproject.store.AbstractStore;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.CommitStatus;
+import org.onosproject.store.service.DistributedPrimitive;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TransactionContext;
@@ -120,94 +124,104 @@
resources.forEach(r -> log.trace("registering {}", r));
}
- TransactionContext tx = service.transactionContextBuilder().build();
- tx.begin();
+ // Retry the transaction until successful.
+ while (true) {
+ TransactionContext tx = service.transactionContextBuilder().build();
+ tx.begin();
- // the order is preserved by LinkedHashMap
- Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
- .filter(x -> x.parent().isPresent())
- .collect(Collectors.groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.toList()));
+ // the order is preserved by LinkedHashMap
+ Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
+ .filter(x -> x.parent().isPresent())
+ .collect(Collectors.groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.toList()));
- TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
- TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
- for (Map.Entry<DiscreteResource, List<Resource>> entry : resourceMap.entrySet()) {
- DiscreteResourceId parentId = entry.getKey().id();
- if (!discreteTxStore.lookup(parentId).isPresent()) {
- return abortTransaction(tx);
+ TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
+ TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
+ for (Map.Entry<DiscreteResource, List<Resource>> entry : resourceMap.entrySet()) {
+ DiscreteResourceId parentId = entry.getKey().id();
+ if (!discreteTxStore.lookup(parentId).isPresent()) {
+ return abortTransaction(tx);
+ }
+
+ if (!register(discreteTxStore, continuousTxStore, parentId, entry.getValue())) {
+ return abortTransaction(tx);
+ }
}
- if (!register(discreteTxStore, continuousTxStore, parentId, entry.getValue())) {
- return abortTransaction(tx);
+ try {
+ CommitStatus status = commitTransaction(tx);
+ if (status == CommitStatus.SUCCESS) {
+ log.trace("Transaction commit succeeded on registration: resources={}", resources);
+ List<ResourceEvent> events = resources.stream()
+ .filter(x -> x.parent().isPresent())
+ .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
+ .collect(Collectors.toList());
+ notifyDelegate(events);
+ return true;
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.warn("Transaction commit failed on registration", e);
+ return false;
}
}
-
- return tx.commit().whenComplete((status, error) -> {
- if (status == CommitStatus.SUCCESS) {
- log.trace("Transaction commit succeeded on registration: resources={}", resources);
- List<ResourceEvent> events = resources.stream()
- .filter(x -> x.parent().isPresent())
- .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
- .collect(Collectors.toList());
- notifyDelegate(events);
- } else {
- log.warn("Transaction commit failed on registration", error);
- }
- }).join() == CommitStatus.SUCCESS;
}
@Override
public boolean unregister(List<? extends ResourceId> ids) {
checkNotNull(ids);
- TransactionContext tx = service.transactionContextBuilder().build();
- tx.begin();
+ // Retry the transaction until successful.
+ while (true) {
+ TransactionContext tx = service.transactionContextBuilder().build();
+ tx.begin();
- TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
- TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
- // Look up resources by resource IDs
- List<Resource> resources = ids.stream()
- .filter(x -> x.parent().isPresent())
- .map(x -> {
- // avoid access to consistent map in the case of discrete resource
- if (x instanceof DiscreteResourceId) {
- return Optional.of(Resources.discrete((DiscreteResourceId) x).resource());
- } else {
- return continuousTxStore.lookup((ContinuousResourceId) x);
- }
- })
- .flatMap(Tools::stream)
- .collect(Collectors.toList());
- // the order is preserved by LinkedHashMap
- Map<DiscreteResourceId, List<Resource>> resourceMap = resources.stream()
- .collect(Collectors.groupingBy(x -> x.parent().get().id(), LinkedHashMap::new, Collectors.toList()));
+ TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
+ TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
+ // Look up resources by resource IDs
+ List<Resource> resources = ids.stream()
+ .filter(x -> x.parent().isPresent())
+ .map(x -> {
+ // avoid access to consistent map in the case of discrete resource
+ if (x instanceof DiscreteResourceId) {
+ return Optional.of(Resources.discrete((DiscreteResourceId) x).resource());
+ } else {
+ return continuousTxStore.lookup((ContinuousResourceId) x);
+ }
+ })
+ .flatMap(Tools::stream)
+ .collect(Collectors.toList());
+ // the order is preserved by LinkedHashMap
+ Map<DiscreteResourceId, List<Resource>> resourceMap = resources.stream().collect(
+ Collectors.groupingBy(x -> x.parent().get().id(), LinkedHashMap::new, Collectors.toList()));
- for (Map.Entry<DiscreteResourceId, List<Resource>> entry : resourceMap.entrySet()) {
- if (!unregister(discreteTxStore, continuousTxStore, entry.getKey(), entry.getValue())) {
- log.warn("Failed to unregister {}: Failed to remove {} values.",
- entry.getKey(), entry.getValue().size());
- log.debug("Failed to unregister {}: Failed to remove values: {}",
- entry.getKey(), entry.getValue());
- return abortTransaction(tx);
+ for (Map.Entry<DiscreteResourceId, List<Resource>> entry : resourceMap.entrySet()) {
+ if (!unregister(discreteTxStore, continuousTxStore, entry.getKey(), entry.getValue())) {
+ log.warn("Failed to unregister {}: Failed to remove {} values.",
+ entry.getKey(), entry.getValue().size());
+ return abortTransaction(tx);
+ }
}
- }
- return tx.commit().whenComplete((status, error) -> {
- if (status == CommitStatus.SUCCESS) {
- List<ResourceEvent> events = resources.stream()
- .filter(x -> x.parent().isPresent())
- .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
- .collect(Collectors.toList());
- notifyDelegate(events);
- } else {
+ try {
+ CommitStatus status = commitTransaction(tx);
+ if (status == CommitStatus.SUCCESS) {
+ List<ResourceEvent> events = resources.stream()
+ .filter(x -> x.parent().isPresent())
+ .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
+ .collect(Collectors.toList());
+ notifyDelegate(events);
+ return true;
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
String message = resources.stream()
.map(Resource::simpleTypeName)
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
.entrySet().stream()
- .map(e -> String.format("%d %s type resources", e.getValue(), e.getKey()))
+ .map(entry -> String.format("%d %s type resources", entry.getValue(), entry.getKey()))
.collect(Collectors.joining(", "));
- log.warn("Failed to unregister {}: Commit failed.", message, error);
+ log.warn("Failed to unregister {}: {}", message, e);
+ return false;
}
- }).join() == CommitStatus.SUCCESS;
+ }
}
@Override
@@ -215,51 +229,69 @@
checkNotNull(resources);
checkNotNull(consumer);
- TransactionContext tx = service.transactionContextBuilder().build();
- tx.begin();
+ while (true) {
+ TransactionContext tx = service.transactionContextBuilder().build();
+ tx.begin();
- TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
- TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
- for (Resource resource : resources) {
- if (resource instanceof DiscreteResource) {
- if (!discreteTxStore.allocate(consumer.consumerId(), (DiscreteResource) resource)) {
- return abortTransaction(tx);
- }
- } else if (resource instanceof ContinuousResource) {
- if (!continuousTxStore.allocate(consumer.consumerId(), (ContinuousResource) resource)) {
- return abortTransaction(tx);
+ TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
+ TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
+ for (Resource resource : resources) {
+ if (resource instanceof DiscreteResource) {
+ if (!discreteTxStore.allocate(consumer.consumerId(), (DiscreteResource) resource)) {
+ return abortTransaction(tx);
+ }
+ } else if (resource instanceof ContinuousResource) {
+ if (!continuousTxStore.allocate(consumer.consumerId(), (ContinuousResource) resource)) {
+ return abortTransaction(tx);
+ }
}
}
- }
- return tx.commit().join() == CommitStatus.SUCCESS;
+ try {
+ if (commitTransaction(tx) == CommitStatus.SUCCESS) {
+ return true;
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.warn("Failed to allocate {}: {}", resources, e);
+ return false;
+ }
+ }
}
@Override
public boolean release(List<ResourceAllocation> allocations) {
checkNotNull(allocations);
- TransactionContext tx = service.transactionContextBuilder().build();
- tx.begin();
+ while (true) {
+ TransactionContext tx = service.transactionContextBuilder().build();
+ tx.begin();
- TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
- TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
- for (ResourceAllocation allocation : allocations) {
- Resource resource = allocation.resource();
- ResourceConsumerId consumerId = allocation.consumerId();
+ TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
+ TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
+ for (ResourceAllocation allocation : allocations) {
+ Resource resource = allocation.resource();
+ ResourceConsumerId consumerId = allocation.consumerId();
- if (resource instanceof DiscreteResource) {
- if (!discreteTxStore.release(consumerId, (DiscreteResource) resource)) {
- return abortTransaction(tx);
- }
- } else if (resource instanceof ContinuousResource) {
- if (!continuousTxStore.release(consumerId, (ContinuousResource) resource)) {
- return abortTransaction(tx);
+ if (resource instanceof DiscreteResource) {
+ if (!discreteTxStore.release(consumerId, (DiscreteResource) resource)) {
+ return abortTransaction(tx);
+ }
+ } else if (resource instanceof ContinuousResource) {
+ if (!continuousTxStore.release(consumerId, (ContinuousResource) resource)) {
+ return abortTransaction(tx);
+ }
}
}
- }
- return tx.commit().join() == CommitStatus.SUCCESS;
+ try {
+ if (commitTransaction(tx) == CommitStatus.SUCCESS) {
+ return true;
+ }
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.warn("Failed to release {}: {}", allocations, e);
+ return false;
+ }
+ }
}
// computational complexity: O(1) if the resource is discrete type.
@@ -327,6 +359,17 @@
}
/**
+ * Commits a transaction.
+ *
+ * @param tx the transaction to commit
+ * @return the transaction status
+ */
+ private CommitStatus commitTransaction(TransactionContext tx)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ return tx.commit().get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+ }
+
+ /**
* Abort the transaction.
*
* @param tx transaction context
diff --git a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/EmptyDiscreteResources.java b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/EmptyDiscreteResources.java
index d956334..68122bd 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/EmptyDiscreteResources.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/EmptyDiscreteResources.java
@@ -20,6 +20,7 @@
import org.onosproject.net.resource.DiscreteResource;
import org.onosproject.net.resource.DiscreteResourceId;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
@@ -64,7 +65,7 @@
@Override
public int hashCode() {
- return INSTANCE.hashCode();
+ return Objects.hash(values());
}
@Override