added support for multicast in olt pipeline
Change-Id: I25c6ab18d23035094851b0800f719f28e210d403
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
index c6d9f2b..de90b9d 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
@@ -15,6 +15,10 @@
*/
package org.onosproject.driver.pipeline;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
import com.google.common.collect.Lists;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -22,20 +26,14 @@
import org.onlab.packet.EthType;
import org.onlab.packet.IPv4;
import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.net.DefaultAnnotations;
-import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.NextGroup;
import org.onosproject.net.behaviour.Pipeliner;
import org.onosproject.net.behaviour.PipelinerContext;
-import org.onosproject.net.device.DefaultDeviceDescription;
-import org.onosproject.net.device.DeviceDescription;
-import org.onosproject.net.device.DeviceProvider;
-import org.onosproject.net.device.DeviceProviderRegistry;
-import org.onosproject.net.device.DeviceService;
import org.onosproject.net.driver.AbstractHandlerBehaviour;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
@@ -56,20 +54,32 @@
import org.onosproject.net.flow.instructions.Instructions;
import org.onosproject.net.flow.instructions.L2ModificationInstruction;
import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.NextObjective;
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.provider.AbstractProvider;
-import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupEvent;
+import org.onosproject.net.group.GroupKey;
+import org.onosproject.net.group.GroupListener;
+import org.onosproject.net.group.GroupService;
+import org.onosproject.store.serializers.KryoNamespaces;
import org.slf4j.Logger;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -79,35 +89,55 @@
public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
private static final Integer QQ_TABLE = 1;
+ private static final short MCAST_VLAN = 4000;
private final Logger log = getLogger(getClass());
- static final ProviderId PID = new ProviderId("olt", "org.onosproject.olt", true);
-
- static final String DEVICE = "isAccess";
- static final String OLT = "true";
-
private ServiceDirectory serviceDirectory;
private FlowRuleService flowRuleService;
- private DeviceId deviceId;
+ private GroupService groupService;
private CoreService coreService;
+ private DeviceId deviceId;
private ApplicationId appId;
- private DeviceProvider provider = new AnnotationProvider();
+ protected FlowObjectiveStore flowObjectiveStore;
+
+ private Cache<GroupKey, NextObjective> pendingGroups;
+
+ protected static KryoNamespace appKryo = new KryoNamespace.Builder()
+ .register(KryoNamespaces.API)
+ .register(GroupKey.class)
+ .register(DefaultGroupKey.class)
+ .register(OLTPipelineGroup.class)
+ .register(byte[].class)
+ .build();
@Override
public void init(DeviceId deviceId, PipelinerContext context) {
log.debug("Initiate OLT pipeline");
this.serviceDirectory = context.directory();
this.deviceId = deviceId;
- DeviceProviderRegistry registry =
- serviceDirectory.get(DeviceProviderRegistry.class);
+
flowRuleService = serviceDirectory.get(FlowRuleService.class);
coreService = serviceDirectory.get(CoreService.class);
+ groupService = serviceDirectory.get(GroupService.class);
+ flowObjectiveStore = context.store();
+
appId = coreService.registerApplication(
"org.onosproject.driver.OLTPipeline");
+
+ pendingGroups = CacheBuilder.newBuilder()
+ .expireAfterWrite(20, TimeUnit.SECONDS)
+ .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
+ if (notification.getCause() == RemovalCause.EXPIRED) {
+ fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
+ }
+ }).build();
+
+ groupService.addListener(new InnerGroupListener());
+
}
@Override
@@ -164,6 +194,12 @@
@Override
public void forward(ForwardingObjective fwd) {
+
+ if (checkForMulticast(fwd)) {
+ processMulticastRule(fwd);
+ return;
+ }
+
TrafficTreatment treatment = fwd.treatment();
List<Instruction> instructions = treatment.allInstructions();
@@ -198,9 +234,113 @@
}
+
@Override
public void next(NextObjective nextObjective) {
- throw new UnsupportedOperationException("OLT does not next hop.");
+ if (nextObjective.type() != NextObjective.Type.BROADCAST) {
+ log.error("OLT only supports broadcast groups.");
+ fail(nextObjective, ObjectiveError.BADPARAMS);
+ }
+
+ if (nextObjective.next().size() != 1) {
+ log.error("OLT only supports singleton broadcast groups.");
+ fail(nextObjective, ObjectiveError.BADPARAMS);
+ }
+
+ TrafficTreatment treatment = nextObjective.next().stream().findFirst().get();
+
+
+ GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
+ GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
+
+ GroupDescription groupDesc =
+ new DefaultGroupDescription(deviceId,
+ GroupDescription.Type.ALL,
+ new GroupBuckets(Collections.singletonList(bucket)),
+ key,
+ null,
+ nextObjective.appId());
+
+ pendingGroups.put(key, nextObjective);
+
+ switch (nextObjective.op()) {
+ case ADD:
+ groupService.addGroup(groupDesc);
+ break;
+ case REMOVE:
+ groupService.removeGroup(deviceId, key, nextObjective.appId());
+ break;
+ case ADD_TO_EXISTING:
+ case REMOVE_FROM_EXISTING:
+ //TODO: handle addition to group when caller signals it.
+ break;
+ default:
+ log.warn("Unknown next objective operation: {}", nextObjective.op());
+ }
+
+
+ }
+
+ private void processMulticastRule(ForwardingObjective fwd) {
+ if (fwd.nextId() == null) {
+ log.error("Multicast objective does not have a next id");
+ fail(fwd, ObjectiveError.BADPARAMS);
+ }
+
+ OLTPipelineGroup next = getGroupForNextObjective(fwd.nextId());
+
+ if (next == null) {
+ log.error("Group for forwarding objective missing: {}", fwd);
+ fail(fwd, ObjectiveError.GROUPMISSING);
+ }
+
+ Group group = groupService.getGroup(deviceId, next.key());
+ TrafficTreatment treatment =
+ buildTreatment(Instructions.createGroup(group.id()));
+
+ FlowRule rule = DefaultFlowRule.builder()
+ .forDevice(deviceId)
+ .forTable(0)
+ .fromApp(fwd.appId())
+ .makePermanent()
+ .withPriority(fwd.priority())
+ .withSelector(fwd.selector())
+ .withTreatment(treatment)
+ .build();
+
+ FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+ switch (fwd.op()) {
+
+ case ADD:
+ builder.add(rule);
+ break;
+ case REMOVE:
+ builder.remove(rule);
+ break;
+ case ADD_TO_EXISTING:
+ case REMOVE_FROM_EXISTING:
+ break;
+ default:
+ log.warn("Unknown forwarding operation: {}", fwd.op());
+ }
+
+ applyFlowRules(builder, fwd);
+
+ }
+
+ private boolean checkForMulticast(ForwardingObjective fwd) {
+
+ VlanIdCriterion vlan = (VlanIdCriterion) filterForCriterion(fwd.selector().criteria(),
+ Criterion.Type.VLAN_VID);
+
+ return (vlan != null && vlan.vlanId().equals(VlanId.vlanId(MCAST_VLAN)));
+
+ }
+
+ private OLTPipelineGroup getGroupForNextObjective(Integer nextId) {
+ NextGroup next = flowObjectiveStore.getNextGroup(nextId);
+ return (OLTPipelineGroup) appKryo.deserialize(next.data());
+
}
private void installDownstreamRules(ForwardingObjective fwd) {
@@ -338,7 +478,7 @@
private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
- L2ModificationInstruction.L2SubType type) {
+ L2ModificationInstruction.L2SubType type) {
List<Instruction> vlanPushs = findL2Instructions(
type,
@@ -494,53 +634,39 @@
}
}
- /**
- * Build a device description.
- *
- * @param deviceId a deviceId
- * @param key the key of the annotation
- * @param value the value for the annotation
- * @return a device description
- */
- private DeviceDescription description(DeviceId deviceId, String key, String value) {
- DeviceService deviceService = serviceDirectory.get(DeviceService.class);
- Device device = deviceService.getDevice(deviceId);
- checkNotNull(device, "Device not found in device service.");
-
- DefaultAnnotations.Builder builder = DefaultAnnotations.builder();
- if (value != null) {
- builder.set(key, value);
- } else {
- builder.remove(key);
- }
- return new DefaultDeviceDescription(device.id().uri(), device.type(),
- device.manufacturer(), device.hwVersion(),
- device.swVersion(), device.serialNumber(),
- device.chassisId(), builder.build());
- }
-
- /**
- * Simple ancillary provider used to annotate device.
- */
- private static final class AnnotationProvider
- extends AbstractProvider implements DeviceProvider {
- private AnnotationProvider() {
- super(PID);
- }
-
+ private class InnerGroupListener implements GroupListener {
@Override
- public void triggerProbe(DeviceId deviceId) {
- }
+ public void event(GroupEvent event) {
+ if (event.type() == GroupEvent.Type.GROUP_ADDED) {
+ GroupKey key = event.subject().appCookie();
- @Override
- public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
- }
-
- @Override
- public boolean isReachable(DeviceId deviceId) {
- return false;
+ NextObjective obj = pendingGroups.getIfPresent(key);
+ if (obj != null) {
+ flowObjectiveStore.putNextGroup(obj.id(), new OLTPipelineGroup(key));
+ pass(obj);
+ pendingGroups.invalidate(key);
+ }
+ }
}
}
+ private static class OLTPipelineGroup implements NextGroup {
+
+ private final GroupKey key;
+
+ public OLTPipelineGroup(GroupKey key) {
+ this.key = key;
+ }
+
+ public GroupKey key() {
+ return key;
+ }
+
+ @Override
+ public byte[] data() {
+ return appKryo.serialize(key);
+ }
+
+ }
}