Add support for vlan based intents in the Corsa driver

Changes:
- Improves processSpecific in AbstractCorsaPipeline in order to support
Intents without an explicit match on the Ethertype;
- Implements vlan based circuits in CorsaPipelineV3 through the management
of the FwdObjective without Treatment;
- Distinguish Groups from simple actions;
- Corsa group are identified using the actions of the treatment;
- handling of the pending next similar to DefaultSingleTablePipeline

Change-Id: Iff0f70d56c64193524c6640f31ffb3f5629499dc
diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java
index ebc1da4..8083c2b 100644
--- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java
+++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java
@@ -93,14 +93,14 @@
     private ServiceDirectory serviceDirectory;
     protected FlowRuleService flowRuleService;
     private CoreService coreService;
-    private GroupService groupService;
+    protected GroupService groupService;
     protected MeterService meterService;
-    private FlowObjectiveStore flowObjectiveStore;
+    protected FlowObjectiveStore flowObjectiveStore;
     protected DeviceId deviceId;
     protected ApplicationId appId;
     protected DeviceService deviceService;
 
-    private KryoNamespace appKryo = new KryoNamespace.Builder()
+    protected KryoNamespace appKryo = new KryoNamespace.Builder()
             .register(GroupKey.class)
             .register(DefaultGroupKey.class)
             .register(CorsaGroup.class)
@@ -108,6 +108,8 @@
             .build("AbstractCorsaPipeline");
 
     private Cache<GroupKey, NextObjective> pendingGroups;
+    protected Cache<Integer, NextObjective> pendingNext;
+
 
     private ScheduledExecutorService groupChecker =
             Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
@@ -131,6 +133,16 @@
                     }
                 }).build();
 
+        pendingNext = CacheBuilder.newBuilder()
+                .expireAfterWrite(20, TimeUnit.SECONDS)
+                .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
+                    if (notification.getCause() == RemovalCause.EXPIRED) {
+                        notification.getValue().context()
+                                .ifPresent(c -> c.onError(notification.getValue(),
+                                                          ObjectiveError.FLOWINSTALLATIONFAILED));
+                    }
+                }).build();
+
         groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS);
 
         coreService = serviceDirectory.get(CoreService.class);
@@ -304,6 +316,7 @@
 
     @Override
     public void forward(ForwardingObjective fwd) {
+
         Collection<FlowRule> rules;
         FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
 
@@ -354,16 +367,20 @@
     private Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
         log.debug("Processing specific forwarding objective");
         TrafficSelector selector = fwd.selector();
-        EthTypeCriterion ethType =
+        EthTypeCriterion ethTypeCriterion =
                 (EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
-        if (ethType != null) {
-            short et = ethType.ethType().toShort();
+        VlanIdCriterion vlanIdCriterion =
+                (VlanIdCriterion) selector.getCriterion(Criterion.Type.VLAN_VID);
+        if (ethTypeCriterion != null) {
+            short et = ethTypeCriterion.ethType().toShort();
             if (et == Ethernet.TYPE_IPV4) {
                 return processSpecificRoute(fwd);
             } else if (et == Ethernet.TYPE_VLAN) {
                 /* The ForwardingObjective must specify VLAN ethtype in order to use the Transit Circuit */
                 return processSpecificSwitch(fwd);
             }
+        } else if (vlanIdCriterion != null) {
+            return processSpecificSwitch(fwd);
         }
 
         fail(fwd, ObjectiveError.UNSUPPORTED);
@@ -464,6 +481,41 @@
     //Hook for modifying Route flow rule
     protected abstract Builder processSpecificRoutingRule(Builder rb);
 
+    protected enum CorsaTrafficTreatmentType {
+        /**
+         * If the treatment has to be handled as group.
+         */
+        GROUP,
+        /**
+         * If the treatment has to be handled as simple set of actions.
+         */
+        ACTIONS
+    }
+
+    /**
+     * Helper class to encapsulate both traffic treatment and
+     * type of treatment.
+     */
+    protected class CorsaTrafficTreatment {
+
+        private CorsaTrafficTreatmentType type;
+        private TrafficTreatment trafficTreatment;
+
+        public CorsaTrafficTreatment(CorsaTrafficTreatmentType treatmentType, TrafficTreatment trafficTreatment) {
+            this.type = treatmentType;
+            this.trafficTreatment = trafficTreatment;
+        }
+
+        public CorsaTrafficTreatmentType type() {
+            return type;
+        }
+
+        public TrafficTreatment treatment() {
+            return trafficTreatment;
+        }
+
+    }
+
     @Override
     public void next(NextObjective nextObjective) {
         switch (nextObjective.type()) {
@@ -471,20 +523,25 @@
                 Collection<TrafficTreatment> treatments = nextObjective.next();
                 if (treatments.size() == 1) {
                     TrafficTreatment treatment = treatments.iterator().next();
-                    treatment = processNextTreatment(treatment);
-                    GroupBucket bucket =
-                            DefaultGroupBucket.createIndirectGroupBucket(treatment);
+                    CorsaTrafficTreatment corsaTreatment = processNextTreatment(treatment);
                     final GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
-                    GroupDescription groupDescription
-                            = new DefaultGroupDescription(deviceId,
-                            GroupDescription.Type.INDIRECT,
-                            new GroupBuckets(Collections
-                                    .singletonList(bucket)),
-                            key,
-                            null, // let group service determine group id
-                            nextObjective.appId());
-                    groupService.addGroup(groupDescription);
-                    pendingGroups.put(key, nextObjective);
+                    if (corsaTreatment.type() == CorsaTrafficTreatmentType.GROUP) {
+                        GroupBucket bucket = DefaultGroupBucket.createIndirectGroupBucket(corsaTreatment.treatment());
+                        GroupBuckets buckets = new GroupBuckets(Collections.singletonList(bucket));
+                        // group id == null, let group service determine group id
+                        GroupDescription groupDescription = new DefaultGroupDescription(deviceId,
+                                                                                        GroupDescription.Type.INDIRECT,
+                                                                                        buckets,
+                                                                                        key,
+                                                                                        null,
+                                                                                        nextObjective.appId());
+                        groupService.addGroup(groupDescription);
+                        pendingGroups.put(key, nextObjective);
+                    } else if (corsaTreatment.type() == CorsaTrafficTreatmentType.ACTIONS) {
+                        pendingNext.put(nextObjective.id(), nextObjective);
+                        flowObjectiveStore.putNextGroup(nextObjective.id(), new CorsaGroup(key));
+                        nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
+                    }
                 }
                 break;
             case HASHED:
@@ -501,8 +558,8 @@
     }
 
     //Hook for altering the NextObjective treatment
-    protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) {
-        return treatment;
+    protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) {
+        return new CorsaTrafficTreatment(CorsaTrafficTreatmentType.GROUP, treatment);
     }
 
     //Init helper: Table Miss = Drop
diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java
index 10e3f0b..b1e15b2 100644
--- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java
+++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java
@@ -30,9 +30,12 @@
 import org.onosproject.net.flow.criteria.IPCriterion;
 import org.onosproject.net.flow.criteria.PortCriterion;
 import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.Instruction;
 import org.onosproject.net.flow.instructions.L2ModificationInstruction;
 import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.meter.Band;
 import org.onosproject.net.meter.DefaultBand;
 import org.onosproject.net.meter.DefaultMeterRequest;
@@ -69,9 +72,11 @@
     protected MeterId defaultMeterId = null;
 
     @Override
-    protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) {
+    protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) {
         TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
 
+
+
         treatment.immediate().stream()
                 .filter(i -> {
                     switch (i.type()) {
@@ -87,7 +92,48 @@
                             return false;
                     }
                 }).forEach(i -> tb.add(i));
-        return tb.build();
+
+        TrafficTreatment t = tb.build();
+
+
+        boolean isPresentModVlanId = false;
+        boolean isPresentModEthSrc = false;
+        boolean isPresentModEthDst = false;
+        boolean isPresentOutpuPort = false;
+
+        for (Instruction instruction : t.immediate()) {
+            switch (instruction.type()) {
+                case L2MODIFICATION:
+                    L2ModificationInstruction l2i = (L2ModificationInstruction) instruction;
+                    if (l2i instanceof L2ModificationInstruction.ModVlanIdInstruction) {
+                        isPresentModVlanId = true;
+                    }
+
+                    if (l2i instanceof L2ModificationInstruction.ModEtherInstruction) {
+                        L2ModificationInstruction.L2SubType subType = l2i.subtype();
+                        if (subType.equals(L2ModificationInstruction.L2SubType.ETH_SRC)) {
+                            isPresentModEthSrc = true;
+                        } else if (subType.equals(L2ModificationInstruction.L2SubType.ETH_DST)) {
+                            isPresentModEthDst = true;
+                        }
+                    }
+                case OUTPUT:
+                    isPresentOutpuPort = true;
+                default:
+            }
+        }
+        CorsaTrafficTreatmentType type = CorsaTrafficTreatmentType.ACTIONS;
+        /**
+         * This represents the allowed group for CorsaPipelinev3
+         */
+        if (isPresentModVlanId &&
+                isPresentModEthSrc &&
+                isPresentModEthDst &&
+                isPresentOutpuPort) {
+            type = CorsaTrafficTreatmentType.GROUP;
+        }
+        CorsaTrafficTreatment corsaTreatment = new CorsaTrafficTreatment(type, t);
+        return corsaTreatment;
     }
 
     @Override
@@ -115,9 +161,37 @@
                 .withPriority(fwd.priority())
                 .forDevice(deviceId)
                 .withSelector(filteredSelector)
-                .withTreatment(fwd.treatment())
                 .forTable(VLAN_CIRCUIT_TABLE);
 
+        if (fwd.treatment() != null) {
+            ruleBuilder.withTreatment(fwd.treatment());
+        } else {
+            if (fwd.nextId() != null) {
+                NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
+                if (nextObjective != null) {
+                    pendingNext.invalidate(fwd.nextId());
+                    TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder()
+                            .setVlanPcp((byte) 0)
+                            .setQueue(0)
+                            .meter(defaultMeterId);
+                    nextObjective.next().forEach(trafficTreatment -> {
+                        trafficTreatment.allInstructions().forEach(instruction -> {
+                           treatment.add(instruction);
+                        });
+                    });
+                    ruleBuilder.withTreatment(treatment.build());
+                } else {
+                    log.warn("The group left!");
+                    fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.GROUPMISSING));
+                    return ImmutableSet.of();
+                }
+            } else {
+                log.warn("Missing NextObjective ID for ForwardingObjective {}", fwd.id());
+                fail(fwd, ObjectiveError.BADPARAMS);
+                return ImmutableSet.of();
+            }
+        }
+
         if (fwd.permanent()) {
             ruleBuilder.makePermanent();
         } else {
diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java
index 9979cc1..514ac78 100644
--- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java
+++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java
@@ -29,6 +29,7 @@
 import org.onosproject.net.flow.criteria.Criterion;
 import org.onosproject.net.flow.criteria.IPCriterion;
 import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.instructions.Instruction;
 import org.onosproject.net.flow.instructions.Instructions;
 import org.onosproject.net.flow.instructions.L2ModificationInstruction;
 import org.onosproject.net.flowobjective.ForwardingObjective;
@@ -226,9 +227,8 @@
     }
 
     @Override
-    protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) {
+    protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) {
         TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
-        tb.add(Instructions.popVlan());
         treatment.immediate().stream()
                 .filter(i -> {
                     switch (i.type()) {
@@ -236,7 +236,6 @@
                             L2ModificationInstruction l2i = (L2ModificationInstruction) i;
                             if (l2i.subtype() == VLAN_ID ||
                                     l2i.subtype() == VLAN_POP ||
-                                    l2i.subtype() == VLAN_POP ||
                                     l2i.subtype() == ETH_DST ||
                                     l2i.subtype() == ETH_SRC) {
                                 return true;
@@ -247,6 +246,51 @@
                             return false;
                     }
                 }).forEach(i -> tb.add(i));
-        return tb.build();
+
+        TrafficTreatment t = tb.build();
+
+        boolean isPresentModVlanId = false;
+        boolean isPresentModEthSrc = false;
+        boolean isPresentModEthDst = false;
+        boolean isPresentOutpuPort = false;
+
+        for (Instruction instruction : t.immediate()) {
+            switch (instruction.type()) {
+                case L2MODIFICATION:
+                    L2ModificationInstruction l2i = (L2ModificationInstruction) instruction;
+                    if (l2i instanceof L2ModificationInstruction.ModVlanIdInstruction) {
+                        isPresentModVlanId = true;
+                    }
+
+                    if (l2i instanceof L2ModificationInstruction.ModEtherInstruction) {
+                        L2ModificationInstruction.L2SubType subType = l2i.subtype();
+                        if (subType.equals(L2ModificationInstruction.L2SubType.ETH_SRC)) {
+                            isPresentModEthSrc = true;
+                        } else if (subType.equals(L2ModificationInstruction.L2SubType.ETH_DST)) {
+                            isPresentModEthDst = true;
+                        }
+                    }
+                case OUTPUT:
+                    isPresentOutpuPort = true;
+                default:
+            }
+        }
+        CorsaTrafficTreatmentType type = CorsaTrafficTreatmentType.ACTIONS;
+        /**
+         * These are the allowed groups for CorsaPipelinev39
+         */
+        if (isPresentModVlanId && isPresentModEthSrc && isPresentModEthDst && isPresentOutpuPort) {
+            type = CorsaTrafficTreatmentType.GROUP;
+
+        } else if ((!isPresentModVlanId && isPresentModEthSrc && isPresentModEthDst && isPresentOutpuPort) ||
+                (!isPresentModVlanId && !isPresentModEthSrc && isPresentModEthDst && isPresentOutpuPort) ||
+                (!isPresentModVlanId && !isPresentModEthSrc && !isPresentModEthDst && isPresentOutpuPort)) {
+            type = CorsaTrafficTreatmentType.GROUP;
+            TrafficTreatment.Builder tb2 = DefaultTrafficTreatment.builder(t);
+            tb2.add(Instructions.popVlan());
+            t = tb2.build();
+        }
+        CorsaTrafficTreatment corsaTreatment = new CorsaTrafficTreatment(type, t);
+        return corsaTreatment;
     }
 }