corsa support intent

Change-Id: I6eaa46d1ef39405defa3b0661e94d1cf285db332
diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java
index 2bb521c..8ae04d2 100644
--- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java
+++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java
@@ -20,6 +20,7 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalCause;
 import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableSet;
 import org.onlab.osgi.ServiceDirectory;
 import org.onlab.packet.Ethernet;
 import org.onlab.util.KryoNamespace;
@@ -347,7 +348,7 @@
                 fail(fwd, ObjectiveError.UNKNOWN);
                 log.warn("Unknown forwarding flag {}", fwd.flag());
         }
-        return Collections.emptySet();
+        return ImmutableSet.of();
     }
 
     private Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
@@ -366,14 +367,14 @@
         }
 
         fail(fwd, ObjectiveError.UNSUPPORTED);
-        return Collections.emptySet();
+        return ImmutableSet.of();
     }
 
     protected Collection<FlowRule> processSpecificSwitch(ForwardingObjective fwd) {
         /* Not supported by until CorsaPipelineV3 */
         log.warn("Vlan switching not supported in ovs-corsa driver");
         fail(fwd, ObjectiveError.UNSUPPORTED);
-        return Collections.emptySet();
+        return ImmutableSet.of();
     }
 
     private Collection<FlowRule> processVersatile(ForwardingObjective fwd) {
@@ -385,7 +386,7 @@
         if (ethType == null) {
             log.error("Versatile forwarding objective must include ethType");
             fail(fwd, ObjectiveError.UNKNOWN);
-            return Collections.emptySet();
+            return ImmutableSet.of();
         }
         Builder rule = DefaultFlowRule.builder()
                 .forDevice(deviceId)
@@ -404,7 +405,7 @@
         }
         log.warn("Driver does not support given versatile forwarding objective");
         fail(fwd, ObjectiveError.UNSUPPORTED);
-        return Collections.emptySet();
+        return ImmutableSet.of();
     }
 
     protected abstract Collection<FlowRule> processArpTraffic(ForwardingObjective fwd, Builder rule);
@@ -430,9 +431,13 @@
             if (group == null) {
                 log.warn("The group left!");
                 fail(fwd, ObjectiveError.GROUPMISSING);
-                return Collections.emptySet();
+                return ImmutableSet.of();
             }
             tb.group(group.id());
+        } else {
+            log.error("Missing NextObjective ID for ForwardingObjective {}", fwd.id());
+            fail(fwd, ObjectiveError.BADPARAMS);
+            return ImmutableSet.of();
         }
         Builder ruleBuilder = DefaultFlowRule.builder()
                 .fromApp(fwd.appId())
diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java
index 581a734..10e3f0b 100644
--- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java
+++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java
@@ -15,6 +15,7 @@
  */
 package org.onosproject.drivers.corsa;
 
+import com.google.common.collect.ImmutableSet;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
@@ -129,24 +130,24 @@
     @Override
     protected Collection<FlowRule> processArpTraffic(ForwardingObjective fwd, Builder rule) {
         //TODO
-        return Collections.emptyList();
+        return ImmutableSet.of();
     }
 
     @Override
     protected Collection<FlowRule> processLinkDiscovery(ForwardingObjective fwd, Builder rule) {
         //TODO
-        return Collections.emptyList();
+        return ImmutableSet.of();
     }
 
     @Override
     protected Collection<FlowRule> processIpTraffic(ForwardingObjective fwd, Builder rule) {
         //TODO
-        return Collections.emptyList();
+        return ImmutableSet.of();
     }
 
     @Override
     protected Builder processEthFiler(FilteringObjective filt, EthCriterion eth, PortCriterion port) {
-                         log.debug("adding rule for MAC: {}", eth.mac());
+        log.debug("adding rule for MAC: {}", eth.mac());
         TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
         TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
         selector.matchEthDst(eth.mac());
@@ -338,7 +339,7 @@
         processFlowRule(install, rule, "Provisioned ether type table ip");
     }
 
-    private void processFibTable(boolean install) {
+    protected void processFibTable(boolean install) {
         /* Default action */
         processTableMissDrop(install, FIB_TABLE, "Provisioned fib drop");
     }
diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java
index e9a331a..9979cc1 100644
--- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java
+++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java
@@ -16,6 +16,7 @@
 
 package org.onosproject.drivers.corsa;
 
+import com.google.common.collect.ImmutableSet;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IPv4;
 import org.onlab.packet.VlanId;
@@ -28,6 +29,8 @@
 import org.onosproject.net.flow.criteria.Criterion;
 import org.onosproject.net.flow.criteria.IPCriterion;
 import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.slf4j.Logger;
@@ -36,6 +39,10 @@
 import java.util.Collections;
 
 import static org.onosproject.net.flow.FlowRule.Builder;
+import static org.onosproject.net.flow.instructions.L2ModificationInstruction.L2SubType.ETH_DST;
+import static org.onosproject.net.flow.instructions.L2ModificationInstruction.L2SubType.ETH_SRC;
+import static org.onosproject.net.flow.instructions.L2ModificationInstruction.L2SubType.VLAN_ID;
+import static org.onosproject.net.flow.instructions.L2ModificationInstruction.L2SubType.VLAN_POP;
 import static org.slf4j.LoggerFactory.getLogger;
 
 public class CorsaPipelineV39 extends CorsaPipelineV3 {
@@ -55,13 +62,13 @@
         processL3IFMacDATable(true);       //Table 5
         processEtherTable(true);           //Table 6
         //TODO: to be implemented for intents
-        //processFibTable(true);           //Table 7
+        processFibTable(true);           //Table 7
         //processLocalTable(true);         //Table 9
     }
 
     @Override
     protected void processVlanCheckTable(boolean install) {
-        //FIXME: error
+        //current device pipeline reports errors, but it is a bug
         processTableMissGoTo(true, VLAN_CHECK_TABLE, VLAN_MAC_XLATE_TABLE, "Provisioned vlan tagged");
         //Tag untagged packets
         processUntaggedPackets(install);
@@ -197,7 +204,7 @@
         if (ipSrc != null) {
             log.warn("Driver does not currently handle matching Src IP");
             fail(fwd, ObjectiveError.UNSUPPORTED);
-            return Collections.emptySet();
+            return ImmutableSet.of();
         }
         IPCriterion ipDst = (IPCriterion) fwd.selector()
                 .getCriterion(Criterion.Type.IPV4_DST);
@@ -205,7 +212,7 @@
             log.error("Driver handles Dst IP matching as specific forwarding "
                     + "objective, not versatile");
             fail(fwd, ObjectiveError.UNSUPPORTED);
-            return Collections.emptySet();
+            return ImmutableSet.of();
         }
         IPProtocolCriterion ipProto = (IPProtocolCriterion) fwd.selector()
                 .getCriterion(Criterion.Type.IP_PROTO);
@@ -213,8 +220,33 @@
             log.warn("Driver automatically punts all packets reaching the "
                     + "LOCAL table to the controller");
             pass(fwd);
-            return Collections.emptySet();
+            return ImmutableSet.of();
         }
-        return Collections.emptySet();
+        return ImmutableSet.of();
+    }
+
+    @Override
+    protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) {
+        TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
+        tb.add(Instructions.popVlan());
+        treatment.immediate().stream()
+                .filter(i -> {
+                    switch (i.type()) {
+                        case L2MODIFICATION:
+                            L2ModificationInstruction l2i = (L2ModificationInstruction) i;
+                            if (l2i.subtype() == VLAN_ID ||
+                                    l2i.subtype() == VLAN_POP ||
+                                    l2i.subtype() == VLAN_POP ||
+                                    l2i.subtype() == ETH_DST ||
+                                    l2i.subtype() == ETH_SRC) {
+                                return true;
+                            }
+                        case OUTPUT:
+                            return true;
+                        default:
+                            return false;
+                    }
+                }).forEach(i -> tb.add(i));
+        return tb.build();
     }
 }
diff --git a/drivers/corsa/src/main/resources/corsa-drivers.xml b/drivers/corsa/src/main/resources/corsa-drivers.xml
index d4dd268..8f4c335 100644
--- a/drivers/corsa/src/main/resources/corsa-drivers.xml
+++ b/drivers/corsa/src/main/resources/corsa-drivers.xml
@@ -42,7 +42,7 @@
                    impl="org.onosproject.drivers.corsa.CorsaSwitchHandshaker"/>
     </driver>
     <driver name="corsa-v39"
-            manufacturer="Corsa" hwVersion="CDP6420-A00" swVersion="corsa-ovs-datapath 1.4.88">
+            manufacturer="Corsa" hwVersion="CDP6420-A00" swVersion="corsa-ovs-datapath 1.4.97">
         <behaviour api="org.onosproject.net.behaviour.Pipeliner"
                    impl="org.onosproject.drivers.corsa.CorsaPipelineV39"/>
         <behaviour api="org.onosproject.openflow.controller.driver.OpenFlowSwitchDriver"
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
index b650038..884e41a 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
@@ -15,7 +15,12 @@
  */
 package org.onosproject.driver.pipeline;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
 import org.onlab.osgi.ServiceDirectory;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.behaviour.NextGroup;
 import org.onosproject.net.behaviour.Pipeliner;
@@ -30,17 +35,20 @@
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.flow.instructions.Instructions;
 import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.GroupKey;
 import org.slf4j.Logger;
 
-import static org.slf4j.LoggerFactory.getLogger;
-
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Simple single table pipeline abstraction.
@@ -51,14 +59,36 @@
 
     private ServiceDirectory serviceDirectory;
     private FlowRuleService flowRuleService;
+    private FlowObjectiveStore flowObjectiveStore;
     private DeviceId deviceId;
 
+    private Cache<Integer, NextObjective> pendingNext;
+
+    private KryoNamespace appKryo = new KryoNamespace.Builder()
+            .register(GroupKey.class)
+            .register(DefaultGroupKey.class)
+            .register(SingleGroup.class)
+            .register(byte[].class)
+            .build();
+
+
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
         this.serviceDirectory = context.directory();
         this.deviceId = deviceId;
 
         flowRuleService = serviceDirectory.get(FlowRuleService.class);
+        flowObjectiveStore = serviceDirectory.get(FlowObjectiveStore.class);
+
+        pendingNext = CacheBuilder.newBuilder()
+                .expireAfterWrite(20, TimeUnit.SECONDS)
+                .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
+                    if (notification.getCause() == RemovalCause.EXPIRED) {
+                        notification.getValue().context()
+                                .ifPresent(c -> c.onError(notification.getValue(),
+                                        ObjectiveError.FLOWINSTALLATIONFAILED));
+                    }
+                }).build();
     }
 
     @Override
@@ -109,33 +139,48 @@
 
     @Override
     public void forward(ForwardingObjective fwd) {
-        // Deal with SPECIFIC and VERSATILE in the same manner.
         TrafficSelector selector = fwd.selector();
-        TrafficTreatment treatment = fwd.treatment();
-        if ((fwd.treatment().deferred().size() == 0) &&
-                (fwd.treatment().immediate().size() == 0) &&
-                (fwd.treatment().tableTransition() == null) &&
-                (!fwd.treatment().clearedDeferred())) {
-            TrafficTreatment.Builder flowTreatment = DefaultTrafficTreatment.builder();
-            flowTreatment.add(Instructions.createNoAction());
-            treatment = flowTreatment.build();
-        }
 
-        FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
-                .forDevice(deviceId)
-                .withSelector(selector)
-                .withTreatment(treatment)
-                .fromApp(fwd.appId())
-                .withPriority(fwd.priority());
+        if (fwd.treatment() != null) {
+            // Deal with SPECIFIC and VERSATILE in the same manner.
+            FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+                    .forDevice(deviceId)
+                    .withSelector(selector)
+                    .fromApp(fwd.appId())
+                    .withPriority(fwd.priority())
+                    .withTreatment(fwd.treatment());
 
-        if (fwd.permanent()) {
-            ruleBuilder.makePermanent();
+            if (fwd.permanent()) {
+                ruleBuilder.makePermanent();
+            } else {
+                ruleBuilder.makeTemporary(fwd.timeout());
+            }
+            installObjective(ruleBuilder, fwd);
+
         } else {
-            ruleBuilder.makeTemporary(fwd.timeout());
+            NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
+            if (nextObjective != null) {
+                pendingNext.invalidate(fwd.nextId());
+                nextObjective.next().forEach(treat -> {
+                    FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+                            .forDevice(deviceId)
+                            .withSelector(selector)
+                            .fromApp(fwd.appId())
+                            .withPriority(fwd.priority())
+                            .withTreatment(treat);
+
+                    if (fwd.permanent()) {
+                        ruleBuilder.makePermanent();
+                    } else {
+                        ruleBuilder.makeTemporary(fwd.timeout());
+                    }
+                    installObjective(ruleBuilder, fwd);
+                });
+            } else {
+                fwd.context().ifPresent(c -> c.onError(fwd,
+                        ObjectiveError.GROUPMISSING));
+            }
         }
-
-        installObjective(ruleBuilder, fwd);
-
     }
 
     private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
@@ -168,6 +213,10 @@
 
     @Override
     public void next(NextObjective nextObjective) {
+
+        pendingNext.put(nextObjective.id(), nextObjective);
+        flowObjectiveStore.putNextGroup(nextObjective.id(),
+                new SingleGroup(new DefaultGroupKey(appKryo.serialize(nextObjective.id()))));
     }
 
     @Override
@@ -176,4 +225,22 @@
         return null;
     }
 
+    private class SingleGroup implements NextGroup {
+
+        private final GroupKey key;
+
+        public SingleGroup(GroupKey key) {
+            this.key = key;
+        }
+
+        public GroupKey key() {
+            return key;
+        }
+
+        @Override
+        public byte[] data() {
+            return appKryo.serialize(key);
+        }
+
+    }
 }