LLDPLinkProvider listens to MastershipEvents
Change-Id: Iaa3655c680a8fc93921f0b83dc4fc16311222bf9
diff --git a/apps/optical/src/main/java/org/onlab/onos/optical/testapp/LambdaForwarding.java b/apps/optical/src/main/java/org/onlab/onos/optical/testapp/LambdaForwarding.java
index b4a7d0c..4d4fc7a 100644
--- a/apps/optical/src/main/java/org/onlab/onos/optical/testapp/LambdaForwarding.java
+++ b/apps/optical/src/main/java/org/onlab/onos/optical/testapp/LambdaForwarding.java
@@ -149,8 +149,6 @@
break;
case DEVICE_AVAILABILITY_CHANGED:
break;
- case DEVICE_MASTERSHIP_CHANGED:
- break;
case DEVICE_REMOVED:
break;
case DEVICE_SUSPENDED:
diff --git a/apps/optical/src/main/java/org/onlab/onos/optical/testapp/MPLSForwarding.java b/apps/optical/src/main/java/org/onlab/onos/optical/testapp/MPLSForwarding.java
index 8ae4a724..bcae4b5 100644
--- a/apps/optical/src/main/java/org/onlab/onos/optical/testapp/MPLSForwarding.java
+++ b/apps/optical/src/main/java/org/onlab/onos/optical/testapp/MPLSForwarding.java
@@ -146,8 +146,6 @@
break;
case DEVICE_AVAILABILITY_CHANGED:
break;
- case DEVICE_MASTERSHIP_CHANGED:
- break;
case DEVICE_REMOVED:
break;
case DEVICE_SUSPENDED:
diff --git a/core/api/src/main/java/org/onlab/onos/net/device/DeviceEvent.java b/core/api/src/main/java/org/onlab/onos/net/device/DeviceEvent.java
index 7fa981e..63baeb4 100644
--- a/core/api/src/main/java/org/onlab/onos/net/device/DeviceEvent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/device/DeviceEvent.java
@@ -60,12 +60,6 @@
DEVICE_AVAILABILITY_CHANGED,
/**
- * Signifies that the current controller instance relationship has
- * changed with respect to a device.
- */
- DEVICE_MASTERSHIP_CHANGED,
-
- /**
* Signifies that a port has been added.
*/
PORT_ADDED,
diff --git a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
index 597f15f..f45d4de 100644
--- a/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
@@ -16,7 +16,6 @@
package org.onlab.onos.net.device.impl;
import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onlab.onos.net.device.DeviceEvent.Type.DEVICE_MASTERSHIP_CHANGED;
import static org.onlab.onos.net.MastershipRole.*;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -315,8 +314,6 @@
if (event != null) {
log.trace("event: {} {}", event.type(), event);
post(event);
- } else {
- post(new DeviceEvent(DEVICE_MASTERSHIP_CHANGED, store.getDevice(deviceId)));
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManager.java b/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManager.java
index 80d461a..283a096 100644
--- a/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/proxyarp/impl/ProxyArpManager.java
@@ -441,7 +441,6 @@
switch (event.type()) {
case DEVICE_ADDED:
case DEVICE_AVAILABILITY_CHANGED:
- case DEVICE_MASTERSHIP_CHANGED:
case DEVICE_SUSPENDED:
case DEVICE_UPDATED:
// nothing to do in these cases; handled when links get reported
diff --git a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProvider.java b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProvider.java
index 6dc464e..b8905ae 100644
--- a/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProvider.java
+++ b/providers/lldp/src/main/java/org/onlab/onos/provider/lldp/impl/LLDPLinkProvider.java
@@ -20,6 +20,8 @@
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.onos.mastership.MastershipEvent;
+import org.onlab.onos.mastership.MastershipListener;
import org.onlab.onos.mastership.MastershipService;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.Device;
@@ -40,7 +42,11 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -67,11 +73,17 @@
private LinkProviderService providerService;
+ private ScheduledExecutorService executor;
+
private final boolean useBDDP = true;
+ private static final long INIT_DELAY = 5;
+ private static final long DELAY = 5;
private final InternalLinkProvider listener = new InternalLinkProvider();
+ private final InternalRoleListener roleListener = new InternalRoleListener();
+
protected final Map<DeviceId, LinkDiscovery> discoverers = new ConcurrentHashMap<>();
/**
@@ -86,6 +98,8 @@
providerService = providerRegistry.register(this);
deviceService.addListener(listener);
packetSevice.addProcessor(listener, 0);
+ masterService.addListener(roleListener);
+
LinkDiscovery ld;
for (Device device : deviceService.getDevices()) {
ld = new LinkDiscovery(device, packetSevice, masterService,
@@ -98,22 +112,57 @@
}
}
+ executor = newSingleThreadScheduledExecutor(namedThreads("device-sync-%d"));
+ executor.scheduleAtFixedRate(new SyncDeviceInfoTask(), INIT_DELAY,
+ DELAY, TimeUnit.SECONDS);
+
log.info("Started");
}
@Deactivate
public void deactivate() {
+ executor.shutdownNow();
for (LinkDiscovery ld : discoverers.values()) {
ld.stop();
}
providerRegistry.unregister(this);
deviceService.removeListener(listener);
packetSevice.removeProcessor(listener);
+ masterService.removeListener(roleListener);
providerService = null;
log.info("Stopped");
}
+ private class InternalRoleListener implements MastershipListener {
+
+ @Override
+ public void event(MastershipEvent event) {
+
+ if (MastershipEvent.Type.BACKUPS_CHANGED.equals(event.type())) {
+ // only need new master events
+ return;
+ }
+
+ DeviceId deviceId = event.subject();
+ Device device = deviceService.getDevice(deviceId);
+ if (device == null) {
+ log.warn("Device {} doesn't exist, or isn't there yet", deviceId);
+ return;
+ }
+ synchronized (discoverers) {
+ if (!discoverers.containsKey(deviceId)) {
+ // TODO: ideally, should never reach here
+ log.debug("Device mastership changed ({}) {}",
+ event.type(), deviceId);
+ discoverers.put(deviceId, new LinkDiscovery(device,
+ packetSevice, masterService, providerService,
+ useBDDP));
+ }
+ }
+ }
+
+ }
private class InternalLinkProvider implements PacketProcessor, DeviceListener {
@@ -131,17 +180,21 @@
switch (event.type()) {
case DEVICE_ADDED:
case DEVICE_UPDATED:
+ synchronized (discoverers) {
ld = discoverers.get(deviceId);
if (ld == null) {
- log.debug("Device added ({}) {}", event.type(), deviceId);
- discoverers.put(deviceId,
- new LinkDiscovery(device, packetSevice, masterService,
- providerService, useBDDP));
+ log.debug("Device added ({}) {}", event.type(),
+ deviceId);
+ discoverers.put(deviceId, new LinkDiscovery(device,
+ packetSevice, masterService, providerService,
+ useBDDP));
} else {
if (ld.isStopped()) {
- log.debug("Device restarted ({}) {}", event.type(), deviceId);
+ log.debug("Device restarted ({}) {}", event.type(),
+ deviceId);
ld.start();
}
+ }
}
break;
case PORT_ADDED:
@@ -193,15 +246,6 @@
ld.stop();
}
break;
- case DEVICE_MASTERSHIP_CHANGED:
- if (!discoverers.containsKey(deviceId)) {
- // TODO: ideally, should never reach here
- log.debug("Device mastership changed ({}) {}", event.type(), deviceId);
- discoverers.put(deviceId,
- new LinkDiscovery(device, packetSevice, masterService,
- providerService, useBDDP));
- }
- break;
default:
log.debug("Unknown event {}", event);
}
@@ -224,4 +268,37 @@
}
}
+ private final class SyncDeviceInfoTask implements Runnable {
+
+ @Override
+ public void run() {
+ if (Thread.currentThread().isInterrupted()) {
+ log.info("Interrupted, quitting");
+ return;
+ }
+ // check what deviceService sees, to see if we are missing anything
+ try {
+ LinkDiscovery ld = null;
+ for (Device dev : deviceService.getDevices()) {
+ DeviceId did = dev.id();
+ synchronized (discoverers) {
+ if (!discoverers.containsKey(did)) {
+ ld = new LinkDiscovery(dev, packetSevice,
+ masterService, providerService, useBDDP);
+ discoverers.put(did, ld);
+ for (Port p : deviceService.getPorts(did)) {
+ if (!p.number().isLogical()) {
+ ld.addPort(p);
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ // catch all Exception to avoid Scheduled task being suppressed.
+ log.error("Exception thrown during synchronization process", e);
+ }
+ }
+ }
+
}