Add support for vlan based intents in the Corsa driver
Changes:
- Improves processSpecific in AbstractCorsaPipeline in order to support
Intents without an explicit match on the Ethertype;
- Implements vlan based circuits in CorsaPipelineV3 through the management
of the FwdObjective without Treatment;
- Distinguish Groups from simple actions;
- Corsa group are identified using the actions of the treatment;
- handling of the pending next similar to DefaultSingleTablePipeline
Change-Id: Iff0f70d56c64193524c6640f31ffb3f5629499dc
diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java
index ebc1da4..8083c2b 100644
--- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java
+++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/AbstractCorsaPipeline.java
@@ -93,14 +93,14 @@
private ServiceDirectory serviceDirectory;
protected FlowRuleService flowRuleService;
private CoreService coreService;
- private GroupService groupService;
+ protected GroupService groupService;
protected MeterService meterService;
- private FlowObjectiveStore flowObjectiveStore;
+ protected FlowObjectiveStore flowObjectiveStore;
protected DeviceId deviceId;
protected ApplicationId appId;
protected DeviceService deviceService;
- private KryoNamespace appKryo = new KryoNamespace.Builder()
+ protected KryoNamespace appKryo = new KryoNamespace.Builder()
.register(GroupKey.class)
.register(DefaultGroupKey.class)
.register(CorsaGroup.class)
@@ -108,6 +108,8 @@
.build("AbstractCorsaPipeline");
private Cache<GroupKey, NextObjective> pendingGroups;
+ protected Cache<Integer, NextObjective> pendingNext;
+
private ScheduledExecutorService groupChecker =
Executors.newScheduledThreadPool(2, groupedThreads("onos/pipeliner",
@@ -131,6 +133,16 @@
}
}).build();
+ pendingNext = CacheBuilder.newBuilder()
+ .expireAfterWrite(20, TimeUnit.SECONDS)
+ .removalListener((RemovalNotification<Integer, NextObjective> notification) -> {
+ if (notification.getCause() == RemovalCause.EXPIRED) {
+ notification.getValue().context()
+ .ifPresent(c -> c.onError(notification.getValue(),
+ ObjectiveError.FLOWINSTALLATIONFAILED));
+ }
+ }).build();
+
groupChecker.scheduleAtFixedRate(new GroupChecker(), 0, 500, TimeUnit.MILLISECONDS);
coreService = serviceDirectory.get(CoreService.class);
@@ -304,6 +316,7 @@
@Override
public void forward(ForwardingObjective fwd) {
+
Collection<FlowRule> rules;
FlowRuleOperations.Builder flowBuilder = FlowRuleOperations.builder();
@@ -354,16 +367,20 @@
private Collection<FlowRule> processSpecific(ForwardingObjective fwd) {
log.debug("Processing specific forwarding objective");
TrafficSelector selector = fwd.selector();
- EthTypeCriterion ethType =
+ EthTypeCriterion ethTypeCriterion =
(EthTypeCriterion) selector.getCriterion(Criterion.Type.ETH_TYPE);
- if (ethType != null) {
- short et = ethType.ethType().toShort();
+ VlanIdCriterion vlanIdCriterion =
+ (VlanIdCriterion) selector.getCriterion(Criterion.Type.VLAN_VID);
+ if (ethTypeCriterion != null) {
+ short et = ethTypeCriterion.ethType().toShort();
if (et == Ethernet.TYPE_IPV4) {
return processSpecificRoute(fwd);
} else if (et == Ethernet.TYPE_VLAN) {
/* The ForwardingObjective must specify VLAN ethtype in order to use the Transit Circuit */
return processSpecificSwitch(fwd);
}
+ } else if (vlanIdCriterion != null) {
+ return processSpecificSwitch(fwd);
}
fail(fwd, ObjectiveError.UNSUPPORTED);
@@ -464,6 +481,41 @@
//Hook for modifying Route flow rule
protected abstract Builder processSpecificRoutingRule(Builder rb);
+ protected enum CorsaTrafficTreatmentType {
+ /**
+ * If the treatment has to be handled as group.
+ */
+ GROUP,
+ /**
+ * If the treatment has to be handled as simple set of actions.
+ */
+ ACTIONS
+ }
+
+ /**
+ * Helper class to encapsulate both traffic treatment and
+ * type of treatment.
+ */
+ protected class CorsaTrafficTreatment {
+
+ private CorsaTrafficTreatmentType type;
+ private TrafficTreatment trafficTreatment;
+
+ public CorsaTrafficTreatment(CorsaTrafficTreatmentType treatmentType, TrafficTreatment trafficTreatment) {
+ this.type = treatmentType;
+ this.trafficTreatment = trafficTreatment;
+ }
+
+ public CorsaTrafficTreatmentType type() {
+ return type;
+ }
+
+ public TrafficTreatment treatment() {
+ return trafficTreatment;
+ }
+
+ }
+
@Override
public void next(NextObjective nextObjective) {
switch (nextObjective.type()) {
@@ -471,20 +523,25 @@
Collection<TrafficTreatment> treatments = nextObjective.next();
if (treatments.size() == 1) {
TrafficTreatment treatment = treatments.iterator().next();
- treatment = processNextTreatment(treatment);
- GroupBucket bucket =
- DefaultGroupBucket.createIndirectGroupBucket(treatment);
+ CorsaTrafficTreatment corsaTreatment = processNextTreatment(treatment);
final GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
- GroupDescription groupDescription
- = new DefaultGroupDescription(deviceId,
- GroupDescription.Type.INDIRECT,
- new GroupBuckets(Collections
- .singletonList(bucket)),
- key,
- null, // let group service determine group id
- nextObjective.appId());
- groupService.addGroup(groupDescription);
- pendingGroups.put(key, nextObjective);
+ if (corsaTreatment.type() == CorsaTrafficTreatmentType.GROUP) {
+ GroupBucket bucket = DefaultGroupBucket.createIndirectGroupBucket(corsaTreatment.treatment());
+ GroupBuckets buckets = new GroupBuckets(Collections.singletonList(bucket));
+ // group id == null, let group service determine group id
+ GroupDescription groupDescription = new DefaultGroupDescription(deviceId,
+ GroupDescription.Type.INDIRECT,
+ buckets,
+ key,
+ null,
+ nextObjective.appId());
+ groupService.addGroup(groupDescription);
+ pendingGroups.put(key, nextObjective);
+ } else if (corsaTreatment.type() == CorsaTrafficTreatmentType.ACTIONS) {
+ pendingNext.put(nextObjective.id(), nextObjective);
+ flowObjectiveStore.putNextGroup(nextObjective.id(), new CorsaGroup(key));
+ nextObjective.context().ifPresent(context -> context.onSuccess(nextObjective));
+ }
}
break;
case HASHED:
@@ -501,8 +558,8 @@
}
//Hook for altering the NextObjective treatment
- protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) {
- return treatment;
+ protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) {
+ return new CorsaTrafficTreatment(CorsaTrafficTreatmentType.GROUP, treatment);
}
//Init helper: Table Miss = Drop
diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java
index 10e3f0b..b1e15b2 100644
--- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java
+++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV3.java
@@ -30,9 +30,12 @@
import org.onosproject.net.flow.criteria.IPCriterion;
import org.onosproject.net.flow.criteria.PortCriterion;
import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.flowobjective.FilteringObjective;
import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.meter.Band;
import org.onosproject.net.meter.DefaultBand;
import org.onosproject.net.meter.DefaultMeterRequest;
@@ -69,9 +72,11 @@
protected MeterId defaultMeterId = null;
@Override
- protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) {
+ protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) {
TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
+
+
treatment.immediate().stream()
.filter(i -> {
switch (i.type()) {
@@ -87,7 +92,48 @@
return false;
}
}).forEach(i -> tb.add(i));
- return tb.build();
+
+ TrafficTreatment t = tb.build();
+
+
+ boolean isPresentModVlanId = false;
+ boolean isPresentModEthSrc = false;
+ boolean isPresentModEthDst = false;
+ boolean isPresentOutpuPort = false;
+
+ for (Instruction instruction : t.immediate()) {
+ switch (instruction.type()) {
+ case L2MODIFICATION:
+ L2ModificationInstruction l2i = (L2ModificationInstruction) instruction;
+ if (l2i instanceof L2ModificationInstruction.ModVlanIdInstruction) {
+ isPresentModVlanId = true;
+ }
+
+ if (l2i instanceof L2ModificationInstruction.ModEtherInstruction) {
+ L2ModificationInstruction.L2SubType subType = l2i.subtype();
+ if (subType.equals(L2ModificationInstruction.L2SubType.ETH_SRC)) {
+ isPresentModEthSrc = true;
+ } else if (subType.equals(L2ModificationInstruction.L2SubType.ETH_DST)) {
+ isPresentModEthDst = true;
+ }
+ }
+ case OUTPUT:
+ isPresentOutpuPort = true;
+ default:
+ }
+ }
+ CorsaTrafficTreatmentType type = CorsaTrafficTreatmentType.ACTIONS;
+ /**
+ * This represents the allowed group for CorsaPipelinev3
+ */
+ if (isPresentModVlanId &&
+ isPresentModEthSrc &&
+ isPresentModEthDst &&
+ isPresentOutpuPort) {
+ type = CorsaTrafficTreatmentType.GROUP;
+ }
+ CorsaTrafficTreatment corsaTreatment = new CorsaTrafficTreatment(type, t);
+ return corsaTreatment;
}
@Override
@@ -115,9 +161,37 @@
.withPriority(fwd.priority())
.forDevice(deviceId)
.withSelector(filteredSelector)
- .withTreatment(fwd.treatment())
.forTable(VLAN_CIRCUIT_TABLE);
+ if (fwd.treatment() != null) {
+ ruleBuilder.withTreatment(fwd.treatment());
+ } else {
+ if (fwd.nextId() != null) {
+ NextObjective nextObjective = pendingNext.getIfPresent(fwd.nextId());
+ if (nextObjective != null) {
+ pendingNext.invalidate(fwd.nextId());
+ TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder()
+ .setVlanPcp((byte) 0)
+ .setQueue(0)
+ .meter(defaultMeterId);
+ nextObjective.next().forEach(trafficTreatment -> {
+ trafficTreatment.allInstructions().forEach(instruction -> {
+ treatment.add(instruction);
+ });
+ });
+ ruleBuilder.withTreatment(treatment.build());
+ } else {
+ log.warn("The group left!");
+ fwd.context().ifPresent(c -> c.onError(fwd, ObjectiveError.GROUPMISSING));
+ return ImmutableSet.of();
+ }
+ } else {
+ log.warn("Missing NextObjective ID for ForwardingObjective {}", fwd.id());
+ fail(fwd, ObjectiveError.BADPARAMS);
+ return ImmutableSet.of();
+ }
+ }
+
if (fwd.permanent()) {
ruleBuilder.makePermanent();
} else {
diff --git a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java
index 9979cc1..514ac78 100644
--- a/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java
+++ b/drivers/corsa/src/main/java/org/onosproject/drivers/corsa/CorsaPipelineV39.java
@@ -29,6 +29,7 @@
import org.onosproject.net.flow.criteria.Criterion;
import org.onosproject.net.flow.criteria.IPCriterion;
import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.instructions.Instruction;
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.flowobjective.ForwardingObjective;
@@ -226,9 +227,8 @@
}
@Override
- protected TrafficTreatment processNextTreatment(TrafficTreatment treatment) {
+ protected CorsaTrafficTreatment processNextTreatment(TrafficTreatment treatment) {
TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
- tb.add(Instructions.popVlan());
treatment.immediate().stream()
.filter(i -> {
switch (i.type()) {
@@ -236,7 +236,6 @@
L2ModificationInstruction l2i = (L2ModificationInstruction) i;
if (l2i.subtype() == VLAN_ID ||
l2i.subtype() == VLAN_POP ||
- l2i.subtype() == VLAN_POP ||
l2i.subtype() == ETH_DST ||
l2i.subtype() == ETH_SRC) {
return true;
@@ -247,6 +246,51 @@
return false;
}
}).forEach(i -> tb.add(i));
- return tb.build();
+
+ TrafficTreatment t = tb.build();
+
+ boolean isPresentModVlanId = false;
+ boolean isPresentModEthSrc = false;
+ boolean isPresentModEthDst = false;
+ boolean isPresentOutpuPort = false;
+
+ for (Instruction instruction : t.immediate()) {
+ switch (instruction.type()) {
+ case L2MODIFICATION:
+ L2ModificationInstruction l2i = (L2ModificationInstruction) instruction;
+ if (l2i instanceof L2ModificationInstruction.ModVlanIdInstruction) {
+ isPresentModVlanId = true;
+ }
+
+ if (l2i instanceof L2ModificationInstruction.ModEtherInstruction) {
+ L2ModificationInstruction.L2SubType subType = l2i.subtype();
+ if (subType.equals(L2ModificationInstruction.L2SubType.ETH_SRC)) {
+ isPresentModEthSrc = true;
+ } else if (subType.equals(L2ModificationInstruction.L2SubType.ETH_DST)) {
+ isPresentModEthDst = true;
+ }
+ }
+ case OUTPUT:
+ isPresentOutpuPort = true;
+ default:
+ }
+ }
+ CorsaTrafficTreatmentType type = CorsaTrafficTreatmentType.ACTIONS;
+ /**
+ * These are the allowed groups for CorsaPipelinev39
+ */
+ if (isPresentModVlanId && isPresentModEthSrc && isPresentModEthDst && isPresentOutpuPort) {
+ type = CorsaTrafficTreatmentType.GROUP;
+
+ } else if ((!isPresentModVlanId && isPresentModEthSrc && isPresentModEthDst && isPresentOutpuPort) ||
+ (!isPresentModVlanId && !isPresentModEthSrc && isPresentModEthDst && isPresentOutpuPort) ||
+ (!isPresentModVlanId && !isPresentModEthSrc && !isPresentModEthDst && isPresentOutpuPort)) {
+ type = CorsaTrafficTreatmentType.GROUP;
+ TrafficTreatment.Builder tb2 = DefaultTrafficTreatment.builder(t);
+ tb2.add(Instructions.popVlan());
+ t = tb2.build();
+ }
+ CorsaTrafficTreatment corsaTreatment = new CorsaTrafficTreatment(type, t);
+ return corsaTreatment;
}
}