Implements [CORD-587] and [CORD-588]

Changes:
- Add termination in SR app;
- Add termination in the drivers

Change-Id: Ia9bb31c2c2e20acab8d6bfe27113f7421a8b83da
diff --git a/src/main/java/org/onosproject/segmentrouting/pwaas/L2TunnelHandler.java b/src/main/java/org/onosproject/segmentrouting/pwaas/L2TunnelHandler.java
index 5f0ed35..8206180 100644
--- a/src/main/java/org/onosproject/segmentrouting/pwaas/L2TunnelHandler.java
+++ b/src/main/java/org/onosproject/segmentrouting/pwaas/L2TunnelHandler.java
@@ -19,6 +19,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.RandomUtils;
+import org.onlab.packet.Ethernet;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.MplsLabel;
 import org.onlab.packet.VlanId;
@@ -63,7 +64,10 @@
 import static org.onosproject.net.flowobjective.ForwardingObjective.Flag.VERSATILE;
 import static org.onosproject.segmentrouting.pwaas.L2Mode.TAGGED;
 import static org.onosproject.segmentrouting.pwaas.L2TunnelHandler.Pipeline.INITIATION;
+import static org.onosproject.segmentrouting.pwaas.L2TunnelHandler.Pipeline.TERMINATION;
 import static org.onosproject.segmentrouting.pwaas.L2TunnelHandler.Result.*;
+import static org.onosproject.segmentrouting.pwaas.L2TunnelHandler.Direction.FWD;
+import static org.onosproject.segmentrouting.pwaas.L2TunnelHandler.Direction.REV;
 
 /**
  * Handles pwaas related events.
@@ -71,20 +75,30 @@
 public class L2TunnelHandler {
 
     private static final Logger log = LoggerFactory.getLogger(L2TunnelHandler.class);
-
-    private static final String FWD = "f";
-    private static final String REV = "r";
-
-    private static final String NOT_MASTER = "Not master controller";
+    /**
+     * Error message for invalid paths.
+     */
     private static final String WRONG_TOPOLOGY = "Path in leaf-spine topology" +
             " should always be two hops: ";
 
     private final SegmentRoutingManager srManager;
-
-    private final ConsistentMap<String, NextObjective> l2InitiationNextObjStore;
-
     /**
-     * TODO a proper store is necessary to handle the policies and collisions.
+     * To store the next objectives related to the initiation.
+     */
+    private final ConsistentMap<String, NextObjective> l2InitiationNextObjStore;
+    /**
+     * To store the next objectives related to the termination.
+     */
+    private final ConsistentMap<String, NextObjective> l2TerminationNextObjStore;
+    /**
+     * TODO a proper store is necessary to handle the policies, collisions and recovery.
+     * We should have a proper store for the policies and the tunnels. For several reasons:
+     * 1) We should avoid the overlapping of different policies;
+     * 2) We should avoid the overlapping of different tunnels;
+     * 3) We should have a proper mechanism for the protection;
+     * The most important one is 3). At least for 3.0 EA0 was not possible
+     * to remove the bucket, so we need a mapping between policies and tunnel
+     * in order to proper update the fwd objective for the recovery of a fault.
      */
     private final KryoNamespace.Builder l2TunnelKryo;
 
@@ -104,6 +118,12 @@
                 .withName("onos-l2initiation-nextobj-store")
                 .withSerializer(Serializer.using(l2TunnelKryo.build()))
                 .build();
+
+        l2TerminationNextObjStore = srManager.storageService
+                .<String, NextObjective>consistentMapBuilder()
+                .withName("onos-l2termination-nextobj-store")
+                .withSerializer(Serializer.using(l2TunnelKryo.build()))
+                .build();
     }
 
     /**
@@ -122,6 +142,11 @@
         deploy(pwToAdd);
     }
 
+    /**
+     * To deploy a number of pseudo wires.
+     *
+     * @param pwToAdd the set of pseudo wires to add
+     */
     private void deploy(Set<DefaultL2TunnelDescription> pwToAdd) {
         Result result;
         long l2TunnelId;
@@ -129,7 +154,7 @@
             l2TunnelId = currentL2Tunnel.l2Tunnel().tunnelId();
             // The tunnel id cannot be 0.
             if (l2TunnelId == 0) {
-                log.warn("Tunnel id cannot be 0");
+                log.warn("Tunnel id id must be > 0");
                 continue;
             }
             // We do a sanity check of the pseudo wire.
@@ -138,7 +163,7 @@
                 continue;
             }
             // We establish the tunnel.
-            result = deployPseudoWire(
+            result = deployPseudoWireInit(
                     currentL2Tunnel.l2Tunnel(),
                     currentL2Tunnel.l2TunnelPolicy().cP1(),
                     currentL2Tunnel.l2TunnelPolicy().cP2(),
@@ -158,8 +183,18 @@
             if (result != SUCCESS) {
                 continue;
             }
+            // We terminate the tunnel
+            result = deployPseudoWireTerm(
+                    currentL2Tunnel.l2Tunnel(),
+                    currentL2Tunnel.l2TunnelPolicy().cP2(),
+                    currentL2Tunnel.l2TunnelPolicy().cP2OuterTag(),
+                    FWD
+            );
+            if (result != SUCCESS) {
+                continue;
+            }
             // We establish the reverse tunnel.
-            result = deployPseudoWire(
+            result = deployPseudoWireInit(
                     currentL2Tunnel.l2Tunnel(),
                     currentL2Tunnel.l2TunnelPolicy().cP2(),
                     currentL2Tunnel.l2TunnelPolicy().cP1(),
@@ -168,18 +203,27 @@
             if (result != SUCCESS) {
                 continue;
             }
-            deployPolicy(
+            result = deployPolicy(
                     l2TunnelId,
                     currentL2Tunnel.l2TunnelPolicy().cP2(),
                     currentL2Tunnel.l2TunnelPolicy().cP2InnerTag(),
                     currentL2Tunnel.l2TunnelPolicy().cP2OuterTag(),
                     result.nextId
             );
+            if (result != SUCCESS) {
+                continue;
+            }
+            deployPseudoWireTerm(
+                    currentL2Tunnel.l2Tunnel(),
+                    currentL2Tunnel.l2TunnelPolicy().cP1(),
+                    currentL2Tunnel.l2TunnelPolicy().cP1OuterTag(),
+                    REV
+            );
         }
     }
 
     /**
-     * Processes Pwaas Config updated event.
+     * Processes PWaaS Config updated event.
      *
      * @param event network config updated event
      */
@@ -209,12 +253,10 @@
                 .collect(Collectors.toSet());
         deploy(pwToAdd);
         // The pseudo wires to update.
-        updPws.forEach(tunnelId -> {
-            updatePw(
-                    prevConfig.getPwDescription(tunnelId),
-                    config.getPwDescription(tunnelId)
-            );
-        });
+        updPws.forEach(tunnelId -> updatePw(
+                prevConfig.getPwDescription(tunnelId),
+                config.getPwDescription(tunnelId))
+        );
     }
 
     /**
@@ -225,88 +267,91 @@
      */
     private void updatePw(DefaultL2TunnelDescription oldPw,
                           DefaultL2TunnelDescription newPw) {
-
         long tunnelId = oldPw.l2Tunnel().tunnelId();
-        String fwdKey = generateKey(tunnelId, FWD);
-        String revKey = generateKey(tunnelId, REV);
-        Result result;
-        NextObjective fwdNextObjective;
-        NextObjective revNextObjective;
         // The async tasks to orchestrate the next and
         // forwarding update.
-        CompletableFuture<ObjectiveError> revPolicyFuture = new CompletableFuture<>();
         CompletableFuture<ObjectiveError> fwdInitNextFuture = new CompletableFuture<>();
         CompletableFuture<ObjectiveError> revInitNextFuture = new CompletableFuture<>();
-        CompletableFuture<ObjectiveError> newPwFuture = new CompletableFuture<>();
+        CompletableFuture<ObjectiveError> fwdTermNextFuture = new CompletableFuture<>();
+        CompletableFuture<ObjectiveError> revTermNextFuture = new CompletableFuture<>();
+        CompletableFuture<ObjectiveError> fwdPwFuture = new CompletableFuture<>();
+        CompletableFuture<ObjectiveError> revPwFuture = new CompletableFuture<>();
 
-        result = verifyPseudoWire(newPw);
+
+        Result result = verifyPseudoWire(newPw);
         if (result != SUCCESS) {
             return;
         }
-        if (!l2InitiationNextObjStore.containsKey(fwdKey)) {
-            log.warn("NextObj for {} does not exist in the store.", fwdKey);
-            return;
-        }
-        fwdNextObjective = l2InitiationNextObjStore.get(fwdKey).value();
-        if (!l2InitiationNextObjStore.containsKey(revKey)) {
-            log.warn("NextObj for {} does not exist in the store.", revKey);
-            return;
-        }
         // First we remove both policy.
-        revNextObjective = l2InitiationNextObjStore.get(revKey).value();
         log.debug("Start deleting fwd policy for {}", tunnelId);
         deletePolicy(
                 tunnelId,
                 oldPw.l2TunnelPolicy().cP1(),
                 oldPw.l2TunnelPolicy().cP1InnerTag(),
                 oldPw.l2TunnelPolicy().cP1OuterTag(),
-                fwdNextObjective.id(),
-                revPolicyFuture
+                fwdInitNextFuture,
+                FWD
         );
-        revPolicyFuture.thenAcceptAsync(status -> {
-            if (status == null) {
-                log.debug("Fwd policy removed. Now remove rev policy for {}", tunnelId);
-                deletePolicy(
-                        tunnelId,
-                        oldPw.l2TunnelPolicy().cP2(),
-                        oldPw.l2TunnelPolicy().cP2InnerTag(),
-                        oldPw.l2TunnelPolicy().cP2OuterTag(),
-                        revNextObjective.id(),
-                        fwdInitNextFuture
-                );
-            }
-        });
+        log.debug("Start deleting rev policy for {}", tunnelId);
+        deletePolicy(
+                tunnelId,
+                oldPw.l2TunnelPolicy().cP2(),
+                oldPw.l2TunnelPolicy().cP2InnerTag(),
+                oldPw.l2TunnelPolicy().cP2OuterTag(),
+                revInitNextFuture,
+                REV
+        );
         // Finally we remove both the tunnels.
         fwdInitNextFuture.thenAcceptAsync(status -> {
             if (status == null) {
-                log.debug("Rev policy removed. Now remove fwd pw for {}", tunnelId);
-                tearDownPseudoWire(
-                        fwdKey,
-                        fwdNextObjective,
+                log.debug("Fwd policy removed. Now remove fwd {} for {}", INITIATION, tunnelId);
+                tearDownPseudoWireInit(
+                        tunnelId,
                         oldPw.l2TunnelPolicy().cP1(),
-                        oldPw.l2TunnelPolicy().cP2(),
-                        revInitNextFuture
+                        fwdTermNextFuture,
+                        FWD
                 );
             }
         });
         revInitNextFuture.thenAcceptAsync(status -> {
            if (status == null) {
-               log.debug("Fwd tunnel removed. Now remove rev pw for {}", tunnelId);
-               tearDownPseudoWire(
-                       revKey,
-                       revNextObjective,
+               log.debug("Rev policy removed. Now remove rev {} for {}", INITIATION, tunnelId);
+               tearDownPseudoWireInit(
+                       tunnelId,
                        oldPw.l2TunnelPolicy().cP2(),
-                       oldPw.l2TunnelPolicy().cP1(),
-                       newPwFuture
+                       revTermNextFuture,
+                       REV
                );
 
            }
         });
+        fwdTermNextFuture.thenAcceptAsync(status -> {
+            if (status == null) {
+                log.debug("Fwd {} removed. Now remove fwd {} for {}", INITIATION, TERMINATION, tunnelId);
+                tearDownPseudoWireTerm(
+                        oldPw.l2Tunnel(),
+                        oldPw.l2TunnelPolicy().cP2(),
+                        fwdPwFuture,
+                        FWD
+                );
+            }
+        });
+        revTermNextFuture.thenAcceptAsync(status -> {
+            if (status == null) {
+                log.debug("Rev {} removed. Now remove rev {} for {}", INITIATION, TERMINATION, tunnelId);
+                tearDownPseudoWireTerm(
+                        oldPw.l2Tunnel(),
+                        oldPw.l2TunnelPolicy().cP1(),
+                        revPwFuture,
+                        REV
+                );
+            }
+        });
         // At the end we install the new pw.
-        newPwFuture.thenAcceptAsync(status -> {
+        fwdPwFuture.thenAcceptAsync(status -> {
             if (status == null) {
                 log.debug("Deploying new fwd pw for {}", tunnelId);
-                Result lamdaResult = deployPseudoWire(
+                Result lamdaResult = deployPseudoWireInit(
                         newPw.l2Tunnel(),
                         newPw.l2TunnelPolicy().cP1(),
                         newPw.l2TunnelPolicy().cP2(),
@@ -322,8 +367,22 @@
                         newPw.l2TunnelPolicy().cP1OuterTag(),
                         lamdaResult.nextId
                 );
+                if (lamdaResult != SUCCESS) {
+                    return;
+                }
+                deployPseudoWireTerm(
+                        newPw.l2Tunnel(),
+                        newPw.l2TunnelPolicy().cP2(),
+                        newPw.l2TunnelPolicy().cP2OuterTag(),
+                        FWD
+                );
+
+            }
+        });
+        revPwFuture.thenAcceptAsync(status -> {
+            if (status == null) {
                 log.debug("Deploying new rev pw for {}", tunnelId);
-                lamdaResult = deployPseudoWire(
+                Result lamdaResult = deployPseudoWireInit(
                         newPw.l2Tunnel(),
                         newPw.l2TunnelPolicy().cP2(),
                         newPw.l2TunnelPolicy().cP1(),
@@ -339,6 +398,15 @@
                         newPw.l2TunnelPolicy().cP2OuterTag(),
                         lamdaResult.nextId
                 );
+                if (lamdaResult != SUCCESS) {
+                    return;
+                }
+                deployPseudoWireTerm(
+                        newPw.l2Tunnel(),
+                        newPw.l2TunnelPolicy().cP1(),
+                        newPw.l2TunnelPolicy().cP1OuterTag(),
+                        REV
+                );
             }
         });
     }
@@ -366,11 +434,8 @@
      */
     private void tearDown(Set<DefaultL2TunnelDescription> pwToRemove) {
         Result result;
-        int nextId;
-        NextObjective nextObjective;
         long l2TunnelId;
-        // We remove all the pw in the configuration
-        // file.
+        // We remove all the pw in the configuration file.
         for (DefaultL2TunnelDescription currentL2Tunnel : pwToRemove) {
             l2TunnelId = currentL2Tunnel.l2Tunnel().tunnelId();
             if (l2TunnelId == 0) {
@@ -381,52 +446,48 @@
             if (result != SUCCESS) {
                 continue;
             }
-            String key = generateKey(l2TunnelId, FWD);
-            if (!l2InitiationNextObjStore.containsKey(key)) {
-                log.warn("NextObj for {} does not exist in the store.", key);
-                continue;
-            }
-            nextObjective = l2InitiationNextObjStore.get(key).value();
-            nextId = nextObjective.id();
             // First all we have to delete the policy.
             deletePolicy(
                     l2TunnelId,
                     currentL2Tunnel.l2TunnelPolicy().cP1(),
                     currentL2Tunnel.l2TunnelPolicy().cP1InnerTag(),
                     currentL2Tunnel.l2TunnelPolicy().cP1OuterTag(),
-                    nextId,
-                    null
+                    null,
+                    FWD
             );
             // Finally we will tear down the pseudo wire.
-            tearDownPseudoWire(
-                    key,
-                    nextObjective,
+            tearDownPseudoWireInit(
+                    l2TunnelId,
                     currentL2Tunnel.l2TunnelPolicy().cP1(),
+                    null,
+                    FWD
+            );
+            tearDownPseudoWireTerm(
+                    currentL2Tunnel.l2Tunnel(),
                     currentL2Tunnel.l2TunnelPolicy().cP2(),
-                    null
+                    null,
+                    FWD
             );
             // We do the same operations on the reverse side.
-            key = generateKey(l2TunnelId, REV);
-            if (!l2InitiationNextObjStore.containsKey(key)) {
-                log.warn("NextObj for {} does not exist in the store.", key);
-                continue;
-            }
-            nextObjective = l2InitiationNextObjStore.get(key).value();
-            nextId = nextObjective.id();
             deletePolicy(
                     l2TunnelId,
                     currentL2Tunnel.l2TunnelPolicy().cP2(),
                     currentL2Tunnel.l2TunnelPolicy().cP2InnerTag(),
                     currentL2Tunnel.l2TunnelPolicy().cP2OuterTag(),
-                    nextId,
-                    null
+                    null,
+                    REV
             );
-            tearDownPseudoWire(
-                    key,
-                    nextObjective,
+            tearDownPseudoWireInit(
+                    l2TunnelId,
                     currentL2Tunnel.l2TunnelPolicy().cP2(),
+                    null,
+                    REV
+            );
+            tearDownPseudoWireTerm(
+                    currentL2Tunnel.l2Tunnel(),
                     currentL2Tunnel.l2TunnelPolicy().cP1(),
-                    null
+                    null,
+                    REV
             );
         }
 
@@ -444,7 +505,7 @@
         DefaultL2TunnelPolicy l2TunnelPolicy = l2TunnelDescription.l2TunnelPolicy();
         result = verifyTunnel(l2Tunnel);
         if (result != SUCCESS) {
-            log.warn("Tunnel {} did not pass the validation", l2Tunnel.tunnelId());
+            log.warn("Tunnel {}: did not pass the validation", l2Tunnel.tunnelId());
             return result;
         }
         result = verifyPolicy(
@@ -455,7 +516,7 @@
                 l2TunnelPolicy.cP2OuterTag()
         );
         if (result != SUCCESS) {
-            log.warn("Policy for tunnel {} did not pass the validation", l2Tunnel.tunnelId());
+            log.warn("Policy for tunnel {}: did not pass the validation", l2Tunnel.tunnelId());
             return result;
         }
 
@@ -463,8 +524,6 @@
     }
 
     /**
-     * TODO Operation on the policies store.
-     *
      * Handles the policy establishment which consists in
      * create the filtering and forwarding objectives related
      * to the initiation and termination.
@@ -474,27 +533,21 @@
      * @param ingressInner the ingress inner tag
      * @param ingressOuter the ingress outer tag
      * @param nextId the next objective id
-     * @return SUCCESS if the policy has been deployed.
-     * Otherwise an error according to the failure
-     * scenario.
+     * @return the result of the operation
      */
     private Result deployPolicy(long tunnelId,
                                 ConnectPoint ingress,
                                 VlanId ingressInner,
                                 VlanId ingressOuter,
                                 int nextId) {
-
-        ForwardingObjective.Builder fwdBuilder;
-        FilteringObjective.Builder filtBuilder;
-        List<Objective> objectives = Lists.newArrayList();
         if (!srManager.mastershipService.isLocalMaster(ingress.deviceId())) {
-            log.info("Abort creation of policy for L2 tunnel {}: {}", tunnelId, NOT_MASTER);
+            log.info("Abort creation of policy for tunnel {}: I am not the master", tunnelId);
             return SUCCESS;
         }
+        List<Objective> objectives = Lists.newArrayList();
         // We create the forwarding objective for supporting
         // the l2 tunnel.
-        fwdBuilder = createFwdObjective(
-                INITIATION,
+        ForwardingObjective.Builder fwdBuilder = createInitFwdObjective(
                 tunnelId,
                 ingress.port(),
                 nextId
@@ -508,7 +561,7 @@
         objectives.add(fwdBuilder.add(context));
         // We create the filtering objective to define the
         // permit traffic in the switch
-        filtBuilder = createFiltObjective(
+        FilteringObjective.Builder filtBuilder = createFiltObjective(
                 ingress.port(),
                 ingressInner,
                 ingressOuter
@@ -528,7 +581,7 @@
         for (Objective objective : objectives) {
             if (objective instanceof ForwardingObjective) {
                 srManager.flowObjectiveService.forward(ingress.deviceId(), (ForwardingObjective) objective);
-                log.debug("Creating new FwdObj for NextObj with id={} for tunnel {}", nextId, tunnelId);
+                log.debug("Creating new FwdObj for initiation NextObj with id={} for tunnel {}", nextId, tunnelId);
             } else {
                 srManager.flowObjectiveService.filter(ingress.deviceId(), (FilteringObjective) objective);
                 log.debug("Creating new FiltObj for tunnel {}", tunnelId);
@@ -572,46 +625,36 @@
     }
 
     /**
-     * TODO Operation on the policies store.
-     *
      * Handles the tunnel establishment which consists in
-     * create the next objectives related to the initiation
-     * and termination.
+     * create the next objectives related to the initiation.
      *
      * @param l2Tunnel the tunnel to deploy
      * @param ingress the ingress connect point
      * @param egress the egress connect point
      * @param direction the direction of the pw
-     * @return SUCCESS if the tunnel has been created.
-     * Otherwise an error according to the failure
-     * scenario
+     * @return the result of the operation
      */
-    private Result deployPseudoWire(DefaultL2Tunnel l2Tunnel,
-                                    ConnectPoint ingress,
-                                    ConnectPoint egress,
-                                    String direction) {
-        Link nextHop;
-        NextObjective.Builder nextObjectiveBuilder;
-        NextObjective nextObjective;
-        int nextId;
-        Result result;
+    private Result deployPseudoWireInit(DefaultL2Tunnel l2Tunnel,
+                                        ConnectPoint ingress,
+                                        ConnectPoint egress,
+                                        Direction direction) {
         if (!srManager.mastershipService.isLocalMaster(ingress.deviceId())) {
-            log.info("Abort initiation creation of L2 tunnel {}: {}",
-                     l2Tunnel.tunnelId(), NOT_MASTER);
+            log.info("Abort initiation of tunnel {}: I am not the master", l2Tunnel.tunnelId());
             return SUCCESS;
         }
         // We need at least a path between ingress and egress.
-        nextHop = getNextHop(ingress, egress);
+        Link nextHop = getNextHop(ingress, egress);
         if (nextHop == null) {
             log.warn("No path between ingress and egress");
             return WRONG_PARAMETERS;
         }
         // We create the next objective without the metadata
         // context and id. We check if it already exists in the
-        // store. If not we store as it is in the store ?
-        nextObjectiveBuilder = createNextObjective(
+        // store. If not we store as it is in the store.
+        NextObjective.Builder nextObjectiveBuilder = createNextObjective(
                 INITIATION,
-                nextHop,
+                nextHop.src(),
+                nextHop.dst(),
                 l2Tunnel,
                 egress.deviceId()
         );
@@ -625,7 +668,7 @@
                 .matchTunnelId(l2Tunnel.tunnelId())
                 .build();
         nextObjectiveBuilder.withMeta(metadata);
-        nextId = srManager.flowObjectiveService.allocateNextId();
+        int nextId = srManager.flowObjectiveService.allocateNextId();
         if (nextId < 0) {
             log.warn("Not able to allocate a next id for initiation");
             return INTERNAL_ERROR;
@@ -640,18 +683,95 @@
                 (objective, error)
                         -> log.warn("Failed to populate Initiation l2 tunnel rule for {}: {}",
                                     l2Tunnel.tunnelId(), error));
-        nextObjective = nextObjectiveBuilder.add(context);
+        NextObjective nextObjective = nextObjectiveBuilder.add(context);
         srManager.flowObjectiveService.next(ingress.deviceId(), nextObjective);
         log.debug("Initiation next objective for {} not found. Creating new NextObj with id={}",
                   l2Tunnel.tunnelId(),
                   nextObjective.id()
         );
-        result = SUCCESS;
+        Result result = SUCCESS;
         result.nextId = nextObjective.id();
         return result;
     }
 
     /**
+     * Handles the tunnel termination, which consists in the creation
+     * of a forwarding objective and a next objective.
+     *
+     * @param l2Tunnel the tunnel to terminate
+     * @param egress the egress point
+     * @param egressVlan the expected vlan at egress
+     * @param direction the direction
+     * @return the result of the operation
+     */
+    private Result deployPseudoWireTerm(DefaultL2Tunnel l2Tunnel,
+                                        ConnectPoint egress,
+                                        VlanId egressVlan,
+                                        Direction direction) {
+        // We create the group relative to the termination.
+        // It's fine to abort the termination if we are
+        // not the master.
+        if (!srManager.mastershipService.isLocalMaster(egress.deviceId())) {
+            log.info("Abort termination of tunnel {}: I am not the master", l2Tunnel.tunnelId());
+            return SUCCESS;
+        }
+        NextObjective.Builder nextObjectiveBuilder = createNextObjective(
+                TERMINATION,
+                egress,
+                null,
+                null,
+                egress.deviceId()
+        );
+        if (nextObjectiveBuilder == null) {
+            return INTERNAL_ERROR;
+        }
+        TrafficSelector metadata = DefaultTrafficSelector
+                .builder()
+                .matchVlanId(egressVlan)
+                .build();
+        nextObjectiveBuilder.withMeta(metadata);
+        int nextId = srManager.flowObjectiveService.allocateNextId();
+        if (nextId < 0) {
+            log.warn("Not able to allocate a next id for initiation");
+            return INTERNAL_ERROR;
+        }
+        nextObjectiveBuilder.withId(nextId);
+        String key = generateKey(l2Tunnel.tunnelId(), direction);
+        l2TerminationNextObjStore.put(key, nextObjectiveBuilder.add());
+        ObjectiveContext context = new DefaultObjectiveContext(
+                (objective)
+                        -> log.debug("Termination l2 tunnel rule for {} populated",
+                                     l2Tunnel.tunnelId()),
+                (objective, error)
+                        -> log.warn("Failed to populate termination l2 tunnel rule for {}: {}",
+                                    l2Tunnel.tunnelId(), error));
+        NextObjective nextObjective = nextObjectiveBuilder.add(context);
+        srManager.flowObjectiveService.next(egress.deviceId(), nextObjective);
+        log.debug("Termination next objective for {} not found. Creating new NextObj with id={}",
+                  l2Tunnel.tunnelId(),
+                  nextObjective.id()
+        );
+        // We create the flow relative to the termination.
+        ForwardingObjective.Builder fwdBuilder = createTermFwdObjective(
+                l2Tunnel.pwLabel(),
+                l2Tunnel.tunnelId(),
+                egress.port(),
+                nextObjective.id()
+        );
+        context = new DefaultObjectiveContext(
+                (objective)
+                        -> log.debug("FwdObj for tunnel termination {} populated",
+                                     l2Tunnel.tunnelId()),
+                (objective, error)
+                        -> log.warn("Failed to populate fwdrObj for tunnel termination {}",
+                                    l2Tunnel.tunnelId(), error));
+        srManager.flowObjectiveService.forward(egress.deviceId(), fwdBuilder.add(context));
+        log.debug("Creating new FwdObj for termination NextObj with id={} for tunnel {}", nextId, l2Tunnel.tunnelId());
+        return SUCCESS;
+
+    }
+
+    /**
      * Helper method to verify if the tunnel is whether or not
      * supported.
      *
@@ -675,7 +795,7 @@
     }
 
     /**
-     * Create the filtering objective according to a given policy.
+     * Creates the filtering objective according to a given policy.
      *
      * @param inPort the in port
      * @param innerTag the inner vlan tag
@@ -695,34 +815,69 @@
     }
 
     /**
-     * Create the forwarding objective according to a given pipeline.
+     * Creates the forwarding objective for the termination.
      *
-     * @param pipeline the pipeline
+     * @param pwLabel the pseudo wire label
      * @param tunnelId the tunnel id
+     * @param egressPort the egress port
      * @param nextId the next step
-     * @return the forwarding objective to support the pipeline.
+     * @return the forwarding objective to support the termination
      */
-    private ForwardingObjective.Builder createFwdObjective(Pipeline pipeline,
-                                                           long tunnelId,
-                                                           PortNumber inPort,
-                                                           int nextId) {
-        ForwardingObjective.Builder fwdBuilder = null;
+    private ForwardingObjective.Builder createTermFwdObjective(MplsLabel pwLabel,
+                                                               long tunnelId,
+                                                               PortNumber egressPort,
+                                                               int nextId) {
         TrafficSelector.Builder trafficSelector = DefaultTrafficSelector
                 .builder();
-        if (pipeline == INITIATION) {
-            // The flow has to match on the mpls logical
-            // port and the tunnel id.
-            trafficSelector.matchTunnelId(tunnelId);
-            trafficSelector.matchInPort(inPort);
-            fwdBuilder = DefaultForwardingObjective.builder()
-                    .fromApp(srManager.appId)
-                    .makePermanent()
-                    .nextStep(nextId)
-                    .withPriority(SegmentRoutingService.DEFAULT_PRIORITY)
-                    .withSelector(trafficSelector.build())
-                    .withFlag(VERSATILE);
-        }
-        return fwdBuilder;
+        TrafficTreatment.Builder trafficTreatment = DefaultTrafficTreatment
+                .builder();
+        // The flow has to match on the pw label and bos
+        trafficSelector.matchEthType(Ethernet.MPLS_UNICAST);
+        trafficSelector.matchMplsLabel(pwLabel);
+        trafficSelector.matchMplsBos(true);
+        // The flow has to decrement ttl, restore ttl in
+        // pop mpls, set tunnel id and port.
+        trafficTreatment.decMplsTtl();
+        trafficTreatment.copyTtlIn();
+        trafficTreatment.popMpls();
+        trafficTreatment.setTunnelId(tunnelId);
+        trafficTreatment.setOutput(egressPort);
+
+        return DefaultForwardingObjective.builder()
+                .fromApp(srManager.appId)
+                .makePermanent()
+                .nextStep(nextId)
+                .withPriority(SegmentRoutingService.DEFAULT_PRIORITY)
+                .withSelector(trafficSelector.build())
+                .withTreatment(trafficTreatment.build())
+                .withFlag(VERSATILE);
+    }
+
+    /**
+     * Creates the forwarding objective for the initiation.
+     *
+     * @param tunnelId the tunnel id
+     * @param inPort the input port
+     * @param nextId the next step
+     * @return the forwarding objective to support the initiation.
+     */
+    private ForwardingObjective.Builder createInitFwdObjective(long tunnelId,
+                                                               PortNumber inPort,
+                                                               int nextId) {
+        TrafficSelector.Builder trafficSelector = DefaultTrafficSelector
+                .builder();
+        // The flow has to match on the mpls logical
+        // port and the tunnel id.
+        trafficSelector.matchTunnelId(tunnelId);
+        trafficSelector.matchInPort(inPort);
+
+        return DefaultForwardingObjective.builder()
+                .fromApp(srManager.appId)
+                .makePermanent()
+                .nextStep(nextId)
+                .withPriority(SegmentRoutingService.DEFAULT_PRIORITY)
+                .withSelector(trafficSelector.build())
+                .withFlag(VERSATILE);
 
     }
 
@@ -733,13 +888,15 @@
      * the same next objective for different tunnels.
      *
      * @param pipeline the pipeline to support
-     * @param nextHop the next hop towards the destination
+     * @param srcCp the source port
+     * @param dstCp the destination port
      * @param l2Tunnel the tunnel to support
      * @param egressId the egress device id
      * @return the next objective to support the pipeline
      */
     private NextObjective.Builder createNextObjective(Pipeline pipeline,
-                                                      Link nextHop,
+                                                      ConnectPoint srcCp,
+                                                      ConnectPoint dstCp,
                                                       DefaultL2Tunnel l2Tunnel,
                                                       DeviceId egressId) {
         NextObjective.Builder nextObjBuilder;
@@ -786,7 +943,7 @@
             try {
                 ingressMac = srManager
                         .deviceConfiguration
-                        .getDeviceMac(nextHop.src().deviceId());
+                        .getDeviceMac(srcCp.deviceId());
             } catch (DeviceConfigNotFoundException e) {
                 log.warn("Was not able to find the ingress mac");
                 return null;
@@ -796,20 +953,21 @@
             try {
                 neighborMac = srManager
                         .deviceConfiguration
-                        .getDeviceMac(nextHop.dst().deviceId());
+                        .getDeviceMac(dstCp.deviceId());
             } catch (DeviceConfigNotFoundException e) {
                 log.warn("Was not able to find the neighbor mac");
                 return null;
             }
             treatmentBuilder.setEthDst(neighborMac);
         } else {
+            // We create the next objective which
+            // will be a simple l2 group.
             nextObjBuilder = DefaultNextObjective
                     .builder()
                     .withType(NextObjective.Type.SIMPLE)
                     .fromApp(srManager.appId);
-
         }
-        treatmentBuilder.setOutput(nextHop.src().port());
+        treatmentBuilder.setOutput(srcCp.port());
         nextObjBuilder.addTreatment(treatmentBuilder.build());
         return nextObjBuilder;
     }
@@ -841,32 +999,41 @@
     }
 
     /**
-     * TODO Operation on the store.
      * Deletes a given policy using the parameter supplied.
      *
      * @param tunnelId the tunnel id
      * @param ingress the ingress point
      * @param ingressInner the ingress inner vlan id
      * @param ingressOuter the ingress outer vlan id
-     * @param nextId the next objective id
+     * @param future to perform the async operation
+     * @param direction the direction: forward or reverse
      */
     private void deletePolicy(long tunnelId,
                               ConnectPoint ingress,
                               VlanId ingressInner,
                               VlanId ingressOuter,
-                              int nextId,
-                              CompletableFuture<ObjectiveError> fwdFuture) {
-
-        ForwardingObjective.Builder fwdBuilder;
-        FilteringObjective.Builder filtBuilder;
-        List<Objective> objectives = Lists.newArrayList();
+                              CompletableFuture<ObjectiveError> future,
+                              Direction direction) {
         if (!srManager.mastershipService.isLocalMaster(ingress.deviceId())) {
-            log.info("Abort delete of policy for L2 tunnel {}: {}", tunnelId, NOT_MASTER);
+            log.info("Abort delete of policy for tunnel {}: I am not the master", tunnelId);
+            if (future != null) {
+                future.complete(null);
+            }
             return;
         }
+        String key = generateKey(tunnelId, direction);
+        if (!l2InitiationNextObjStore.containsKey(key)) {
+            log.warn("Abort delete of policy for tunnel {}: next does not exist in the store", tunnelId);
+            if (future != null) {
+                future.complete(null);
+            }
+            return;
+        }
+        NextObjective nextObjective = l2InitiationNextObjStore.get(key).value();
+        int nextId = nextObjective.id();
+        List<Objective> objectives = Lists.newArrayList();
         // We create the forwarding objective.
-        fwdBuilder = createFwdObjective(
-                INITIATION,
+        ForwardingObjective.Builder fwdBuilder = createInitFwdObjective(
                 tunnelId,
                 ingress.port(),
                 nextId
@@ -874,24 +1041,24 @@
         ObjectiveContext context = new ObjectiveContext() {
             @Override
             public void onSuccess(Objective objective) {
-                log.debug("Previous FwdObj for policy {} removed", tunnelId);
-                if (fwdFuture != null) {
-                    fwdFuture.complete(null);
+                log.debug("Previous fwdObj for policy {} removed", tunnelId);
+                if (future != null) {
+                    future.complete(null);
                 }
             }
 
             @Override
             public void onError(Objective objective, ObjectiveError error) {
-                log.warn("Failed to remove previous FwdObj for policy {}: {}", tunnelId, error);
-                if (fwdFuture != null) {
-                    fwdFuture.complete(error);
+                log.warn("Failed to remove previous fwdObj for policy {}: {}", tunnelId, error);
+                if (future != null) {
+                    future.complete(error);
                 }
             }
         };
         objectives.add(fwdBuilder.remove(context));
         // We create the filtering objective to define the
         // permit traffic in the switch
-        filtBuilder = createFiltObjective(
+        FilteringObjective.Builder filtBuilder = createFiltObjective(
                 ingress.port(),
                 ingressInner,
                 ingressOuter
@@ -916,55 +1083,130 @@
     }
 
     /**
-     * TODO Operation on the store.
-     * Deletes a given pseudo wire using the parameter supplied.
+     * Deletes the pseudo wire initiation.
      *
-     * @param key the key of the store
-     * @param nextObjective the next objective representing the pw
+     * @param l2TunnelId the tunnel id
      * @param ingress the ingress connect point
-     * @param egress the egress connect point
+     * @param future to perform an async operation
+     * @param direction the direction: reverse of forward
      */
-    private void tearDownPseudoWire(String key,
-                                    NextObjective nextObjective,
-                                    ConnectPoint ingress,
-                                    ConnectPoint egress,
-                                    CompletableFuture<ObjectiveError> nextFutureForInit) {
+    private void tearDownPseudoWireInit(long l2TunnelId,
+                                        ConnectPoint ingress,
+                                        CompletableFuture<ObjectiveError> future,
+                                        Direction direction) {
+        String key = generateKey(l2TunnelId, direction);
         if (!srManager.mastershipService.isLocalMaster(ingress.deviceId())) {
-            log.info("Abort delete of {} for {}: {}", INITIATION, key, NOT_MASTER);
+            log.info("Abort delete of {} for {}: I am not the master", INITIATION, key);
+            if (future != null) {
+                future.complete(null);
+            }
             return;
         }
+        if (!l2InitiationNextObjStore.containsKey(key)) {
+            log.info("Abort delete of {} for {}: next does not exist in the store", INITIATION, key);
+            if (future != null) {
+                future.complete(null);
+            }
+            return;
+        }
+        NextObjective nextObjective = l2InitiationNextObjStore.get(key).value();
         ObjectiveContext context = new ObjectiveContext() {
             @Override
             public void onSuccess(Objective objective) {
-                log.debug("Previous {} NextObj for {} removed", INITIATION, key);
-                if (nextFutureForInit != null) {
-                    nextFutureForInit.complete(null);
+                log.debug("Previous {} next for {} removed", INITIATION, key);
+                if (future != null) {
+                    future.complete(null);
                 }
             }
 
             @Override
             public void onError(Objective objective, ObjectiveError error) {
-                log.warn("Failed to remove previous {} NextObj for {}: {}", INITIATION, key, error);
-                if (nextFutureForInit != null) {
-                    nextFutureForInit.complete(error);
+                log.warn("Failed to remove previous {} next for {}: {}", INITIATION, key, error);
+                if (future != null) {
+                    future.complete(error);
                 }
             }
         };
-        srManager.flowObjectiveService.next(
-                ingress.deviceId(),
-                (NextObjective) nextObjective.copy().remove(context)
-        );
+        srManager.flowObjectiveService
+                .next(ingress.deviceId(), (NextObjective) nextObjective.copy().remove(context));
         l2InitiationNextObjStore.remove(key);
     }
 
     /**
+     * Deletes the pseudo wire termination.
+     *
+     * @param l2Tunnel the tunnel
+     * @param egress the egress connect point
+     * @param future the async task
+     * @param direction the direction of the tunnel
+     */
+    private void tearDownPseudoWireTerm(DefaultL2Tunnel l2Tunnel,
+                                        ConnectPoint egress,
+                                        CompletableFuture<ObjectiveError> future,
+                                        Direction direction) {
+        /*
+         * We verify the mastership for the termination.
+         */
+        String key = generateKey(l2Tunnel.tunnelId(), direction);
+        if (!srManager.mastershipService.isLocalMaster(egress.deviceId())) {
+            log.info("Abort delete of {} for {}: I am not the master", TERMINATION, key);
+            if (future != null) {
+                future.complete(null);
+            }
+            return;
+        }
+        if (!l2TerminationNextObjStore.containsKey(key)) {
+            log.info("Abort delete of {} for {}: next does not exist in the store", TERMINATION, key);
+            if (future != null) {
+                future.complete(null);
+            }
+            return;
+        }
+        NextObjective nextObjective = l2TerminationNextObjStore.get(key).value();
+        ForwardingObjective.Builder fwdBuilder = createTermFwdObjective(
+                l2Tunnel.pwLabel(),
+                l2Tunnel.tunnelId(),
+                egress.port(),
+                nextObjective.id()
+        );
+        ObjectiveContext context = new DefaultObjectiveContext(
+                (objective)
+                        -> log.debug("FwdObj for {} {} removed", TERMINATION, l2Tunnel.tunnelId()),
+                (objective, error)
+                        -> log.warn("Failed to remove fwdObj for {} {}", TERMINATION, l2Tunnel.tunnelId(),
+                                    error));
+        srManager.flowObjectiveService.forward(egress.deviceId(), fwdBuilder.remove(context));
+
+        context = new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                log.debug("Previous {} next for {} removed", TERMINATION, key);
+                if (future != null) {
+                    future.complete(null);
+                }
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                log.warn("Failed to remove previous {} next for {}: {}", TERMINATION, key, error);
+                if (future != null) {
+                    future.complete(error);
+                }
+            }
+        };
+        srManager.flowObjectiveService
+                .next(egress.deviceId(), (NextObjective) nextObjective.copy().remove(context));
+        l2TerminationNextObjStore.remove(key);
+    }
+
+    /**
      * Utilities to generate pw key.
      *
      * @param tunnelId the tunnel id
      * @param direction the direction of the pw
      * @return the key of the store
      */
-    private String generateKey(long tunnelId, String direction) {
+    private String generateKey(long tunnelId, Direction direction) {
         return String.format("%s-%s", tunnelId, direction);
     }
 
@@ -979,7 +1221,7 @@
         /**
          * The termination pipeline.
          */
-        TERMINATION;
+        TERMINATION
     }
 
     /**
@@ -1012,7 +1254,7 @@
         private final String description;
         private int nextId;
 
-        private Result(int code, String description) {
+        Result(int code, String description) {
             this.code = code;
             this.description = description;
         }
@@ -1021,14 +1263,24 @@
             return description;
         }
 
-        public int getCode() {
-            return code;
-        }
-
         @Override
         public String toString() {
             return code + ": " + description;
         }
     }
 
+    /**
+     * Enum helper for handling the direction of the pw.
+     */
+    public enum Direction {
+        /**
+         * The forward direction of the pseudo wire.
+         */
+        FWD,
+        /**
+         * The reverse direction of the pseudo wire.
+         */
+        REV;
+    }
+
 }