ONOS-2846, ONOS-2812 Refactored stale link pruning to be centralized in the provider rather than performed by each link discovery helper.

This will make it behave properly in a distributed context.

Change-Id: I9b9788336468c41d1cf506e388306ad9136d5853
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
index 6613a7e..0519b3f 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
@@ -17,6 +17,7 @@
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
@@ -34,15 +35,18 @@
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.LinkKey;
 import org.onosproject.net.Port;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultTrafficSelector;
 import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.link.DefaultLinkDescription;
 import org.onosproject.net.link.LinkProvider;
 import org.onosproject.net.link.LinkProviderRegistry;
 import org.onosproject.net.link.LinkProviderService;
+import org.onosproject.net.link.LinkService;
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketPriority;
 import org.onosproject.net.packet.PacketProcessor;
@@ -65,6 +69,7 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.Link.Type.DIRECT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -91,6 +96,9 @@
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected LinkService linkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected PacketService packetService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -103,8 +111,9 @@
 
     private ScheduledExecutorService executor;
 
-    private static final long INIT_DELAY = 5;
-    private static final long DELAY = 5;
+    // TODO: Add sanity checking for the configurable params based on the delays
+    private static final long DEVICE_SYNC_DELAY = 5;
+    private static final long LINK_PRUNER_DELAY = 3;
 
     private static final String PROP_ENABLED = "enabled";
     @Property(name = PROP_ENABLED, boolValue = true,
@@ -135,13 +144,18 @@
             label = "Path to LLDP suppression configuration file")
     private String lldpSuppression = DEFAULT_LLDP_SUPPRESSION_CONFIG;
 
-
     private final DiscoveryContext context = new InternalDiscoveryContext();
-    private final InternalLinkProvider listener = new InternalLinkProvider();
     private final InternalRoleListener roleListener = new InternalRoleListener();
+    private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+    private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
 
+    // Device link discovery helpers.
     protected final Map<DeviceId, LinkDiscovery> discoverers = new ConcurrentHashMap<>();
 
+    // Most recent time a tracked link was seen; links are tracked if their
+    // destination connection point is mastered by this controller instance.
+    private final Map<LinkKey, Long> linkTimes = Maps.newConcurrentMap();
+
     private SuppressionRules rules;
     private ApplicationId appId;
 
@@ -216,28 +230,37 @@
         log.info(FORMAT, enabled, useBDDP, probeRate, staleLinkAge, lldpSuppression);
     }
 
+    /**
+     * Enables link discovery: registers provider and listeners, loads devices and schedules periodic tasks.
+     */
     private void enable() {
         providerService = providerRegistry.register(this);
-        deviceService.addListener(listener);
-        packetService.addProcessor(listener, PacketProcessor.advisor(0));
         masterService.addListener(roleListener);
+        deviceService.addListener(deviceListener);
+        packetService.addProcessor(packetProcessor, PacketProcessor.advisor(0));
 
-        processDevices();
+        loadDevices(); // NOTE(review): reads 'rules' before loadSuppressionRules() below; confirm it is set earlier
 
-        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "sync-%d"));
-        executor.scheduleAtFixedRate(new SyncDeviceInfoTask(), INIT_DELAY, DELAY, SECONDS);
+        executor = newSingleThreadScheduledExecutor(groupedThreads("onos/link", "discovery-%d"));
+        executor.scheduleAtFixedRate(new SyncDeviceInfoTask(),
+                                     DEVICE_SYNC_DELAY, DEVICE_SYNC_DELAY, SECONDS);
+        executor.scheduleAtFixedRate(new LinkPrunerTask(),
+                                     LINK_PRUNER_DELAY, LINK_PRUNER_DELAY, SECONDS); // same executor as the sync task
 
         loadSuppressionRules();
         requestIntercepts();
     }
 
+    /**
+     * Disables link discovery processing.
+     */
     private void disable() {
         withdrawIntercepts();
 
         providerRegistry.unregister(this);
-        deviceService.removeListener(listener);
-        packetService.removeProcessor(listener);
         masterService.removeListener(roleListener);
+        deviceService.removeListener(deviceListener);
+        packetService.removeProcessor(packetProcessor);
 
         if (executor != null) {
             executor.shutdownNow();
@@ -248,7 +271,10 @@
         providerService = null;
     }
 
-    private void processDevices() {
+    /**
+     * Loads available devices and registers their ports to be probed.
+     */
+    private void loadDevices() {
         for (Device device : deviceService.getAvailableDevices()) {
             if (rules.isSuppressed(device)) {
                 log.debug("LinkDiscovery from {} disabled by configuration", device.id());
@@ -260,6 +286,9 @@
         }
     }
 
+    /**
+     * Adds ports of the specified device to the specified discovery helper.
+     */
     private void addPorts(LinkDiscovery discoverer, DeviceId deviceId) {
         for (Port p : deviceService.getPorts(deviceId)) {
             if (rules.isSuppressed(p)) {
@@ -271,7 +300,12 @@
         }
     }
 
+
+    /**
+     * Loads LLDP suppression rules.
+     */
     private void loadSuppressionRules() {
+        // FIXME: convert to use network configuration
         SuppressionRulesStore store = new SuppressionRulesStore(lldpSuppression);
         try {
             log.info("Reading suppression rules from {}", lldpSuppression);
@@ -288,7 +322,7 @@
     }
 
     /**
-     * Request packet intercepts.
+     * Requests packet intercepts.
      */
     private void requestIntercepts() {
         TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
@@ -304,7 +338,7 @@
     }
 
     /**
-     * Withdraw packet intercepts.
+     * Withdraws packet intercepts.
      */
     private void withdrawIntercepts() {
         TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
@@ -314,6 +348,9 @@
         packetService.cancelPackets(selector.build(), PacketPriority.CONTROL, appId);
     }
 
+    /**
+     * Processes device mastership role changes.
+     */
     private class InternalRoleListener implements MastershipListener {
         @Override
         public void event(MastershipEvent event) {
@@ -336,7 +373,10 @@
 
     }
 
-    private class InternalLinkProvider implements PacketProcessor, DeviceListener {
+    /**
+     * Processes device events.
+     */
+    private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
             LinkDiscovery ld;
@@ -426,7 +466,12 @@
                     log.debug("Unknown event {}", event);
             }
         }
+    }
 
+    /**
+     * Processes incoming packets.
+     */
+    private class InternalPacketProcessor implements PacketProcessor {
         @Override
         public void process(PacketContext context) {
             if (context == null) {
@@ -443,6 +488,9 @@
         }
     }
 
+    /**
+     * Auxiliary task to keep device ports up to date.
+     */
     private final class SyncDeviceInfoTask implements Runnable {
         @Override
         public void run() {
@@ -464,12 +512,54 @@
                     }
                 }
             } catch (Exception e) {
-                // catch all Exception to avoid Scheduled task being suppressed.
+                // Catch all exceptions to avoid task being suppressed
                 log.error("Exception thrown during synchronization process", e);
             }
         }
     }
 
+    /**
+     * Periodic task that stops tracking links which are stale or whose destination is not mastered locally.
+     */
+    private class LinkPrunerTask implements Runnable {
+        @Override
+        public void run() {
+            if (Thread.currentThread().isInterrupted()) {
+                log.info("Interrupted, quitting");
+                return;
+            }
+
+            try {
+                // TODO: There is still a slight possibility of a mastership
+                // change occurring just as a link goes stale, in which case
+                // the stale link will not be pruned.
+                Maps.filterEntries(linkTimes, e -> {
+                    if (!masterService.isLocalMaster(e.getKey().dst().deviceId())) {
+                        return true; // not the local master: stop tracking without reporting the link vanished
+                    }
+                    if (isStale(e.getValue())) {
+                        providerService.linkVanished(new DefaultLinkDescription(e.getKey().src(),
+                                                                                e.getKey().dst(),
+                                                                                DIRECT));
+                        return true; // reported as vanished: stop tracking
+                    }
+                    return false; // locally mastered and recently seen: keep tracking
+                }).clear(); // clearing the filtered view removes only the entries selected above
+
+            } catch (Exception e) {
+                // Catch all exceptions so that later runs of this scheduled task are not suppressed
+                log.error("Exception thrown during link pruning process", e);
+            }
+        }
+
+        private boolean isStale(long lastSeen) {
+            return lastSeen < System.currentTimeMillis() - staleLinkAge; // staleLinkAge is compared as milliseconds
+        }
+    }
+
+    /**
+     * Provides processing context for the device link discovery helpers.
+     */
     private class InternalDiscoveryContext implements DiscoveryContext {
         @Override
         public MastershipService mastershipService() {
@@ -492,13 +582,14 @@
         }
 
         @Override
-        public long staleLinkAge() {
-            return staleLinkAge;
-        }
-
-        @Override
         public boolean useBDDP() {
             return useBDDP;
         }
+
+        @Override
+        public void touchLink(LinkKey key) {
+            linkTimes.put(key, System.currentTimeMillis()); // refresh the last-seen time checked by LinkPrunerTask
+        }
     }
+
 }