Write to flow table bucket in calling thread to reduce latency

Change-Id: I0067aa6dba6c738d647585e8b063efa9e2fbe15c
diff --git a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
index 722ebfa..8ba93c6 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/flow/impl/DeviceFlowTable.java
@@ -313,31 +313,43 @@
      * @return a future to be completed with the function result once it has been run
      */
     private <T> CompletableFuture<T> runInTerm(FlowId flowId, BiFunction<FlowBucket, Long, T> function) {
+        DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
+        if (!replicaInfo.isMaster(localNodeId)) {
+            return Tools.exceptionalFuture(new IllegalStateException());
+        }
+
+        FlowBucket bucket = getBucket(flowId);
+
+        // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
+        // the change to be executed once the master has been synchronized.
+        final long term = replicaInfo.term();
         CompletableFuture<T> future = new CompletableFuture<>();
-        executor.execute(() -> {
-            DeviceReplicaInfo replicaInfo = lifecycleManager.getReplicaInfo();
-            if (!replicaInfo.isMaster(localNodeId)) {
-                future.completeExceptionally(new IllegalStateException());
-                return;
-            }
-
-            FlowBucket bucket = getBucket(flowId);
-
-            // If the master's term is not currently active (has not been synchronized with prior replicas), enqueue
-            // the change to be executed once the master has been synchronized.
-            final long term = replicaInfo.term();
-            if (activeTerm < term) {
-                log.debug("Enqueueing operation for device {}", deviceId);
-                flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
-                        .add(() -> future.complete(function.apply(bucket, term)));
-            } else {
-                future.complete(function.apply(bucket, term));
-            }
-        });
+        if (activeTerm < term) {
+            log.debug("Enqueueing operation for device {}", deviceId);
+            flowTasks.computeIfAbsent(bucket.bucketId().bucket(), b -> new LinkedList<>())
+                    .add(() -> future.complete(apply(function, bucket, term)));
+        } else {
+            future.complete(apply(function, bucket, term));
+        }
         return future;
     }
 
     /**
+     * Applies the given function to the given bucket.
+     *
+     * @param function the function to apply
+     * @param bucket the bucket to which to apply the function
+     * @param term the term in which to apply the function
+     * @param <T> the expected result type
+     * @return the function result
+     */
+    private <T> T apply(BiFunction<FlowBucket, Long, T> function, FlowBucket bucket, long term) {
+        synchronized (bucket) {
+            return function.apply(bucket, term);
+        }
+    }
+
+    /**
      * Schedules bucket backups.
      */
     private void scheduleBackups() {
@@ -483,7 +495,9 @@
         if (log.isDebugEnabled()) {
             log.debug("Backing up {} flow entries in bucket {} to {}", bucket.count(), bucket.bucketId(), nodeId);
         }
-        return sendWithTimestamp(bucket, backupSubject, nodeId);
+        synchronized (bucket) {
+            return sendWithTimestamp(bucket, backupSubject, nodeId);
+        }
     }
 
     /**