ONOS-2846, ONOS-2812 Refactored link discovery pruning to be centralized rather than being with each link discovery helper.
This will make it behave properly in a distributed context.
Change-Id: I9b9788336468c41d1cf506e388306ad9136d5853
diff --git a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
index f7fe8ab..5502624 100644
--- a/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
+++ b/providers/lldp/src/main/java/org/onosproject/provider/lldp/impl/LinkDiscovery.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.provider.lldp.impl;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
@@ -37,9 +36,7 @@
import org.slf4j.Logger;
import java.nio.ByteBuffer;
-import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.onosproject.net.PortNumber.portNumber;
@@ -53,7 +50,7 @@
* LLDP, send an LLDP for a single slow port. Based on FlowVisor topology
* discovery implementation.
*/
-public class LinkDiscovery implements TimerTask {
+class LinkDiscovery implements TimerTask {
private final Logger log = getLogger(getClass());
@@ -72,9 +69,6 @@
// Set of ports to be probed
private final Set<Long> ports = Sets.newConcurrentHashSet();
- // Most recent time a link was seen
- private final Map<LinkKey, Long> linkTimes = Maps.newConcurrentMap();
-
/**
* Instantiates discovery manager for the given physical switch. Creates a
* generic LLDP packet that will be customized for the port it is sent out on.
@@ -83,7 +77,7 @@
* @param device the physical switch
* @param context discovery context
*/
- public LinkDiscovery(Device device, DiscoveryContext context) {
+ LinkDiscovery(Device device, DiscoveryContext context) {
this.device = device;
this.context = context;
@@ -102,7 +96,6 @@
bddpEth.setEtherType(Ethernet.TYPE_BSN);
bddpEth.setDestinationMACAddress(ONOSLLDP.BDDP_MULTICAST);
bddpEth.setPad(true);
- log.info("Using BDDP to discover network");
isStopped = true;
start();
@@ -110,46 +103,47 @@
}
+ synchronized void stop() {
+ isStopped = true;
+ timeout.cancel();
+ }
+
+ synchronized void start() {
+ if (isStopped) {
+ isStopped = false;
+ timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
+ } else {
+ log.warn("LinkDiscovery started multiple times?");
+ }
+ }
+
+ synchronized boolean isStopped() {
+ return isStopped || timeout.isCancelled();
+ }
+
/**
* Add physical port port to discovery process.
* Send out initial LLDP and label it as slow port.
*
* @param port the port
*/
- public void addPort(Port port) {
+ void addPort(Port port) {
boolean newPort = ports.add(port.number().toLong());
boolean isMaster = context.mastershipService().isLocalMaster(device.id());
if (newPort && isMaster) {
- log.debug("Sending init probe to port {}@{}", port.number().toLong(), device.id());
+ log.debug("Sending initial probe to port {}@{}", port.number().toLong(), device.id());
sendProbes(port.number().toLong());
}
}
/**
- * Method called by remote port to acknowledge receipt of LLDP sent by
- * this port. If slow port, updates label to fast. If fast port, decrements
- * number of unacknowledged probes.
- *
- * @param key link key
- */
- private void ackProbe(LinkKey key) {
- long portNumber = key.src().port().toLong();
- if (ports.contains(portNumber)) {
- linkTimes.put(key, System.currentTimeMillis());
- } else {
- log.debug("Got ackProbe for non-existing port: {}", portNumber);
- }
- }
-
-
- /**
- * Handles an incoming LLDP packet. Creates link in topology and sends ACK
- * to port where LLDP originated.
+ * Handles an incoming LLDP packet. Creates link in topology and adds the
+ * link for staleness tracking.
*
* @param packetContext packet context
* @return true if handled
*/
- public boolean handleLLDP(PacketContext packetContext) {
+ boolean handleLLDP(PacketContext packetContext) {
Ethernet eth = packetContext.inPacket().parsed();
if (eth == null) {
return false;
@@ -165,14 +159,13 @@
ConnectPoint src = new ConnectPoint(srcDeviceId, srcPort);
ConnectPoint dst = new ConnectPoint(dstDeviceId, dstPort);
- ackProbe(LinkKey.linkKey(src, dst));
-
LinkDescription ld = eth.getEtherType() == Ethernet.TYPE_LLDP ?
new DefaultLinkDescription(src, dst, Type.DIRECT) :
new DefaultLinkDescription(src, dst, Type.INDIRECT);
try {
context.providerService().linkDetected(ld);
+ context.touchLink(LinkKey.linkKey(src, dst));
} catch (IllegalStateException e) {
return true;
}
@@ -195,52 +188,16 @@
return;
}
- if (!context.mastershipService().isLocalMaster(device.id())) {
- if (!isStopped()) {
- timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
- }
- return;
+ if (context.mastershipService().isLocalMaster(device.id())) {
+ log.trace("Sending probes from {}", device.id());
+ ports.forEach(this::sendProbes);
}
- // Prune stale links
- linkTimes.entrySet().stream()
- .filter(e -> isStale(e.getKey(), e.getValue()))
- .map(Map.Entry::getKey).collect(Collectors.toSet())
- .forEach(this::pruneLink);
-
- // Probe ports
- log.trace("Sending probes from {}", device.id());
- ports.forEach(this::sendProbes);
-
if (!isStopped()) {
timeout = Timer.getTimer().newTimeout(this, context.probeRate(), MILLISECONDS);
}
}
- private void pruneLink(LinkKey key) {
- linkTimes.remove(key);
- LinkDescription desc = new DefaultLinkDescription(key.src(), key.dst(), Type.DIRECT);
- context.providerService().linkVanished(desc);
- }
-
- private boolean isStale(LinkKey key, long lastSeen) {
- return lastSeen < (System.currentTimeMillis() - context.staleLinkAge());
- }
-
- public synchronized void stop() {
- isStopped = true;
- timeout.cancel();
- }
-
- public synchronized void start() {
- if (isStopped) {
- isStopped = false;
- timeout = Timer.getTimer().newTimeout(this, 0, MILLISECONDS);
- } else {
- log.warn("LinkDiscovery started multiple times?");
- }
- }
-
/**
* Creates packet_out LLDP for specified output port.
*
@@ -285,11 +242,8 @@
}
}
- public synchronized boolean isStopped() {
- return isStopped || timeout.isCancelled();
- }
-
boolean containsPort(long portNumber) {
return ports.contains(portNumber);
}
+
}