added support for multicast in olt pipeline

Change-Id: I25c6ab18d23035094851b0800f719f28e210d403
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
index c6d9f2b..de90b9d 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/OltPipeline.java
@@ -15,6 +15,10 @@
  */
 package org.onosproject.driver.pipeline;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalCause;
+import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
@@ -22,20 +26,14 @@
 import org.onlab.packet.EthType;
 import org.onlab.packet.IPv4;
 import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
-import org.onosproject.net.DefaultAnnotations;
-import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
 import org.onosproject.net.PortNumber;
+import org.onosproject.net.behaviour.NextGroup;
 import org.onosproject.net.behaviour.Pipeliner;
 import org.onosproject.net.behaviour.PipelinerContext;
-import org.onosproject.net.device.DefaultDeviceDescription;
-import org.onosproject.net.device.DeviceDescription;
-import org.onosproject.net.device.DeviceProvider;
-import org.onosproject.net.device.DeviceProviderRegistry;
-import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.driver.AbstractHandlerBehaviour;
 import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficSelector;
@@ -56,20 +54,32 @@
 import org.onosproject.net.flow.instructions.Instructions;
 import org.onosproject.net.flow.instructions.L2ModificationInstruction;
 import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveStore;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.net.provider.AbstractProvider;
-import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupEvent;
+import org.onosproject.net.group.GroupKey;
+import org.onosproject.net.group.GroupListener;
+import org.onosproject.net.group.GroupService;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -79,35 +89,55 @@
 public class OltPipeline extends AbstractHandlerBehaviour implements Pipeliner {
 
     private static final Integer QQ_TABLE = 1;
+    private static final short MCAST_VLAN = 4000;
     private final Logger log = getLogger(getClass());
 
-    static final ProviderId PID = new ProviderId("olt", "org.onosproject.olt", true);
-
-    static final String DEVICE = "isAccess";
-    static final String OLT = "true";
-
     private ServiceDirectory serviceDirectory;
     private FlowRuleService flowRuleService;
-    private DeviceId deviceId;
+    private GroupService groupService;
     private CoreService coreService;
 
+    private DeviceId deviceId;
     private ApplicationId appId;
 
-    private DeviceProvider provider = new AnnotationProvider();
+    protected FlowObjectiveStore flowObjectiveStore;
+
+    private Cache<GroupKey, NextObjective> pendingGroups;
+
+    protected static KryoNamespace appKryo = new KryoNamespace.Builder()
+            .register(KryoNamespaces.API)
+            .register(GroupKey.class)
+            .register(DefaultGroupKey.class)
+            .register(OLTPipelineGroup.class)
+            .register(byte[].class)
+            .build();
 
     @Override
     public void init(DeviceId deviceId, PipelinerContext context) {
         log.debug("Initiate OLT pipeline");
         this.serviceDirectory = context.directory();
         this.deviceId = deviceId;
-        DeviceProviderRegistry registry =
-                serviceDirectory.get(DeviceProviderRegistry.class);
+
         flowRuleService = serviceDirectory.get(FlowRuleService.class);
         coreService = serviceDirectory.get(CoreService.class);
+        groupService = serviceDirectory.get(GroupService.class);
+        flowObjectiveStore = context.store();
+
 
         appId = coreService.registerApplication(
                 "org.onosproject.driver.OLTPipeline");
 
+
+        pendingGroups = CacheBuilder.newBuilder()
+                .expireAfterWrite(20, TimeUnit.SECONDS)
+                .removalListener((RemovalNotification<GroupKey, NextObjective> notification) -> {
+                    if (notification.getCause() == RemovalCause.EXPIRED) {
+                        fail(notification.getValue(), ObjectiveError.GROUPINSTALLATIONFAILED);
+                    }
+                }).build();
+
+        groupService.addListener(new InnerGroupListener());
+
     }
 
     @Override
@@ -164,6 +194,12 @@
 
     @Override
     public void forward(ForwardingObjective fwd) {
+
+        if (checkForMulticast(fwd)) {
+            processMulticastRule(fwd);
+            return;
+        }
+
         TrafficTreatment treatment = fwd.treatment();
 
         List<Instruction> instructions = treatment.allInstructions();
@@ -198,9 +234,113 @@
 
     }
 
+
     @Override
     public void next(NextObjective nextObjective) {
-        throw new UnsupportedOperationException("OLT does not next hop.");
+        if (nextObjective.type() != NextObjective.Type.BROADCAST) {
+            log.error("OLT only supports broadcast groups.");
+            fail(nextObjective, ObjectiveError.BADPARAMS);
+        }
+
+        if (nextObjective.next().size() != 1) {
+            log.error("OLT only supports singleton broadcast groups.");
+            fail(nextObjective, ObjectiveError.BADPARAMS);
+        }
+
+        TrafficTreatment treatment = nextObjective.next().stream().findFirst().get();
+
+
+        GroupBucket bucket = DefaultGroupBucket.createAllGroupBucket(treatment);
+        GroupKey key = new DefaultGroupKey(appKryo.serialize(nextObjective.id()));
+
+        GroupDescription groupDesc =
+                new DefaultGroupDescription(deviceId,
+                                            GroupDescription.Type.ALL,
+                                            new GroupBuckets(Collections.singletonList(bucket)),
+                                            key,
+                                            null,
+                                            nextObjective.appId());
+
+        pendingGroups.put(key, nextObjective);
+
+        switch (nextObjective.op()) {
+            case ADD:
+                groupService.addGroup(groupDesc);
+                break;
+            case REMOVE:
+                groupService.removeGroup(deviceId, key, nextObjective.appId());
+                break;
+            case ADD_TO_EXISTING:
+            case REMOVE_FROM_EXISTING:
+                //TODO: handle addition to group when caller signals it.
+                break;
+            default:
+                log.warn("Unknown next objective operation: {}", nextObjective.op());
+        }
+
+
+    }
+
+    private void processMulticastRule(ForwardingObjective fwd) {
+        if (fwd.nextId() == null) {
+            log.error("Multicast objective does not have a next id");
+            fail(fwd, ObjectiveError.BADPARAMS);
+        }
+
+        OLTPipelineGroup next = getGroupForNextObjective(fwd.nextId());
+
+        if (next == null) {
+            log.error("Group for forwarding objective missing: {}", fwd);
+            fail(fwd, ObjectiveError.GROUPMISSING);
+        }
+
+        Group group = groupService.getGroup(deviceId, next.key());
+        TrafficTreatment treatment =
+                buildTreatment(Instructions.createGroup(group.id()));
+
+        FlowRule rule = DefaultFlowRule.builder()
+                .forDevice(deviceId)
+                .forTable(0)
+                .fromApp(fwd.appId())
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(fwd.selector())
+                .withTreatment(treatment)
+                .build();
+
+        FlowRuleOperations.Builder builder = FlowRuleOperations.builder();
+        switch (fwd.op()) {
+
+            case ADD:
+                builder.add(rule);
+                break;
+            case REMOVE:
+                builder.remove(rule);
+                break;
+            case ADD_TO_EXISTING:
+            case REMOVE_FROM_EXISTING:
+                break;
+            default:
+                log.warn("Unknown forwarding operation: {}", fwd.op());
+        }
+
+        applyFlowRules(builder, fwd);
+
+    }
+
+    private boolean checkForMulticast(ForwardingObjective fwd) {
+
+        VlanIdCriterion vlan = (VlanIdCriterion) filterForCriterion(fwd.selector().criteria(),
+                                                                    Criterion.Type.VLAN_VID);
+
+        return (vlan != null && vlan.vlanId().equals(VlanId.vlanId(MCAST_VLAN)));
+
+    }
+
+    private OLTPipelineGroup getGroupForNextObjective(Integer nextId) {
+        NextGroup next = flowObjectiveStore.getNextGroup(nextId);
+        return (OLTPipelineGroup) appKryo.deserialize(next.data());
+
     }
 
     private void installDownstreamRules(ForwardingObjective fwd) {
@@ -338,7 +478,7 @@
 
 
     private List<Pair<Instruction, Instruction>> findVlanOps(List<Instruction> instructions,
-             L2ModificationInstruction.L2SubType type) {
+                                                             L2ModificationInstruction.L2SubType type) {
 
         List<Instruction> vlanPushs = findL2Instructions(
                 type,
@@ -494,53 +634,39 @@
         }
     }
 
-    /**
-     * Build a device description.
-     *
-     * @param deviceId a deviceId
-     * @param key      the key of the annotation
-     * @param value    the value for the annotation
-     * @return a device description
-     */
-    private DeviceDescription description(DeviceId deviceId, String key, String value) {
-        DeviceService deviceService = serviceDirectory.get(DeviceService.class);
-        Device device = deviceService.getDevice(deviceId);
 
-        checkNotNull(device, "Device not found in device service.");
-
-        DefaultAnnotations.Builder builder = DefaultAnnotations.builder();
-        if (value != null) {
-            builder.set(key, value);
-        } else {
-            builder.remove(key);
-        }
-        return new DefaultDeviceDescription(device.id().uri(), device.type(),
-                                            device.manufacturer(), device.hwVersion(),
-                                            device.swVersion(), device.serialNumber(),
-                                            device.chassisId(), builder.build());
-    }
-
-    /**
-     * Simple ancillary provider used to annotate device.
-     */
-    private static final class AnnotationProvider
-            extends AbstractProvider implements DeviceProvider {
-        private AnnotationProvider() {
-            super(PID);
-        }
-
+    private class InnerGroupListener implements GroupListener {
         @Override
-        public void triggerProbe(DeviceId deviceId) {
-        }
+        public void event(GroupEvent event) {
+            if (event.type() == GroupEvent.Type.GROUP_ADDED) {
+                GroupKey key = event.subject().appCookie();
 
-        @Override
-        public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
-        }
-
-        @Override
-        public boolean isReachable(DeviceId deviceId) {
-            return false;
+                NextObjective obj = pendingGroups.getIfPresent(key);
+                if (obj != null) {
+                    flowObjectiveStore.putNextGroup(obj.id(), new OLTPipelineGroup(key));
+                    pass(obj);
+                    pendingGroups.invalidate(key);
+                }
+            }
         }
     }
 
+    private static class OLTPipelineGroup implements NextGroup {
+
+        private final GroupKey key;
+
+        public OLTPipelineGroup(GroupKey key) {
+            this.key = key;
+        }
+
+        public GroupKey key() {
+            return key;
+        }
+
+        @Override
+        public byte[] data() {
+            return appKryo.serialize(key);
+        }
+
+    }
 }