Add support for vlan based intents in the Corsa driver

Changes:
- Improve processSpecific in AbstractCorsaPipeline to support
intents without an explicit match on the EtherType;
- Implement VLAN-based circuits in CorsaPipelineV3 by handling
forwarding objectives (FwdObjective) that carry no treatment;
- Distinguish Groups from simple actions;
- Identify Corsa groups using the actions of the treatment;
- Handle the pending next objectives similarly to DefaultSingleTablePipeline.

Change-Id: Iff0f70d56c64193524c6640f31ffb3f5629499dc
diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java
index ebc1da4..8083c2b 100644
--- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java
+++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java
@@ -93,14 +93,14 @@
     private ServiceDirectory serviceDirectory;
     protected FlowRuleService flowRuleService;
     private CoreService coreService;
-    private GroupService groupService;
+    protected GroupService groupService;
     protected MeterService meterService;
-    private FlowObjectiveStore flowObjectiveStore;
+    protected FlowObjectiveStore flowObjectiveStore;
     protected DeviceId deviceId;
     protected ApplicationId appId;
     protected DeviceService deviceService;
 
-    private KryoNamespace appKryo = new KryoNamespace.Builder()
+    protected KryoNamespace appKryo = new KryoNamespace.Builder()
             .register(GroupKey.class)
             .register(DefaultGroupKey.class)
             .register(CorsaGroup.class)
@@ -108,6 +108,8 @@
             .build("AbstractCorsaPipeline");
 
     private Cache<GroupKey, NextObjective> pendingGroups;
+    protected Cache<Integer, NextObjective> pendingNext;
+
 
     private ScheduledExecutorService groupChecker =
             Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
@@ -131,6 +133,16 @@
                     }
                 }).build();
 
+        pendingNext = CacheBuilder.newBuilder()
+                .expireAfterWrite(20, TimeUnit.SECONDS)
+                .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
+                    if (notification.getCause() == RemovalCause.EXPIRED) {
+                        notification.getValue().context()
+                                .ifPresent(c -> c.onError(notification.getValue(),
+                                                          ObjectiveError.FLOWINSTALLATIONFAILED));
+                    }
+                }).build();
+
         groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS);
 
         coreService = serviceDirectory.get(CoreService.class);
@@ -304,6 +316,7 @@
 
     @Override
     public void forward(ForwardingObjective fwd) {
+
         Collection<FlowRule> rules;
         FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
 
@@ -354,16 +367,20 @@
     private Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
         log.debug("Processing specific forwarding objective");
         TrafficSelector selector = fwd.selector();
-        EthTypeCriterion ethType =
+        EthTypeCriterion ethTypeCriterion =
                 (EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
-        if (ethType != null) {
-            short et = ethType.ethType().toShort();
+        VlanIdCriterion vlanIdCriterion =
+                (VlanIdCriterion) selector.getCriterion(Criterion.Type.VLAN_VID);
+        if (ethTypeCriterion != null) {
+            short et = ethTypeCriterion.ethType().toShort();
             if (et == Ethernet.TYPE_IPV4) {
                 return processSpecificRoute(fwd);
             } else if (et == Ethernet.TYPE_VLAN) {
                 /* The ForwardingObjective must specify VLAN ethtype in order to use the Transit Circuit */
                 return processSpecificSwitch(fwd);
             }
+        } else if (vlanIdCriterion != null) {
+            return processSpecificSwitch(fwd);
         }
 
         fail(fwd, ObjectiveError.UNSUPPORTED);
@@ -464,6 +481,41 @@
     //Hook for modifying Route flow rule
     protected abstract Builder processSpecificRoutingRule(Builder rb);
 
+    protected enum CorsaTrafficTreatmentType {
+        /**
+         * The treatment has to be handled as a group.
+         */
+        GROUP,
+        /**
+         * The treatment has to be handled as a simple set of actions.
+         */
+        ACTIONS
+    }
+
+    /**
+     * Immutable pair of a traffic treatment and the way the pipeline
+     * must handle it (as a group or as a simple set of actions).
+     */
+    protected class CorsaTrafficTreatment {
+
+        private final CorsaTrafficTreatmentType type;
+        private final TrafficTreatment trafficTreatment;
+
+        public CorsaTrafficTreatment(CorsaTrafficTreatmentType treatmentType, TrafficTreatment trafficTreatment) {
+            this.type = treatmentType;
+            this.trafficTreatment = trafficTreatment;
+        }
+
+        public CorsaTrafficTreatmentType type() {
+            return type;
+        }
+
+        public TrafficTreatment treatment() {
+            return trafficTreatment;
+        }
+
+    }
+
     @Override
     public void next(NextObjective nextObjective) {
         switch (nextObjective.type()) {
@@ -471,20 +523,25 @@
                 Collection<TrafficTreatment> treatments = nextObjective.next();
                 if (treatments.size() == 1) {
                     TrafficTreatment treatment = treatments.iterator().next();
-                    treatment = processNextTreatment(treatment);
-                    GroupBucket bucket =
-                            DefaultGroupBucket.createIndirectGroupBucket(treatment);
+                    CorsaTrafficTreatment corsaTreatment = processNextTreatment(treatment);
                     final GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
-                    GroupDescription groupDescription
-                            = new DefaultGroupDescription(deviceId,
-                            GroupDescription.Type.INDIRECT,
-                            new GroupBuckets(Collections
-                                    .singletonList(bucket)),
-                            key,
-                            null, // let group service determine group id
-                            nextObjective.appId());
-                    groupService.addGroup(groupDescription);
-                    pendingGroups.put(key, nextObjective);
+                    if (corsaTreatment.type() == CorsaTrafficTreatmentType.GROUP) {
+                        GroupBucket bucket = DefaultGroupBucket.createIndirectGroupBucket(corsaTreatment.treatment());
+                        GroupBuckets buckets = new GroupBuckets(Collections.singletonList(bucket));
+                        // group id == null, let group service determine group id
+                        GroupDescription groupDescription = new DefaultGroupDescription(deviceId,
+                                                                                        GroupDescription.Type.INDIRECT,
+                                                                                        buckets,
+                                                                                        key,
+                                                                                        null,
+                                                                                        nextObjective.appId());
+                        groupService.addGroup(groupDescription);
+                        pendingGroups.put(key, nextObjective);
+                    } else if (corsaTreatment.type() == CorsaTrafficTreatmentType.ACTIONS) {
+                        pendingNext.put(nextObjective.id(), nextObjective);
+                        flowObjectiveStore.putNextGroup(nextObjective.id(), new CorsaGroup(key));
+                        nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
+                    }
                 }
                 break;
             case HASHED:
@@ -501,8 +558,8 @@
     }
 
     //Hook for altering the NextObjective treatment
-    protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) {
-        return treatment;
+    protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) {
+        return new CorsaTrafficTreatment(CorsaTrafficTreatmentType.GROUP, treatment);
     }
 
     //Init helper: Table Miss = Drop