Implements accumulation of the fwdobjectives in ofdpa pipelines

Change-Id: I95cbdd9b3fb8d439003a103111a01dc3aee2072b
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
index b3494bd..6f497e7 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/Pipeliner.java
@@ -29,6 +29,16 @@
 public interface Pipeliner extends HandlerBehaviour {
 
     /**
+     * Accumulator enabled property. Determines whether the accumulator is enabled.
+     * The accumulator is assumed to be disabled if this property is undefined.
+     *
+     * If enabled, the pipeliner will try to accumulate objectives and create
+     * batches of flow rules when possible.
+     *
+     */
+    String ACCUMULATOR_ENABLED = "accumulatorEnabled";
+
+    /**
      * Initializes the driver with context required for its operation.
      *
      * @param deviceId the deviceId
@@ -70,4 +80,5 @@
      *          an empty list if no groups were created
      */
     List<String> getNextMappings(NextGroup nextGroup);
+
 }
diff --git a/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java b/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java
index d539e6e..5c7ddd1 100644
--- a/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java
+++ b/core/api/src/main/java/org/onosproject/net/behaviour/PipelinerContext.java
@@ -38,4 +38,32 @@
     FlowObjectiveStore store();
 
     // TODO: add means to store and access shared state
+
+    /**
+     * Returns the maximum number of objectives to accumulate before processing is triggered.
+     *
+     * @return the maximum number of objectives. -1 if the method is not implemented.
+     */
+    default int accumulatorMaxObjectives() {
+        return -1;
+    }
+
+    /**
+     * Returns the maximum number of millis between objectives before processing is triggered.
+     *
+     * @return the maximum number of millis between objectives. -1 if the method is not implemented.
+     */
+    default int accumulatorMaxIdleMillis() {
+        return -1;
+    }
+
+    /**
+     * Returns the maximum number of millis allowed since the first objective before processing is triggered.
+     *
+     * @return the maximum number of millis allowed since the first objective. -1 if the method is not implemented.
+     */
+    default int accumulatorMaxBatchMillis() {
+        return -1;
+    }
+
 }
diff --git a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
index e7f3ef6..429d691 100644
--- a/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flow/impl/FlowRuleManager.java
@@ -629,6 +629,7 @@
                 // getProvider is customized to favor driverProvider
                 FlowRuleProvider flowRuleProvider = getProvider(deviceId);
                 if (flowRuleProvider != null) {
+                    log.trace("Sending {} flow rules to {}", batchOperation.size(), deviceId);
                     flowRuleProvider.executeBatch(batchOperation);
                 }
 
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
index 462762a..e458f71 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManager.java
@@ -94,6 +94,33 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    // Parameters for the accumulator, each pipeline can implement
+    // its own accumulation logic. The following parameters are used
+    // to control the accumulator.
+    private static final String ACCUMULATOR_MAX_OBJECTIVES = "accumulatorMaxObjectives";
+    // Maximum number of objectives to accumulate before processing is triggered
+    private static final int DEFAULT_ACCUMULATOR_MAX_OBJECTIVES = 1000;
+    @Property(name = ACCUMULATOR_MAX_OBJECTIVES,
+            intValue = DEFAULT_ACCUMULATOR_MAX_OBJECTIVES,
+            label = "Maximum number of objectives to accumulate")
+    private int accumulatorMaxObjectives = DEFAULT_ACCUMULATOR_MAX_OBJECTIVES;
+
+    private static final String ACCUMULATOR_MAX_IDLE_MILLIS = "accumulatorMaxIdleMillis";
+    // Maximum number of millis between objectives before processing is triggered
+    private static final int DEFAULT_ACCUMULATOR_MAX_IDLE_MILLIS = 10;
+    @Property(name = ACCUMULATOR_MAX_IDLE_MILLIS,
+            intValue = DEFAULT_ACCUMULATOR_MAX_IDLE_MILLIS,
+            label = "Maximum number of millis between objectives")
+    private int accumulatorMaxIdleMillis = DEFAULT_ACCUMULATOR_MAX_IDLE_MILLIS;
+
+    private static final String ACCUMULATOR_MAX_BATCH_MILLIS = "accumulatorMaxBatchMillis";
+    // Maximum number of millis allowed since the first objective before processing is triggered
+    private static final int DEFAULT_ACCUMULATOR_MAX_BATCH_MILLIS = 500;
+    @Property(name = ACCUMULATOR_MAX_BATCH_MILLIS,
+            intValue = DEFAULT_ACCUMULATOR_MAX_BATCH_MILLIS,
+            label = "Maximum number of millis allowed since the first objective")
+    private int accumulatorMaxBatchMillis = DEFAULT_ACCUMULATOR_MAX_BATCH_MILLIS;
+
     private static final int DEFAULT_NUM_THREADS = 4;
     @Property(name = NUM_THREAD,
              intValue = DEFAULT_NUM_THREADS,
@@ -153,10 +180,11 @@
     protected ExecutorService devEventExecutor;
 
     @Activate
-    protected void activate() {
+    protected void activate(ComponentContext context) {
         cfgService.registerProperties(getClass());
         executorService = newFixedThreadPool(numThreads,
                                              groupedThreads(GROUP_THREAD_NAME, WORKER_PATTERN, log));
+        modified(context);
         devEventExecutor = newSingleThreadScheduledExecutor(
                                        groupedThreads("onos/flowobj-dev-events", "events-%d", log));
         flowObjectiveStore.setDelegate(delegate);
@@ -182,8 +210,18 @@
 
     @Modified
     protected void modified(ComponentContext context) {
-        String propertyValue =
-                Tools.get(context.getProperties(), NUM_THREAD);
+        if (context != null) {
+            readComponentConfiguration(context);
+        }
+    }
+
+    /**
+     * Extracts properties from the component configuration context.
+     *
+     * @param context the component context
+     */
+    private void readComponentConfiguration(ComponentContext context) {
+        String propertyValue = Tools.get(context.getProperties(), NUM_THREAD);
         int newNumThreads = isNullOrEmpty(propertyValue) ? numThreads : Integer.parseInt(propertyValue);
 
         if (newNumThreads != numThreads && newNumThreads > 0) {
@@ -196,6 +234,36 @@
             }
             log.info("Reconfigured number of worker threads to {}", numThreads);
         }
+
+        // Reconfiguration of the accumulator parameters is allowed
+        // Note: it will affect only pipelines going through init method
+        propertyValue = Tools.get(context.getProperties(), ACCUMULATOR_MAX_OBJECTIVES);
+        int newMaxObjs = isNullOrEmpty(propertyValue) ?
+                accumulatorMaxObjectives : Integer.parseInt(propertyValue);
+        if (newMaxObjs != accumulatorMaxObjectives && newMaxObjs > 0) {
+            accumulatorMaxObjectives = newMaxObjs;
+            log.info("Reconfigured maximum number of objectives to accumulate to {}",
+                     accumulatorMaxObjectives);
+        }
+
+        propertyValue = Tools.get(context.getProperties(), ACCUMULATOR_MAX_IDLE_MILLIS);
+        int newMaxIdleMS = isNullOrEmpty(propertyValue) ?
+                accumulatorMaxIdleMillis : Integer.parseInt(propertyValue);
+        if (newMaxIdleMS != accumulatorMaxIdleMillis && newMaxIdleMS > 0) {
+            accumulatorMaxIdleMillis = newMaxIdleMS;
+            log.info("Reconfigured maximum number of millis between objectives to {}",
+                     accumulatorMaxIdleMillis);
+        }
+
+        propertyValue = Tools.get(context.getProperties(), ACCUMULATOR_MAX_BATCH_MILLIS);
+        int newMaxBatchMS = isNullOrEmpty(propertyValue) ?
+                accumulatorMaxBatchMillis : Integer.parseInt(propertyValue);
+        if (newMaxBatchMS != accumulatorMaxBatchMillis && newMaxBatchMS > 0) {
+            accumulatorMaxBatchMillis = newMaxBatchMS;
+            log.info("Reconfigured maximum number of millis allowed since the first objective to {}",
+                     accumulatorMaxBatchMillis);
+        }
+
     }
 
     /**
@@ -530,6 +598,7 @@
 
     // Processing context for initializing pipeline driver behaviours.
     private class InnerPipelineContext implements PipelinerContext {
+
         @Override
         public ServiceDirectory directory() {
             return serviceDirectory;
@@ -539,6 +608,21 @@
         public FlowObjectiveStore store() {
             return flowObjectiveStore;
         }
+
+        @Override
+        public int accumulatorMaxObjectives() {
+            return accumulatorMaxObjectives;
+        }
+
+        @Override
+        public int accumulatorMaxIdleMillis() {
+            return accumulatorMaxIdleMillis;
+        }
+
+        @Override
+        public int accumulatorMaxBatchMillis() {
+            return accumulatorMaxBatchMillis;
+        }
     }
 
     private class InternalStoreDelegate implements FlowObjectiveStoreDelegate {
diff --git a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
index 11f99d7..899b4f8 100644
--- a/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
+++ b/core/net/src/main/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManager.java
@@ -42,6 +42,7 @@
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.flowobjective.ObjectiveEvent;
 import org.onosproject.net.flowobjective.ObjectiveQueueKey;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,8 +87,8 @@
     final FlowObjectiveStoreDelegate delegate = new InternalStoreDelegate();
 
     @Activate
-    protected void activate() {
-        super.activate();
+    protected void activate(ComponentContext context) {
+        super.activate(context);
 
         filtCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-filt", log));
         fwdCacheEventExecutor = newSingleThreadExecutor(groupedThreads("onos/flowobj", "cache-event-fwd", log));
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
index 32bd7dc..121bfaa 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/FlowObjectiveManagerTest.java
@@ -190,7 +190,7 @@
         filteringObjectives = new ArrayList<>();
         forwardingObjectives = new ArrayList<>();
         nextObjectives = new ArrayList<>();
-        manager.activate();
+        manager.activate(null);
     }
 
     @After
@@ -243,7 +243,7 @@
                         .addCondition(Criteria.matchEthType(12))
                         .add();
 
-        manager.activate();
+        manager.activate(null);
         manager.filter(id1, filter);
 
         TestTools.assertAfter(RETRY_MS, () ->
diff --git a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
index a312b29..bd9ae7c 100644
--- a/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/net/flowobjective/impl/InOrderFlowObjectiveManagerTest.java
@@ -196,7 +196,7 @@
         mgr.deviceService = createMock(DeviceService.class);
         mgr.driverService = createMock(DriverService.class);
         mgr.flowObjectiveStore = createMock(FlowObjectiveStore.class);
-        mgr.activate();
+        mgr.activate(null);
 
         reset(mgr.flowObjectiveStore);
         offset = DEFAULT_OFFSET;
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
index 46e2e75..e8d172c 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/ofdpa/Ofdpa2Pipeline.java
@@ -16,7 +16,9 @@
 package org.onosproject.driver.pipeline.ofdpa;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
+import org.apache.commons.lang3.tuple.Pair;
 import org.onlab.osgi.ServiceDirectory;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.EthType.EtherType;
@@ -24,6 +26,8 @@
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
@@ -39,6 +43,7 @@
 import org.onosproject.net.behaviour.PipelinerContext;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.driver.Driver;
 import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -93,10 +98,12 @@
 import java.util.Deque;
 import java.util.List;
 import java.util.Objects;
+import java.util.Timer;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.Executors.newScheduledThreadPool;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.packet.MacAddress.BROADCAST;
 import static org.onlab.packet.MacAddress.IPV4_MULTICAST;
 import static org.onlab.packet.MacAddress.IPV6_MULTICAST;
@@ -112,6 +119,10 @@
  * Driver for Broadcom's OF-DPA v2.0 TTP.
  */
 public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeliner {
+    // Timer for the accumulator
+    private static final Timer TIMER = new Timer("fwdobj-batching");
+    private Accumulator<Pair<ForwardingObjective, Collection<FlowRule>>> accumulator;
+
     protected static final int PORT_TABLE = 0;
     protected static final int VLAN_TABLE = 10;
     protected static final int VLAN_1_TABLE = 11;
@@ -161,11 +172,15 @@
     protected Ofdpa2GroupHandler groupHandler;
 
     // flows installations to be retried
-    private ScheduledExecutorService executorService
+    private ScheduledExecutorService retryExecutorService
         = newScheduledThreadPool(5, groupedThreads("OfdpaPipeliner", "retry-%d", log));
     private static final int MAX_RETRY_ATTEMPTS = 10;
     private static final int RETRY_MS = 1000;
 
+    // accumulator executor service
+    private ScheduledExecutorService accumulatorExecutorService
+        = newSingleThreadScheduledExecutor(groupedThreads("OfdpaPipeliner", "acc-%d", log));
+
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
         this.deviceId = deviceId;
@@ -176,6 +191,12 @@
         groupService = serviceDirectory.get(GroupService.class);
         flowObjectiveStore = context.store();
         deviceService = serviceDirectory.get(DeviceService.class);
+        // Init the accumulator, if enabled
+        if (isAccumulatorEnabled()) {
+            accumulator = new ForwardingObjectiveAccumulator(context.accumulatorMaxObjectives(),
+                                                             context.accumulatorMaxBatchMillis(),
+                                                             context.accumulatorMaxIdleMillis());
+        }
 
         initDriverId();
         initGroupHander(context);
@@ -199,6 +220,15 @@
         // software switches does require table-miss-entries.
     }
 
+    public boolean isAccumulatorEnabled() {
+        Driver driver = super.data().driver();
+        // we cannot determine the property
+        if (driver == null) {
+            return false;
+        }
+        return Boolean.parseBoolean(driver.getProperty(ACCUMULATOR_ENABLED));
+    }
+
     /**
      * Determines whether this pipeline requires MPLS POP instruction.
      *
@@ -311,38 +341,57 @@
             // generated by FlowRule service for empty flowOps.
             return;
         }
-        sendForward(fwd, rules);
+        // Let's accumulate flow rules - otherwise send directly
+        if (accumulator != null) {
+            accumulator.add(Pair.of(fwd, rules));
+        } else {
+            sendForwards(Collections.singletonList(Pair.of(fwd, rules)));
+        }
     }
 
-    private void sendForward(ForwardingObjective fwd, Collection<FlowRule> rules) {
+    // Builds the batch using the accumulated flow rules
+    private void sendForwards(List<Pair<ForwardingObjective, Collection<FlowRule>>> pairs) {
         FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
-        switch (fwd.op()) {
-        case ADD:
-            rules.stream()
-            .filter(Objects::nonNull)
-            .forEach(flowOpsBuilder::add);
-            log.debug("Applying a add fwd-obj {} to sw:{}", fwd.id(), deviceId);
-            break;
-        case REMOVE:
-            rules.stream()
-            .filter(Objects::nonNull)
-            .forEach(flowOpsBuilder::remove);
-            log.debug("Deleting a flow rule to sw:{}", deviceId);
-            break;
-        default:
-            fail(fwd, ObjectiveError.UNKNOWN);
-            log.warn("Unknown forwarding type {}", fwd.op());
-        }
-
+        log.debug("Sending {} fwd-objs", pairs.size());
+        List<Objective> fwdObjs = Lists.newArrayList();
+        // Iterates over all accumulated flow rules and then build an unique batch
+        pairs.forEach(pair -> {
+            ForwardingObjective fwd = pair.getLeft();
+            Collection<FlowRule> rules = pair.getRight();
+            switch (fwd.op()) {
+                case ADD:
+                    rules.stream()
+                            .filter(Objects::nonNull)
+                            .forEach(flowOpsBuilder::add);
+                    log.debug("Applying a add fwd-obj {} to sw:{}", fwd.id(), deviceId);
+                    fwdObjs.add(fwd);
+                    break;
+                case REMOVE:
+                    rules.stream()
+                            .filter(Objects::nonNull)
+                            .forEach(flowOpsBuilder::remove);
+                    log.debug("Deleting a flow rule to sw:{}", deviceId);
+                    fwdObjs.add(fwd);
+                    break;
+                default:
+                    fail(fwd, ObjectiveError.UNKNOWN);
+                    log.warn("Unknown forwarding type {}", fwd.op());
+            }
+        });
+        // Finally applies the operations
         flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
+
             @Override
             public void onSuccess(FlowRuleOperations ops) {
-                pass(fwd);
+                log.trace("Flow rule operations onSuccess {}", ops);
+                fwdObjs.forEach(Ofdpa2Pipeline::pass);
             }
 
             @Override
             public void onError(FlowRuleOperations ops) {
-                fail(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
+                ObjectiveError error = ObjectiveError.FLOWINSTALLATIONFAILED;
+                log.warn("Flow rule operations onError {}. Reason = {}", ops, error);
+                fwdObjs.forEach(fwdObj -> fail(fwdObj, error));
             }
         }));
     }
@@ -1387,7 +1436,7 @@
         }
 
         if (emptyGroup) {
-            executorService.schedule(new RetryFlows(fwd, flowRuleCollection),
+            retryExecutorService.schedule(new RetryFlows(fwd, flowRuleCollection),
                                      RETRY_MS, TimeUnit.MILLISECONDS);
         }
         return flowRuleCollection;
@@ -1810,9 +1859,42 @@
         public void run() {
             log.info("RETRY FLOWS ATTEMPT# {} for fwd:{} rules:{}",
                      MAX_RETRY_ATTEMPTS - attempts, fwd.id(), retryFlows.size());
-            sendForward(fwd, retryFlows);
+            sendForwards(Collections.singletonList(Pair.of(fwd, retryFlows)));
             if (--attempts > 0) {
-                executorService.schedule(this, RETRY_MS, TimeUnit.MILLISECONDS);
+                retryExecutorService.schedule(this, RETRY_MS, TimeUnit.MILLISECONDS);
+            }
+        }
+    }
+
+    // Flow rules accumulator for reducing the number of transactions required to the devices.
+    private final class ForwardingObjectiveAccumulator
+            extends AbstractAccumulator<Pair<ForwardingObjective, Collection<FlowRule>>> {
+
+        ForwardingObjectiveAccumulator(int maxFwd, int maxBatchMS, int maxIdleMS) {
+            super(TIMER, maxFwd, maxBatchMS, maxIdleMS);
+        }
+
+        @Override
+        public void processItems(List<Pair<ForwardingObjective, Collection<FlowRule>>> pairs) {
+            // Triggers creation of a batch using the list of flowrules generated from fwdobjs.
+            accumulatorExecutorService.execute(new FlowRulesBuilderTask(pairs));
+        }
+    }
+
+    // Task for building batch of flow rules in a separate thread.
+    private final class FlowRulesBuilderTask implements Runnable {
+        private final List<Pair<ForwardingObjective, Collection<FlowRule>>> pairs;
+
+        FlowRulesBuilderTask(List<Pair<ForwardingObjective, Collection<FlowRule>>> pairs) {
+            this.pairs = pairs;
+        }
+
+        @Override
+        public void run() {
+            try {
+                sendForwards(pairs);
+            } catch (Exception e) {
+                log.warn("Unable to send forwards", e);
             }
         }
     }
diff --git a/drivers/default/src/main/resources/onos-drivers.xml b/drivers/default/src/main/resources/onos-drivers.xml
index a9413e4..0ecb03e 100644
--- a/drivers/default/src/main/resources/onos-drivers.xml
+++ b/drivers/default/src/main/resources/onos-drivers.xml
@@ -72,6 +72,7 @@
         <behaviour api="org.onosproject.net.behaviour.ExtensionSelectorResolver"
                    impl="org.onosproject.driver.extensions.OfdpaExtensionSelectorInterpreter" />
         <property name="meterCapable">false</property>
+        <property name="accumulatorEnabled">true</property>
     </driver>
 
     <!--  Driver for OFDPA 3.0 EA*.