Store flows rules asynchronously via master in DistributedFlowRuleStore

Change-Id: I5e41a982204e395d2a1ac05017905069d61dc702
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestAsyncDocumentTree.java b/core/api/src/test/java/org/onosproject/store/service/TestAsyncDocumentTree.java
new file mode 100644
index 0000000..f1d4fec
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/TestAsyncDocumentTree.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
+
+/**
+ * Test asynchronous document tree.
+ */
+public class TestAsyncDocumentTree<V> implements AsyncDocumentTree<V> {
+    private final DocumentTree<V> tree;
+
+    public TestAsyncDocumentTree(String name) {
+        this.tree = new TestDocumentTree<>(name);
+    }
+
+    @Override
+    public String name() {
+        return tree.name();
+    }
+
+    @Override
+    public DocumentPath root() {
+        return tree.root();
+    }
+
+    @Override
+    public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
+        return CompletableFuture.completedFuture(tree.getChildren(path));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> get(DocumentPath path) {
+        return CompletableFuture.completedFuture(tree.get(path));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
+        return CompletableFuture.completedFuture(tree.set(path, value));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> create(DocumentPath path, V value) {
+        return CompletableFuture.completedFuture(tree.create(path, value));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) {
+        return CompletableFuture.completedFuture(tree.createRecursive(path, value));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
+        return CompletableFuture.completedFuture(tree.replace(path, newValue, version));
+    }
+
+    @Override
+    public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
+        return CompletableFuture.completedFuture(tree.replace(path, newValue, currentValue));
+    }
+
+    @Override
+    public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
+        return CompletableFuture.completedFuture(tree.removeNode(path));
+    }
+
+    @Override
+    public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
+        tree.addListener(path, listener);
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
+        tree.removeListener(listener);
+        return CompletableFuture.completedFuture(null);
+    }
+
+    @Override
+    public CompletableFuture<Version> begin(TransactionId transactionId) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<V>> transactionLog) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<V>> transactionLog) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> commit(TransactionId transactionId) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> rollback(TransactionId transactionId) {
+        return null;
+    }
+
+    @Override
+    public DocumentTree<V> asDocumentTree() {
+        return tree;
+    }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 7b6ae55..3382abe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -19,9 +19,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -69,6 +72,7 @@
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.impl.MastershipBasedTimestamp;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.DocumentPath;
 import org.onosproject.store.service.DocumentTree;
 import org.onosproject.store.service.EventuallyConsistentMap;
@@ -108,12 +112,13 @@
         RETRY.setStackTrace(new StackTraceElement[0]);
     }
 
+    private static final int SCHEDULED_THREAD_POOL_SIZE = 8;
     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
     private static final int MAX_RETRY_DELAY_MILLIS = 50;
 
     private static final String FLOW_TABLE = "onos-flow-table";
 
-    private static final MessageSubject APPLY_FLOWS = new MessageSubject("onos-flow-apply");
+    private static final MessageSubject APPLY_BATCH_FLOWS = new MessageSubject("onos-flow-apply");
     private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -145,8 +150,11 @@
             new InternalTableStatsListener();
 
     private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
+    private ScheduledExecutorService scheduledExecutor;
     private ExecutorService messageHandlingExecutor;
+    private final Random random = new Random();
 
+    private AsyncDocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> asyncFlows;
     private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
     private IdGenerator idGenerator;
     private NodeId local;
@@ -157,6 +165,10 @@
 
         local = clusterService.getLocalNode().id();
 
+        scheduledExecutor = Executors.newScheduledThreadPool(
+                SCHEDULED_THREAD_POOL_SIZE,
+                groupedThreads("onos/store/flow", "schedulers", log));
+
         messageHandlingExecutor = Executors.newFixedThreadPool(
                 MESSAGE_HANDLER_THREAD_POOL_SIZE,
                 groupedThreads("onos/store/flow", "message-handlers", log));
@@ -170,16 +182,16 @@
                 .build();
         deviceTableStats.addListener(tableStatsListener);
 
-        flows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
+        asyncFlows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
                 .withName(FLOW_TABLE)
                 .withSerializer(serializer)
-                .buildDocumentTree()
-                .asDocumentTree();
+                .buildDocumentTree();
+        flows = asyncFlows.asDocumentTree();
 
         clusterCommunicator.addSubscriber(
-                APPLY_FLOWS,
+                APPLY_BATCH_FLOWS,
                 serializer::decode,
-                this::applyFlows,
+                this::applyBatchFlows,
                 messageHandlingExecutor);
         clusterCommunicator.addSubscriber(
                 COMPLETE_BATCH,
@@ -194,9 +206,10 @@
     public void deactivate() {
         deviceTableStats.removeListener(tableStatsListener);
         deviceTableStats.destroy();
-        clusterCommunicator.removeSubscriber(APPLY_FLOWS);
+        clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
         clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
         messageHandlingExecutor.shutdownNow();
+        scheduledExecutor.shutdownNow();
         log.info("Stopped");
     }
 
@@ -220,6 +233,52 @@
     }
 
     /**
+     * Retries the given asynchronous supplier until successful.
+     * <p>
+     * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
+     * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
+     *
+     * @param supplier the supplier to retry
+     * @param <T> the return type
+     * @return the return value of the given supplier once it runs successfully
+     */
+    private <T> CompletableFuture<T> retryAsyncUntilSuccess(Supplier<CompletableFuture<T>> supplier) {
+        return retryAsyncUntilSuccess(supplier, new CompletableFuture<>());
+    }
+
+    /**
+     * Retries the given asynchronous supplier until successful.
+     * <p>
+     * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
+     * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
+     *
+     * @param supplier the supplier to retry
+     * @param future future to be completed once the operation has been successful
+     * @param <T> the return type
+     * @return the return value of the given supplier once it runs successfully
+     */
+    private <T> CompletableFuture<T> retryAsyncUntilSuccess(
+            Supplier<CompletableFuture<T>> supplier,
+            CompletableFuture<T> future) {
+        supplier.get().whenComplete((result, error) -> {
+            if (error == null) {
+                future.complete(result);
+            } else {
+                Throwable cause = error.getCause() != null ? error.getCause() : error;
+                if (cause instanceof StorageException.ConcurrentModification) {
+                    scheduledExecutor.schedule(
+                            () -> retryAsyncUntilSuccess(supplier, future),
+                            random.nextInt(50),
+                            TimeUnit.MILLISECONDS);
+                } else {
+                    future.completeExceptionally(error);
+                }
+            }
+        });
+        return future;
+    }
+
+    /**
      * Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
      * retried after a randomized delay.
      *
@@ -232,19 +291,6 @@
     }
 
     /**
-     * Handles a flow rule batch event forwarded to the master node.
-     * <p>
-     * If this node is the master for the associated device, notifies event listeners to install flow rules.
-     *
-     * @param event the event to handle
-     */
-    private void applyFlows(FlowRuleBatchEvent event) {
-        if (mastershipService.isLocalMaster(event.deviceId())) {
-            notifyDelegate(event);
-        }
-    }
-
-    /**
      * Handles a completed batch event received from the master node.
      * <p>
      * If this node is the source of the batch, notifies event listeners to complete the operations.
@@ -338,106 +384,159 @@
         if (master == null) {
             log.warn("No master for {} ", deviceId);
 
-            updateStoreInternal(operation);
-
-            notifyDelegate(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+            updateStoreInternal(operation).whenComplete((result, error) -> {
+                notifyDelegate(FlowRuleBatchEvent.completed(
+                        new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                        new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+            });
             return;
         }
 
         pendingBatches.add(operation.id());
 
-        Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
-        if (currentOps.isEmpty()) {
-            batchOperationComplete(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(true, Collections.emptySet(), deviceId)));
-        } else if (Objects.equals(local, master)) {
-            notifyDelegate(FlowRuleBatchEvent.requested(
-                    new FlowRuleBatchRequest(operation.id(), currentOps),
-                    operation.deviceId()));
+        // If the local node is the master, apply the flows. Otherwise, send them to the master.
+        if (Objects.equals(local, master)) {
+            applyBatchFlows(operation);
         } else {
-            clusterCommunicator.unicast(FlowRuleBatchEvent.requested(
-                    new FlowRuleBatchRequest(operation.id(), currentOps),
-                    operation.deviceId()),
-                    APPLY_FLOWS,
+            log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", master, deviceId);
+            clusterCommunicator.unicast(
+                    operation,
+                    APPLY_BATCH_FLOWS,
                     serializer::encode,
                     master);
         }
     }
 
-    private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
-        return operation.getOperations().stream().map(
-                op -> {
-                    switch (op.operator()) {
-                        case ADD:
-                            addBatchEntry(op);
-                            return op;
-                        case REMOVE:
-                            if (removeBatchEntry(op)) {
-                                return op;
-                            }
-                            return null;
-                        case MODIFY:
-                            //TODO: figure this out at some point
-                            break;
-                        default:
-                            log.warn("Unknown flow operation operator: {}", op.operator());
-                    }
-                    return null;
-                }
-        ).filter(Objects::nonNull).collect(Collectors.toSet());
-    }
-
-    @SuppressWarnings("unchecked")
-    private void addBatchEntry(FlowRuleBatchEntry batchEntry) {
-        StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
-        DocumentPath path = getPathFor(entry.deviceId(), entry.id());
-        retryUntilSuccess(() -> {
-            Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
-            if (value != null) {
-                Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
-                entries.put(entry, entry);
-                if (flows.replace(path, entries, value.version())) {
-                    log.trace("Stored new flow rule: {}", entry);
-                    return null;
+    /**
+     * Asynchronously applies a batch of flows to the store.
+     * <p>
+     * This operation is performed on the master node to ensure that events occur <em>after</em> flows have been stored
+     * and are visible to the master node. If a non-master node stores flows and then triggers events on the master,
+     * the flows may not yet be visible to the master node due to the nature of sequentially consistent reads on the
+     * underlying {@code DocumentTree} primitive.
+     */
+    private void applyBatchFlows(FlowRuleBatchOperation operation) {
+        updateStoreInternal(operation).whenComplete((operations, error) -> {
+            if (error == null) {
+                if (operations.isEmpty()) {
+                    batchOperationComplete(FlowRuleBatchEvent.completed(
+                            new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                            new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
                 } else {
-                    log.trace("Failed to store new flow rule: {}", entry);
-                    return retry();
+                    notifyDelegate(FlowRuleBatchEvent.requested(
+                            new FlowRuleBatchRequest(operation.id(), operations),
+                            operation.deviceId()));
                 }
-            } else {
-                // If there are no entries stored for the device, initialize the device's flows.
-                flows.createRecursive(path, Maps.newHashMap());
-                return retry();
             }
         });
     }
 
+    private CompletableFuture<Set<FlowRuleBatchEntry>> updateStoreInternal(FlowRuleBatchOperation operation) {
+        return Tools.allOf(operation.getOperations().stream().map(op -> {
+            switch (op.operator()) {
+                case ADD:
+                    return addBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
+                case REMOVE:
+                    return removeBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
+                case MODIFY:
+                    return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
+                default:
+                    log.warn("Unknown flow operation operator: {}", op.operator());
+                    return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
+            }
+        }).collect(Collectors.toList()))
+                .thenApply(results -> results.stream()
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toSet()));
+    }
+
     @SuppressWarnings("unchecked")
-    private boolean removeBatchEntry(FlowRuleBatchEntry batchEntry) {
-        FlowRule rule = batchEntry.target();
-        DocumentPath path = getPathFor(rule.deviceId(), rule.id());
-        return retryUntilSuccess(() -> {
-            Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
-            if (value != null) {
-                Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
-                StoredFlowEntry entry = entries.get(rule);
-                if (entry != null) {
-                    entry.setState(FlowEntryState.PENDING_REMOVE);
-                    if (flows.replace(path, entries, value.version())) {
-                        log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
-                        return true;
+    private CompletableFuture<Boolean> addBatchEntry(FlowRuleBatchEntry batchEntry) {
+        StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
+        DocumentPath path = getPathFor(entry.deviceId(), entry.id());
+        return retryAsyncUntilSuccess(() -> {
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            asyncFlows.get(path).whenComplete((value, getError) -> {
+                if (getError == null) {
+                    if (value != null) {
+                        Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+                        entries.put(entry, entry);
+                        asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, replaceError) -> {
+                            if (replaceError == null) {
+                                if (succeeded) {
+                                    log.trace("Stored new flow rule: {}", entry);
+                                    future.complete(true);
+                                } else {
+                                    log.trace("Failed to store new flow rule: {}", entry);
+                                    future.completeExceptionally(RETRY);
+                                }
+                            } else {
+                                future.completeExceptionally(replaceError);
+                            }
+                        });
                     } else {
-                        log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
-                        return retry();
+                        // If there are no entries stored for the device, initialize the device's flows.
+                        Map<StoredFlowEntry, StoredFlowEntry> map = Maps.newHashMap();
+                        map.put(entry, entry);
+                        asyncFlows.createRecursive(path, map).whenComplete((succeeded, createError) -> {
+                            if (createError == null) {
+                                if (succeeded) {
+                                    log.trace("Stored new flow rule: {}", entry);
+                                    future.complete(true);
+                                } else {
+                                    log.trace("Failed to store new flow rule: {}", entry);
+                                    future.completeExceptionally(RETRY);
+                                }
+                            } else {
+                                future.completeExceptionally(createError);
+                            }
+                        });
                     }
                 } else {
-                    return false;
+                    future.completeExceptionally(getError);
                 }
-            } else {
-                return false;
-            }
+            });
+            return future;
+        });
+    }
+
+    @SuppressWarnings("unchecked")
+    private CompletableFuture<Boolean> removeBatchEntry(FlowRuleBatchEntry batchEntry) {
+        FlowRule rule = batchEntry.target();
+        DocumentPath path = getPathFor(rule.deviceId(), rule.id());
+        return retryAsyncUntilSuccess(() -> {
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            asyncFlows.get(path).whenComplete((value, getError) -> {
+                if (getError == null) {
+                    if (value != null) {
+                        Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+                        StoredFlowEntry entry = entries.get(rule);
+                        if (entry != null) {
+                            entry.setState(FlowEntryState.PENDING_REMOVE);
+                            asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, error) -> {
+                                if (error == null) {
+                                    if (succeeded) {
+                                        log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
+                                        future.complete(true);
+                                    } else {
+                                        log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
+                                        future.completeExceptionally(RETRY);
+                                    }
+                                } else {
+                                    future.completeExceptionally(error);
+                                }
+                            });
+                        } else {
+                            future.complete(false);
+                        }
+                    } else {
+                        future.complete(false);
+                    }
+                } else {
+                    future.completeExceptionally(getError);
+                }
+            });
+            return future;
         });
     }
 
@@ -528,9 +627,7 @@
                     return null;
                 }
             } else {
-                // If there are no entries stored for the device, initialize the device's flows.
-                flows.createRecursive(path, Maps.newHashMap());
-                return retry();
+                return null;
             }
         });
     }
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java
index d49f1cc..8144d22 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java
@@ -42,11 +42,9 @@
 import org.onosproject.net.intent.IntentTestsMocks;
 import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
 import org.onosproject.store.service.AsyncDocumentTree;
-import org.onosproject.store.service.AsyncDocumentTreeAdapter;
-import org.onosproject.store.service.DocumentTree;
 import org.onosproject.store.service.DocumentTreeBuilder;
 import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.TestDocumentTree;
+import org.onosproject.store.service.TestAsyncDocumentTree;
 import org.onosproject.store.service.TestStorageService;
 import org.onosproject.store.service.TestTopic;
 import org.onosproject.store.service.Topic;
@@ -145,13 +143,7 @@
                 @Override
                 @SuppressWarnings("unchecked")
                 public AsyncDocumentTree<V> build() {
-                    String name = name();
-                    return new AsyncDocumentTreeAdapter() {
-                        @Override
-                        public DocumentTree asDocumentTree() {
-                            return new TestDocumentTree(name);
-                        }
-                    };
+                    return new TestAsyncDocumentTree<>(name());
                 }
             };
         }
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index ed46a96..8f1ffa3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -247,8 +247,8 @@
         AtomixDocumentTree atomixDocumentTree = new AtomixDocumentTree(client.newProxyBuilder()
                 .withName(name)
                 .withServiceType(String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), ordering))
-                .withReadConsistency(ReadConsistency.LINEARIZABLE)
-                .withCommunicationStrategy(CommunicationStrategy.LEADER)
+                .withReadConsistency(ReadConsistency.SEQUENTIAL)
+                .withCommunicationStrategy(CommunicationStrategy.ANY)
                 .withTimeout(Duration.ofSeconds(30))
                 .withMaxRetries(5)
                 .build()