Implements accumulation of forwarding objectives in OFDPA pipelines
Change-Id: I95cbdd9b3fb8d439003a103111a01dc3aee2072b
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 2368f25..d0bd1c5 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -75,6 +75,12 @@
import static org.onosproject.net.AnnotationKeys.DRIVER;
import static org.onosproject.net.OsgiPropertyConstants.FOM_NUM_THREADS;
import static org.onosproject.net.OsgiPropertyConstants.FOM_NUM_THREADS_DEFAULT;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_OBJECTIVES;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_OBJECTIVES_DEFAULT;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_IDLE_MILLIS;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_IDLE_MILLIS_DEFAULT;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_BATCH_MILLIS;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_BATCH_MILLIS_DEFAULT;
import static org.onosproject.security.AppGuard.checkPermission;
import static org.onosproject.security.AppPermission.Type.FLOWRULE_WRITE;
@@ -85,7 +91,10 @@
enabled = false,
service = FlowObjectiveService.class,
property = {
- FOM_NUM_THREADS + ":Integer=" + FOM_NUM_THREADS_DEFAULT
+ FOM_NUM_THREADS + ":Integer=" + FOM_NUM_THREADS_DEFAULT,
+ FOM_ACCUMULATOR_MAX_OBJECTIVES + ":Integer=" + FOM_ACCUMULATOR_MAX_OBJECTIVES_DEFAULT,
+ FOM_ACCUMULATOR_MAX_IDLE_MILLIS + ":Integer=" + FOM_ACCUMULATOR_MAX_IDLE_MILLIS_DEFAULT,
+ FOM_ACCUMULATOR_MAX_BATCH_MILLIS + ":Integer=" + FOM_ACCUMULATOR_MAX_BATCH_MILLIS_DEFAULT,
}
)
public class FlowObjectiveManager implements FlowObjectiveService {
@@ -101,6 +110,18 @@
/** Number of worker threads. */
private int numThreads = FOM_NUM_THREADS_DEFAULT;
+ // Accumulator parameters. Each pipeline can implement its own
+ // accumulation logic; the values below are exposed to pipelines
+ // through InnerPipelineContext to control that accumulator.
+
+ /** Maximum number of objectives to accumulate before processing is triggered. */
+ private int accumulatorMaxObjectives = FOM_ACCUMULATOR_MAX_OBJECTIVES_DEFAULT;
+ /** Maximum number of milliseconds between objectives before processing is triggered. */
+ private int accumulatorMaxIdleMillis = FOM_ACCUMULATOR_MAX_IDLE_MILLIS_DEFAULT;
+ /** Maximum number of milliseconds allowed since the first objective before processing is triggered. */
+ private int accumulatorMaxBatchMillis = FOM_ACCUMULATOR_MAX_BATCH_MILLIS_DEFAULT;
+
+
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DriverService driverService;
@@ -154,10 +175,11 @@
protected ExecutorService devEventExecutor;
@Activate
- protected void activate() {
+ protected void activate(ComponentContext context) {
cfgService.registerProperties(FlowObjectiveManager.class);
executorService = newFixedThreadPool(numThreads,
groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+ modified(context);
devEventExecutor = newSingleThreadScheduledExecutor(
groupedThreads("onos/flowobj-dev-events", "events-%d", log));
flowObjectiveStore.setDelegate(delegate);
@@ -183,8 +205,18 @@
@Modified
protected void modified(ComponentContext context) {
- String propertyValue =
- Tools.get(context.getProperties(), FOM_NUM_THREADS);
+ if (context != null) { // without a context there is nothing to read; current settings are kept
+ readComponentConfiguration(context); // parses the worker-thread and accumulator properties
+ }
+ }
+
+ /**
+ * Extracts properties from the component configuration context.
+ *
+ * @param context the component context
+ */
+ private void readComponentConfiguration(ComponentContext context) {
+ String propertyValue = Tools.get(context.getProperties(), FOM_NUM_THREADS);
int newNumThreads = isNullOrEmpty(propertyValue) ? numThreads : Integer.parseInt(propertyValue);
if (newNumThreads != numThreads && newNumThreads > 0) {
@@ -197,6 +229,36 @@
}
log.info("Reconfigured number of worker threads to {}", numThreads);
}
+
+ // Accumulator parameters may be reconfigured; an absent/empty or non-positive value keeps the current setting.
+ // Note: a change will affect only pipelines going through the init method.
+ propertyValue = Tools.get(context.getProperties(), FOM_ACCUMULATOR_MAX_OBJECTIVES);
+ int newMaxObjs = isNullOrEmpty(propertyValue) ?
+ accumulatorMaxObjectives : Integer.parseInt(propertyValue); // NOTE(review): non-numeric text throws NumberFormatException
+ if (newMaxObjs != accumulatorMaxObjectives && newMaxObjs > 0) {
+ accumulatorMaxObjectives = newMaxObjs;
+ log.info("Reconfigured maximum number of objectives to accumulate to {}",
+ accumulatorMaxObjectives);
+ }
+
+ propertyValue = Tools.get(context.getProperties(), FOM_ACCUMULATOR_MAX_IDLE_MILLIS);
+ int newMaxIdleMS = isNullOrEmpty(propertyValue) ?
+ accumulatorMaxIdleMillis : Integer.parseInt(propertyValue); // same parsing caveat as above
+ if (newMaxIdleMS != accumulatorMaxIdleMillis && newMaxIdleMS > 0) {
+ accumulatorMaxIdleMillis = newMaxIdleMS;
+ log.info("Reconfigured maximum number of millis between objectives to {}",
+ accumulatorMaxIdleMillis);
+ }
+
+ propertyValue = Tools.get(context.getProperties(), FOM_ACCUMULATOR_MAX_BATCH_MILLIS);
+ int newMaxBatchMS = isNullOrEmpty(propertyValue) ?
+ accumulatorMaxBatchMillis : Integer.parseInt(propertyValue); // same parsing caveat as above
+ if (newMaxBatchMS != accumulatorMaxBatchMillis && newMaxBatchMS > 0) {
+ accumulatorMaxBatchMillis = newMaxBatchMS;
+ log.info("Reconfigured maximum number of millis allowed since the first objective to {}",
+ accumulatorMaxBatchMillis);
+ }
+
}
/**
@@ -531,6 +593,7 @@
// Processing context for initializing pipeline driver behaviours.
private class InnerPipelineContext implements PipelinerContext {
+
@Override
public ServiceDirectory directory() {
return serviceDirectory;
@@ -540,6 +603,22 @@
public FlowObjectiveStore store() {
return flowObjectiveStore;
}
+
+ @Override
+ public int accumulatorMaxObjectives() {
+ return accumulatorMaxObjectives; // reads the enclosing manager's field on each call
+ }
+
+ @Override
+ public int accumulatorMaxIdleMillis() {
+ return accumulatorMaxIdleMillis; // reads the enclosing manager's field on each call
+ }
+
+ @Override
+ public int accumulatorMaxBatchMillis() {
+ return accumulatorMaxBatchMillis; // reads the enclosing manager's field on each call
+ }
+
}
private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {