ONOS-4422 Implemented device discovery in Bmv2 device provider and other
improvements
- Added listener for hello messages received from Bmv2 devices
- Added a periodic poller task to check device reachability and port
information updates
- Avoids periodically re-connecting the device if it is already
available in the core
- Fixed minor bug in Bmv2ThriftClient
Change-Id: I416d1880773e11b2ac6fa062d8be2b8f280786fb
diff --git a/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/Bmv2ThriftClient.java b/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/Bmv2ThriftClient.java
index db0d5e2..f0e68f7 100644
--- a/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/Bmv2ThriftClient.java
+++ b/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/Bmv2ThriftClient.java
@@ -80,9 +80,9 @@
// Seconds after a client is expired (and connection closed) in the cache.
private static final int CLIENT_CACHE_TIMEOUT = 60;
// Number of connection retries after a network error.
- private static final int NUM_CONNECTION_RETRIES = 10;
+ private static final int NUM_CONNECTION_RETRIES = 3;
// Time between retries in milliseconds.
- private static final int TIME_BETWEEN_RETRIES = 200;
+ private static final int TIME_BETWEEN_RETRIES = 300;
// Static client cache where clients are removed after a predefined timeout.
private static final LoadingCache<DeviceId, Bmv2ThriftClient>
@@ -125,6 +125,15 @@
}
/**
+ * Force a close of the transport session (if one is open) with the given device.
+ *
+ * @param deviceId device id
+ */
+ public static void forceDisconnectOf(DeviceId deviceId) {
+ CLIENT_CACHE.invalidate(deviceId);
+ }
+
+ /**
* Pings the device. Returns true if the device is reachable,
* false otherwise.
*
@@ -392,7 +401,7 @@
LOG.debug("Packet transmission requested! > portNumber={}, packet={}", portNumber, packet);
} catch (TException e) {
LOG.debug("Exception while requesting packet transmission: {} > portNumber={}, packet={}",
- portNumber, packet);
+ e, portNumber, packet);
throw new Bmv2RuntimeException(e.getMessage(), e);
}
}
diff --git a/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/SafeThriftClient.java b/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/SafeThriftClient.java
index bbe0546..95a052a 100644
--- a/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/SafeThriftClient.java
+++ b/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/SafeThriftClient.java
@@ -173,7 +173,12 @@
private static void reconnectOrThrowException(TTransport transport, int maxRetries, long timeBetweenRetries)
throws TTransportException {
int errors = 0;
- transport.close();
+ try {
+ transport.close();
+ } catch (Exception e) {
+ // Thrift seems to have a bug where if the transport is already closed a SocketException is thrown.
+ // However, such an exception is not advertised by .close(), hence the general-purpose catch.
+ }
while (errors < maxRetries) {
try {
@@ -182,7 +187,7 @@
LOG.debug("Reconnection successful");
break;
} catch (TTransportException e) {
- LOG.error("Error while reconnecting:", e);
+ LOG.debug("Error while reconnecting:", e);
errors++;
if (errors < maxRetries) {
diff --git a/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java b/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
index 10a6997..2d16ff2 100644
--- a/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
+++ b/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
@@ -16,11 +16,17 @@
package org.onosproject.provider.bmv2.device.impl;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
import org.onlab.packet.ChassisId;
+import org.onlab.util.Timer;
+import org.onosproject.bmv2.api.runtime.Bmv2ControlPlaneServer;
+import org.onosproject.bmv2.api.runtime.Bmv2Device;
import org.onosproject.common.net.AbstractDeviceProvider;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
@@ -40,17 +46,20 @@
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DeviceDescription;
import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.device.PortDescription;
import org.onosproject.net.provider.ProviderId;
import org.slf4j.Logger;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.Set;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.bmv2.ctl.Bmv2ThriftClient.forceDisconnectOf;
import static org.onosproject.bmv2.ctl.Bmv2ThriftClient.ping;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
import static org.slf4j.LoggerFactory.getLogger;
@@ -61,30 +70,14 @@
@Component(immediate = true)
public class Bmv2DeviceProvider extends AbstractDeviceProvider {
- private final Logger log = getLogger(Bmv2DeviceProvider.class);
+ private static final Logger LOG = getLogger(Bmv2DeviceProvider.class);
public static final String MANUFACTURER = "p4.org";
public static final String HW_VERSION = "bmv2";
+ public static final String SCHEME = "bmv2";
private static final String APP_NAME = "org.onosproject.bmv2";
private static final String UNKNOWN = "unknown";
- public static final String SCHEME = "bmv2";
-
- private final ExecutorService deviceDiscoveryExecutor = Executors
- .newFixedThreadPool(5, groupedThreads("onos/bmv2", "device-discovery", log));
-
- private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
-
- private final ConfigFactory cfgFactory =
- new ConfigFactory<ApplicationId, Bmv2ProviderConfig>(
- APP_SUBJECT_FACTORY, Bmv2ProviderConfig.class,
- "devices", true) {
- @Override
- public Bmv2ProviderConfig createConfig() {
- return new Bmv2ProviderConfig();
- }
- };
-
- private final Set<DeviceId> activeDevices = Sets.newConcurrentHashSet();
+ private static final int POLL_INTERVAL = 5; // seconds
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigRegistry netCfgService;
@@ -95,6 +88,17 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected Bmv2ControlPlaneServer controlPlaneServer;
+
+ private final ExecutorService deviceDiscoveryExecutor = Executors
+ .newFixedThreadPool(5, groupedThreads("onos/bmv2", "device-discovery", LOG));
+
+ private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
+ private final ConfigFactory cfgFactory = new InternalConfigFactory();
+ private final ConcurrentMap<DeviceId, Boolean> activeDevices = Maps.newConcurrentMap();
+ private final DevicePoller devicePoller = new DevicePoller();
+ private final InternalHelloListener helloListener = new InternalHelloListener();
private ApplicationId appId;
/**
@@ -104,56 +108,66 @@
super(new ProviderId("bmv2", "org.onosproject.provider.device"));
}
- protected static DeviceId deviceIdOf(Bmv2ProviderConfig.Bmv2DeviceInfo info) {
+ private static DeviceId deviceIdOf(String ip, int port) {
try {
- return DeviceId.deviceId(new URI(
- SCHEME, info.ip().toString() + ":" + info.port(), null));
+ return DeviceId.deviceId(new URI(SCHEME, ip + ":" + port, null));
} catch (URISyntaxException e) {
- throw new IllegalArgumentException(
- "Unable to build deviceID for device "
- + info.ip().toString() + ":" + info.ip().toString(),
- e);
+ throw new IllegalArgumentException("Unable to build deviceID for device " + ip + ":" + port, e);
}
}
+ /**
+ * Creates a new device ID for the given BMv2 device.
+ *
+ * @param device a BMv2 device object
+ *
+ * @return a new device ID
+ */
+ public static DeviceId deviceIdOf(Bmv2Device device) {
+ return deviceIdOf(device.thriftServerHost(), device.thriftServerPort());
+ }
+
@Override
protected void activate() {
appId = coreService.registerApplication(APP_NAME);
netCfgService.registerConfigFactory(cfgFactory);
netCfgService.addListener(cfgListener);
-
+ controlPlaneServer.addHelloListener(helloListener);
+ devicePoller.start();
super.activate();
}
@Override
protected void deactivate() {
+ devicePoller.stop();
+ controlPlaneServer.removeHelloListener(helloListener);
try {
- activeDevices.stream().forEach(did -> {
+ activeDevices.forEach((did, value) -> {
deviceDiscoveryExecutor.execute(() -> disconnectDevice(did));
});
deviceDiscoveryExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
- log.error("Device discovery threads did not terminate");
+ LOG.error("Device discovery threads did not terminate");
}
deviceDiscoveryExecutor.shutdownNow();
netCfgService.unregisterConfigFactory(cfgFactory);
netCfgService.removeListener(cfgListener);
-
super.deactivate();
}
@Override
public void triggerProbe(DeviceId deviceId) {
+ // Asynchronously trigger probe task.
deviceDiscoveryExecutor.execute(() -> executeProbe(deviceId));
}
private void executeProbe(DeviceId did) {
boolean reachable = isReachable(did);
- log.debug("Probed device: id={}, reachable={}",
+ LOG.debug("Probed device: id={}, reachable={}",
did.toString(),
reachable);
if (reachable) {
- connectDevice(did);
+ discoverDevice(did);
} else {
disconnectDevice(did);
}
@@ -161,7 +175,7 @@
@Override
public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
- log.debug("roleChanged() is not yet implemented");
+ LOG.debug("roleChanged() is not yet implemented");
// TODO: implement mastership handling
}
@@ -172,41 +186,69 @@
@Override
public void changePortState(DeviceId deviceId, PortNumber portNumber, boolean enable) {
- log.debug("changePortState() is not yet implemented");
+ LOG.debug("changePortState() is not yet implemented");
// TODO: implement port handling
}
- private void connectDevice(DeviceId did) {
- log.debug("Trying to create device on ONOS core: {}", did);
- SparseAnnotations annotations = DefaultAnnotations.builder()
- .set(AnnotationKeys.PROTOCOL, SCHEME)
- .build();
- DeviceDescription descr = new DefaultDeviceDescription(
- did.uri(), Device.Type.SWITCH, MANUFACTURER, HW_VERSION,
- UNKNOWN, UNKNOWN, new ChassisId(), annotations);
- providerService.deviceConnected(did, descr);
- activeDevices.add(did);
- discoverPorts(did);
- }
+ private void discoverDevice(DeviceId did) {
+ LOG.debug("Starting device discovery... deviceId={}", did);
- private void discoverPorts(DeviceId did) {
- Device device = deviceService.getDevice(did);
- if (device.is(PortDiscovery.class)) {
- PortDiscovery portConfig = device.as(PortDiscovery.class);
- providerService.updatePorts(did, portConfig.getPorts());
- } else {
- log.warn("No PortDiscovery behavior for device {}", did);
- }
+ // Atomically notify device to core and update port information.
+ activeDevices.compute(did, (k, v) -> {
+ if (!deviceService.isAvailable(did)) {
+ // Device not available in the core, connect it now.
+ SparseAnnotations annotations = DefaultAnnotations.builder()
+ .set(AnnotationKeys.PROTOCOL, SCHEME)
+ .build();
+ DeviceDescription descr = new DefaultDeviceDescription(
+ did.uri(), Device.Type.SWITCH, MANUFACTURER, HW_VERSION,
+ UNKNOWN, UNKNOWN, new ChassisId(), annotations);
+ providerService.deviceConnected(did, descr);
+ }
+ // Discover ports.
+ Device device = deviceService.getDevice(did);
+ if (device.is(PortDiscovery.class)) {
+ PortDiscovery portConfig = device.as(PortDiscovery.class);
+ List<PortDescription> portDescriptions = portConfig.getPorts();
+ providerService.updatePorts(did, portDescriptions);
+ } else {
+ LOG.warn("No PortDiscovery behavior for device {}", did);
+ }
+ return true;
+ });
}
private void disconnectDevice(DeviceId did) {
- log.debug("Trying to remove device from ONOS core: {}", did);
- providerService.deviceDisconnected(did);
- activeDevices.remove(did);
+ LOG.debug("Trying to disconnect device from core... deviceId={}", did);
+
+ // Atomically disconnect device.
+ activeDevices.compute(did, (k, v) -> {
+ if (deviceService.isAvailable(did)) {
+ providerService.deviceDisconnected(did);
+ // Make sure to close the transport session with device.
+ forceDisconnectOf(did);
+ }
+ return null;
+ });
}
/**
- * Handles net-cfg events.
+ * Internal net-cfg config factory.
+ */
+ private class InternalConfigFactory extends ConfigFactory<ApplicationId, Bmv2ProviderConfig> {
+
+ InternalConfigFactory() {
+ super(APP_SUBJECT_FACTORY, Bmv2ProviderConfig.class, "devices", true);
+ }
+
+ @Override
+ public Bmv2ProviderConfig createConfig() {
+ return new Bmv2ProviderConfig();
+ }
+ }
+
+ /**
+ * Internal net-cfg event listener.
*/
private class InternalNetworkConfigListener implements NetworkConfigListener {
@@ -216,13 +258,13 @@
if (cfg != null) {
try {
cfg.getDevicesInfo().stream().forEach(info -> {
- triggerProbe(deviceIdOf(info));
+ triggerProbe(deviceIdOf(info.ip().toString(), info.port()));
});
} catch (ConfigException e) {
- log.error("Unable to read config: " + e);
+ LOG.error("Unable to read config: " + e);
}
} else {
- log.error("Unable to read config (was null)");
+ LOG.error("Unable to read config (was null)");
}
}
@@ -233,4 +275,50 @@
event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
}
}
+
+ /**
+ * Listener triggered by Bmv2ControlPlaneServer each time a hello message is received.
+ */
+ private class InternalHelloListener implements Bmv2ControlPlaneServer.HelloListener {
+ @Override
+ public void handleHello(Bmv2Device device) {
+ log.debug("Received hello from {}", device);
+ triggerProbe(deviceIdOf(device));
+ }
+ }
+
+ /**
+ * Task that periodically trigger device probes.
+ */
+ private class DevicePoller implements TimerTask {
+
+ private final HashedWheelTimer timer = Timer.getTimer();
+ private Timeout timeout;
+
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (timeout.isCancelled()) {
+ return;
+ }
+ log.debug("Executing polling on {} devices...", activeDevices.size());
+ activeDevices.forEach((did, value) -> triggerProbe(did));
+ timeout.getTimer().newTimeout(this, POLL_INTERVAL, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Starts the collector.
+ */
+ synchronized void start() {
+ LOG.info("Starting device poller...");
+ timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
+ }
+
+ /**
+ * Stops the collector.
+ */
+ synchronized void stop() {
+ LOG.info("Stopping device poller...");
+ timeout.cancel();
+ }
+ }
}