Implement an option in CPRM to reprogram the flows when a device reconnects

Also remove unused AsyncDeviceFetcher in FibInstaller

Change-Id: I52e778a51854efd6bfe47c56569efa5c27d7c7fb
diff --git a/apps/routing-api/src/main/java/org/onosproject/routing/AsyncDeviceFetcher.java b/apps/routing-api/src/main/java/org/onosproject/routing/AsyncDeviceFetcher.java
index 65ffec0..4d14bc5 100644
--- a/apps/routing-api/src/main/java/org/onosproject/routing/AsyncDeviceFetcher.java
+++ b/apps/routing-api/src/main/java/org/onosproject/routing/AsyncDeviceFetcher.java
@@ -22,7 +22,7 @@
 import org.onosproject.net.device.DeviceService;
 
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -36,7 +36,8 @@
 
     private DeviceListener listener = new InternalDeviceListener();
 
-    private Map<DeviceId, CompletableFuture<DeviceId>> devices = new ConcurrentHashMap();
+    private Map<DeviceId, Runnable> onConnect = new ConcurrentHashMap<>();
+    private Map<DeviceId, Runnable> onDisconnect = new ConcurrentHashMap<>();
 
     private AsyncDeviceFetcher(DeviceService deviceService) {
         this.deviceService = checkNotNull(deviceService);
@@ -48,24 +49,27 @@
      */
     public void shutdown() {
         deviceService.removeListener(listener);
-        devices.clear();
+        onConnect.clear();
+        onDisconnect.clear();
     }
 
     /**
-     * Returns a completable future that completes when the device is available
-     * for the first time.
-     *
-     * @param deviceId ID of the device
-     * @return completable future
+     * Registers callbacks that run when the given device connects or disconnects.
+     * @param deviceId device ID
+     * @param onConnect run immediately if the device is currently available, and on
+     *                  every later transition to available; ignored if null
+     * @param onDisconnect run when the device becomes unavailable; ignored if null
      */
-    public CompletableFuture<DeviceId> getDevice(DeviceId deviceId) {
-        CompletableFuture<DeviceId> future = new CompletableFuture<>();
-        return devices.computeIfAbsent(deviceId, deviceId1 -> {
+     void registerCallback(DeviceId deviceId, Runnable onConnect, Runnable onDisconnect) {
+        if (onConnect != null) {
             if (deviceService.isAvailable(deviceId)) {
-                future.complete(deviceId);
+                onConnect.run();
             }
-            return future;
-        });
+            this.onConnect.put(deviceId, onConnect);
+        }
+        if (onDisconnect != null) {
+            this.onDisconnect.put(deviceId, onDisconnect);
+        }
     }
 
     /**
@@ -82,24 +86,23 @@
         @Override
         public void event(DeviceEvent event) {
             switch (event.type()) {
-            case DEVICE_ADDED:
-            case DEVICE_AVAILABILITY_CHANGED:
-                if (deviceService.isAvailable(event.subject().id())) {
+                case DEVICE_ADDED:
+                case DEVICE_AVAILABILITY_CHANGED:
                     DeviceId deviceId = event.subject().id();
-                    CompletableFuture<DeviceId> future = devices.get(deviceId);
-                    if (future != null) {
-                        future.complete(deviceId);
+                    if (deviceService.isAvailable(deviceId)) {
+                        Optional.ofNullable(onConnect.get(deviceId)).ifPresent(Runnable::run);
+                    } else {
+                        Optional.ofNullable(onDisconnect.get(deviceId)).ifPresent(Runnable::run);
                     }
-                }
-                break;
-            case DEVICE_UPDATED:
-            case DEVICE_REMOVED:
-            case DEVICE_SUSPENDED:
-            case PORT_ADDED:
-            case PORT_UPDATED:
-            case PORT_REMOVED:
-            default:
-                break;
+                    break;
+                case DEVICE_UPDATED:
+                case DEVICE_REMOVED:
+                case DEVICE_SUSPENDED:
+                case PORT_ADDED:
+                case PORT_UPDATED:
+                case PORT_REMOVED:
+                default:
+                    break;
             }
         }
     }
diff --git a/apps/routing-api/src/main/java/org/onosproject/routing/Router.java b/apps/routing-api/src/main/java/org/onosproject/routing/Router.java
index 97fee88..855084e 100644
--- a/apps/routing-api/src/main/java/org/onosproject/routing/Router.java
+++ b/apps/routing-api/src/main/java/org/onosproject/routing/Router.java
@@ -53,9 +53,9 @@
     private InterfaceService interfaceService;
     private InterfaceListener listener = new InternalInterfaceListener();
 
-    private AsyncDeviceFetcher asyncDeviceFetcher;
+    private DeviceService deviceService;
 
-    private volatile boolean deviceAvailable = false;
+    private AsyncDeviceFetcher asyncDeviceFetcher;
 
     /**
      * Creates a new router interface manager.
@@ -65,27 +65,26 @@
      * @param deviceService device service
      * @param provisioner consumer that will provision new interfaces
      * @param unprovisioner consumer that will unprovision old interfaces
+     * @param forceUnprovision true to force unprovisioning when the device goes offline
      */
     public Router(RouterInfo info,
                   InterfaceService interfaceService,
                   DeviceService deviceService,
                   Consumer<InterfaceProvisionRequest> provisioner,
-                  Consumer<InterfaceProvisionRequest> unprovisioner) {
+                  Consumer<InterfaceProvisionRequest> unprovisioner,
+                  boolean forceUnprovision) {
         this.info = checkNotNull(info);
         this.provisioner = checkNotNull(provisioner);
         this.unprovisioner = checkNotNull(unprovisioner);
         this.interfaceService = checkNotNull(interfaceService);
+        this.deviceService = checkNotNull(deviceService);
 
         this.asyncDeviceFetcher = AsyncDeviceFetcher.create(deviceService);
-        asyncDeviceFetcher.getDevice(info.deviceId())
-                .thenAccept(deviceId1 -> {
-                    deviceAvailable = true;
-                    provision();
-                }).whenComplete((v, t) -> {
-                    if (t != null) {
-                        log.error("Error provisioning: ", t);
-                    }
-                });
+        if (forceUnprovision) {
+            asyncDeviceFetcher.registerCallback(info.deviceId(), this::provision, this::forceUnprovision);
+        } else {
+            asyncDeviceFetcher.registerCallback(info.deviceId(), this::provision, null);
+        }
 
         interfaceService.addListener(listener);
     }
@@ -94,6 +93,8 @@
      * Cleans up the router and unprovisions all interfaces.
      */
     public void cleanup() {
+        asyncDeviceFetcher.shutdown();
+
         interfaceService.removeListener(listener);
 
         unprovision();
@@ -112,8 +113,15 @@
      * Changes the router configuration.
      *
      * @param newConfig new configuration
+     * @param forceUnprovision true to force unprovisioning when the device goes offline
      */
-    public void changeConfiguration(RouterInfo newConfig) {
+    public void changeConfiguration(RouterInfo newConfig, boolean forceUnprovision) {
+        if (forceUnprovision) {
+            asyncDeviceFetcher.registerCallback(info.deviceId(), this::provision, this::forceUnprovision);
+        } else {
+            asyncDeviceFetcher.registerCallback(info.deviceId(), this::provision, null);
+        }
+
         Set<String> oldConfiguredInterfaces = info.interfaces();
         info = newConfig;
         Set<String> newConfiguredInterfaces = info.interfaces();
@@ -153,18 +161,21 @@
 
     private void provision() {
         getInterfacesForDevice(info.deviceId())
-                .filter(this::shouldProvision)
                 .forEach(this::provision);
     }
 
     private void unprovision() {
         getInterfacesForDevice(info.deviceId())
-                .filter(this::shouldProvision)
                 .forEach(this::unprovision);
     }
 
+    private void forceUnprovision() {
+        getInterfacesForDevice(info.deviceId())
+                .forEach(this::forceUnprovision);
+    }
+
     private void provision(Interface intf) {
-        if (!provisioned.contains(intf) && shouldProvision(intf)) {
+        if (!provisioned.contains(intf) && deviceAvailable(intf) && shouldProvision(intf)) {
             log.info("Provisioning interface {}", intf);
             provisioner.accept(InterfaceProvisionRequest.of(info, intf));
             provisioned.add(intf);
@@ -172,16 +183,28 @@
     }
 
     private void unprovision(Interface intf) {
-        if (provisioned.contains(intf)) {
+        if (provisioned.contains(intf) && deviceAvailable(intf) && shouldProvision(intf)) {
             log.info("Unprovisioning interface {}", intf);
             unprovisioner.accept(InterfaceProvisionRequest.of(info, intf));
             provisioned.remove(intf);
         }
     }
 
+    private void forceUnprovision(Interface intf) {
+        // Skip availability check: runs from the disconnect callback, when the device is unavailable
+        if (provisioned.contains(intf) && shouldProvision(intf)) {
+            log.info("Unprovisioning interface {}", intf);
+            unprovisioner.accept(InterfaceProvisionRequest.of(info, intf));
+            provisioned.remove(intf);
+        }
+    }
+
+    private boolean deviceAvailable(Interface intf) {
+        return deviceService.isAvailable(intf.connectPoint().deviceId());
+    }
+
     private boolean shouldProvision(Interface intf) {
-        return deviceAvailable &&
-                (info.interfaces().isEmpty() || info.interfaces().contains(intf.name()));
+        return info.interfaces().isEmpty() || info.interfaces().contains(intf.name());
     }
 
     private Stream<Interface> getInterfacesForDevice(DeviceId deviceId) {