[ONOS-6460] Retry failed resource register/unregister/allocate/release transactions until success in ResourceStore

Change-Id: I51661691d4152dddc1b6ea56eae94f85d63a1634
diff --git a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
index 9d040f5..6695ac3 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/resource/impl/ConsistentResourceStore.java
@@ -22,6 +22,9 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -51,6 +54,7 @@
 import org.onosproject.store.AbstractStore;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.CommitStatus;
+import org.onosproject.store.service.DistributedPrimitive;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.TransactionContext;
@@ -120,94 +124,104 @@
             resources.forEach(r -> log.trace("registering {}", r));
         }
 
-        TransactionContext tx = service.transactionContextBuilder().build();
-        tx.begin();
+        // Retry the transaction until successful.
+        while (true) {
+            TransactionContext tx = service.transactionContextBuilder().build();
+            tx.begin();
 
-        // the order is preserved by LinkedHashMap
-        Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
-                .filter(x -> x.parent().isPresent())
-                .collect(Collectors.groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.toList()));
+            // the order is preserved by LinkedHashMap
+            Map<DiscreteResource, List<Resource>> resourceMap = resources.stream()
+                    .filter(x -> x.parent().isPresent())
+                    .collect(Collectors.groupingBy(x -> x.parent().get(), LinkedHashMap::new, Collectors.toList()));
 
-        TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
-        TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
-        for (Map.Entry<DiscreteResource, List<Resource>> entry : resourceMap.entrySet()) {
-            DiscreteResourceId parentId = entry.getKey().id();
-            if (!discreteTxStore.lookup(parentId).isPresent()) {
-                return abortTransaction(tx);
+            TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
+            TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
+            for (Map.Entry<DiscreteResource, List<Resource>> entry : resourceMap.entrySet()) {
+                DiscreteResourceId parentId = entry.getKey().id();
+                if (!discreteTxStore.lookup(parentId).isPresent()) {
+                    return abortTransaction(tx);
+                }
+
+                if (!register(discreteTxStore, continuousTxStore, parentId, entry.getValue())) {
+                    return abortTransaction(tx);
+                }
             }
 
-            if (!register(discreteTxStore, continuousTxStore, parentId, entry.getValue())) {
-                return abortTransaction(tx);
+            try {
+                CommitStatus status = commitTransaction(tx);
+                if (status == CommitStatus.SUCCESS) {
+                    log.trace("Transaction commit succeeded on registration: resources={}", resources);
+                    List<ResourceEvent> events = resources.stream()
+                            .filter(x -> x.parent().isPresent())
+                            .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
+                            .collect(Collectors.toList());
+                    notifyDelegate(events);
+                    return true;
+                }
+            } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                log.warn("Transaction commit failed on registration", e);
+                return false;
             }
         }
-
-        return tx.commit().whenComplete((status, error) -> {
-            if (status == CommitStatus.SUCCESS) {
-                log.trace("Transaction commit succeeded on registration: resources={}", resources);
-                List<ResourceEvent> events = resources.stream()
-                        .filter(x -> x.parent().isPresent())
-                        .map(x -> new ResourceEvent(RESOURCE_ADDED, x))
-                        .collect(Collectors.toList());
-                notifyDelegate(events);
-            } else {
-                log.warn("Transaction commit failed on registration", error);
-            }
-        }).join() == CommitStatus.SUCCESS;
     }
 
     @Override
     public boolean unregister(List<? extends ResourceId> ids) {
         checkNotNull(ids);
 
-        TransactionContext tx = service.transactionContextBuilder().build();
-        tx.begin();
+        // Retry until the commit succeeds; an abort or a commit error/timeout ends the loop.
+        while (true) {
+            TransactionContext tx = service.transactionContextBuilder().build();
+            tx.begin();
 
-        TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
-        TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
-        // Look up resources by resource IDs
-        List<Resource> resources = ids.stream()
-                .filter(x -> x.parent().isPresent())
-                .map(x -> {
-                    // avoid access to consistent map in the case of discrete resource
-                    if (x instanceof DiscreteResourceId) {
-                        return Optional.of(Resources.discrete((DiscreteResourceId) x).resource());
-                    } else {
-                        return continuousTxStore.lookup((ContinuousResourceId) x);
-                    }
-                })
-                .flatMap(Tools::stream)
-                .collect(Collectors.toList());
-        // the order is preserved by LinkedHashMap
-        Map<DiscreteResourceId, List<Resource>> resourceMap = resources.stream()
-                .collect(Collectors.groupingBy(x -> x.parent().get().id(), LinkedHashMap::new, Collectors.toList()));
+            TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
+            TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
+            // Look up resources by resource IDs
+            List<Resource> resources = ids.stream()
+                    .filter(x -> x.parent().isPresent())
+                    .map(x -> {
+                        // avoid access to consistent map in the case of discrete resource
+                        if (x instanceof DiscreteResourceId) {
+                            return Optional.of(Resources.discrete((DiscreteResourceId) x).resource());
+                        } else {
+                            return continuousTxStore.lookup((ContinuousResourceId) x);
+                        }
+                    })
+                    .flatMap(Tools::stream)
+                    .collect(Collectors.toList());
+            // the order is preserved by LinkedHashMap
+            Map<DiscreteResourceId, List<Resource>> resourceMap = resources.stream().collect(
+                    Collectors.groupingBy(x -> x.parent().get().id(), LinkedHashMap::new, Collectors.toList()));
 
-        for (Map.Entry<DiscreteResourceId, List<Resource>> entry : resourceMap.entrySet()) {
-            if (!unregister(discreteTxStore, continuousTxStore, entry.getKey(), entry.getValue())) {
-                log.warn("Failed to unregister {}: Failed to remove {} values.",
-                        entry.getKey(), entry.getValue().size());
-                log.debug("Failed to unregister {}: Failed to remove values: {}",
-                        entry.getKey(), entry.getValue());
-                return abortTransaction(tx);
+            for (Map.Entry<DiscreteResourceId, List<Resource>> entry : resourceMap.entrySet()) {
+                if (!unregister(discreteTxStore, continuousTxStore, entry.getKey(), entry.getValue())) {
+                    log.warn("Failed to unregister {}: Failed to remove {} values.",
+                            entry.getKey(), entry.getValue().size());
+                    return abortTransaction(tx);
+                }
             }
-        }
 
-        return tx.commit().whenComplete((status, error) -> {
-            if (status == CommitStatus.SUCCESS) {
-                List<ResourceEvent> events = resources.stream()
-                        .filter(x -> x.parent().isPresent())
-                        .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
-                        .collect(Collectors.toList());
-                notifyDelegate(events);
-            } else {
+            try {
+                CommitStatus status = commitTransaction(tx);
+                if (status == CommitStatus.SUCCESS) {
+                    List<ResourceEvent> events = resources.stream()
+                            .filter(x -> x.parent().isPresent())
+                            .map(x -> new ResourceEvent(RESOURCE_REMOVED, x))
+                            .collect(Collectors.toList());
+                    notifyDelegate(events);
+                    return true;
+                }
+            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                 String message = resources.stream()
                         .map(Resource::simpleTypeName)
                         .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
                         .entrySet().stream()
-                        .map(e -> String.format("%d %s type resources", e.getValue(), e.getKey()))
+                        .map(entry -> String.format("%d %s type resources", entry.getValue(), entry.getKey()))
                         .collect(Collectors.joining(", "));
-                log.warn("Failed to unregister {}: Commit failed.", message, error);
+                log.warn("Failed to unregister {}: Commit failed.", message, e);
+                return false;
             }
-        }).join() == CommitStatus.SUCCESS;
+        }
     }
 
     @Override
@@ -215,51 +229,69 @@
         checkNotNull(resources);
         checkNotNull(consumer);
 
-        TransactionContext tx = service.transactionContextBuilder().build();
-        tx.begin();
+        while (true) {
+            TransactionContext tx = service.transactionContextBuilder().build();
+            tx.begin();
 
-        TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
-        TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
-        for (Resource resource : resources) {
-            if (resource instanceof DiscreteResource) {
-                if (!discreteTxStore.allocate(consumer.consumerId(), (DiscreteResource) resource)) {
-                    return abortTransaction(tx);
-                }
-            } else if (resource instanceof ContinuousResource) {
-                if (!continuousTxStore.allocate(consumer.consumerId(), (ContinuousResource) resource)) {
-                    return abortTransaction(tx);
+            TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
+            TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
+            for (Resource resource : resources) {
+                if (resource instanceof DiscreteResource) {
+                    if (!discreteTxStore.allocate(consumer.consumerId(), (DiscreteResource) resource)) {
+                        return abortTransaction(tx);
+                    }
+                } else if (resource instanceof ContinuousResource) {
+                    if (!continuousTxStore.allocate(consumer.consumerId(), (ContinuousResource) resource)) {
+                        return abortTransaction(tx);
+                    }
                 }
             }
-        }
 
-        return tx.commit().join() == CommitStatus.SUCCESS;
+            try {
+                if (commitTransaction(tx) == CommitStatus.SUCCESS) {
+                    return true;
+                }
+            } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                log.warn("Failed to allocate {}: {}", resources, e);
+                return false;
+            }
+        }
     }
 
     @Override
     public boolean release(List<ResourceAllocation> allocations) {
         checkNotNull(allocations);
 
-        TransactionContext tx = service.transactionContextBuilder().build();
-        tx.begin();
+        while (true) { // retried until the commit succeeds, the transaction is aborted, or the commit throws
+            TransactionContext tx = service.transactionContextBuilder().build();
+            tx.begin();
 
-        TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
-        TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
-        for (ResourceAllocation allocation : allocations) {
-            Resource resource = allocation.resource();
-            ResourceConsumerId consumerId = allocation.consumerId();
+            TransactionalDiscreteResourceSubStore discreteTxStore = discreteStore.transactional(tx);
+            TransactionalContinuousResourceSubStore continuousTxStore = continuousStore.transactional(tx);
+            for (ResourceAllocation allocation : allocations) {
+                Resource resource = allocation.resource();
+                ResourceConsumerId consumerId = allocation.consumerId();
 
-            if (resource instanceof DiscreteResource) {
-                if (!discreteTxStore.release(consumerId, (DiscreteResource) resource)) {
-                    return abortTransaction(tx);
-                }
-            } else if (resource instanceof ContinuousResource) {
-                if (!continuousTxStore.release(consumerId, (ContinuousResource) resource)) {
-                    return abortTransaction(tx);
+                if (resource instanceof DiscreteResource) {
+                    if (!discreteTxStore.release(consumerId, (DiscreteResource) resource)) {
+                        return abortTransaction(tx);
+                    }
+                } else if (resource instanceof ContinuousResource) {
+                    if (!continuousTxStore.release(consumerId, (ContinuousResource) resource)) {
+                        return abortTransaction(tx);
+                    }
                 }
             }
-        }
 
-        return tx.commit().join() == CommitStatus.SUCCESS;
+            try {
+                if (commitTransaction(tx) == CommitStatus.SUCCESS) {
+                    return true;
+                }
+            } catch (InterruptedException | ExecutionException | TimeoutException e) {
+                log.warn("Failed to release {}: Commit failed.", allocations, e);
+                return false;
+            }
+        }
     }
 
     // computational complexity: O(1) if the resource is discrete type.
@@ -327,6 +359,29 @@
     }
 
     /**
+     * Commits a transaction, waiting up to the default operation timeout for the result.
+     * <p>
+     * If the wait is interrupted, the thread's interrupt status is restored before the
+     * exception is rethrown, because the callers only log the failure and return.
+     *
+     * @param tx the transaction to commit
+     * @return the transaction status
+     * @throws InterruptedException if the calling thread is interrupted while waiting
+     * @throws ExecutionException if the commit future completes exceptionally
+     * @throws TimeoutException if the commit does not complete within the timeout
+     */
+    private CommitStatus commitTransaction(TransactionContext tx)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        try {
+            return tx.commit().get(DistributedPrimitive.DEFAULT_OPERTATION_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to code further up the stack.
+            Thread.currentThread().interrupt();
+            throw e;
+        }
+    }
+
+    /**
      * Abort the transaction.
      *
      * @param tx transaction context