Phased recovery
- Implement a set of CLI commands:
  - Enable/disable a group of ports
  - List the recovery phase of each device
  - Force a specific device to enter a given phase
- Return CompletableFuture in RRP
- Introduce a completeAfter method in Tools
- Introduce a submit method in PredictableExecutor that returns a CompletableFuture
Change-Id: I60b0fb7b67e392b33b52d908d2b53f7acbddc565
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
index a21e327..46231b0 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
@@ -16,28 +16,37 @@
package org.onosproject.segmentrouting;
+import com.google.common.collect.Lists;
import org.onlab.packet.EthType;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.PredictableExecutor;
+import org.onlab.util.Tools;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
import org.onosproject.net.HostLocation;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostService;
import org.onosproject.net.host.ProbeMode;
+import org.onosproject.segmentrouting.phasedrecovery.api.Phase;
+import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import java.util.HashSet;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
@@ -75,19 +84,54 @@
}
protected void init(DeviceId devId) {
+ log.info("Initializing hosts on {}", devId);
+ List<CompletableFuture<Void>> hostFutures = Lists.newArrayList();
+
// Init hosts in parallel using hostWorkers executor
- hostService.getHosts().forEach(
- host -> hostWorkers.execute(() -> initHost(host, devId), host.id().hashCode())
- );
+ hostService.getHosts().forEach(host -> {
+ hostFutures.add(hostWorkers.submit(() -> initHost(host, devId), host.id().hashCode()));
+ });
+
+ log.debug("{} hostFutures for {}", hostFutures.size(), devId);
+ CompletableFuture<Void> allHostFuture = CompletableFuture.allOf(hostFutures.toArray(new CompletableFuture[0]));
+ CompletableFuture<Void> timeoutFuture =
+ Tools.completeAfter(PhasedRecoveryService.PAIR_TIMEOUT, TimeUnit.SECONDS);
+
+ allHostFuture.runAfterEitherAsync(timeoutFuture, () -> {
+ if (allHostFuture.isDone()) {
+ log.info("{} hosts initialized. Move {} to the next phase", hostFutures.size(), devId);
+ } else {
+ log.info("Timeout reached. Move {} to the next phase", devId);
+ }
+ srManager.phasedRecoveryService.setPhase(devId, Phase.INFRA);
+ });
}
private void initHost(Host host, DeviceId deviceId) {
+ List<CompletableFuture<Objective>> locationFutures = Lists.newArrayList();
+
effectiveLocations(host).forEach(location -> {
if (location.deviceId().equals(deviceId) ||
location.deviceId().equals(srManager.getPairDeviceId(deviceId).orElse(null))) {
- processHostAddedAtLocation(host, location);
+ locationFutures.addAll(processHostAddedAtLocation(host, location));
}
});
+
+ log.debug("{} locationFutures for {}", locationFutures.size(), host);
+
+ // Waiting for all locationFutures to be completed.
+ // This is a blocking operation but it is fine since this is run in a separate thread
+ try {
+ CompletableFuture.allOf(locationFutures.toArray(new CompletableFuture[0]))
+ .thenApply(objectives -> locationFutures.stream()
+ .map(CompletableFuture::join)
+ .collect(Collectors.toList())
+ )
+ .get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Exception caught when executing locationFutures");
+ locationFutures.forEach(future -> future.cancel(false));
+ }
}
void processHostAddedEvent(HostEvent event) {
@@ -107,7 +151,7 @@
}
}
- void processHostAddedAtLocation(Host host, HostLocation location) {
+ List<CompletableFuture<Objective>> processHostAddedAtLocation(Host host, HostLocation location) {
checkArgument(effectiveLocations(host).contains(location), "{} is not a location of {}", location, host);
MacAddress hostMac = host.mac();
@@ -116,15 +160,22 @@
Set<IpAddress> ips = host.ipAddresses();
log.info("Host {}/{} is added at {}", hostMac, hostVlanId, locations);
+ List<CompletableFuture<Objective>> objectiveFutures = Lists.newArrayList();
+
+ // TODO Phased recovery does not trace double tagged hosts
if (isDoubleTaggedHost(host)) {
ips.forEach(ip ->
processDoubleTaggedRoutingRule(location.deviceId(), location.port(), hostMac,
host.innerVlan(), hostVlanId, host.tpid(), ip, false)
);
} else {
- processBridgingRule(location.deviceId(), location.port(), hostMac, hostVlanId, false);
+ objectiveFutures.add(
+ processBridgingRule(location.deviceId(), location.port(), hostMac, hostVlanId, false)
+ );
ips.forEach(ip ->
- processRoutingRule(location.deviceId(), location.port(), hostMac, hostVlanId, ip, false)
+ objectiveFutures.add(
+ processRoutingRule(location.deviceId(), location.port(), hostMac, hostVlanId, ip, false)
+ )
);
}
@@ -141,9 +192,14 @@
return;
}
- processBridgingRule(pairDeviceId, pairRemotePort, hostMac, vlanId, false);
- ips.forEach(ip -> processRoutingRule(pairDeviceId, pairRemotePort, hostMac, vlanId,
- ip, false));
+ objectiveFutures.add(
+ processBridgingRule(pairDeviceId, pairRemotePort, hostMac, vlanId, false)
+ );
+ ips.forEach(ip ->
+ objectiveFutures.add(
+ processRoutingRule(pairDeviceId, pairRemotePort, hostMac, vlanId, ip, false)
+ )
+ );
if (srManager.activeProbing) {
probe(host, location, pairDeviceId, pairRemotePort);
@@ -160,6 +216,9 @@
srManager.updateMacVlanTreatment(location.deviceId(), hostMac, vlanId,
location.port(), nextId);
}
+
+ log.debug("{} objectiveFutures for {}", objectiveFutures.size(), location);
+ return objectiveFutures;
}
void processHostRemovedEvent(HostEvent event) {
@@ -552,16 +611,17 @@
* @param mac mac address
* @param vlanId VLAN ID
* @param revoke true to revoke the rule; false to populate
+ * @return future that includes the flow objective if succeeded, null if otherwise
*/
- private void processBridgingRule(DeviceId deviceId, PortNumber port, MacAddress mac,
+ private CompletableFuture<Objective> processBridgingRule(DeviceId deviceId, PortNumber port, MacAddress mac,
VlanId vlanId, boolean revoke) {
log.info("{} bridging entry for host {}/{} at {}:{}", revoke ? "Revoking" : "Populating",
mac, vlanId, deviceId, port);
if (!revoke) {
- srManager.defaultRoutingHandler.populateBridging(deviceId, port, mac, vlanId);
+ return srManager.defaultRoutingHandler.populateBridging(deviceId, port, mac, vlanId);
} else {
- srManager.defaultRoutingHandler.revokeBridging(deviceId, port, mac, vlanId);
+ return srManager.defaultRoutingHandler.revokeBridging(deviceId, port, mac, vlanId);
}
}
@@ -575,20 +635,21 @@
* @param vlanId VLAN ID
* @param ip IP address
* @param revoke true to revoke the rule; false to populate
+ * @return future that includes the flow objective if succeeded, null if otherwise
*/
- private void processRoutingRule(DeviceId deviceId, PortNumber port, MacAddress mac,
+ private CompletableFuture<Objective> processRoutingRule(DeviceId deviceId, PortNumber port, MacAddress mac,
VlanId vlanId, IpAddress ip, boolean revoke) {
ConnectPoint location = new ConnectPoint(deviceId, port);
if (!srManager.deviceConfiguration.inSameSubnet(location, ip)) {
log.info("{} is not included in the subnet config of {}/{}. Ignored.", ip, deviceId, port);
- return;
+ return CompletableFuture.completedFuture(null);
}
log.info("{} routing rule for {} at {}", revoke ? "Revoking" : "Populating", ip, location);
if (revoke) {
- srManager.defaultRoutingHandler.revokeRoute(deviceId, ip.toIpPrefix(), mac, vlanId, port, true);
+ return srManager.defaultRoutingHandler.revokeRoute(deviceId, ip.toIpPrefix(), mac, vlanId, port, true);
} else {
- srManager.defaultRoutingHandler.populateRoute(deviceId, ip.toIpPrefix(), mac, vlanId, port, true);
+ return srManager.defaultRoutingHandler.populateRoute(deviceId, ip.toIpPrefix(), mac, vlanId, port, true);
}
}