ONOS-2846, ONOS-2812 Refactored link discovery pruning to be centralized rather than being with each link discovery helper.
This will make it behave properly in a distributed context.
Change-Id: I9b9788336468c41d1cf506e388306ad9136d5853
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/DiscoveryContext.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/DiscoveryContext.java
index 4ea3b1b..0cd1924 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/DiscoveryContext.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/DiscoveryContext.java
@@ -16,13 +16,14 @@
package org.onosproject.provider.lldp.impl;
import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.LinkKey;
import org.onosproject.net.link.LinkProviderService;
import org.onosproject.net.packet.PacketService;
/**
* Shared context for use by link discovery.
*/
-public interface DiscoveryContext {
+interface DiscoveryContext {
/**
* Returns the shared mastership service reference.
@@ -53,16 +54,16 @@
long probeRate();
/**
- * Returns the max stale link age in millis.
- *
- * @return stale link age
- */
- long staleLinkAge();
-
- /**
* Indicates whether to emit BDDP.
*
* @return true to emit BDDP
*/
boolean useBDDP();
+
+ /**
+ * Touches the link identified by the given key to indicate that it's active.
+ *
+ * @param key link key
+ */
+ void touchLink(LinkKey key);
}
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
index 6613a7e..0519b3f 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LLDPLinkProvider.java
@@ -17,6 +17,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -34,15 +35,18 @@
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.LinkKey;
import org.onosproject.net.Port;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.link.DefaultLinkDescription;
import org.onosproject.net.link.LinkProvider;
import org.onosproject.net.link.LinkProviderRegistry;
import org.onosproject.net.link.LinkProviderService;
+import org.onosproject.net.link.LinkService;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.packet.PacketProcessor;
@@ -65,6 +69,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.Link.Type.DIRECT;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -91,6 +96,9 @@
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected LinkService linkService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -103,8 +111,9 @@
private ScheduledExecutorService executor;
- private static final long INIT_DELAY = 5;
- private static final long DELAY = 5;
+ // TODO: Add sanity checking for the configurable params based on the delays
+ private static final long DEVICE_SYNC_DELAY = 5;
+ private static final long LINK_PRUNER_DELAY = 3;
private static final String PROP_ENABLED = "enabled";
@Property(name = PROP_ENABLED, boolValue = true,
@@ -135,13 +144,18 @@
label = "Path to LLDP suppression configuration file")
private String lldpSuppression = DEFAULT_LLDP_SUPPRESSION_CONFIG;
-
private final DiscoveryContext context = new InternalDiscoveryContext();
- private final InternalLinkProvider listener = new InternalLinkProvider();
private final InternalRoleListener roleListener = new InternalRoleListener();
+ private final InternalDeviceListener deviceListener = new InternalDeviceListener();
+ private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
+ // Device link discovery helpers.
protected final Map<DeviceId, LinkDiscovery> discoverers = new ConcurrentHashMap<>();
+ // Most recent time a tracked link was seen; links are tracked if their
+ // destination connection point is mastered by this controller instance.
+ private final Map<LinkKey, Long> linkTimes = Maps.newConcurrentMap();
+
private SuppressionRules rules;
private ApplicationId appId;
@@ -216,28 +230,37 @@
log.info(FORMAT, enabled, useBDDP, probeRate, staleLinkAge, lldpSuppression);
}
+ /**
+ * Enables link discovery processing.
+ */
private void enable() {
providerService = providerRegistry.register(this);
- deviceService.addListener(listener);
- packetService.addProcessor(listener, PacketProcessor.advisor(0));
masterService.addListener(roleListener);
+ deviceService.addListener(deviceListener);
+ packetService.addProcessor(packetProcessor, PacketProcessor.advisor(0));
- processDevices();
+ loadDevices();
- executor = newSingleThreadScheduledExecutor(groupedThreads("onos/device", "sync-%d"));
- executor.scheduleAtFixedRate(new SyncDeviceInfoTask(), INIT_DELAY, DELAY, SECONDS);
+ executor = newSingleThreadScheduledExecutor(groupedThreads("onos/link", "discovery-%d"));
+ executor.scheduleAtFixedRate(new SyncDeviceInfoTask(),
+ DEVICE_SYNC_DELAY, DEVICE_SYNC_DELAY, SECONDS);
+ executor.scheduleAtFixedRate(new LinkPrunerTask(),
+ LINK_PRUNER_DELAY, LINK_PRUNER_DELAY, SECONDS);
loadSuppressionRules();
requestIntercepts();
}
+ /**
+ * Disables link discovery processing.
+ */
private void disable() {
withdrawIntercepts();
providerRegistry.unregister(this);
- deviceService.removeListener(listener);
- packetService.removeProcessor(listener);
masterService.removeListener(roleListener);
+ deviceService.removeListener(deviceListener);
+ packetService.removeProcessor(packetProcessor);
if (executor != null) {
executor.shutdownNow();
@@ -248,7 +271,10 @@
providerService = null;
}
- private void processDevices() {
+ /**
+ * Loads available devices and registers their ports to be probed.
+ */
+ private void loadDevices() {
for (Device device : deviceService.getAvailableDevices()) {
if (rules.isSuppressed(device)) {
log.debug("LinkDiscovery from {} disabled by configuration", device.id());
@@ -260,6 +286,9 @@
}
}
+ /**
+ * Adds ports of the specified device to the specified discovery helper.
+ */
private void addPorts(LinkDiscovery discoverer, DeviceId deviceId) {
for (Port p : deviceService.getPorts(deviceId)) {
if (rules.isSuppressed(p)) {
@@ -271,7 +300,12 @@
}
}
+
+ /**
+ * Loads LLDP suppression rules.
+ */
private void loadSuppressionRules() {
+ // FIXME: convert to use network configuration
SuppressionRulesStore store = new SuppressionRulesStore(lldpSuppression);
try {
log.info("Reading suppression rules from {}", lldpSuppression);
@@ -288,7 +322,7 @@
}
/**
- * Request packet intercepts.
+ * Requests packet intercepts.
*/
private void requestIntercepts() {
TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
@@ -304,7 +338,7 @@
}
/**
- * Withdraw packet intercepts.
+ * Withdraws packet intercepts.
*/
private void withdrawIntercepts() {
TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
@@ -314,6 +348,9 @@
packetService.cancelPackets(selector.build(), PacketPriority.CONTROL, appId);
}
+ /**
+ * Processes device mastership role changes.
+ */
private class InternalRoleListener implements MastershipListener {
@Override
public void event(MastershipEvent event) {
@@ -336,7 +373,10 @@
}
- private class InternalLinkProvider implements PacketProcessor, DeviceListener {
+ /**
+ * Processes device events.
+ */
+ private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
LinkDiscovery ld;
@@ -426,7 +466,12 @@
log.debug("Unknown event {}", event);
}
}
+ }
+ /**
+ * Processes incoming packets.
+ */
+ private class InternalPacketProcessor implements PacketProcessor {
@Override
public void process(PacketContext context) {
if (context == null) {
@@ -443,6 +488,9 @@
}
}
+ /**
+ * Auxiliary task to keep device ports up to date.
+ */
private final class SyncDeviceInfoTask implements Runnable {
@Override
public void run() {
@@ -464,12 +512,54 @@
}
}
} catch (Exception e) {
- // catch all Exception to avoid Scheduled task being suppressed.
+ // Catch all exceptions to avoid task being suppressed
log.error("Exception thrown during synchronization process", e);
}
}
}
+ /**
+ * Auxiliary task for pruning stale links.
+ */
+ private class LinkPrunerTask implements Runnable {
+ @Override
+ public void run() {
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+
+ try {
+ // TODO: There is still a slight possibility of mastership
+ // change occurring right with link going stale. This will
+ // result in the stale link not being pruned.
+ Maps.filterEntries(linkTimes, e -> {
+ if (!masterService.isLocalMaster(e.getKey().dst().deviceId())) {
+ return true;
+ }
+ if (isStale(e.getValue())) {
+ providerService.linkVanished(new DefaultLinkDescription(e.getKey().src(),
+ e.getKey().dst(),
+ DIRECT));
+ return true;
+ }
+ return false;
+ }).clear();
+
+ } catch (Exception e) {
+ // Catch all exceptions to avoid task being suppressed
+ log.error("Exception thrown during link pruning process", e);
+ }
+ }
+
+ private boolean isStale(long lastSeen) {
+ return lastSeen < System.currentTimeMillis() - staleLinkAge;
+ }
+ }
+
+ /**
+ * Provides processing context for the device link discovery helpers.
+ */
private class InternalDiscoveryContext implements DiscoveryContext {
@Override
public MastershipService mastershipService() {
@@ -492,13 +582,14 @@
}
@Override
- public long staleLinkAge() {
- return staleLinkAge;
- }
-
- @Override
public boolean useBDDP() {
return useBDDP;
}
+
+ @Override
+ public void touchLink(LinkKey key) {
+ linkTimes.put(key, System.currentTimeMillis());
+ }
}
+
}
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
index f7fe8ab..5502624 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.provider.lldp.impl;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
@@ -37,9 +36,7 @@
import org.slf4j.Logger;
import java.nio.ByteBuffer;
-import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onosproject.net.PortNumber.portNumber;
@@ -53,7 +50,7 @@
* LLDP, send an LLDP for a single slow port. Based on FlowVisor topology
* discovery implementation.
*/
-public class LinkDiscovery implements TimerTask {
+class LinkDiscovery implements TimerTask {
private final Logger log = getLogger(getClass());
@@ -72,9 +69,6 @@
// Set of ports to be probed
private final Set<Long> ports = Sets.newConcurrentHashSet();
- // Most recent time a link was seen
- private final Map<LinkKey, Long> linkTimes = Maps.newConcurrentMap();
-
/**
* Instantiates discovery manager for the given physical switch. Creates a
* generic LLDP packet that will be customized for the port it is sent out on.
@@ -83,7 +77,7 @@
* @param device the physical switch
* @param context discovery context
*/
- public LinkDiscovery(Device device, DiscoveryContext context) {
+ LinkDiscovery(Device device, DiscoveryContext context) {
this.device = device;
this.context = context;
@@ -102,7 +96,6 @@
bddpEth.setEtherType(Ethernet.TYPE_BSN);
bddpEth.setDestinationMACAddress(ONOSLLDP.BDDP_MULTICAST);
bddpEth.setPad(true);
- log.info("Using BDDP to discover network");
isStopped = true;
start();
@@ -110,46 +103,47 @@
}
+ synchronized void stop() {
+ isStopped = true;
+ timeout.cancel();
+ }
+
+ synchronized void start() {
+ if (isStopped) {
+ isStopped = false;
+ timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
+ } else {
+ log.warn("LinkDiscovery started multiple times?");
+ }
+ }
+
+ synchronized boolean isStopped() {
+ return isStopped || timeout.isCancelled();
+ }
+
/**
* Add physical port port to discovery process.
* Send out initial LLDP and label it as slow port.
*
* @param port the port
*/
- public void addPort(Port port) {
+ void addPort(Port port) {
boolean newPort = ports.add(port.number().toLong());
boolean isMaster = context.mastershipService().isLocalMaster(device.id());
if (newPort && isMaster) {
- log.debug("Sending init probe to port {}@{}", port.number().toLong(), device.id());
+ log.debug("Sending initial probe to port {}@{}", port.number().toLong(), device.id());
sendProbes(port.number().toLong());
}
}
/**
- * Method called by remote port to acknowledge receipt of LLDP sent by
- * this port. If slow port, updates label to fast. If fast port, decrements
- * number of unacknowledged probes.
- *
- * @param key link key
- */
- private void ackProbe(LinkKey key) {
- long portNumber = key.src().port().toLong();
- if (ports.contains(portNumber)) {
- linkTimes.put(key, System.currentTimeMillis());
- } else {
- log.debug("Got ackProbe for non-existing port: {}", portNumber);
- }
- }
-
-
- /**
- * Handles an incoming LLDP packet. Creates link in topology and sends ACK
- * to port where LLDP originated.
+ * Handles an incoming LLDP packet. Creates link in topology and adds the
+ * link for staleness tracking.
*
* @param packetContext packet context
* @return true if handled
*/
- public boolean handleLLDP(PacketContext packetContext) {
+ boolean handleLLDP(PacketContext packetContext) {
Ethernet eth = packetContext.inPacket().parsed();
if (eth == null) {
return false;
@@ -165,14 +159,13 @@
ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort);
ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort);
- ackProbe(LinkKey.linkKey(src, dst));
-
LinkDescription ld = eth.getEtherType() == Ethernet.TYPE_LLDP ?
new DefaultLinkDescription(src, dst, Type.DIRECT) :
new DefaultLinkDescription(src, dst, Type.INDIRECT);
try {
context.providerService().linkDetected(ld);
+ context.touchLink(LinkKey.linkKey(src, dst));
} catch (IllegalStateException e) {
return true;
}
@@ -195,52 +188,16 @@
return;
}
- if (!context.mastershipService().isLocalMaster(device.id())) {
- if (!isStopped()) {
- timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
- }
- return;
+ if (context.mastershipService().isLocalMaster(device.id())) {
+ log.trace("Sending probes from {}", device.id());
+ ports.forEach(this::sendProbes);
}
- // Prune stale links
- linkTimes.entrySet().stream()
- .filter(e -> isStale(e.getKey(), e.getValue()))
- .map(Map.Entry::getKey).collect(Collectors.toSet())
- .forEach(this::pruneLink);
-
- // Probe ports
- log.trace("Sending probes from {}", device.id());
- ports.forEach(this::sendProbes);
-
if (!isStopped()) {
timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
}
}
- private void pruneLink(LinkKey key) {
- linkTimes.remove(key);
- LinkDescription desc = new DefaultLinkDescription(key.src(), key.dst(), Type.DIRECT);
- context.providerService().linkVanished(desc);
- }
-
- private boolean isStale(LinkKey key, long lastSeen) {
- return lastSeen < (System.currentTimeMillis() - context.staleLinkAge());
- }
-
- public synchronized void stop() {
- isStopped = true;
- timeout.cancel();
- }
-
- public synchronized void start() {
- if (isStopped) {
- isStopped = false;
- timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
- } else {
- log.warn("LinkDiscovery started multiple times?");
- }
- }
-
/**
* Creates packet_out LLDP for specified output port.
*
@@ -285,11 +242,8 @@
}
}
- public synchronized boolean isStopped() {
- return isStopped || timeout.isCancelled();
- }
-
boolean containsPort(long portNumber) {
return ports.contains(portNumber);
}
+
}
diff --git a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java
index bba031f..b4b7b7b 100644
--- a/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java
+++ b/providers/lldp/src/test/java/org/onosproject/provider/lldp/impl/LLDPLinkProviderTest.java
@@ -48,6 +48,7 @@
import org.onosproject.net.link.LinkProvider;
import org.onosproject.net.link.LinkProviderRegistry;
import org.onosproject.net.link.LinkProviderService;
+import org.onosproject.net.link.LinkServiceAdapter;
import org.onosproject.net.packet.DefaultInboundPacket;
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.OutboundPacket;
@@ -79,7 +80,8 @@
private static Port pd4;
private final LLDPLinkProvider provider = new LLDPLinkProvider();
- private final TestLinkRegistry linkService = new TestLinkRegistry();
+ private final TestLinkRegistry linkRegistry = new TestLinkRegistry();
+ private final TestLinkService linkService = new TestLinkService();
private final TestPacketService packetService = new TestPacketService();
private final TestDeviceService deviceService = new TestDeviceService();
private final TestMasterShipService masterService = new TestMasterShipService();
@@ -104,8 +106,9 @@
provider.coreService = coreService;
provider.deviceService = deviceService;
+ provider.linkService = linkService;
provider.packetService = packetService;
- provider.providerRegistry = linkService;
+ provider.providerRegistry = linkRegistry;
provider.masterService = masterService;
provider.activate(null);
@@ -498,4 +501,6 @@
}
+ private class TestLinkService extends LinkServiceAdapter {
+ }
}
diff --git a/tools/test/cells/office b/tools/test/cells/office
index 2cd933e..2e282c6 100644
--- a/tools/test/cells/office
+++ b/tools/test/cells/office
@@ -3,5 +3,6 @@
export ONOS_NIC="10.1.10.*"
export OC1="10.1.10.223"
+unset ONOS_USE_SSH
export ONOS_APPS="drivers,openflow,fwd,proxyarp,mobility"