[SDFAB-1100] In-order flowrule processing
Extends the FlowRuleService and its api by adding in-order processing
capabilities. This is achieved by introducing stripe key as way to
indicate how to process the flowrules. Key is an object which is used
to select a specific executor. Operations having the same key is guaranteed
that will be processed by the same executor.
Change-Id: I5ab4d42e8a2b8cb869f3dc2305dbc5084d31f08b
(cherry picked from commit 07af21d9e116414b74152dc812cc980ca2c4c0a5)
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
index 9c0f522..14af90d 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleOperations.java
@@ -24,7 +24,9 @@
import java.util.Set;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onosproject.net.flow.FlowRuleOperation.Type.*;
+import static org.onosproject.net.flow.FlowRuleOperation.Type.ADD;
+import static org.onosproject.net.flow.FlowRuleOperation.Type.REMOVE;
+import static org.onosproject.net.flow.FlowRuleOperation.Type.MODIFY;
/**
* A batch of flow rule operations that are broken into stages.
@@ -34,17 +36,21 @@
private final List<Set<FlowRuleOperation>> stages;
private final FlowRuleOperationsContext callback;
+ private final Integer stripeKey;
private FlowRuleOperations(List<Set<FlowRuleOperation>> stages,
- FlowRuleOperationsContext cb) {
+ FlowRuleOperationsContext cb,
+ Integer stripeKey) {
this.stages = stages;
this.callback = cb;
+ this.stripeKey = stripeKey;
}
// kryo-constructor
protected FlowRuleOperations() {
this.stages = Lists.newArrayList();
this.callback = null;
+ this.stripeKey = null;
}
/**
@@ -67,6 +73,18 @@
}
/**
+ * Returns the stripe key. Expectation is that FlowRuleOperations with the
+ * same key will be executed sequentially in the same order as they are
+ * submitted to the FlowRuleService. Operations without a key or with
+ * different keys might be executed in parallel.
+ *
+ * @return the stripe key associated to this or null if not present
+ */
+ public Integer stripeKey() {
+ return stripeKey;
+ }
+
+ /**
* Returns a new builder.
*
* @return new builder
@@ -89,6 +107,7 @@
private final ImmutableList.Builder<Set<FlowRuleOperation>> listBuilder = ImmutableList.builder();
private ImmutableSet.Builder<FlowRuleOperation> currentStage = ImmutableSet.builder();
+ private Integer stripeKey = null;
// prevent use of the default constructor outside of this file; use the above method
private Builder() {}
@@ -160,6 +179,25 @@
}
/**
+ * Provides semantics similar to lock striping. FlowRuleOperations
+ * with the same key will be executed sequentially. Operations without
+ * a key or with different keys might be executed in parallel.
+ * <p>
+ * This parameter is useful to correlate different operations with
+ * potentially conflicting writes, to guarantee that operations are
+ * executed in-order. For example, to handle the case where one operation
+ * that removes a flow rule is followed by one that adds the same flow rule.
+ * </p>
+ *
+ * @param stripeKey an integer key
+ * @return this
+ */
+ public Builder striped(int stripeKey) {
+ this.stripeKey = stripeKey;
+ return this;
+ }
+
+ /**
* Builds the immutable flow rule operations.
*
* @return flow rule operations
@@ -178,7 +216,7 @@
checkNotNull(cb);
closeStage();
- return new FlowRuleOperations(listBuilder.build(), cb);
+ return new FlowRuleOperations(listBuilder.build(), cb, stripeKey);
}
}
}
diff --git a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
index 9cada03..807d09e 100644
--- a/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onosproject/net/flow/FlowRuleService.java
@@ -196,4 +196,33 @@
default long getActiveFlowRuleCount(DeviceId deviceId) {
return 0;
}
+
+ /**
+ * Applies the specified flow rules onto their respective devices. Similar
+ * to {@link FlowRuleService#applyFlowRules(FlowRule...)} but expectation is
+ * that flow rules applied by subsequent calls using the same key will be
+ * executed sequentially. Flow rules applied through {@link FlowRuleService#applyFlowRules(FlowRule...)}
+ * might be executed in parallel.
+ *
+ * @param stripeKey an integer key
+ * @param flowRules one or more flow rules
+ */
+ default void applyFlowRules(int stripeKey, FlowRule... flowRules) {
+
+ }
+
+ /**
+ * Removes the specified flow rules from their respective devices. Similar
+ * to {@link FlowRuleService#removeFlowRules(FlowRule...)} but expectation is
+ * that flow rules removed by subsequent calls using the same key will be
+ * executed sequentially. Flow rules applied through {@link FlowRuleService#removeFlowRules(FlowRule...)}
+ * might be executed in parallel.
+ *
+ * @param stripeKey an integer key
+ * @param flowRules one or more flow rules
+ */
+ default void removeFlowRules(int stripeKey, FlowRule... flowRules) {
+
+ }
+
}
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 670103b..e7cd691 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -21,6 +21,8 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import org.onlab.util.PredictableExecutor;
+import org.onlab.util.PredictableExecutor.PickyRunnable;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
@@ -143,11 +145,11 @@
private final FlowRuleDriverProvider driverProvider = new FlowRuleDriverProvider();
- protected ExecutorService deviceInstallers =
- Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "device-installer-%d", log));
+ protected ExecutorService deviceInstallers = Executors.newFixedThreadPool(32,
+ groupedThreads("onos/flowservice", "device-installer-%d", log));
- protected ExecutorService operationsService =
- Executors.newFixedThreadPool(32, groupedThreads("onos/flowservice", "operations-%d", log));
+ protected ExecutorService operationsService = new PredictableExecutor(32,
+ groupedThreads("onos/flowservice", "operations-%d", log));
private IdGenerator idGenerator;
@@ -267,7 +269,7 @@
log.info("Configured. FallbackFlowPollFrequency is {} seconds",
fallbackFlowPollFrequency);
} catch (NumberFormatException e) {
- log.warn("Configured fallbackFlowPollFrequency value '{}' " +
+ log.warn("Configured fallbackFlowPollFrequency value " +
"is not a number, using current value of {} seconds",
fallbackFlowPollFrequency);
}
@@ -312,11 +314,7 @@
public void applyFlowRules(FlowRule... flowRules) {
checkPermission(FLOWRULE_WRITE);
- FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
- for (FlowRule flowRule : flowRules) {
- builder.add(flowRule);
- }
- apply(builder.build());
+ apply(buildFlowRuleOperations(true, null, flowRules));
}
@Override
@@ -339,11 +337,7 @@
public void removeFlowRules(FlowRule... flowRules) {
checkPermission(FLOWRULE_WRITE);
- FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
- for (FlowRule flowRule : flowRules) {
- builder.remove(flowRule);
- }
- apply(builder.build());
+ apply(buildFlowRuleOperations(false, null, flowRules));
}
@Override
@@ -395,7 +389,26 @@
@Override
public void apply(FlowRuleOperations ops) {
checkPermission(FLOWRULE_WRITE);
- operationsService.execute(new FlowOperationsProcessor(ops));
+ if (ops.stripeKey() == null) {
+ // Null means that we don't care about the in-order processing
+ // this approach maximizes the throughput but it can introduce
+ // consistency issues as the original order between conflictual
+ // writes is not maintained. If conflictual writes can be easily
+ // handled using different stages, this is the approach to use.
+ operationsService.execute(new FlowOperationsProcessor(ops));
+ } else {
+ // Following approach is suggested when it is hard to handle
+ // conflictual writes in the same FlowRuleOperations object. Apps
+ // may know there are conflictual writes but it could be hard to
+ // encapsulate them in the same object using different stages (above
+ // all if they are stimulated by different events). In this case,
+ // the probabilistic accumulation may help but it is brittle and based
+ // on the probability that a given event happens in a specific time.
+ // For this reason we have introduced PredictableFlowOperationsProcessor
+ // which uses the striped key (provided by the apps) to serialize the ops
+ // on the same executor.
+ operationsService.execute(new PredictableFlowOperationsProcessor(ops));
+ }
}
@Override
@@ -793,12 +806,12 @@
private class FlowOperationsProcessor implements Runnable {
// Immutable
- private final FlowRuleOperations fops;
+ protected final FlowRuleOperations fops;
// Mutable
- private final List<Set<FlowRuleOperation>> stages;
- private final Set<DeviceId> pendingDevices = new HashSet<>();
- private boolean hasFailed = false;
+ protected final List<Set<FlowRuleOperation>> stages;
+ protected final Set<DeviceId> pendingDevices = new HashSet<>();
+ protected boolean hasFailed = false;
FlowOperationsProcessor(FlowRuleOperations ops) {
this.stages = Lists.newArrayList(ops.stages());
@@ -814,7 +827,7 @@
}
}
- private void process(Set<FlowRuleOperation> ops) {
+ protected void process(Set<FlowRuleOperation> ops) {
Multimap<DeviceId, FlowRuleBatchEntry> perDeviceBatches = ArrayListMultimap.create();
for (FlowRuleOperation op : ops) {
@@ -853,6 +866,91 @@
}
}
+ // Provides in-order processing in the local instance. The main difference with its
+ // ancestor is that the runnable ends when all the stages have been processed. Instead,
+ // its ancestor ends as soon as one stage has been processed and cannot guarantee in-order
+ // processing between subsequent stages and a new FlowRuleOperation (having the same key).
+ private class PredictableFlowOperationsProcessor extends FlowOperationsProcessor implements PickyRunnable {
+
+ private static final int WAIT_TIMEOUT = 5000;
+ private static final int WAIT_ATTEMPTS = 3;
+
+ PredictableFlowOperationsProcessor(FlowRuleOperations ops) {
+ super(ops);
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!stages.isEmpty()) {
+ process(stages.remove(0));
+ synchronized (this) {
+ // Batch in flights - let's wait
+ int attempts = 0;
+ while (!pendingDevices.isEmpty() && attempts < WAIT_ATTEMPTS) {
+ this.wait(WAIT_TIMEOUT);
+ attempts++;
+ }
+ // Something wrong, we cannot block all the pipeline
+ if (attempts == WAIT_ATTEMPTS) {
+ break;
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ // Interrupted case
+ if (log.isTraceEnabled()) {
+ log.trace("Interrupted while waiting for {} stages to be completed",
+ stages.size());
+ }
+ }
+
+ synchronized (this) {
+ if (stages.isEmpty() && !hasFailed && pendingDevices.isEmpty()) {
+ // No error and it is done, signal success to the apps
+ fops.callback().onSuccess(fops);
+ } else {
+ // It was interrupted or there is a failure - signal error.
+ // This may introduce a duplicate error in some cases but
+ // better than nothing and keeping the apps blocked forever.
+ FlowRuleOperations.Builder failedOpsBuilder = FlowRuleOperations.builder();
+ if (!stages.isEmpty()) {
+ stages.remove(0).forEach(flowRuleOperation -> failedOpsBuilder.add(
+ flowRuleOperation.rule()));
+ }
+ fops.callback().onError(failedOpsBuilder.build());
+ }
+ }
+ }
+
+ @Override
+ synchronized void satisfy(DeviceId devId) {
+ pendingDevices.remove(devId);
+ if (pendingDevices.isEmpty()) {
+ this.notifyAll();
+ }
+ }
+
+ @Override
+ synchronized void fail(DeviceId devId, Set<? extends FlowRule> failures) {
+ hasFailed = true;
+ pendingDevices.remove(devId);
+ if (pendingDevices.isEmpty()) {
+ this.notifyAll();
+ }
+
+ FlowRuleOperations.Builder failedOpsBuilder = FlowRuleOperations.builder();
+ failures.forEach(failedOpsBuilder::add);
+
+ fops.callback().onError(failedOpsBuilder.build());
+ }
+
+ @Override
+ public int hint() {
+ return fops.stripeKey();
+ }
+ }
+
@Override
public Iterable<TableStatisticsEntry> getFlowTableStatistics(DeviceId deviceId) {
checkPermission(FLOWRULE_READ);
@@ -866,6 +964,35 @@
return store.getActiveFlowRuleCount(deviceId);
}
+ @Override
+ public void applyFlowRules(int key, FlowRule... flowRules) {
+ checkPermission(FLOWRULE_WRITE);
+
+ apply(buildFlowRuleOperations(true, key, flowRules));
+ }
+
+ @Override
+ public void removeFlowRules(int key, FlowRule... flowRules) {
+ checkPermission(FLOWRULE_WRITE);
+
+ apply(buildFlowRuleOperations(false, key, flowRules));
+ }
+
+ private FlowRuleOperations buildFlowRuleOperations(boolean add, Integer key, FlowRule... flowRules) {
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+ for (FlowRule flowRule : flowRules) {
+ if (add) {
+ builder.add(flowRule);
+ } else {
+ builder.remove(flowRule);
+ }
+ }
+ if (key != null) {
+ builder.striped(key);
+ }
+ return builder.build();
+ }
+
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
@@ -875,7 +1002,7 @@
DeviceId deviceId = event.subject().id();
if (!deviceService.isAvailable(deviceId)) {
BasicDeviceConfig cfg = netCfgService.getConfig(deviceId, BasicDeviceConfig.class);
- //if purgeOnDisconnection is set for the device or it's a global configuration
+ // if purgeOnDisconnection is set for the device or it's a global configuration
// lets remove the flows. Priority is given to the per device flag
boolean purge = cfg != null && cfg.isPurgeOnDisconnectionConfigured() ?
cfg.purgeOnDisconnection() : purgeOnDisconnection;