Flow Objective implementation
Provides an abstraction which isolates the application from any pipeline
knowledge. By using the provided objectives, applications can express
their forwarding desires in a pipeline-agnostic way. The objectives
are then consumed by a driver for the specific device, which converts them
into the appropriate pipeline-coherent flows.
Change-Id: I74a68b4971c367c0cd5b7de9d877abdd117afa98
diff --git a/drivers/src/main/java/org/onosproject/driver/pipeline/OVSCorsaPipeline.java b/drivers/src/main/java/org/onosproject/driver/pipeline/OVSCorsaPipeline.java
index 997c207..4c7bdee 100644
--- a/drivers/src/main/java/org/onosproject/driver/pipeline/OVSCorsaPipeline.java
+++ b/drivers/src/main/java/org/onosproject/driver/pipeline/OVSCorsaPipeline.java
@@ -15,15 +15,19 @@
*/
package org.onosproject.driver.pipeline;
-import com.google.common.collect.Sets;
-import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
import org.onlab.osgi.ServiceDirectory;
import org.onlab.packet.Ethernet;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.behaviour.NextGroup;
import org.onosproject.net.behaviour.Pipeliner;
import org.onosproject.net.behaviour.PipelinerContext;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
@@ -39,18 +43,37 @@
import org.onosproject.net.flow.criteria.Criteria;
import org.onosproject.net.flow.criteria.Criterion;
import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupEvent;
+import org.onosproject.net.group.GroupKey;
+import org.onosproject.net.group.GroupListener;
+import org.onosproject.net.group.GroupService;
import org.slf4j.Logger;
import java.util.Collection;
-import java.util.concurrent.Future;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
- * Corsa pipeline handler.
+ * OpenvSwitch emulation of the Corsa pipeline handler.
*/
public class OVSCorsaPipeline extends AbstractHandlerBehaviour implements Pipeliner {
@@ -63,17 +86,45 @@
private ServiceDirectory serviceDirectory;
private FlowRuleService flowRuleService;
private CoreService coreService;
+ private GroupService groupService;
+ private FlowObjectiveStore flowObjectiveStore;
private DeviceId deviceId;
private ApplicationId appId;
+ private KryoNamespace appKryo = new KryoNamespace.Builder()
+ .register(GroupKey.class)
+ .register(DefaultGroupKey.class)
+ .register(CorsaGroup.class)
+ .register(byte[].class)
+ .build();
+
+ private Cache<GroupKey, NextObjective> pendingGroups;
+
+ private ScheduledExecutorService groupChecker =
+ Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
+ "ovs-corsa-%d"));
+
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
this.serviceDirectory = context.directory();
this.deviceId = deviceId;
+ pendingGroups = CacheBuilder.newBuilder()
+ .expireAfterWrite(20, TimeUnit.SECONDS)
+ .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
+ if (notification.getCause() == RemovalCause.EXPIRED) {
+ fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
+ }
+ }).build();
+
+ groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS);
coreService = serviceDirectory.get(CoreService.class);
flowRuleService = serviceDirectory.get(FlowRuleService.class);
+ groupService = serviceDirectory.get(GroupService.class);
+ flowObjectiveStore = context.store();
+
+ groupService.addListener(new InnerGroupListener());
appId = coreService.registerApplication(
"org.onosproject.driver.OVSCorsaPipeline");
@@ -82,33 +133,159 @@
}
@Override
- public Future<Boolean> filter(Collection<FilteringObjective> filteringObjectives) {
- Collection<Future<Boolean>> results = Sets.newHashSet();
- filteringObjectives.stream()
- .filter(obj -> obj.type() == FilteringObjective.Type.PERMIT)
- .forEach(filtobj -> results.add(processFilter(filtobj,
- filtobj.op() == Objective.Operation.ADD,
- filtobj.appId()
- )));
+ public void filter(FilteringObjective filteringObjective) {
+ if (filteringObjective.type() == FilteringObjective.Type.PERMIT) {
+ processFilter(filteringObjective,
+ filteringObjective.op() == Objective.Operation.ADD,
+ filteringObjective.appId());
+ } else {
+ fail(filteringObjective, ObjectiveError.UNSUPPORTED);
+ }
+ }
- //TODO: return something more helpful/sensible in the future (no pun intended)
- return results.iterator().next();
+    /**
+     * Translates a forwarding objective into flow rules and submits them to
+     * the flow rule service.
+     *
+     * The objective's context (if any) is notified exactly once: with an
+     * error when the objective cannot be translated, has an unknown
+     * operation, or the flow installation fails; with success once the flow
+     * rule service confirms the operations.
+     *
+     * @param fwd the forwarding objective to apply
+     */
+    @Override
+    public void forward(ForwardingObjective fwd) {
+        Collection<FlowRule> rules = processForward(fwd);
+        if (rules.isEmpty()) {
+            // Every path in processForward that produces no rules has already
+            // reported the failure. Applying an empty operation set here
+            // would additionally report success for the same objective.
+            return;
+        }
+
+        FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
+        switch (fwd.op()) {
+            case ADD:
+                rules.stream()
+                        .filter(rule -> rule != null)
+                        .forEach(flowBuilder::add);
+                break;
+            case REMOVE:
+                rules.stream()
+                        .filter(rule -> rule != null)
+                        .forEach(flowBuilder::remove);
+                break;
+            default:
+                fail(fwd, ObjectiveError.UNKNOWN);
+                log.warn("Unknown forwarding type {}", fwd.op());
+                // Nothing to install; do not fall through to apply().
+                return;
+        }
+
+        flowRuleService.apply(flowBuilder.build(new FlowRuleOperationsContext() {
+            @Override
+            public void onSuccess(FlowRuleOperations ops) {
+                pass(fwd);
+            }
+
+            @Override
+            public void onError(FlowRuleOperations ops) {
+                fail(fwd, ObjectiveError.FLOWINSTALLATIONFAILED);
+            }
+        }));
     }
- private Future<Boolean> processFilter(FilteringObjective filt, boolean install,
+    /**
+     * Handles a next objective by creating the corresponding group on the
+     * device.
+     *
+     * Only SIMPLE objectives carrying exactly one treatment are supported;
+     * they are turned into an INDIRECT group with a single bucket. The
+     * objective is parked in {@code pendingGroups} until the group is seen
+     * on the device (or the cache entry expires). Every other case is
+     * reported to the objective's context as an error.
+     *
+     * @param nextObjective the next objective to apply
+     */
+    @Override
+    public void next(NextObjective nextObjective) {
+        switch (nextObjective.type()) {
+            case SIMPLE:
+                Collection<TrafficTreatment> treatments = nextObjective.next();
+                if (treatments.size() != 1) {
+                    // Without this the objective would be dropped silently
+                    // and its context would never hear back.
+                    log.warn("SIMPLE next objective must carry exactly one treatment, got {}",
+                             treatments.size());
+                    fail(nextObjective, ObjectiveError.UNSUPPORTED);
+                    break;
+                }
+
+                TrafficTreatment treatment = treatments.iterator().next();
+                GroupBucket bucket =
+                        DefaultGroupBucket.createIndirectGroupBucket(treatment);
+                final GroupKey key =
+                        new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
+                GroupDescription groupDescription =
+                        new DefaultGroupDescription(deviceId,
+                                                    GroupDescription.Type.INDIRECT,
+                                                    new GroupBuckets(Collections
+                                                            .singletonList(bucket)),
+                                                    key,
+                                                    nextObjective.appId());
+                // Register the pending objective before asking for the group
+                // so that a GROUP_ADDED event arriving immediately can find it.
+                pendingGroups.put(key, nextObjective);
+                groupService.addGroup(groupDescription);
+                break;
+            case HASHED:
+            case BROADCAST:
+            case FAILOVER:
+                fail(nextObjective, ObjectiveError.UNSUPPORTED);
+                log.warn("Unsupported next objective type {}", nextObjective.type());
+                break;
+            default:
+                fail(nextObjective, ObjectiveError.UNKNOWN);
+                log.warn("Unknown next objective type {}", nextObjective.type());
+        }
+
+    }
+
+    /**
+     * Dispatches a forwarding objective to the handler matching its flag.
+     *
+     * @param fwd the forwarding objective
+     * @return the flow rules produced by the handler; an empty collection
+     *         when the flag is not recognised (the objective is failed
+     *         with UNKNOWN in that case)
+     */
+    private Collection<FlowRule> processForward(ForwardingObjective fwd) {
+        switch (fwd.flag()) {
+            case SPECIFIC:
+                return processSpecific(fwd);
+            case VERSATILE:
+                return processVersatile(fwd);
+            default:
+                fail(fwd, ObjectiveError.UNKNOWN);
+                log.warn("Unknown forwarding flag {}", fwd.flag());
+                return Collections.emptySet();
+        }
+    }
+
+ private Collection<FlowRule> processVersatile(ForwardingObjective fwd) {
+ fail(fwd, ObjectiveError.UNSUPPORTED);
+ return Collections.emptySet();
+ }
+
+    /**
+     * Builds the flow rule for a SPECIFIC forwarding objective.
+     *
+     * Only IPv4 objectives are supported: the selector must carry an
+     * ETH_TYPE criterion equal to IPv4 and an IPV4_DST criterion. The rule
+     * matches on those two fields and points at the group previously
+     * recorded in the flow objective store for the objective's next id.
+     *
+     * On any failure the objective's context is notified through
+     * {@code fail} and an empty collection is returned.
+     *
+     * @param fwd the forwarding objective
+     * @return a single-element collection with the flow rule, or an empty
+     *         collection if the objective could not be translated
+     */
+    private Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
+        // Routine trace, not a warning condition.
+        log.debug("Processing specific");
+        TrafficSelector selector = fwd.selector();
+        Criteria.EthTypeCriterion ethType =
+                (Criteria.EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
+        if (ethType == null || ethType.ethType() != Ethernet.TYPE_IPV4) {
+            fail(fwd, ObjectiveError.UNSUPPORTED);
+            return Collections.emptySet();
+        }
+
+        // An IPv4 objective without a destination prefix cannot be turned
+        // into a rule; fail it instead of dereferencing a null criterion.
+        Criteria.IPCriterion ipDst =
+                (Criteria.IPCriterion) selector.getCriterion(Criterion.Type.IPV4_DST);
+        if (ipDst == null) {
+            log.warn("Forwarding objective has no IPV4_DST criterion");
+            fail(fwd, ObjectiveError.UNSUPPORTED);
+            return Collections.emptySet();
+        }
+
+        TrafficSelector filteredSelector =
+                DefaultTrafficSelector.builder()
+                        .matchEthType(Ethernet.TYPE_IPV4)
+                        .matchIPDst(ipDst.ip())
+                        .build();
+
+        // The store only holds an entry once the next objective's group has
+        // been confirmed; guard against a forward that arrives too early or
+        // references an unknown next id.
+        NextGroup next = flowObjectiveStore.getNextGroup(fwd.nextId());
+        if (next == null) {
+            log.warn("No next group stored for next id {}", fwd.nextId());
+            fail(fwd, ObjectiveError.GROUPMISSING);
+            return Collections.emptySet();
+        }
+
+        GroupKey key = appKryo.deserialize(next.data());
+
+        Group group = groupService.getGroup(deviceId, key);
+
+        if (group == null) {
+            log.warn("The group left!");
+            fail(fwd, ObjectiveError.GROUPMISSING);
+            return Collections.emptySet();
+        }
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .group(group.id())
+                .build();
+
+        return Collections.singletonList(
+                new DefaultFlowRule(deviceId, filteredSelector, treatment,
+                                    fwd.priority(), fwd.appId(), 0, fwd.permanent(),
+                                    FlowRule.Type.IP));
+
+    }
+
+ private void processFilter(FilteringObjective filt, boolean install,
ApplicationId applicationId) {
- SettableFuture<Boolean> result = SettableFuture.create();
// This driver only processes filtering criteria defined with switch
// ports as the key
- Criteria.PortCriterion p = null;
+ Criteria.PortCriterion p;
if (!filt.key().equals(Criteria.dummy()) &&
filt.key().type() == Criterion.Type.IN_PORT) {
p = (Criteria.PortCriterion) filt.key();
} else {
log.warn("No key defined in filtering objective from app: {}. Not"
+ "processing filtering objective", applicationId);
- return null;
+ fail(filt, ObjectiveError.UNKNOWN);
+ return;
}
// convert filtering conditions for switch-intfs into flowrules
FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
@@ -154,45 +331,45 @@
} else {
log.warn("Driver does not currently process filtering condition"
+ " of type: {}", c.type());
+ fail(filt, ObjectiveError.UNSUPPORTED);
}
}
// apply filtering flow rules
flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
@Override
public void onSuccess(FlowRuleOperations ops) {
- result.set(true);
+ pass(filt);
log.info("Provisioned default table for bgp router");
}
@Override
public void onError(FlowRuleOperations ops) {
- result.set(false);
+ fail(filt, ObjectiveError.FLOWINSTALLATIONFAILED);
log.info("Failed to provision default table for bgp router");
}
}));
-
- return result;
}
- @Override
- public Future<Boolean> forward(Collection<ForwardingObjective> forwardObjectives) {
- return null;
+    /**
+     * Notifies the objective's context, when one is present, that the
+     * objective completed successfully.
+     *
+     * @param obj the objective that succeeded
+     */
+    private void pass(Objective obj) {
+        if (!obj.context().isPresent()) {
+            return;
+        }
+        obj.context().get().onSuccess(obj);
     }
- @Override
- public Future<Boolean> next(Collection<NextObjective> nextObjectives) {
- return null;
+    /**
+     * Notifies the objective's context, when one is present, that the
+     * objective failed.
+     *
+     * @param obj   the objective that failed
+     * @param error the reason reported to the context
+     */
+    private void fail(Objective obj, ObjectiveError error) {
+        if (!obj.context().isPresent()) {
+            return;
+        }
+        obj.context().get().onError(obj, error);
     }
private void pushDefaultRules() {
- boolean install = true;
- processTableZero(install);
- processTableOne(install);
- processTableTwo(install);
- processTableFour(install);
- processTableFive(install);
- processTableSix(install);
- processTableNine(install);
+ // Invoke each per-table handler with its boolean "install" argument set to true.
+ processTableZero(true);
+ processTableOne(true);
+ processTableTwo(true);
+ processTableFour(true);
+ processTableFive(true);
+ processTableSix(true);
+ processTableNine(true);
}
private void processTableZero(boolean install) {
@@ -447,4 +624,59 @@
}));
}
+    /**
+     * Group listener that completes a pending next objective as soon as the
+     * group service reports that its group has been added.
+     */
+    private class InnerGroupListener implements GroupListener {
+        @Override
+        public void event(GroupEvent event) {
+            if (event.type() != GroupEvent.Type.GROUP_ADDED) {
+                return;
+            }
+
+            GroupKey key = event.subject().appCookie();
+
+            // remove() hands the pending objective to exactly one caller, so
+            // this listener and the periodic GroupChecker cannot both
+            // complete the same objective. An explicit removal does not
+            // trigger the EXPIRED failure path of the cache.
+            NextObjective obj = pendingGroups.asMap().remove(key);
+            if (obj != null) {
+                // Record the group before reporting success so that a
+                // forwarding objective issued from the callback can find it.
+                flowObjectiveStore.putNextGroup(obj.id(), new CorsaGroup(key));
+                pass(obj);
+            }
+        }
+    }
+
+
+    /**
+     * Periodic task that polls the group service for groups belonging to
+     * pending next objectives and completes the objectives whose group is
+     * present on the device. It complements the group listener in case a
+     * GROUP_ADDED event is missed.
+     */
+    private class GroupChecker implements Runnable {
+
+        @Override
+        public void run() {
+            try {
+                // Guava caches evict lazily; force maintenance so that
+                // objectives whose group never showed up are expired (and
+                // failed by the removal listener) even when the cache is idle.
+                pendingGroups.cleanUp();
+
+                Set<GroupKey> keys = pendingGroups.asMap().keySet().stream()
+                        .filter(key -> groupService.getGroup(deviceId, key) != null)
+                        .collect(Collectors.toSet());
+
+                keys.forEach(key -> {
+                    // Atomically claim the objective so it is completed only
+                    // once even if the group listener fires concurrently.
+                    NextObjective obj = pendingGroups.asMap().remove(key);
+                    if (obj == null) {
+                        return;
+                    }
+                    // Store the group before reporting success: the callback
+                    // may immediately issue a forwarding objective that looks
+                    // the next group up in the store.
+                    flowObjectiveStore.putNextGroup(obj.id(), new CorsaGroup(key));
+                    pass(obj);
+                });
+            } catch (Exception e) {
+                // An exception escaping a fixed-rate task suppresses all
+                // further executions, which would silently stop the polling.
+                log.warn("Pending group check failed", e);
+            }
+        }
+    }
+
+    /**
+     * Next-group representation kept in the flow objective store for this
+     * pipeline: it wraps the key of the group created for a next objective.
+     */
+    private class CorsaGroup implements NextGroup {
+
+        private final GroupKey key;
+
+        /**
+         * Creates a next group for the given group key.
+         *
+         * @param groupKey key of the group backing the next objective
+         */
+        public CorsaGroup(GroupKey groupKey) {
+            this.key = groupKey;
+        }
+
+        /**
+         * Returns the wrapped group key.
+         *
+         * @return the group key
+         */
+        public GroupKey key() {
+            return this.key;
+        }
+
+        /**
+         * Returns the group key serialized with the pipeline's Kryo
+         * namespace.
+         *
+         * @return the serialized group key
+         */
+        @Override
+        public byte[] data() {
+            final byte[] serializedKey = appKryo.serialize(key);
+            return serializedKey;
+        }
+
+    }
}