ONOS-2846, ONOS-2812 Refactored link discovery pruning to be centralized rather than being with each link discovery helper.
This will make it behave properly in a distributed context.
Change-Id: I9b9788336468c41d1cf506e388306ad9136d5853
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
index 6613a7e..0519b3f 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
@@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -34,15 +35,18 @@
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.LinkKey;
import org.onosproject.net.Port;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.link.DefaultLinkDescription;
import org.onosproject.net.link.LinkProvider;
import org.onosproject.net.link.LinkProviderRegistry;
import org.onosproject.net.link.LinkProviderService;
+import org.onosproject.net.link.LinkService;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.packet.PacketProcessor;
@@ -65,6 +69,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.Link.Type.DIRECT;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -91,6 +96,9 @@
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LinkService linkService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -103,8 +111,9 @@
private ScheduledExecutorService executor;
- private static final long INIT_DELAY = 5;
- private static final long DELAY = 5;
+ // TODO: Add sanity checking for the configurable params based on the delays
+ private static final long DEVICE_SYNC_DELAY = 5;
+ private static final long LINK_PRUNER_DELAY = 3;
private static final String PROP_ENABLED = "enabled";
@Property(name = PROP_ENABLED, boolValue = true,
@@ -135,13 +144,18 @@
label = "Path to LLDP suppression configuration file")
private String lldpSuppression = DEFAULT_LLDP_SUPPRESSION_CONFIG;
-
private final DiscoveryContext context = new InternalDiscoveryContext();
- private final InternalLinkProvider listener = new InternalLinkProvider();
private final InternalRoleListener roleListener = new InternalRoleListener();
+ private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+ private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
+ // Device link discovery helpers.
protected final Map<DeviceId, LinkDiscovery> discoverers = new ConcurrentHashMap<>();
+ // Most recent time a tracked link was seen; links are tracked if their
+ // destination connection point is mastered by this controller instance.
+ private final Map<LinkKey, Long> linkTimes = Maps.newConcurrentMap();
+
private SuppressionRules rules;
private ApplicationId appId;
@@ -216,28 +230,37 @@
log.info(FORMAT, enabled, useBDDP, probeRate, staleLinkAge, lldpSuppression);
}
+ /**
+ * Enables link discovery processing.
+ */
private void enable() {
providerService = providerRegistry.register(this);
- deviceService.addListener(listener);
- packetService.addProcessor(listener, PacketProcessor.advisor(0));
masterService.addListener(roleListener);
+ deviceService.addListener(deviceListener);
+ packetService.addProcessor(packetProcessor, PacketProcessor.advisor(0));
- processDevices();
+ loadDevices();
- executor = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "sync-%d"));
- executor.scheduleAtFixedRate(new SyncDeviceInfoTask(), INIT_DELAY, DELAY, SECONDS);
+ executor = newSingleThreadScheduledExecutor(groupedThreads("onos/link", "discovery-%d"));
+ executor.scheduleAtFixedRate(new SyncDeviceInfoTask(),
+ DEVICE_SYNC_DELAY, DEVICE_SYNC_DELAY, SECONDS);
+ executor.scheduleAtFixedRate(new LinkPrunerTask(),
+ LINK_PRUNER_DELAY, LINK_PRUNER_DELAY, SECONDS);
loadSuppressionRules();
requestIntercepts();
}
+ /**
+ * Disables link discovery processing.
+ */
private void disable() {
withdrawIntercepts();
providerRegistry.unregister(this);
- deviceService.removeListener(listener);
- packetService.removeProcessor(listener);
masterService.removeListener(roleListener);
+ deviceService.removeListener(deviceListener);
+ packetService.removeProcessor(packetProcessor);
if (executor != null) {
executor.shutdownNow();
@@ -248,7 +271,10 @@
providerService = null;
}
- private void processDevices() {
+ /**
+ * Loads available devices and registers their ports to be probed.
+ */
+ private void loadDevices() {
for (Device device : deviceService.getAvailableDevices()) {
if (rules.isSuppressed(device)) {
log.debug("LinkDiscovery from {} disabled by configuration", device.id());
@@ -260,6 +286,9 @@
}
}
+ /**
+ * Adds ports of the specified device to the specified discovery helper.
+ */
private void addPorts(LinkDiscovery discoverer, DeviceId deviceId) {
for (Port p : deviceService.getPorts(deviceId)) {
if (rules.isSuppressed(p)) {
@@ -271,7 +300,12 @@
}
}
+
+ /**
+ * Loads LLDP suppression rules.
+ */
private void loadSuppressionRules() {
+ // FIXME: convert to use network configuration
SuppressionRulesStore store = new SuppressionRulesStore(lldpSuppression);
try {
log.info("Reading suppression rules from {}", lldpSuppression);
@@ -288,7 +322,7 @@
}
/**
- * Request packet intercepts.
+ * Requests packet intercepts.
*/
private void requestIntercepts() {
TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
@@ -304,7 +338,7 @@
}
/**
- * Withdraw packet intercepts.
+ * Withdraws packet intercepts.
*/
private void withdrawIntercepts() {
TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
@@ -314,6 +348,9 @@
packetService.cancelPackets(selector.build(), PacketPriority.CONTROL, appId);
}
+ /**
+ * Processes device mastership role changes.
+ */
private class InternalRoleListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
@@ -336,7 +373,10 @@
}
- private class InternalLinkProvider implements PacketProcessor, DeviceListener {
+ /**
+ * Processes device events.
+ */
+ private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
LinkDiscovery ld;
@@ -426,7 +466,12 @@
log.debug("Unknown event {}", event);
}
}
+ }
+ /**
+ * Processes incoming packets.
+ */
+ private class InternalPacketProcessor implements PacketProcessor {
@Override
public void process(PacketContext context) {
if (context == null) {
@@ -443,6 +488,9 @@
}
}
+ /**
+ * Auxiliary task to keep device ports up to date.
+ */
private final class SyncDeviceInfoTask implements Runnable {
@Override
public void run() {
@@ -464,12 +512,54 @@
}
}
} catch (Exception e) {
- // catch all Exception to avoid Scheduled task being suppressed.
+ // Catch all exceptions to avoid task being suppressed
log.error("Exception thrown during synchronization process", e);
}
}
}
+ /**
+ * Auxiliary task for pruning stale links.
+ */
+ private class LinkPrunerTask implements Runnable {
+ @Override
+ public void run() {
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+
+ try {
+ // TODO: There is still a slight possibility of mastership
+ // change occurring right with link going stale. This will
+ // result in the stale link not being pruned.
+ Maps.filterEntries(linkTimes, e -> {
+ if (!masterService.isLocalMaster(e.getKey().dst().deviceId())) {
+ return true;
+ }
+ if (isStale(e.getValue())) {
+ providerService.linkVanished(new DefaultLinkDescription(e.getKey().src(),
+ e.getKey().dst(),
+ DIRECT));
+ return true;
+ }
+ return false;
+ }).clear();
+
+ } catch (Exception e) {
+ // Catch all exceptions to avoid task being suppressed
+ log.error("Exception thrown during link pruning process", e);
+ }
+ }
+
+ private boolean isStale(long lastSeen) {
+ return lastSeen < System.currentTimeMillis() - staleLinkAge;
+ }
+ }
+
+ /**
+ * Provides processing context for the device link discovery helpers.
+ */
private class InternalDiscoveryContext implements DiscoveryContext {
@Override
public MastershipService mastershipService() {
@@ -492,13 +582,14 @@
}
@Override
- public long staleLinkAge() {
- return staleLinkAge;
- }
-
- @Override
public boolean useBDDP() {
return useBDDP;
}
+
+ @Override
+ public void touchLink(LinkKey key) {
+ linkTimes.put(key, System.currentTimeMillis());
+ }
}
+
}