Implements accumulation of the fwdobjectives in ofdpa pipelines
Change-Id: I95cbdd9b3fb8d439003a103111a01dc3aee2072b
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
index b3494bd..6f497e7 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
@@ -29,6 +29,16 @@
public interface Pipeliner extends HandlerBehaviour {
/**
+ * Accumulator enabled property. Determines whether the accumulator is enabled.
+ * The accumulator is assumed to be disabled if this property is undefined.
+ *
+ * If enabled, the pipeliner will try to accumulate objectives and create
+ * batches of flow rules when possible.
+ *
+ */
+ String ACCUMULATOR_ENABLED = "accumulatorEnabled";
+
+ /**
* Initializes the driver with context required for its operation.
*
* @param deviceId the deviceId
@@ -70,4 +80,5 @@
* an empty list if no groups were created
*/
List<String> getNextMappings(NextGroup nextGroup);
+
}
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java b/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java
index d539e6e..5c7ddd1 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java
@@ -38,4 +38,32 @@
FlowObjectiveStore store();
// TODO: add means to store and access shared state
+
+ /**
+ * Returns the maximum number of objectives to accumulate before processing is triggered.
+ *
+ * @return the maximum number of objectives. -1 if the method is not implemented.
+ */
+ default int accumulatorMaxObjectives() {
+ return -1;
+ }
+
+ /**
+ * Returns the maximum number of millis between objectives before processing is triggered.
+ *
+ * @return the maximum number of millis between objectives. -1 if the method is not implemented.
+ */
+ default int accumulatorMaxIdleMillis() {
+ return -1;
+ }
+
+ /**
+ * Returns the maximum number of millis allowed since the first objective before processing is triggered.
+ *
+ * @return the maximum number of millis allowed since the first objective. -1 if the method is not implemented.
+ */
+ default int accumulatorMaxBatchMillis() {
+ return -1;
+ }
+
}
diff --git a/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java b/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
index 3d6dec4..a88754f 100644
--- a/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
+++ b/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
@@ -131,4 +131,13 @@
public static final String AUDIT_LOGGER = "auditLogger";
public static final String AUDIT_LOGGER_DEFAULT = "securityAudit";
+ public static final String FOM_ACCUMULATOR_MAX_OBJECTIVES = "accumulatorMaxObjectives";
+ public static final int FOM_ACCUMULATOR_MAX_OBJECTIVES_DEFAULT = 1000;
+
+ public static final String FOM_ACCUMULATOR_MAX_IDLE_MILLIS = "accumulatorMaxIdleMillis";
+ public static final int FOM_ACCUMULATOR_MAX_IDLE_MILLIS_DEFAULT = 10;
+
+ public static final String FOM_ACCUMULATOR_MAX_BATCH_MILLIS = "accumulatorMaxBatchMillis";
+ public static final int FOM_ACCUMULATOR_MAX_BATCH_MILLIS_DEFAULT = 500;
+
}
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 1a4dd02..960f322 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -639,6 +639,7 @@
// getProvider is customized to favor driverProvider
FlowRuleProvider flowRuleProvider = getProvider(deviceId);
if (flowRuleProvider != null) {
+ log.trace("Sending {} flow rules to {}", batchOperation.size(), deviceId);
flowRuleProvider.executeBatch(batchOperation);
}
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 2368f25..d0bd1c5 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -75,6 +75,12 @@
import static org.onosproject.net.AnnotationKeys.DRIVER;
import static org.onosproject.net.OsgiPropertyConstants.FOM_NUM_THREADS;
import static org.onosproject.net.OsgiPropertyConstants.FOM_NUM_THREADS_DEFAULT;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_OBJECTIVES;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_OBJECTIVES_DEFAULT;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_IDLE_MILLIS;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_IDLE_MILLIS_DEFAULT;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_BATCH_MILLIS;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_BATCH_MILLIS_DEFAULT;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.FLOWRULE_WRITE;
@@ -85,7 +91,10 @@
enabled = false,
service = FlowObjectiveService.class,
property = {
- FOM_NUM_THREADS + ":Integer=" + FOM_NUM_THREADS_DEFAULT
+ FOM_NUM_THREADS + ":Integer=" + FOM_NUM_THREADS_DEFAULT,
+ FOM_ACCUMULATOR_MAX_OBJECTIVES + ":Integer=" + FOM_ACCUMULATOR_MAX_OBJECTIVES_DEFAULT,
+ FOM_ACCUMULATOR_MAX_IDLE_MILLIS + ":Integer=" + FOM_ACCUMULATOR_MAX_IDLE_MILLIS_DEFAULT,
+ FOM_ACCUMULATOR_MAX_BATCH_MILLIS + ":Integer=" + FOM_ACCUMULATOR_MAX_BATCH_MILLIS_DEFAULT,
}
)
public class FlowObjectiveManager implements FlowObjectiveService {
@@ -101,6 +110,18 @@
/** Number of worker threads. */
private int numThreads = FOM_NUM_THREADS_DEFAULT;
+ // Parameters for the accumulator, each pipeline can implement
+ // its own accumulation logic. The following parameters are used
+ // to control the accumulator.
+
+ // Maximum number of objectives to accumulate before processing is triggered
+ private int accumulatorMaxObjectives = FOM_ACCUMULATOR_MAX_OBJECTIVES_DEFAULT;
+ // Maximum number of millis between objectives before processing is triggered
+ private int accumulatorMaxIdleMillis = FOM_ACCUMULATOR_MAX_IDLE_MILLIS_DEFAULT;
+ // Maximum number of millis allowed since the first objective before processing is triggered
+ private int accumulatorMaxBatchMillis = FOM_ACCUMULATOR_MAX_BATCH_MILLIS_DEFAULT;
+
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DriverService driverService;
@@ -154,10 +175,11 @@
protected ExecutorService devEventExecutor;
@Activate
- protected void activate() {
+ protected void activate(ComponentContext context) {
cfgService.registerProperties(FlowObjectiveManager.class);
executorService = newFixedThreadPool(numThreads,
groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+ modified(context);
devEventExecutor = newSingleThreadScheduledExecutor(
groupedThreads("onos/flowobj-dev-events", "events-%d", log));
flowObjectiveStore.setDelegate(delegate);
@@ -183,8 +205,18 @@
@Modified
protected void modified(ComponentContext context) {
- String propertyValue =
- Tools.get(context.getProperties(), FOM_NUM_THREADS);
+ if (context != null) {
+ readComponentConfiguration(context);
+ }
+ }
+
+ /**
+ * Extracts properties from the component configuration context.
+ *
+ * @param context the component context
+ */
+ private void readComponentConfiguration(ComponentContext context) {
+ String propertyValue = Tools.get(context.getProperties(), FOM_NUM_THREADS);
int newNumThreads = isNullOrEmpty(propertyValue) ? numThreads : Integer.parseInt(propertyValue);
if (newNumThreads != numThreads && newNumThreads > 0) {
@@ -197,6 +229,36 @@
}
log.info("Reconfigured number of worker threads to {}", numThreads);
}
+
+ // Reconfiguration of the accumulator parameters is allowed
+ // Note: it will affect only pipelines going through init method
+ propertyValue = Tools.get(context.getProperties(), FOM_ACCUMULATOR_MAX_OBJECTIVES);
+ int newMaxObjs = isNullOrEmpty(propertyValue) ?
+ accumulatorMaxObjectives : Integer.parseInt(propertyValue);
+ if (newMaxObjs != accumulatorMaxObjectives && newMaxObjs > 0) {
+ accumulatorMaxObjectives = newMaxObjs;
+ log.info("Reconfigured maximum number of objectives to accumulate to {}",
+ accumulatorMaxObjectives);
+ }
+
+ propertyValue = Tools.get(context.getProperties(), FOM_ACCUMULATOR_MAX_IDLE_MILLIS);
+ int newMaxIdleMS = isNullOrEmpty(propertyValue) ?
+ accumulatorMaxIdleMillis : Integer.parseInt(propertyValue);
+ if (newMaxIdleMS != accumulatorMaxIdleMillis && newMaxIdleMS > 0) {
+ accumulatorMaxIdleMillis = newMaxIdleMS;
+ log.info("Reconfigured maximum number of millis between objectives to {}",
+ accumulatorMaxIdleMillis);
+ }
+
+ propertyValue = Tools.get(context.getProperties(), FOM_ACCUMULATOR_MAX_BATCH_MILLIS);
+ int newMaxBatchMS = isNullOrEmpty(propertyValue) ?
+ accumulatorMaxBatchMillis : Integer.parseInt(propertyValue);
+ if (newMaxBatchMS != accumulatorMaxBatchMillis && newMaxBatchMS > 0) {
+ accumulatorMaxBatchMillis = newMaxBatchMS;
+ log.info("Reconfigured maximum number of millis allowed since the first objective to {}",
+ accumulatorMaxBatchMillis);
+ }
+
}
/**
@@ -531,6 +593,7 @@
// Processing context for initializing pipeline driver behaviours.
private class InnerPipelineContext implements PipelinerContext {
+
@Override
public ServiceDirectory directory() {
return serviceDirectory;
@@ -540,6 +603,22 @@
public FlowObjectiveStore store() {
return flowObjectiveStore;
}
+
+ @Override
+ public int accumulatorMaxObjectives() {
+ return accumulatorMaxObjectives;
+ }
+
+ @Override
+ public int accumulatorMaxIdleMillis() {
+ return accumulatorMaxIdleMillis;
+ }
+
+ @Override
+ public int accumulatorMaxBatchMillis() {
+ return accumulatorMaxBatchMillis;
+ }
+
}
private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index f9cc9bf..0999698 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -42,6 +42,7 @@
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -85,8 +86,8 @@
final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
@Activate
- protected void activate() {
- super.activate();
+ protected void activate(ComponentContext context) {
+ super.activate(context);
filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
index 32bd7dc..121bfaa 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
@@ -190,7 +190,7 @@
filteringObjectives = new ArrayList<>();
forwardingObjectives = new ArrayList<>();
nextObjectives = new ArrayList<>();
- manager.activate();
+ manager.activate(null);
}
@After
@@ -243,7 +243,7 @@
.addCondition(Criteria.matchEthType(12))
.add();
- manager.activate();
+ manager.activate(null);
manager.filter(id1, filter);
TestTools.assertAfter(RETRY_MS, () ->
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
index 32c9f95..93b6337 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -198,7 +198,7 @@
mgr.deviceService = createMock(DeviceService.class);
mgr.driverService = createMock(DriverService.class);
mgr.flowObjectiveStore = createMock(FlowObjectiveStore.class);
- mgr.activate();
+ mgr.activate(null);
reset(mgr.flowObjectiveStore);
offset = DEFAULT_OFFSET;
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
index 851b24b..f4ab685 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
@@ -16,7 +16,9 @@
package org.onosproject.driver.pipeline.ofdpa;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
import org.onlab.osgi.ServiceDirectory;
import org.onlab.packet.EthType;
import org.onlab.packet.Ethernet;
@@ -25,6 +27,8 @@
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
@@ -43,6 +47,7 @@
import org.onosproject.net.behaviour.PipelinerContext;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.driver.Driver;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -98,10 +103,12 @@
import java.util.List;
import java.util.Objects;
import java.util.Optional;
+import java.util.Timer;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.packet.MacAddress.BROADCAST;
import static org.onlab.packet.MacAddress.IPV4_MULTICAST;
import static org.onlab.packet.MacAddress.IPV6_MULTICAST;
@@ -119,6 +126,10 @@
* Driver for Broadcom's OF-DPA v2.0 TTP.
*/
public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeliner {
+ // Timer for the accumulator
+ private static final Timer TIMER = new Timer("fwdobj-batching");
+ private Accumulator<Pair<ForwardingObjective, Collection<FlowRule>>> accumulator;
+
protected static final int PORT_TABLE = 0;
protected static final int VLAN_TABLE = 10;
protected static final int VLAN_1_TABLE = 11;
@@ -175,11 +186,15 @@
protected Ofdpa2GroupHandler groupHandler;
// flows installations to be retried
- private ScheduledExecutorService executorService
+ private ScheduledExecutorService retryExecutorService
= newScheduledThreadPool(5, groupedThreads("OfdpaPipeliner", "retry-%d", log));
private static final int MAX_RETRY_ATTEMPTS = 10;
private static final int RETRY_MS = 1000;
+ // accumulator executor service
+ private ScheduledExecutorService accumulatorExecutorService
+ = newSingleThreadScheduledExecutor(groupedThreads("OfdpaPipeliner", "acc-%d", log));
+
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
this.deviceId = deviceId;
@@ -190,6 +205,12 @@
groupService = serviceDirectory.get(GroupService.class);
flowObjectiveStore = context.store();
deviceService = serviceDirectory.get(DeviceService.class);
+ // Init the accumulator, if enabled
+ if (isAccumulatorEnabled()) {
+ accumulator = new ForwardingObjectiveAccumulator(context.accumulatorMaxObjectives(),
+ context.accumulatorMaxBatchMillis(),
+ context.accumulatorMaxIdleMillis());
+ }
initDriverId();
initGroupHander(context);
@@ -213,6 +234,15 @@
// software switches does require table-miss-entries.
}
+ public boolean isAccumulatorEnabled() {
+ Driver driver = super.data().driver();
+ // we cannot determine the property
+ if (driver == null) {
+ return false;
+ }
+ return Boolean.parseBoolean(driver.getProperty(ACCUMULATOR_ENABLED));
+ }
+
/**
* Determines whether this pipeline requires MPLS POP instruction.
*
@@ -329,38 +359,57 @@
// generated by FlowRule service for empty flowOps.
return;
}
- sendForward(fwd, rules);
+ // Let's accumulate flow rules - otherwise send directly
+ if (accumulator != null) {
+ accumulator.add(Pair.of(fwd, rules));
+ } else {
+ sendForwards(Collections.singletonList(Pair.of(fwd, rules)));
+ }
}
- private void sendForward(ForwardingObjective fwd, Collection<FlowRule> rules) {
+ // Builds the batch using the accumulated flow rules
+ private void sendForwards(List<Pair<ForwardingObjective, Collection<FlowRule>>> pairs) {
FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
- switch (fwd.op()) {
- case ADD:
- rules.stream()
- .filter(Objects::nonNull)
- .forEach(flowOpsBuilder::add);
- log.debug("Applying a add fwd-obj {} to sw:{}", fwd.id(), deviceId);
- break;
- case REMOVE:
- rules.stream()
- .filter(Objects::nonNull)
- .forEach(flowOpsBuilder::remove);
- log.debug("Deleting a flow rule to sw:{}", deviceId);
- break;
- default:
- fail(fwd, ObjectiveError.UNKNOWN);
- log.warn("Unknown forwarding type {}", fwd.op());
- }
-
+ log.debug("Sending {} fwd-objs", pairs.size());
+ List<Objective> fwdObjs = Lists.newArrayList();
+ // Iterates over all accumulated flow rules and then build an unique batch
+ pairs.forEach(pair -> {
+ ForwardingObjective fwd = pair.getLeft();
+ Collection<FlowRule> rules = pair.getRight();
+ switch (fwd.op()) {
+ case ADD:
+ rules.stream()
+ .filter(Objects::nonNull)
+ .forEach(flowOpsBuilder::add);
+ log.debug("Applying a add fwd-obj {} to sw:{}", fwd.id(), deviceId);
+ fwdObjs.add(fwd);
+ break;
+ case REMOVE:
+ rules.stream()
+ .filter(Objects::nonNull)
+ .forEach(flowOpsBuilder::remove);
+ log.debug("Deleting a flow rule to sw:{}", deviceId);
+ fwdObjs.add(fwd);
+ break;
+ default:
+ fail(fwd, ObjectiveError.UNKNOWN);
+ log.warn("Unknown forwarding type {}", fwd.op());
+ }
+ });
+ // Finally applies the operations
flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
+
@Override
public void onSuccess(FlowRuleOperations ops) {
- pass(fwd);
+ log.trace("Flow rule operations onSuccess {}", ops);
+ fwdObjs.forEach(Ofdpa2Pipeline::pass);
}
@Override
public void onError(FlowRuleOperations ops) {
- fail(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
+ ObjectiveError error = ObjectiveError.FLOWINSTALLATIONFAILED;
+ log.warn("Flow rule operations onError {}. Reason = {}", ops, error);
+ fwdObjs.forEach(fwdObj -> fail(fwdObj, error));
}
}));
}
@@ -1528,7 +1577,7 @@
}
if (emptyGroup) {
- executorService.schedule(new RetryFlows(fwd, flowRuleCollection),
+ retryExecutorService.schedule(new RetryFlows(fwd, flowRuleCollection),
RETRY_MS, TimeUnit.MILLISECONDS);
}
return flowRuleCollection;
@@ -1951,9 +2000,42 @@
public void run() {
log.info("RETRY FLOWS ATTEMPT# {} for fwd:{} rules:{}",
MAX_RETRY_ATTEMPTS - attempts, fwd.id(), retryFlows.size());
- sendForward(fwd, retryFlows);
+ sendForwards(Collections.singletonList(Pair.of(fwd, retryFlows)));
if (--attempts > 0) {
- executorService.schedule(this, RETRY_MS, TimeUnit.MILLISECONDS);
+ retryExecutorService.schedule(this, RETRY_MS, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+
+ // Flow rules accumulator for reducing the number of transactions required to the devices.
+ private final class ForwardingObjectiveAccumulator
+ extends AbstractAccumulator<Pair<ForwardingObjective, Collection<FlowRule>>> {
+
+ ForwardingObjectiveAccumulator(int maxFwd, int maxBatchMS, int maxIdleMS) {
+ super(TIMER, maxFwd, maxBatchMS, maxIdleMS);
+ }
+
+ @Override
+ public void processItems(List<Pair<ForwardingObjective, Collection<FlowRule>>> pairs) {
+ // Triggers creation of a batch using the list of flowrules generated from fwdobjs.
+ accumulatorExecutorService.execute(new FlowRulesBuilderTask(pairs));
+ }
+ }
+
+ // Task for building batch of flow rules in a separate thread.
+ private final class FlowRulesBuilderTask implements Runnable {
+ private final List<Pair<ForwardingObjective, Collection<FlowRule>>> pairs;
+
+ FlowRulesBuilderTask(List<Pair<ForwardingObjective, Collection<FlowRule>>> pairs) {
+ this.pairs = pairs;
+ }
+
+ @Override
+ public void run() {
+ try {
+ sendForwards(pairs);
+ } catch (Exception e) {
+ log.warn("Unable to send forwards", e);
}
}
}
diff --git a/drivers/default/src/main/resources/onos-drivers.xml b/drivers/default/src/main/resources/onos-drivers.xml
index 46fe30d..c37ad79 100644
--- a/drivers/default/src/main/resources/onos-drivers.xml
+++ b/drivers/default/src/main/resources/onos-drivers.xml
@@ -72,6 +72,7 @@
<behaviour api="org.onosproject.net.behaviour.ExtensionSelectorResolver"
impl="org.onosproject.driver.extensions.OfdpaExtensionSelectorInterpreter" />
<property name="meterCapable">false</property>
+ <property name="accumulatorEnabled">true</property>
</driver>
<!-- Driver for OFDPA 3.0 EA*.
diff --git a/tools/build/conf/src/main/resources/onos/suppressions.xml b/tools/build/conf/src/main/resources/onos/suppressions.xml
index d1c613b..f87d11f 100644
--- a/tools/build/conf/src/main/resources/onos/suppressions.xml
+++ b/tools/build/conf/src/main/resources/onos/suppressions.xml
@@ -31,6 +31,7 @@
<suppress files="org.onlab.jdvue.*" checks="AbbreviationAsWordInName" />
<suppress files="org.onosproject.driver.pipeline.*" checks="AbbreviationAsWordInName" />
<suppress files="org.onosproject.driver.pipeline.ofdpa.Ofdpa2GroupHandler.java" checks="FileLength" />
+ <suppress files="org.onosproject.driver.pipeline.ofdpa.Ofdpa2Pipeline.java" checks="FileLength" />
<suppress files="org.onosproject.segmentrouting.*" checks="AbbreviationAsWordInName" />
<!-- These files carry AT&T copyrights -->