ONOS-2846, ONOS-2812 Refactored link discovery pruning to be centralized rather than being with each link discovery helper.

This will make it behave properly in a distributed context.

Change-Id: I9b9788336468c41d1cf506e388306ad9136d5853
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
index f7fe8ab..5502624 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.provider.lldp.impl;
 
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.jboss.netty.util.Timeout;
 import org.jboss.netty.util.TimerTask;
@@ -37,9 +36,7 @@
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
-import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.onosproject.net.PortNumber.portNumber;
@@ -53,7 +50,7 @@
  * LLDP, send an LLDP for a single slow port. Based on FlowVisor topology
  * discovery implementation.
  */
-public class LinkDiscovery implements TimerTask {
+class LinkDiscovery implements TimerTask {
 
     private final Logger log = getLogger(getClass());
 
@@ -72,9 +69,6 @@
     // Set of ports to be probed
     private final Set<Long> ports = Sets.newConcurrentHashSet();
 
-    // Most recent time a link was seen
-    private final Map<LinkKey, Long> linkTimes = Maps.newConcurrentMap();
-
     /**
      * Instantiates discovery manager for the given physical switch. Creates a
      * generic LLDP packet that will be customized for the port it is sent out on.
@@ -83,7 +77,7 @@
      * @param device  the physical switch
      * @param context discovery context
      */
-    public LinkDiscovery(Device device, DiscoveryContext context) {
+    LinkDiscovery(Device device, DiscoveryContext context) {
         this.device = device;
         this.context = context;
 
@@ -102,7 +96,6 @@
         bddpEth.setEtherType(Ethernet.TYPE_BSN);
         bddpEth.setDestinationMACAddress(ONOSLLDP.BDDP_MULTICAST);
         bddpEth.setPad(true);
-        log.info("Using BDDP to discover network");
 
         isStopped = true;
         start();
@@ -110,46 +103,47 @@
 
     }
 
+    synchronized void stop() {
+        isStopped = true;
+        timeout.cancel();
+    }
+
+    synchronized void start() {
+        if (isStopped) {
+            isStopped = false;
+            timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
+        } else {
+            log.warn("LinkDiscovery started multiple times?");
+        }
+    }
+
+    synchronized boolean isStopped() {
+        return isStopped || timeout.isCancelled();
+    }
+
     /**
      * Add physical port port to discovery process.
      * Send out initial LLDP and label it as slow port.
      *
      * @param port the port
      */
-    public void addPort(Port port) {
+    void addPort(Port port) {
         boolean newPort = ports.add(port.number().toLong());
         boolean isMaster = context.mastershipService().isLocalMaster(device.id());
         if (newPort && isMaster) {
-            log.debug("Sending init probe to port {}@{}", port.number().toLong(), device.id());
+            log.debug("Sending initial probe to port {}@{}", port.number().toLong(), device.id());
             sendProbes(port.number().toLong());
         }
     }
 
     /**
-     * Method called by remote port to acknowledge receipt of LLDP sent by
-     * this port. If slow port, updates label to fast. If fast port, decrements
-     * number of unacknowledged probes.
-     *
-     * @param key link key
-     */
-    private void ackProbe(LinkKey key) {
-        long portNumber = key.src().port().toLong();
-        if (ports.contains(portNumber)) {
-            linkTimes.put(key, System.currentTimeMillis());
-        } else {
-            log.debug("Got ackProbe for non-existing port: {}", portNumber);
-        }
-    }
-
-
-    /**
-     * Handles an incoming LLDP packet. Creates link in topology and sends ACK
-     * to port where LLDP originated.
+     * Handles an incoming LLDP packet. Creates link in topology and adds the
+     * link for staleness tracking.
      *
      * @param packetContext packet context
      * @return true if handled
      */
-    public boolean handleLLDP(PacketContext packetContext) {
+    boolean handleLLDP(PacketContext packetContext) {
         Ethernet eth = packetContext.inPacket().parsed();
         if (eth == null) {
             return false;
@@ -165,14 +159,13 @@
             ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort);
             ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort);
 
-            ackProbe(LinkKey.linkKey(src, dst));
-
             LinkDescription ld = eth.getEtherType() == Ethernet.TYPE_LLDP ?
                     new DefaultLinkDescription(src, dst, Type.DIRECT) :
                     new DefaultLinkDescription(src, dst, Type.INDIRECT);
 
             try {
                 context.providerService().linkDetected(ld);
+                context.touchLink(LinkKey.linkKey(src, dst));
             } catch (IllegalStateException e) {
                 return true;
             }
@@ -195,52 +188,16 @@
             return;
         }
 
-        if (!context.mastershipService().isLocalMaster(device.id())) {
-            if (!isStopped()) {
-                timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
-            }
-            return;
+        if (context.mastershipService().isLocalMaster(device.id())) {
+            log.trace("Sending probes from {}", device.id());
+            ports.forEach(this::sendProbes);
         }
 
-        // Prune stale links
-        linkTimes.entrySet().stream()
-                .filter(e -> isStale(e.getKey(), e.getValue()))
-                .map(Map.Entry::getKey).collect(Collectors.toSet())
-                .forEach(this::pruneLink);
-
-        // Probe ports
-        log.trace("Sending probes from {}", device.id());
-        ports.forEach(this::sendProbes);
-
         if (!isStopped()) {
             timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
         }
     }
 
-    private void pruneLink(LinkKey key) {
-        linkTimes.remove(key);
-        LinkDescription desc = new DefaultLinkDescription(key.src(), key.dst(), Type.DIRECT);
-        context.providerService().linkVanished(desc);
-    }
-
-    private boolean isStale(LinkKey key, long lastSeen) {
-        return lastSeen < (System.currentTimeMillis() - context.staleLinkAge());
-    }
-
-    public synchronized void stop() {
-        isStopped = true;
-        timeout.cancel();
-    }
-
-    public synchronized void start() {
-        if (isStopped) {
-            isStopped = false;
-            timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
-        } else {
-            log.warn("LinkDiscovery started multiple times?");
-        }
-    }
-
     /**
      * Creates packet_out LLDP for specified output port.
      *
@@ -285,11 +242,8 @@
         }
     }
 
-    public synchronized boolean isStopped() {
-        return isStopped || timeout.isCancelled();
-    }
-
     boolean containsPort(long portNumber) {
         return ports.contains(portNumber);
     }
+
 }