Implement an option in CPRM to reprogram the flows when device reconnecting

Also remove unused AsyncDeviceFetcher in FibInstaller

Change-Id: I52e778a51854efd6bfe47c56569efa5c27d7c7fb
diff --git a/apps/routing-api/src/main/java/org/onosproject/routing/AsyncDeviceFetcher.java b/apps/routing-api/src/main/java/org/onosproject/routing/AsyncDeviceFetcher.java
index 65ffec0..4d14bc5 100644
--- a/apps/routing-api/src/main/java/org/onosproject/routing/AsyncDeviceFetcher.java
+++ b/apps/routing-api/src/main/java/org/onosproject/routing/AsyncDeviceFetcher.java
@@ -22,7 +22,7 @@
 import org.onosproject.net.device.DeviceService;
 
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static com.google.common.base.Preconditions.checkNotNull;
@@ -36,7 +36,8 @@
 
     private DeviceListener listener = new InternalDeviceListener();
 
-    private Map<DeviceId, CompletableFuture<DeviceId>> devices = new ConcurrentHashMap();
+    private Map<DeviceId, Runnable> onConnect = new ConcurrentHashMap<>();
+    private Map<DeviceId, Runnable> onDisconnect = new ConcurrentHashMap<>();
 
     private AsyncDeviceFetcher(DeviceService deviceService) {
         this.deviceService = checkNotNull(deviceService);
@@ -48,24 +49,27 @@
      */
     public void shutdown() {
         deviceService.removeListener(listener);
-        devices.clear();
+        onConnect.clear();
+        onDisconnect.clear();
     }
 
     /**
-     * Returns a completable future that completes when the device is available
-     * for the first time.
-     *
-     * @param deviceId ID of the device
-     * @return completable future
+     * Executes provided callback when given device connects/disconnects.
+     * @param deviceId device ID
+     * @param onConnect callback that will be executed immediately if the device
+     *                  is currently online, or when the device becomes online
+     * @param onDisconnect callback that will be executed when the device becomes offline
      */
-    public CompletableFuture<DeviceId> getDevice(DeviceId deviceId) {
-        CompletableFuture<DeviceId> future = new CompletableFuture<>();
-        return devices.computeIfAbsent(deviceId, deviceId1 -> {
+     void registerCallback(DeviceId deviceId, Runnable onConnect, Runnable onDisconnect) {
+        if (onConnect != null) {
             if (deviceService.isAvailable(deviceId)) {
-                future.complete(deviceId);
+                onConnect.run();
             }
-            return future;
-        });
+            this.onConnect.put(deviceId, onConnect);
+        }
+        if (onDisconnect != null) {
+            this.onDisconnect.put(deviceId, onDisconnect);
+        }
     }
 
     /**
@@ -82,24 +86,23 @@
         @Override
         public void event(DeviceEvent event) {
             switch (event.type()) {
-            case DEVICE_ADDED:
-            case DEVICE_AVAILABILITY_CHANGED:
-                if (deviceService.isAvailable(event.subject().id())) {
+                case DEVICE_ADDED:
+                case DEVICE_AVAILABILITY_CHANGED:
                     DeviceId deviceId = event.subject().id();
-                    CompletableFuture<DeviceId> future = devices.get(deviceId);
-                    if (future != null) {
-                        future.complete(deviceId);
+                    if (deviceService.isAvailable(deviceId)) {
+                        Optional.ofNullable(onConnect.get(deviceId)).ifPresent(Runnable::run);
+                    } else {
+                        Optional.ofNullable(onDisconnect.get(deviceId)).ifPresent(Runnable::run);
                     }
-                }
-                break;
-            case DEVICE_UPDATED:
-            case DEVICE_REMOVED:
-            case DEVICE_SUSPENDED:
-            case PORT_ADDED:
-            case PORT_UPDATED:
-            case PORT_REMOVED:
-            default:
-                break;
+                    break;
+                case DEVICE_UPDATED:
+                case DEVICE_REMOVED:
+                case DEVICE_SUSPENDED:
+                case PORT_ADDED:
+                case PORT_UPDATED:
+                case PORT_REMOVED:
+                default:
+                    break;
             }
         }
     }
diff --git a/apps/routing-api/src/main/java/org/onosproject/routing/Router.java b/apps/routing-api/src/main/java/org/onosproject/routing/Router.java
index 97fee88..855084e 100644
--- a/apps/routing-api/src/main/java/org/onosproject/routing/Router.java
+++ b/apps/routing-api/src/main/java/org/onosproject/routing/Router.java
@@ -53,9 +53,9 @@
     private InterfaceService interfaceService;
     private InterfaceListener listener = new InternalInterfaceListener();
 
-    private AsyncDeviceFetcher asyncDeviceFetcher;
+    private DeviceService deviceService;
 
-    private volatile boolean deviceAvailable = false;
+    private AsyncDeviceFetcher asyncDeviceFetcher;
 
     /**
      * Creates a new router interface manager.
@@ -65,27 +65,26 @@
      * @param deviceService device service
      * @param provisioner consumer that will provision new interfaces
      * @param unprovisioner consumer that will unprovision old interfaces
+     * @param forceUnprovision force unprovision when the device goes offline
      */
     public Router(RouterInfo info,
                   InterfaceService interfaceService,
                   DeviceService deviceService,
                   Consumer<InterfaceProvisionRequest> provisioner,
-                  Consumer<InterfaceProvisionRequest> unprovisioner) {
+                  Consumer<InterfaceProvisionRequest> unprovisioner,
+                  boolean forceUnprovision) {
         this.info = checkNotNull(info);
         this.provisioner = checkNotNull(provisioner);
         this.unprovisioner = checkNotNull(unprovisioner);
         this.interfaceService = checkNotNull(interfaceService);
+        this.deviceService = checkNotNull(deviceService);
 
         this.asyncDeviceFetcher = AsyncDeviceFetcher.create(deviceService);
-        asyncDeviceFetcher.getDevice(info.deviceId())
-                .thenAccept(deviceId1 -> {
-                    deviceAvailable = true;
-                    provision();
-                }).whenComplete((v, t) -> {
-                    if (t != null) {
-                        log.error("Error provisioning: ", t);
-                    }
-                });
+        if (forceUnprovision) {
+            asyncDeviceFetcher.registerCallback(info.deviceId(), this::provision, this::forceUnprovision);
+        } else {
+            asyncDeviceFetcher.registerCallback(info.deviceId(), this::provision, null);
+        }
 
         interfaceService.addListener(listener);
     }
@@ -94,6 +93,8 @@
      * Cleans up the router and unprovisions all interfaces.
      */
     public void cleanup() {
+        asyncDeviceFetcher.shutdown();
+
         interfaceService.removeListener(listener);
 
         unprovision();
@@ -112,8 +113,15 @@
      * Changes the router configuration.
      *
      * @param newConfig new configuration
+     * @param forceUnprovision true if we want to force unprovision the device when it goes offline
      */
-    public void changeConfiguration(RouterInfo newConfig) {
+    public void changeConfiguration(RouterInfo newConfig, boolean forceUnprovision) {
+        if (forceUnprovision) {
+            asyncDeviceFetcher.registerCallback(info.deviceId(), this::provision, this::forceUnprovision);
+        } else {
+            asyncDeviceFetcher.registerCallback(info.deviceId(), this::provision, null);
+        }
+
         Set<String> oldConfiguredInterfaces = info.interfaces();
         info = newConfig;
         Set<String> newConfiguredInterfaces = info.interfaces();
@@ -153,18 +161,21 @@
 
     private void provision() {
         getInterfacesForDevice(info.deviceId())
-                .filter(this::shouldProvision)
                 .forEach(this::provision);
     }
 
     private void unprovision() {
         getInterfacesForDevice(info.deviceId())
-                .filter(this::shouldProvision)
                 .forEach(this::unprovision);
     }
 
+    private void forceUnprovision() {
+        getInterfacesForDevice(info.deviceId())
+                .forEach(this::forceUnprovision);
+    }
+
     private void provision(Interface intf) {
-        if (!provisioned.contains(intf) && shouldProvision(intf)) {
+        if (!provisioned.contains(intf) && deviceAvailable(intf) && shouldProvision(intf)) {
             log.info("Provisioning interface {}", intf);
             provisioner.accept(InterfaceProvisionRequest.of(info, intf));
             provisioned.add(intf);
@@ -172,16 +183,28 @@
     }
 
     private void unprovision(Interface intf) {
-        if (provisioned.contains(intf)) {
+        if (provisioned.contains(intf) && deviceAvailable(intf) && shouldProvision(intf)) {
             log.info("Unprovisioning interface {}", intf);
             unprovisioner.accept(InterfaceProvisionRequest.of(info, intf));
             provisioned.remove(intf);
         }
     }
 
+    private void forceUnprovision(Interface intf) {
+        // Skip availability check when force unprovisioning an interface
+        if (provisioned.contains(intf) && shouldProvision(intf)) {
+            log.info("Unprovisioning interface {}", intf);
+            unprovisioner.accept(InterfaceProvisionRequest.of(info, intf));
+            provisioned.remove(intf);
+        }
+    }
+
+    private boolean deviceAvailable(Interface intf) {
+        return deviceService.isAvailable(intf.connectPoint().deviceId());
+    }
+
     private boolean shouldProvision(Interface intf) {
-        return deviceAvailable &&
-                (info.interfaces().isEmpty() || info.interfaces().contains(intf.name()));
+        return info.interfaces().isEmpty() || info.interfaces().contains(intf.name());
     }
 
     private Stream<Interface> getInterfacesForDevice(DeviceId deviceId) {
diff --git a/apps/routing/cpr/BUCK b/apps/routing/cpr/BUCK
index 66a6f1e..51ffe35 100644
--- a/apps/routing/cpr/BUCK
+++ b/apps/routing/cpr/BUCK
@@ -8,6 +8,7 @@
     '//lib:TEST_ADAPTERS',
     '//incubator/api:onos-incubator-api-tests',
     '//apps/routing-api:onos-apps-routing-api-tests',
+    '//utils/osgi:onlab-osgi-tests',
 ]
 
 osgi_jar_with_tests (
diff --git a/apps/routing/cpr/pom.xml b/apps/routing/cpr/pom.xml
index 6c12020..5e46550 100644
--- a/apps/routing/cpr/pom.xml
+++ b/apps/routing/cpr/pom.xml
@@ -27,5 +27,13 @@
     <artifactId>onos-apps-routing-cpr</artifactId>
     <packaging>bundle</packaging>
 
+    <dependencies>
+        <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onlab-osgi</artifactId>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
 
 </project>
diff --git a/apps/routing/cpr/src/main/java/org/onosproject/routing/cpr/ControlPlaneRedirectManager.java b/apps/routing/cpr/src/main/java/org/onosproject/routing/cpr/ControlPlaneRedirectManager.java
index 0c97bb1..f0f2927 100644
--- a/apps/routing/cpr/src/main/java/org/onosproject/routing/cpr/ControlPlaneRedirectManager.java
+++ b/apps/routing/cpr/src/main/java/org/onosproject/routing/cpr/ControlPlaneRedirectManager.java
@@ -22,6 +22,8 @@
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
 import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.onlab.packet.EthType;
@@ -30,7 +32,9 @@
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.Tools;
 import org.onosproject.app.ApplicationService;
+import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.incubator.net.intf.Interface;
@@ -63,8 +67,10 @@
 import org.onosproject.routing.config.RouterConfigHelper;
 import org.onosproject.routing.config.RoutersConfig;
 import org.onosproject.routing.config.RoutingConfigurationService;
+import org.osgi.service.component.ComponentContext;
 import org.slf4j.Logger;
 
+import java.util.Dictionary;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -127,6 +133,13 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected RoutingConfigurationService rs;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ComponentConfigService cfgService;
+
+    @Property(name = "forceUnprovision", boolValue = false,
+            label = "Force unprovision when the device goes offline")
+    private boolean forceUnprovision = false;
+
     private static final String APP_NAME = "org.onosproject.cpr";
     private ApplicationId appId;
 
@@ -139,9 +152,12 @@
     private final InternalHostListener hostListener = new InternalHostListener();
 
     @Activate
-    protected void activate() {
+    protected void activate(ComponentContext context) {
         this.appId = coreService.registerApplication(APP_NAME);
 
+        cfgService.registerProperties(getClass());
+        modified(context);
+
         networkConfigService.addListener(networkConfigListener);
         hostService.addListener(hostListener);
 
@@ -153,10 +169,34 @@
 
     @Deactivate
     protected void deactivate() {
+        cfgService.unregisterProperties(getClass(), false);
         networkConfigService.removeListener(networkConfigListener);
         hostService.removeListener(hostListener);
     }
 
+    @Modified
+    protected void modified(ComponentContext context) {
+        if (context != null) {
+            readComponentConfiguration(context);
+            processRouterConfig();
+        }
+    }
+
+    private void readComponentConfiguration(ComponentContext context) {
+        Dictionary<?, ?> properties = context.getProperties();
+        Boolean flag;
+
+        flag = Tools.isPropertyEnabled(properties, "forceUnprovision");
+        if (flag == null) {
+            log.info("ForceUnprovision is not configured, " +
+                    "using current value of {}", forceUnprovision);
+        } else {
+            forceUnprovision = flag;
+            log.info("Configured. ForceUnprovision is {}",
+                    forceUnprovision ? "enabled" : "disabled");
+        }
+    }
+
     /**
      * Sets up the router interfaces if router config is available.
      */
@@ -174,7 +214,7 @@
                 if (r == null) {
                     return createRouter(RouterInfo.from(router));
                 } else {
-                    r.changeConfiguration(RouterInfo.from(router));
+                    r.changeConfiguration(RouterInfo.from(router), forceUnprovision);
                     return r;
                 }
             });
@@ -198,7 +238,8 @@
                 interfaceService,
                 deviceService,
                 this::provisionInterface,
-                this::unprovisionInterface);
+                this::unprovisionInterface,
+                forceUnprovision);
     }
 
     private void provisionInterface(InterfaceProvisionRequest intf) {
diff --git a/apps/routing/cpr/src/test/java/org/onosproject/routing/cpr/ControlPlaneRedirectManagerTest.java b/apps/routing/cpr/src/test/java/org/onosproject/routing/cpr/ControlPlaneRedirectManagerTest.java
index d399fc3..ffcce3b 100644
--- a/apps/routing/cpr/src/test/java/org/onosproject/routing/cpr/ControlPlaneRedirectManagerTest.java
+++ b/apps/routing/cpr/src/test/java/org/onosproject/routing/cpr/ControlPlaneRedirectManagerTest.java
@@ -21,6 +21,7 @@
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.onlab.osgi.ComponentContextAdapter;
 import org.onlab.packet.EthType;
 import org.onlab.packet.Ip6Address;
 import org.onlab.packet.IpAddress;
@@ -152,7 +153,7 @@
         controlPlaneRedirectManager.hostService = createNiceMock(HostService.class);
         controlPlaneRedirectManager.mastershipService = mastershipService;
         controlPlaneRedirectManager.applicationService = applicationService;
-        controlPlaneRedirectManager.activate();
+        controlPlaneRedirectManager.activate(new ComponentContextAdapter());
         verify(flowObjectiveService);
     }
 
diff --git a/apps/routing/fibinstaller/src/main/java/org/onosproject/routing/fibinstaller/FibInstaller.java b/apps/routing/fibinstaller/src/main/java/org/onosproject/routing/fibinstaller/FibInstaller.java
index 04b4352..d279ffc 100644
--- a/apps/routing/fibinstaller/src/main/java/org/onosproject/routing/fibinstaller/FibInstaller.java
+++ b/apps/routing/fibinstaller/src/main/java/org/onosproject/routing/fibinstaller/FibInstaller.java
@@ -65,7 +65,6 @@
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
-import org.onosproject.routing.AsyncDeviceFetcher;
 import org.onosproject.routing.NextHop;
 import org.onosproject.routing.NextHopGroupKey;
 import org.onosproject.routing.RouterInfo;
@@ -137,7 +136,6 @@
     private DeviceId deviceId;
 
     private Router interfaceManager;
-    private AsyncDeviceFetcher asyncDeviceFetcher;
 
     private ApplicationId coreAppId;
     private ApplicationId routerAppId;
@@ -177,8 +175,6 @@
 
         networkConfigService.addListener(configListener);
 
-        asyncDeviceFetcher = AsyncDeviceFetcher.create(deviceService);
-
         processRouterConfig();
 
         applicationService.registerDeactivateHook(fibAppId, () -> cleanUp());
@@ -188,7 +184,6 @@
 
     @Deactivate
     protected void deactivate() {
-        asyncDeviceFetcher.shutdown();
         networkConfigService.removeListener(configListener);
 
         componentConfigService.unregisterProperties(getClass(), false);
@@ -226,7 +221,7 @@
 
             interfaceManager = createRouter(RouterInfo.from(routerConfig));
         } else {
-            interfaceManager.changeConfiguration(RouterInfo.from(routerConfig));
+            interfaceManager.changeConfiguration(RouterInfo.from(routerConfig), false);
         }
     }
 
@@ -253,7 +248,8 @@
                 interfaceService,
                 deviceService,
                 this::provisionInterface,
-                this::unprovisionInterface);
+                this::unprovisionInterface,
+                false);
     }
 
     private void updateRoute(ResolvedRoute route) {
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 0e444fd..dc3f358 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -360,7 +360,8 @@
                                       "arpEnabled", "false");
         compCfgService.preSetProperty("org.onosproject.net.host.impl.HostManager",
                                       "greedyLearningIpv6", "true");
-
+        compCfgService.preSetProperty("org.onosproject.routing.cpr.ControlPlaneRedirectManager",
+                                      "forceUnprovision", "true");
 
         processor = new InternalPacketProcessor();
         linkListener = new InternalLinkListener();