Implements accumulation of forwarding objectives in the OFDPA pipelines

Change-Id: I95cbdd9b3fb8d439003a103111a01dc3aee2072b
diff --git a/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java b/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
index 3d6dec4..a88754f 100644
--- a/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
+++ b/core/net/src/main/java/org/onosproject/net/OsgiPropertyConstants.java
@@ -131,4 +131,13 @@
     public static final String AUDIT_LOGGER = "auditLogger";
     public static final String AUDIT_LOGGER_DEFAULT = "securityAudit";
 
+    public static final String FOM_ACCUMULATOR_MAX_OBJECTIVES = "accumulatorMaxObjectives";
+    public static final int FOM_ACCUMULATOR_MAX_OBJECTIVES_DEFAULT = 1000;
+
+    public static final String FOM_ACCUMULATOR_MAX_IDLE_MILLIS = "accumulatorMaxIdleMillis";
+    public static final int FOM_ACCUMULATOR_MAX_IDLE_MILLIS_DEFAULT = 10;
+
+    public static final String FOM_ACCUMULATOR_MAX_BATCH_MILLIS = "accumulatorMaxBatchMillis";
+    public static final int FOM_ACCUMULATOR_MAX_BATCH_MILLIS_DEFAULT = 500;
+
 }
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index 1a4dd02..960f322 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -639,6 +639,7 @@
                 // getProvider is customized to favor driverProvider
                 FlowRuleProvider flowRuleProvider = getProvider(deviceId);
                 if (flowRuleProvider != null) {
+                    log.trace("Sending {} flow rules to {}", batchOperation.size(), deviceId);
                     flowRuleProvider.executeBatch(batchOperation);
                 }
 
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 2368f25..d0bd1c5 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -75,6 +75,12 @@
 import static org.onosproject.net.AnnotationKeys.DRIVER;
 import static org.onosproject.net.OsgiPropertyConstants.FOM_NUM_THREADS;
 import static org.onosproject.net.OsgiPropertyConstants.FOM_NUM_THREADS_DEFAULT;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_OBJECTIVES;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_OBJECTIVES_DEFAULT;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_IDLE_MILLIS;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_IDLE_MILLIS_DEFAULT;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_BATCH_MILLIS;
+import static org.onosproject.net.OsgiPropertyConstants.FOM_ACCUMULATOR_MAX_BATCH_MILLIS_DEFAULT;
 import static org.onosproject.security.AppGuard.checkPermission;
 import static org.onosproject.security.AppPermission.Type.FLOWRULE_WRITE;
 
@@ -85,7 +91,10 @@
     enabled = false,
     service = FlowObjectiveService.class,
     property = {
-        FOM_NUM_THREADS + ":Integer=" + FOM_NUM_THREADS_DEFAULT
+            FOM_NUM_THREADS + ":Integer=" + FOM_NUM_THREADS_DEFAULT,
+            FOM_ACCUMULATOR_MAX_OBJECTIVES + ":Integer=" + FOM_ACCUMULATOR_MAX_OBJECTIVES_DEFAULT,
+            FOM_ACCUMULATOR_MAX_IDLE_MILLIS + ":Integer=" + FOM_ACCUMULATOR_MAX_IDLE_MILLIS_DEFAULT,
+            FOM_ACCUMULATOR_MAX_BATCH_MILLIS + ":Integer=" + FOM_ACCUMULATOR_MAX_BATCH_MILLIS_DEFAULT,
     }
 )
 public class FlowObjectiveManager implements FlowObjectiveService {
@@ -101,6 +110,18 @@
     /** Number of worker threads. */
     private int numThreads = FOM_NUM_THREADS_DEFAULT;
 
+    // Accumulator parameters. Each pipeline can implement its own
+    // accumulation logic; the values below are exposed to pipelines
+    // through the PipelinerContext to control their accumulator.
+
+    /** Maximum number of objectives to accumulate before processing is triggered. */
+    private int accumulatorMaxObjectives = FOM_ACCUMULATOR_MAX_OBJECTIVES_DEFAULT;
+    /** Maximum number of milliseconds between objectives before processing is triggered. */
+    private int accumulatorMaxIdleMillis = FOM_ACCUMULATOR_MAX_IDLE_MILLIS_DEFAULT;
+    /** Maximum number of milliseconds allowed since the first objective before processing is triggered. */
+    private int accumulatorMaxBatchMillis = FOM_ACCUMULATOR_MAX_BATCH_MILLIS_DEFAULT;
+
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DriverService driverService;
 
@@ -154,10 +175,11 @@
     protected ExecutorService devEventExecutor;
 
     @Activate
-    protected void activate() {
+    protected void activate(ComponentContext context) {
         cfgService.registerProperties(FlowObjectiveManager.class);
         executorService = newFixedThreadPool(numThreads,
                                              groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+        modified(context);
         devEventExecutor = newSingleThreadScheduledExecutor(
                                        groupedThreads("onos/flowobj-dev-events", "events-%d", log));
         flowObjectiveStore.setDelegate(delegate);
@@ -183,8 +205,18 @@
 
     @Modified
     protected void modified(ComponentContext context) {
-        String propertyValue =
-                Tools.get(context.getProperties(), FOM_NUM_THREADS);
+        if (context != null) {
+            readComponentConfiguration(context);
+        }
+    }
+
+    /**
+     * Extracts properties from the component configuration context.
+     *
+     * @param context the component context
+     */
+    private void readComponentConfiguration(ComponentContext context) {
+        String propertyValue = Tools.get(context.getProperties(), FOM_NUM_THREADS);
         int newNumThreads = isNullOrEmpty(propertyValue) ? numThreads : Integer.parseInt(propertyValue);
 
         if (newNumThreads != numThreads && newNumThreads > 0) {
@@ -197,6 +229,36 @@
             }
             log.info("Reconfigured number of worker threads to {}", numThreads);
         }
+
+        // The accumulator parameters can be reconfigured at runtime.
+        // Note: a change affects only pipelines that go through the init method.
+        propertyValue = Tools.get(context.getProperties(), FOM_ACCUMULATOR_MAX_OBJECTIVES);
+        int newMaxObjs = isNullOrEmpty(propertyValue) ?
+                accumulatorMaxObjectives : Integer.parseInt(propertyValue);
+        if (newMaxObjs != accumulatorMaxObjectives && newMaxObjs > 0) {
+            accumulatorMaxObjectives = newMaxObjs;
+            log.info("Reconfigured maximum number of objectives to accumulate to {}",
+                     accumulatorMaxObjectives);
+        }
+
+        propertyValue = Tools.get(context.getProperties(), FOM_ACCUMULATOR_MAX_IDLE_MILLIS);
+        int newMaxIdleMS = isNullOrEmpty(propertyValue) ?
+                accumulatorMaxIdleMillis : Integer.parseInt(propertyValue);
+        if (newMaxIdleMS != accumulatorMaxIdleMillis && newMaxIdleMS > 0) {
+            accumulatorMaxIdleMillis = newMaxIdleMS;
+            log.info("Reconfigured maximum number of millis between objectives to {}",
+                     accumulatorMaxIdleMillis);
+        }
+
+        propertyValue = Tools.get(context.getProperties(), FOM_ACCUMULATOR_MAX_BATCH_MILLIS);
+        int newMaxBatchMS = isNullOrEmpty(propertyValue) ?
+                accumulatorMaxBatchMillis : Integer.parseInt(propertyValue);
+        if (newMaxBatchMS != accumulatorMaxBatchMillis && newMaxBatchMS > 0) {
+            accumulatorMaxBatchMillis = newMaxBatchMS;
+            log.info("Reconfigured maximum number of millis allowed since the first objective to {}",
+                     accumulatorMaxBatchMillis);
+        }
+
     }
 
     /**
@@ -531,6 +593,7 @@
 
     // Processing context for initializing pipeline driver behaviours.
     private class InnerPipelineContext implements PipelinerContext {
+
         @Override
         public ServiceDirectory directory() {
             return serviceDirectory;
@@ -540,6 +603,22 @@
         public FlowObjectiveStore store() {
             return flowObjectiveStore;
         }
+
+        @Override
+        public int accumulatorMaxObjectives() {
+            return accumulatorMaxObjectives;
+        }
+
+        @Override
+        public int accumulatorMaxIdleMillis() {
+            return accumulatorMaxIdleMillis;
+        }
+
+        @Override
+        public int accumulatorMaxBatchMillis() {
+            return accumulatorMaxBatchMillis;
+        }
+
     }
 
     private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index f9cc9bf..0999698 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -42,6 +42,7 @@
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -85,8 +86,8 @@
     final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
 
     @Activate
-    protected void activate() {
-        super.activate();
+    protected void activate(ComponentContext context) {
+        super.activate(context);
 
         filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
         fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
index 32bd7dc..121bfaa 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
@@ -190,7 +190,7 @@
         filteringObjectives = new ArrayList<>();
         forwardingObjectives = new ArrayList<>();
         nextObjectives = new ArrayList<>();
-        manager.activate();
+        manager.activate(null);
     }
 
     @After
@@ -243,7 +243,7 @@
                         .addCondition(Criteria.matchEthType(12))
                         .add();
 
-        manager.activate();
+        manager.activate(null);
         manager.filter(id1, filter);
 
         TestTools.assertAfter(RETRY_MS, () ->
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
index 32c9f95..93b6337 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -198,7 +198,7 @@
         mgr.deviceService = createMock(DeviceService.class);
         mgr.driverService = createMock(DriverService.class);
         mgr.flowObjectiveStore = createMock(FlowObjectiveStore.class);
-        mgr.activate();
+        mgr.activate(null);
 
         reset(mgr.flowObjectiveStore);
         offset = DEFAULT_OFFSET;