[ONOS-4170] LSP-DB sync

Change-Id: Icda3afd9cca8d1fb8c58b44da6bc26064b300388
diff --git a/apps/pce/app/src/main/java/org/onosproject/pce/pceservice/PceManager.java b/apps/pce/app/src/main/java/org/onosproject/pce/pceservice/PceManager.java
index bd2edf5..89a685b 100644
--- a/apps/pce/app/src/main/java/org/onosproject/pce/pceservice/PceManager.java
+++ b/apps/pce/app/src/main/java/org/onosproject/pce/pceservice/PceManager.java
@@ -30,6 +30,9 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IPv4;
+
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -38,6 +41,7 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
+import org.onlab.packet.TCP;
 import org.onlab.util.Bandwidth;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
@@ -139,6 +143,7 @@
     private static final String TRUE = "true";
     private static final String FALSE = "false";
     private static final String END_OF_SYNC_IP_PREFIX = "0.0.0.0/32";
+    public static final int PCEP_PORT = 4189;
 
     private IdGenerator localLspIdIdGen;
     protected DistributedSet<Short> localLspIdFreeList;
@@ -635,7 +640,7 @@
                     LinkEvent linkEvent = (LinkEvent) e;
                     if (linkEvent.type() == LinkEvent.Type.LINK_REMOVED) {
                         tunnelService.queryTunnel(MPLS).forEach(t -> {
-                                if (t.path().links().contains(((Link) e.subject()))) {
+                                if (t.path().links().contains((e.subject()))) {
                                     // Check whether this ONOS instance is master for ingress device if yes,
                                     // recompute and send update
                                     checkForMasterAndUpdateTunnel(t.path().src().deviceId(), t);
@@ -949,12 +954,30 @@
         public void process(PacketContext context) {
             // Stop processing if the packet has been handled, since we
             // can't do any more to it.
-
             if (context.isHandled()) {
                 return;
             }
 
             InboundPacket pkt = context.inPacket();
+            if (pkt == null) {
+                return;
+            }
+
+            Ethernet ethernet = pkt.parsed();
+            if (ethernet == null || ethernet.getEtherType() != Ethernet.TYPE_IPV4) {
+                return;
+            }
+
+            IPv4 ipPacket = (IPv4) ethernet.getPayload();
+            if (ipPacket == null || ipPacket.getProtocol() != IPv4.PROTOCOL_TCP) {
+                return;
+            }
+
+            TCP tcp = (TCP) ipPacket.getPayload();
+            if (tcp == null || tcp.getDestinationPort() != PCEP_PORT) {
+                return;
+            }
+
             syncLabelDb(pkt.receivedFrom().deviceId());
         }
     }
diff --git a/apps/pce/app/src/test/java/org/onosproject/pce/pceservice/PceManagerTest.java b/apps/pce/app/src/test/java/org/onosproject/pce/pceservice/PceManagerTest.java
index dee7218..2da22e5 100644
--- a/apps/pce/app/src/test/java/org/onosproject/pce/pceservice/PceManagerTest.java
+++ b/apps/pce/app/src/test/java/org/onosproject/pce/pceservice/PceManagerTest.java
@@ -26,7 +26,6 @@
 import static org.onosproject.net.MastershipRole.MASTER;
 
 import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -44,6 +43,7 @@
 import org.onlab.junit.TestUtils.TestUtilsException;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IPv4;
+import org.onlab.packet.TCP;
 import org.onlab.util.Bandwidth;
 import org.onosproject.common.DefaultTopologyGraph;
 import org.onosproject.core.ApplicationId;
@@ -112,6 +112,7 @@
 import org.onosproject.store.service.TestStorageService;
 
 import com.google.common.collect.ImmutableSet;
+import static org.onosproject.pce.pceservice.PceManager.PCEP_PORT;
 
 /**
  * Tests the functions of PceManager.
@@ -675,21 +676,18 @@
      * Tests packet in to trigger label DB sync.
      */
     @Test
-    public void packetProcessingTest() throws URISyntaxException {
+    public void packetProcessingTest1() throws URISyntaxException {
 
         build4RouterTopo(false, true, true, true, 0); // This also initializes devices etc.
 
-        final int srcHost = 2;
-        final int dstHost = 5;
-
         LabelResourceId node1Label = LabelResourceId.labelResourceId(5200);
         LabelResourceId node2Label = LabelResourceId.labelResourceId(5201);
 
         pceManager.pceStore.addGlobalNodeLabel(D1.deviceId(), node1Label);
         pceManager.pceStore.addGlobalNodeLabel(D2.deviceId(), node2Label);
 
-        ConnectPoint src = new ConnectPoint(D1.deviceId(), PortNumber.portNumber(srcHost));
-        ConnectPoint dst = new ConnectPoint(D2.deviceId(), PortNumber.portNumber(dstHost));
+        ConnectPoint src = new ConnectPoint(D1.deviceId(), PortNumber.portNumber(1));
+        ConnectPoint dst = new ConnectPoint(D2.deviceId(), PortNumber.portNumber(2));
 
         Link link1 = DefaultLink.builder().src(src).dst(dst).state(ACTIVE).type(DIRECT)
                 .providerId(new ProviderId("eth", "1")).build();
@@ -697,24 +695,66 @@
         LabelResourceId link1Label = LabelResourceId.labelResourceId(5204);
         pceManager.pceStore.addAdjLabel(link1, link1Label);
 
-        Ethernet eth;
-        IPv4 ipv4;
+        TCP tcp = new TCP();
+        tcp.setDestinationPort(PCEP_PORT);
 
-        ipv4 = new IPv4();
-        eth = new Ethernet();
+        IPv4 ipv4 = new IPv4();
+        ipv4.setProtocol(IPv4.PROTOCOL_TCP);
+        ipv4.setPayload(tcp);
+
+        Ethernet eth = new Ethernet();
         eth.setEtherType(Ethernet.TYPE_IPV4);
         eth.setPayload(ipv4);
 
-        eth.setSourceMACAddress("00:00:00:10:00:0" + srcHost).setDestinationMACAddress("00:00:00:10:00:0" + dstHost);
-
-        InboundPacket inPkt = new DefaultInboundPacket(new ConnectPoint(D1.deviceId(), PortNumber.portNumber(srcHost)),
-                                                       eth, ByteBuffer.wrap(eth.serialize()));
+        InboundPacket inPkt = new DefaultInboundPacket(new ConnectPoint(D1.deviceId(),
+                                                                        PortNumber.portNumber(PCEP_PORT)),
+                                                       eth, null);
 
         pktProcessor.process(new MockPcepPacketContext(inPkt, null));
         assertThat(flowsDownloaded, is(4));
     }
 
     /**
+     * Tests faulty packet in to trigger label DB sync.
+     */
+    @Test
+    public void packetProcessingTest2() throws URISyntaxException {
+
+        build4RouterTopo(false, true, true, true, 0); // This also initializes devices etc.
+
+        LabelResourceId node1Label = LabelResourceId.labelResourceId(5200);
+        LabelResourceId node2Label = LabelResourceId.labelResourceId(5201);
+
+        pceManager.pceStore.addGlobalNodeLabel(D1.deviceId(), node1Label);
+        pceManager.pceStore.addGlobalNodeLabel(D2.deviceId(), node2Label);
+
+        ConnectPoint src = new ConnectPoint(D1.deviceId(), PortNumber.portNumber(1));
+        ConnectPoint dst = new ConnectPoint(D2.deviceId(), PortNumber.portNumber(2));
+
+        Link link1 = DefaultLink.builder().src(src).dst(dst).state(ACTIVE).type(DIRECT)
+                .providerId(new ProviderId("eth", "1")).build();
+
+        LabelResourceId link1Label = LabelResourceId.labelResourceId(5204);
+        pceManager.pceStore.addAdjLabel(link1, link1Label);
+
+        TCP tcp = new TCP(); // Not set the pcep port.
+        IPv4 ipv4 = new IPv4();
+        ipv4.setProtocol(IPv4.PROTOCOL_TCP);
+        ipv4.setPayload(tcp);
+
+        Ethernet eth = new Ethernet();
+        eth.setEtherType(Ethernet.TYPE_IPV4);
+        eth.setPayload(ipv4);
+
+        InboundPacket inPkt = new DefaultInboundPacket(new ConnectPoint(D1.deviceId(),
+                                                                        PortNumber.portNumber(PCEP_PORT)),
+                                                       eth, null);
+
+        pktProcessor.process(new MockPcepPacketContext(inPkt, null));
+        assertThat(flowsDownloaded, is(0));
+    }
+
+    /**
      * Tests tunnel events added and removed.
      */
     @Test