ONOS-4422 Implemented device discovery in Bmv2 device provider and other
improvements

- Added listener for hello messages received from Bmv2 devices
- Added a periodic poller task to check device reachability and port
information updates
- Avoids periodically re-connecting the device if it is already
available in the core
- Fixed minor bug in Bmv2ThriftClient

Change-Id: I416d1880773e11b2ac6fa062d8be2b8f280786fb
diff --git a/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/Bmv2ThriftClient.java b/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/Bmv2ThriftClient.java
index db0d5e2..f0e68f7 100644
--- a/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/Bmv2ThriftClient.java
+++ b/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/Bmv2ThriftClient.java
@@ -80,9 +80,9 @@
     // Seconds after a client is expired (and connection closed) in the cache.
     private static final int CLIENT_CACHE_TIMEOUT = 60;
     // Number of connection retries after a network error.
-    private static final int NUM_CONNECTION_RETRIES = 10;
+    private static final int NUM_CONNECTION_RETRIES = 3;
     // Time between retries in milliseconds.
-    private static final int TIME_BETWEEN_RETRIES = 200;
+    private static final int TIME_BETWEEN_RETRIES = 300;
 
     // Static client cache where clients are removed after a predefined timeout.
     private static final LoadingCache<DeviceId, Bmv2ThriftClient>
@@ -125,6 +125,15 @@
     }
 
     /**
+     * Force a close of the transport session (if one is open) with the given device.
+     *
+     * @param deviceId device id
+     */
+    public static void forceDisconnectOf(DeviceId deviceId) {
+        CLIENT_CACHE.invalidate(deviceId);
+    }
+
+    /**
      * Pings the device. Returns true if the device is reachable,
      * false otherwise.
      *
@@ -392,7 +401,7 @@
             LOG.debug("Packet transmission requested! > portNumber={}, packet={}", portNumber, packet);
         } catch (TException e) {
             LOG.debug("Exception while requesting packet transmission: {} > portNumber={}, packet={}",
-                      portNumber, packet);
+                      e, portNumber, packet);
             throw new Bmv2RuntimeException(e.getMessage(), e);
         }
     }
diff --git a/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/SafeThriftClient.java b/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/SafeThriftClient.java
index bbe0546..95a052a 100644
--- a/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/SafeThriftClient.java
+++ b/protocols/bmv2/src/main/java/org/onosproject/bmv2/ctl/SafeThriftClient.java
@@ -173,7 +173,12 @@
         private static void reconnectOrThrowException(TTransport transport, int maxRetries, long timeBetweenRetries)
                 throws TTransportException {
             int errors = 0;
-            transport.close();
+            try {
+                transport.close();
+            } catch (Exception e) {
+                // Thrift seems to have a bug where if the transport is already closed a SocketException is thrown.
+                // However, such an exception is not advertised by .close(), hence the general-purpose catch.
+            }
 
             while (errors < maxRetries) {
                 try {
@@ -182,7 +187,7 @@
                     LOG.debug("Reconnection successful");
                     break;
                 } catch (TTransportException e) {
-                    LOG.error("Error while reconnecting:", e);
+                    LOG.debug("Error while reconnecting:", e);
                     errors++;
 
                     if (errors < maxRetries) {
diff --git a/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java b/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
index 10a6997..2d16ff2 100644
--- a/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
+++ b/providers/bmv2/device/src/main/java/org/onosproject/provider/bmv2/device/impl/Bmv2DeviceProvider.java
@@ -16,11 +16,17 @@
 
 package org.onosproject.provider.bmv2.device.impl;
 
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.jboss.netty.util.Timeout;
+import org.jboss.netty.util.TimerTask;
 import org.onlab.packet.ChassisId;
+import org.onlab.util.Timer;
+import org.onosproject.bmv2.api.runtime.Bmv2ControlPlaneServer;
+import org.onosproject.bmv2.api.runtime.Bmv2Device;
 import org.onosproject.common.net.AbstractDeviceProvider;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
@@ -40,17 +46,20 @@
 import org.onosproject.net.device.DefaultDeviceDescription;
 import org.onosproject.net.device.DeviceDescription;
 import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.device.PortDescription;
 import org.onosproject.net.provider.ProviderId;
 import org.slf4j.Logger;
 
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.Set;
+import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.bmv2.ctl.Bmv2ThriftClient.forceDisconnectOf;
 import static org.onosproject.bmv2.ctl.Bmv2ThriftClient.ping;
 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -61,30 +70,14 @@
 @Component(immediate = true)
 public class Bmv2DeviceProvider extends AbstractDeviceProvider {
 
-    private final Logger log = getLogger(Bmv2DeviceProvider.class);
+    private static final Logger LOG = getLogger(Bmv2DeviceProvider.class);
 
     public static final String MANUFACTURER = "p4.org";
     public static final String HW_VERSION = "bmv2";
+    public static final String SCHEME = "bmv2";
     private static final String APP_NAME = "org.onosproject.bmv2";
     private static final String UNKNOWN = "unknown";
-    public static final String SCHEME = "bmv2";
-
-    private final ExecutorService deviceDiscoveryExecutor = Executors
-            .newFixedThreadPool(5, groupedThreads("onos/bmv2", "device-discovery", log));
-
-    private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
-
-    private final ConfigFactory cfgFactory =
-            new ConfigFactory<ApplicationId, Bmv2ProviderConfig>(
-                    APP_SUBJECT_FACTORY, Bmv2ProviderConfig.class,
-                    "devices", true) {
-                @Override
-                public Bmv2ProviderConfig createConfig() {
-                    return new Bmv2ProviderConfig();
-                }
-            };
-
-    private final Set<DeviceId> activeDevices = Sets.newConcurrentHashSet();
+    private static final int POLL_INTERVAL = 5; // seconds
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetworkConfigRegistry netCfgService;
@@ -95,6 +88,17 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected Bmv2ControlPlaneServer controlPlaneServer;
+
+    private final ExecutorService deviceDiscoveryExecutor = Executors
+            .newFixedThreadPool(5, groupedThreads("onos/bmv2", "device-discovery", LOG));
+
+    private final NetworkConfigListener cfgListener = new InternalNetworkConfigListener();
+    private final ConfigFactory cfgFactory = new InternalConfigFactory();
+    private final ConcurrentMap<DeviceId, Boolean> activeDevices = Maps.newConcurrentMap();
+    private final DevicePoller devicePoller = new DevicePoller();
+    private final InternalHelloListener helloListener = new InternalHelloListener();
     private ApplicationId appId;
 
     /**
@@ -104,56 +108,66 @@
         super(new ProviderId("bmv2", "org.onosproject.provider.device"));
     }
 
-    protected static DeviceId deviceIdOf(Bmv2ProviderConfig.Bmv2DeviceInfo info) {
+    private static DeviceId deviceIdOf(String ip, int port) {
         try {
-            return DeviceId.deviceId(new URI(
-                    SCHEME, info.ip().toString() + ":" + info.port(), null));
+            return DeviceId.deviceId(new URI(SCHEME, ip + ":" + port, null));
         } catch (URISyntaxException e) {
-            throw new IllegalArgumentException(
-                    "Unable to build deviceID for device "
-                            + info.ip().toString() + ":" + info.ip().toString(),
-                    e);
+            throw new IllegalArgumentException("Unable to build deviceID for device " + ip + ":" + port, e);
         }
     }
 
+    /**
+     * Creates a new device ID for the given BMv2 device.
+     *
+     * @param device a BMv2 device object
+     *
+     * @return a new device ID
+     */
+    public static DeviceId deviceIdOf(Bmv2Device device) {
+        return deviceIdOf(device.thriftServerHost(), device.thriftServerPort());
+    }
+
     @Override
     protected void activate() {
         appId = coreService.registerApplication(APP_NAME);
         netCfgService.registerConfigFactory(cfgFactory);
         netCfgService.addListener(cfgListener);
-
+        controlPlaneServer.addHelloListener(helloListener);
+        devicePoller.start();
         super.activate();
     }
 
     @Override
     protected void deactivate() {
+        devicePoller.stop();
+        controlPlaneServer.removeHelloListener(helloListener);
         try {
-            activeDevices.stream().forEach(did -> {
+            activeDevices.forEach((did, value) -> {
                 deviceDiscoveryExecutor.execute(() -> disconnectDevice(did));
             });
             deviceDiscoveryExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
-            log.error("Device discovery threads did not terminate");
+            LOG.error("Device discovery threads did not terminate");
         }
         deviceDiscoveryExecutor.shutdownNow();
         netCfgService.unregisterConfigFactory(cfgFactory);
         netCfgService.removeListener(cfgListener);
-
         super.deactivate();
     }
 
     @Override
     public void triggerProbe(DeviceId deviceId) {
+        // Asynchronously trigger probe task.
         deviceDiscoveryExecutor.execute(() -> executeProbe(deviceId));
     }
 
     private void executeProbe(DeviceId did) {
         boolean reachable = isReachable(did);
-        log.debug("Probed device: id={}, reachable={}",
+        LOG.debug("Probed device: id={}, reachable={}",
                   did.toString(),
                   reachable);
         if (reachable) {
-            connectDevice(did);
+            discoverDevice(did);
         } else {
             disconnectDevice(did);
         }
@@ -161,7 +175,7 @@
 
     @Override
     public void roleChanged(DeviceId deviceId, MastershipRole newRole) {
-        log.debug("roleChanged() is not yet implemented");
+        LOG.debug("roleChanged() is not yet implemented");
         // TODO: implement mastership handling
     }
 
@@ -172,41 +186,69 @@
 
     @Override
     public void changePortState(DeviceId deviceId, PortNumber portNumber, boolean enable) {
-        log.debug("changePortState() is not yet implemented");
+        LOG.debug("changePortState() is not yet implemented");
         // TODO: implement port handling
     }
 
-    private void connectDevice(DeviceId did) {
-        log.debug("Trying to create device on ONOS core: {}", did);
-        SparseAnnotations annotations = DefaultAnnotations.builder()
-                .set(AnnotationKeys.PROTOCOL, SCHEME)
-                .build();
-        DeviceDescription descr = new DefaultDeviceDescription(
-                did.uri(), Device.Type.SWITCH, MANUFACTURER, HW_VERSION,
-                UNKNOWN, UNKNOWN, new ChassisId(), annotations);
-        providerService.deviceConnected(did, descr);
-        activeDevices.add(did);
-        discoverPorts(did);
-    }
+    private void discoverDevice(DeviceId did) {
+        LOG.debug("Starting device discovery... deviceId={}", did);
 
-    private void discoverPorts(DeviceId did) {
-        Device device = deviceService.getDevice(did);
-        if (device.is(PortDiscovery.class)) {
-            PortDiscovery portConfig = device.as(PortDiscovery.class);
-            providerService.updatePorts(did, portConfig.getPorts());
-        } else {
-            log.warn("No PortDiscovery behavior for device {}", did);
-        }
+        // Atomically notify device to core and update port information.
+        activeDevices.compute(did, (k, v) -> {
+            if (!deviceService.isAvailable(did)) {
+                // Device not available in the core, connect it now.
+                SparseAnnotations annotations = DefaultAnnotations.builder()
+                        .set(AnnotationKeys.PROTOCOL, SCHEME)
+                        .build();
+                DeviceDescription descr = new DefaultDeviceDescription(
+                        did.uri(), Device.Type.SWITCH, MANUFACTURER, HW_VERSION,
+                        UNKNOWN, UNKNOWN, new ChassisId(), annotations);
+                providerService.deviceConnected(did, descr);
+            }
+            // Discover ports; guard against getDevice() returning null when the core does not know the device.
+            Device device = deviceService.getDevice(did);
+            if (device != null && device.is(PortDiscovery.class)) {
+                PortDiscovery portConfig = device.as(PortDiscovery.class);
+                List<PortDescription> portDescriptions = portConfig.getPorts();
+                providerService.updatePorts(did, portDescriptions);
+            } else {
+                LOG.warn("Device not yet in core or no PortDiscovery behavior for device {}", did);
+            }
+            return true;
+        });
     }
 
     private void disconnectDevice(DeviceId did) {
-        log.debug("Trying to remove device from ONOS core: {}", did);
-        providerService.deviceDisconnected(did);
-        activeDevices.remove(did);
+        LOG.debug("Trying to disconnect device from core... deviceId={}", did);
+
+        // Atomically disconnect device.
+        activeDevices.compute(did, (k, v) -> {
+            if (deviceService.isAvailable(did)) {
+                providerService.deviceDisconnected(did);
+                // Make sure to close the transport session with device.
+                forceDisconnectOf(did);
+            }
+            return null;
+        });
     }
 
     /**
-     * Handles net-cfg events.
+     * Internal net-cfg config factory.
+     */
+    private class InternalConfigFactory extends ConfigFactory<ApplicationId, Bmv2ProviderConfig> {
+
+        InternalConfigFactory() {
+            super(APP_SUBJECT_FACTORY, Bmv2ProviderConfig.class, "devices", true);
+        }
+
+        @Override
+        public Bmv2ProviderConfig createConfig() {
+            return new Bmv2ProviderConfig();
+        }
+    }
+
+    /**
+     * Internal net-cfg event listener.
      */
     private class InternalNetworkConfigListener implements NetworkConfigListener {
 
@@ -216,13 +258,13 @@
             if (cfg != null) {
                 try {
                     cfg.getDevicesInfo().stream().forEach(info -> {
-                        triggerProbe(deviceIdOf(info));
+                        triggerProbe(deviceIdOf(info.ip().toString(), info.port()));
                     });
                 } catch (ConfigException e) {
-                    log.error("Unable to read config: " + e);
+                    LOG.error("Unable to read config: " + e);
                 }
             } else {
-                log.error("Unable to read config (was null)");
+                LOG.error("Unable to read config (was null)");
             }
         }
 
@@ -233,4 +275,50 @@
                             event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED);
         }
     }
+
+    /**
+     * Listener invoked by Bmv2ControlPlaneServer for each hello message; probes the announcing device.
+     */
+    private class InternalHelloListener implements Bmv2ControlPlaneServer.HelloListener {
+        @Override
+        public void handleHello(Bmv2Device device) {
+            LOG.debug("Received hello from {}", device);
+            triggerProbe(deviceIdOf(device));
+        }
+    }
+
+    /**
+     * Task that triggers a probe of every active device every POLL_INTERVAL seconds.
+     */
+    private class DevicePoller implements TimerTask {
+
+        private final HashedWheelTimer timer = Timer.getTimer();
+        private volatile Timeout timeout; // latest scheduled run; cancelled by stop()
+
+        @Override
+        public void run(Timeout timeout) throws Exception {
+            if (timeout.isCancelled()) {
+                return;
+            }
+            LOG.debug("Executing polling on {} devices...", activeDevices.size());
+            activeDevices.forEach((did, value) -> triggerProbe(did));
+            this.timeout = timeout.getTimer().newTimeout(this, POLL_INTERVAL, TimeUnit.SECONDS);
+        }
+
+        /**
+         * Starts the poller; the first run is scheduled one second from now.
+         */
+        synchronized void start() {
+            LOG.info("Starting device poller...");
+            timeout = timer.newTimeout(this, 1, TimeUnit.SECONDS);
+        }
+
+        /**
+         * Stops the poller by cancelling the most recently scheduled run.
+         */
+        synchronized void stop() {
+            LOG.info("Stopping device poller...");
+            timeout.cancel();
+        }
+    }
 }