Store flows rules asynchronously via master in DistributedFlowRuleStore
Change-Id: I5e41a982204e395d2a1ac05017905069d61dc702
diff --git a/core/api/src/test/java/org/onosproject/store/service/TestAsyncDocumentTree.java b/core/api/src/test/java/org/onosproject/store/service/TestAsyncDocumentTree.java
new file mode 100644
index 0000000..f1d4fec
--- /dev/null
+++ b/core/api/src/test/java/org/onosproject/store/service/TestAsyncDocumentTree.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.store.service;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+import org.onosproject.store.primitives.NodeUpdate;
+import org.onosproject.store.primitives.TransactionId;
+
+/**
+ * Test asynchronous document tree.
+ */
+public class TestAsyncDocumentTree<V> implements AsyncDocumentTree<V> {
+ private final DocumentTree<V> tree;
+
+ public TestAsyncDocumentTree(String name) {
+ this.tree = new TestDocumentTree<>(name);
+ }
+
+ @Override
+ public String name() {
+ return tree.name();
+ }
+
+ @Override
+ public DocumentPath root() {
+ return tree.root();
+ }
+
+ @Override
+ public CompletableFuture<Map<String, Versioned<V>>> getChildren(DocumentPath path) {
+ return CompletableFuture.completedFuture(tree.getChildren(path));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> get(DocumentPath path) {
+ return CompletableFuture.completedFuture(tree.get(path));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> set(DocumentPath path, V value) {
+ return CompletableFuture.completedFuture(tree.set(path, value));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> create(DocumentPath path, V value) {
+ return CompletableFuture.completedFuture(tree.create(path, value));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> createRecursive(DocumentPath path, V value) {
+ return CompletableFuture.completedFuture(tree.createRecursive(path, value));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, long version) {
+ return CompletableFuture.completedFuture(tree.replace(path, newValue, version));
+ }
+
+ @Override
+ public CompletableFuture<Boolean> replace(DocumentPath path, V newValue, V currentValue) {
+ return CompletableFuture.completedFuture(tree.replace(path, newValue, currentValue));
+ }
+
+ @Override
+ public CompletableFuture<Versioned<V>> removeNode(DocumentPath path) {
+ return CompletableFuture.completedFuture(tree.removeNode(path));
+ }
+
+ @Override
+ public CompletableFuture<Void> addListener(DocumentPath path, DocumentTreeListener<V> listener) {
+ tree.addListener(path, listener);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Void> removeListener(DocumentTreeListener<V> listener) {
+ tree.removeListener(listener);
+ return CompletableFuture.completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<Version> begin(TransactionId transactionId) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepare(TransactionLog<NodeUpdate<V>> transactionLog) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> prepareAndCommit(TransactionLog<NodeUpdate<V>> transactionLog) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> commit(TransactionId transactionId) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> rollback(TransactionId transactionId) {
+ return null;
+ }
+
+ @Override
+ public DocumentTree<V> asDocumentTree() {
+ return tree;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
index 7b6ae55..3382abe 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DistributedFlowRuleStore.java
@@ -19,9 +19,12 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -69,6 +72,7 @@
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.impl.MastershipBasedTimestamp;
import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.AsyncDocumentTree;
import org.onosproject.store.service.DocumentPath;
import org.onosproject.store.service.DocumentTree;
import org.onosproject.store.service.EventuallyConsistentMap;
@@ -108,12 +112,13 @@
RETRY.setStackTrace(new StackTraceElement[0]);
}
+ private static final int SCHEDULED_THREAD_POOL_SIZE = 8;
private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
private static final int MAX_RETRY_DELAY_MILLIS = 50;
private static final String FLOW_TABLE = "onos-flow-table";
- private static final MessageSubject APPLY_FLOWS = new MessageSubject("onos-flow-apply");
+ private static final MessageSubject APPLY_BATCH_FLOWS = new MessageSubject("onos-flow-apply");
private static final MessageSubject COMPLETE_BATCH = new MessageSubject("onos-flow-batch-complete");
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -145,8 +150,11 @@
new InternalTableStatsListener();
private Set<Long> pendingBatches = Sets.newConcurrentHashSet();
+ private ScheduledExecutorService scheduledExecutor;
private ExecutorService messageHandlingExecutor;
+ private final Random random = new Random();
+ private AsyncDocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> asyncFlows;
private DocumentTree<Map<StoredFlowEntry, StoredFlowEntry>> flows;
private IdGenerator idGenerator;
private NodeId local;
@@ -157,6 +165,10 @@
local = clusterService.getLocalNode().id();
+ scheduledExecutor = Executors.newScheduledThreadPool(
+ SCHEDULED_THREAD_POOL_SIZE,
+ groupedThreads("onos/store/flow", "schedulers", log));
+
messageHandlingExecutor = Executors.newFixedThreadPool(
MESSAGE_HANDLER_THREAD_POOL_SIZE,
groupedThreads("onos/store/flow", "message-handlers", log));
@@ -170,16 +182,16 @@
.build();
deviceTableStats.addListener(tableStatsListener);
- flows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
+ asyncFlows = storageService.<Map<StoredFlowEntry, StoredFlowEntry>>documentTreeBuilder()
.withName(FLOW_TABLE)
.withSerializer(serializer)
- .buildDocumentTree()
- .asDocumentTree();
+ .buildDocumentTree();
+ flows = asyncFlows.asDocumentTree();
clusterCommunicator.addSubscriber(
- APPLY_FLOWS,
+ APPLY_BATCH_FLOWS,
serializer::decode,
- this::applyFlows,
+ this::applyBatchFlows,
messageHandlingExecutor);
clusterCommunicator.addSubscriber(
COMPLETE_BATCH,
@@ -194,9 +206,10 @@
public void deactivate() {
deviceTableStats.removeListener(tableStatsListener);
deviceTableStats.destroy();
- clusterCommunicator.removeSubscriber(APPLY_FLOWS);
+ clusterCommunicator.removeSubscriber(APPLY_BATCH_FLOWS);
clusterCommunicator.removeSubscriber(COMPLETE_BATCH);
messageHandlingExecutor.shutdownNow();
+ scheduledExecutor.shutdownNow();
log.info("Stopped");
}
@@ -220,6 +233,52 @@
}
/**
+ * Retries the given asynchronous supplier until successful.
+ * <p>
+ * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
+ * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
+ *
+ * @param supplier the supplier to retry
+ * @param <T> the return type
+ * @return the return value of the given supplier once it runs successfully
+ */
+ private <T> CompletableFuture<T> retryAsyncUntilSuccess(Supplier<CompletableFuture<T>> supplier) {
+ return retryAsyncUntilSuccess(supplier, new CompletableFuture<>());
+ }
+
+ /**
+ * Retries the given asynchronous supplier until successful.
+ * <p>
+ * This method retries the given supplier until no {@code ConcurrentModification} exceptions are thrown. In
+ * between retries, it waits a semi-random interval to attempt to avoid transaction conflicts with other processes.
+ *
+ * @param supplier the supplier to retry
+ * @param future future to be completed once the operation has been successful
+ * @param <T> the return type
+ * @return the return value of the given supplier once it runs successfully
+ */
+ private <T> CompletableFuture<T> retryAsyncUntilSuccess(
+ Supplier<CompletableFuture<T>> supplier,
+ CompletableFuture<T> future) {
+ supplier.get().whenComplete((result, error) -> {
+ if (error == null) {
+ future.complete(result);
+ } else {
+ Throwable cause = error.getCause() != null ? error.getCause() : error;
+ if (cause instanceof StorageException.ConcurrentModification) {
+ scheduledExecutor.schedule(
+ () -> retryAsyncUntilSuccess(supplier, future),
+ random.nextInt(50),
+ TimeUnit.MILLISECONDS);
+ } else {
+ future.completeExceptionally(error);
+ }
+ }
+ });
+ return future;
+ }
+
+ /**
* Return method for {@link #retryUntilSuccess(Supplier)} callbacks to indicate that the callback needs to be
* retried after a randomized delay.
*
@@ -232,19 +291,6 @@
}
/**
- * Handles a flow rule batch event forwarded to the master node.
- * <p>
- * If this node is the master for the associated device, notifies event listeners to install flow rules.
- *
- * @param event the event to handle
- */
- private void applyFlows(FlowRuleBatchEvent event) {
- if (mastershipService.isLocalMaster(event.deviceId())) {
- notifyDelegate(event);
- }
- }
-
- /**
* Handles a completed batch event received from the master node.
* <p>
* If this node is the source of the batch, notifies event listeners to complete the operations.
@@ -338,106 +384,159 @@
if (master == null) {
log.warn("No master for {} ", deviceId);
- updateStoreInternal(operation);
-
- notifyDelegate(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+ updateStoreInternal(operation).whenComplete((result, error) -> {
+ notifyDelegate(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
+ });
return;
}
pendingBatches.add(operation.id());
- Set<FlowRuleBatchEntry> currentOps = updateStoreInternal(operation);
- if (currentOps.isEmpty()) {
- batchOperationComplete(FlowRuleBatchEvent.completed(
- new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
- new CompletedBatchOperation(true, Collections.emptySet(), deviceId)));
- } else if (Objects.equals(local, master)) {
- notifyDelegate(FlowRuleBatchEvent.requested(
- new FlowRuleBatchRequest(operation.id(), currentOps),
- operation.deviceId()));
+ // If the local node is the master, apply the flows. Otherwise, send them to the master.
+ if (Objects.equals(local, master)) {
+ applyBatchFlows(operation);
} else {
- clusterCommunicator.unicast(FlowRuleBatchEvent.requested(
- new FlowRuleBatchRequest(operation.id(), currentOps),
- operation.deviceId()),
- APPLY_FLOWS,
+ log.trace("Forwarding storeBatch to {}, which is the primary (master) for device {}", master, deviceId);
+ clusterCommunicator.unicast(
+ operation,
+ APPLY_BATCH_FLOWS,
serializer::encode,
master);
}
}
- private Set<FlowRuleBatchEntry> updateStoreInternal(FlowRuleBatchOperation operation) {
- return operation.getOperations().stream().map(
- op -> {
- switch (op.operator()) {
- case ADD:
- addBatchEntry(op);
- return op;
- case REMOVE:
- if (removeBatchEntry(op)) {
- return op;
- }
- return null;
- case MODIFY:
- //TODO: figure this out at some point
- break;
- default:
- log.warn("Unknown flow operation operator: {}", op.operator());
- }
- return null;
- }
- ).filter(Objects::nonNull).collect(Collectors.toSet());
- }
-
- @SuppressWarnings("unchecked")
- private void addBatchEntry(FlowRuleBatchEntry batchEntry) {
- StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
- DocumentPath path = getPathFor(entry.deviceId(), entry.id());
- retryUntilSuccess(() -> {
- Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
- if (value != null) {
- Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
- entries.put(entry, entry);
- if (flows.replace(path, entries, value.version())) {
- log.trace("Stored new flow rule: {}", entry);
- return null;
+ /**
+ * Asynchronously applies a batch of flows to the store.
+ * <p>
+ * This operation is performed on the master node to ensure that events occur <em>after</em> flows have been stored
+ * and are visible to the master node. If a non-master node stores flows and then triggers events on the master,
+ * the flows may not yet be visible to the master node due to the nature of sequentially consistent reads on the
+ * underlying {@code DocumentTree} primitive.
+ */
+ private void applyBatchFlows(FlowRuleBatchOperation operation) {
+ updateStoreInternal(operation).whenComplete((operations, error) -> {
+ if (error == null) {
+ if (operations.isEmpty()) {
+ batchOperationComplete(FlowRuleBatchEvent.completed(
+ new FlowRuleBatchRequest(operation.id(), Collections.emptySet()),
+ new CompletedBatchOperation(true, Collections.emptySet(), operation.deviceId())));
} else {
- log.trace("Failed to store new flow rule: {}", entry);
- return retry();
+ notifyDelegate(FlowRuleBatchEvent.requested(
+ new FlowRuleBatchRequest(operation.id(), operations),
+ operation.deviceId()));
}
- } else {
- // If there are no entries stored for the device, initialize the device's flows.
- flows.createRecursive(path, Maps.newHashMap());
- return retry();
}
});
}
+ private CompletableFuture<Set<FlowRuleBatchEntry>> updateStoreInternal(FlowRuleBatchOperation operation) {
+ return Tools.allOf(operation.getOperations().stream().map(op -> {
+ switch (op.operator()) {
+ case ADD:
+ return addBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
+ case REMOVE:
+ return removeBatchEntry(op).thenApply(succeeded -> succeeded ? op : null);
+ case MODIFY:
+ return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
+ default:
+ log.warn("Unknown flow operation operator: {}", op.operator());
+ return CompletableFuture.<FlowRuleBatchEntry>completedFuture(null);
+ }
+ }).collect(Collectors.toList()))
+ .thenApply(results -> results.stream()
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet()));
+ }
+
@SuppressWarnings("unchecked")
- private boolean removeBatchEntry(FlowRuleBatchEntry batchEntry) {
- FlowRule rule = batchEntry.target();
- DocumentPath path = getPathFor(rule.deviceId(), rule.id());
- return retryUntilSuccess(() -> {
- Versioned<Map<StoredFlowEntry, StoredFlowEntry>> value = flows.get(path);
- if (value != null) {
- Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
- StoredFlowEntry entry = entries.get(rule);
- if (entry != null) {
- entry.setState(FlowEntryState.PENDING_REMOVE);
- if (flows.replace(path, entries, value.version())) {
- log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
- return true;
+ private CompletableFuture<Boolean> addBatchEntry(FlowRuleBatchEntry batchEntry) {
+ StoredFlowEntry entry = new DefaultFlowEntry(batchEntry.target());
+ DocumentPath path = getPathFor(entry.deviceId(), entry.id());
+ return retryAsyncUntilSuccess(() -> {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ asyncFlows.get(path).whenComplete((value, getError) -> {
+ if (getError == null) {
+ if (value != null) {
+ Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+ entries.put(entry, entry);
+ asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, replaceError) -> {
+ if (replaceError == null) {
+ if (succeeded) {
+ log.trace("Stored new flow rule: {}", entry);
+ future.complete(true);
+ } else {
+ log.trace("Failed to store new flow rule: {}", entry);
+ future.completeExceptionally(RETRY);
+ }
+ } else {
+ future.completeExceptionally(replaceError);
+ }
+ });
} else {
- log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
- return retry();
+ // If there are no entries stored for the device, initialize the device's flows.
+ Map<StoredFlowEntry, StoredFlowEntry> map = Maps.newHashMap();
+ map.put(entry, entry);
+ asyncFlows.createRecursive(path, map).whenComplete((succeeded, createError) -> {
+ if (createError == null) {
+ if (succeeded) {
+ log.trace("Stored new flow rule: {}", entry);
+ future.complete(true);
+ } else {
+ log.trace("Failed to store new flow rule: {}", entry);
+ future.completeExceptionally(RETRY);
+ }
+ } else {
+ future.completeExceptionally(createError);
+ }
+ });
}
} else {
- return false;
+ future.completeExceptionally(getError);
}
- } else {
- return false;
- }
+ });
+ return future;
+ });
+ }
+
+ @SuppressWarnings("unchecked")
+ private CompletableFuture<Boolean> removeBatchEntry(FlowRuleBatchEntry batchEntry) {
+ FlowRule rule = batchEntry.target();
+ DocumentPath path = getPathFor(rule.deviceId(), rule.id());
+ return retryAsyncUntilSuccess(() -> {
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ asyncFlows.get(path).whenComplete((value, getError) -> {
+ if (getError == null) {
+ if (value != null) {
+ Map<StoredFlowEntry, StoredFlowEntry> entries = Maps.newHashMap(value.value());
+ StoredFlowEntry entry = entries.get(rule);
+ if (entry != null) {
+ entry.setState(FlowEntryState.PENDING_REMOVE);
+ asyncFlows.replace(path, entries, value.version()).whenComplete((succeeded, error) -> {
+ if (error == null) {
+ if (succeeded) {
+ log.trace("Updated flow rule state to PENDING_REMOVE: {}", entry);
+ future.complete(true);
+ } else {
+ log.trace("Failed to update flow rule state to PENDING_REMOVE: {}", entry);
+ future.completeExceptionally(RETRY);
+ }
+ } else {
+ future.completeExceptionally(error);
+ }
+ });
+ } else {
+ future.complete(false);
+ }
+ } else {
+ future.complete(false);
+ }
+ } else {
+ future.completeExceptionally(getError);
+ }
+ });
+ return future;
});
}
@@ -528,9 +627,7 @@
return null;
}
} else {
- // If there are no entries stored for the device, initialize the device's flows.
- flows.createRecursive(path, Maps.newHashMap());
- return retry();
+ return null;
}
});
}
diff --git a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java
index d49f1cc..8144d22 100644
--- a/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java
+++ b/core/store/dist/src/test/java/org/onosproject/store/flow/impl/DistributedFlowRuleStoreTest.java
@@ -42,11 +42,9 @@
import org.onosproject.net.intent.IntentTestsMocks;
import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
import org.onosproject.store.service.AsyncDocumentTree;
-import org.onosproject.store.service.AsyncDocumentTreeAdapter;
-import org.onosproject.store.service.DocumentTree;
import org.onosproject.store.service.DocumentTreeBuilder;
import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.TestDocumentTree;
+import org.onosproject.store.service.TestAsyncDocumentTree;
import org.onosproject.store.service.TestStorageService;
import org.onosproject.store.service.TestTopic;
import org.onosproject.store.service.Topic;
@@ -145,13 +143,7 @@
@Override
@SuppressWarnings("unchecked")
public AsyncDocumentTree<V> build() {
- String name = name();
- return new AsyncDocumentTreeAdapter() {
- @Override
- public DocumentTree asDocumentTree() {
- return new TestDocumentTree(name);
- }
- };
+ return new TestAsyncDocumentTree<>(name());
}
};
}
diff --git a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
index ed46a96..8f1ffa3 100644
--- a/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
+++ b/core/store/primitives/src/main/java/org/onosproject/store/primitives/impl/StoragePartitionClient.java
@@ -247,8 +247,8 @@
AtomixDocumentTree atomixDocumentTree = new AtomixDocumentTree(client.newProxyBuilder()
.withName(name)
.withServiceType(String.format("%s-%s", DistributedPrimitive.Type.DOCUMENT_TREE.name(), ordering))
- .withReadConsistency(ReadConsistency.LINEARIZABLE)
- .withCommunicationStrategy(CommunicationStrategy.LEADER)
+ .withReadConsistency(ReadConsistency.SEQUENTIAL)
+ .withCommunicationStrategy(CommunicationStrategy.ANY)
.withTimeout(Duration.ofSeconds(30))
.withMaxRetries(5)
.build()