[VOL-3249] Batching operations in OLT pipeliner

Change-Id: Iea4a30ddb224ac5d9493b99565fb9b0dfe57f1bc
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
index dd72436..c6b1e81 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
@@ -29,6 +29,8 @@
 import org.onlab.packet.IPv6;
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.VlanId;
+import org.onlab.util.AbstractAccumulator;
+import org.onlab.util.Accumulator;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
@@ -38,6 +40,7 @@
 import org.onosproject.net.behaviour.Pipeliner;
 import org.onosproject.net.behaviour.PipelinerContext;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.driver.Driver;
 import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -85,10 +88,14 @@
 import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.Timer;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.onosproject.core.CoreService.CORE_APP_NAME;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -124,6 +131,13 @@
             .register(OLTPipelineGroup.class)
             .build("OltPipeline");
 
+    private static final Timer TIMER = new Timer("filterobj-batching");
+    private Accumulator<Pair<FilteringObjective, FlowRule>> accumulator;
+
+    // accumulator executor service
+    private ScheduledExecutorService accumulatorExecutorService
+            = newSingleThreadScheduledExecutor(groupedThreads("OltPipeliner", "acc-%d", log));
+
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
         log.debug("Initiate OLT pipeline");
@@ -139,6 +153,16 @@
         appId = coreService.registerApplication(
                 "org.onosproject.driver.OLTPipeline");
 
+        // Init the accumulator, if enabled
+        if (isAccumulatorEnabled()) {
+            log.debug("Building accumulator with maxObjs {}, batchMs {}, idleMs {}",
+                      context.accumulatorMaxObjectives(), context.accumulatorMaxBatchMillis(),
+                      context.accumulatorMaxIdleMillis());
+            accumulator = new ObjectiveAccumulator(context.accumulatorMaxObjectives(),
+                                                             context.accumulatorMaxBatchMillis(),
+                                                             context.accumulatorMaxIdleMillis());
+        }
+
 
         pendingGroups = CacheBuilder.newBuilder()
                 .expireAfterWrite(20, TimeUnit.SECONDS)
@@ -152,6 +176,15 @@
 
     }
 
+    public boolean isAccumulatorEnabled() {
+        Driver driver = super.data().driver();
+        // we cannot determine the property
+        if (driver == null) {
+            return false;
+        }
+        return Boolean.parseBoolean(driver.getProperty(ACCUMULATOR_ENABLED));
+    }
+
     @Override
     public void filter(FilteringObjective filter) {
         Instructions.OutputInstruction output;
@@ -440,7 +473,8 @@
                 log.warn("Unknown forwarding operation: {}", fwd.op());
         }
 
-        applyFlowRules(builder, fwd);
+        applyFlowRules(ImmutableList.of(fwd), builder);
+
 
     }
 
@@ -1005,21 +1039,26 @@
                 .withPriority(filter.priority())
                 .build();
 
-        FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
-
-        switch (filter.type()) {
-            case PERMIT:
-                opsBuilder.add(rule);
-                break;
-            case DENY:
-                opsBuilder.remove(rule);
-                break;
-            default:
-                log.warn("Unknown filter type : {}", filter.type());
-                fail(filter, ObjectiveError.UNSUPPORTED);
+        if (accumulator != null) {
+            if (log.isDebugEnabled()) {
+                log.debug("Adding pair to batch: {}", Pair.of(filter, rule));
+            }
+            accumulator.add(Pair.of(filter, rule));
+        } else {
+            FlowRuleOperations.Builder opsBuilder = FlowRuleOperations.builder();
+            switch (filter.type()) {
+                case PERMIT:
+                    opsBuilder.add(rule);
+                    break;
+                case DENY:
+                    opsBuilder.remove(rule);
+                    break;
+                default:
+                    log.warn("Unknown filter type : {}", filter.type());
+                    fail(filter, ObjectiveError.UNSUPPORTED);
+            }
+            applyFlowRules(ImmutableList.of(filter), opsBuilder);
         }
-
-        applyFlowRules(opsBuilder, filter);
     }
 
     private void applyRules(ForwardingObjective fwd, FlowRule.Builder... fwdBuilders) {
@@ -1043,24 +1082,62 @@
                 log.warn("Unknown forwarding operation: {}", fwd.op());
         }
 
-        applyFlowRules(builder, fwd);
+        applyFlowRules(ImmutableList.of(fwd), builder);
+
+
     }
 
-    private void applyFlowRules(FlowRuleOperations.Builder builder,
-                                Objective objective) {
+    private void applyFlowRules(List<Objective> objectives, FlowRuleOperations.Builder builder) {
         flowRuleService.apply(builder.build(new FlowRuleOperationsContext() {
             @Override
             public void onSuccess(FlowRuleOperations ops) {
-                pass(objective);
+                objectives.forEach(obj -> {
+                    pass(obj);
+                });
             }
 
             @Override
             public void onError(FlowRuleOperations ops) {
-                fail(objective, ObjectiveError.FLOWINSTALLATIONFAILED);
+                objectives.forEach(obj -> {
+                    fail(obj, ObjectiveError.FLOWINSTALLATIONFAILED);
+                });
+
             }
         }));
     }
 
+    // Builds the batch using the accumulated flow rules
+    private void sendFilters(List<Pair<FilteringObjective, FlowRule>> pairs) {
+        FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
+        log.debug("Sending batch of {} filter-objs", pairs.size());
+        List<Objective> filterObjs = Lists.newArrayList();
+        // Iterates over all accumulated flow rules and then build an unique batch
+        pairs.forEach(pair -> {
+            FilteringObjective filter = pair.getLeft();
+            FlowRule rule = pair.getRight();
+            switch (filter.type()) {
+                case PERMIT:
+                    flowOpsBuilder.add(rule);
+                    log.debug("Applying add filter-obj {} to device: {}", filter.id(), deviceId);
+                    filterObjs.add(filter);
+                    break;
+                case DENY:
+                    flowOpsBuilder.remove(rule);
+                    log.debug("Deleting flow rule {} to device: {}", rule, deviceId);
+                    filterObjs.add(filter);
+                    break;
+                default:
+                    fail(filter, ObjectiveError.UNKNOWN);
+                    log.warn("Unknown forwarding type {}", filter.type());
+            }
+        });
+        if (log.isDebugEnabled()) {
+            log.debug("Applying batch {}", flowOpsBuilder.build());
+        }
+        // Finally applies the operations
+        applyFlowRules(filterObjs, flowOpsBuilder);
+    }
+
     private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
         return criteria.stream()
                 .filter(c -> c.type().equals(type))
@@ -1149,4 +1226,37 @@
         // TODO Implementation deferred to vendor
         return null;
     }
+
+    // Flow rules accumulator for reducing the number of transactions required to the devices.
+    private final class ObjectiveAccumulator
+            extends AbstractAccumulator<Pair<FilteringObjective, FlowRule>> {
+
+        ObjectiveAccumulator(int maxFilter, int maxBatchMS, int maxIdleMS) {
+            super(TIMER, maxFilter, maxBatchMS, maxIdleMS);
+        }
+
+        @Override
+        public void processItems(List<Pair<FilteringObjective, FlowRule>> pairs) {
+            // Triggers creation of a batch using the list of flowrules generated from objs.
+            accumulatorExecutorService.execute(new FlowRulesBuilderTask(pairs));
+        }
+    }
+
+    // Task for building batch of flow rules in a separate thread.
+    private final class FlowRulesBuilderTask implements Runnable {
+        private final List<Pair<FilteringObjective, FlowRule>> pairs;
+
+        FlowRulesBuilderTask(List<Pair<FilteringObjective, FlowRule>> pairs) {
+            this.pairs = pairs;
+        }
+
+        @Override
+        public void run() {
+            try {
+                sendFilters(pairs);
+            } catch (Exception e) {
+                log.warn("Unable to send objectives", e);
+            }
+        }
+    }
 }
diff --git a/drivers/default/src/main/resources/onos-drivers.xml b/drivers/default/src/main/resources/onos-drivers.xml
index fff45dc..d63638b 100644
--- a/drivers/default/src/main/resources/onos-drivers.xml
+++ b/drivers/default/src/main/resources/onos-drivers.xml
@@ -193,6 +193,7 @@
                    impl="org.onosproject.driver.pipeline.OltPipeline"/>
         <behaviour api="org.onosproject.net.behaviour.MeterQuery"
                    impl="org.onosproject.driver.query.FullMetersAvailable"/>
+        <property name="accumulatorEnabled">true</property>
     </driver>
     <driver name="fj-olt" extends="default"
             manufacturer="Fujitsu" hwVersion="svkOLT" swVersion="v1.0">