Write to flow table bucket in calling thread to reduce latency
Change-Id: I0067aa6dba6c738d647585e8b063efa9e2fbe15c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index 722ebfa..8ba93c6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -313,31 +313,43 @@
* @return a future to be completed with the function result once it has been run
*/
private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
+ DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+ if (!replicaInfo.isMaster(localNodeId)) {
+ return Tools.exceptionalFuture(new IllegalStateException());
+ }
+
+ FlowBucket bucket = getBucket(flowId);
+
+ // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
+ // the change to be executed once the master has been synchronized.
+ final long term = replicaInfo.term();
CompletableFuture<T> future = new CompletableFuture<>();
- executor.execute(() -> {
- DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
- if (!replicaInfo.isMaster(localNodeId)) {
- future.completeExceptionally(new IllegalStateException());
- return;
- }
-
- FlowBucket bucket = getBucket(flowId);
-
- // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
- // the change to be executed once the master has been synchronized.
- final long term = replicaInfo.term();
- if (activeTerm < term) {
- log.debug("Enqueueing operation for device {}", deviceId);
- flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
- .add(() -> future.complete(function.apply(bucket, term)));
- } else {
- future.complete(function.apply(bucket, term));
- }
- });
+ if (activeTerm < term) {
+ log.debug("Enqueueing operation for device {}", deviceId);
+ flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
+ .add(() -> future.complete(apply(function, bucket, term)));
+ } else {
+ future.complete(apply(function, bucket, term));
+ }
return future;
}
/**
+ * Applies the given function to the given bucket.
+ *
+ * @param function the function to apply
+ * @param bucket the bucket to which to apply the function
+ * @param term the term in which to apply the function
+ * @param <T> the expected result type
+ * @return the function result
+ */
+ private <T> T apply(BiFunction<FlowBucket, Long, T> function, FlowBucket bucket, long term) {
+ synchronized (bucket) {
+ return function.apply(bucket, term);
+ }
+ }
+
+ /**
* Schedules bucket backups.
*/
private void scheduleBackups() {
@@ -483,7 +495,9 @@
if (log.isDebugEnabled()) {
log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
}
- return sendWithTimestamp(bucket, backupSubject, nodeId);
+ synchronized (bucket) {
+ return sendWithTimestamp(bucket, backupSubject, nodeId);
+ }
}
/**