Store flow rules asynchronously via master in DistributedFlowRuleStore
Change-Id: I5e41a982204e395d2a1ac05017905069d61dc702
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 7b6ae55..3382abe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -19,9 +19,12 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -69,6 +72,7 @@
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTree;
import org.onosproject.store.service.EventuallyConsistentMap;
@@ -108,12 +112,13 @@
RETRY.setStackTrace(new StackTraceElement[0]);
}
+ private static final int SCHEDULED_THREAD_POOL_SIZE = 8;
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
private static final int MAX_RETRY_DELAY_MILLIS = 50;
private static final String FLOW_TABLE = "onos-flow-table";
- private static final MessageSubject APPLY_FLOWS = new MessageSubject("onos-flow-apply");
+ private static final MessageSubject APPLY_BATCH_FLOWS = new MessageSubject("onos-flow-apply");
private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -145,8 +150,11 @@
new InternalTableStatsListener();
private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
+ private ScheduledExecutorService scheduledExecutor;
private ExecutorService messageHandlingExecutor;
+ private final Random random = new Random();
+ private AsyncDocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> asyncFlows;
private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
private IdGenerator idGenerator;
private NodeId local;
@@ -157,6 +165,10 @@
local = clusterService.getLocalNode().id();
+ scheduledExecutor = Executors.newScheduledThreadPool(
+ SCHEDULED_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/flow", "schedulers", log));
+
messageHandlingExecutor = Executors.newFixedThreadPool(
MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/flow", "message-handlers", log));
@@ -170,16 +182,16 @@
.build();
deviceTableStats.addListener(tableStatsListener);
- flows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
+ asyncFlows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
.withName(FLOW_TABLE)
.withSerializer(serializer)
- .buildDocumentTree()
- .asDocumentTree();
+ .buildDocumentTree();
+ flows = asyncFlows.asDocumentTree();
clusterCommunicator.addSubscriber(
- APPLY_FLOWS,
+ APPLY_BATCH_FLOWS,
serializer::decode,
- this::applyFlows,
+ this::applyBatchFlows,
messageHandlingExecutor);
clusterCommunicator.addSubscriber(
COMPLETE_BATCH,
@@ -194,9 +206,10 @@
public void deactivate() {
deviceTableStats.removeListener(tableStatsListener);
deviceTableStats.destroy();
- clusterCommunicator.removeSubscriber(APPLY_FLOWS);
+ clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
messageHandlingExecutor.shutdownNow();
+ scheduledExecutor.shutdownNow();
log.info("Stopped");
}
@@ -220,6 +233,52 @@
}
/**
+ * Retries the given asynchronous supplier until successful.
+ * <p>
+ * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
+ * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
+ *
+ * @param supplier the supplier to retry
+ * @param <T> the return type
+ * @return a future that is completed with the supplier's result once it runs successfully
+ */
+    private <T> CompletableFuture<T> retryAsyncUntilSuccess(Supplier<CompletableFuture<T>> supplier) {
+        // Allocate the future handed back to the caller; the two-argument overload completes it.
+        CompletableFuture<T> result = new CompletableFuture<>();
+        return retryAsyncUntilSuccess(supplier, result);
+    }
+
+    /**
+     * Retries the given asynchronous supplier until successful.
+     * <p>
+     * Each attempt invokes the supplier and inspects the outcome of the future it returns. A successful outcome
+     * completes {@code future} with the result. A failure whose cause is a
+     * {@code StorageException.ConcurrentModification} schedules another attempt on the scheduled executor after a
+     * random delay below {@code MAX_RETRY_DELAY_MILLIS} milliseconds, to attempt to avoid transaction conflicts
+     * with other processes. Any other failure completes {@code future} exceptionally; this includes the supplier
+     * itself throwing and the executor refusing to schedule the retry.
+     *
+     * @param supplier the supplier to retry
+     * @param future future to be completed once the operation has been successful
+     * @param <T> the return type
+     * @return the given {@code future}
+     */
+    private <T> CompletableFuture<T> retryAsyncUntilSuccess(
+            Supplier<CompletableFuture<T>> supplier,
+            CompletableFuture<T> future) {
+        try {
+            supplier.get().whenComplete((result, error) -> {
+                if (error == null) {
+                    future.complete(result);
+                    return;
+                }
+                // Storage errors may arrive wrapped, so look one level down for the real cause.
+                Throwable cause = error.getCause() != null ? error.getCause() : error;
+                if (!(cause instanceof StorageException.ConcurrentModification)) {
+                    future.completeExceptionally(error);
+                    return;
+                }
+                try {
+                    scheduledExecutor.schedule(
+                            () -> retryAsyncUntilSuccess(supplier, future),
+                            random.nextInt(MAX_RETRY_DELAY_MILLIS),
+                            TimeUnit.MILLISECONDS);
+                } catch (RuntimeException e) {
+                    // The executor rejects tasks once it has been shut down; fail the future rather than
+                    // leaving the caller waiting on a retry that will never run.
+                    future.completeExceptionally(e);
+                }
+            });
+        } catch (RuntimeException e) {
+            // On a retry this code runs on a scheduler thread, where a thrown exception would be lost and the
+            // future would stay pending forever.
+            future.completeExceptionally(e);
+        }
+        return future;
+    }
+
+ /**
* Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
* retried after a randomized delay.
*
@@ -232,19 +291,6 @@
}
/**
- * Handles a flow rule batch event forwarded to the master node.
- * <p>
- * If this node is the master for the associated device, notifies event listeners to install flow rules.
- *
- * @param event the event to handle
- */
- private void applyFlows(FlowRuleBatchEvent event) {
- if (mastershipService.isLocalMaster(event.deviceId())) {
- notifyDelegate(event);
- }
- }
-
- /**
* Handles a completed batch event received from the master node.
* <p>
* If this node is the source of the batch, notifies event listeners to complete the operations.
@@ -338,106 +384,159 @@
if (master == null) {
log.warn("No master for {} ", deviceId);
- updateStoreInternal(operation);
-
- notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+ updateStoreInternal(operation).whenComplete((result, error) -> {
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+ });
return;
}
pendingBatches.add(operation.id());
- Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
- if (currentOps.isEmpty()) {
- batchOperationComplete(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), deviceId)));
- } else if (Objects.equals(local, master)) {
- notifyDelegate(FlowRuleBatchEvent.requested(
- new FlowRuleBatchRequest(operation.id(), currentOps),
- operation.deviceId()));
+ // If the local node is the master, apply the flows. Otherwise, send them to the master.
+ if (Objects.equals(local, master)) {
+ applyBatchFlows(operation);
} else {
- clusterCommunicator.unicast(FlowRuleBatchEvent.requested(
- new FlowRuleBatchRequest(operation.id(), currentOps),
- operation.deviceId()),
- APPLY_FLOWS,
+ log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", master, deviceId);
+ clusterCommunicator.unicast(
+ operation,
+ APPLY_BATCH_FLOWS,
serializer::encode,
master);
}
}
- private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
- return operation.getOperations().stream().map(
- op -> {
- switch (op.operator()) {
- case ADD:
- addBatchEntry(op);
- return op;
- case REMOVE:
- if (removeBatchEntry(op)) {
- return op;
- }
- return null;
- case MODIFY:
- //TODO: figure this out at some point
- break;
- default:
- log.warn("Unknown flow operation operator: {}", op.operator());
- }
- return null;
- }
- ).filter(Objects::nonNull).collect(Collectors.toSet());
- }
-
- @SuppressWarnings("unchecked")
- private void addBatchEntry(FlowRuleBatchEntry batchEntry) {
- StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
- DocumentPath path = getPathFor(entry.deviceId(), entry.id());
- retryUntilSuccess(() -> {
- Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
- if (value != null) {
- Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
- entries.put(entry, entry);
- if (flows.replace(path, entries, value.version())) {
- log.trace("Stored new flow rule: {}", entry);
- return null;
+ /**
+ * Asynchronously applies a batch of flows to the store.
+ * <p>
+ * This operation is performed on the master node to ensure that events occur <em>after</em> flows have been stored
+ * and are visible to the master node. If a non-master node stores flows and then triggers events on the master,
+ * the flows may not yet be visible to the master node due to the nature of sequentially consistent reads on the
+ * underlying {@code DocumentTree} primitive.
+ * <p>
+ * If the store update fails, the failure is logged and no event is emitted for the batch.
+ *
+ * @param operation the batch operation to apply
+ */
+ private void applyBatchFlows(FlowRuleBatchOperation operation) {
+ updateStoreInternal(operation).whenComplete((operations, error) -> {
+ if (error == null) {
+ if (operations.isEmpty()) {
+ batchOperationComplete(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
} else {
- log.trace("Failed to store new flow rule: {}", entry);
- return retry();
+ notifyDelegate(FlowRuleBatchEvent.requested(
+ new FlowRuleBatchRequest(operation.id(), operations),
+ operation.deviceId()));
}
- } else {
- // If there are no entries stored for the device, initialize the device's flows.
- flows.createRecursive(path, Maps.newHashMap());
- return retry();
+ } else {
+ // Surface storage failures instead of silently dropping the batch.
+ log.warn("Failed to store flow batch {} for device {}",
+ operation.id(), operation.deviceId(), error);
}
});
}
+    /**
+     * Asynchronously applies every entry of the given batch operation to the store.
+     * <p>
+     * {@code ADD} entries go through {@code addBatchEntry} and {@code REMOVE} entries through
+     * {@code removeBatchEntry}. {@code MODIFY} entries and unknown operators are not applied.
+     *
+     * @param operation the batch operation whose entries should be stored
+     * @return a future completed with the set of entries whose store update reported success
+     */
+    private CompletableFuture<Set<FlowRuleBatchEntry>> updateStoreInternal(FlowRuleBatchOperation operation) {
+        // One future per entry: it yields the entry itself on success, or null when the entry is to be dropped.
+        List<CompletableFuture<FlowRuleBatchEntry>> pending = operation.getOperations().stream()
+                .map(op -> {
+                    CompletableFuture<FlowRuleBatchEntry> outcome;
+                    switch (op.operator()) {
+                        case ADD:
+                            outcome = addBatchEntry(op).thenApply(stored -> stored ? op : null);
+                            break;
+                        case REMOVE:
+                            outcome = removeBatchEntry(op).thenApply(marked -> marked ? op : null);
+                            break;
+                        case MODIFY:
+                            // MODIFY is not applied to the store; the entry is dropped from the result.
+                            outcome = CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
+                            break;
+                        default:
+                            log.warn("Unknown flow operation operator: {}", op.operator());
+                            outcome = CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
+                            break;
+                    }
+                    return outcome;
+                })
+                .collect(Collectors.toList());
+
+        // Wait for every per-entry future, then keep only the entries that were actually applied.
+        return Tools.allOf(pending).thenApply(results -> results.stream()
+                .filter(Objects::nonNull)
+                .collect(Collectors.toSet()));
+    }
+
@SuppressWarnings("unchecked")
- private boolean removeBatchEntry(FlowRuleBatchEntry batchEntry) {
- FlowRule rule = batchEntry.target();
- DocumentPath path = getPathFor(rule.deviceId(), rule.id());
- return retryUntilSuccess(() -> {
- Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
- if (value != null) {
- Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
- StoredFlowEntry entry = entries.get(rule);
- if (entry != null) {
- entry.setState(FlowEntryState.PENDING_REMOVE);
- if (flows.replace(path, entries, value.version())) {
- log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
- return true;
+ private CompletableFuture<Boolean> addBatchEntry(FlowRuleBatchEntry batchEntry) {
+ StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
+ DocumentPath path = getPathFor(entry.deviceId(), entry.id());
+ return retryAsyncUntilSuccess(() -> {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ asyncFlows.get(path).whenComplete((value, getError) -> {
+ if (getError == null) {
+ if (value != null) {
+ Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+ entries.put(entry, entry);
+ asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, replaceError) -> {
+ if (replaceError == null) {
+ if (succeeded) {
+ log.trace("Stored new flow rule: {}", entry);
+ future.complete(true);
+ } else {
+ log.trace("Failed to store new flow rule: {}", entry);
+ future.completeExceptionally(RETRY);
+ }
+ } else {
+ future.completeExceptionally(replaceError);
+ }
+ });
} else {
- log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
- return retry();
+ // If there are no entries stored for the device, initialize the device's flows.
+ Map<StoredFlowEntry, StoredFlowEntry> map = Maps.newHashMap();
+ map.put(entry, entry);
+ asyncFlows.createRecursive(path, map).whenComplete((succeeded, createError) -> {
+ if (createError == null) {
+ if (succeeded) {
+ log.trace("Stored new flow rule: {}", entry);
+ future.complete(true);
+ } else {
+ log.trace("Failed to store new flow rule: {}", entry);
+ future.completeExceptionally(RETRY);
+ }
+ } else {
+ future.completeExceptionally(createError);
+ }
+ });
}
} else {
- return false;
+ future.completeExceptionally(getError);
}
- } else {
- return false;
- }
+ });
+ return future;
+ });
+ }
+
+    /**
+     * Asynchronously marks the stored flow entry targeted by the given batch entry as {@code PENDING_REMOVE}.
+     * <p>
+     * The entries stored at the rule's path are read, the matching entry's state is changed, and the map is
+     * written back with a versioned replace. A replace that reports failure is signalled with {@code RETRY}, so
+     * the whole read-modify-write sequence is attempted again.
+     *
+     * @param batchEntry the batch entry whose target rule should be marked for removal
+     * @return a future completed with {@code true} once the entry has been marked, or {@code false} if nothing is
+     *         stored at the rule's path or the rule is not among the stored entries
+     */
+    @SuppressWarnings("unchecked")
+    private CompletableFuture<Boolean> removeBatchEntry(FlowRuleBatchEntry batchEntry) {
+        FlowRule target = batchEntry.target();
+        DocumentPath path = getPathFor(target.deviceId(), target.id());
+        return retryAsyncUntilSuccess(() -> {
+            CompletableFuture<Boolean> outcome = new CompletableFuture<>();
+            asyncFlows.get(path).whenComplete((versioned, getError) -> {
+                if (getError != null) {
+                    outcome.completeExceptionally(getError);
+                    return;
+                }
+                if (versioned == null) {
+                    // Nothing is stored at this path, so there is nothing to mark.
+                    outcome.complete(false);
+                    return;
+                }
+                Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(versioned.value());
+                StoredFlowEntry stored = entries.get(target);
+                if (stored == null) {
+                    // The rule is not among the stored entries.
+                    outcome.complete(false);
+                    return;
+                }
+                stored.setState(FlowEntryState.PENDING_REMOVE);
+                asyncFlows.replace(path, entries, versioned.version()).whenComplete((replaced, replaceError) -> {
+                    if (replaceError != null) {
+                        outcome.completeExceptionally(replaceError);
+                    } else if (replaced) {
+                        log.trace("Updated flow rule state to PENDING_REMOVE: {}", stored);
+                        outcome.complete(true);
+                    } else {
+                        log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", stored);
+                        outcome.completeExceptionally(RETRY);
+                    }
+                });
+            });
+            return outcome;
         });
     }
@@ -528,9 +627,7 @@
return null;
}
} else {
- // If there are no entries stored for the device, initialize the device's flows.
- flows.createRecursive(path, Maps.newHashMap());
- return retry();
+ return null;
}
});
}