[ONOS-6576] Add ports discovery when OVSDB DEVICE_ADDED event occurs
Change-Id: Ie933bd7e2fe2ee65dec5e8a93b67c0a739283eb8
diff --git a/providers/ovsdb/device/src/main/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProvider.java b/providers/ovsdb/device/src/main/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProvider.java
index 455423b..50b8c83 100644
--- a/providers/ovsdb/device/src/main/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProvider.java
+++ b/providers/ovsdb/device/src/main/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProvider.java
@@ -16,9 +16,13 @@
package org.onosproject.ovsdb.providers.device;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -28,6 +32,7 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.packet.ChassisId;
import org.onlab.packet.IpAddress;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
@@ -36,9 +41,13 @@
import org.onosproject.net.SparseAnnotations;
import org.onosproject.net.device.DefaultDeviceDescription;
import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceDescriptionDiscovery;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.DeviceService;
import org.onosproject.net.provider.AbstractProvider;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.ovsdb.controller.OvsdbClientService;
@@ -62,15 +71,28 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OvsdbController controller;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
private DeviceProviderService providerService;
private OvsdbNodeListener innerNodeListener = new InnerOvsdbNodeListener();
+ private InternalDeviceListener deviceListener = new InternalDeviceListener();
protected static final String ISNOTNULL = "OvsdbNodeId is not null";
+ protected static final String SCHEME_NAME = "ovsdb";
private static final String UNKNOWN = "unknown";
+ protected ExecutorService executor =
+ Executors.newFixedThreadPool(5, groupedThreads("onos/ovsdbdeviceprovider",
+ "device-installer-%d", log));
+
@Activate
public void activate() {
providerService = providerRegistry.register(this);
controller.addNodeListener(innerNodeListener);
+ deviceService.addListener(deviceListener);
log.info("Started");
}
@@ -78,6 +100,8 @@
public void deactivate() {
controller.removeNodeListener(innerNodeListener);
providerRegistry.unregister(this);
+ deviceService.removeListener(deviceListener);
+ waitForTasksToEnd();
providerService = null;
log.info("Stopped");
}
@@ -152,4 +176,46 @@
boolean enable) {
// TODO if required
}
+
+ private void discoverPorts(DeviceId deviceId) {
+ Device device = deviceService.getDevice(deviceId);
+ if (device.is(DeviceDescriptionDiscovery.class)) {
+ DeviceDescriptionDiscovery deviceDescriptionDiscovery = device.as(DeviceDescriptionDiscovery.class);
+ providerService.updatePorts(deviceId, deviceDescriptionDiscovery.discoverPortDetails());
+ } else {
+ log.warn("Device " + deviceId + " does not support behaviour DeviceDescriptionDiscovery");
+ }
+ }
+
+ private class InternalDeviceListener implements DeviceListener {
+ @Override
+ public void event(DeviceEvent event) {
+ DeviceId deviceId = event.subject().id();
+ if (!isRelevant(deviceId)) {
+ return;
+ }
+ if ((event.type() == DeviceEvent.Type.DEVICE_ADDED)) {
+ executor.execute(() -> discoverPorts(deviceId));
+ }
+ }
+
+ @Override
+ public boolean isRelevant(DeviceEvent event) {
+ DeviceId deviceId = event.subject().id();
+ return isRelevant(deviceId) && mastershipService.isLocalMaster(deviceId);
+ }
+
+ private boolean isRelevant(DeviceId deviceId) {
+ return deviceId.uri().getScheme().equals(SCHEME_NAME);
+ }
+ }
+
+ private void waitForTasksToEnd() {
+ executor.shutdown();
+ try {
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ log.error("Timeout while waiting for child threads to finish because: " + e.getMessage());
+ }
+ }
}
diff --git a/providers/ovsdb/device/src/test/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProviderTest.java b/providers/ovsdb/device/src/test/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProviderTest.java
index 11bcd36..5ce7367 100644
--- a/providers/ovsdb/device/src/test/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProviderTest.java
+++ b/providers/ovsdb/device/src/test/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProviderTest.java
@@ -15,28 +15,33 @@
*/
package org.onosproject.ovsdb.providers.device;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Consumer;
-
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.onlab.packet.IpAddress;
import org.onlab.packet.TpPort;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.MastershipRole;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceDescriptionDiscovery;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceProvider;
import org.onosproject.net.device.DeviceProviderRegistry;
import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.DeviceServiceAdapter;
import org.onosproject.net.device.PortDescription;
import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.driver.Behaviour;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.ovsdb.controller.OvsdbClientService;
import org.onosproject.ovsdb.controller.OvsdbController;
@@ -44,8 +49,21 @@
import org.onosproject.ovsdb.controller.OvsdbNodeId;
import org.onosproject.ovsdb.controller.OvsdbNodeListener;
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
/**
* Test for ovsdb device provider.
@@ -54,11 +72,24 @@
private final OvsdbDeviceProvider provider = new OvsdbDeviceProvider();
private final TestDeviceRegistry registry = new TestDeviceRegistry();
private final TestController controller = new TestController();
+ private final TestDeviceService deviceService = new TestDeviceService();
+
+ private final Device ovsdbDevice = new MockDevice(
+ DeviceId.deviceId("ovsdb:127.0.0.1"),
+ DefaultAnnotations.EMPTY);
+
+ private final Device notOvsdbDevice = new MockDevice(
+ DeviceId.deviceId("other:127.0.0.1"),
+ DefaultAnnotations.EMPTY);
+
+ private final TestDescription deviceDescription = new TestDescription();
+
@Before
public void startUp() {
provider.providerRegistry = registry;
provider.controller = controller;
+ provider.deviceService = deviceService;
provider.activate();
assertNotNull("provider should be registered", registry.provider);
}
@@ -68,11 +99,7 @@
provider.deactivate();
provider.controller = null;
provider.providerRegistry = null;
- }
-
- @Test
- public void triggerProbe() {
-
+ provider.deviceService = null;
}
@Test
@@ -91,11 +118,30 @@
assertEquals("ovsdb node removded", 0, registry.connected.size());
}
+ @Test
+ public void testDiscoverPortsAfterDeviceAdded() {
+ final int portCount = 5;
+ provider.executor = new SynchronousExecutor();
+ prepareMocks(portCount);
+
+ deviceService.listener.event(new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, ovsdbDevice));
+ deviceService.listener.event(new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, notOvsdbDevice));
+
+ assertEquals(portCount, registry.ports.get(ovsdbDevice.id()).size());
+ assertEquals(0, registry.ports.get(notOvsdbDevice.id()).size());
+ }
+
+ private void prepareMocks(int count) {
+ for(int i = 1; i <= count; i++) {
+ deviceDescription.portDescriptions.add(new DefaultPortDescription(PortNumber.portNumber(i), true));
+ }
+ }
+
private class TestDeviceRegistry implements DeviceProviderRegistry {
DeviceProvider provider;
- Set<DeviceId> connected = new HashSet<>();
- Multimap<DeviceId, PortDescription> ports = HashMultimap.create();
+ final Set<DeviceId> connected = new HashSet<>();
+ final Multimap<DeviceId, PortDescription> ports = HashMultimap.create();
PortDescription descr = null;
@Override
@@ -207,4 +253,125 @@
}
+ private class TestDeviceService extends DeviceServiceAdapter {
+ DeviceListener listener = null;
+
+ @Override
+ public Device getDevice(DeviceId deviceId) {
+ return ovsdbDevice;
+ }
+
+ @Override
+ public void addListener(DeviceListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public void removeListener(DeviceListener listener) {
+ this.listener = null;
+ }
+ }
+
+ private class TestDescription extends AbstractHandlerBehaviour implements DeviceDescriptionDiscovery {
+
+ final List<PortDescription> portDescriptions = new ArrayList<>();
+
+ @Override
+ public DeviceDescription discoverDeviceDetails() {
+ return null;
+ }
+
+ @Override
+ public List<PortDescription> discoverPortDetails() {
+ return portDescriptions;
+ }
+ }
+
+ private class MockDevice extends DefaultDevice {
+
+ MockDevice(DeviceId id,
+ Annotations... annotations) {
+ super(null, id, Type.SWITCH, null, null, null, null,
+ null, annotations);
+ }
+
+ @Override
+ public <B extends Behaviour> B as(Class<B> projectionClass) {
+ return (B) deviceDescription;
+ }
+
+ @Override
+ public <B extends Behaviour> boolean is(Class<B> projectionClass) {
+ return true;
+ }
+
+ }
+
+ private class SynchronousExecutor implements ExecutorService {
+ @Override
+ public void execute(Runnable task) {
+ task.run();
+ }
+
+ @Override
+ public void shutdown() {
+
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ return null;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return true;
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return true;
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+ return true;
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ return null;
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ return null;
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ return null;
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+ return null;
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ return null;
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+ return null;
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ return null;
+ }
+ }
+
}