ONOS-3206 refactoring LLDP link provider.

- refactoring to unify LinkDiscovery helper instantiation
- Support suppression rule update at runtime

Change-Id: I2a6db6e82fcb90ee5635f0ac09564efd55276ebf
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
index a840f85..9844203 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
@@ -27,6 +27,7 @@
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.packet.Ethernet;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.mastership.MastershipEvent;
@@ -37,6 +38,7 @@
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.LinkKey;
 import org.onosproject.net.Port;
+import org.onosproject.net.config.NetworkConfigRegistry;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
@@ -60,6 +62,7 @@
 import java.util.Dictionary;
 import java.util.EnumSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
@@ -86,6 +89,10 @@
             "Settings: enabled={}, useBDDP={}, probeRate={}, " +
                     "staleLinkAge={}, lldpSuppression={}";
 
+    // When a Device/Port has this annotation, do not send out LLDP/BDDP
+    public static final String NO_LLDP = "no-lldp";
+
+
     private final Logger log = getLogger(getClass());
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -109,6 +116,12 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected ComponentConfigService cfgService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected NetworkConfigRegistry cfgRegistry;
+
     private LinkProviderService providerService;
 
     private ScheduledExecutorService executor;
@@ -207,7 +220,7 @@
             newLldpSuppression = isNullOrEmpty(s) ? DEFAULT_LLDP_SUPPRESSION_CONFIG : s;
 
         } catch (NumberFormatException e) {
-            log.warn(e.getMessage());
+            log.warn("Component configuration had invalid values", e);
             newEnabled = enabled;
             newUseBddp = useBDDP;
             newProbeRate = probeRate;
@@ -227,6 +240,14 @@
             enable();
         } else if (wasEnabled && !enabled) {
             disable();
+        } else {
+            // reflect changes in suppression rules to discovery helpers
+            // FIXME: After migrating to Network Configuration Subsystem,
+            //        it should be possible to update only changed subset
+            if (enabled) {
+                // update all discovery helper state
+                loadDevices();
+            }
         }
 
         log.info(FORMAT, enabled, useBDDP, probeRate, staleLinkAge, lldpSuppression);
@@ -277,32 +298,97 @@
      * Loads available devices and registers their ports to be probed.
      */
     private void loadDevices() {
-        for (Device device : deviceService.getAvailableDevices()) {
-            if (rules.isSuppressed(device)) {
-                log.debug("LinkDiscovery from {} disabled by configuration", device.id());
-                continue;
-            }
-            LinkDiscovery ld = new LinkDiscovery(device, context);
-            discoverers.put(device.id(), ld);
-            addPorts(ld, device.id());
+        deviceService.getAvailableDevices()
+                .forEach(d -> updateDevice(d)
+                               .ifPresent(ld -> updatePorts(ld, d.id())));
+    }
+
+    /**
+     * Updates discovery helper for specified device.
+     *
+     * Adds and starts a discovery helper for specified device if enabled,
+     * calls {@link #removeDevice(DeviceId)} otherwise.
+     *
+     * @param device device to add
+     * @return discovery helper if discovery is enabled for the device
+     */
+    private Optional<LinkDiscovery> updateDevice(Device device) {
+        if (rules.isSuppressed(device)) {
+            log.trace("LinkDiscovery from {} disabled by configuration", device.id());
+            removeDevice(device.id());
+            return Optional.empty();
+        }
+        LinkDiscovery ld = discoverers.computeIfAbsent(device.id(),
+                                     did -> new LinkDiscovery(device, context));
+        if (ld.isStopped()) {
+            ld.start();
+        }
+        return Optional.of(ld);
+    }
+
+    /**
+     * Removes after stopping discovery helper for specified device.
+     * @param deviceId device to remove
+     */
+    private void removeDevice(final DeviceId deviceId) {
+        discoverers.computeIfPresent(deviceId, (did, ld) -> {
+            ld.stop();
+            providerService.linksVanished(deviceId);
+            return null;
+        });
+
+    }
+
+    /**
+     * Updates ports of the specified device to the specified discovery helper.
+     */
+    private void updatePorts(LinkDiscovery discoverer, DeviceId deviceId) {
+        deviceService.getPorts(deviceId).forEach(p -> updatePort(discoverer, p));
+    }
+
+    /**
+     * Updates discovery helper state of the specified port.
+     *
+     * Adds a port to the discovery helper if up and discovery is enabled,
+     * or calls {@link #removePort(Port)} otherwise.
+     */
+    private void updatePort(LinkDiscovery discoverer, Port port) {
+        if (rules.isSuppressed(port)) {
+            log.trace("LinkDiscovery from {} disabled by configuration", port);
+            removePort(port);
+            return;
+        }
+
+        // check if enabled and turn off discovery?
+        if (!port.isEnabled()) {
+            removePort(port);
+            return;
+        }
+
+        if (!port.number().isLogical()) {
+            discoverer.addPort(port);
         }
     }
 
     /**
-     * Adds ports of the specified device to the specified discovery helper.
+     * Removes a port from the specified discovery helper.
+     * @param port the port
      */
-    private void addPorts(LinkDiscovery discoverer, DeviceId deviceId) {
-        for (Port p : deviceService.getPorts(deviceId)) {
-            if (rules.isSuppressed(p)) {
-                continue;
+    private void removePort(Port port) {
+        if (port.element() instanceof Device) {
+            Device d = (Device) port.element();
+            LinkDiscovery ld = discoverers.get(d.id());
+            if (ld != null) {
+                ld.removePort(port.number());
             }
-            if (!p.number().isLogical()) {
-                discoverer.addPort(p);
-            }
+
+            ConnectPoint point = new ConnectPoint(d.id(), port.number());
+            providerService.linksVanished(point);
+        } else {
+            log.warn("Attempted to remove non-Device port", port);
         }
     }
 
-
     /**
      * Loads LLDP suppression rules.
      */
@@ -317,7 +403,7 @@
             // default rule to suppress ROADM to maintain compatibility
             rules = new SuppressionRules(ImmutableSet.of(),
                                          EnumSet.of(Device.Type.ROADM),
-                                         ImmutableMap.of());
+                                         ImmutableMap.of(NO_LLDP, SuppressionRules.ANY_VALUE));
         }
 
         // should refresh discoverers when we need dynamic reconfiguration
@@ -367,10 +453,9 @@
                 log.debug("Device {} doesn't exist, or isn't there yet", deviceId);
                 return;
             }
-            if (rules.isSuppressed(device)) {
-                return;
+            if (clusterService.getLocalNode().id().equals(event.roleInfo().master())) {
+                updateDevice(device).ifPresent(ld -> updatePorts(ld, device.id()));
             }
-            discoverers.computeIfAbsent(deviceId, k -> new LinkDiscovery(device, context));
         }
 
     }
@@ -381,7 +466,6 @@
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
-            LinkDiscovery ld;
             Device device = event.subject();
             Port port = event.port();
             if (device == null) {
@@ -393,73 +477,33 @@
             switch (event.type()) {
                 case DEVICE_ADDED:
                 case DEVICE_UPDATED:
-                    synchronized (discoverers) {
-                        ld = discoverers.get(deviceId);
-                        if (ld == null) {
-                            if (rules != null && rules.isSuppressed(device)) {
-                                log.debug("LinkDiscovery from {} disabled by configuration", device.id());
-                                return;
-                            }
-                            log.debug("Device added ({}) {}", event.type(), deviceId);
-                            discoverers.put(deviceId, new LinkDiscovery(device, context));
-                        } else {
-                            if (ld.isStopped()) {
-                                log.debug("Device restarted ({}) {}", event.type(), deviceId);
-                                ld.start();
-                            }
-                        }
-                    }
+                    updateDevice(device).ifPresent(ld -> updatePorts(ld, deviceId));
                     break;
                 case PORT_ADDED:
                 case PORT_UPDATED:
                     if (port.isEnabled()) {
-                        ld = discoverers.get(deviceId);
-                        if (ld == null) {
-                            return;
-                        }
-                        if (rules.isSuppressed(port)) {
-                            log.debug("LinkDiscovery from {}@{} disabled by configuration",
-                                      port.number(), device.id());
-                            return;
-                        }
-                        if (!port.number().isLogical()) {
-                            log.debug("Port added {}", port);
-                            ld.addPort(port);
-                        }
+                        updateDevice(device).ifPresent(ld -> updatePort(ld, port));
                     } else {
                         log.debug("Port down {}", port);
-                        ConnectPoint point = new ConnectPoint(deviceId, port.number());
-                        providerService.linksVanished(point);
+                        removePort(port);
                     }
                     break;
                 case PORT_REMOVED:
                     log.debug("Port removed {}", port);
-                    ConnectPoint point = new ConnectPoint(deviceId, port.number());
-                    providerService.linksVanished(point);
-
+                    removePort(port);
                     break;
                 case DEVICE_REMOVED:
                 case DEVICE_SUSPENDED:
                     log.debug("Device removed {}", deviceId);
-                    ld = discoverers.get(deviceId);
-                    if (ld == null) {
-                        return;
-                    }
-                    ld.stop();
-                    providerService.linksVanished(deviceId);
+                    removeDevice(deviceId);
                     break;
                 case DEVICE_AVAILABILITY_CHANGED:
-                    ld = discoverers.get(deviceId);
-                    if (ld == null) {
-                        return;
-                    }
                     if (deviceService.isAvailable(deviceId)) {
                         log.debug("Device up {}", deviceId);
-                        ld.start();
+                        updateDevice(device);
                     } else {
-                        providerService.linksVanished(deviceId);
                         log.debug("Device down {}", deviceId);
-                        ld.stop();
+                        removeDevice(deviceId);
                     }
                     break;
                 case PORT_STATS_UPDATED:
@@ -508,17 +552,7 @@
             }
             // check what deviceService sees, to see if we are missing anything
             try {
-                for (Device dev : deviceService.getDevices()) {
-                    if (rules.isSuppressed(dev)) {
-                        continue;
-                    }
-                    DeviceId did = dev.id();
-                    synchronized (discoverers) {
-                        LinkDiscovery ld = discoverers
-                                .computeIfAbsent(did, k -> new LinkDiscovery(dev, context));
-                        addPorts(ld, did);
-                    }
-                }
+                loadDevices();
             } catch (Exception e) {
                 // Catch all exceptions to avoid task being suppressed
                 log.error("Exception thrown during synchronization process", e);