[ONOS-6475] Support BROADCAST next objectives in DefaultSingleTablePipeline

Change-Id: I66c9f9961763b057005bdf6f7a3b6fde1b5970cd
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
index 358fea5..306f801 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
@@ -44,6 +44,7 @@
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.slf4j.Logger;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -96,7 +97,6 @@
 
     @Override
     public void filter(FilteringObjective filter) {
-
         TrafficTreatment.Builder actions;
         switch (filter.type()) {
             case PERMIT:
@@ -135,9 +135,7 @@
         } else {
             ruleBuilder.makeTemporary(filter.timeout());
         }
-
         installObjective(ruleBuilder, filter);
-
     }
 
     @Override
@@ -180,24 +178,24 @@
                     treatment = appKryo.deserialize(next.data());
                 } else {
                     pendingAddNext.invalidate(fwd.nextId());
-                    treatment = nextObjective.next().iterator().next();
+                    treatment = getTreatment(nextObjective);
+                    if (treatment == null) {
+                        fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.UNSUPPORTED));
+                        return;
+                    }
                 }
             } else {
                 // We get the NextGroup from the remove operation.
                 // Doing an operation on the store seems to be very expensive.
                 next = flowObjectiveStore.getNextGroup(fwd.nextId());
-                if (next == null) {
-                    fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.GROUPMISSING));
-                    return;
-                }
-                treatment = appKryo.deserialize(next.data());
+                treatment = (next != null) ? appKryo.deserialize(next.data()) : null;
             }
             // If the treatment is null we cannot re-build the original flow
             if (treatment == null)  {
                 fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.GROUPMISSING));
                 return;
             }
-            // Finally we build the flow rule and push to the flowrule subsystem.
+            // Finally we build the flow rule and push to the flow rule subsystem.
             FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
                     .forDevice(deviceId)
                     .withSelector(selector)
@@ -216,7 +214,6 @@
     private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
         FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
         switch (objective.op()) {
-
             case ADD:
                 flowBuilder.add(ruleBuilder.build());
                 break;
@@ -245,12 +242,21 @@
     public void next(NextObjective nextObjective) {
         switch (nextObjective.op()) {
             case ADD:
+                // Check next objective
+                TrafficTreatment treatment = getTreatment(nextObjective);
+                if (treatment == null) {
+                    // unsupported next objective
+                    nextObjective.context().ifPresent(context -> context.onError(nextObjective,
+                                                                                 ObjectiveError.UNSUPPORTED));
+                    return;
+                }
                 // We insert the value in the cache
                 pendingAddNext.put(nextObjective.id(), nextObjective);
+
                 // Then in the store, this will unblock the queued fwd obj
                 flowObjectiveStore.putNextGroup(
                         nextObjective.id(),
-                        new SingleGroup(nextObjective.next().iterator().next())
+                        new SingleGroup(treatment)
                 );
                 break;
             case REMOVE:
@@ -273,6 +279,37 @@
         return Collections.emptyList();
     }
 
+    /**
+     * Gets the traffic treatment to use for a next objective.
+     * For a SIMPLE next objective, returns its treatment; the objective
+     * must contain exactly one, otherwise an error is logged.
+     * For a BROADCAST next objective, merges all of its treatments into
+     * a single treatment. Any other type is logged as unsupported.
+     *
+     * @param nextObjective the next objective
+     * @return the treatment from the next objective; null if not supported
+     */
+    private TrafficTreatment getTreatment(NextObjective nextObjective) {
+        Collection<TrafficTreatment> treatments = nextObjective.next();
+        switch (nextObjective.type()) {
+            case SIMPLE:
+                if (treatments.size() != 1) {
+                    log.error("Next Objectives of type SIMPLE should have only " +
+                                      "one traffic treatment. NextObjective: {}",
+                              nextObjective);
+                    return null;
+                }
+                return treatments.iterator().next();
+            case BROADCAST:
+                TrafficTreatment.Builder builder = DefaultTrafficTreatment.builder();
+                treatments.forEach(builder::addTreatment);
+                return builder.build();
+            default:
+                log.error("Unsupported next objective type {}.", nextObjective.type());
+                return null;
+        }
+    }
+
     private class SingleGroup implements NextGroup {
 
         private TrafficTreatment nextActions;