[ONOS-4170] LSP-DB sync

Change-Id: Icda3afd9cca8d1fb8c58b44da6bc26064b300388
diff --git a/apps/pce/app/src/main/java/org/onosproject/pce/pceservice/PceManager.java b/apps/pce/app/src/main/java/org/onosproject/pce/pceservice/PceManager.java
index bd2edf5..89a685b 100644
--- a/apps/pce/app/src/main/java/org/onosproject/pce/pceservice/PceManager.java
+++ b/apps/pce/app/src/main/java/org/onosproject/pce/pceservice/PceManager.java
@@ -30,6 +30,9 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IPv4;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -38,6 +41,7 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
+import org.onlab.packet.TCP;
 import org.onlab.util.Bandwidth;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
@@ -139,6 +143,7 @@
     private static final String TRUE = "true";
     private static final String FALSE = "false";
     private static final String END_OF_SYNC_IP_PREFIX = "0.0.0.0/32";
+    public static final int PCEP_PORT = 4189;
 
     private IdGenerator localLspIdIdGen;
     protected DistributedSet<Short> localLspIdFreeList;
@@ -635,7 +640,7 @@
                     LinkEvent linkEvent = (LinkEvent) e;
                     if (linkEvent.type() == LinkEvent.Type.LINK_REMOVED) {
                         tunnelService.queryTunnel(MPLS).forEach(t -> {
-                                if (t.path().links().contains(((Link) e.subject()))) {
+                                if (t.path().links().contains((e.subject()))) {
                                     // Check whether this ONOS instance is master for ingress device if yes,
                                     // recompute and send update
                                     checkForMasterAndUpdateTunnel(t.path().src().deviceId(), t);
@@ -949,12 +954,30 @@
         public void process(PacketContext context) {
             // Stop processing if the packet has been handled, since we
             // can't do any more to it.
-
             if (context.isHandled()) {
                 return;
             }
 
             InboundPacket pkt = context.inPacket();
+            if (pkt == null) {
+                return;
+            }
+
+            Ethernet ethernet = pkt.parsed();
+            if (ethernet == null || ethernet.getEtherType() != Ethernet.TYPE_IPV4) {
+                return;
+            }
+
+            IPv4 ipPacket = (IPv4) ethernet.getPayload();
+            if (ipPacket == null || ipPacket.getProtocol() != IPv4.PROTOCOL_TCP) {
+                return;
+            }
+
+            TCP tcp = (TCP) ipPacket.getPayload();
+            if (tcp == null || tcp.getDestinationPort() != PCEP_PORT) {
+                return;
+            }
+
             syncLabelDb(pkt.receivedFrom().deviceId());
         }
     }