[VOL-3249] Batching operations in OLT pipeliner
Change-Id: Iea4a30ddb224ac5d9493b99565fb9b0dfe57f1bc
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
index dd72436..c6b1e81 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
@@ -29,6 +29,8 @@
import org.onlab.packet.IPv6;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.VlanId;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
@@ -38,6 +40,7 @@
import org.onosproject.net.behaviour.Pipeliner;
import org.onosproject.net.behaviour.PipelinerContext;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.driver.Driver;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -85,10 +88,14 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.Timer;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static org.onosproject.core.CoreService.CORE_APP_NAME;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -124,6 +131,13 @@
.register(OLTPipelineGroup.class)
.build("OltPipeline");
+ private static final Timer TIMER = new Timer("filterobj-batching");
+ private Accumulator<Pair<FilteringObjective, FlowRule>> accumulator;
+
+ // accumulator executor service
+ private ScheduledExecutorService accumulatorExecutorService
+ = newSingleThreadScheduledExecutor(groupedThreads("OltPipeliner", "acc-%d", log));
+
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
log.debug("Initiate OLT pipeline");
@@ -139,6 +153,16 @@
appId = coreService.registerApplication(
"org.onosproject.driver.OLTPipeline");
+ // Init the accumulator, if enabled
+ if (isAccumulatorEnabled()) {
+ log.debug("Building accumulator with maxObjs {}, batchMs {}, idleMs {}",
+ context.accumulatorMaxObjectives(), context.accumulatorMaxBatchMillis(),
+ context.accumulatorMaxIdleMillis());
+ accumulator = new ObjectiveAccumulator(context.accumulatorMaxObjectives(),
+ context.accumulatorMaxBatchMillis(),
+ context.accumulatorMaxIdleMillis());
+ }
+
pendingGroups = CacheBuilder.newBuilder()
.expireAfterWrite(20, TimeUnit.SECONDS)
@@ -152,6 +176,15 @@
}
+ public boolean isAccumulatorEnabled() {
+ Driver driver = super.data().driver();
+ // we cannot determine the property
+ if (driver == null) {
+ return false;
+ }
+ return Boolean.parseBoolean(driver.getProperty(ACCUMULATOR_ENABLED));
+ }
+
@Override
public void filter(FilteringObjective filter) {
Instructions.OutputInstruction output;
@@ -440,7 +473,8 @@
log.warn("Unknown forwarding operation: {}", fwd.op());
}
- applyFlowRules(builder, fwd);
+ applyFlowRules(ImmutableList.of(fwd), builder);
+
}
@@ -1005,21 +1039,26 @@
.withPriority(filter.priority())
.build();
- FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
-
- switch (filter.type()) {
- case PERMIT:
- opsBuilder.add(rule);
- break;
- case DENY:
- opsBuilder.remove(rule);
- break;
- default:
- log.warn("Unknown filter type : {}", filter.type());
- fail(filter, ObjectiveError.UNSUPPORTED);
+ if (accumulator != null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Adding pair to batch: {}", Pair.of(filter, rule));
+ }
+ accumulator.add(Pair.of(filter, rule));
+ } else {
+ FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+ switch (filter.type()) {
+ case PERMIT:
+ opsBuilder.add(rule);
+ break;
+ case DENY:
+ opsBuilder.remove(rule);
+ break;
+ default:
+ log.warn("Unknown filter type : {}", filter.type());
+ fail(filter, ObjectiveError.UNSUPPORTED);
+ }
+ applyFlowRules(ImmutableList.of(filter), opsBuilder);
}
-
- applyFlowRules(opsBuilder, filter);
}
private void applyRules(ForwardingObjective fwd, FlowRule.Builder... fwdBuilders) {
@@ -1043,24 +1082,62 @@
log.warn("Unknown forwarding operation: {}", fwd.op());
}
- applyFlowRules(builder, fwd);
+ applyFlowRules(ImmutableList.of(fwd), builder);
+
+
}
- private void applyFlowRules(FlowRuleOperations.Builder builder,
- Objective objective) {
+ private void applyFlowRules(List<Objective> objectives, FlowRuleOperations.Builder builder) {
flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
@Override
public void onSuccess(FlowRuleOperations ops) {
- pass(objective);
+ objectives.forEach(obj -> {
+ pass(obj);
+ });
}
@Override
public void onError(FlowRuleOperations ops) {
- fail(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
+ objectives.forEach(obj -> {
+ fail(obj, ObjectiveError.FLOWINSTALLATIONFAILED);
+ });
+
}
}));
}
+ // Builds the batch using the accumulated flow rules
+ private void sendFilters(List<Pair<FilteringObjective, FlowRule>> pairs) {
+ FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
+ log.debug("Sending batch of {} filter-objs", pairs.size());
+ List<Objective> filterObjs = Lists.newArrayList();
+ // Iterates over all accumulated flow rules and then build an unique batch
+ pairs.forEach(pair -> {
+ FilteringObjective filter = pair.getLeft();
+ FlowRule rule = pair.getRight();
+ switch (filter.type()) {
+ case PERMIT:
+ flowOpsBuilder.add(rule);
+ log.debug("Applying add filter-obj {} to device: {}", filter.id(), deviceId);
+ filterObjs.add(filter);
+ break;
+ case DENY:
+ flowOpsBuilder.remove(rule);
+ log.debug("Deleting flow rule {} to device: {}", rule, deviceId);
+ filterObjs.add(filter);
+ break;
+ default:
+ fail(filter, ObjectiveError.UNKNOWN);
+ log.warn("Unknown forwarding type {}", filter.type());
+ }
+ });
+ if (log.isDebugEnabled()) {
+ log.debug("Applying batch {}", flowOpsBuilder.build());
+ }
+ // Finally applies the operations
+ applyFlowRules(filterObjs, flowOpsBuilder);
+ }
+
private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
return criteria.stream()
.filter(c -> c.type().equals(type))
@@ -1149,4 +1226,37 @@
// TODO Implementation deferred to vendor
return null;
}
+
+ // Flow rules accumulator for reducing the number of transactions required to the devices.
+ private final class ObjectiveAccumulator
+ extends AbstractAccumulator<Pair<FilteringObjective, FlowRule>> {
+
+ ObjectiveAccumulator(int maxFilter, int maxBatchMS, int maxIdleMS) {
+ super(TIMER, maxFilter, maxBatchMS, maxIdleMS);
+ }
+
+ @Override
+ public void processItems(List<Pair<FilteringObjective, FlowRule>> pairs) {
+ // Triggers creation of a batch using the list of flowrules generated from objs.
+ accumulatorExecutorService.execute(new FlowRulesBuilderTask(pairs));
+ }
+ }
+
+ // Task for building batch of flow rules in a separate thread.
+ private final class FlowRulesBuilderTask implements Runnable {
+ private final List<Pair<FilteringObjective, FlowRule>> pairs;
+
+ FlowRulesBuilderTask(List<Pair<FilteringObjective, FlowRule>> pairs) {
+ this.pairs = pairs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ sendFilters(pairs);
+ } catch (Exception e) {
+ log.warn("Unable to send objectives", e);
+ }
+ }
+ }
}