Flow Objective implementation

Provides an abstraction which isolates the application from any pipeline
knowledge. By using the provided objectives applications can express
their forwarding desires in a pipeline agnostic way. The objectives
are then consumed by a driver for the specific device who converts them
into the appropriate pipeline coherent flows.

Change-Id: I74a68b4971c367c0cd5b7de9d877abdd117afa98
diff --git a/drivers/src/main/java/org/onosproject/driver/pipeline/OVSCorsaPipeline.java b/drivers/src/main/java/org/onosproject/driver/pipeline/OVSCorsaPipeline.java
index 997c207..4c7bdee 100644
--- a/drivers/src/main/java/org/onosproject/driver/pipeline/OVSCorsaPipeline.java
+++ b/drivers/src/main/java/org/onosproject/driver/pipeline/OVSCorsaPipeline.java
@@ -15,15 +15,19 @@
  */
 package org.onosproject.driver.pipeline;
 
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
 import org.onlab.osgi.ServiceDirectory;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.NextGroup;
 import org.onosproject.net.behaviour.Pipeliner;
 import org.onosproject.net.behaviour.PipelinerContext;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
@@ -39,18 +43,37 @@
 import org.onosproject.net.flow.criteria.Criteria;
 import org.onosproject.net.flow.criteria.Criterion;
 import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupEvent;
+import org.onosproject.net.group.GroupKey;
+import org.onosproject.net.group.GroupListener;
+import org.onosproject.net.group.GroupService;
 import org.slf4j.Logger;
 
 import java.util.Collection;
-import java.util.concurrent.Future;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
- * Corsa pipeline handler.
+ * OpenvSwitch emulation of the Corsa pipeline handler.
  */
 public class OVSCorsaPipeline extends AbstractHandlerBehaviour implements Pipeliner {
 
@@ -63,17 +86,45 @@
     private ServiceDirectory serviceDirectory;
     private FlowRuleService flowRuleService;
     private CoreService coreService;
+    private GroupService groupService;
+    private FlowObjectiveStore flowObjectiveStore;
     private DeviceId deviceId;
     private ApplicationId appId;
 
+    private KryoNamespace appKryo = new KryoNamespace.Builder()
+            .register(GroupKey.class)
+            .register(DefaultGroupKey.class)
+            .register(CorsaGroup.class)
+            .register(byte[].class)
+            .build();
+
+    private Cache<GroupKey, NextObjective> pendingGroups;
+
+    private ScheduledExecutorService groupChecker =
+            Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
+                                                               "ovs-corsa-%d"));
+
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
         this.serviceDirectory = context.directory();
         this.deviceId = deviceId;
 
+        pendingGroups = CacheBuilder.newBuilder()
+                .expireAfterWrite(20, TimeUnit.SECONDS)
+                .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
+                    if (notification.getCause() == RemovalCause.EXPIRED) {
+                        fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
+                    }
+                }).build();
+
+        groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS);
 
         coreService = serviceDirectory.get(CoreService.class);
         flowRuleService = serviceDirectory.get(FlowRuleService.class);
+        groupService = serviceDirectory.get(GroupService.class);
+        flowObjectiveStore = context.store();
+
+        groupService.addListener(new InnerGroupListener());
 
         appId = coreService.registerApplication(
                 "org.onosproject.driver.OVSCorsaPipeline");
@@ -82,33 +133,159 @@
     }
 
     @Override
-    public Future<Boolean> filter(Collection<FilteringObjective> filteringObjectives) {
-        Collection<Future<Boolean>> results = Sets.newHashSet();
-        filteringObjectives.stream()
-                .filter(obj -> obj.type() == FilteringObjective.Type.PERMIT)
-                .forEach(filtobj -> results.add(processFilter(filtobj,
-                                        filtobj.op() == Objective.Operation.ADD,
-                                        filtobj.appId()
-                        )));
+    public void filter(FilteringObjective filteringObjective) {
+        if (filteringObjective.type() == FilteringObjective.Type.PERMIT) {
+            processFilter(filteringObjective,
+                          filteringObjective.op() == Objective.Operation.ADD,
+                          filteringObjective.appId());
+        } else {
+            fail(filteringObjective, ObjectiveError.UNSUPPORTED);
+        }
+    }
 
-        //TODO: return something more helpful/sensible in the future (no pun intended)
-        return results.iterator().next();
+    @Override
+    public void forward(ForwardingObjective fwd) {
+        Collection<FlowRule> rules;
+        FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
+
+        rules = processForward(fwd);
+        switch (fwd.op()) {
+            case ADD:
+                rules.stream()
+                        .filter(rule -> rule != null)
+                        .forEach(flowBuilder::add);
+                break;
+            case REMOVE:
+                rules.stream()
+                        .filter(rule -> rule != null)
+                        .forEach(flowBuilder::remove);
+                break;
+            default:
+                fail(fwd, ObjectiveError.UNKNOWN);
+                log.warn("Unknown forwarding type {}", fwd.op());
+        }
+
+
+        flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
+            @Override
+            public void onSuccess(FlowRuleOperations ops) {
+                pass(fwd);
+            }
+
+            @Override
+            public void onError(FlowRuleOperations ops) {
+                fail(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
+            }
+        }));
 
     }
 
-    private Future<Boolean> processFilter(FilteringObjective filt, boolean install,
+    @Override
+    public void next(NextObjective nextObjective) {
+        switch (nextObjective.type()) {
+            case SIMPLE:
+                Collection<TrafficTreatment> treatments = nextObjective.next();
+                if (treatments.size() == 1) {
+                    TrafficTreatment treatment = treatments.iterator().next();
+                    GroupBucket bucket =
+                            DefaultGroupBucket.createIndirectGroupBucket(treatment);
+                    final GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
+                    GroupDescription groupDescription
+                            = new DefaultGroupDescription(deviceId,
+                                    GroupDescription.Type.INDIRECT,
+                                    new GroupBuckets(Collections
+                                                             .singletonList(bucket)),
+                                    key,
+                                    nextObjective.appId());
+                    groupService.addGroup(groupDescription);
+                    pendingGroups.put(key, nextObjective);
+                }
+                break;
+            case HASHED:
+            case BROADCAST:
+            case FAILOVER:
+                fail(nextObjective, ObjectiveError.UNSUPPORTED);
+                log.warn("Unsupported next objective type {}", nextObjective.type());
+                break;
+            default:
+                fail(nextObjective, ObjectiveError.UNKNOWN);
+                log.warn("Unknown next objective type {}", nextObjective.type());
+        }
+
+    }
+
+    private Collection<FlowRule> processForward(ForwardingObjective fwd) {
+        switch (fwd.flag()) {
+            case SPECIFIC:
+                return processSpecific(fwd);
+            case VERSATILE:
+                return processVersatile(fwd);
+            default:
+                fail(fwd, ObjectiveError.UNKNOWN);
+                log.warn("Unknown forwarding flag {}", fwd.flag());
+        }
+        return Collections.emptySet();
+    }
+
+    private Collection<FlowRule> processVersatile(ForwardingObjective fwd) {
+        fail(fwd, ObjectiveError.UNSUPPORTED);
+        return Collections.emptySet();
+    }
+
+    private Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
+        log.warn("Processing specific");
+        TrafficSelector selector = fwd.selector();
+        Criteria.EthTypeCriterion ethType =
+                (Criteria.EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
+        if (ethType == null || ethType.ethType() != Ethernet.TYPE_IPV4) {
+            fail(fwd, ObjectiveError.UNSUPPORTED);
+            return Collections.emptySet();
+        }
+
+        TrafficSelector filteredSelector =
+                DefaultTrafficSelector.builder()
+                        .matchEthType(Ethernet.TYPE_IPV4)
+                        .matchIPDst(
+                                ((Criteria.IPCriterion)
+                                        selector.getCriterion(Criterion.Type.IPV4_DST)).ip())
+                        .build();
+
+        NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
+
+        GroupKey key = appKryo.deserialize(next.data());
+
+        Group group = groupService.getGroup(deviceId, key);
+
+        if (group == null) {
+            log.warn("The group left!");
+            fail(fwd, ObjectiveError.GROUPMISSING);
+            return Collections.emptySet();
+        }
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .group(group.id())
+                .build();
+
+        return Collections.singletonList(
+                new DefaultFlowRule(deviceId, filteredSelector, treatment,
+                                   fwd.priority(), fwd.appId(), 0, fwd.permanent(),
+                                   FlowRule.Type.IP));
+
+    }
+
+    private void processFilter(FilteringObjective filt, boolean install,
                                              ApplicationId applicationId) {
-        SettableFuture<Boolean> result = SettableFuture.create();
         // This driver only processes filtering criteria defined with switch
         // ports as the key
-        Criteria.PortCriterion p = null;
+        Criteria.PortCriterion p;
         if (!filt.key().equals(Criteria.dummy()) &&
                 filt.key().type() == Criterion.Type.IN_PORT) {
             p = (Criteria.PortCriterion) filt.key();
         } else {
             log.warn("No key defined in filtering objective from app: {}. Not"
                     + "processing filtering objective", applicationId);
-            return null;
+            fail(filt, ObjectiveError.UNKNOWN);
+            return;
         }
         // convert filtering conditions for switch-intfs into flowrules
         FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
@@ -154,45 +331,45 @@
             } else {
                 log.warn("Driver does not currently process filtering condition"
                         + " of type: {}", c.type());
+                fail(filt, ObjectiveError.UNSUPPORTED);
             }
         }
         // apply filtering flow rules
         flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
             @Override
             public void onSuccess(FlowRuleOperations ops) {
-                result.set(true);
+                pass(filt);
                 log.info("Provisioned default table for bgp router");
             }
 
             @Override
             public void onError(FlowRuleOperations ops) {
-                result.set(false);
+                fail(filt, ObjectiveError.FLOWINSTALLATIONFAILED);
                 log.info("Failed to provision default table for bgp router");
             }
         }));
-
-        return result;
     }
 
-    @Override
-    public Future<Boolean> forward(Collection<ForwardingObjective> forwardObjectives) {
-        return null;
+    private void pass(Objective obj) {
+        if (obj.context().isPresent()) {
+            obj.context().get().onSuccess(obj);
+        }
     }
 
-    @Override
-    public Future<Boolean> next(Collection<NextObjective> nextObjectives) {
-        return null;
+    private void fail(Objective obj, ObjectiveError error) {
+        if (obj.context().isPresent()) {
+            obj.context().get().onError(obj, error);
+        }
     }
 
     private void pushDefaultRules() {
-        boolean install = true;
-        processTableZero(install);
-        processTableOne(install);
-        processTableTwo(install);
-        processTableFour(install);
-        processTableFive(install);
-        processTableSix(install);
-        processTableNine(install);
+        processTableZero(true);
+        processTableOne(true);
+        processTableTwo(true);
+        processTableFour(true);
+        processTableFive(true);
+        processTableSix(true);
+        processTableNine(true);
     }
 
     private void processTableZero(boolean install) {
@@ -447,4 +624,59 @@
         }));
     }
 
+    private class InnerGroupListener implements GroupListener {
+        @Override
+        public void event(GroupEvent event) {
+            if (event.type() == GroupEvent.Type.GROUP_ADDED) {
+                GroupKey key = event.subject().appCookie();
+
+                NextObjective obj = pendingGroups.getIfPresent(key);
+                if (obj != null) {
+                    flowObjectiveStore.putNextGroup(obj.id(), new CorsaGroup(key));
+                    pass(obj);
+                    pendingGroups.invalidate(key);
+                }
+            }
+        }
+    }
+
+
+    private class GroupChecker implements Runnable {
+
+        @Override
+        public void run() {
+            Set<GroupKey> keys = pendingGroups.asMap().keySet().stream()
+                    .filter(key -> groupService.getGroup(deviceId, key) != null)
+                    .collect(Collectors.toSet());
+
+            keys.stream().forEach(key -> {
+                NextObjective obj = pendingGroups.getIfPresent(key);
+                if (obj == null) {
+                    return;
+                }
+                pass(obj);
+                pendingGroups.invalidate(key);
+                flowObjectiveStore.putNextGroup(obj.id(), new CorsaGroup(key));
+            });
+        }
+    }
+
+    private class CorsaGroup implements NextGroup {
+
+        private final GroupKey key;
+
+        public CorsaGroup(GroupKey key) {
+            this.key = key;
+        }
+
+        public GroupKey key() {
+            return key;
+        }
+
+        @Override
+        public byte[] data() {
+            return appKryo.serialize(key);
+        }
+
+    }
 }