Implements [CORD-96] and [CORD-410]

Changes:
- Introduces L2TunnelHandler for managing the pseudowires (pws);
- Supports pw initiation and pw policy for olt<->vsg communication;
- Supports teardown and update.

Change-Id: If51272c91445f618727434606edd2491f93cc4dd
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java
index 2a4de98..9e3161a 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2GroupHandler.java
@@ -37,6 +37,7 @@
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.TunnelIdCriterion;
 import org.onosproject.net.flow.criteria.VlanIdCriterion;
 import org.onosproject.net.flow.instructions.Instruction;
 import org.onosproject.net.flow.instructions.Instructions;
@@ -80,6 +81,7 @@
 import static org.onosproject.driver.pipeline.Ofdpa2GroupHandler.OfdpaMplsGroupSubType.OFDPA_GROUP_TYPE_SHIFT;
 import static org.onosproject.driver.pipeline.Ofdpa2GroupHandler.OfdpaMplsGroupSubType.OFDPA_MPLS_SUBTYPE_SHIFT;
 import static org.onosproject.driver.pipeline.Ofdpa2Pipeline.isNotMplsBos;
+import static org.onosproject.net.flow.criteria.Criterion.Type.TUNNEL_ID;
 import static org.onosproject.net.flow.criteria.Criterion.Type.VLAN_VID;
 import static org.onosproject.net.flowobjective.NextObjective.Type.HASHED;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -284,32 +286,50 @@
         }
 
         boolean isMpls = false;
+        // In order to understand if it is a pseudo wire related
+        // next objective we look for the tunnel id in the meta.
+        boolean isPw = false;
         if (nextObj.meta() != null) {
             isMpls = isNotMplsBos(nextObj.meta());
+
+            TunnelIdCriterion tunnelIdCriterion = (TunnelIdCriterion) nextObj
+                    .meta()
+                    .getCriterion(TUNNEL_ID);
+            if (tunnelIdCriterion != null) {
+                isPw = true;
+            }
+
         }
 
-        // break up simple next objective to GroupChain objects
-        GroupInfo groupInfo = createL2L3Chain(treatment, nextObj.id(),
-                                              nextObj.appId(), isMpls,
-                                              nextObj.meta());
-        if (groupInfo == null) {
-            log.error("Could not process nextObj={} in dev:{}", nextObj.id(), deviceId);
-            return;
+        if (!isPw) {
+            // break up simple next objective to GroupChain objects
+            GroupInfo groupInfo = createL2L3Chain(treatment, nextObj.id(),
+                                                  nextObj.appId(), isMpls,
+                                                  nextObj.meta());
+            if (groupInfo == null) {
+                log.error("Could not process nextObj={} in dev:{}", nextObj.id(), deviceId);
+                return;
+            }
+            // create object for local and distributed storage
+            Deque<GroupKey> gkeyChain = new ArrayDeque<>();
+            gkeyChain.addFirst(groupInfo.innerMostGroupDesc.appCookie());
+            gkeyChain.addFirst(groupInfo.nextGroupDesc.appCookie());
+            OfdpaNextGroup ofdpaGrp =
+                    new OfdpaNextGroup(Collections.singletonList(gkeyChain), nextObj);
+
+            // store l3groupkey with the ofdpaNextGroup for the nextObjective that depends on it
+            updatePendingNextObjective(groupInfo.nextGroupDesc.appCookie(), ofdpaGrp);
+
+            // now we are ready to send the l2 groupDescription (inner), as all the stores
+            // that will get async replies have been updated. By waiting to update
+            // the stores, we prevent nasty race conditions.
+            groupService.addGroup(groupInfo.innerMostGroupDesc);
+        } else {
+            // We handle the pseudo wire with a different procedure.
+            // This procedure is meant to handle both initiation and
+            // termination of the pseudo wire.
+            processPwNextObjective(nextObj);
         }
-        // create object for local and distributed storage
-        Deque<GroupKey> gkeyChain = new ArrayDeque<>();
-        gkeyChain.addFirst(groupInfo.innerMostGroupDesc.appCookie());
-        gkeyChain.addFirst(groupInfo.nextGroupDesc.appCookie());
-        OfdpaNextGroup ofdpaGrp =
-                new OfdpaNextGroup(Collections.singletonList(gkeyChain), nextObj);
-
-        // store l3groupkey with the ofdpaNextGroup for the nextObjective that depends on it
-        updatePendingNextObjective(groupInfo.nextGroupDesc.appCookie(), ofdpaGrp);
-
-        // now we are ready to send the l2 groupDescription (inner), as all the stores
-        // that will get async replies have been updated. By waiting to update
-        // the stores, we prevent nasty race conditions.
-        groupService.addGroup(groupInfo.innerMostGroupDesc);
     }
 
     /**
@@ -387,8 +407,8 @@
      *         error in processing the chain
      */
     protected GroupInfo createL2L3ChainInternal(TrafficTreatment treatment, int nextId,
-                                      ApplicationId appId, boolean mpls,
-                                      TrafficSelector meta, boolean useSetVlanExtension) {
+                                                ApplicationId appId, boolean mpls,
+                                                TrafficSelector meta, boolean useSetVlanExtension) {
         // for the l2interface group, get vlan and port info
         // for the outer group, get the src/dst mac, and vlan info
         TrafficTreatment.Builder outerTtb = DefaultTrafficTreatment.builder();
@@ -973,6 +993,18 @@
         }
     }
 
+    /**
+     * Processes the pseudo wire related next objective.
+     * This procedure tries to reuse the mpls label groups,
+     * the mpls interface group and the l2 interface group.
+     *
+     * @param nextObjective the objective to process.
+     */
+    protected void processPwNextObjective(NextObjective nextObjective) {
+        log.warn("Pseudo wire extensions are not support for the OFDPA 2.0 {}", nextObjective.id());
+        return;
+    }
+
     //////////////////////////////////////
     //  Group Editing
     //////////////////////////////////////
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
index a49e4fa..e32f5d5 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa2Pipeline.java
@@ -102,8 +102,12 @@
  *
  */
 public class Ofdpa2Pipeline extends AbstractHandlerBehaviour implements Pipeliner {
+
     protected static final int PORT_TABLE = 0;
     protected static final int VLAN_TABLE = 10;
+    protected static final int VLAN_1_TABLE = 11;
+    protected static final int MPLS_L2_PORT_FLOW_TABLE = 13;
+    protected static final int MPLS_L2_PORT_PCP_TRUST_FLOW_TABLE = 16;
     protected static final int TMAC_TABLE = 20;
     protected static final int UNICAST_ROUTING_TABLE = 30;
     protected static final int MULTICAST_ROUTING_TABLE = 40;
@@ -120,6 +124,11 @@
     protected static final int DEFAULT_PRIORITY = 0x8000;
     protected static final int LOWEST_PRIORITY = 0x0;
 
+    protected static final int MPLS_L2_PORT_PRIORITY = 2;
+
+    protected static final int MPLS_TUNNEL_ID_BASE = 0x10000;
+    protected static final int MPLS_TUNNEL_ID_MAX = 0x1FFFF;
+
     private final Logger log = getLogger(getClass());
     protected ServiceDirectory serviceDirectory;
     protected FlowRuleService flowRuleService;
@@ -225,6 +234,7 @@
             rules.stream()
             .filter(Objects::nonNull)
             .forEach(flowOpsBuilder::remove);
+            log.debug("Deleting a flow rule to sw:{}", deviceId);
             break;
         default:
             fail(fwd, ObjectiveError.UNKNOWN);
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa3GroupHandler.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa3GroupHandler.java
index 5ba0446..1c30333 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa3GroupHandler.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa3GroupHandler.java
@@ -16,18 +16,338 @@
 
 package org.onosproject.driver.pipeline;
 
+import com.google.common.collect.Lists;
+import org.onlab.packet.VlanId;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.core.DefaultGroupId;
+import org.onosproject.driver.extensions.Ofdpa3PushCw;
+import org.onosproject.driver.extensions.Ofdpa3PushL2Header;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.TrafficSelector;
 import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.instructions.Instruction;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.flow.instructions.L3ModificationInstruction;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.DefaultGroupBucket;
+import org.onosproject.net.group.DefaultGroupDescription;
+import org.onosproject.net.group.DefaultGroupKey;
+import org.onosproject.net.group.GroupBucket;
+import org.onosproject.net.group.GroupBuckets;
+import org.onosproject.net.group.GroupDescription;
+import org.onosproject.net.group.GroupKey;
+import org.slf4j.Logger;
+
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.List;
+
+import static org.onosproject.driver.pipeline.Ofdpa2GroupHandler.OfdpaMplsGroupSubType.*;
+import static org.onosproject.net.flow.instructions.L3ModificationInstruction.L3SubType.TTL_OUT;
+import static org.onosproject.net.group.GroupDescription.Type.INDIRECT;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Group handler for OFDPA2 pipeline.
  */
 public class Ofdpa3GroupHandler extends Ofdpa2GroupHandler {
+
+    private static final int PW_INTERNAL_VLAN = 4094;
+    private static final int MAX_DEPTH_UNPROTECTED_PW = 3;
+
+    private final Logger log = getLogger(getClass());
+
     @Override
     protected GroupInfo createL2L3Chain(TrafficTreatment treatment, int nextId,
                                         ApplicationId appId, boolean mpls,
                                         TrafficSelector meta) {
         return createL2L3ChainInternal(treatment, nextId, appId, mpls, meta, false);
     }
+
+    @Override
+    protected void processPwNextObjective(NextObjective nextObjective) {
+        TrafficTreatment treatment = nextObjective.next().iterator().next();
+        Deque<GroupKey> gkeyChain = new ArrayDeque<>();
+        GroupChainElem groupChainElem;
+        GroupKey groupKey;
+        GroupDescription groupDescription;
+        // Now we separate the mpls actions from the l2/l3 actions
+        TrafficTreatment.Builder l2L3Treatment = DefaultTrafficTreatment.builder();
+        TrafficTreatment.Builder mplsTreatment = DefaultTrafficTreatment.builder();
+        createL2L3AndMplsTreatments(treatment, l2L3Treatment, mplsTreatment);
+        // We create the chain from mpls intf group to
+        // l2 intf group.
+        GroupInfo groupInfo = createL2L3ChainInternal(
+                l2L3Treatment.build(),
+                nextObjective.id(),
+                nextObjective.appId(),
+                true,
+                nextObjective.meta(),
+                false
+        );
+        if (groupInfo == null) {
+            log.error("Could not process nextObj={} in dev:{}", nextObjective.id(), deviceId);
+            Ofdpa2Pipeline.fail(nextObjective, ObjectiveError.GROUPINSTALLATIONFAILED);
+            return;
+        }
+        // We update the chain with the last two groups;
+        gkeyChain.addFirst(groupInfo.getInnerMostGroupDesc().appCookie());
+        gkeyChain.addFirst(groupInfo.getNextGroupDesc().appCookie());
+        // We retrieve also all mpls instructions.
+        List<List<Instruction>> mplsInstructionSets = Lists.newArrayList();
+        List<Instruction> mplsInstructionSet = Lists.newArrayList();
+        L3ModificationInstruction l3Ins;
+        for (Instruction ins : treatment.allInstructions()) {
+            // Each mpls instruction set is delimited by a
+            // copy ttl outward action.
+            mplsInstructionSet.add(ins);
+            if (ins.type() == Instruction.Type.L3MODIFICATION) {
+                l3Ins = (L3ModificationInstruction) ins;
+                if (l3Ins.subtype() == TTL_OUT) {
+                    mplsInstructionSets.add(mplsInstructionSet);
+                    mplsInstructionSet = Lists.newArrayList();
+                }
+
+            }
+        }
+        if (mplsInstructionSets.size() > MAX_DEPTH_UNPROTECTED_PW) {
+            log.error("Next Objective for pseudo wire should have at "
+                              + "most {} mpls instruction sets. Next Objective Id:{}",
+                      MAX_DEPTH_UNPROTECTED_PW, nextObjective.id());
+            Ofdpa2Pipeline.fail(nextObjective, ObjectiveError.BADPARAMS);
+            return;
+        }
+        int nextGid = groupInfo.getNextGroupDesc().givenGroupId();
+        int index;
+        // We create the mpls tunnel label groups.
+        // In this case we need to use also the
+        // tunnel label group 2;
+        if (mplsInstructionSets.size() == MAX_DEPTH_UNPROTECTED_PW) {
+            // We deal with the label 2 group.
+            index = getNextAvailableIndex();
+            groupDescription = createMplsTunnelLabelGroup(
+                    nextGid,
+                    MPLS_TUNNEL_LABEL_2,
+                    index,
+                    mplsInstructionSets.get(2),
+                    nextObjective.appId()
+            );
+            groupKey = new DefaultGroupKey(
+                    Ofdpa2Pipeline.appKryo.serialize(index)
+            );
+            // We update the chain.
+            groupChainElem = new GroupChainElem(groupDescription, 1, false);
+            updatePendingGroups(
+                    groupInfo.getNextGroupDesc().appCookie(),
+                    groupChainElem
+            );
+            gkeyChain.addFirst(groupKey);
+            // We have to create the tunnel label group and
+            // the l2 vpn group before sending the innermost
+            // group. We update the nextGid.
+            nextGid = groupDescription.givenGroupId();
+            groupInfo = new GroupInfo(groupInfo.getInnerMostGroupDesc(), groupDescription);
+
+            log.debug("Trying Label 2 Group: device:{} gid:{} gkey:{} nextId:{}",
+                      deviceId, Integer.toHexString(nextGid),
+                      groupKey, nextObjective.id());
+        }
+        // We deal with the label 1 group.
+        index = getNextAvailableIndex();
+        groupDescription = createMplsTunnelLabelGroup(
+                nextGid,
+                MPLS_TUNNEL_LABEL_1,
+                index,
+                mplsInstructionSets.get(1),
+                nextObjective.appId()
+        );
+        groupKey = new DefaultGroupKey(
+                Ofdpa2Pipeline.appKryo.serialize(index)
+        );
+        groupChainElem = new GroupChainElem(groupDescription, 1, false);
+        updatePendingGroups(
+                groupInfo.getNextGroupDesc().appCookie(),
+                groupChainElem
+        );
+        gkeyChain.addFirst(groupKey);
+        // We have to create the l2 vpn group before
+        // sending the innermost group.
+        nextGid = groupDescription.givenGroupId();
+        groupInfo = new GroupInfo(groupInfo.getInnerMostGroupDesc(), groupDescription);
+
+        log.debug("Trying Label 1 Group: device:{} gid:{} gkey:{} nextId:{}",
+                  deviceId, Integer.toHexString(nextGid),
+                  groupKey, nextObjective.id());
+        // Finally we create the l2 vpn group.
+        index = getNextAvailableIndex();
+        groupDescription = createMplsL2VpnGroup(
+                nextGid,
+                index,
+                mplsInstructionSets.get(0),
+                nextObjective.appId()
+        );
+        groupKey = new DefaultGroupKey(
+                Ofdpa2Pipeline.appKryo.serialize(index)
+        );
+        groupChainElem = new GroupChainElem(groupDescription, 1, false);
+        updatePendingGroups(
+                groupInfo.getNextGroupDesc().appCookie(),
+                groupChainElem
+        );
+        gkeyChain.addFirst(groupKey);
+        OfdpaNextGroup ofdpaGrp = new OfdpaNextGroup(
+                Collections.singletonList(gkeyChain),
+                nextObjective
+        );
+        updatePendingNextObjective(groupKey, ofdpaGrp);
+
+        log.debug("Trying L2 Vpn Group: device:{} gid:{} gkey:{} nextId:{}",
+                  deviceId, Integer.toHexString(nextGid),
+                  groupKey, nextObjective.id());
+        // Finally we send the innermost group.
+        log.debug("Sending innermost group {} in group chain on device {} ",
+                  Integer.toHexString(groupInfo.getInnerMostGroupDesc().givenGroupId()), deviceId);
+        groupService.addGroup(groupInfo.getInnerMostGroupDesc());
+    }
+
+    /**
+     * Helper method to create a mpls tunnel label group.
+     *
+     * @param nextGroupId the next group in the chain
+     * @param subtype the mpls tunnel label group subtype
+     * @param index the index of the group
+     * @param instructions the instructions to push
+     * @param applicationId the application id
+     * @return the group description
+     */
+    private GroupDescription createMplsTunnelLabelGroup(int nextGroupId,
+                                                          OfdpaMplsGroupSubType subtype,
+                                                          int index,
+                                                          List<Instruction> instructions,
+                                                          ApplicationId applicationId) {
+        TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
+        // We add all the instructions.
+        instructions.forEach(treatment::add);
+        // We point the group to the next group.
+        treatment.group(new DefaultGroupId(nextGroupId));
+        GroupBucket groupBucket = DefaultGroupBucket
+                .createIndirectGroupBucket(treatment.build());
+        // Finally we build the group description.
+        int groupId = makeMplsLabelGroupId(subtype, index);
+        GroupKey groupKey = new DefaultGroupKey(
+                Ofdpa2Pipeline.appKryo.serialize(index)
+        );
+        return new DefaultGroupDescription(
+                deviceId,
+                INDIRECT,
+                new GroupBuckets(Collections.singletonList(groupBucket)),
+                groupKey,
+                groupId,
+                applicationId
+        );
+    }
+
+    /**
+     * Helper method to create a mpls l2 vpn group.
+     *
+     * @param nextGroupId the next group in the chain
+     * @param index the index of the group
+     * @param instructions the instructions to push
+     * @param applicationId the application id
+     * @return the group description
+     */
+    private GroupDescription createMplsL2VpnGroup(int nextGroupId,
+                                                    int index,
+                                                    List<Instruction> instructions,
+                                                    ApplicationId applicationId) {
+        TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
+        // We add the extensions and the instructions.
+        treatment.extension(new Ofdpa3PushL2Header(), deviceId);
+        treatment.pushVlan();
+        instructions.forEach(treatment::add);
+        treatment.extension(new Ofdpa3PushCw(), deviceId);
+        // We point the group to the next group.
+        treatment.group(new DefaultGroupId(nextGroupId));
+        GroupBucket groupBucket = DefaultGroupBucket
+                .createIndirectGroupBucket(treatment.build());
+        // Finally we build the group description.
+        int groupId = makeMplsLabelGroupId(L2_VPN, index);
+        GroupKey groupKey = new DefaultGroupKey(
+                Ofdpa2Pipeline.appKryo.serialize(index)
+        );
+        return new DefaultGroupDescription(
+                deviceId,
+                INDIRECT,
+                new GroupBuckets(Collections.singletonList(groupBucket)),
+                groupKey,
+                groupId,
+                applicationId
+        );
+    }
+
+    /**
+     * Helper method for dividing the l2/l3 instructions from the mpls
+     * instructions.
+     *
+     * @param treatment the treatment to analyze
+     * @param l2L3Treatment the l2/l3 treatment builder
+     * @param mplsTreatment the mpls treatment builder
+     */
+    private void createL2L3AndMplsTreatments(TrafficTreatment treatment,
+                                               TrafficTreatment.Builder l2L3Treatment,
+                                               TrafficTreatment.Builder mplsTreatment) {
+
+        for (Instruction ins : treatment.allInstructions()) {
+
+            if (ins.type() == Instruction.Type.L2MODIFICATION) {
+                L2ModificationInstruction l2ins = (L2ModificationInstruction) ins;
+                switch (l2ins.subtype()) {
+                    // These instructions have to go in the l2/l3 treatment.
+                    case ETH_DST:
+                    case ETH_SRC:
+                    case VLAN_ID:
+                    case VLAN_POP:
+                        l2L3Treatment.add(ins);
+                        break;
+                    // These instructions have to go in the mpls treatment.
+                    case MPLS_BOS:
+                    case DEC_MPLS_TTL:
+                    case MPLS_LABEL:
+                    case MPLS_PUSH:
+                        mplsTreatment.add(ins);
+                        break;
+                    default:
+                        log.warn("Driver does not handle this type of TrafficTreatment"
+                                         + " instruction in nextObjectives: {} - {}",
+                                 ins.type(), ins);
+                        break;
+                }
+            } else if (ins.type() == Instruction.Type.OUTPUT) {
+                // The output goes in the l2/l3 treatment.
+                l2L3Treatment.add(ins);
+            } else if (ins.type() == Instruction.Type.L3MODIFICATION) {
+                // Only a subset of the l3 instructions is supported.
+                L3ModificationInstruction l3ins = (L3ModificationInstruction) ins;
+                switch (l3ins.subtype()) {
+                    case TTL_OUT:
+                        mplsTreatment.add(ins);
+                        break;
+                    default:
+                        log.warn("Driver does not handle this type of TrafficTreatment"
+                                         + " instruction in nextObjectives: {} - {}",
+                                 ins.type(), ins);
+                }
+
+            } else {
+                log.warn("Driver does not handle this type of TrafficTreatment"
+                                 + " instruction in nextObjectives: {} - {}",
+                         ins.type(), ins);
+            }
+        }
+        // We transparently add an action setting the vlan id to 4094.
+        l2L3Treatment.setVlanId(VlanId.vlanId((short) PW_INTERNAL_VLAN));
+    }
+    // TODO Introduce in the future an inner class to return two treatments
 }
diff --git a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa3Pipeline.java b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa3Pipeline.java
index 895c187..38fe384 100644
--- a/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa3Pipeline.java
+++ b/drivers/default/src/main/java/org/onosproject/driver/pipeline/Ofdpa3Pipeline.java
@@ -16,21 +16,57 @@
 
 package org.onosproject.driver.pipeline;
 
+import com.google.common.collect.ImmutableList;
 import org.onlab.packet.VlanId;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.driver.extensions.Ofdpa3MatchMplsL2Port;
+import org.onosproject.driver.extensions.Ofdpa3MatchOvid;
+import org.onosproject.driver.extensions.Ofdpa3SetMplsL2Port;
+import org.onosproject.driver.extensions.Ofdpa3SetMplsType;
+import org.onosproject.driver.extensions.Ofdpa3SetOvid;
+import org.onosproject.driver.extensions.Ofdpa3SetQosIndex;
+import org.onosproject.driver.extensions.OfdpaMatchVlanVid;
+import org.onosproject.net.behaviour.NextGroup;
 import org.onosproject.net.behaviour.PipelinerContext;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleOperations;
+import org.onosproject.net.flow.FlowRuleOperationsContext;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criterion;
 import org.onosproject.net.flow.criteria.PortCriterion;
+import org.onosproject.net.flow.criteria.TunnelIdCriterion;
 import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction.L2SubType;
+import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.group.Group;
+import org.onosproject.net.group.GroupKey;
+import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
 import java.util.List;
 
+import static org.onosproject.driver.extensions.Ofdpa3MplsType.VPWS;
+import static org.onosproject.net.flow.criteria.Criterion.Type.*;
+import static org.onosproject.net.flow.instructions.Instruction.Type.L2MODIFICATION;
+import static org.onosproject.net.flow.instructions.L2ModificationInstruction.ModTunnelIdInstruction;
+import static org.slf4j.LoggerFactory.getLogger;
+
 /**
  * Pipeliner for Broadcom OF-DPA 3.0 TTP.
  */
 public class Ofdpa3Pipeline extends Ofdpa2Pipeline {
+
+    private final Logger log = getLogger(getClass());
+
     @Override
     protected void initDriverId() {
         driverId = coreService.registerApplication(
@@ -44,6 +80,180 @@
     }
 
     @Override
+    protected void processFilter(FilteringObjective filteringObjective,
+                                 boolean install,
+                                 ApplicationId applicationId) {
+        // We look for the inner vlan id criterion and use it
+        // to identify the pseudo wire flows. In the future we can
+        // also enforce this using the tunnel id in the meta.
+        VlanIdCriterion innerVlanIdCriterion = null;
+        for (Criterion criterion : filteringObjective.conditions()) {
+            if (criterion.type() == INNER_VLAN_VID) {
+                innerVlanIdCriterion = (VlanIdCriterion) criterion;
+                break;
+            }
+        }
+        if (innerVlanIdCriterion != null) {
+            FlowRuleOperations.Builder ops = FlowRuleOperations.builder();
+            PortCriterion portCriterion;
+            VlanIdCriterion outerVlanIdCriterion = null;
+            // We extract the expected port criterion in the key.
+            portCriterion = (PortCriterion) filteringObjective.key();
+            // We extract the outer vlan id criterion.
+            for (Criterion criterion : filteringObjective.conditions()) {
+                if (criterion.type() == VLAN_VID) {
+                    outerVlanIdCriterion = (VlanIdCriterion) criterion;
+                    break;
+                }
+            }
+            // We extract the tunnel id.
+            long tunnelId;
+            if (filteringObjective.meta() != null &&
+                    filteringObjective.meta().allInstructions().size() != 1) {
+                log.warn("Bad filtering objective from app: {}. Not"
+                                 + "processing filtering objective", applicationId);
+                fail(filteringObjective, ObjectiveError.BADPARAMS);
+                return;
+            } else if (filteringObjective.meta() != null &&
+                    filteringObjective.meta().allInstructions().size() == 1 &&
+                    filteringObjective.meta().allInstructions().get(0).type() == L2MODIFICATION) {
+                L2ModificationInstruction l2instruction = (L2ModificationInstruction)
+                        filteringObjective.meta().allInstructions().get(0);
+                if (l2instruction.subtype() != L2SubType.TUNNEL_ID) {
+                    log.warn("Bad filtering objective from app: {}. Not"
+                                     + "processing filtering objective", applicationId);
+                    fail(filteringObjective, ObjectiveError.BADPARAMS);
+                    return;
+                } else {
+                    tunnelId = ((ModTunnelIdInstruction) l2instruction).tunnelId();
+                }
+            } else {
+                log.warn("Bad filtering objective from app: {}. Not"
+                                 + "processing filtering objective", applicationId);
+                fail(filteringObjective, ObjectiveError.BADPARAMS);
+                return;
+            }
+            // Mpls tunnel ids according to the OFDPA manual have to be
+            // in the range [2^16, 2^17-1].
+            tunnelId = MPLS_TUNNEL_ID_BASE | tunnelId;
+            // Sanity check for the filtering objective.
+            if (portCriterion == null ||
+                    outerVlanIdCriterion == null ||
+                    tunnelId > MPLS_TUNNEL_ID_MAX) {
+                log.warn("Bad filtering objective from app: {}. Not"
+                                 + "processing filtering objective", applicationId);
+                fail(filteringObjective, ObjectiveError.BADPARAMS);
+                return;
+            }
+            // 0x0000XXXX is UNI interface.
+            if (portCriterion.port().toLong() > 0x0000FFFF) {
+                log.error("Filering Objective invalid logical port {}",
+                          portCriterion.port().toLong());
+                fail(filteringObjective, ObjectiveError.BADPARAMS);
+                return;
+            }
+            // We create the flows.
+            List<FlowRule> pwRules = processPwFilter(portCriterion,
+                                                     innerVlanIdCriterion,
+                                                     outerVlanIdCriterion,
+                                                     tunnelId,
+                                                     applicationId
+            );
+            // We tag the flow for adding or for removing.
+            for (FlowRule pwRule : pwRules) {
+                log.debug("adding filtering rule in VLAN tables: {} for dev: {}",
+                          pwRule, deviceId);
+                ops = install ? ops.add(pwRule) : ops.remove(pwRule);
+            }
+            // We push the filtering rules for the pw.
+            flowRuleService.apply(ops.build(new FlowRuleOperationsContext() {
+                @Override
+                public void onSuccess(FlowRuleOperations ops) {
+                    log.info("Applied {} filtering rules in device {}",
+                             ops.stages().get(0).size(), deviceId);
+                    pass(filteringObjective);
+                }
+
+                @Override
+                public void onError(FlowRuleOperations ops) {
+                    log.info("Failed to apply all filtering rules in dev {}", deviceId);
+                    fail(filteringObjective, ObjectiveError.FLOWINSTALLATIONFAILED);
+                }
+            }));
+
+            return;
+        }
+        // If it is not a pseudo wire flow we fall back
+        // to the OFDPA 2.0 pipeline.
+        super.processFilter(filteringObjective, install, applicationId);
+    }
+
+    /**
+     * Builds the VLAN and VLAN 1 table flow rules for a pw filtering objective.
+     *
+     * @param portCriterion the in port match
+     * @param innerVlanIdCriterion the inner vlan match
+     * @param outerVlanIdCriterion the outer vlan match
+     * @param tunnelId the tunnel id
+     * @param applicationId the application id
+     * @return a list of flow rules to install
+     */
+    private List<FlowRule> processPwFilter(PortCriterion portCriterion,
+                                             VlanIdCriterion innerVlanIdCriterion,
+                                             VlanIdCriterion outerVlanIdCriterion,
+                                             long tunnelId,
+                                             ApplicationId applicationId) {
+        int l2Port = (int) portCriterion.port().toLong();
+        VlanId outerVlan = outerVlanIdCriterion.vlanId();
+        // VLAN 1 table rule: inner and outer vlan are matched at the same
+        // time, which Ofdpa supports through the OVID meta-data type.
+        TrafficSelector innerSelector = DefaultTrafficSelector.builder()
+                .matchInPort(portCriterion.port())
+                .extension(new OfdpaMatchVlanVid(innerVlanIdCriterion.vlanId()), deviceId)
+                .extension(new Ofdpa3MatchOvid(outerVlan), deviceId)
+                .build();
+        // TODO understand for the future how to manage the vlan rewrite.
+        TrafficTreatment innerTreatment = DefaultTrafficTreatment.builder()
+                .pushVlan()
+                .setVlanId(outerVlan)
+                .extension(new Ofdpa3SetMplsType(VPWS), deviceId)
+                .extension(new Ofdpa3SetMplsL2Port(l2Port), deviceId)
+                .setTunnelId(tunnelId)
+                .transition(MPLS_L2_PORT_FLOW_TABLE)
+                .build();
+        FlowRule vlan1Rule = DefaultFlowRule.builder()
+                .forDevice(deviceId)
+                .withSelector(innerSelector)
+                .withTreatment(innerTreatment)
+                .withPriority(DEFAULT_PRIORITY)
+                .fromApp(applicationId)
+                .makePermanent()
+                .forTable(VLAN_1_TABLE)
+                .build();
+        // VLAN table rule: only the outer vlan is matched here.
+        TrafficSelector outerSelector = DefaultTrafficSelector.builder()
+                .matchInPort(portCriterion.port())
+                .extension(new OfdpaMatchVlanVid(outerVlan), deviceId)
+                .build();
+        // TODO understand for the future how to manage the vlan rewrite.
+        TrafficTreatment outerTreatment = DefaultTrafficTreatment.builder()
+                .popVlan()
+                .extension(new Ofdpa3SetOvid(outerVlan), deviceId)
+                .transition(VLAN_1_TABLE)
+                .build();
+        FlowRule vlanRule = DefaultFlowRule.builder()
+                .forDevice(deviceId)
+                .withSelector(outerSelector)
+                .withTreatment(outerTreatment)
+                .withPriority(DEFAULT_PRIORITY)
+                .fromApp(applicationId)
+                .makePermanent()
+                .forTable(VLAN_TABLE)
+                .build();
+        return ImmutableList.of(vlan1Rule, vlanRule);
+    }
+
+    @Override
     protected List<FlowRule> processVlanIdFilter(PortCriterion portCriterion,
                                                  VlanIdCriterion vidCriterion,
                                                  VlanId assignedVlan,
@@ -59,4 +269,104 @@
         }
         return processEthTypeSpecificInternal(fwd, true, MPLS_L3_TYPE_TABLE);
     }
+
+    @Override
+    protected Collection<FlowRule> processVersatile(ForwardingObjective fwd) {
+        // Pw related flows are recognised by a tunnel id in the selector.
+        TrafficSelector fwdSelector = fwd.selector();
+        TunnelIdCriterion pwTunnel = (TunnelIdCriterion) fwdSelector.getCriterion(TUNNEL_ID);
+        if (pwTunnel == null) {
+            // Not a pseudo wire flow: we fall back
+            // to the OFDPA 2.0 pipeline.
+            return super.processVersatile(fwd);
+        }
+        return processPwVersatile(fwd);
+    }
+
+    /**
+     * Helper method to process the pw forwarding objectives. The objective
+     * is validated first; if a check fails, or the next group cannot be
+     * found, the objective is failed and no rule is produced.
+     *
+     * @param forwardingObjective the fw objective to process
+     * @return a singleton list with the mpls l2 port flow rule, or an
+     *         empty collection when the objective has been failed
+     */
+    private Collection<FlowRule> processPwVersatile(ForwardingObjective forwardingObjective) {
+        // We retrieve the matching criteria for mpls l2 port.
+        TunnelIdCriterion tunnelIdCriterion = (TunnelIdCriterion) forwardingObjective.selector()
+                .getCriterion(TUNNEL_ID);
+        PortCriterion portCriterion = (PortCriterion) forwardingObjective.selector()
+                .getCriterion(IN_PORT);
+        TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
+        TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
+        // Mpls tunnel ids according to the OFDPA manual have to be
+        // in the range [2^16, 2^17-1].
+        long tunnelId = MPLS_TUNNEL_ID_BASE | tunnelIdCriterion.tunnelId();
+        if (tunnelId > MPLS_TUNNEL_ID_MAX) {
+            log.error("Pw Versatile Forwarding Objective must include tunnel id < {}",
+                      MPLS_TUNNEL_ID_MAX);
+            fail(forwardingObjective, ObjectiveError.BADPARAMS);
+            return Collections.emptySet();
+        }
+        // The in port is mandatory: it identifies the mpls l2 port.
+        if (portCriterion == null) {
+            log.error("Pw Versatile Forwarding Objective must include port");
+            fail(forwardingObjective, ObjectiveError.BADPARAMS);
+            return Collections.emptySet();
+        }
+        // 0x0000XXXX is UNI interface.
+        if (portCriterion.port().toLong() > 0x0000FFFF) {
+            log.error("Pw Versatile Forwarding Objective invalid logical port {}",
+                      portCriterion.port().toLong());
+            fail(forwardingObjective, ObjectiveError.BADPARAMS);
+            return Collections.emptySet();
+        }
+        int mplsLogicalPort = (int) portCriterion.port().toLong();
+        if (forwardingObjective.nextId() == null) {
+            log.error("Pw Versatile Forwarding Objective must contain nextId");
+            fail(forwardingObjective, ObjectiveError.BADPARAMS);
+            return Collections.emptySet();
+        }
+        // We don't expect a treatment.
+        if (forwardingObjective.treatment() != null &&
+                !forwardingObjective.treatment().equals(DefaultTrafficTreatment.emptyTreatment())) {
+            log.error("Pw Versatile Forwarding Objective cannot contain a treatment: {}",
+                      forwardingObjective.treatment());
+            fail(forwardingObjective, ObjectiveError.BADPARAMS);
+            return Collections.emptySet();
+        }
+        // We retrieve the l2 vpn group and point the mpls
+        // l2 port to this.
+        NextGroup next = getGroupForNextObjective(forwardingObjective.nextId());
+        if (next == null) {
+            log.warn("next-id:{} not found in dev:{}", forwardingObjective.nextId(), deviceId);
+            fail(forwardingObjective, ObjectiveError.GROUPMISSING);
+            return Collections.emptySet();
+        }
+        List<Deque<GroupKey>> gkeys = appKryo.deserialize(next.data());
+        Group group = groupService.getGroup(deviceId, gkeys.get(0).peekFirst());
+        if (group == null) {
+            log.warn("Group with key:{} for next-id:{} not found in dev:{}",
+                     gkeys.get(0).peekFirst(), forwardingObjective.nextId(), deviceId);
+            fail(forwardingObjective, ObjectiveError.GROUPMISSING);
+            return Collections.emptySet();
+        }
+        // We prepare the flow rule for the mpls l2 port table.
+        selector.matchTunnelId(tunnelId);
+        selector.extension(new Ofdpa3MatchMplsL2Port(mplsLogicalPort), deviceId);
+        // This should not be necessary but without we receive an error
+        treatment.extension(new Ofdpa3SetQosIndex(0), deviceId);
+        treatment.transition(MPLS_L2_PORT_PCP_TRUST_FLOW_TABLE);
+        treatment.deferred().group(group.id());
+        FlowRule.Builder ruleBuilder = DefaultFlowRule.builder()
+                .fromApp(forwardingObjective.appId())
+                .withPriority(MPLS_L2_PORT_PRIORITY)
+                .forDevice(deviceId)
+                .withSelector(selector.build())
+                .withTreatment(treatment.build())
+                .makePermanent()
+                .forTable(MPLS_L2_PORT_FLOW_TABLE);
+        return Collections.singletonList(ruleBuilder.build());
+    }
 }