ONOS-2812 Refactored the link code in search of a defect; the root cause was old OVS-based switch which is wrongly forwarding LLDP frames.

ONOS can fix this later by tracking links to be pruned using port pairs.

Change-Id: Ia79ec69946daff80636f5ab4b75a3dcdba91465d
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
index 0c933162..e9e2bfa 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
@@ -275,6 +275,10 @@
         packetService.cancelPackets(selector.build(), PacketPriority.CONTROL, appId);
     }
 
+    private LinkDiscovery createLinkDiscovery(Device device) {
+        return new LinkDiscovery(device, packetService, masterService,
+                                 providerService, useBDDP);
+    }
 
     private class InternalRoleListener implements MastershipListener {
 
@@ -297,11 +301,8 @@
             synchronized (discoverers) {
                 if (!discoverers.containsKey(deviceId)) {
                     // ideally, should never reach here
-                    log.debug("Device mastership changed ({}) {}",
-                              event.type(), deviceId);
-                    discoverers.put(deviceId, new LinkDiscovery(device,
-                                                                packetService, masterService, providerService,
-                                                                useBDDP));
+                    log.debug("Device mastership changed ({}) {}", event.type(), deviceId);
+                    discoverers.put(deviceId, createLinkDiscovery(device));
                 }
             }
         }
@@ -331,15 +332,11 @@
                                 log.debug("LinkDiscovery from {} disabled by configuration", device.id());
                                 return;
                             }
-                            log.debug("Device added ({}) {}", event.type(),
-                                      deviceId);
-                            discoverers.put(deviceId, new LinkDiscovery(device,
-                                                                        packetService, masterService,
-                                                                        providerService, useBDDP));
+                            log.debug("Device added ({}) {}", event.type(), deviceId);
+                            discoverers.put(deviceId, createLinkDiscovery(device));
                         } else {
                             if (ld.isStopped()) {
-                                log.debug("Device restarted ({}) {}", event.type(),
-                                          deviceId);
+                                log.debug("Device restarted ({}) {}", event.type(), deviceId);
                                 ld.start();
                             }
                         }
@@ -363,15 +360,13 @@
                         }
                     } else {
                         log.debug("Port down {}", port);
-                        ConnectPoint point = new ConnectPoint(deviceId,
-                                                              port.number());
+                        ConnectPoint point = new ConnectPoint(deviceId, port.number());
                         providerService.linksVanished(point);
                     }
                     break;
                 case PORT_REMOVED:
                     log.debug("Port removed {}", port);
-                    ConnectPoint point = new ConnectPoint(deviceId,
-                                                          port.number());
+                    ConnectPoint point = new ConnectPoint(deviceId, port.number());
                     providerService.linksVanished(point);
 
                     break;
@@ -411,8 +406,7 @@
             if (context == null) {
                 return;
             }
-            LinkDiscovery ld = discoverers.get(
-                    context.inPacket().receivedFrom().deviceId());
+            LinkDiscovery ld = discoverers.get(context.inPacket().receivedFrom().deviceId());
             if (ld == null) {
                 return;
             }
@@ -441,8 +435,7 @@
                     synchronized (discoverers) {
                         LinkDiscovery discoverer = discoverers.get(did);
                         if (discoverer == null) {
-                            discoverer = new LinkDiscovery(dev, packetService,
-                                    masterService, providerService, useBDDP);
+                            discoverer = createLinkDiscovery(dev);
                             discoverers.put(did, discoverer);
                         }
 
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
index f4ac22e..a81eeb1 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
@@ -15,21 +15,8 @@
  */
 package org.onosproject.provider.lldp.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static org.onosproject.net.PortNumber.portNumber;
-import static org.onosproject.net.flow.DefaultTrafficTreatment.builder;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicInteger;
-
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.jboss.netty.util.Timeout;
 import org.jboss.netty.util.TimerTask;
 import org.onlab.packet.Ethernet;
@@ -51,7 +38,20 @@
 import org.onosproject.net.packet.PacketService;
 import org.slf4j.Logger;
 
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.onosproject.net.PortNumber.portNumber;
+import static org.onosproject.net.flow.DefaultTrafficTreatment.builder;
+import static org.slf4j.LoggerFactory.getLogger;
+
 // TODO: add 'fast discovery' mode: drop LLDPs in destination switch but listen for flow_removed messages
+// FIXME: add ability to track links using port pairs or the link inventory
 
 /**
  * Run discovery process from a physical switch. Ports are initially labeled as
@@ -62,74 +62,79 @@
  */
 public class LinkDiscovery implements TimerTask {
 
-    private final Device device;
-    // send 1 probe every probeRate milliseconds
-    private final long probeRate;
-    private final Set<Long> slowPorts;
-    // ports, known to have incoming links
-    private final Set<Long> fastPorts;
-    // number of unacknowledged probes per port
-    private final Map<Long, AtomicInteger> portProbeCount;
-    // number of probes to send before link is removed
-    private static final short MAX_PROBE_COUNT = 3;
     private final Logger log = getLogger(getClass());
+
+    private static final short MAX_PROBE_COUNT = 3; // probes to send before link is removed
+    private static final short DEFAULT_PROBE_RATE = 3000; // millis
+    private static final String SRC_MAC = "DE:AD:BE:EF:BA:11";
+    private static final String SERVICE_NULL = "Service cannot be null";
+
+    private final Device device;
+
+    // send 1 probe every probeRate milliseconds
+    private final long probeRate = DEFAULT_PROBE_RATE;
+
+    private final Set<Long> slowPorts = Sets.newConcurrentHashSet();
+    // ports, known to have incoming links
+    private final Set<Long> fastPorts = Sets.newConcurrentHashSet();
+
+    // number of unacknowledged probes per port
+    private final Map<Long, AtomicInteger> portProbeCount = Maps.newHashMap();
+
     private final ONOSLLDP lldpPacket;
     private final Ethernet ethPacket;
     private Ethernet bddpEth;
     private final boolean useBDDP;
+
+    private Timeout timeout;
+    private volatile boolean isStopped;
+
     private final LinkProviderService linkProvider;
     private final PacketService pktService;
     private final MastershipService mastershipService;
-    private Timeout timeout;
-    private volatile boolean isStopped;
 
     /**
      * Instantiates discovery manager for the given physical switch. Creates a
      * generic LLDP packet that will be customized for the port it is sent out on.
      * Starts the the timer for the discovery process.
      *
-     * @param device        the physical switch
-     * @param pktService    packet service
-     * @param masterService mastership service
+     * @param device          the physical switch
+     * @param pktService      packet service
+     * @param masterService   mastership service
      * @param providerService link provider service
-     * @param useBDDP       flag to also use BDDP for discovery
+     * @param useBDDP         flag to also use BDDP for discovery
      */
     public LinkDiscovery(Device device, PacketService pktService,
                          MastershipService masterService,
                          LinkProviderService providerService, Boolean... useBDDP) {
         this.device = device;
-        this.probeRate = 3000;
-        this.linkProvider = providerService;
-        this.pktService = pktService;
+        this.linkProvider = checkNotNull(providerService, SERVICE_NULL);
+        this.pktService = checkNotNull(pktService, SERVICE_NULL);
+        this.mastershipService = checkNotNull(masterService, SERVICE_NULL);
 
-        this.mastershipService = checkNotNull(masterService, "WTF!");
-        this.slowPorts = Collections.synchronizedSet(new HashSet<>());
-        this.fastPorts = Collections.synchronizedSet(new HashSet<>());
-        this.portProbeCount = new HashMap<>();
-        this.lldpPacket = new ONOSLLDP();
-        this.lldpPacket.setChassisId(device.chassisId());
-        this.lldpPacket.setDevice(device.id().toString());
+        lldpPacket = new ONOSLLDP();
+        lldpPacket.setChassisId(device.chassisId());
+        lldpPacket.setDevice(device.id().toString());
 
+        ethPacket = new Ethernet();
+        ethPacket.setEtherType(Ethernet.TYPE_LLDP);
+        ethPacket.setDestinationMACAddress(ONOSLLDP.LLDP_NICIRA);
+        ethPacket.setPayload(this.lldpPacket);
+        ethPacket.setPad(true);
 
-        this.ethPacket = new Ethernet();
-        this.ethPacket.setEtherType(Ethernet.TYPE_LLDP);
-        this.ethPacket.setDestinationMACAddress(ONOSLLDP.LLDP_NICIRA);
-        this.ethPacket.setPayload(this.lldpPacket);
-        this.ethPacket.setPad(true);
         this.useBDDP = useBDDP.length > 0 ? useBDDP[0] : false;
         if (this.useBDDP) {
-            this.bddpEth = new Ethernet();
-            this.bddpEth.setPayload(this.lldpPacket);
-            this.bddpEth.setEtherType(Ethernet.TYPE_BSN);
-            this.bddpEth.setDestinationMACAddress(ONOSLLDP.BDDP_MULTICAST);
-            this.bddpEth.setPad(true);
+            bddpEth = new Ethernet();
+            bddpEth.setPayload(lldpPacket);
+            bddpEth.setEtherType(Ethernet.TYPE_BSN);
+            bddpEth.setDestinationMACAddress(ONOSLLDP.BDDP_MULTICAST);
+            bddpEth.setPad(true);
             log.info("Using BDDP to discover network");
         }
 
-        this.isStopped = true;
+        isStopped = true;
         start();
-        this.log.debug("Started discovery manager for switch {}",
-                       device.id());
+        log.debug("Started discovery manager for switch {}", device.id());
 
     }
 
@@ -139,19 +144,18 @@
      *
      * @param port the port
      */
-    public void addPort(final Port port) {
+    public void addPort(Port port) {
         boolean newPort = false;
         synchronized (this) {
             if (!containsPort(port.number().toLong())) {
                 newPort = true;
-                this.slowPorts.add(port.number().toLong());
+                slowPorts.add(port.number().toLong());
             }
         }
 
         boolean isMaster = mastershipService.isLocalMaster(device.id());
         if (newPort && isMaster) {
-            this.log.debug("Sending init probe to port {}@{}",
-                    port.number().toLong(), device.id());
+            log.debug("Sending init probe to port {}@{}", port.number().toLong(), device.id());
             sendProbes(port.number().toLong());
         }
     }
@@ -161,21 +165,19 @@
      *
      * @param port the port
      */
-    public void removePort(final Port port) {
+    public void removePort(Port port) {
         // Ignore ports that are not on this switch
-
         long portnum = port.number().toLong();
         synchronized (this) {
-            if (this.slowPorts.contains(portnum)) {
-                this.slowPorts.remove(portnum);
+            if (slowPorts.contains(portnum)) {
+                slowPorts.remove(portnum);
 
-            } else if (this.fastPorts.contains(portnum)) {
-                this.fastPorts.remove(portnum);
-                this.portProbeCount.remove(portnum);
+            } else if (fastPorts.contains(portnum)) {
+                fastPorts.remove(portnum);
+                portProbeCount.remove(portnum);
                 // no iterator to update
             } else {
-                this.log.warn("Tried to dynamically remove non-existing port {}",
-                              portnum);
+                log.warn("Tried to dynamically remove non-existing port {}", portnum);
             }
         }
     }
@@ -187,18 +189,17 @@
      *
      * @param portNumber the port
      */
-    public void ackProbe(final Long portNumber) {
+    public void ackProbe(Long portNumber) {
         synchronized (this) {
-            if (this.slowPorts.contains(portNumber)) {
-                this.log.debug("Setting slow port to fast: {}:{}",
-                               this.device.id(), portNumber);
-                this.slowPorts.remove(portNumber);
-                this.fastPorts.add(portNumber);
-                this.portProbeCount.put(portNumber, new AtomicInteger(0));
-            } else if (this.fastPorts.contains(portNumber)) {
-                this.portProbeCount.get(portNumber).set(0);
+            if (slowPorts.contains(portNumber)) {
+                log.debug("Setting slow port to fast: {}:{}", device.id(), portNumber);
+                slowPorts.remove(portNumber);
+                fastPorts.add(portNumber);
+                portProbeCount.put(portNumber, new AtomicInteger(0));
+            } else if (fastPorts.contains(portNumber)) {
+                portProbeCount.get(portNumber).set(0);
             } else {
-                this.log.debug("Got ackProbe for non-existing port: {}", portNumber);
+                log.debug("Got ackProbe for non-existing port: {}", portNumber);
             }
         }
     }
@@ -207,6 +208,7 @@
     /**
      * Handles an incoming LLDP packet. Creates link in topology and sends ACK
      * to port where LLDP originated.
+     *
      * @param context packet context
      * @return true if handled
      */
@@ -218,21 +220,18 @@
 
         ONOSLLDP onoslldp = ONOSLLDP.parseONOSLLDP(eth);
         if (onoslldp != null) {
-            final PortNumber dstPort =
-                    context.inPacket().receivedFrom().port();
-            final PortNumber srcPort = portNumber(onoslldp.getPort());
-            final DeviceId srcDeviceId = DeviceId.deviceId(onoslldp.getDeviceString());
-            final DeviceId dstDeviceId = context.inPacket().receivedFrom().deviceId();
-            this.ackProbe(dstPort.toLong());
+            PortNumber srcPort = portNumber(onoslldp.getPort());
+            PortNumber dstPort = context.inPacket().receivedFrom().port();
+            DeviceId srcDeviceId = DeviceId.deviceId(onoslldp.getDeviceString());
+            DeviceId dstDeviceId = context.inPacket().receivedFrom().deviceId();
+            ackProbe(dstPort.toLong());
+
             ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort);
             ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort);
 
-            LinkDescription ld;
-            if (eth.getEtherType() == Ethernet.TYPE_BSN) {
-                ld = new DefaultLinkDescription(src, dst, Type.INDIRECT);
-            } else {
-                ld = new DefaultLinkDescription(src, dst, Type.DIRECT);
-            }
+            LinkDescription ld = eth.getEtherType() == Ethernet.TYPE_LLDP ?
+                    new DefaultLinkDescription(src, dst, Type.DIRECT) :
+                    new DefaultLinkDescription(src, dst, Type.INDIRECT);
 
             try {
                 linkProvider.linkDetected(ld);
@@ -253,38 +252,36 @@
      * @param t timeout
      */
     @Override
-    public void run(final Timeout t) {
+    public void run(Timeout t) {
         if (isStopped()) {
             return;
         }
         if (!mastershipService.isLocalMaster(device.id())) {
             if (!isStopped()) {
                 // reschedule timer
-                timeout = Timer.getTimer().newTimeout(this, this.probeRate, MILLISECONDS);
+                timeout = Timer.getTimer().newTimeout(this, probeRate, MILLISECONDS);
             }
             return;
         }
 
-        this.log.trace("Sending probes from {}", device.id());
+        log.trace("Sending probes from {}", device.id());
         synchronized (this) {
-            final Iterator<Long> fastIterator = this.fastPorts.iterator();
+            Iterator<Long> fastIterator = fastPorts.iterator();
             while (fastIterator.hasNext()) {
                 long portNumber = fastIterator.next();
                 int probeCount = portProbeCount.get(portNumber).getAndIncrement();
 
                 if (probeCount < LinkDiscovery.MAX_PROBE_COUNT) {
-                    this.log.trace("Sending fast probe to port {}", portNumber);
+                    log.trace("Sending fast probe to port {}", portNumber);
                     sendProbes(portNumber);
 
                 } else {
-                    // Link down, demote to slowPorts
-                    // Update fast and slow ports
+                    // Link down, demote to slowPorts; update fast and slow ports
                     fastIterator.remove();
-                    this.slowPorts.add(portNumber);
-                    this.portProbeCount.remove(portNumber);
+                    slowPorts.add(portNumber);
+                    portProbeCount.remove(portNumber);
 
-                    ConnectPoint cp = new ConnectPoint(device.id(),
-                                                       portNumber(portNumber));
+                    ConnectPoint cp = new ConnectPoint(device.id(), portNumber(portNumber));
                     log.debug("Link down -> {}", cp);
                     linkProvider.linksVanished(cp);
                 }
@@ -292,14 +289,14 @@
 
             // send a probe for the next slow port
             for (long portNumber : slowPorts) {
-                this.log.trace("Sending slow probe to port {}", portNumber);
+                log.trace("Sending slow probe to port {}", portNumber);
                 sendProbes(portNumber);
             }
         }
 
         if (!isStopped()) {
             // reschedule timer
-            timeout = Timer.getTimer().newTimeout(this, this.probeRate, MILLISECONDS);
+            timeout = Timer.getTimer().newTimeout(this, probeRate, MILLISECONDS);
         }
     }
 
@@ -323,17 +320,15 @@
      * @param port the port
      * @return Packet_out message with LLDP data
      */
-    private OutboundPacket createOutBoundLLDP(final Long port) {
+    private OutboundPacket createOutBoundLLDP(Long port) {
         if (port == null) {
             return null;
         }
-        this.lldpPacket.setPortId(port.intValue());
-        this.ethPacket.setSourceMACAddress("DE:AD:BE:EF:BA:11");
-
-        final byte[] lldp = this.ethPacket.serialize();
-        return new DefaultOutboundPacket(this.device.id(),
+        lldpPacket.setPortId(port.intValue());
+        ethPacket.setSourceMACAddress(SRC_MAC);
+        return new DefaultOutboundPacket(device.id(),
                                          builder().setOutput(portNumber(port)).build(),
-                                         ByteBuffer.wrap(lldp));
+                                         ByteBuffer.wrap(ethPacket.serialize()));
     }
 
     /**
@@ -342,25 +337,23 @@
      * @param port the port
      * @return Packet_out message with LLDP data
      */
-    private OutboundPacket createOutBoundBDDP(final Long port) {
+    private OutboundPacket createOutBoundBDDP(Long port) {
         if (port == null) {
             return null;
         }
-        this.lldpPacket.setPortId(port.intValue());
-        this.bddpEth.setSourceMACAddress("DE:AD:BE:EF:BA:11");
-
-        final byte[] bddp = this.bddpEth.serialize();
-        return new DefaultOutboundPacket(this.device.id(),
+        lldpPacket.setPortId(port.intValue());
+        bddpEth.setSourceMACAddress(SRC_MAC);
+        return new DefaultOutboundPacket(device.id(),
                                          builder().setOutput(portNumber(port)).build(),
-                                         ByteBuffer.wrap(bddp));
+                                         ByteBuffer.wrap(bddpEth.serialize()));
     }
 
     private void sendProbes(Long portNumber) {
         log.trace("Sending probes out to {}@{}", portNumber, device.id());
-        OutboundPacket pkt = this.createOutBoundLLDP(portNumber);
+        OutboundPacket pkt = createOutBoundLLDP(portNumber);
         pktService.emit(pkt);
         if (useBDDP) {
-            OutboundPacket bpkt = this.createOutBoundBDDP(portNumber);
+            OutboundPacket bpkt = createOutBoundBDDP(portNumber);
             pktService.emit(bpkt);
         }
     }