Extending DeviceProvider interface to include triggerDisconnect method.

- extended interface with default method implementation
- modified DeviceManager to exploit the new provider feature
- refactored a number of device providers to use the new method
    instead of relying on indirect DEVICE_REMOVED events

Change-Id: Ib315357ef06463012fcf26bbe937c8cdccbf3a94
diff --git a/core/api/src/main/java/org/onosproject/net/device/DeviceProvider.java b/core/api/src/main/java/org/onosproject/net/device/DeviceProvider.java
index a98ec8b..ebdf7b1 100644
--- a/core/api/src/main/java/org/onosproject/net/device/DeviceProvider.java
+++ b/core/api/src/main/java/org/onosproject/net/device/DeviceProvider.java
@@ -25,8 +25,6 @@
  */
 public interface DeviceProvider extends Provider {
 
-    // TODO: consider how dirty the triggerProbe gets; if it costs too much, let's drop it
-
     /**
      * Triggers an asynchronous probe of the specified device, intended to
      * determine whether the device is present or not. An indirect result of this
@@ -67,4 +65,24 @@
      */
     void changePortState(DeviceId deviceId, PortNumber portNumber,
                          boolean enable);
+
+
+    /**
+     * Administratively triggers 'disconnection' from the device. This is meant
+     * purely in logical sense and is intended to apply equally to implementations
+     * relying on connectionless control protocols.
+     *
+     * An indirect result of this should be invocation of
+     * {@link org.onosproject.net.device.DeviceProviderService#deviceDisconnected}
+     * if the device was presently 'connected' and
+     * {@link org.onosproject.net.device.DeviceProviderService#deviceConnected}
+     * at some later point in time if the device is available and continues to
+     * be permitted to reconnect or if the provider continues to discover it.
+     *
+     * @param deviceId device identifier
+     */
+    default void triggerDisconnect(DeviceId deviceId) {
+        throw new UnsupportedOperationException(id() + " does not implement this feature");
+    }
+
 }
diff --git a/core/api/src/main/java/org/onosproject/net/provider/AbstractProviderRegistry.java b/core/api/src/main/java/org/onosproject/net/provider/AbstractProviderRegistry.java
index 0ce7b49..2c89221 100644
--- a/core/api/src/main/java/org/onosproject/net/provider/AbstractProviderRegistry.java
+++ b/core/api/src/main/java/org/onosproject/net/provider/AbstractProviderRegistry.java
@@ -82,7 +82,7 @@
     public synchronized void unregister(P provider) {
         checkNotNull(provider, "Provider cannot be null");
         S service = services.get(provider.id());
-        if (service != null && service instanceof AbstractProviderService) {
+        if (service instanceof AbstractProviderService) {
             ((AbstractProviderService) service).invalidate();
             services.remove(provider.id());
             providers.remove(provider.id());
diff --git a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
index 458d100..a5b49cf 100644
--- a/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
+++ b/core/net/src/main/java/org/onosproject/net/device/impl/DeviceManager.java
@@ -470,7 +470,7 @@
                         //relinquish master role and ability to be backup.
                         mastershipService.relinquishMastership(deviceId).get();
                     } catch (InterruptedException e) {
-                        log.warn("Interrupted while reliquishing role for {}", deviceId);
+                        log.warn("Interrupted while relinquishing role for {}", deviceId);
                         Thread.currentThread().interrupt();
                     } catch (ExecutionException e) {
                         log.error("Exception thrown while relinquishing role for {}", deviceId, e);
@@ -986,7 +986,19 @@
         public void notify(DeviceEvent event) {
             post(event);
             if (event.type().equals(DeviceEvent.Type.DEVICE_REMOVED)) {
-                deviceLocalStatus.remove(event.subject().id());
+                // When device is administratively removed, force disconnect.
+                DeviceId deviceId = event.subject().id();
+                deviceLocalStatus.remove(deviceId);
+
+                DeviceProvider provider = getProvider(deviceId);
+                if (provider != null) {
+                    log.info("Triggering disconnect for device {}", deviceId);
+                    try {
+                        provider.triggerDisconnect(deviceId);
+                    } catch (UnsupportedOperationException e) {
+                        log.warn("Unable to trigger disconnect due to {}", e.getMessage());
+                    }
+                }
             }
         }
     }
diff --git a/providers/bgp/topology/src/main/java/org/onosproject/provider/bgp/topology/impl/BgpTopologyProvider.java b/providers/bgp/topology/src/main/java/org/onosproject/provider/bgp/topology/impl/BgpTopologyProvider.java
index 806c961..b840a89 100644
--- a/providers/bgp/topology/src/main/java/org/onosproject/provider/bgp/topology/impl/BgpTopologyProvider.java
+++ b/providers/bgp/topology/src/main/java/org/onosproject/provider/bgp/topology/impl/BgpTopologyProvider.java
@@ -184,23 +184,17 @@
 
     private class InternalDeviceListener implements DeviceListener {
         @Override
+        public boolean isRelevant(DeviceEvent event) {
+            return event.type() == DeviceEvent.Type.DEVICE_ADDED &&
+                    mastershipService.isLocalMaster(event.subject().id());
+        }
+
+        @Override
         public void event(DeviceEvent event) {
             Device device = event.subject();
-
-            switch (event.type()) {
-                case DEVICE_ADDED:
-                    if (!mastershipService.isLocalMaster(device.id())) {
-                        break;
-                    }
-
-                    // Reserve device label pool for L3 devices
-                    if (device.annotations().value(LSRID) != null) {
-                        createDevicePool(device.id());
-                    }
-                    break;
-
-                default:
-                    break;
+            // Reserve device label pool for L3 devices
+            if (device.annotations().value(LSRID) != null) {
+                createDevicePool(device.id());
             }
         }
     }
@@ -480,7 +474,7 @@
 
     private void registerBandwidthAndTeMetric(LinkDescription linkDes, PathAttrNlriDetails details) {
         if (details ==  null) {
-            log.error("Couldnot able to register bandwidth ");
+            log.error("Unable to register bandwidth ");
             return;
         }
 
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index f735006..48d8007 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -85,6 +85,7 @@
 import java.util.Collections;
 import java.util.Dictionary;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -173,6 +174,8 @@
     private Set<DeviceId> driverConfigured = new CopyOnWriteArraySet<>();
     private Set<DeviceId> pipelineConfigured = new CopyOnWriteArraySet<>();
 
+    private final Map<DeviceId, DeviceHandshaker> handshakers = Maps.newConcurrentMap();
+
 
     protected ScheduledExecutorService connectionExecutor
             = newScheduledThreadPool(CORE_POOL_SIZE,
@@ -208,6 +211,7 @@
         cfgService.registerConfigFactory(factory);
         cfgService.addListener(cfgListener);
         deviceService.addListener(deviceListener);
+        handshakers.clear();
         //This will fail if ONOS has CFG and drivers which depend on this provider
         // are activated, failing due to not finding the driver.
         cfgService.getSubjects(DeviceId.class, GeneralProviderDeviceConfig.class)
@@ -239,10 +243,8 @@
                 scheduledTasks.put(deviceId, schedulePolling(deviceId, true));
             });
         }
-
     }
 
-
     @Deactivate
     public void deactivate() {
         portStatsExecutor.shutdown();
@@ -254,6 +256,7 @@
         connectionExecutor.shutdown();
         deviceService.removeListener(deviceListener);
         providerRegistry.unregister(this);
+        handshakers.clear();
         providerService = null;
         cfgService.unregisterConfigFactory(factory);
         log.info("Stopped");
@@ -266,7 +269,7 @@
 
     @Override
     public void triggerProbe(DeviceId deviceId) {
-        //TODO Really don't see the point of this in non OF Context,
+        // TODO Really don't see the point of this in non OF Context,
         // for now testing reachability, can be moved to no-op
         log.debug("Triggering probe equals testing reachability on device {}", deviceId);
         isReachable(deviceId);
@@ -330,10 +333,18 @@
         }
     }
 
+    @Override
+    public void triggerDisconnect(DeviceId deviceId) {
+        connectionExecutor.execute(() -> disconnectDevice(deviceId));
+    }
+
     private DeviceHandshaker getHandshaker(DeviceId deviceId) {
-        Driver driver = getDriver(deviceId);
-        return getBehaviour(driver, DeviceHandshaker.class,
-                new DefaultDriverData(driver, deviceId));
+        return handshakers.computeIfAbsent(deviceId, id -> {
+            Driver driver = getDriver(deviceId);
+            return driver == null ? null :
+                    getBehaviour(driver, DeviceHandshaker.class,
+                                 new DefaultDriverData(driver, deviceId));
+        });
     }
 
     private PortAdmin getPortAdmin(DeviceId deviceId) {
@@ -358,23 +369,6 @@
         return driver;
     }
 
-    //Distinguishing from getDriver to not impose everywhere the overhead to get the whole device.
-    // This is what the driverService does with the getDriver(deviceId) method.
-    // A redundant method here is needed because the driverService returns null when the device is not in the store
-    // as happens during disconnection.
-    // The whole device object is needed only in disconnection.
-    private Driver getDriverFromAnnotations(Device device) {
-        String driverName = device.annotations().value(DRIVER);
-        if (driverName != null) {
-            try {
-                return driverService.getDriver(driverName);
-            } catch (ItemNotFoundException e) {
-                log.warn("Specified driver {} not found, falling back.", driverName);
-            }
-        }
-        return null;
-    }
-
     //needed since the device manager will not return the driver through implementation()
     // method since the device is not pushed to the core so for the connectDevice
     // we need to work around that in order to test before calling
@@ -403,23 +397,14 @@
         } else {
             log.info("Connecting to device {} with driver {}", deviceId, basicDeviceConfig.driver());
 
-            Driver driver;
-            try {
-                driver = driverService.getDriver(basicDeviceConfig.driver());
-            } catch (ItemNotFoundException e) {
-                log.warn("The driver of {} is not found : {}", deviceId, e.getMessage());
-                return;
-            }
-
-            DriverData driverData = new DefaultDriverData(driver, deviceId);
-            DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
+            DeviceHandshaker handshaker = getHandshaker(deviceId);
             if (handshaker == null) {
-                log.error("Device {}, with driver {} does not support DeviceHandshaker " +
-                        "behaviour, {}", deviceId, driver.name(), driver.behaviours());
+                log.error("Device {} does not support DeviceHandshaker behaviour", deviceId);
                 return;
             }
+            Driver driver = handshaker.handler().driver();
 
-            addConfigData(providerConfig, driverData);
+            addConfigData(providerConfig, handshaker.data());
 
             //Connecting to the device
             CompletableFuture<Boolean> connected = handshaker.connect();
@@ -442,7 +427,7 @@
                     List<PortDescription> ports = new ArrayList<>();
 
                     DeviceDescriptionDiscovery deviceDiscovery = getBehaviour(driver,
-                            DeviceDescriptionDiscovery.class, driverData);
+                            DeviceDescriptionDiscovery.class, handshaker.data());
                     if (deviceDiscovery != null) {
                         DeviceDescription newdescription = deviceDiscovery.discoverDeviceDetails();
                         if (newdescription != null) {
@@ -454,7 +439,7 @@
                                 "description or ports.", deviceId);
                     }
 
-                    if (!handlePipeconf(deviceId, driver, driverData, true)) {
+                    if (!handlePipeconf(deviceId, driver, handshaker.data(), true)) {
                         // Something went wrong during handling of pipeconf.
                         // We already logged the error.
                         handshaker.disconnect();
@@ -471,27 +456,21 @@
     }
 
     private void connectStandbyDevice(DeviceId deviceId) {
-
-        //if device is pipeline programmable we merge pipeconf + base driver for every other role
+        // if device is pipeline programmable we merge pipeconf + base driver for every other role
         GeneralProviderDeviceConfig providerConfig =
                 cfgService.getConfig(deviceId, GeneralProviderDeviceConfig.class);
 
-        Driver driver = getDriver(deviceId);
-
-
-        DriverData driverData = new DefaultDriverData(driver, deviceId);
-        DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class, driverData);
+        DeviceHandshaker handshaker = getHandshaker(deviceId);
         if (handshaker == null) {
-            log.error("Device {}, with driver {} does not support DeviceHandshaker " +
-                    "behaviour, supported behaviours={}", deviceId, driver.name(), driver.behaviours());
+            log.error("Device {} does not support DeviceHandshaker behaviour", deviceId);
             return;
         }
-        addConfigData(providerConfig, driverData);
+        addConfigData(providerConfig, handshaker.data());
 
         //Connecting to the device
         handshaker.connect().thenAcceptAsync(result -> {
             if (result) {
-                handlePipeconf(deviceId, driver, driverData, false);
+                handlePipeconf(deviceId, handshaker.handler().driver(), handshaker.data(), false);
             }
         });
     }
@@ -501,9 +480,8 @@
      * device can be registered to the core, false otherwise.
      */
     private boolean handlePipeconf(DeviceId deviceId, Driver driver, DriverData driverData, boolean deployPipeconf) {
-
-        PiPipelineProgrammable pipelineProg = getBehaviour(driver, PiPipelineProgrammable.class,
-                driverData);
+        PiPipelineProgrammable pipelineProg =
+                getBehaviour(driver, PiPipelineProgrammable.class, driverData);
 
         if (pipelineProg == null) {
             // Device is not pipeline programmable.
@@ -513,14 +491,13 @@
         PiPipeconf pipeconf = getPipeconf(deviceId, pipelineProg);
 
         if (pipeconf != null) {
-
             PiPipeconfId pipeconfId = pipeconf.id();
 
             try {
                 if (deployPipeconf) {
                     if (!pipelineProg.deployPipeconf(pipeconf).get()) {
                         log.error("Unable to deploy pipeconf {} to {}, aborting device discovery",
-                                pipeconfId, deviceId);
+                                  pipeconfId, deviceId);
                         return false;
                     }
                 }
@@ -531,7 +508,7 @@
             try {
                 if (!piPipeconfService.bindToDevice(pipeconfId, deviceId).get()) {
                     log.error("Unable to merge driver {} for device {} with pipeconf {}, aborting device discovery",
-                            driver.name(), deviceId, pipeconfId);
+                              driver.name(), deviceId, pipeconfId);
                     return false;
                 }
             } catch (InterruptedException | ExecutionException e) {
@@ -568,7 +545,6 @@
             return null;
         }
 
-
         return piPipeconfService.getPipeconf(pipeconfId).get();
     }
 
@@ -577,41 +553,29 @@
         providerService.updatePorts(deviceId, ports);
     }
 
-    private void disconnectDevice(Device device) {
-        DeviceId deviceId = device.id();
+    private void disconnectDevice(DeviceId deviceId) {
         log.info("Disconnecting for device {}", deviceId);
 
-        //The driver service will return a null driver for the given deviceId
-        //since it's already removed form the device store, we leverage the device object from the DEVICE_REMOVED
-        //event to get the driver.
-        Driver driver = getDriverFromAnnotations(device);
-        if (driver != null) {
-            DeviceHandshaker handshaker = getBehaviour(driver, DeviceHandshaker.class,
-                    new DefaultDriverData(driver, deviceId));
-            if (handshaker != null) {
-                CompletableFuture<Boolean> disconnect = handshaker.disconnect();
-                disconnect.thenAcceptAsync(result -> {
-                    if (result) {
-                        log.info("Disconnected device {}", deviceId);
-                        providerService.deviceDisconnected(deviceId);
-                    } else {
-                        log.warn("Device {} was unable to disconnect", deviceId);
-                    }
-                });
-            } else {
-                //gracefully ignoring.
-                log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
-                        "shutdown of communication", deviceId);
-            }
+        DeviceHandshaker handshaker = handshakers.remove(deviceId);
+        if (handshaker != null) {
+            CompletableFuture<Boolean> disconnect = handshaker.disconnect();
+            disconnect.thenAcceptAsync(result -> {
+                if (result) {
+                    log.info("Disconnected device {}", deviceId);
+                    providerService.deviceDisconnected(deviceId);
+                } else {
+                    log.warn("Device {} was unable to disconnect", deviceId);
+                }
+            });
         } else {
             //gracefully ignoring.
-            log.warn("Can't find driver for device {}, no guarantees of complete shutdown of communication", deviceId);
+            log.warn("No DeviceHandshaker for device {}, no guarantees of complete " +
+                    "shutdown of communication", deviceId);
         }
         ScheduledFuture<?> pollingStatisticsTask = scheduledTasks.get(deviceId);
         if (pollingStatisticsTask != null) {
             pollingStatisticsTask.cancel(true);
         }
-
     }
 
     //Needed to catch the exception in the executors since are not rethrown otherwise.
@@ -649,7 +613,6 @@
      */
     private class InternalNetworkConfigListener implements NetworkConfigListener {
 
-
         @Override
         public void event(NetworkConfigEvent event) {
             DeviceId deviceId = (DeviceId) event.subject();
@@ -681,7 +644,6 @@
                     cleanUpConfigInfo(deviceId);
                 }
             }
-
         }
 
         private boolean processEvent(NetworkConfigEvent event, DeviceId deviceId) {
@@ -786,28 +748,20 @@
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
-            Type type = event.type();
             DeviceId deviceId = event.subject().id();
-            if (type.equals((Type.DEVICE_ADDED))) {
+            // FIXME handling for mastership change scenario missing?
 
-                // FIXME handling for mastership change scenario missing?
-
-                //For now this is scheduled periodically, when streaming API will
-                // be available we check and base it on the streaming API (e.g. gNMI)
-                if (mastershipService.isLocalMaster(deviceId)) {
-                    scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
-                }
-
-            } else if (type.equals(Type.DEVICE_REMOVED)) {
-
-                //Passing the whole device object to get driver information
-                connectionExecutor.execute(() -> disconnectDevice(event.subject()));
+            // For now this is scheduled periodically, when streaming API will
+            // be available we check and base it on the streaming API (e.g. gNMI)
+            if (mastershipService.isLocalMaster(deviceId)) {
+                scheduledTasks.put(deviceId, schedulePolling(deviceId, false));
             }
         }
 
         @Override
         public boolean isRelevant(DeviceEvent event) {
-            return event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
+            return event.type() == Type.DEVICE_ADDED &&
+                    event.subject().id().toString().startsWith(URI_SCHEME.toLowerCase());
         }
     }
 }
diff --git a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
index f675dbc..8281f4e 100644
--- a/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
+++ b/providers/netconf/device/src/main/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProvider.java
@@ -377,6 +377,12 @@
         }
     }
 
+    @Override
+    public void triggerDisconnect(DeviceId deviceId) {
+        log.debug("Forcing disconnect for device {}", deviceId);
+        controller.disconnectDevice(deviceId, true);
+    }
+
     private class InnerNetconfDeviceListener implements NetconfDeviceListener {
 
 
@@ -639,16 +645,14 @@
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
-            if ((event.type() == DeviceEvent.Type.DEVICE_ADDED)) {
-                executor.execute(() -> discoverPorts(event.subject().id()));
-            } else if ((event.type() == DeviceEvent.Type.DEVICE_REMOVED)) {
-                log.debug("removing device {}", event.subject().id());
-                controller.disconnectDevice(event.subject().id(), true);
-            }
+            executor.execute(() -> discoverPorts(event.subject().id()));
         }
 
         @Override
         public boolean isRelevant(DeviceEvent event) {
+            if (event.type() != DeviceEvent.Type.DEVICE_ADDED) {
+                return false;
+            }
             if (mastershipService.getMasterFor(event.subject().id()) == null) {
                 return true;
             }
diff --git a/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java b/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
index 29d16ba..53e7838 100644
--- a/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
+++ b/providers/netconf/device/src/test/java/org/onosproject/provider/netconf/device/impl/NetconfDeviceProviderTest.java
@@ -280,7 +280,7 @@
         deviceService.listener.event(new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, netconfDevice));
         assertEquals("Ports should be added", PORT_COUNT, providerService.ports.get(netconfDevice.id()).size());
 
-        deviceService.listener.event(new DeviceEvent(DeviceEvent.Type.DEVICE_REMOVED, netconfDevice));
+        provider.triggerDisconnect(netconfDevice.id());
         assertEquals("Ports should be removed", 0, providerService.ports.get(netconfDevice.id()).size());
     }
 
diff --git a/providers/openflow/device/src/main/java/org/onosproject/provider/of/device/impl/OpenFlowDeviceProvider.java b/providers/openflow/device/src/main/java/org/onosproject/provider/of/device/impl/OpenFlowDeviceProvider.java
index cce89b0..8cd3870 100644
--- a/providers/openflow/device/src/main/java/org/onosproject/provider/of/device/impl/OpenFlowDeviceProvider.java
+++ b/providers/openflow/device/src/main/java/org/onosproject/provider/of/device/impl/OpenFlowDeviceProvider.java
@@ -52,8 +52,6 @@
 import org.onosproject.net.device.DefaultPortDescription;
 import org.onosproject.net.device.DefaultPortStatistics;
 import org.onosproject.net.device.DeviceDescription;
-import org.onosproject.net.device.DeviceEvent;
-import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceProvider;
 import org.onosproject.net.device.DeviceProviderRegistry;
 import org.onosproject.net.device.DeviceProviderService;
@@ -124,6 +122,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
@@ -132,11 +131,9 @@
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.get;
-import static org.onosproject.net.Device.Type.CONTROLLER;
 import static org.onosproject.net.DeviceId.deviceId;
 import static org.onosproject.net.Port.Type.COPPER;
 import static org.onosproject.net.Port.Type.FIBER;
-import static org.onosproject.net.device.DeviceEvent.Type.DEVICE_REMOVED;
 import static org.onosproject.net.optical.device.OchPortHelper.ochPortDescription;
 import static org.onosproject.net.optical.device.OduCltPortHelper.oduCltPortDescription;
 import static org.onosproject.net.optical.device.OmsPortHelper.omsPortDescription;
@@ -451,7 +448,6 @@
     private DeviceProviderService providerService;
 
     private final InternalDeviceProvider listener = new InternalDeviceProvider();
-    private final InternalDeviceListener deviceListener = new InternalDeviceListener();
 
     private static final String POLL_PROP_NAME = "portStatsPollFrequency";
     private static final int POLL_INTERVAL = 5;
@@ -467,7 +463,7 @@
 
     private final Timer timer = new Timer("onos-openflow-portstats-collector");
 
-    private HashMap<Dpid, PortStatsCollector> collectors = Maps.newHashMap();
+    private Map<Dpid, PortStatsCollector> collectors = Maps.newConcurrentMap();
 
     /**
      * Creates an OpenFlow device provider.
@@ -480,7 +476,6 @@
     public void activate(ComponentContext context) {
         cfgService.registerProperties(getClass());
         providerService = providerRegistry.register(this);
-        deviceService.addListener(deviceListener);
         controller.addListener(listener);
         controller.addEventListener(listener);
 
@@ -494,7 +489,6 @@
     public void deactivate(ComponentContext context) {
         cfgService.unregisterProperties(getClass(), false);
         listener.disable();
-        deviceService.removeListener(deviceListener);
         controller.removeListener(listener);
         providerRegistry.unregister(this);
         collectors.values().forEach(PortStatsCollector::stop);
@@ -621,6 +615,18 @@
         sw.sendMsg(Collections.singletonList(pmb.build()));
     }
 
+    @Override
+    public void triggerDisconnect(DeviceId deviceId) {
+        Dpid dpid = dpid(deviceId.uri());
+        OpenFlowSwitch sw = controller.getSwitch(dpid);
+        if (sw != null) {
+            LOG.debug("Forcing disconnect for device {}", deviceId);
+            // TODO: Further consolidate clean-up on device disconnect
+            listener.switchRemoved(dpid);
+            sw.disconnectSwitch();
+        }
+    }
+
     private void pushPortMetrics(Dpid dpid, List<OFPortStatsEntry> portStatsEntries) {
         DeviceId deviceId = DeviceId.deviceId(Dpid.uri(dpid));
         Collection<PortStatistics> stats =
@@ -1532,27 +1538,4 @@
         }
     }
 
-    class InternalDeviceListener implements DeviceListener {
-
-        @Override
-        public boolean isRelevant(DeviceEvent event) {
-            return event.subject().type() != CONTROLLER && event.type() == DEVICE_REMOVED
-                    && event.subject().id().uri().getScheme().equals(SCHEME);
-        }
-
-        @Override
-        public void event(DeviceEvent event) {
-            DeviceId deviceId = event.subject().id();
-            Dpid dpid = dpid(deviceId.uri());
-            OpenFlowSwitch sw = controller.getSwitch(dpid);
-            if (sw != null) {
-                LOG.debug("Forcing disconnect for device {}", deviceId);
-                PortStatsCollector portStatsCollector = collectors.remove(dpid);
-                if (portStatsCollector != null) {
-                    portStatsCollector.stop();
-                }
-                sw.disconnectSwitch();
-            }
-        }
-    }
 }
diff --git a/providers/ovsdb/device/src/main/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProvider.java b/providers/ovsdb/device/src/main/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProvider.java
index d439766..a520035 100644
--- a/providers/ovsdb/device/src/main/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProvider.java
+++ b/providers/ovsdb/device/src/main/java/org/onosproject/ovsdb/providers/device/OvsdbDeviceProvider.java
@@ -187,27 +187,27 @@
         }
     }
 
+    @Override
+    public void triggerDisconnect(DeviceId deviceId) {
+        log.debug("Forcing disconnect for device {}", deviceId);
+        OvsdbNodeId ovsdbNodeId = changeDeviceIdToNodeId(deviceId);
+        OvsdbClientService client = controller.getOvsdbClient(ovsdbNodeId);
+        if (client != null) {
+            client.disconnect();
+        }
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
-            DeviceId deviceId = event.subject().id();
-
-            if ((event.type() == DeviceEvent.Type.DEVICE_ADDED)) {
-                executor.execute(() -> discoverPorts(deviceId));
-            } else if ((event.type() == DeviceEvent.Type.DEVICE_REMOVED)) {
-                log.debug("removing device {}", event.subject().id());
-                OvsdbNodeId ovsdbNodeId = changeDeviceIdToNodeId(deviceId);
-                OvsdbClientService client = controller.getOvsdbClient(ovsdbNodeId);
-                if (client != null) {
-                    client.disconnect();
-                }
-            }
+            executor.execute(() -> discoverPorts(event.subject().id()));
         }
 
         @Override
         public boolean isRelevant(DeviceEvent event) {
             DeviceId deviceId = event.subject().id();
-            return isRelevant(deviceId) && mastershipService.isLocalMaster(deviceId);
+            return event.type() == DeviceEvent.Type.DEVICE_ADDED &&
+                    isRelevant(deviceId) && mastershipService.isLocalMaster(deviceId);
         }
 
         private boolean isRelevant(DeviceId deviceId) {