corsa support intent

Change-Id: I6eaa46d1ef39405defa3b0661e94d1cf285db332
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
index b650038..884e41a 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/DefaultSingleTablePipeline.java
@@ -15,7 +15,12 @@
  */
 package org.onosproject.driver.pipeline;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
 import org.onlab.osgi.ServiceDirectory;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.behaviour.NextGroup;
 import org.onosproject.net.behaviour.Pipeliner;
@@ -30,17 +35,20 @@
 import org.onosproject.net.flow.FlowRuleService;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.flow.instructions.Instructions;
 import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.GroupKey;
 import org.slf4j.Logger;
 
-import static org.slf4j.LoggerFactory.getLogger;
-
 import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Simple single table pipeline abstraction.
@@ -51,14 +59,36 @@
 
     private ServiceDirectory serviceDirectory;
     private FlowRuleService flowRuleService;
+    private FlowObjectiveStore flowObjectiveStore;
     private DeviceId deviceId;
 
+    private Cache<Integer, NextObjective> pendingNext;
+
+    private KryoNamespace appKryo = new KryoNamespace.Builder()
+            .register(GroupKey.class)
+            .register(DefaultGroupKey.class)
+            .register(SingleGroup.class)
+            .register(byte[].class)
+            .build();
+
+
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
         this.serviceDirectory = context.directory();
         this.deviceId = deviceId;
 
         flowRuleService = serviceDirectory.get(FlowRuleService.class);
+        flowObjectiveStore = serviceDirectory.get(FlowObjectiveStore.class);
+
+        pendingNext = CacheBuilder.newBuilder()
+                .expireAfterWrite(20, TimeUnit.SECONDS)
+                .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
+                    if (notification.getCause() == RemovalCause.EXPIRED) {
+                        notification.getValue().context()
+                                .ifPresent(c -> c.onError(notification.getValue(),
+                                        ObjectiveError.FLOWINSTALLATIONFAILED));
+                    }
+                }).build();
     }
 
     @Override
@@ -109,33 +139,48 @@
 
     @Override
     public void forward(ForwardingObjective fwd) {
-        // Deal with SPECIFIC and VERSATILE in the same manner.
         TrafficSelector selector = fwd.selector();
-        TrafficTreatment treatment = fwd.treatment();
-        if ((fwd.treatment().deferred().size() == 0) &&
-                (fwd.treatment().immediate().size() == 0) &&
-                (fwd.treatment().tableTransition() == null) &&
-                (!fwd.treatment().clearedDeferred())) {
-            TrafficTreatment.Builder flowTreatment = DefaultTrafficTreatment.builder();
-            flowTreatment.add(Instructions.createNoAction());
-            treatment = flowTreatment.build();
-        }
 
-        FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
-                .forDevice(deviceId)
-                .withSelector(selector)
-                .withTreatment(treatment)
-                .fromApp(fwd.appId())
-                .withPriority(fwd.priority());
+        if (fwd.treatment() != null) {
+            // Deal with SPECIFIC and VERSATILE in the same manner.
+            FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+                    .forDevice(deviceId)
+                    .withSelector(selector)
+                    .fromApp(fwd.appId())
+                    .withPriority(fwd.priority())
+                    .withTreatment(fwd.treatment());
 
-        if (fwd.permanent()) {
-            ruleBuilder.makePermanent();
+            if (fwd.permanent()) {
+                ruleBuilder.makePermanent();
+            } else {
+                ruleBuilder.makeTemporary(fwd.timeout());
+            }
+            installObjective(ruleBuilder, fwd);
+
         } else {
-            ruleBuilder.makeTemporary(fwd.timeout());
+            NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
+            if (nextObjective != null) {
+                pendingNext.invalidate(fwd.nextId());
+                nextObjective.next().forEach(treat -> {
+                    FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+                            .forDevice(deviceId)
+                            .withSelector(selector)
+                            .fromApp(fwd.appId())
+                            .withPriority(fwd.priority())
+                            .withTreatment(treat);
+
+                    if (fwd.permanent()) {
+                        ruleBuilder.makePermanent();
+                    } else {
+                        ruleBuilder.makeTemporary(fwd.timeout());
+                    }
+                    installObjective(ruleBuilder, fwd);
+                });
+            } else {
+                fwd.context().ifPresent(c -> c.onError(fwd,
+                        ObjectiveError.GROUPMISSING));
+            }
         }
-
-        installObjective(ruleBuilder, fwd);
-
     }
 
     private void installObjective(FlowRule.Builder ruleBuilder, Objective objective) {
@@ -168,6 +213,10 @@
 
     @Override
     public void next(NextObjective nextObjective) {
+
+        pendingNext.put(nextObjective.id(), nextObjective);
+        flowObjectiveStore.putNextGroup(nextObjective.id(),
+                new SingleGroup(new DefaultGroupKey(appKryo.serialize(nextObjective.id()))));
     }
 
     @Override
@@ -176,4 +225,22 @@
         return null;
     }
 
+    private class SingleGroup implements NextGroup {
+
+        private final GroupKey key;
+
+        public SingleGroup(GroupKey key) {
+            this.key = key;
+        }
+
+        public GroupKey key() {
+            return key;
+        }
+
+        @Override
+        public byte[] data() {
+            return appKryo.serialize(key);
+        }
+
+    }
 }