[ONOS-6576] Add ports discovery when OVSDB DEVICE_ADDED event occurs

Change-Id: Ie933bd7e2fe2ee65dec5e8a93b67c0a739283eb8
diff --git a/providers/ovsdb/device/src/main/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProvider.java b/providers/ovsdb/device/src/main/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProvider.java
index 455423b..50b8c83 100644
--- a/providers/ovsdb/device/src/main/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProvider.java
+++ b/providers/ovsdb/device/src/main/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProvider.java
@@ -16,9 +16,13 @@
 package org.onosproject.ovsdb.providers.device;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.net.URI;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -28,6 +32,7 @@
 import org.apache.felix.scr.annotations.Service;
 import org.onlab.packet.ChassisId;
 import org.onlab.packet.IpAddress;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
@@ -36,9 +41,13 @@
 import org.onosproject.net.SparseAnnotations;
 import org.onosproject.net.device.DefaultDeviceDescription;
 import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceDescriptionDiscovery;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceProvider;
 import org.onosproject.net.device.DeviceProviderRegistry;
 import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.ovsdb.controller.OvsdbClientService;
@@ -62,15 +71,28 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OvsdbController controller;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
     private DeviceProviderService providerService;
     private OvsdbNodeListener innerNodeListener = new InnerOvsdbNodeListener();
+    private InternalDeviceListener deviceListener = new InternalDeviceListener();
     protected static final String ISNOTNULL = "OvsdbNodeId is not null";
+    protected static final String SCHEME_NAME = "ovsdb";
     private static final String UNKNOWN = "unknown";
 
+    protected ExecutorService executor =
+            Executors.newFixedThreadPool(5, groupedThreads("onos/ovsdbdeviceprovider",
+                                                           "device-installer-%d", log));
+
     @Activate
     public void activate() {
         providerService = providerRegistry.register(this);
         controller.addNodeListener(innerNodeListener);
+        deviceService.addListener(deviceListener);
         log.info("Started");
     }
 
@@ -78,6 +100,8 @@
     public void deactivate() {
         controller.removeNodeListener(innerNodeListener);
         providerRegistry.unregister(this);
+        deviceService.removeListener(deviceListener);
+        waitForTasksToEnd();
         providerService = null;
         log.info("Stopped");
     }
@@ -152,4 +176,46 @@
                                 boolean enable) {
         // TODO if required
     }
+
+    private void discoverPorts(DeviceId deviceId) {
+        Device device = deviceService.getDevice(deviceId);
+        if (device.is(DeviceDescriptionDiscovery.class)) {
+            DeviceDescriptionDiscovery deviceDescriptionDiscovery = device.as(DeviceDescriptionDiscovery.class);
+            providerService.updatePorts(deviceId, deviceDescriptionDiscovery.discoverPortDetails());
+        } else {
+            log.warn("Device " + deviceId + " does not support behaviour DeviceDescriptionDiscovery");
+        }
+    }
+
+    private class InternalDeviceListener implements DeviceListener {
+        @Override
+        public void event(DeviceEvent event) {
+            DeviceId deviceId = event.subject().id();
+            if (!isRelevant(deviceId)) {
+                return;
+            }
+            if ((event.type() == DeviceEvent.Type.DEVICE_ADDED)) {
+                executor.execute(() -> discoverPorts(deviceId));
+            }
+        }
+
+        @Override
+        public boolean isRelevant(DeviceEvent event) {
+            DeviceId deviceId = event.subject().id();
+            return isRelevant(deviceId) && mastershipService.isLocalMaster(deviceId);
+        }
+
+        private boolean isRelevant(DeviceId deviceId) {
+            return deviceId.uri().getScheme().equals(SCHEME_NAME);
+        }
+    }
+
+    private void waitForTasksToEnd() {
+        executor.shutdown();
+        try {
+            executor.awaitTermination(5, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            log.error("Timeout while waiting for child threads to finish because: " + e.getMessage());
+        }
+    }
 }
diff --git a/providers/ovsdb/device/src/test/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProviderTest.java b/providers/ovsdb/device/src/test/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProviderTest.java
index 11bcd36..5ce7367 100644
--- a/providers/ovsdb/device/src/test/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProviderTest.java
+++ b/providers/ovsdb/device/src/test/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProviderTest.java
@@ -15,28 +15,33 @@
  */
 package org.onosproject.ovsdb.providers.device;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.function.Consumer;
-
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.TpPort;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.MastershipRole;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DefaultPortDescription;
 import org.onosproject.net.device.DeviceDescription;
+import org.onosproject.net.device.DeviceDescriptionDiscovery;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceProvider;
 import org.onosproject.net.device.DeviceProviderRegistry;
 import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.DeviceServiceAdapter;
 import org.onosproject.net.device.PortDescription;
 import org.onosproject.net.device.PortStatistics;
+import org.onosproject.net.driver.AbstractHandlerBehaviour;
+import org.onosproject.net.driver.Behaviour;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.ovsdb.controller.OvsdbClientService;
 import org.onosproject.ovsdb.controller.OvsdbController;
@@ -44,8 +49,21 @@
 import org.onosproject.ovsdb.controller.OvsdbNodeId;
 import org.onosproject.ovsdb.controller.OvsdbNodeListener;
 
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Multimap;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 /**
  * Test for ovsdb device provider.
@@ -54,11 +72,24 @@
     private final OvsdbDeviceProvider provider = new OvsdbDeviceProvider();
     private final TestDeviceRegistry registry = new TestDeviceRegistry();
     private final TestController controller = new TestController();
+    private final TestDeviceService deviceService = new TestDeviceService();
+
+    private final Device ovsdbDevice = new MockDevice(
+            DeviceId.deviceId("ovsdb:127.0.0.1"),
+            DefaultAnnotations.EMPTY);
+
+    private final Device notOvsdbDevice = new MockDevice(
+            DeviceId.deviceId("other:127.0.0.1"),
+            DefaultAnnotations.EMPTY);
+
+    private final TestDescription deviceDescription = new TestDescription();
+
 
     @Before
     public void startUp() {
         provider.providerRegistry = registry;
         provider.controller = controller;
+        provider.deviceService = deviceService;
         provider.activate();
         assertNotNull("provider should be registered", registry.provider);
     }
@@ -68,11 +99,7 @@
         provider.deactivate();
         provider.controller = null;
         provider.providerRegistry = null;
-    }
-
-    @Test
-    public void triggerProbe() {
-
+        provider.deviceService = null;
     }
 
     @Test
@@ -91,11 +118,30 @@
         assertEquals("ovsdb node removded", 0, registry.connected.size());
     }
 
+    @Test
+    public void testDiscoverPortsAfterDeviceAdded() {
+        final int portCount = 5;
+        provider.executor = new SynchronousExecutor();
+        prepareMocks(portCount);
+
+        deviceService.listener.event(new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, ovsdbDevice));
+        deviceService.listener.event(new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, notOvsdbDevice));
+
+        assertEquals(portCount, registry.ports.get(ovsdbDevice.id()).size());
+        assertEquals(0, registry.ports.get(notOvsdbDevice.id()).size());
+    }
+
+    private void prepareMocks(int count) {
+        for(int i = 1; i <= count; i++) {
+            deviceDescription.portDescriptions.add(new DefaultPortDescription(PortNumber.portNumber(i), true));
+        }
+    }
+
     private class TestDeviceRegistry implements DeviceProviderRegistry {
         DeviceProvider provider;
 
-        Set<DeviceId> connected = new HashSet<>();
-        Multimap<DeviceId, PortDescription> ports = HashMultimap.create();
+        final Set<DeviceId> connected = new HashSet<>();
+        final Multimap<DeviceId, PortDescription> ports = HashMultimap.create();
         PortDescription descr = null;
 
         @Override
@@ -207,4 +253,125 @@
 
     }
 
+    private class TestDeviceService extends DeviceServiceAdapter {
+        DeviceListener listener = null;
+
+        @Override
+        public Device getDevice(DeviceId deviceId) {
+            return ovsdbDevice;
+        }
+
+        @Override
+        public void addListener(DeviceListener listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void removeListener(DeviceListener listener) {
+            this.listener = null;
+        }
+    }
+
+    private class TestDescription extends AbstractHandlerBehaviour implements DeviceDescriptionDiscovery {
+
+        final List<PortDescription> portDescriptions = new ArrayList<>();
+
+        @Override
+        public DeviceDescription discoverDeviceDetails() {
+            return null;
+        }
+
+        @Override
+        public List<PortDescription> discoverPortDetails() {
+            return portDescriptions;
+        }
+    }
+
+    private class MockDevice extends DefaultDevice {
+
+        MockDevice(DeviceId id,
+                   Annotations... annotations) {
+            super(null, id, Type.SWITCH, null, null, null, null,
+                  null, annotations);
+        }
+
+        @Override
+        public <B extends Behaviour> B as(Class<B> projectionClass) {
+            return (B) deviceDescription;
+        }
+
+        @Override
+        public <B extends Behaviour> boolean is(Class<B> projectionClass) {
+            return true;
+        }
+
+    }
+
+    private class SynchronousExecutor implements ExecutorService {
+        @Override
+        public void execute(Runnable task) {
+            task.run();
+        }
+
+        @Override
+        public void shutdown() {
+
+        }
+
+        @Override
+        public List<Runnable> shutdownNow() {
+            return null;
+        }
+
+        @Override
+        public boolean isShutdown() {
+            return true;
+        }
+
+        @Override
+        public boolean isTerminated() {
+            return true;
+        }
+
+        @Override
+        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
+            return true;
+        }
+
+        @Override
+        public <T> Future<T> submit(Callable<T> task) {
+            return null;
+        }
+
+        @Override
+        public <T> Future<T> submit(Runnable task, T result) {
+            return null;
+        }
+
+        @Override
+        public Future<?> submit(Runnable task) {
+            return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException {
+            return null;
+        }
+
+        @Override
+        public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+            return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException {
+            return null;
+        }
+
+        @Override
+        public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+            return null;
+        }
+    }
+
 }