ONOS-2846, ONOS-2812 Refactored link discovery pruning to be centralized rather than being with each link discovery helper.

This will make it behave properly in a distributed context.

Change-Id: I9b9788336468c41d1cf506e388306ad9136d5853
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/DiscoveryContext.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/DiscoveryContext.java
index 4ea3b1b..0cd1924 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/DiscoveryContext.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/DiscoveryContext.java
@@ -16,13 +16,14 @@
 package org.onosproject.provider.lldp.impl;
 
 import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.LinkKey;
 import org.onosproject.net.link.LinkProviderService;
 import org.onosproject.net.packet.PacketService;
 
 /**
  * Shared context for use by link discovery.
  */
-public interface DiscoveryContext {
+interface DiscoveryContext {
 
     /**
      * Returns the shared mastership service reference.
@@ -53,16 +54,16 @@
     long probeRate();
 
     /**
-     * Returns the max stale link age in millis.
-     *
-     * @return stale link age
-     */
-    long staleLinkAge();
-
-    /**
      * Indicates whether to emit BDDP.
      *
      * @return true to emit BDDP
      */
     boolean useBDDP();
+
+    /**
+     * Touches the link identified by the given key to indicate that it's active.
+     *
+     * @param key link key
+     */
+    void touchLink(LinkKey key);
 }
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
index 6613a7e..0519b3f 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
@@ -17,6 +17,7 @@
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -34,15 +35,18 @@
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.LinkKey;
 import org.onosproject.net.Port;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.link.DefaultLinkDescription;
 import org.onosproject.net.link.LinkProvider;
 import org.onosproject.net.link.LinkProviderRegistry;
 import org.onosproject.net.link.LinkProviderService;
+import org.onosproject.net.link.LinkService;
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketPriority;
 import org.onosproject.net.packet.PacketProcessor;
@@ -65,6 +69,7 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.Link.Type.DIRECT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -91,6 +96,9 @@
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkService linkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PacketService packetService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -103,8 +111,9 @@
 
     private ScheduledExecutorService executor;
 
-    private static final long INIT_DELAY = 5;
-    private static final long DELAY = 5;
+    // TODO: Add sanity checking for the configurable params based on the delays
+    private static final long DEVICE_SYNC_DELAY = 5;
+    private static final long LINK_PRUNER_DELAY = 3;
 
     private static final String PROP_ENABLED = "enabled";
     @Property(name = PROP_ENABLED, boolValue = true,
@@ -135,13 +144,18 @@
             label = "Path to LLDP suppression configuration file")
     private String lldpSuppression = DEFAULT_LLDP_SUPPRESSION_CONFIG;
 
-
     private final DiscoveryContext context = new InternalDiscoveryContext();
-    private final InternalLinkProvider listener = new InternalLinkProvider();
     private final InternalRoleListener roleListener = new InternalRoleListener();
+    private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+    private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
 
+    // Device link discovery helpers.
     protected final Map<DeviceId, LinkDiscovery> discoverers = new ConcurrentHashMap<>();
 
+    // Most recent time a tracked link was seen; links are tracked if their
+    // destination connection point is mastered by this controller instance.
+    private final Map<LinkKey, Long> linkTimes = Maps.newConcurrentMap();
+
     private SuppressionRules rules;
     private ApplicationId appId;
 
@@ -216,28 +230,37 @@
         log.info(FORMAT, enabled, useBDDP, probeRate, staleLinkAge, lldpSuppression);
     }
 
+    /**
+     * Enables link discovery processing.
+     */
     private void enable() {
         providerService = providerRegistry.register(this);
-        deviceService.addListener(listener);
-        packetService.addProcessor(listener, PacketProcessor.advisor(0));
         masterService.addListener(roleListener);
+        deviceService.addListener(deviceListener);
+        packetService.addProcessor(packetProcessor, PacketProcessor.advisor(0));
 
-        processDevices();
+        loadDevices();
 
-        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "sync-%d"));
-        executor.scheduleAtFixedRate(new SyncDeviceInfoTask(), INIT_DELAY, DELAY, SECONDS);
+        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/link", "discovery-%d"));
+        executor.scheduleAtFixedRate(new SyncDeviceInfoTask(),
+                                     DEVICE_SYNC_DELAY, DEVICE_SYNC_DELAY, SECONDS);
+        executor.scheduleAtFixedRate(new LinkPrunerTask(),
+                                     LINK_PRUNER_DELAY, LINK_PRUNER_DELAY, SECONDS);
 
         loadSuppressionRules();
         requestIntercepts();
     }
 
+    /**
+     * Disables link discovery processing.
+     */
     private void disable() {
         withdrawIntercepts();
 
         providerRegistry.unregister(this);
-        deviceService.removeListener(listener);
-        packetService.removeProcessor(listener);
         masterService.removeListener(roleListener);
+        deviceService.removeListener(deviceListener);
+        packetService.removeProcessor(packetProcessor);
 
         if (executor != null) {
             executor.shutdownNow();
@@ -248,7 +271,10 @@
         providerService = null;
     }
 
-    private void processDevices() {
+    /**
+     * Loads available devices and registers their ports to be probed.
+     */
+    private void loadDevices() {
         for (Device device : deviceService.getAvailableDevices()) {
             if (rules.isSuppressed(device)) {
                 log.debug("LinkDiscovery from {} disabled by configuration", device.id());
@@ -260,6 +286,9 @@
         }
     }
 
+    /**
+     * Adds ports of the specified device to the specified discovery helper.
+     */
     private void addPorts(LinkDiscovery discoverer, DeviceId deviceId) {
         for (Port p : deviceService.getPorts(deviceId)) {
             if (rules.isSuppressed(p)) {
@@ -271,7 +300,12 @@
         }
     }
 
+
+    /**
+     * Loads LLDP suppression rules.
+     */
     private void loadSuppressionRules() {
+        // FIXME: convert to use network configuration
         SuppressionRulesStore store = new SuppressionRulesStore(lldpSuppression);
         try {
             log.info("Reading suppression rules from {}", lldpSuppression);
@@ -288,7 +322,7 @@
     }
 
     /**
-     * Request packet intercepts.
+     * Requests packet intercepts.
      */
     private void requestIntercepts() {
         TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
@@ -304,7 +338,7 @@
     }
 
     /**
-     * Withdraw packet intercepts.
+     * Withdraws packet intercepts.
      */
     private void withdrawIntercepts() {
         TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
@@ -314,6 +348,9 @@
         packetService.cancelPackets(selector.build(), PacketPriority.CONTROL, appId);
     }
 
+    /**
+     * Processes device mastership role changes.
+     */
     private class InternalRoleListener implements MastershipListener {
         @Override
         public void event(MastershipEvent event) {
@@ -336,7 +373,10 @@
 
     }
 
-    private class InternalLinkProvider implements PacketProcessor, DeviceListener {
+    /**
+     * Processes device events.
+     */
+    private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
             LinkDiscovery ld;
@@ -426,7 +466,12 @@
                     log.debug("Unknown event {}", event);
             }
         }
+    }
 
+    /**
+     * Processes incoming packets.
+     */
+    private class InternalPacketProcessor implements PacketProcessor {
         @Override
         public void process(PacketContext context) {
             if (context == null) {
@@ -443,6 +488,9 @@
         }
     }
 
+    /**
+     * Auxiliary task to keep device ports up to date.
+     */
     private final class SyncDeviceInfoTask implements Runnable {
         @Override
         public void run() {
@@ -464,12 +512,54 @@
                     }
                 }
             } catch (Exception e) {
-                // catch all Exception to avoid Scheduled task being suppressed.
+                // Catch all exceptions to avoid task being suppressed
                 log.error("Exception thrown during synchronization process", e);
             }
         }
     }
 
+    /**
+     * Auxiliary task for pruning stale links.
+     */
+    private class LinkPrunerTask implements Runnable {
+        @Override
+        public void run() {
+            if (Thread.currentThread().isInterrupted()) {
+                log.info("Interrupted, quitting");
+                return;
+            }
+
+            try {
+                // TODO: There is still a slight possibility of mastership
+                // change occurring right with link going stale. This will
+                // result in the stale link not being pruned.
+                Maps.filterEntries(linkTimes, e -> {
+                    if (!masterService.isLocalMaster(e.getKey().dst().deviceId())) {
+                        return true;
+                    }
+                    if (isStale(e.getValue())) {
+                        providerService.linkVanished(new DefaultLinkDescription(e.getKey().src(),
+                                                                                e.getKey().dst(),
+                                                                                DIRECT));
+                        return true;
+                    }
+                    return false;
+                }).clear();
+
+            } catch (Exception e) {
+                // Catch all exceptions to avoid task being suppressed
+                log.error("Exception thrown during link pruning process", e);
+            }
+        }
+
+        private boolean isStale(long lastSeen) {
+            return lastSeen < System.currentTimeMillis() - staleLinkAge;
+        }
+    }
+
+    /**
+     * Provides processing context for the device link discovery helpers.
+     */
     private class InternalDiscoveryContext implements DiscoveryContext {
         @Override
         public MastershipService mastershipService() {
@@ -492,13 +582,14 @@
         }
 
         @Override
-        public long staleLinkAge() {
-            return staleLinkAge;
-        }
-
-        @Override
         public boolean useBDDP() {
             return useBDDP;
         }
+
+        @Override
+        public void touchLink(LinkKey key) {
+            linkTimes.put(key, System.currentTimeMillis());
+        }
     }
+
 }
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
index f7fe8ab..5502624 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
@@ -15,7 +15,6 @@
  */
 package org.onosproject.provider.lldp.impl;
 
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.jboss.netty.util.Timeout;
 import org.jboss.netty.util.TimerTask;
@@ -37,9 +36,7 @@
 import org.slf4j.Logger;
 
 import java.nio.ByteBuffer;
-import java.util.Map;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.onosproject.net.PortNumber.portNumber;
@@ -53,7 +50,7 @@
  * LLDP, send an LLDP for a single slow port. Based on FlowVisor topology
  * discovery implementation.
  */
-public class LinkDiscovery implements TimerTask {
+class LinkDiscovery implements TimerTask {
 
     private final Logger log = getLogger(getClass());
 
@@ -72,9 +69,6 @@
     // Set of ports to be probed
     private final Set<Long> ports = Sets.newConcurrentHashSet();
 
-    // Most recent time a link was seen
-    private final Map<LinkKey, Long> linkTimes = Maps.newConcurrentMap();
-
     /**
      * Instantiates discovery manager for the given physical switch. Creates a
      * generic LLDP packet that will be customized for the port it is sent out on.
@@ -83,7 +77,7 @@
      * @param device  the physical switch
      * @param context discovery context
      */
-    public LinkDiscovery(Device device, DiscoveryContext context) {
+    LinkDiscovery(Device device, DiscoveryContext context) {
         this.device = device;
         this.context = context;
 
@@ -102,7 +96,6 @@
         bddpEth.setEtherType(Ethernet.TYPE_BSN);
         bddpEth.setDestinationMACAddress(ONOSLLDP.BDDP_MULTICAST);
         bddpEth.setPad(true);
-        log.info("Using BDDP to discover network");
 
         isStopped = true;
         start();
@@ -110,46 +103,47 @@
 
     }
 
+    synchronized void stop() {
+        isStopped = true;
+        timeout.cancel();
+    }
+
+    synchronized void start() {
+        if (isStopped) {
+            isStopped = false;
+            timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
+        } else {
+            log.warn("LinkDiscovery started multiple times?");
+        }
+    }
+
+    synchronized boolean isStopped() {
+        return isStopped || timeout.isCancelled();
+    }
+
     /**
      * Add physical port port to discovery process.
      * Send out initial LLDP and label it as slow port.
      *
      * @param port the port
      */
-    public void addPort(Port port) {
+    void addPort(Port port) {
         boolean newPort = ports.add(port.number().toLong());
         boolean isMaster = context.mastershipService().isLocalMaster(device.id());
         if (newPort && isMaster) {
-            log.debug("Sending init probe to port {}@{}", port.number().toLong(), device.id());
+            log.debug("Sending initial probe to port {}@{}", port.number().toLong(), device.id());
             sendProbes(port.number().toLong());
         }
     }
 
     /**
-     * Method called by remote port to acknowledge receipt of LLDP sent by
-     * this port. If slow port, updates label to fast. If fast port, decrements
-     * number of unacknowledged probes.
-     *
-     * @param key link key
-     */
-    private void ackProbe(LinkKey key) {
-        long portNumber = key.src().port().toLong();
-        if (ports.contains(portNumber)) {
-            linkTimes.put(key, System.currentTimeMillis());
-        } else {
-            log.debug("Got ackProbe for non-existing port: {}", portNumber);
-        }
-    }
-
-
-    /**
-     * Handles an incoming LLDP packet. Creates link in topology and sends ACK
-     * to port where LLDP originated.
+     * Handles an incoming LLDP packet. Creates link in topology and adds the
+     * link for staleness tracking.
      *
      * @param packetContext packet context
      * @return true if handled
      */
-    public boolean handleLLDP(PacketContext packetContext) {
+    boolean handleLLDP(PacketContext packetContext) {
         Ethernet eth = packetContext.inPacket().parsed();
         if (eth == null) {
             return false;
@@ -165,14 +159,13 @@
             ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort);
             ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort);
 
-            ackProbe(LinkKey.linkKey(src, dst));
-
             LinkDescription ld = eth.getEtherType() == Ethernet.TYPE_LLDP ?
                     new DefaultLinkDescription(src, dst, Type.DIRECT) :
                     new DefaultLinkDescription(src, dst, Type.INDIRECT);
 
             try {
                 context.providerService().linkDetected(ld);
+                context.touchLink(LinkKey.linkKey(src, dst));
             } catch (IllegalStateException e) {
                 return true;
             }
@@ -195,52 +188,16 @@
             return;
         }
 
-        if (!context.mastershipService().isLocalMaster(device.id())) {
-            if (!isStopped()) {
-                timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
-            }
-            return;
+        if (context.mastershipService().isLocalMaster(device.id())) {
+            log.trace("Sending probes from {}", device.id());
+            ports.forEach(this::sendProbes);
         }
 
-        // Prune stale links
-        linkTimes.entrySet().stream()
-                .filter(e -> isStale(e.getKey(), e.getValue()))
-                .map(Map.Entry::getKey).collect(Collectors.toSet())
-                .forEach(this::pruneLink);
-
-        // Probe ports
-        log.trace("Sending probes from {}", device.id());
-        ports.forEach(this::sendProbes);
-
         if (!isStopped()) {
             timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
         }
     }
 
-    private void pruneLink(LinkKey key) {
-        linkTimes.remove(key);
-        LinkDescription desc = new DefaultLinkDescription(key.src(), key.dst(), Type.DIRECT);
-        context.providerService().linkVanished(desc);
-    }
-
-    private boolean isStale(LinkKey key, long lastSeen) {
-        return lastSeen < (System.currentTimeMillis() - context.staleLinkAge());
-    }
-
-    public synchronized void stop() {
-        isStopped = true;
-        timeout.cancel();
-    }
-
-    public synchronized void start() {
-        if (isStopped) {
-            isStopped = false;
-            timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
-        } else {
-            log.warn("LinkDiscovery started multiple times?");
-        }
-    }
-
     /**
      * Creates packet_out LLDP for specified output port.
      *
@@ -285,11 +242,8 @@
         }
     }
 
-    public synchronized boolean isStopped() {
-        return isStopped || timeout.isCancelled();
-    }
-
     boolean containsPort(long portNumber) {
         return ports.contains(portNumber);
     }
+
 }
diff --git a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java
index bba031f..b4b7b7b 100644
--- a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java
+++ b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java
@@ -48,6 +48,7 @@
 import org.onosproject.net.link.LinkProvider;
 import org.onosproject.net.link.LinkProviderRegistry;
 import org.onosproject.net.link.LinkProviderService;
+import org.onosproject.net.link.LinkServiceAdapter;
 import org.onosproject.net.packet.DefaultInboundPacket;
 import org.onosproject.net.packet.InboundPacket;
 import org.onosproject.net.packet.OutboundPacket;
@@ -79,7 +80,8 @@
     private static Port pd4;
 
     private final LLDPLinkProvider provider = new LLDPLinkProvider();
-    private final TestLinkRegistry linkService = new TestLinkRegistry();
+    private final TestLinkRegistry linkRegistry = new TestLinkRegistry();
+    private final TestLinkService linkService = new TestLinkService();
     private final TestPacketService packetService = new TestPacketService();
     private final TestDeviceService deviceService = new TestDeviceService();
     private final TestMasterShipService masterService = new TestMasterShipService();
@@ -104,8 +106,9 @@
         provider.coreService = coreService;
 
         provider.deviceService = deviceService;
+        provider.linkService = linkService;
         provider.packetService = packetService;
-        provider.providerRegistry = linkService;
+        provider.providerRegistry = linkRegistry;
         provider.masterService = masterService;
 
         provider.activate(null);
@@ -498,4 +501,6 @@
     }
 
 
+    private class TestLinkService extends LinkServiceAdapter {
+    }
 }
diff --git a/tools/test/cells/office b/tools/test/cells/office
index 2cd933e..2e282c6 100644
--- a/tools/test/cells/office
+++ b/tools/test/cells/office
@@ -3,5 +3,6 @@
 export ONOS_NIC="10.1.10.*"
 export OC1="10.1.10.223"
 
+unset ONOS_USE_SSH
 export ONOS_APPS="drivers,openflow,fwd,proxyarp,mobility"