Various pseudowire fixes and improvements.

- Coordination when creating and removing pseudowires from
  different instances through the use of a DistributedLock.
- Fixed REST API to return JSON with a specific error for
  single pseudowire instantiation.
- Fixed REST API to also return a specific error for pseudowires
  that could not be decoded.
- Minor bug fix to return an appropriate error when a pseudowire
  instantiated from the command line could not be decoded.
- Fixed bug when creating spine-leaf-leaf pseudowire where we observed flows in pending state.
- Improved logging.

Change-Id: I60dd0ebf8af63ca74d18cfe4801d01846641fb7b
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/pwaas/DefaultL2TunnelHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/pwaas/DefaultL2TunnelHandler.java
index c736614..e40dcb8 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/pwaas/DefaultL2TunnelHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/pwaas/DefaultL2TunnelHandler.java
@@ -52,8 +52,10 @@
 import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.DistributedLock;
 import org.onosproject.store.service.DistributedSet;
 import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageException;
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,10 +76,11 @@
 import static org.onosproject.segmentrouting.pwaas.PwaasUtil.*;
 
 /**
- * Handles pwaas related events.
+ * Handler for pseudowire management.
  */
 public class DefaultL2TunnelHandler implements L2TunnelHandler {
 
+    private static final String LOCK_NAME = "l2-tunnel-handler-lock";
     private static final Logger log = LoggerFactory.getLogger(DefaultL2TunnelHandler.class);
 
     private final SegmentRoutingManager srManager;
@@ -118,6 +121,13 @@
     private final DistributedSet<VlanId> vlanStore;
 
     /**
+     * Lock used when creating or removing pseudowires.
+     */
+    private final DistributedLock pwLock;
+
+    private static final long LOCK_TIMEOUT = 2000;
+
+    /**
      * Used for determining transport vlans for leaf-spine.
      */
     private short transportVlanUpper = 4093, transportVlanLower = 3500;
@@ -188,6 +198,11 @@
                                 .build()))
                 .build()
                 .asDistributedSet();
+
+        pwLock = srManager.storageService.lockBuilder()
+                .withName(LOCK_NAME)
+                .build()
+                .asLock(LOCK_TIMEOUT);
     }
 
     /**
@@ -281,10 +296,7 @@
     private Result manageIntermediateFiltering(L2TunnelDescription pw, boolean leafSpinePw) {
 
         // only leaf-spine-spine should need intermediate rules for now
-        if (!leafSpinePw) {
-            return Result.SUCCESS;
-        }
-        if (pw.l2Tunnel().pathUsed().size() != 2) {
+        if (!leafSpinePw || (pw.l2Tunnel().pathUsed().size() != 2)) {
             return Result.SUCCESS;
         }
 
@@ -292,7 +304,7 @@
         DeviceId intermediateSpineId = pw.l2Tunnel().pathUsed().get(0).dst().deviceId();
         L2Tunnel l2Tunnel = pw.l2Tunnel();
 
-        log.info("Installing intermediate filtering rules for spine {} , for pseudowire {}",
+        log.debug("Installing intermediate filtering rules for spine {} , for pseudowire {}",
                  intermediateSpineId, pw.l2Tunnel().tunnelId());
 
         MacAddress dstMac;
@@ -394,7 +406,7 @@
 
         if (!spinePw) {
 
-            log.info("Untagged transport with internal vlan {} for pseudowire!", UNTAGGED_TRANSPORT_VLAN);
+            log.debug("Untagged transport with internal vlan {} for pseudowire!", UNTAGGED_TRANSPORT_VLAN);
             return UNTAGGED_TRANSPORT_VLAN;
         } else {
             for (short i = transportVlanUpper; i > transportVlanLower; i--) {
@@ -403,12 +415,12 @@
                 if (!vlanStore.contains(vlanToUse)) {
 
                     vlanStore.add(vlanToUse);
-                    log.info("Transport vlan {} for pseudowire!", vlanToUse);
+                    log.debug("Transport vlan {} for pseudowire!", vlanToUse);
                     return vlanToUse;
                 }
             }
 
-            log.info("No available transport vlan found, pseudowire traffic will be carried untagged " +
+            log.warn("No available transport vlan found, pseudowire traffic will be carried untagged " +
                              "with internal vlan {}!", UNTAGGED_TRANSPORT_VLAN);
             return UNTAGGED_TRANSPORT_VLAN;
         }
@@ -455,6 +467,7 @@
      */
     private boolean isValidPath(List<Link> path, boolean leafSpinePw) {
 
+        log.debug("Checking path validity for pseudowire.");
         List<DeviceId> devices = getDevicesOnPath(path);
         if (devices.size() < 2) {
             log.error("Path size for pseudowire should be greater than 1!");
@@ -528,196 +541,202 @@
      */
     public Result deployPseudowire(L2TunnelDescription pw) {
 
-        Result result;
-        long l2TunnelId;
-
-        log.debug("Pseudowire with {} deployment started, check log for any errors in this process!",
-                  pw.l2Tunnel().tunnelId());
-
-        l2TunnelId = pw.l2Tunnel().tunnelId();
-        // The tunnel id cannot be 0.
-        if (l2TunnelId == 0) {
-            log.warn("Tunnel id id must be > 0");
-            return Result.WRONG_PARAMETERS
-                    .appendError("Tunnel id id must be > 0");
-        }
-
-        result = verifyGlobalValidity(pw);
-        if (result != SUCCESS) {
-            log.error("Global validity for pseudowire {} is wrong!", l2TunnelId);
-            return result;
-        }
-
-        // leafSpinePw determines if this is a leaf-leaf
-        // or leaf-spine pseudowire
-        boolean leafSpinePw;
-        ConnectPoint cp1 = pw.l2TunnelPolicy().cP1();
-        ConnectPoint cp2 = pw.l2TunnelPolicy().cP2();
         try {
-            // differentiate between leaf-leaf pseudowires and leaf-spine
-            if (!srManager.deviceConfiguration().isEdgeDevice(cp1.deviceId()) &&
-                    !srManager.deviceConfiguration().isEdgeDevice(cp2.deviceId())) {
-                log.error("Can not deploy pseudowire from spine to spine!");
+            // take the lock
+            pwLock.lock();
+            Result result;
+            long l2TunnelId;
+            log.debug("Pseudowire with {} deployment started, check log for any errors in this process!",
+                      pw.l2Tunnel().tunnelId());
+            l2TunnelId = pw.l2Tunnel().tunnelId();
+            // The tunnel id cannot be 0.
+            if (l2TunnelId == 0) {
+                log.warn("Tunnel id id must be > 0 in {}", l2TunnelId);
                 return Result.WRONG_PARAMETERS
-                        .appendError("Can not deploy pseudowire from spine to spine!");
-            } else if (srManager.deviceConfiguration().isEdgeDevice(cp1.deviceId()) &&
-                    srManager.deviceConfiguration().isEdgeDevice(cp2.deviceId())) {
-                leafSpinePw = false;
-            } else {
-                leafSpinePw = true;
+                        .appendError("Tunnel id id must be > 0");
             }
-        } catch (DeviceConfigNotFoundException e) {
-            log.error("Device for pseudowire connection points does not exist in the configuration");
-            return Result.INTERNAL_ERROR
-                    .appendError("Device for pseudowire connection points does not exist in the configuration");
+
+            result = verifyGlobalValidity(pw);
+            if (result != SUCCESS) {
+                log.error("Global validity for pseudowire {} is wrong!", l2TunnelId);
+                return result;
+            }
+
+            // leafSpinePw determines if this is a leaf-leaf
+            // or leaf-spine pseudowire
+            boolean leafSpinePw;
+            ConnectPoint cp1 = pw.l2TunnelPolicy().cP1();
+            ConnectPoint cp2 = pw.l2TunnelPolicy().cP2();
+            try {
+                // differentiate between leaf-leaf pseudowires and leaf-spine
+                if (!srManager.deviceConfiguration().isEdgeDevice(cp1.deviceId()) &&
+                        !srManager.deviceConfiguration().isEdgeDevice(cp2.deviceId())) {
+                    log.error("Can not deploy pseudowire {} from spine to spine!", l2TunnelId);
+                    return Result.WRONG_PARAMETERS
+                            .appendError("Can not deploy pseudowire from spine to spine!");
+                } else if (srManager.deviceConfiguration().isEdgeDevice(cp1.deviceId()) &&
+                        srManager.deviceConfiguration().isEdgeDevice(cp2.deviceId())) {
+                    leafSpinePw = false;
+                } else {
+                    leafSpinePw = true;
+                }
+            } catch (DeviceConfigNotFoundException e) {
+                log.error("Device for pseudowire {} connection points does not exist in the configuration", l2TunnelId);
+                return Result.INTERNAL_ERROR
+                        .appendError("Device for pseudowire connection points does not exist in the configuration");
+            }
+
+            // reverse the policy in order for leaf switch to be at CP1
+            // this will help us for re-using SRLinkWeigher for computing valid paths
+            L2TunnelPolicy reversedPolicy = reverseL2TunnelPolicy(pw.l2TunnelPolicy());
+            if (reversedPolicy == null) {
+                log.error("Error in reversing policy, device configuration was not found for pseudowire {}.",
+                          l2TunnelId);
+                return INTERNAL_ERROR
+                        .appendError("Device configuration not found when reversing the policy.");
+            }
+            pw.setL2TunnelPolicy(reversedPolicy);
+
+            // get path here, need to use the same for fwd and rev direction
+            List<Link> path = getPath(pw.l2TunnelPolicy().cP1(),
+                                      pw.l2TunnelPolicy().cP2());
+            if (path == null) {
+                log.error("Deploying process : No path between the connection points for pseudowire {}", l2TunnelId);
+                return PATH_NOT_FOUND.appendError("No path between the connection points for pseudowire!");
+            }
+
+            Link fwdNextHop;
+            Link revNextHop;
+            if (!isValidPath(path, leafSpinePw)) {
+                log.error("Deploying process : Path for pseudowire {} is not valid", l2TunnelId);
+                return INTERNAL_ERROR.appendError("Internal error : path for pseudowire is not valid!");
+            }
+            // oneHop flag is used to determine if we need to
+            // install transit mpls rules
+            boolean oneHop = true;
+            if (path.size() > 1) {
+                oneHop = false;
+            }
+
+            fwdNextHop = path.get(0);
+            revNextHop = reverseLink(path.get(path.size() - 1));
+
+            pw.l2Tunnel().setPath(path);
+            pw.l2Tunnel().setTransportVlan(determineTransportVlan(leafSpinePw));
+
+            // next hops for next objectives
+            log.info("Deploying process : Establishing forward direction for pseudowire {}", l2TunnelId);
+
+            VlanId egressVlan = determineEgressVlan(pw.l2TunnelPolicy().cP1OuterTag(),
+                                                    pw.l2TunnelPolicy().cP1InnerTag(),
+                                                    pw.l2TunnelPolicy().cP2OuterTag(),
+                                                    pw.l2TunnelPolicy().cP2InnerTag());
+            result = deployPseudoWireInit(pw.l2Tunnel(),
+                                          pw.l2TunnelPolicy().cP1(),
+                                          pw.l2TunnelPolicy().cP2(),
+                                          FWD,
+                                          fwdNextHop,
+                                          leafSpinePw,
+                                          oneHop,
+                                          egressVlan);
+            if (result != SUCCESS) {
+                log.error("Deploying process : Error in deploying pseudowire {} initiation for CP1", l2TunnelId);
+                return Result.INTERNAL_ERROR.appendError("Error in deploying pseudowire initiation for CP1");
+            }
+
+            result = deployPolicy(l2TunnelId,
+                                  pw.l2TunnelPolicy().cP1(),
+                                  pw.l2TunnelPolicy().cP1InnerTag(),
+                                  pw.l2TunnelPolicy().cP1OuterTag(),
+                                  egressVlan,
+                                  result.getNextId());
+            if (result != SUCCESS) {
+                log.error("Deploying process : Error in deploying pseudowire {} policy for CP1", l2TunnelId);
+                return Result.INTERNAL_ERROR.appendError("Error in deploying pseudowire policy for CP1");
+            }
+
+            PortNumber termPort = pw.l2Tunnel().pathUsed().get(pw.l2Tunnel().pathUsed().size() - 1).dst().port();
+            result = deployPseudoWireTerm(pw.l2Tunnel(),
+                                          pw.l2TunnelPolicy().cP2(),
+                                          egressVlan,
+                                          FWD,
+                                          leafSpinePw,
+                                          oneHop,
+                                          termPort);
+
+            if (result != SUCCESS) {
+                log.error("Deploying process : Error in deploying pseudowire {} termination for CP1", l2TunnelId);
+                return Result.INTERNAL_ERROR.appendError("Error in deploying pseudowire termination for CP1");
+            }
+
+            // We establish the reverse tunnel.
+            log.info("Deploying process : Establishing reverse direction for pseudowire {}", l2TunnelId);
+            egressVlan = determineEgressVlan(pw.l2TunnelPolicy().cP2OuterTag(),
+                                             pw.l2TunnelPolicy().cP2InnerTag(),
+                                             pw.l2TunnelPolicy().cP1OuterTag(),
+                                             pw.l2TunnelPolicy().cP1InnerTag());
+
+            result = deployPseudoWireInit(pw.l2Tunnel(),
+                                          pw.l2TunnelPolicy().cP2(),
+                                          pw.l2TunnelPolicy().cP1(),
+                                          REV,
+                                          revNextHop,
+                                          leafSpinePw,
+                                          oneHop,
+                                          egressVlan);
+            if (result != SUCCESS) {
+                log.error("Deploying process : Error in deploying pseudowire {} initiation for CP2", l2TunnelId);
+                return Result.INTERNAL_ERROR
+                        .appendError("Error in deploying pseudowire initiation for CP2");
+            }
+
+            result = deployPolicy(l2TunnelId,
+                                  pw.l2TunnelPolicy().cP2(),
+                                  pw.l2TunnelPolicy().cP2InnerTag(),
+                                  pw.l2TunnelPolicy().cP2OuterTag(),
+                                  egressVlan,
+                                  result.getNextId());
+            if (result != SUCCESS) {
+                log.error("Deploying process : Error in deploying policy {} for CP2", l2TunnelId);
+                return Result.INTERNAL_ERROR
+                        .appendError("Deploying process : Error in deploying policy for CP2");
+            }
+
+            termPort = pw.l2Tunnel().pathUsed().get(0).src().port();
+            result = deployPseudoWireTerm(pw.l2Tunnel(),
+                                          pw.l2TunnelPolicy().cP1(),
+                                          egressVlan,
+                                          REV,
+                                          leafSpinePw,
+                                          oneHop,
+                                          termPort);
+
+            if (result != SUCCESS) {
+                log.error("Deploying process : Error in deploying pseudowire {} termination for CP2", l2TunnelId);
+                return Result.INTERNAL_ERROR.appendError("Error in deploying pseudowire termination for CP2");
+            }
+
+            result = manageIntermediateFiltering(pw, leafSpinePw);
+            if (result != SUCCESS) {
+                log.error("Deploying process : Error in installing intermediate rules for " +
+                                  "tagged transport for pseudowire {}", l2TunnelId);
+                return Result.INTERNAL_ERROR.appendError("Error in installing intermediate rules for tagged transport");
+            }
+
+            log.info("Deploying process : Updating relevant information for pseudowire {}", l2TunnelId);
+
+            // Populate stores as the final step of the process
+            l2TunnelStore.put(Long.toString(l2TunnelId), pw.l2Tunnel());
+            l2PolicyStore.put(Long.toString(l2TunnelId), pw.l2TunnelPolicy());
+
+            return Result.SUCCESS;
+        } catch (StorageException.Timeout e) {
+            log.error("Can not acquire distributed lock for pseudowire {}!", pw.l2Tunnel().tunnelId());
+            return Result.INTERNAL_ERROR.appendError("Can not acquire distributed lock!");
+        } finally {
+            // release the lock
+            pwLock.unlock();
         }
-
-        // reverse the policy in order for leaf switch to be at CP1
-        // this will help us for re-using SRLinkWeigher for computing valid paths
-        L2TunnelPolicy reversedPolicy = reverseL2TunnelPolicy(pw.l2TunnelPolicy());
-        if (reversedPolicy == null) {
-            log.error("Error in reversing policy, device configuration was not found!");
-            return  INTERNAL_ERROR
-                    .appendError("Device configuration not found when reversing the policy.");
-        }
-        pw.setL2TunnelPolicy(reversedPolicy);
-
-        // get path here, need to use the same for fwd and rev direction
-        List<Link> path = getPath(pw.l2TunnelPolicy().cP1(),
-                                  pw.l2TunnelPolicy().cP2());
-        if (path == null) {
-            log.error("Deploying process : No path between the connection points for pseudowire {}", l2TunnelId);
-            return PATH_NOT_FOUND.appendError("No path between the connection points for pseudowire!");
-        }
-
-        Link fwdNextHop;
-        Link revNextHop;
-        if (!isValidPath(path, leafSpinePw)) {
-            log.error("Deploying process : Path for pseudowire {} is not valid",
-                      l2TunnelId);
-            return INTERNAL_ERROR.appendError("Internal error : path for pseudowire is not valid!");
-        }
-
-        // oneHope flag is used to determine if we need to
-        // install transit mpls rules
-        boolean oneHop = true;
-        if (path.size() > 1) {
-            oneHop = false;
-        }
-
-        fwdNextHop = path.get(0);
-        revNextHop = reverseLink(path.get(path.size() - 1));
-
-        pw.l2Tunnel().setPath(path);
-        pw.l2Tunnel().setTransportVlan(determineTransportVlan(leafSpinePw));
-
-        // next hops for next objectives
-        log.info("Deploying process : Establishing forward direction for pseudowire {}", l2TunnelId);
-
-        VlanId egressVlan = determineEgressVlan(pw.l2TunnelPolicy().cP1OuterTag(),
-                                                pw.l2TunnelPolicy().cP1InnerTag(),
-                                                pw.l2TunnelPolicy().cP2OuterTag(),
-                                                pw.l2TunnelPolicy().cP2InnerTag());
-        // We establish the tunnel.
-        // result.nextId will be used in fwd
-        result = deployPseudoWireInit(pw.l2Tunnel(),
-                                      pw.l2TunnelPolicy().cP1(),
-                                      pw.l2TunnelPolicy().cP2(),
-                                      FWD,
-                                      fwdNextHop,
-                                      leafSpinePw,
-                                      oneHop,
-                                      egressVlan);
-        if (result != SUCCESS) {
-            log.info("Deploying process : Error in deploying pseudowire initiation for CP1");
-            return Result.INTERNAL_ERROR.appendError("Error in deploying pseudowire initiation for CP1");
-        }
-
-        // We create the policy.
-        result = deployPolicy(l2TunnelId,
-                              pw.l2TunnelPolicy().cP1(),
-                              pw.l2TunnelPolicy().cP1InnerTag(),
-                              pw.l2TunnelPolicy().cP1OuterTag(),
-                              egressVlan,
-                              result.getNextId());
-        if (result != SUCCESS) {
-            log.info("Deploying process : Error in deploying pseudowire policy for CP1");
-            return Result.INTERNAL_ERROR.appendError("Error in deploying pseudowire policy for CP1");
-        }
-
-        // We terminate the tunnel
-        result = deployPseudoWireTerm(pw.l2Tunnel(),
-                                       pw.l2TunnelPolicy().cP2(),
-                                       egressVlan,
-                                       FWD,
-                                       leafSpinePw,
-                                       oneHop);
-
-        if (result != SUCCESS) {
-            log.info("Deploying process : Error in deploying pseudowire termination for CP1");
-            return Result.INTERNAL_ERROR.appendError("Error in deploying pseudowire termination for CP1");
-        }
-
-        log.info("Deploying process : Establishing reverse direction for pseudowire {}", l2TunnelId);
-
-        egressVlan = determineEgressVlan(pw.l2TunnelPolicy().cP2OuterTag(),
-                                         pw.l2TunnelPolicy().cP2InnerTag(),
-                                         pw.l2TunnelPolicy().cP1OuterTag(),
-                                         pw.l2TunnelPolicy().cP1InnerTag());
-
-        // We establish the reverse tunnel.
-        result = deployPseudoWireInit(pw.l2Tunnel(),
-                                       pw.l2TunnelPolicy().cP2(),
-                                       pw.l2TunnelPolicy().cP1(),
-                                       REV,
-                                       revNextHop,
-                                       leafSpinePw,
-                                       oneHop,
-                                       egressVlan);
-        if (result != SUCCESS) {
-            log.info("Deploying process : Error in deploying pseudowire initiation for CP2");
-            return Result.INTERNAL_ERROR
-                    .appendError("Error in deploying pseudowire initiation for CP2");
-        }
-
-
-        result = deployPolicy(l2TunnelId,
-                               pw.l2TunnelPolicy().cP2(),
-                               pw.l2TunnelPolicy().cP2InnerTag(),
-                               pw.l2TunnelPolicy().cP2OuterTag(),
-                               egressVlan,
-                               result.getNextId());
-        if (result != SUCCESS) {
-            log.info("Deploying process : Error in deploying policy for CP2");
-            return Result.INTERNAL_ERROR
-                    .appendError("Deploying process : Error in deploying policy for CP2");
-        }
-
-        result = deployPseudoWireTerm(pw.l2Tunnel(),
-                                      pw.l2TunnelPolicy().cP1(),
-                                      egressVlan,
-                                      REV,
-                                      leafSpinePw,
-                                      oneHop);
-
-        if (result != SUCCESS) {
-            log.info("Deploying process : Error in deploying pseudowire termination for CP2");
-            return Result.INTERNAL_ERROR.appendError("Error in deploying pseudowire termination for CP2");
-        }
-
-        result = manageIntermediateFiltering(pw, leafSpinePw);
-        if (result != SUCCESS) {
-            log.info("Deploying process : Error in installing intermediate rules for tagged transport");
-            return Result.INTERNAL_ERROR.appendError("Error in installing intermediate rules for tagged transport");
-        }
-
-        log.info("Deploying process : Updating relevant information for pseudowire {}", l2TunnelId);
-
-        // Populate stores as the final step of the process
-        l2TunnelStore.put(Long.toString(l2TunnelId), pw.l2Tunnel());
-        l2PolicyStore.put(Long.toString(l2TunnelId), pw.l2TunnelPolicy());
-
-        return Result.SUCCESS;
     }
 
     @Override
@@ -760,7 +779,7 @@
         CompletableFuture<ObjectiveError> revTermNextFuture = new CompletableFuture<>();
 
         if (l2TunnelId == 0) {
-            log.warn("Removal process : Tunnel id cannot be 0");
+            log.error("Removal process : Tunnel id cannot be 0");
             return Result.WRONG_PARAMETERS.appendError("Pseudowire id can not be 0.");
         }
 
@@ -825,10 +844,13 @@
 
             fwdTermNextFuture.thenAcceptAsync(status -> {
                 if (status == null) {
+                    PortNumber termPort = pwToRemove.l2Tunnel().pathUsed()
+                            .get(pwToRemove.l2Tunnel().pathUsed().size() - 1).dst().port();
                     tearDownPseudoWireTerm(pwToRemove.l2Tunnel(),
                                            pwToRemove.l2TunnelPolicy().cP2(),
                                            null,
-                                           FWD);
+                                           FWD,
+                                           termPort);
                 }
             });
         }
@@ -861,10 +883,12 @@
 
             revTermNextFuture.thenAcceptAsync(status -> {
                 if (status == null) {
+                    PortNumber termPort = pwToRemove.l2Tunnel().pathUsed().get(0).src().port();
                     tearDownPseudoWireTerm(pwToRemove.l2Tunnel(),
                                            pwToRemove.l2TunnelPolicy().cP1(),
                                            null,
-                                           REV);
+                                           REV,
+                                           termPort);
                 }
             });
         }
@@ -883,14 +907,25 @@
      */
     public Result tearDownPseudowire(long l2TunnelId) {
 
-        if (checkIfPwExists(l2TunnelId, true) == Result.SUCCESS) {
-            return tearDownConnectionPoints(l2TunnelId, true, true, true);
-        } else if (checkIfPwExists(l2TunnelId, false) == Result.SUCCESS) {
-            return tearDownConnectionPoints(l2TunnelId, true, true, false);
-        } else {
-            return Result.WRONG_PARAMETERS.appendError("Pseudowire with "
-                                                        + l2TunnelId
-                                                        + " did not reside in any store!");
+        try {
+            // take the lock
+            pwLock.lock();
+
+            if (checkIfPwExists(l2TunnelId, true) == Result.SUCCESS) {
+                return tearDownConnectionPoints(l2TunnelId, true, true, true);
+            } else if (checkIfPwExists(l2TunnelId, false) == Result.SUCCESS) {
+                return tearDownConnectionPoints(l2TunnelId, true, true, false);
+            } else {
+                return Result.WRONG_PARAMETERS.appendError("Pseudowire with "
+                                                                   + l2TunnelId
+                                                                   + " did not reside in any store!");
+            }
+        } catch (StorageException.Timeout e) {
+            log.error("Can not acquire distributed lock for pseudowire {}!", l2TunnelId);
+            return Result.INTERNAL_ERROR.appendError("Can not acquire distributed lock!");
+        } finally {
+            // release the lock
+            pwLock.unlock();
         }
     }
 
@@ -919,16 +954,17 @@
      * @param ingress      the ingress point
      * @param ingressInner the ingress inner tag
      * @param ingressOuter the ingress outer tag
-     * @param nextId       the next objective id
      * @param egressVlan   Vlan-id to set, depends on ingress vlan
      *                     combinations. For example, if pw is double tagged
      *                     then this is the value of the outer vlan, if single
      *                     tagged then it is the new value of the single tag.
      *                     Should be None for untagged traffic.
+     * @param nextId       the next objective id
      * @return the result of the operation
      */
     private Result deployPolicy(long tunnelId, ConnectPoint ingress, VlanId ingressInner,
                                 VlanId ingressOuter, VlanId egressVlan, int nextId) {
+        log.debug("Starting deploying policy for pseudowire {}.", tunnelId);
 
         List<Objective> objectives = Lists.newArrayList();
         // We create the forwarding objective for supporting
@@ -938,8 +974,9 @@
         ObjectiveContext context = new DefaultObjectiveContext((objective) ->
                                                                 log.debug("FwdObj for tunnel {} populated", tunnelId),
                                                                (objective, error) ->
-                                                                log.warn("Failed to populate fwdrObj " +
-                                                                                 "for tunnel {}", tunnelId, error));
+                                                                log.warn("Failed to populate fwdObj " +
+                                                                                 "for tunnel {} : {}",
+                                                                         tunnelId, error));
         objectives.add(fwdBuilder.add(context));
 
         // We create the filtering objective to define the
@@ -956,7 +993,8 @@
         // We create and add objective context.
         context = new DefaultObjectiveContext((objective) -> log.debug("FilterObj for tunnel {} populated", tunnelId),
                                               (objective, error) -> log.warn("Failed to populate filterObj for " +
-                                                                                     "tunnel {}", tunnelId, error));
+                                                                                     "tunnel {} : {}",
+                                                                             tunnelId, error));
         objectives.add(filtBuilder.add(context));
 
         for (Objective objective : objectives) {
@@ -985,9 +1023,10 @@
     private Result deployPseudoWireInit(L2Tunnel l2Tunnel, ConnectPoint ingress,
                                         ConnectPoint egress, Direction direction,
                                         Link nextHop, boolean spinePw, boolean oneHop, VlanId termVlanId) {
-
+        log.debug("Started deploying init next objectives for pseudowire {} for tunnel {} -> {}.",
+                  l2Tunnel.tunnelId(), ingress, egress);
         if (nextHop == null) {
-            log.warn("No path between ingress and egress cps for tunnel {}", l2Tunnel.tunnelId());
+            log.warn("No path between ingress and egress connection points for tunnel {}", l2Tunnel.tunnelId());
             return WRONG_PARAMETERS;
         }
 
@@ -1047,10 +1086,15 @@
      * @param egressVlan the expected vlan at egress
      * @param direction  the direction
      * @param spinePw if the pseudowire involves a spine switch
+     * @param inputTermPort the input port at the termination point for the pseudowire, used for installing special
+     *                      filtering rules at the termination
      * @return the result of the operation
      */
     private Result deployPseudoWireTerm(L2Tunnel l2Tunnel, ConnectPoint egress,
-                                        VlanId egressVlan, Direction direction, boolean spinePw, boolean oneHop) {
+                                        VlanId egressVlan, Direction direction,
+                                        boolean spinePw, boolean oneHop, PortNumber inputTermPort) {
+        log.debug("Started deploying termination objectives for pseudowire {} , direction {}.",
+                  l2Tunnel.tunnelId(), direction == FWD ? "forward" : "reverse");
 
         // We create the group relative to the termination.
         NextObjective.Builder nextObjectiveBuilder = createNextObjective(TERMINATION, egress, null,
@@ -1093,7 +1137,7 @@
         context = new DefaultObjectiveContext((objective) -> log.debug("FwdObj for tunnel termination {} populated",
                                                                        l2Tunnel.tunnelId()),
                                               (objective, error) -> log.warn("Failed to populate fwdrObj" +
-                                                                             " for tunnel termination {}",
+                                                                             " for tunnel termination {} : {}",
                                                                              l2Tunnel.tunnelId(), error));
         srManager.flowObjectiveService.forward(egress.deviceId(), fwdBuilder.add(context));
         log.debug("Creating new FwdObj for termination NextObj with id={} for tunnel {}",
@@ -1101,16 +1145,6 @@
 
         if (spinePw) {
 
-            // determine the input port at the
-            PortNumber inPort;
-
-            if (egress.deviceId().
-                    equals(l2Tunnel.pathUsed().get(0).dst().deviceId())) {
-                    inPort = l2Tunnel.pathUsed().get(0).dst().port();
-            } else {
-                    inPort = l2Tunnel.pathUsed().get(0).src().port();
-            }
-
             MacAddress dstMac;
             try {
                 dstMac = srManager.deviceConfiguration().getDeviceMac(egress.deviceId());
@@ -1122,10 +1156,10 @@
             log.info("Populating filtering objective for pseudowire transport" +
                              " with vlan = {}, port = {}, mac = {}",
                      l2Tunnel.transportVlan(),
-                     inPort,
+                     inputTermPort,
                      dstMac);
             FilteringObjective.Builder filteringObjectiveBuilder =
-                    createNormalPipelineFiltObjective(inPort, l2Tunnel.transportVlan(), dstMac);
+                    createNormalPipelineFiltObjective(inputTermPort, l2Tunnel.transportVlan(), dstMac);
             context = new DefaultObjectiveContext((objective) ->
                                                           log.debug("Special filtObj for  " + "for {} populated",
                                                                     l2Tunnel.tunnelId()),
@@ -1139,7 +1173,7 @@
             srManager.flowObjectiveService.filter(egress.deviceId(), filteringObjectiveBuilder.add(context));
             log.debug("Creating new special FiltObj for termination point with tunnel {} for port {}",
                       l2Tunnel.tunnelId(),
-                      inPort);
+                      inputTermPort);
         }
 
         return SUCCESS;
@@ -1157,7 +1191,7 @@
                                                                          VlanId vlanId,
                                                                          MacAddress dstMac) {
 
-        log.info("Creating filtering objective for pseudowire transport with vlan={}, port={}, mac={}",
+        log.debug("Creating filtering objective for pseudowire intermediate transport with vlan={}, port={}, mac={}",
                  vlanId,
                  inPort,
                  dstMac);
@@ -1186,7 +1220,7 @@
      */
     private FilteringObjective.Builder createFiltObjective(PortNumber inPort, VlanId innerTag, VlanId outerTag) {
 
-        log.info("Creating filtering objective for vlans {} / {}", outerTag, innerTag);
+        log.debug("Creating connection point filtering objective for vlans {} / {}", outerTag, innerTag);
         return DefaultFilteringObjective
                 .builder()
                 .withKey(Criteria.matchInPort(inPort))
@@ -1209,6 +1243,8 @@
     private ForwardingObjective.Builder createTermFwdObjective(MplsLabel pwLabel, long tunnelId,
                                                                PortNumber egressPort, int nextId) {
 
+        log.debug("Creating forwarding objective for termination for tunnel {} : pwLabel {}, egressPort {}, nextId {}",
+                 tunnelId, pwLabel, egressPort, nextId);
         TrafficSelector.Builder trafficSelector = DefaultTrafficSelector.builder();
         TrafficTreatment.Builder trafficTreatment = DefaultTrafficTreatment.builder();
         // The flow has to match on the pw label and bos
@@ -1244,6 +1280,7 @@
      */
     private ForwardingObjective.Builder createInitFwdObjective(long tunnelId, PortNumber inPort, int nextId) {
 
+        log.debug("Creating forwarding objective for tunnel {} : Port {} , nextId {}", tunnelId, inPort, nextId);
         TrafficSelector.Builder trafficSelector = DefaultTrafficSelector.builder();
 
         // The flow has to match on the mpls logical
@@ -1282,6 +1319,9 @@
                                                       ConnectPoint dstCp,  L2Tunnel l2Tunnel,
                                                       DeviceId egressId, boolean leafSpinePw,
                                                       boolean oneHop, VlanId termVlanId) {
+        log.debug("Creating {} next objective for pseudowire {}.",
+                  pipeline == TERMINATION ? "termination" : "initiation", l2Tunnel.tunnelId());
+
         NextObjective.Builder nextObjBuilder;
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
         if (pipeline == INITIATION) {
@@ -1292,7 +1332,7 @@
             // The pw label is the bottom of stack. It has to
             // be different -1.
             if (l2Tunnel.pwLabel().toInt() == MplsLabel.MAX_MPLS) {
-                log.warn("Pw label not configured");
+                log.error("Pw label not configured");
                 return null;
             }
             treatmentBuilder.pushMpls();
@@ -1318,7 +1358,7 @@
                     srLabel = MplsLabel.mplsLabel(srManager.deviceConfiguration().getPWRoutingLabel(egressId));
 
                 } catch (DeviceConfigNotFoundException e) {
-                    log.warn("Sr label for pw traffic not configured");
+                    log.error("Sr label for pw traffic not configured");
                     return null;
                 }
 
@@ -1333,7 +1373,7 @@
             try {
                 ingressMac = srManager.deviceConfiguration().getDeviceMac(srcCp.deviceId());
             } catch (DeviceConfigNotFoundException e) {
-                log.warn("Was not able to find the ingress mac");
+                log.error("Was not able to find the ingress mac");
                 return null;
             }
             treatmentBuilder.setEthSrc(ingressMac);
@@ -1341,7 +1381,7 @@
             try {
                 neighborMac = srManager.deviceConfiguration().getDeviceMac(dstCp.deviceId());
             } catch (DeviceConfigNotFoundException e) {
-                log.warn("Was not able to find the neighbor mac");
+                log.error("Was not able to find the neighbor mac");
                 return null;
             }
             treatmentBuilder.setEthDst(neighborMac);
@@ -1349,7 +1389,7 @@
             // if true we need to pop the vlan because
             // we instantiate a leaf to leaf pseudowire
             if (!leafSpinePw) {
-                log.info("We should carry this traffic UNTAGGED!");
+                log.debug("We should carry traffic UNTAGGED for pseudowire {}", l2Tunnel.tunnelId());
                 treatmentBuilder.popVlan();
             }
 
@@ -1386,6 +1426,8 @@
      *         current SRLinkWeigher
      */
     private L2TunnelPolicy reverseL2TunnelPolicy(L2TunnelPolicy policy) {
+
+        log.debug("Reversing policy for pseudowire.");
         try {
             // if cp1 is a leaf, just return
             if (srManager.deviceConfiguration().isEdgeDevice(policy.cP1().deviceId())) {
@@ -1470,7 +1512,7 @@
 
         String key = generateKey(tunnelId, direction);
         if (!l2InitiationNextObjStore.containsKey(key)) {
-            log.warn("Abort delete of policy for tunnel {}: next does not exist in the store", tunnelId);
+            log.error("Abort delete of policy for tunnel {}: next does not exist in the store", tunnelId);
             if (future != null) {
                 future.complete(null);
             }
@@ -1492,7 +1534,7 @@
 
             @Override
             public void onError(Objective objective, ObjectiveError error) {
-                log.warn("Failed to remove previous fwdObj for policy {}: {}", tunnelId, error);
+                log.error("Failed to remove previous fwdObj for policy {}: {}", tunnelId, error);
                 if (future != null) {
                     future.complete(error);
                 }
@@ -1532,10 +1574,11 @@
      */
     private void tearDownPseudoWireInit(long l2TunnelId, ConnectPoint ingress,
                                         CompletableFuture<ObjectiveError> future, Direction direction) {
-
+        log.debug("Starting tearing down initiation of pseudowire {} for direction {}.",
+                  l2TunnelId, direction == FWD ? "forward" : "reverse");
         String key = generateKey(l2TunnelId, direction);
         if (!l2InitiationNextObjStore.containsKey(key)) {
-            log.info("Abort delete of {} for {}: next does not exist in the store", INITIATION, key);
+            log.error("Abort delete of {} for {}: next does not exist in the store", INITIATION, key);
             if (future != null) {
                 future.complete(null);
             }
@@ -1582,11 +1625,13 @@
     private void tearDownPseudoWireTerm(L2Tunnel l2Tunnel,
                                         ConnectPoint egress,
                                         CompletableFuture<ObjectiveError> future,
-                                        Direction direction) {
-
+                                        Direction direction,
+                                        PortNumber inPort) {
+        log.debug("Starting tearing down termination for pseudowire {} direction {}.",
+                  l2Tunnel.tunnelId(), direction == FWD ? "forward" : "reverse");
         String key = generateKey(l2Tunnel.tunnelId(), direction);
         if (!l2TerminationNextObjStore.containsKey(key)) {
-            log.info("Abort delete of {} for {}: next does not exist in the store", TERMINATION, key);
+            log.error("Abort delete of {} for {}: next does not exist in the store", TERMINATION, key);
             if (future != null) {
                 future.complete(null);
             }
@@ -1641,21 +1686,11 @@
         // spine-spine pws
         if (!l2Tunnel.transportVlan().equals(UNTAGGED_TRANSPORT_VLAN)) {
 
-            // determine the input port at the
-            PortNumber inPort;
-
-            if (egress.deviceId().
-                    equals(l2Tunnel.pathUsed().get(0).dst().deviceId())) {
-                inPort = l2Tunnel.pathUsed().get(0).dst().port();
-            } else {
-                inPort = l2Tunnel.pathUsed().get(0).src().port();
-            }
-
             MacAddress dstMac;
             try {
                 dstMac = srManager.deviceConfiguration().getDeviceMac(egress.deviceId());
             } catch (Exception e) {
-                log.info("Device not found in configuration, no programming of MAC address");
+                log.error("Device not found in configuration, no programming of MAC address");
                 dstMac = null;
             }
 
@@ -1668,9 +1703,11 @@
                     createNormalPipelineFiltObjective(inPort, l2Tunnel.transportVlan(), dstMac);
             context = new DefaultObjectiveContext((objective) ->
                                                           log.debug("Special filtObj for  " + "for {} removed",
-                                                                    l2Tunnel.tunnelId()), (objective, error) ->
-                    log.warn("Failed to populate " + "special filtObj " +
-                                     "rule for {}: {}", l2Tunnel.tunnelId(), error));
+                                                                    l2Tunnel.tunnelId()),
+                                                  (objective, error) ->
+                                                          log.warn("Failed to remove " + "special filtObj " +
+                                                                           "rule for {}: {}",
+                                                                   l2Tunnel.tunnelId(), error));
             TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
             filteringObjectiveBuilder.withMeta(treatment.build());
             srManager.flowObjectiveService.filter(egress.deviceId(), filteringObjectiveBuilder.remove(context));
@@ -1693,5 +1730,4 @@
     private String generateKey(long tunnelId, Direction direction) {
         return String.format("%s-%s", tunnelId, direction);
     }
-
 }