Phased recovery

- Implemented a set of CLI commands
    - Enable/disable group of ports
    - List recovery phase of each device
    - Force a specific device to enter given phase
- Return CompletableFuture in RRP
- Introduce completeAfter method in Tools
- Introduce submit method in PredictableExecutor which returns a CompletableFuture

Change-Id: I60b0fb7b67e392b33b52d908d2b53f7acbddc565
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
index a21e327..46231b0 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
@@ -16,28 +16,37 @@
 
 package org.onosproject.segmentrouting;
 
+import com.google.common.collect.Lists;
 import org.onlab.packet.EthType;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.IpPrefix;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onlab.util.PredictableExecutor;
+import org.onlab.util.Tools;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Host;
 import org.onosproject.net.HostLocation;
 import org.onosproject.net.PortNumber;
+import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.host.HostEvent;
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.host.ProbeMode;
+import org.onosproject.segmentrouting.phasedrecovery.api.Phase;
+import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Sets;
 
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkArgument;
@@ -75,19 +84,54 @@
     }
 
     protected void init(DeviceId devId) {
+        log.info("Initializing hosts on {}", devId);
+        List<CompletableFuture<Void>> hostFutures = Lists.newArrayList();
+
         // Init hosts in parallel using hostWorkers executor
-        hostService.getHosts().forEach(
-                host -> hostWorkers.execute(() -> initHost(host, devId), host.id().hashCode())
-        );
+        hostService.getHosts().forEach(host -> {
+            hostFutures.add(hostWorkers.submit(() -> initHost(host, devId), host.id().hashCode()));
+        });
+
+        log.debug("{} hostFutures for {}", hostFutures.size(), devId);
+        CompletableFuture<Void> allHostFuture = CompletableFuture.allOf(hostFutures.toArray(new CompletableFuture[0]));
+        CompletableFuture<Void> timeoutFuture =
+                Tools.completeAfter(PhasedRecoveryService.PAIR_TIMEOUT, TimeUnit.SECONDS);
+
+        allHostFuture.runAfterEitherAsync(timeoutFuture, () -> {
+            if (allHostFuture.isDone()) {
+                log.info("{} hosts initialized. Move {} to the next phase", hostFutures.size(), devId);
+            } else {
+                log.info("Timeout reached. Move {} to the next phase", devId);
+            }
+            srManager.phasedRecoveryService.setPhase(devId, Phase.INFRA);
+        });
     }
 
     private void initHost(Host host, DeviceId deviceId) {
+        List<CompletableFuture<Objective>> locationFutures = Lists.newArrayList();
+
         effectiveLocations(host).forEach(location -> {
             if (location.deviceId().equals(deviceId) ||
                     location.deviceId().equals(srManager.getPairDeviceId(deviceId).orElse(null))) {
-                processHostAddedAtLocation(host, location);
+                locationFutures.addAll(processHostAddedAtLocation(host, location));
             }
         });
+
+        log.debug("{} locationFutures for {}", locationFutures.size(), host);
+
+        // Waiting for all locationFutures to be completed.
+        // This is a blocking operation but it is fine since this is run in a separate thread
+        try {
+            CompletableFuture.allOf(locationFutures.toArray(new CompletableFuture[0]))
+                    .thenApply(objectives -> locationFutures.stream()
+                            .map(CompletableFuture::join)
+                            .collect(Collectors.toList())
+                    )
+                    .get();
+        } catch (InterruptedException | ExecutionException e) {
+            log.warn("Exception caught when executing locationFutures");
+            locationFutures.forEach(future -> future.cancel(false));
+        }
     }
 
     void processHostAddedEvent(HostEvent event) {
@@ -107,7 +151,7 @@
         }
     }
 
-    void processHostAddedAtLocation(Host host, HostLocation location) {
+    List<CompletableFuture<Objective>> processHostAddedAtLocation(Host host, HostLocation location) {
         checkArgument(effectiveLocations(host).contains(location), "{} is not a location of {}", location, host);
 
         MacAddress hostMac = host.mac();
@@ -116,15 +160,22 @@
         Set<IpAddress> ips = host.ipAddresses();
         log.info("Host {}/{} is added at {}", hostMac, hostVlanId, locations);
 
+        List<CompletableFuture<Objective>> objectiveFutures = Lists.newArrayList();
+
+        // TODO Phased recovery does not trace double tagged hosts
         if (isDoubleTaggedHost(host)) {
             ips.forEach(ip ->
                 processDoubleTaggedRoutingRule(location.deviceId(), location.port(), hostMac,
                                                host.innerVlan(), hostVlanId, host.tpid(), ip, false)
             );
         } else {
-            processBridgingRule(location.deviceId(), location.port(), hostMac, hostVlanId, false);
+            objectiveFutures.add(
+                    processBridgingRule(location.deviceId(), location.port(), hostMac, hostVlanId, false)
+            );
             ips.forEach(ip ->
-                processRoutingRule(location.deviceId(), location.port(), hostMac, hostVlanId, ip, false)
+                objectiveFutures.add(
+                        processRoutingRule(location.deviceId(), location.port(), hostMac, hostVlanId, ip, false)
+                )
             );
         }
 
@@ -141,9 +192,14 @@
                         return;
                     }
 
-                    processBridgingRule(pairDeviceId, pairRemotePort, hostMac, vlanId, false);
-                    ips.forEach(ip -> processRoutingRule(pairDeviceId, pairRemotePort, hostMac, vlanId,
-                                    ip, false));
+                    objectiveFutures.add(
+                            processBridgingRule(pairDeviceId, pairRemotePort, hostMac, vlanId, false)
+                    );
+                    ips.forEach(ip ->
+                            objectiveFutures.add(
+                                    processRoutingRule(pairDeviceId, pairRemotePort, hostMac, vlanId, ip, false)
+                            )
+                    );
 
                     if (srManager.activeProbing) {
                         probe(host, location, pairDeviceId, pairRemotePort);
@@ -160,6 +216,9 @@
             srManager.updateMacVlanTreatment(location.deviceId(), hostMac, vlanId,
                                 location.port(), nextId);
         }
+
+        log.debug("{} objectiveFutures for {}", objectiveFutures.size(), location);
+        return objectiveFutures;
     }
 
     void processHostRemovedEvent(HostEvent event) {
@@ -552,16 +611,17 @@
      * @param mac mac address
      * @param vlanId VLAN ID
      * @param revoke true to revoke the rule; false to populate
+     * @return future that includes the flow objective if succeeded, null if otherwise
      */
-    private void processBridgingRule(DeviceId deviceId, PortNumber port, MacAddress mac,
+    private CompletableFuture<Objective> processBridgingRule(DeviceId deviceId, PortNumber port, MacAddress mac,
                                      VlanId vlanId, boolean revoke) {
         log.info("{} bridging entry for host {}/{} at {}:{}", revoke ? "Revoking" : "Populating",
                 mac, vlanId, deviceId, port);
 
         if (!revoke) {
-            srManager.defaultRoutingHandler.populateBridging(deviceId, port, mac, vlanId);
+            return srManager.defaultRoutingHandler.populateBridging(deviceId, port, mac, vlanId);
         } else {
-            srManager.defaultRoutingHandler.revokeBridging(deviceId, port, mac, vlanId);
+            return srManager.defaultRoutingHandler.revokeBridging(deviceId, port, mac, vlanId);
         }
     }
 
@@ -575,20 +635,21 @@
      * @param vlanId VLAN ID
      * @param ip IP address
      * @param revoke true to revoke the rule; false to populate
+     * @return future that includes the flow objective if succeeded, null if otherwise
      */
-    private void processRoutingRule(DeviceId deviceId, PortNumber port, MacAddress mac,
+    private CompletableFuture<Objective> processRoutingRule(DeviceId deviceId, PortNumber port, MacAddress mac,
                                     VlanId vlanId, IpAddress ip, boolean revoke) {
         ConnectPoint location = new ConnectPoint(deviceId, port);
         if (!srManager.deviceConfiguration.inSameSubnet(location, ip)) {
             log.info("{} is not included in the subnet config of {}/{}. Ignored.", ip, deviceId, port);
-            return;
+            return CompletableFuture.completedFuture(null);
         }
 
         log.info("{} routing rule for {} at {}", revoke ? "Revoking" : "Populating", ip, location);
         if (revoke) {
-            srManager.defaultRoutingHandler.revokeRoute(deviceId, ip.toIpPrefix(), mac, vlanId, port, true);
+            return srManager.defaultRoutingHandler.revokeRoute(deviceId, ip.toIpPrefix(), mac, vlanId, port, true);
         } else {
-            srManager.defaultRoutingHandler.populateRoute(deviceId, ip.toIpPrefix(), mac, vlanId, port, true);
+            return srManager.defaultRoutingHandler.populateRoute(deviceId, ip.toIpPrefix(), mac, vlanId, port, true);
         }
     }