Store flow rules asynchronously via master in DistributedFlowRuleStore

Change-Id: I5e41a982204e395d2a1ac05017905069d61dc702
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 7b6ae55..3382abe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -19,9 +19,12 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -69,6 +72,7 @@
 import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.impl.MastershipBasedTimestamp;
 import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncDocumentTree;
 import org.onosproject.store.service.DocumentPath;
 import org.onosproject.store.service.DocumentTree;
 import org.onosproject.store.service.EventuallyConsistentMap;
@@ -108,12 +112,13 @@
         RETRY.setStackTrace(new StackTraceElement[0]);
     }
 
+    private static final int SCHEDULED_THREAD_POOL_SIZE = 8;
     private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
     private static final int MAX_RETRY_DELAY_MILLIS = 50;
 
     private static final String FLOW_TABLE = "onos-flow-table";
 
-    private static final MessageSubject APPLY_FLOWS = new MessageSubject("onos-flow-apply");
+    private static final MessageSubject APPLY_BATCH_FLOWS = new MessageSubject("onos-flow-apply");
     private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -145,8 +150,11 @@
             new InternalTableStatsListener();
 
     private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
+    private ScheduledExecutorService scheduledExecutor;
     private ExecutorService messageHandlingExecutor;
+    private final Random random = new Random();
 
+    private AsyncDocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> asyncFlows;
     private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
     private IdGenerator idGenerator;
     private NodeId local;
@@ -157,6 +165,10 @@
 
         local = clusterService.getLocalNode().id();
 
+        scheduledExecutor = Executors.newScheduledThreadPool(
+                SCHEDULED_THREAD_POOL_SIZE,
+                groupedThreads("onos/store/flow", "schedulers", log));
+
         messageHandlingExecutor = Executors.newFixedThreadPool(
                 MESSAGE_HANDLER_THREAD_POOL_SIZE,
                 groupedThreads("onos/store/flow", "message-handlers", log));
@@ -170,16 +182,16 @@
                 .build();
         deviceTableStats.addListener(tableStatsListener);
 
-        flows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
+        asyncFlows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
                 .withName(FLOW_TABLE)
                 .withSerializer(serializer)
-                .buildDocumentTree()
-                .asDocumentTree();
+                .buildDocumentTree();
+        flows = asyncFlows.asDocumentTree();
 
         clusterCommunicator.addSubscriber(
-                APPLY_FLOWS,
+                APPLY_BATCH_FLOWS,
                 serializer::decode,
-                this::applyFlows,
+                this::applyBatchFlows,
                 messageHandlingExecutor);
         clusterCommunicator.addSubscriber(
                 COMPLETE_BATCH,
@@ -194,9 +206,10 @@
     public void deactivate() {
         deviceTableStats.removeListener(tableStatsListener);
         deviceTableStats.destroy();
-        clusterCommunicator.removeSubscriber(APPLY_FLOWS);
+        clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
         clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
         messageHandlingExecutor.shutdownNow();
+        scheduledExecutor.shutdownNow(); // also discards retries still queued by retryAsyncUntilSuccess
         log.info("Stopped");
     }
 
@@ -220,6 +233,69 @@
     }
 
     /**
+     * Retries the given asynchronous supplier until it completes without a retryable failure.
+     * <p>
+     * An attempt is retried when its future fails with a {@code ConcurrentModification} exception or with the
+     * {@code RETRY} sentinel, which callbacks use to report a rejected versioned update (for example a failed
+     * {@code replace}). In between retries, it waits a random interval of less than {@code MAX_RETRY_DELAY_MILLIS}
+     * milliseconds to attempt to avoid transaction conflicts with other processes. Any other failure completes the
+     * returned future exceptionally.
+     *
+     * @param supplier the supplier to retry
+     * @param <T> the return type
+     * @return a future completed with the supplier's result once it runs successfully
+     */
+    private <T> CompletableFuture<T> retryAsyncUntilSuccess(Supplier<CompletableFuture<T>> supplier) {
+        return retryAsyncUntilSuccess(supplier, new CompletableFuture<>());
+    }
+
+    /**
+     * Runs one attempt of the given asynchronous supplier and completes {@code future} with its outcome.
+     * <p>
+     * If the attempt fails with a retryable error (see {@link #retryAsyncUntilSuccess(Supplier)}), another attempt
+     * is scheduled on {@code scheduledExecutor} after a random delay instead of completing {@code future}.
+     *
+     * @param supplier the supplier to retry
+     * @param future future to be completed once the operation has succeeded or failed with a non-retryable error
+     * @param <T> the return type
+     * @return {@code future}
+     */
+    private <T> CompletableFuture<T> retryAsyncUntilSuccess(
+            Supplier<CompletableFuture<T>> supplier,
+            CompletableFuture<T> future) {
+        CompletableFuture<T> attempt;
+        try {
+            attempt = supplier.get();
+        } catch (RuntimeException e) {
+            // Without this, a supplier that throws synchronously would leave the caller's future pending forever.
+            future.completeExceptionally(e);
+            return future;
+        }
+        attempt.whenComplete((result, error) -> {
+            if (error == null) {
+                future.complete(result);
+                return;
+            }
+            // Failures of dependent stages arrive wrapped (e.g. in a CompletionException); inspect the cause.
+            Throwable cause = error.getCause() != null ? error.getCause() : error;
+            if (cause == RETRY || cause instanceof StorageException.ConcurrentModification) {
+                try {
+                    scheduledExecutor.schedule(
+                            () -> retryAsyncUntilSuccess(supplier, future),
+                            random.nextInt(MAX_RETRY_DELAY_MILLIS),
+                            TimeUnit.MILLISECONDS);
+                } catch (RuntimeException e) {
+                    // e.g. a RejectedExecutionException once deactivate() has shut the scheduler down.
+                    future.completeExceptionally(e);
+                }
+            } else {
+                future.completeExceptionally(error);
+            }
+        });
+        return future;
+    }
+
+    /**
      * Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
      * retried after a randomized delay.
      *
@@ -232,19 +291,6 @@
     }
 
     /**
-     * Handles a flow rule batch event forwarded to the master node.
-     * <p>
-     * If this node is the master for the associated device, notifies event listeners to install flow rules.
-     *
-     * @param event the event to handle
-     */
-    private void applyFlows(FlowRuleBatchEvent event) {
-        if (mastershipService.isLocalMaster(event.deviceId())) {
-            notifyDelegate(event);
-        }
-    }
-
-    /**
      * Handles a completed batch event received from the master node.
      * <p>
      * If this node is the source of the batch, notifies event listeners to complete the operations.
@@ -338,106 +384,207 @@
         if (master == null) {
             log.warn("No master for {} ", deviceId);
 
-            updateStoreInternal(operation);
-
-            notifyDelegate(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+            updateStoreInternal(operation).whenComplete((result, error) -> {
+                if (error != null) {
+                    log.warn("Failed to store batch {} for {}", operation.id(), operation.deviceId(), error);
+                }
+                // Without a master nothing installs these rules; report completion once the store update settles.
+                notifyDelegate(FlowRuleBatchEvent.completed(
+                        new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                        new CompletedBatchOperation(error == null, Collections.emptySet(), operation.deviceId())));
+            });
             return;
         }
 
         pendingBatches.add(operation.id());
 
-        Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
-        if (currentOps.isEmpty()) {
-            batchOperationComplete(FlowRuleBatchEvent.completed(
-                    new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
-                    new CompletedBatchOperation(true, Collections.emptySet(), deviceId)));
-        } else if (Objects.equals(local, master)) {
-            notifyDelegate(FlowRuleBatchEvent.requested(
-                    new FlowRuleBatchRequest(operation.id(), currentOps),
-                    operation.deviceId()));
+        // If the local node is the master, apply the flows. Otherwise, send them to the master.
+        if (Objects.equals(local, master)) {
+            applyBatchFlows(operation);
         } else {
-            clusterCommunicator.unicast(FlowRuleBatchEvent.requested(
-                    new FlowRuleBatchRequest(operation.id(), currentOps),
-                    operation.deviceId()),
-                    APPLY_FLOWS,
+            log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", master, deviceId);
+            clusterCommunicator.unicast(
+                    operation,
+                    APPLY_BATCH_FLOWS,
                     serializer::encode,
                     master);
         }
     }
 
-    private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
-        return operation.getOperations().stream().map(
-                op -> {
-                    switch (op.operator()) {
-                        case ADD:
-                            addBatchEntry(op);
-                            return op;
-                        case REMOVE:
-                            if (removeBatchEntry(op)) {
-                                return op;
-                            }
-                            return null;
-                        case MODIFY:
-                            //TODO: figure this out at some point
-                            break;
-                        default:
-                            log.warn("Unknown flow operation operator: {}", op.operator());
-                    }
-                    return null;
-                }
-        ).filter(Objects::nonNull).collect(Collectors.toSet());
-    }
-
-    @SuppressWarnings("unchecked")
-    private void addBatchEntry(FlowRuleBatchEntry batchEntry) {
-        StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
-        DocumentPath path = getPathFor(entry.deviceId(), entry.id());
-        retryUntilSuccess(() -> {
-            Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
-            if (value != null) {
-                Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
-                entries.put(entry, entry);
-                if (flows.replace(path, entries, value.version())) {
-                    log.trace("Stored new flow rule: {}", entry);
-                    return null;
+    /**
+     * Asynchronously applies a batch of flows to the store.
+     * <p>
+     * This operation is performed on the master node to ensure that events occur <em>after</em> flows have been stored
+     * and are visible to the master node. If a non-master node stores flows and then triggers events on the master,
+     * the flows may not yet be visible to the master node due to the nature of sequentially consistent reads on the
+     * underlying {@code DocumentTree} primitive.
+     * <p>
+     * If storing the batch fails, the batch is completed as failed, listing all of its rules, instead of being
+     * left pending.
+     *
+     * @param operation the batch operation to store and then apply
+     */
+    private void applyBatchFlows(FlowRuleBatchOperation operation) {
+        updateStoreInternal(operation).whenComplete((operations, error) -> {
+            if (error == null) {
+                if (operations.isEmpty()) {
+                    // No operation changed the store, so there is nothing to install; complete the batch now.
+                    batchOperationComplete(FlowRuleBatchEvent.completed(
+                            new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                            new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
                 } else {
-                    log.trace("Failed to store new flow rule: {}", entry);
-                    return retry();
+                    notifyDelegate(FlowRuleBatchEvent.requested(
+                            new FlowRuleBatchRequest(operation.id(), operations),
+                            operation.deviceId()));
                 }
-            } else {
-                // If there are no entries stored for the device, initialize the device's flows.
-                flows.createRecursive(path, Maps.newHashMap());
-                return retry();
+            } else {
+                log.warn("Failed to store batch {} for {}", operation.id(), operation.deviceId(), error);
+                Set<FlowRule> failed = operation.getOperations().stream()
+                        .map(FlowRuleBatchEntry::target)
+                        .collect(Collectors.toSet());
+                batchOperationComplete(FlowRuleBatchEvent.completed(
+                        new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+                        new CompletedBatchOperation(false, failed, operation.deviceId())));
             }
         });
     }
 
+    /**
+     * Asynchronously records the given batch operations in the flow store.
+     * <p>
+     * All operations are started concurrently. The resulting set contains the entries that changed the store
+     * (stored additions and removals of rules that were present); {@code MODIFY} and unknown operators are skipped.
+     * Failures of individual operations are propagated through {@code Tools.allOf}.
+     *
+     * @param operation the batch operation to record
+     * @return a future for the set of batch entries that still need to be applied to the device
+     */
+    private CompletableFuture<Set<FlowRuleBatchEntry>> updateStoreInternal(FlowRuleBatchOperation operation) {
+        return Tools.allOf(operation.getOperations().stream().map(op -> {
+            switch (op.operator()) {
+                case ADD:
+                    return addBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
+                case REMOVE:
+                    return removeBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
+                case MODIFY:
+                    // TODO: figure this out at some point
+                    return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
+                default:
+                    log.warn("Unknown flow operation operator: {}", op.operator());
+                    return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
+            }
+        }).collect(Collectors.toList()))
+                .thenApply(results -> results.stream()
+                        .filter(Objects::nonNull)
+                        .collect(Collectors.toSet()));
+    }
+
+    /**
+     * Asynchronously adds the target rule of the given batch entry to the flow store.
+     * <p>
+     * The rule is stored in the map at its device/flow-id path: the node is created if absent, otherwise a copy of
+     * the stored map containing the rule replaces it, guarded by the stored version. If the create or the versioned
+     * replace is rejected, the attempt completes exceptionally with {@code RETRY}.
+     *
+     * @param batchEntry the batch entry whose target rule is added
+     * @return a future completed with {@code true} once the rule has been stored
+     */
     @SuppressWarnings("unchecked")
-    private boolean removeBatchEntry(FlowRuleBatchEntry batchEntry) {
-        FlowRule rule = batchEntry.target();
-        DocumentPath path = getPathFor(rule.deviceId(), rule.id());
-        return retryUntilSuccess(() -> {
-            Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
-            if (value != null) {
-                Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
-                StoredFlowEntry entry = entries.get(rule);
-                if (entry != null) {
-                    entry.setState(FlowEntryState.PENDING_REMOVE);
-                    if (flows.replace(path, entries, value.version())) {
-                        log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
-                        return true;
+    private CompletableFuture<Boolean> addBatchEntry(FlowRuleBatchEntry batchEntry) {
+        StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
+        DocumentPath path = getPathFor(entry.deviceId(), entry.id());
+        return retryAsyncUntilSuccess(() -> {
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            asyncFlows.get(path).whenComplete((value, getError) -> {
+                if (getError == null) {
+                    if (value != null) {
+                        Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+                        entries.put(entry, entry);
+                        asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, replaceError) -> {
+                            if (replaceError == null) {
+                                if (succeeded) {
+                                    log.trace("Stored new flow rule: {}", entry);
+                                    future.complete(true);
+                                } else {
+                                    log.trace("Failed to store new flow rule: {}", entry);
+                                    future.completeExceptionally(RETRY);
+                                }
+                            } else {
+                                future.completeExceptionally(replaceError);
+                            }
+                        });
                     } else {
-                        log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
-                        return retry();
+                        // If there are no entries stored for the device, initialize the device's flows.
+                        Map<StoredFlowEntry, StoredFlowEntry> map = Maps.newHashMap();
+                        map.put(entry, entry);
+                        asyncFlows.createRecursive(path, map).whenComplete((succeeded, createError) -> {
+                            if (createError == null) {
+                                if (succeeded) {
+                                    log.trace("Stored new flow rule: {}", entry);
+                                    future.complete(true);
+                                } else {
+                                    log.trace("Failed to store new flow rule: {}", entry);
+                                    future.completeExceptionally(RETRY);
+                                }
+                            } else {
+                                future.completeExceptionally(createError);
+                            }
+                        });
                     }
                 } else {
-                    return false;
+                    future.completeExceptionally(getError);
                 }
-            } else {
-                return false;
-            }
+            });
+            return future;
+        });
+    }
+
+    /**
+     * Asynchronously marks the stored entry matching the given batch entry's target rule as {@code PENDING_REMOVE}.
+     * <p>
+     * The update replaces a copy of the stored map, guarded by the stored version. If the versioned replace is
+     * rejected, the attempt completes exceptionally with {@code RETRY}.
+     *
+     * @param batchEntry the batch entry whose target rule is being removed
+     * @return a future completed with {@code true} if a matching entry was marked, or {@code false} if none is stored
+     */
+    @SuppressWarnings("unchecked")
+    private CompletableFuture<Boolean> removeBatchEntry(FlowRuleBatchEntry batchEntry) {
+        FlowRule rule = batchEntry.target();
+        DocumentPath path = getPathFor(rule.deviceId(), rule.id());
+        return retryAsyncUntilSuccess(() -> {
+            CompletableFuture<Boolean> future = new CompletableFuture<>();
+            asyncFlows.get(path).whenComplete((value, getError) -> {
+                if (getError == null) {
+                    if (value != null) {
+                        Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+                        StoredFlowEntry entry = entries.get(rule);
+                        if (entry != null) {
+                            entry.setState(FlowEntryState.PENDING_REMOVE);
+                            asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, error) -> {
+                                if (error == null) {
+                                    if (succeeded) {
+                                        log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
+                                        future.complete(true);
+                                    } else {
+                                        log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
+                                        future.completeExceptionally(RETRY);
+                                    }
+                                } else {
+                                    future.completeExceptionally(error);
+                                }
+                            });
+                        } else {
+                            future.complete(false);
+                        }
+                    } else {
+                        future.complete(false);
+                    }
+                } else {
+                    future.completeExceptionally(getError);
+                }
+            });
+            return future;
         });
     }
 
@@ -528,9 +627,7 @@
                     return null;
                 }
             } else {
-                // If there are no entries stored for the device, initialize the device's flows.
-                flows.createRecursive(path, Maps.newHashMap());
-                return retry();
+                return null;
             }
         });
     }