[SDFAB-1100] In-order flowrule processing

Extends the FlowRuleService and its API by adding in-order processing
capabilities. This is achieved by introducing a stripe key as a way to
indicate how to process the flow rules. The key is an integer which is used
to select a specific executor. Operations having the same key are guaranteed
to be processed by the same executor.

Change-Id: I5ab4d42e8a2b8cb869f3dc2305dbc5084d31f08b
(cherry picked from commit 07af21d9e116414b74152dc812cc980ca2c4c0a5)
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
index 9c0f522..14af90d 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
@@ -24,7 +24,9 @@
 import java.util.Set;
 
 import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.net.flow.FlowRuleOperation.Type.*;
+import static org.onosproject.net.flow.FlowRuleOperation.Type.ADD;
+import static org.onosproject.net.flow.FlowRuleOperation.Type.REMOVE;
+import static org.onosproject.net.flow.FlowRuleOperation.Type.MODIFY;
 
 /**
  * A batch of flow rule operations that are broken into stages.
@@ -34,17 +36,21 @@
 
     private final List<Set<FlowRuleOperation>> stages;
     private final FlowRuleOperationsContext callback;
+    private final Integer stripeKey;
 
     private FlowRuleOperations(List<Set<FlowRuleOperation>> stages,
-                               FlowRuleOperationsContext cb) {
+                               FlowRuleOperationsContext cb,
+                               Integer stripeKey) {
         this.stages = stages;
         this.callback = cb;
+        this.stripeKey = stripeKey;
     }
 
     // kryo-constructor
     protected FlowRuleOperations() {
         this.stages = Lists.newArrayList();
         this.callback = null;
+        this.stripeKey = null;
     }
 
     /**
@@ -67,6 +73,18 @@
     }
 
     /**
+     * Returns the stripe key. The expectation is that FlowRuleOperations with
+     * the same key will be executed sequentially in the same order as they are
+     * submitted to the FlowRuleService. Operations without a key or with
+     * different keys might be executed in parallel.
+     *
+     * @return the stripe key of these operations, or null if no key was set
+     */
+    public Integer stripeKey() {
+        return stripeKey;
+    }
+
+    /**
      * Returns a new builder.
      *
      * @return new builder
@@ -89,6 +107,7 @@
 
         private final ImmutableList.Builder<Set<FlowRuleOperation>> listBuilder = ImmutableList.builder();
         private ImmutableSet.Builder<FlowRuleOperation> currentStage = ImmutableSet.builder();
+        private Integer stripeKey = null;
 
         // prevent use of the default constructor outside of this file; use the above method
         private Builder() {}
@@ -160,6 +179,25 @@
         }
 
         /**
+         * Provides semantics similar to lock striping. FlowRuleOperations
+         * with the same key will be executed sequentially. Operations without
+         * a key or with different keys might be executed in parallel.
+         * <p>
+         * This parameter is useful to correlate different operations with
+         * potentially conflicting writes, to guarantee that operations are
+         * executed in-order. For example, to handle the case where one operation
+         * that removes a flow rule is followed by one that adds the same flow rule.
+         * </p>
+         *
+         * @param stripeKey an integer key shared by the operations to be serialized
+         * @return this builder
+         */
+        public Builder striped(int stripeKey) {
+            this.stripeKey = stripeKey;
+            return this;
+        }
+
+        /**
          * Builds the immutable flow rule operations.
          *
          * @return flow rule operations
@@ -178,7 +216,7 @@
             checkNotNull(cb);
 
             closeStage();
-            return new FlowRuleOperations(listBuilder.build(), cb);
+            return new FlowRuleOperations(listBuilder.build(), cb, stripeKey);
         }
     }
 }
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
index 9cada03..807d09e 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
@@ -196,4 +196,33 @@
     default long getActiveFlowRuleCount(DeviceId deviceId) {
         return 0;
     }
+
+    /**
+     * Applies the specified flow rules onto their respective devices. Similar
+     * to {@link FlowRuleService#applyFlowRules(FlowRule...)}, but the expectation
+     * is that flow rules applied by subsequent calls using the same key will be
+     * executed sequentially, while flow rules applied through the key-less variant
+     * might be executed in parallel. This default implementation does nothing.
+     *
+     * @param stripeKey an integer key
+     * @param flowRules one or more flow rules
+     */
+    default void applyFlowRules(int stripeKey, FlowRule... flowRules) {
+
+    }
+
+    /**
+     * Removes the specified flow rules from their respective devices. Similar
+     * to {@link FlowRuleService#removeFlowRules(FlowRule...)}, but the expectation
+     * is that flow rules removed by subsequent calls using the same key will be
+     * executed sequentially, while flow rules removed through the key-less variant
+     * might be executed in parallel. This default implementation does nothing.
+     *
+     * @param stripeKey an integer key
+     * @param flowRules one or more flow rules
+     */
+    default void removeFlowRules(int stripeKey, FlowRule... flowRules) {
+
+    }
+
 }
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 670103b..e7cd691 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -21,6 +21,8 @@
 import com.google.common.collect.Maps;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import org.onlab.util.PredictableExecutor;
+import org.onlab.util.PredictableExecutor.PickyRunnable;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
@@ -143,11 +145,11 @@
 
     private final FlowRuleDriverProvider driverProvider = new FlowRuleDriverProvider();
 
-    protected ExecutorService deviceInstallers =
-            Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d", log));
+    protected ExecutorService deviceInstallers = Executors.newFixedThreadPool(32,
+            groupedThreads("onos/flowservice", "device-installer-%d", log));
 
-    protected ExecutorService operationsService =
-            Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d", log));
+    protected ExecutorService operationsService = new PredictableExecutor(32,
+            groupedThreads("onos/flowservice", "operations-%d", log));
 
     private IdGenerator idGenerator;
 
@@ -267,7 +269,7 @@
                 log.info("Configured. FallbackFlowPollFrequency is {} seconds",
                          fallbackFlowPollFrequency);
             } catch (NumberFormatException e) {
-                log.warn("Configured fallbackFlowPollFrequency value '{}' " +
+                log.warn("Configured fallbackFlowPollFrequency value " +
                                  "is not a number, using current value of {} seconds",
                          fallbackFlowPollFrequency);
             }
@@ -312,11 +314,7 @@
     public void applyFlowRules(FlowRule... flowRules) {
         checkPermission(FLOWRULE_WRITE);
 
-        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
-        for (FlowRule flowRule : flowRules) {
-            builder.add(flowRule);
-        }
-        apply(builder.build());
+        apply(buildFlowRuleOperations(true, null, flowRules));
     }
 
     @Override
@@ -339,11 +337,7 @@
     public void removeFlowRules(FlowRule... flowRules) {
         checkPermission(FLOWRULE_WRITE);
 
-        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
-        for (FlowRule flowRule : flowRules) {
-            builder.remove(flowRule);
-        }
-        apply(builder.build());
+        apply(buildFlowRuleOperations(false, null, flowRules));
     }
 
     @Override
@@ -395,7 +389,26 @@
     @Override
     public void apply(FlowRuleOperations ops) {
         checkPermission(FLOWRULE_WRITE);
-        operationsService.execute(new FlowOperationsProcessor(ops));
+        if (ops.stripeKey() == null) {
+            // Null means that we don't care about the in-order processing;
+            // this approach maximizes the throughput but it can introduce
+            // consistency issues as the original order between conflicting
+            // writes is not maintained. If conflicting writes can be easily
+            // handled using different stages, this is the approach to use.
+            operationsService.execute(new FlowOperationsProcessor(ops));
+        } else {
+            // The following approach is suggested when it is hard to handle
+            // conflicting writes in the same FlowRuleOperations object. Apps
+            // may know there are conflicting writes but it could be hard to
+            // encapsulate them in the same object using different stages (above
+            // all if they are triggered by different events). In this case,
+            // the probabilistic accumulation may help but it is brittle and based
+            // on the probability that a given event happens in a specific time.
+            // For this reason we have introduced PredictableFlowOperationsProcessor
+            // which uses the stripe key (provided by the apps) to serialize the ops
+            // on the same executor.
+            operationsService.execute(new PredictableFlowOperationsProcessor(ops));
+        }
     }
 
     @Override
@@ -793,12 +806,12 @@
 
     private class FlowOperationsProcessor implements Runnable {
         // Immutable
-        private final FlowRuleOperations fops;
+        protected final FlowRuleOperations fops;
 
         // Mutable
-        private final List<Set<FlowRuleOperation>> stages;
-        private final Set<DeviceId> pendingDevices = new HashSet<>();
-        private boolean hasFailed = false;
+        protected final List<Set<FlowRuleOperation>> stages;
+        protected final Set<DeviceId> pendingDevices = new HashSet<>();
+        protected boolean hasFailed = false;
 
         FlowOperationsProcessor(FlowRuleOperations ops) {
             this.stages = Lists.newArrayList(ops.stages());
@@ -814,7 +827,7 @@
             }
         }
 
-        private void process(Set<FlowRuleOperation> ops) {
+        protected void process(Set<FlowRuleOperation> ops) {
             Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches = ArrayListMultimap.create();
 
             for (FlowRuleOperation op : ops) {
@@ -853,6 +866,91 @@
         }
     }
 
+    // Provides in-order processing in the local instance. The main difference with its
+    // parent is that the runnable ends when all the stages have been processed. Instead,
+    // its parent ends as soon as one stage has been processed and cannot guarantee in-order
+    // processing between subsequent stages and a new FlowRuleOperation (having the same key).
+    private class PredictableFlowOperationsProcessor extends FlowOperationsProcessor implements PickyRunnable {
+
+        private static final int WAIT_TIMEOUT = 5000; // passed to Object.wait(long), i.e. milliseconds
+        private static final int WAIT_ATTEMPTS = 3; // maximum number of waits for a single stage
+
+        PredictableFlowOperationsProcessor(FlowRuleOperations ops) {
+            super(ops);
+        }
+
+        @Override
+        public void run() {
+            try {
+                while (!stages.isEmpty()) {
+                    process(stages.remove(0));
+                    synchronized (this) {
+                        // Batch in flight - wait until satisfy/fail have emptied pendingDevices
+                        int attempts = 0;
+                        while (!pendingDevices.isEmpty() && attempts < WAIT_ATTEMPTS) {
+                            this.wait(WAIT_TIMEOUT);
+                            attempts++;
+                        }
+                        // Still pending after the last attempt: stop, we cannot block the whole pipeline
+                        if (!pendingDevices.isEmpty()) {
+                            break;
+                        }
+                    }
+                }
+            } catch (InterruptedException e) {
+                // Restore the interrupt status; the failure is signalled to the app below
+                Thread.currentThread().interrupt();
+                if (log.isTraceEnabled()) {
+                    log.trace("Interrupted while waiting for {} stages to be completed", stages.size());
+                }
+            }
+
+            synchronized (this) {
+                if (stages.isEmpty() && !hasFailed && pendingDevices.isEmpty()) {
+                    // No error and it is done, signal success to the apps
+                    fops.callback().onSuccess(fops);
+                } else {
+                    // It was interrupted or there is a failure - signal error.
+                    // This may introduce a duplicate error in some cases but
+                    // better than nothing and keeping the apps blocked forever.
+                    FlowRuleOperations.Builder failedOpsBuilder = FlowRuleOperations.builder();
+                    if (!stages.isEmpty()) {
+                        stages.remove(0).forEach(flowRuleOperation -> failedOpsBuilder.add(
+                                flowRuleOperation.rule()));
+                    }
+                    fops.callback().onError(failedOpsBuilder.build());
+                }
+            }
+        }
+
+        @Override
+        synchronized void satisfy(DeviceId devId) {
+            pendingDevices.remove(devId);
+            if (pendingDevices.isEmpty()) {
+                this.notifyAll();
+            }
+        }
+
+        @Override
+        synchronized void fail(DeviceId devId, Set<? extends FlowRule> failures) {
+            hasFailed = true;
+            pendingDevices.remove(devId);
+            if (pendingDevices.isEmpty()) {
+                this.notifyAll();
+            }
+
+            FlowRuleOperations.Builder failedOpsBuilder = FlowRuleOperations.builder();
+            failures.forEach(failedOpsBuilder::add);
+
+            fops.callback().onError(failedOpsBuilder.build());
+        }
+
+        @Override
+        public int hint() {
+            return fops.stripeKey(); // unboxed; apply() only creates this class when the key is non-null
+        }
+    }
+
     @Override
     public Iterable<TableStatisticsEntry> getFlowTableStatistics(DeviceId deviceId) {
         checkPermission(FLOWRULE_READ);
@@ -866,6 +964,35 @@
         return store.getActiveFlowRuleCount(deviceId);
     }
 
+    @Override
+    public void applyFlowRules(int key, FlowRule... flowRules) {
+        checkPermission(FLOWRULE_WRITE);
+        // true selects builder.add(...); the int key is boxed, so it is always set on the builder
+        apply(buildFlowRuleOperations(true, key, flowRules));
+    }
+
+    @Override
+    public void removeFlowRules(int key, FlowRule... flowRules) {
+        checkPermission(FLOWRULE_WRITE);
+        // false selects builder.remove(...); the int key is boxed, so it is always set on the builder
+        apply(buildFlowRuleOperations(false, key, flowRules));
+    }
+
+    private FlowRuleOperations buildFlowRuleOperations(boolean add, Integer key, FlowRule... flowRules) {
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+        for (FlowRule flowRule : flowRules) {
+            if (add) { // one flag for the whole call: every rule is either added or removed
+                builder.add(flowRule);
+            } else {
+                builder.remove(flowRule);
+            }
+        }
+        if (key != null) { // a null key (key-less callers) leaves the builder's stripe key unset
+            builder.striped(key);
+        }
+        return builder.build();
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
@@ -875,7 +1002,7 @@
                     DeviceId deviceId = event.subject().id();
                     if (!deviceService.isAvailable(deviceId)) {
                         BasicDeviceConfig cfg = netCfgService.getConfig(deviceId, BasicDeviceConfig.class);
-                        //if purgeOnDisconnection is set for the device or it's a global configuration
+                        // if purgeOnDisconnection is set for the device or it's a global configuration
                         // lets remove the flows. Priority is given to the per device flag
                         boolean purge = cfg != null && cfg.isPurgeOnDisconnectionConfigured() ?
                                 cfg.purgeOnDisconnection() : purgeOnDisconnection;