Phased recovery
- Implemented a set of CLI commands
- Enable/disable group of ports
- List recovery phase of each device
- Force a specific device to enter given phase
- Return CompletableFuture in RRP
- Introduce completeAfter method in Tools
- Introduce submit method in PredictableExecutor which returns a CompletableFuture
Change-Id: I60b0fb7b67e392b33b52d908d2b53f7acbddc565
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java
index b4aaec8..8b4b31c 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/DefaultRoutingHandler.java
@@ -38,6 +38,7 @@
import org.onosproject.net.DeviceId;
import org.onosproject.net.Link;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.flowobjective.Objective;
import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
import org.onosproject.segmentrouting.config.DeviceConfiguration;
import org.onosproject.segmentrouting.grouphandler.DefaultGroupHandler;
@@ -203,7 +204,7 @@
/**
* Determines if routing in the network has been stable in the last
- * STABLITY_THRESHOLD seconds, by comparing the current time to the last
+ * STABILITY_THRESHOLD seconds, by comparing the current time to the last
* routing change timestamp.
*
* @return true if stable
@@ -1359,12 +1360,15 @@
* @param hostVlanId Vlan ID of the nexthop
* @param outPort port where the next hop attaches to
* @param directHost host is of type direct or indirect
+ * @return future that includes the flow objective if succeeded, null if otherwise
*/
- void populateRoute(DeviceId deviceId, IpPrefix prefix,
- MacAddress hostMac, VlanId hostVlanId, PortNumber outPort, boolean directHost) {
+ CompletableFuture<Objective> populateRoute(DeviceId deviceId, IpPrefix prefix, MacAddress hostMac,
+ VlanId hostVlanId, PortNumber outPort, boolean directHost) {
if (shouldProgram(deviceId)) {
- srManager.routingRulePopulator.populateRoute(deviceId, prefix, hostMac, hostVlanId, outPort, directHost);
+ return srManager.routingRulePopulator.populateRoute(deviceId, prefix,
+ hostMac, hostVlanId, outPort, directHost);
}
+ return CompletableFuture.completedFuture(null);
}
/**
@@ -1377,24 +1381,29 @@
* @param hostVlanId Vlan ID of the nexthop
* @param outPort port that next hop attaches to
* @param directHost host is of type direct or indirect
+ * @return future that carries the flow objective if succeeded, null if otherwise
*/
- void revokeRoute(DeviceId deviceId, IpPrefix prefix,
+ CompletableFuture<Objective> revokeRoute(DeviceId deviceId, IpPrefix prefix,
MacAddress hostMac, VlanId hostVlanId, PortNumber outPort, boolean directHost) {
if (shouldProgram(deviceId)) {
- srManager.routingRulePopulator.revokeRoute(deviceId, prefix, hostMac, hostVlanId, outPort, directHost);
+ return srManager.routingRulePopulator.revokeRoute(deviceId, prefix, hostMac, hostVlanId,
+ outPort, directHost);
}
+ return CompletableFuture.completedFuture(null);
}
- void populateBridging(DeviceId deviceId, PortNumber port, MacAddress mac, VlanId vlanId) {
+ CompletableFuture<Objective> populateBridging(DeviceId deviceId, PortNumber port, MacAddress mac, VlanId vlanId) {
if (shouldProgram(deviceId)) {
- srManager.routingRulePopulator.populateBridging(deviceId, port, mac, vlanId);
+ return srManager.routingRulePopulator.populateBridging(deviceId, port, mac, vlanId);
}
+ return CompletableFuture.completedFuture(null);
}
- void revokeBridging(DeviceId deviceId, PortNumber port, MacAddress mac, VlanId vlanId) {
+ CompletableFuture<Objective> revokeBridging(DeviceId deviceId, PortNumber port, MacAddress mac, VlanId vlanId) {
if (shouldProgram(deviceId)) {
- srManager.routingRulePopulator.revokeBridging(deviceId, port, mac, vlanId);
+ return srManager.routingRulePopulator.revokeBridging(deviceId, port, mac, vlanId);
}
+ return CompletableFuture.completedFuture(null);
}
void updateBridging(DeviceId deviceId, PortNumber portNum, MacAddress hostMac,
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
index a21e327..46231b0 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/HostHandler.java
@@ -16,28 +16,37 @@
package org.onosproject.segmentrouting;
+import com.google.common.collect.Lists;
import org.onlab.packet.EthType;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.PredictableExecutor;
+import org.onlab.util.Tools;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
import org.onosproject.net.HostLocation;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostService;
import org.onosproject.net.host.ProbeMode;
+import org.onosproject.segmentrouting.phasedrecovery.api.Phase;
+import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import java.util.HashSet;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkArgument;
@@ -75,19 +84,54 @@
}
protected void init(DeviceId devId) {
+ log.info("Initializing hosts on {}", devId);
+ List<CompletableFuture<Void>> hostFutures = Lists.newArrayList();
+
// Init hosts in parallel using hostWorkers executor
- hostService.getHosts().forEach(
- host -> hostWorkers.execute(() -> initHost(host, devId), host.id().hashCode())
- );
+ hostService.getHosts().forEach(host -> {
+ hostFutures.add(hostWorkers.submit(() -> initHost(host, devId), host.id().hashCode()));
+ });
+
+ log.debug("{} hostFutures for {}", hostFutures.size(), devId);
+ CompletableFuture<Void> allHostFuture = CompletableFuture.allOf(hostFutures.toArray(new CompletableFuture[0]));
+ CompletableFuture<Void> timeoutFuture =
+ Tools.completeAfter(PhasedRecoveryService.PAIR_TIMEOUT, TimeUnit.SECONDS);
+
+ allHostFuture.runAfterEitherAsync(timeoutFuture, () -> {
+ if (allHostFuture.isDone()) {
+ log.info("{} hosts initialized. Move {} to the next phase", hostFutures.size(), devId);
+ } else {
+ log.info("Timeout reached. Move {} to the next phase", devId);
+ }
+ srManager.phasedRecoveryService.setPhase(devId, Phase.INFRA);
+ });
}
private void initHost(Host host, DeviceId deviceId) {
+ List<CompletableFuture<Objective>> locationFutures = Lists.newArrayList();
+
effectiveLocations(host).forEach(location -> {
if (location.deviceId().equals(deviceId) ||
location.deviceId().equals(srManager.getPairDeviceId(deviceId).orElse(null))) {
- processHostAddedAtLocation(host, location);
+ locationFutures.addAll(processHostAddedAtLocation(host, location));
}
});
+
+ log.debug("{} locationFutures for {}", locationFutures.size(), host);
+
+ // Waiting for all locationFutures to be completed.
+ // This is a blocking operation but it is fine since this is run in a separate thread
+ try {
+ CompletableFuture.allOf(locationFutures.toArray(new CompletableFuture[0]))
+ .thenApply(objectives -> locationFutures.stream()
+ .map(CompletableFuture::join)
+ .collect(Collectors.toList())
+ )
+ .get();
+ } catch (InterruptedException | ExecutionException e) {
+ log.warn("Exception caught when executing locationFutures");
+ locationFutures.forEach(future -> future.cancel(false));
+ }
}
void processHostAddedEvent(HostEvent event) {
@@ -107,7 +151,7 @@
}
}
- void processHostAddedAtLocation(Host host, HostLocation location) {
+ List<CompletableFuture<Objective>> processHostAddedAtLocation(Host host, HostLocation location) {
checkArgument(effectiveLocations(host).contains(location), "{} is not a location of {}", location, host);
MacAddress hostMac = host.mac();
@@ -116,15 +160,22 @@
Set<IpAddress> ips = host.ipAddresses();
log.info("Host {}/{} is added at {}", hostMac, hostVlanId, locations);
+ List<CompletableFuture<Objective>> objectiveFutures = Lists.newArrayList();
+
+ // TODO Phased recovery does not trace double tagged hosts
if (isDoubleTaggedHost(host)) {
ips.forEach(ip ->
processDoubleTaggedRoutingRule(location.deviceId(), location.port(), hostMac,
host.innerVlan(), hostVlanId, host.tpid(), ip, false)
);
} else {
- processBridgingRule(location.deviceId(), location.port(), hostMac, hostVlanId, false);
+ objectiveFutures.add(
+ processBridgingRule(location.deviceId(), location.port(), hostMac, hostVlanId, false)
+ );
ips.forEach(ip ->
- processRoutingRule(location.deviceId(), location.port(), hostMac, hostVlanId, ip, false)
+ objectiveFutures.add(
+ processRoutingRule(location.deviceId(), location.port(), hostMac, hostVlanId, ip, false)
+ )
);
}
@@ -141,9 +192,14 @@
return;
}
- processBridgingRule(pairDeviceId, pairRemotePort, hostMac, vlanId, false);
- ips.forEach(ip -> processRoutingRule(pairDeviceId, pairRemotePort, hostMac, vlanId,
- ip, false));
+ objectiveFutures.add(
+ processBridgingRule(pairDeviceId, pairRemotePort, hostMac, vlanId, false)
+ );
+ ips.forEach(ip ->
+ objectiveFutures.add(
+ processRoutingRule(pairDeviceId, pairRemotePort, hostMac, vlanId, ip, false)
+ )
+ );
if (srManager.activeProbing) {
probe(host, location, pairDeviceId, pairRemotePort);
@@ -160,6 +216,9 @@
srManager.updateMacVlanTreatment(location.deviceId(), hostMac, vlanId,
location.port(), nextId);
}
+
+ log.debug("{} objectiveFutures for {}", objectiveFutures.size(), location);
+ return objectiveFutures;
}
void processHostRemovedEvent(HostEvent event) {
@@ -552,16 +611,17 @@
* @param mac mac address
* @param vlanId VLAN ID
* @param revoke true to revoke the rule; false to populate
+ * @return future that includes the flow objective if succeeded, null if otherwise
*/
- private void processBridgingRule(DeviceId deviceId, PortNumber port, MacAddress mac,
+ private CompletableFuture<Objective> processBridgingRule(DeviceId deviceId, PortNumber port, MacAddress mac,
VlanId vlanId, boolean revoke) {
log.info("{} bridging entry for host {}/{} at {}:{}", revoke ? "Revoking" : "Populating",
mac, vlanId, deviceId, port);
if (!revoke) {
- srManager.defaultRoutingHandler.populateBridging(deviceId, port, mac, vlanId);
+ return srManager.defaultRoutingHandler.populateBridging(deviceId, port, mac, vlanId);
} else {
- srManager.defaultRoutingHandler.revokeBridging(deviceId, port, mac, vlanId);
+ return srManager.defaultRoutingHandler.revokeBridging(deviceId, port, mac, vlanId);
}
}
@@ -575,20 +635,21 @@
* @param vlanId VLAN ID
* @param ip IP address
* @param revoke true to revoke the rule; false to populate
+ * @return future that includes the flow objective if succeeded, null if otherwise
*/
- private void processRoutingRule(DeviceId deviceId, PortNumber port, MacAddress mac,
+ private CompletableFuture<Objective> processRoutingRule(DeviceId deviceId, PortNumber port, MacAddress mac,
VlanId vlanId, IpAddress ip, boolean revoke) {
ConnectPoint location = new ConnectPoint(deviceId, port);
if (!srManager.deviceConfiguration.inSameSubnet(location, ip)) {
log.info("{} is not included in the subnet config of {}/{}. Ignored.", ip, deviceId, port);
- return;
+ return CompletableFuture.completedFuture(null);
}
log.info("{} routing rule for {} at {}", revoke ? "Revoking" : "Populating", ip, location);
if (revoke) {
- srManager.defaultRoutingHandler.revokeRoute(deviceId, ip.toIpPrefix(), mac, vlanId, port, true);
+ return srManager.defaultRoutingHandler.revokeRoute(deviceId, ip.toIpPrefix(), mac, vlanId, port, true);
} else {
- srManager.defaultRoutingHandler.populateRoute(deviceId, ip.toIpPrefix(), mac, vlanId, port, true);
+ return srManager.defaultRoutingHandler.populateRoute(deviceId, ip.toIpPrefix(), mac, vlanId, port, true);
}
}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/RouteHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/RouteHandler.java
index cc03597..d9372fe 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/RouteHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/RouteHandler.java
@@ -51,25 +51,30 @@
}
protected void init(DeviceId deviceId) {
- Optional<DeviceId> pairDeviceId = srManager.getPairDeviceId(deviceId);
-
srManager.routeService.getRouteTables().stream()
.map(srManager.routeService::getRoutes)
.flatMap(Collection::stream)
.map(RouteInfo::allRoutes)
- .filter(allRoutes -> allRoutes.stream().allMatch(resolvedRoute ->
- srManager.nextHopLocations(resolvedRoute).stream().allMatch(cp ->
- deviceId.equals(cp.deviceId()) ||
- (pairDeviceId.isPresent() && pairDeviceId.get().equals(cp.deviceId()))
- )))
- .forEach(this::processRouteAddedInternal);
+ .filter(allRoutes -> allRoutes.stream().anyMatch(resolvedRoute ->
+ srManager.nextHopLocations(resolvedRoute).stream()
+ .anyMatch(cp -> deviceId.equals(cp.deviceId()))))
+ .forEach(rr -> processRouteAddedInternal(rr, true));
}
void processRouteAdded(RouteEvent event) {
- processRouteAddedInternal(event.alternatives());
+ processRouteAddedInternal(event.alternatives(), false);
}
- private void processRouteAddedInternal(Collection<ResolvedRoute> routes) {
+ /**
+ * Internal logic that handles route addition.
+ *
+ * @param routes collection of routes to be processed
+ * @param populateRouteOnly true if we only want to populateRoute but not populateSubnet.
+ * Set it to true when initializing a device coming up.
+ * populateSubnet will be done when link comes up later so it is redundant.
+ * populateRoute still needs to be done for statically configured next hop hosts.
+ */
+ private void processRouteAddedInternal(Collection<ResolvedRoute> routes, boolean populateRouteOnly) {
if (!isReady()) {
log.info("System is not ready. Skip adding route for {}", routes);
return;
@@ -82,6 +87,11 @@
return;
}
+ ResolvedRoute rr = routes.stream().findFirst().orElse(null);
+ if (rr == null) {
+ log.warn("No resolved route found. Abort processRouteAddedInternal");
+ }
+
Set<ConnectPoint> allLocations = Sets.newHashSet();
Set<IpPrefix> allPrefixes = Sets.newHashSet();
routes.forEach(route -> {
@@ -91,6 +101,7 @@
log.debug("RouteAdded. populateSubnet {}, {}", allLocations, allPrefixes);
srManager.defaultRoutingHandler.populateSubnet(allLocations, allPrefixes);
+
routes.forEach(route -> {
IpPrefix prefix = route.prefix();
MacAddress nextHopMac = route.nextHopMac();
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/RoutingRulePopulator.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/RoutingRulePopulator.java
index b5fcbcb..a6f6e32 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/RoutingRulePopulator.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/RoutingRulePopulator.java
@@ -67,6 +67,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
@@ -133,18 +134,27 @@
* @param port port
* @param mac mac address
* @param vlanId VLAN ID
+ * @return future that carries the flow objective if succeeded, null if otherwise
*/
- void populateBridging(DeviceId deviceId, PortNumber port, MacAddress mac, VlanId vlanId) {
+ CompletableFuture<Objective> populateBridging(DeviceId deviceId, PortNumber port, MacAddress mac, VlanId vlanId) {
ForwardingObjective.Builder fob = bridgingFwdObjBuilder(deviceId, mac, vlanId, port, false);
if (fob == null) {
log.warn("Fail to build fwd obj for host {}/{}. Abort.", mac, vlanId);
- return;
+ return CompletableFuture.completedFuture(null);
}
+ CompletableFuture<Objective> future = new CompletableFuture<>();
ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("Brigding rule for {}/{} populated", mac, vlanId),
- (objective, error) -> log.warn("Failed to populate bridging rule for {}/{}: {}", mac, vlanId, error));
+ (objective) -> {
+ log.debug("Brigding rule for {}/{} populated", mac, vlanId);
+ future.complete(objective);
+ },
+ (objective, error) -> {
+ log.warn("Failed to populate bridging rule for {}/{}: {}", mac, vlanId, error);
+ future.complete(null);
+ });
srManager.flowObjectiveService.forward(deviceId, fob.add(context));
+ return future;
}
/**
@@ -155,18 +165,27 @@
* @param port port
* @param mac mac address
* @param vlanId VLAN ID
+ * @return future that carries the flow objective if succeeded, null if otherwise
*/
- void revokeBridging(DeviceId deviceId, PortNumber port, MacAddress mac, VlanId vlanId) {
+ CompletableFuture<Objective> revokeBridging(DeviceId deviceId, PortNumber port, MacAddress mac, VlanId vlanId) {
ForwardingObjective.Builder fob = bridgingFwdObjBuilder(deviceId, mac, vlanId, port, true);
if (fob == null) {
log.warn("Fail to build fwd obj for host {}/{}. Abort.", mac, vlanId);
- return;
+ return CompletableFuture.completedFuture(null);
}
+ CompletableFuture<Objective> future = new CompletableFuture<>();
ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("Brigding rule for {}/{} revoked", mac, vlanId),
- (objective, error) -> log.warn("Failed to revoke bridging rule for {}/{}: {}", mac, vlanId, error));
+ (objective) -> {
+ log.debug("Brigding rule for {}/{} revoked", mac, vlanId);
+ future.complete(objective);
+ },
+ (objective, error) -> {
+ log.warn("Failed to revoke bridging rule for {}/{}: {}", mac, vlanId, error);
+ future.complete(null);
+ });
srManager.flowObjectiveService.forward(deviceId, fob.remove(context));
+ return future;
}
/**
@@ -309,8 +328,9 @@
* @param hostVlanId Vlan ID of the nexthop
* @param outPort port where the next hop attaches to
* @param directHost host is of type direct or indirect
+ * @return future that carries the flow objective if succeeded, null if otherwise
*/
- void populateRoute(DeviceId deviceId, IpPrefix prefix,
+ CompletableFuture<Objective> populateRoute(DeviceId deviceId, IpPrefix prefix,
MacAddress hostMac, VlanId hostVlanId, PortNumber outPort, boolean directHost) {
log.debug("Populate direct routing entry for route {} at {}:{}",
prefix, deviceId, outPort);
@@ -320,23 +340,28 @@
hostVlanId, outPort, null, null, directHost, false);
} catch (DeviceConfigNotFoundException e) {
log.warn(e.getMessage() + " Aborting direct populateRoute");
- return;
+ return CompletableFuture.completedFuture(null);
}
if (fwdBuilder == null) {
log.warn("Aborting host routing table entry due "
+ "to error for dev:{} route:{}", deviceId, prefix);
- return;
+ return CompletableFuture.completedFuture(null);
}
int nextId = fwdBuilder.add().nextId();
+ CompletableFuture<Objective> future = new CompletableFuture<>();
ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("Direct routing rule for route {} populated. nextId={}",
- prefix, nextId),
- (objective, error) ->
- log.warn("Failed to populate direct routing rule for route {}: {}",
- prefix, error));
+ (objective) -> {
+ log.debug("Direct routing rule for route {} populated. nextId={}", prefix, nextId);
+ future.complete(objective);
+ },
+ (objective, error) -> {
+ log.warn("Failed to populate direct routing rule for route {}: {}", prefix, error);
+ future.complete(null);
+ });
srManager.flowObjectiveService.forward(deviceId, fwdBuilder.add(context));
rulePopulationCounter.incrementAndGet();
+ return future;
}
/**
@@ -349,8 +374,9 @@
* @param hostVlanId Vlan ID of the nexthop
* @param outPort port that next hop attaches to
* @param directHost host is of type direct or indirect
+ * @return future that carries the flow objective if succeeded, null if otherwise
*/
- void revokeRoute(DeviceId deviceId, IpPrefix prefix,
+ CompletableFuture<Objective> revokeRoute(DeviceId deviceId, IpPrefix prefix,
MacAddress hostMac, VlanId hostVlanId, PortNumber outPort, boolean directHost) {
log.debug("Revoke IP table entry for route {} at {}:{}",
prefix, deviceId, outPort);
@@ -360,18 +386,26 @@
hostVlanId, outPort, null, null, directHost, true);
} catch (DeviceConfigNotFoundException e) {
log.warn(e.getMessage() + " Aborting revokeIpRuleForHost.");
- return;
+ return CompletableFuture.completedFuture(null);
}
if (fwdBuilder == null) {
log.warn("Aborting host routing table entries due "
+ "to error for dev:{} route:{}", deviceId, prefix);
- return;
+ return CompletableFuture.completedFuture(null);
}
+
+ CompletableFuture<Objective> future = new CompletableFuture<>();
ObjectiveContext context = new DefaultObjectiveContext(
- (objective) -> log.debug("IP rule for route {} revoked", prefix),
- (objective, error) ->
- log.warn("Failed to revoke IP rule for route {}: {}", prefix, error));
+ (objective) -> {
+ log.debug("IP rule for route {} revoked", prefix);
+ future.complete(objective);
+ },
+ (objective, error) -> {
+ log.warn("Failed to revoke IP rule for route {}: {}", prefix, error);
+ future.complete(null);
+ });
srManager.flowObjectiveService.forward(deviceId, fwdBuilder.remove(context));
+ return future;
}
/**
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 12bd9e4..5f11501 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -104,6 +104,7 @@
import org.onosproject.segmentrouting.mcast.McastRole;
import org.onosproject.segmentrouting.mcast.McastRoleStoreKey;
import org.onosproject.segmentrouting.mcast.McastStoreKey;
+import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
import org.onosproject.segmentrouting.pwaas.DefaultL2Tunnel;
import org.onosproject.segmentrouting.pwaas.DefaultL2TunnelDescription;
import org.onosproject.segmentrouting.pwaas.DefaultL2TunnelHandler;
@@ -261,6 +262,10 @@
policy = ReferencePolicy.DYNAMIC)
public volatile XconnectService xconnectService;
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+ policy = ReferencePolicy.DYNAMIC)
+ volatile PhasedRecoveryService phasedRecoveryService;
+
/** Enable active probing to discover dual-homed hosts. */
boolean activeProbing = ACTIVE_PROBING_DEFAULT;
@@ -1049,6 +1054,21 @@
}
@Override
+ public boolean isRoutingStable() {
+ return defaultRoutingHandler.isRoutingStable();
+ }
+
+ @Override
+ public void initHost(DeviceId deviceId) {
+ hostEventExecutor.execute(() -> hostHandler.init(deviceId));
+ }
+
+ @Override
+ public void initRoute(DeviceId deviceId) {
+ routeEventExecutor.execute(() -> routeHandler.init(deviceId));
+ }
+
+ @Override
public ApplicationId appId() {
return appId;
}
@@ -1146,6 +1166,25 @@
return Optional.ofNullable(deviceConfig).map(SegmentRoutingDeviceConfig::pairLocalPort);
}
+ @Override
+ public Set<PortNumber> getInfraPorts(DeviceId deviceId) {
+ return deviceService.getPorts(deviceId).stream()
+ .map(port -> new ConnectPoint(port.element().id(), port.number()))
+ .filter(cp -> interfaceService.getInterfacesByPort(cp).isEmpty())
+ .map(ConnectPoint::port)
+ .collect(Collectors.toSet());
+ }
+
+ @Override
+ public Set<PortNumber> getEdgePorts(DeviceId deviceId) {
+ return deviceService.getPorts(deviceId).stream()
+ .map(port -> new ConnectPoint(port.element().id(), port.number()))
+ .filter(cp -> !interfaceService.getInterfacesByPort(cp).isEmpty() &&
+ !cp.port().equals(getPairLocalPort(deviceId).orElse(null)))
+ .map(ConnectPoint::port)
+ .collect(Collectors.toSet());
+ }
+
/**
* Returns locations of given resolved route.
*
@@ -1570,11 +1609,10 @@
DefaultGroupHandler groupHandler = groupHandlerMap.get(deviceId);
groupHandler.createGroupsFromVlanConfig();
routingRulePopulator.populateSubnetBroadcastRule(deviceId);
+ phasedRecoveryService.init(deviceId);
}
appCfgHandler.init(deviceId);
- hostEventExecutor.execute(() -> hostHandler.init(deviceId));
- routeEventExecutor.execute(() -> routeHandler.init(deviceId));
}
private void processDeviceRemoved(Device device) {
@@ -1610,6 +1648,8 @@
// done after all rerouting or rehashing has been completed
groupHandlerMap.entrySet()
.forEach(entry -> entry.getValue().cleanUpForNeighborDown(device.id()));
+
+ phasedRecoveryService.reset(device.id());
}
/**
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
index 7721036..3bfd88b 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingService.java
@@ -15,7 +15,6 @@
*/
package org.onosproject.segmentrouting;
-import com.google.common.annotations.Beta;
import com.google.common.collect.Multimap;
import org.apache.commons.lang3.NotImplementedException;
import org.onlab.packet.IpAddress;
@@ -354,6 +353,29 @@
Map<DeviceId, List<McastFilteringObjStoreKey>> getMcastFilters();
/**
+ * Determines if routing in the network has been stable in the last
+ * STABILITY_THRESHOLD seconds, by comparing the current time to the last
+ * routing change timestamp.
+ *
+ * @return true if stable
+ */
+ boolean isRoutingStable();
+
+ /**
+ * Invoke hostHandler.init() for given device.
+ *
+ * @param deviceId device ID
+ */
+ void initHost(DeviceId deviceId);
+
+ /**
+ * Invoke routeHandler.init() for given device.
+ *
+ * @param deviceId device ID
+ */
+ void initRoute(DeviceId deviceId);
+
+ /**
* Gets application id.
*
* @return application id
@@ -371,33 +393,47 @@
* @param connectPoint connect point
* @return internal VLAN or null if both vlan-untagged and vlan-native are undefined
*/
- @Beta
default VlanId getInternalVlanId(ConnectPoint connectPoint) {
throw new NotImplementedException("getInternalVlanId not implemented");
}
-
/**
* Returns optional pair device ID of given device.
*
* @param deviceId device ID
* @return optional pair device ID. Might be empty if pair device is not configured
*/
- @Beta
default Optional<DeviceId> getPairDeviceId(DeviceId deviceId) {
throw new NotImplementedException("getPairDeviceId not implemented");
}
-
/**
* Returns optional pair device local port of given device.
*
* @param deviceId device ID
* @return optional pair device ID. Might be empty if pair device is not configured
*/
- @Beta
default Optional<PortNumber> getPairLocalPort(DeviceId deviceId) {
throw new NotImplementedException("getPairLocalPort not implemented");
}
+ /**
+ * Returns a set of infrastructure ports on the given device.
+ *
+ * @param deviceId device ID
+ * @return a set of ports that does not have interface configuration
+ */
+ default Set<PortNumber> getInfraPorts(DeviceId deviceId) {
+ throw new NotImplementedException("getInfraPorts not implemented");
+ }
+
+ /**
+ * Returns a set of edge ports on the given device.
+ *
+ * @param deviceId device ID
+ * @return a set of ports that has interface configuration
+ */
+ default Set<PortNumber> getEdgePorts(DeviceId deviceId) {
+ throw new NotImplementedException("getEdgePorts not implemented");
+ }
}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/PhaseCompleter.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/PhaseCompleter.java
new file mode 100644
index 0000000..bd16068
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/PhaseCompleter.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright 2014-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.segmentrouting.cli;
+
+import com.google.common.collect.Lists;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractChoicesCompleter;
+import org.onosproject.segmentrouting.phasedrecovery.api.Phase;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Phase completer.
+ */
+@Service
+public class PhaseCompleter extends AbstractChoicesCompleter {
+ @Override
+ protected List<String> choices() {
+ return Lists.newArrayList(Phase.values()).stream().map(Enum::toString).collect(Collectors.toList());
+ }
+}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/PhasedRecoveryListCommand.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/PhasedRecoveryListCommand.java
new file mode 100644
index 0000000..6efdaaa
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/PhasedRecoveryListCommand.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
+
+@Service
+@Command(scope = "onos", name = "sr-pr-list", description = "List current recovery phase of each device")
+
+public class PhasedRecoveryListCommand extends AbstractShellCommand {
+ @Override
+ protected void doExecute() {
+ PhasedRecoveryService prService = get(PhasedRecoveryService.class);
+ print(prService.getPhases().toString());
+ }
+}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/PhasedRecoverySetCommand.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/PhasedRecoverySetCommand.java
new file mode 100644
index 0000000..50e3bc6
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/PhasedRecoverySetCommand.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.segmentrouting.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.cli.net.DeviceIdCompleter;
+import org.onosproject.net.DeviceId;
+import org.onosproject.segmentrouting.phasedrecovery.api.Phase;
+import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
+
+@Service
+@Command(scope = "onos", name = "sr-pr-set", description = "Set recovery phase of given device")
+
+public class PhasedRecoverySetCommand extends AbstractShellCommand {
+ @Argument(index = 0, name = "deviceId",
+ description = "Device ID",
+ required = true, multiValued = false)
+ @Completion(DeviceIdCompleter.class)
+ private String deviceIdStr;
+
+ @Argument(index = 1, name = "phase",
+ description = "Recovery phase",
+ required = true, multiValued = false)
+ @Completion(PhaseCompleter.class)
+ private String phaseStr;
+
+ @Override
+ protected void doExecute() {
+ DeviceId deviceId = DeviceId.deviceId(deviceIdStr);
+ Phase newPhase = Phase.valueOf(phaseStr);
+
+ PhasedRecoveryService prService = get(PhasedRecoveryService.class);
+ prService.setPhase(deviceId, newPhase);
+ }
+}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/PortsCommand.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/PortsCommand.java
new file mode 100644
index 0000000..f24d28f
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/cli/PortsCommand.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.cli;
+
+import org.apache.karaf.shell.api.action.Argument;
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.Completion;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.cli.PlaceholderCompleter;
+import org.onosproject.cli.net.DeviceIdCompleter;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+@Service
+@Command(scope = "onos", name = "sr-ports", description = "Enable/Disable group of ports on a specific device")
+
+public class PortsCommand extends AbstractShellCommand {
+ @Argument(index = 0, name = "deviceId",
+ description = "Device ID",
+ required = true, multiValued = false)
+ @Completion(DeviceIdCompleter.class)
+ private String deviceIdStr;
+
+ @Argument(index = 1, name = "ports",
+ description = "Ports to be enabled/disabled: ALL, PAIR, INFRA, EDGE",
+ required = true, multiValued = false)
+ @Completion(PlaceholderCompleter.class)
+ private String portsStr;
+
+ @Argument(index = 2, name = "action",
+ description = "Action: ENABLE, DISABLE",
+ required = true, multiValued = false)
+ @Completion(PlaceholderCompleter.class)
+ private String actionStr;
+
+ @Override
+ protected void doExecute() {
+ PhasedRecoveryService prService = get(PhasedRecoveryService.class);
+
+ DeviceId deviceId = DeviceId.deviceId(deviceIdStr);
+
+ boolean enabled;
+ switch (actionStr.toUpperCase()) {
+ case "ENABLE":
+ enabled = true;
+ break;
+ case "DISABLE":
+ enabled = false;
+ break;
+ default:
+ print("Action should be either ENABLE or DISABLE");
+ return;
+ }
+
+ Set<PortNumber> portsChanged;
+ switch (portsStr.toUpperCase()) {
+ case "ALL":
+ portsChanged = prService.changeAllPorts(deviceId, enabled);
+ break;
+ case "PAIR":
+ portsChanged = prService.changePairPort(deviceId, enabled);
+ break;
+ case "INFRA":
+ portsChanged = prService.changeInfraPorts(deviceId, enabled);
+ break;
+ case "EDGE":
+ portsChanged = prService.changeEdgePorts(deviceId, enabled);
+ break;
+ default:
+ print("Ports should be ALL, PAIR, INFRA, EDGE");
+ return;
+ }
+ print("Ports set to %s: %s",
+ enabled ? "enabled" : "disabled",
+ portsChanged.stream().map(PortNumber::toLong).collect(Collectors.toSet()));
+ }
+}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/api/OsgiPropertyConstants.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/api/OsgiPropertyConstants.java
new file mode 100644
index 0000000..cdeb8a2
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/api/OsgiPropertyConstants.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.phasedrecovery.api;
+
+/**
+ * Constants for default values of configurable properties.
+ */
+public final class OsgiPropertyConstants {
+ private OsgiPropertyConstants() {}
+
+ public static final String PROP_PHASED_RECOVERY = "phasedRecovery";
+ public static final boolean PHASED_RECOVERY_DEFAULT = false;
+}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/api/Phase.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/api/Phase.java
new file mode 100644
index 0000000..7f18685
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/api/Phase.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.phasedrecovery.api;
+
+/**
+ * Phases of recovering devices.
+ */
+public enum Phase {
+
+ /**
+ * Device is waiting for ports to be reported.
+ */
+ PENDING,
+
+ /**
+ * Pair port enabled. This is the initial state for paired devices.
+ */
+ PAIR,
+
+ /**
+ * Infrastructure ports enabled.
+ */
+ INFRA,
+
+ /**
+ * Edge ports enabled. This is the initial state for non-paired and spine devices.
+ */
+ EDGE
+}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/api/PhasedRecoveryService.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/api/PhasedRecoveryService.java
new file mode 100644
index 0000000..705ecb7
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/api/PhasedRecoveryService.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.phasedrecovery.api;
+
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.PortNumber;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Service that provides functionality related to phased recovery.
+ */
+public interface PhasedRecoveryService {
+
+ // TODO Make timeout values configurable via Component Config Service
+ /**
+ * Timeout for PAIR phase in seconds.
+ */
+ int PAIR_TIMEOUT = 30;
+
+ /**
+ * Timeout for INFRA phase in seconds.
+ */
+ int INFRA_TIMEOUT = 30;
+
+ /**
+ * Timeout for EDGE phase in seconds.
+ */
+ int EDGE_TIMEOUT = 30;
+
+ //
+ // Phased recovery APIs.
+ //
+
+
+ /**
+ * Returns true if phased recovery is enabled.
+ *
+ * @return true if phased recovery is enabled.
+ */
+ boolean isEnabled();
+
+ /**
+ * Initializes a device. Only the master of the device is allowed to do this.
+ *
+ * @param deviceId device ID
+ * @return true if the device is initialized successfully and the caller should proceed,
+ * false if the device initialization has failed and the caller should abort.
+ */
+ boolean init(DeviceId deviceId);
+
+ /**
+ * Resets a device. Only the master of the device is allowed to do this.
+ *
+ * @param deviceId device ID
+ * @return true if the device is reset successfully.
+ * false if the device has not been previously initialized.
+ */
+ boolean reset(DeviceId deviceId);
+
+ /**
+ * Gets recovery phase of every devices.
+ *
+ * @return a map between device ID and recovery phase
+ */
+ Map<DeviceId, Phase> getPhases();
+
+ /**
+ * Gets recovery phase of given device.
+ *
+ * @param deviceId device ID
+ * @return current phase or null if the device wasn't seen before
+ */
+ Phase getPhase(DeviceId deviceId);
+
+ /**
+ * Sets given device with given recovery phase. Only the master of the device is allowed to do this.
+ *
+ * @param deviceId device ID
+ * @param newPhase recovery phase
+ * @return new phase if transition succeeded, otherwise return current phase.
+ */
+ Phase setPhase(DeviceId deviceId, Phase newPhase);
+
+ //
+ // Port manipulation APIs.
+ //
+
+ /**
+ * Enables every ports on the given device.
+ *
+ * @param deviceId device id
+ * @param enabled true to enable, false to disable
+ * @return ports that have been enabled
+ */
+ Set<PortNumber> changeAllPorts(DeviceId deviceId, boolean enabled);
+
+ /**
+ * Enables pair port on the given device.
+ *
+ * @param deviceId device id
+ * @param enabled true to enable, false to disable
+ * @return ports that have been enabled
+ */
+ Set<PortNumber> changePairPort(DeviceId deviceId, boolean enabled);
+
+ /**
+ * Enables infrastructure ports on the given device.
+ *
+ * @param deviceId device id
+ * @param enabled true to enable, false to disable
+ * @return ports that have been enabled
+ */
+ Set<PortNumber> changeInfraPorts(DeviceId deviceId, boolean enabled);
+
+ /**
+ * Enables edge ports on the given device.
+ *
+ * @param deviceId device id
+ * @param enabled true to enable, false to disable
+ * @return ports that have been enabled
+ */
+ Set<PortNumber> changeEdgePorts(DeviceId deviceId, boolean enabled);
+}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/api/package-info.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/api/package-info.java
new file mode 100644
index 0000000..cc556be
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/api/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Phased recovery API.
+ */
+package org.onosproject.segmentrouting.phasedrecovery.api;
\ No newline at end of file
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/impl/PhasedRecoveryManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/impl/PhasedRecoveryManager.java
new file mode 100644
index 0000000..5e25015
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/impl/PhasedRecoveryManager.java
@@ -0,0 +1,383 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.phasedrecovery.impl;
+
+import com.google.common.collect.Sets;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceAdminService;
+import org.onosproject.segmentrouting.SegmentRoutingService;
+import org.onosproject.segmentrouting.phasedrecovery.api.Phase;
+import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.segmentrouting.phasedrecovery.api.OsgiPropertyConstants.PHASED_RECOVERY_DEFAULT;
+import static org.onosproject.segmentrouting.phasedrecovery.api.OsgiPropertyConstants.PROP_PHASED_RECOVERY;
+
+@Component(
+ immediate = true,
+ service = PhasedRecoveryService.class,
+ property = {
+ PROP_PHASED_RECOVERY + ":Boolean=" + PHASED_RECOVERY_DEFAULT
+ }
+)
+public class PhasedRecoveryManager implements PhasedRecoveryService {
+ private static final Logger log = LoggerFactory.getLogger(PhasedRecoveryManager.class);
+ private static final String APP_NAME = "org.onosproject.phasedrecovery";
+
+ // TODO Make these configurable via Component Config
+ // Amount of time delayed to wait for port description (in second)
+ private static final int PORT_CHECKER_INTERVAL = 1;
+ // Max number of retry for port checker
+ private static final int PORT_CHECKER_RETRIES = 5;
+ // RoutingStableChecker interval (in second)
+ private static final int ROUTING_CHECKER_DELAY = 3;
+ // RoutingStableChecker timeout (in second)
+ private static final int ROUTING_CHECKER_TIMEOUT = 15;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private ComponentConfigService compCfgService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private DeviceAdminService deviceAdminService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ private StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL)
+ volatile SegmentRoutingService srService;
+
+ /** Enabling phased recovery. */
+ boolean phasedRecovery = PHASED_RECOVERY_DEFAULT;
+
+ private ApplicationId appId;
+ private ConsistentMap<DeviceId, Phase> phasedRecoveryStore;
+ private ScheduledExecutorService executor = Executors.newScheduledThreadPool(
+ Runtime.getRuntime().availableProcessors(), groupedThreads("onos/sr/pr", "executor"));
+
+ @Activate
+ protected void activate(ComponentContext context) {
+ appId = coreService.registerApplication(APP_NAME);
+
+ KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(Phase.class);
+ phasedRecoveryStore = storageService.<DeviceId, Phase>consistentMapBuilder()
+ .withName("onos-sr-phasedrecovery")
+ .withRelaxedReadConsistency()
+ .withSerializer(Serializer.using(serializer.build()))
+ .build();
+
+ compCfgService.registerProperties(getClass());
+ modified(context);
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ phasedRecoveryStore.destroy();
+ compCfgService.unregisterProperties(getClass(), false);
+ log.info("Stopped");
+ }
+
+ @Modified
+ protected void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context.getProperties();
+ if (properties == null) {
+ return;
+ }
+
+ String strPhasedRecovery = Tools.get(properties, PROP_PHASED_RECOVERY);
+ boolean expectPhasedRecovery = Boolean.parseBoolean(strPhasedRecovery);
+ if (expectPhasedRecovery != phasedRecovery) {
+ phasedRecovery = expectPhasedRecovery;
+ log.info("{} phased recovery", phasedRecovery ? "Enabling" : "Disabling");
+ }
+ }
+
+ @Override
+ public boolean isEnabled() {
+ return phasedRecovery;
+ }
+
+ @Override
+ public boolean init(DeviceId deviceId) {
+ if (this.srService == null) {
+ log.info("SegmentRoutingService is not ready");
+ return false;
+ }
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ log.info("Not master of {}", deviceId);
+ return false;
+ }
+
+ Phase phase = Optional.ofNullable(phasedRecoveryStore.putIfAbsent(deviceId, Phase.PENDING))
+ .map(Versioned::value).orElse(null);
+
+ if (phase != null) {
+ log.info("{} has been initialized already. Skipping.", deviceId);
+ return false;
+ } else {
+ Phase nextPhase = (phasedRecovery && this.srService.getPairDeviceId(deviceId).isPresent()) ?
+ Phase.PAIR : Phase.EDGE;
+ if (nextPhase == Phase.PAIR) {
+ // Wait for the PORT_STAT before entering next phase.
+ // Note: Unlikely, when the device init fails due to PORT_STATS timeout,
+ // it requires operator to manually move the device to the next phase by CLI command.
+ executor.schedule(new PortChecker(deviceId, PORT_CHECKER_RETRIES),
+ PORT_CHECKER_INTERVAL, TimeUnit.SECONDS);
+ } else {
+ // We assume that all ports will be reported as enabled on devices that don't require phased recovery
+ setPhase(deviceId, Phase.EDGE);
+ }
+ return true;
+ }
+ }
+
+ @Override
+ public boolean reset(DeviceId deviceId) {
+ if (this.srService == null) {
+ log.info("SegmentRoutingService is not ready");
+ return false;
+ }
+ // FIXME Skip mastership checking since master will not be available when a device goes offline
+ // Improve this when persistent mastership is introduced
+
+ Phase result = Optional.ofNullable(phasedRecoveryStore.remove(deviceId))
+ .map(Versioned::value).orElse(null);
+ if (result != null) {
+ log.info("{} is reset", deviceId);
+ }
+ return result != null;
+ }
+
+ @Override
+ public Map<DeviceId, Phase> getPhases() {
+ return phasedRecoveryStore.asJavaMap();
+ }
+
+ @Override
+ public Phase getPhase(DeviceId deviceId) {
+ return Optional.ofNullable(phasedRecoveryStore.get(deviceId)).map(Versioned::value).orElse(null);
+ }
+
+ @Override
+ public Phase setPhase(DeviceId deviceId, Phase newPhase) {
+ if (this.srService == null) {
+ log.info("SegmentRoutingService is not ready");
+ return null;
+ }
+ if (!mastershipService.isLocalMaster(deviceId)) {
+ log.info("Not master of {}", deviceId);
+ return null;
+ }
+
+ return Optional.ofNullable(phasedRecoveryStore.compute(deviceId, (k, v) -> {
+ if (v == null && newPhase == Phase.PENDING) {
+ log.info("Initializing {}", deviceId);
+ return newPhase;
+ } else if (v == Phase.PENDING && newPhase == Phase.PAIR) {
+ srService.initHost(deviceId);
+ // RouteHandler init is intentionally skipped when phased recovery is on.
+ // Edge ports remain down in this phase. Therefore, no nexthop will be discovered on the given device.
+ // The flow on given device will be programmed later by hostHandler.processHostMovedEvent()
+ changePairPort(deviceId, true);
+ log.info("Transitioning {} from PENDING to PAIR", deviceId);
+ return newPhase;
+ } else if (v == Phase.PAIR && newPhase == Phase.INFRA) {
+ changeInfraPorts(deviceId, true);
+ srService.initRoute(deviceId);
+ log.info("Transitioning {} from PAIR to INFRA", deviceId);
+ monitorRoutingStability(deviceId);
+ return newPhase;
+ } else if (v == Phase.INFRA && newPhase == Phase.EDGE) {
+ changeEdgePorts(deviceId, true);
+ log.info("Transitioning {} from INFRA to EDGE", deviceId);
+ return newPhase;
+ } else if (v == Phase.PENDING && newPhase == Phase.EDGE) {
+ changeAllPorts(deviceId, true);
+ srService.initHost(deviceId);
+ srService.initRoute(deviceId);
+ log.info("Transitioning {} from PENDING to EDGE", deviceId);
+ return newPhase;
+ } else {
+ log.debug("Ignore illegal state transition on {} from {} to {}", deviceId, v, newPhase);
+ return v;
+ }
+ })).map(Versioned::value).orElse(null);
+ }
+
+ private void monitorRoutingStability(DeviceId deviceId) {
+ CompletableFuture<Void> checkerFuture = new CompletableFuture<>();
+ CompletableFuture<Void> timeoutFuture =
+ Tools.completeAfter(ROUTING_CHECKER_TIMEOUT, TimeUnit.SECONDS);
+ RoutingStabilityChecker checker = new RoutingStabilityChecker(checkerFuture);
+
+ checkerFuture.runAfterEitherAsync(timeoutFuture, () -> {
+ if (checkerFuture.isDone()) {
+ log.info("Routing stable. Move {} to the next phase", deviceId);
+ } else {
+ log.info("Timeout reached. Move {} to the next phase", deviceId);
+ // Mark the future as completed to signify the termination of periodical checker
+ checkerFuture.complete(null);
+ }
+ setPhase(deviceId, Phase.EDGE);
+ });
+
+ executor.schedule(checker, ROUTING_CHECKER_DELAY, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public Set<PortNumber> changeAllPorts(DeviceId deviceId, boolean enabled) {
+ if (this.srService == null) {
+ log.warn("SegmentRoutingService is not ready. Unable to changeAllPorts({}) to {}", deviceId, enabled);
+ return Sets.newHashSet();
+ }
+ Set<PortNumber> portsToBeEnabled = deviceAdminService.getPorts(deviceId)
+ .stream().map(Port::number).collect(Collectors.toSet());
+ changePorts(deviceId, portsToBeEnabled, enabled);
+ return portsToBeEnabled;
+ }
+
+ @Override
+ public Set<PortNumber> changePairPort(DeviceId deviceId, boolean enabled) {
+ if (this.srService == null) {
+ log.warn("SegmentRoutingService is not ready. Unable to changePairPort({}) to {}", deviceId, enabled);
+ return Sets.newHashSet();
+ }
+ Set<PortNumber> portsToBeEnabled = this.srService.getPairLocalPort(deviceId)
+ .map(Sets::newHashSet).orElse(Sets.newHashSet());
+ changePorts(deviceId, portsToBeEnabled, enabled);
+ return portsToBeEnabled;
+ }
+
+ @Override
+ public Set<PortNumber> changeInfraPorts(DeviceId deviceId, boolean enabled) {
+ if (this.srService == null) {
+ log.warn("SegmentRoutingService is not ready. Unable to changeInfraPorts({}) to {}", deviceId, enabled);
+ return Sets.newHashSet();
+ }
+ Set<PortNumber> portsToBeEnabled = this.srService.getInfraPorts(deviceId);
+ changePorts(deviceId, portsToBeEnabled, enabled);
+ return portsToBeEnabled;
+ }
+
+ @Override
+ public Set<PortNumber> changeEdgePorts(DeviceId deviceId, boolean enabled) {
+ if (this.srService == null) {
+ log.warn("SegmentRoutingService is not ready. Unable to changeEdgePorts({}) to {}", deviceId, enabled);
+ return Sets.newHashSet();
+ }
+ Set<PortNumber> portsToBeEnabled = this.srService.getEdgePorts(deviceId);
+ changePorts(deviceId, portsToBeEnabled, enabled);
+ return portsToBeEnabled;
+ }
+
+ private void changePorts(DeviceId deviceId, Set<PortNumber> portNumbers, boolean enabled) {
+ log.info("{} {} on {}", enabled ? "Enabled" : "Disabled", portNumbers, deviceId);
+ portNumbers.forEach(portNumber ->
+ deviceAdminService.changePortState(deviceId, portNumber, enabled));
+ }
+
+ private class PortChecker implements Runnable {
+ int retries;
+ DeviceId deviceId;
+
+ PortChecker(DeviceId deviceId, int retries) {
+ this.deviceId = deviceId;
+ this.retries = retries;
+ }
+
+ @Override
+ public void run() {
+ retries -= 1;
+ if (retries < 0) {
+ log.warn("PORT_STATS timeout. Unable to initialize {}", deviceId);
+ return;
+ }
+
+ if (!deviceAdminService.getPorts(deviceId).isEmpty()) {
+ log.info("{} reported PORT_STATS", deviceId);
+ setPhase(deviceId, Phase.PAIR);
+ }
+ log.info("{} still waiting for PORT_STATS", deviceId);
+ executor.schedule(this, PORT_CHECKER_INTERVAL, TimeUnit.SECONDS);
+ }
+ }
+
+ private class RoutingStabilityChecker implements Runnable {
+ private final CompletableFuture<Void> future;
+
+ RoutingStabilityChecker(CompletableFuture<Void> future) {
+ this.future = future;
+ }
+
+ @Override
+ public void run() {
+ // Do not continue if the future has been completed
+ if (future.isDone()) {
+ log.trace("RouteStabilityChecker is done. Stop checking");
+ return;
+ }
+
+ if (srService.isRoutingStable()) {
+ log.trace("Routing is stable");
+ future.complete(null);
+ } else {
+ log.trace("Routing is not yet stable");
+ executor.schedule(this, ROUTING_CHECKER_DELAY, TimeUnit.SECONDS);
+ }
+ }
+ }
+}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/impl/package-info.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/impl/package-info.java
new file mode 100644
index 0000000..3788085
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/phasedrecovery/impl/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Phased recovery implementation.
+ */
+package org.onosproject.segmentrouting.phasedrecovery.impl;
\ No newline at end of file
diff --git a/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java b/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java
index dedd66f..d67cb07 100644
--- a/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java
+++ b/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/HostHandlerTest.java
@@ -59,6 +59,7 @@
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.createNiceMock;
@@ -569,10 +570,12 @@
// Host moved from [1A/1, 1B/1] to [1A/2, 1B/1]
// We should expect only one bridging flow and one routing flow programmed on 1A
- mockDefaultRoutingHandler.populateBridging(DEV3, P2, HOST_MAC, HOST_VLAN_UNTAGGED);
- expectLastCall().times(1);
- mockDefaultRoutingHandler.populateRoute(DEV3, HOST_IP11.toIpPrefix(), HOST_MAC, HOST_VLAN_UNTAGGED, P2, true);
- expectLastCall().times(1);
+
+ expect(mockDefaultRoutingHandler.populateBridging(DEV3, P2, HOST_MAC, HOST_VLAN_UNTAGGED))
+ .andReturn(CompletableFuture.completedFuture(null)).once();
+ expect(mockDefaultRoutingHandler.populateRoute(DEV3, HOST_IP11.toIpPrefix(),
+ HOST_MAC, HOST_VLAN_UNTAGGED, P2, true))
+ .andReturn(CompletableFuture.completedFuture(null)).once();
replay(mockDefaultRoutingHandler);
hostHandler.processHostMovedEvent(new HostEvent(HostEvent.Type.HOST_MOVED, host2, host1));
diff --git a/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/MockFlowObjectiveService.java b/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/MockFlowObjectiveService.java
index 138b98a..0f2d7a9 100644
--- a/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/MockFlowObjectiveService.java
+++ b/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/MockFlowObjectiveService.java
@@ -31,6 +31,7 @@
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
import org.onosproject.net.flowobjective.ForwardingObjective;
import org.onosproject.net.flowobjective.Objective;
+import org.onosproject.net.flowobjective.ObjectiveError;
import java.util.Map;
@@ -72,9 +73,13 @@
if (op.equals(Objective.Operation.ADD)) {
bridgingTable.put(btKey, btValue);
+ forwardingObjective.context().ifPresent(context -> context.onSuccess(forwardingObjective));
} else if (op.equals(Objective.Operation.REMOVE)) {
bridgingTable.remove(btKey, btValue);
+ forwardingObjective.context().ifPresent(context -> context.onSuccess(forwardingObjective));
} else {
+ forwardingObjective.context().ifPresent(context ->
+ context.onError(forwardingObjective, ObjectiveError.UNKNOWN));
throw new IllegalArgumentException();
}
}
diff --git a/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/MockRoutingRulePopulator.java b/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/MockRoutingRulePopulator.java
index 77eeed2..4d222d8 100644
--- a/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/MockRoutingRulePopulator.java
+++ b/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/MockRoutingRulePopulator.java
@@ -21,8 +21,10 @@
import org.onlab.packet.VlanId;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
+import org.onosproject.net.flowobjective.Objective;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
/**
* Mock Routing Rule Populator.
@@ -37,18 +39,20 @@
}
@Override
- public void populateRoute(DeviceId deviceId, IpPrefix prefix,
- MacAddress hostMac, VlanId hostVlanId, PortNumber outPort, boolean directHost) {
+ public CompletableFuture<Objective> populateRoute(DeviceId deviceId, IpPrefix prefix, MacAddress hostMac,
+ VlanId hostVlanId, PortNumber outPort, boolean directHost) {
MockRoutingTableKey rtKey = new MockRoutingTableKey(deviceId, prefix);
MockRoutingTableValue rtValue = new MockRoutingTableValue(outPort, hostMac, hostVlanId);
routingTable.put(rtKey, rtValue);
+ return CompletableFuture.completedFuture(null);
}
@Override
- public void revokeRoute(DeviceId deviceId, IpPrefix prefix,
+ public CompletableFuture<Objective> revokeRoute(DeviceId deviceId, IpPrefix prefix,
MacAddress hostMac, VlanId hostVlanId, PortNumber outPort, boolean directHost) {
MockRoutingTableKey rtKey = new MockRoutingTableKey(deviceId, prefix);
MockRoutingTableValue rtValue = new MockRoutingTableValue(outPort, hostMac, hostVlanId);
routingTable.remove(rtKey, rtValue);
+ return CompletableFuture.completedFuture(null);
}
}
diff --git a/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/RouteHandlerTest.java b/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/RouteHandlerTest.java
index 6332856..f2a439b 100644
--- a/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/RouteHandlerTest.java
+++ b/apps/segmentrouting/app/src/test/java/org/onosproject/segmentrouting/RouteHandlerTest.java
@@ -39,6 +39,7 @@
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostService;
+import org.onosproject.net.host.InterfaceIpAddress;
import org.onosproject.net.intf.Interface;
import org.onosproject.net.provider.ProviderId;
import org.onosproject.routeservice.ResolvedRoute;
@@ -46,6 +47,7 @@
import org.onosproject.routeservice.RouteEvent;
import org.onosproject.segmentrouting.config.DeviceConfiguration;
import org.onosproject.segmentrouting.config.SegmentRoutingDeviceConfig;
+import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.TestConsistentMap;
import org.onosproject.store.service.TestConsistentMultimap;
@@ -82,15 +84,17 @@
private static final IpPrefix P1 = IpPrefix.valueOf("10.0.0.0/24");
// Single homed router 1
- private static final IpAddress N1 = IpAddress.valueOf("10.0.1.254");
+ private static final IpAddress N1 = IpAddress.valueOf("10.0.1.1");
private static final MacAddress M1 = MacAddress.valueOf("00:00:00:00:00:01");
private static final VlanId V1 = VlanId.vlanId((short) 1);
private static final ConnectPoint CP1 = ConnectPoint.deviceConnectPoint("of:0000000000000001/1");
private static final Route R1 = new Route(Route.Source.STATIC, P1, N1);
private static final ResolvedRoute RR1 = new ResolvedRoute(R1, M1, V1);
+ private static final Route DHCP_R1 = new Route(Route.Source.DHCP, P1, N1);
+ private static final ResolvedRoute DHCP_RR1 = new ResolvedRoute(DHCP_R1, M1, V1);
// Single homed router 2
- private static final IpAddress N2 = IpAddress.valueOf("10.0.2.254");
+ private static final IpAddress N2 = IpAddress.valueOf("10.0.2.1");
private static final MacAddress M2 = MacAddress.valueOf("00:00:00:00:00:02");
private static final VlanId V2 = VlanId.vlanId((short) 2);
private static final ConnectPoint CP2 = ConnectPoint.deviceConnectPoint("of:0000000000000002/2");
@@ -98,14 +102,16 @@
private static final ResolvedRoute RR2 = new ResolvedRoute(R2, M2, V2);
// Dual homed router 1
- private static final IpAddress N3 = IpAddress.valueOf("10.0.3.254");
+ private static final IpAddress N3 = IpAddress.valueOf("10.0.3.1");
private static final MacAddress M3 = MacAddress.valueOf("00:00:00:00:00:03");
private static final VlanId V3 = VlanId.vlanId((short) 3);
private static final Route R3 = new Route(Route.Source.STATIC, P1, N3);
private static final ResolvedRoute RR3 = new ResolvedRoute(R3, M3, V3);
+ private static final Route DHCP_R3 = new Route(Route.Source.DHCP, P1, N3);
+ private static final ResolvedRoute DHCP_RR3 = new ResolvedRoute(DHCP_R3, M3, V3);
// Single homed router 3
- private static final IpAddress N4 = IpAddress.valueOf("10.0.4.254");
+ private static final IpAddress N4 = IpAddress.valueOf("10.0.4.1");
private static final MacAddress M4 = MacAddress.valueOf("00:00:00:00:00:04");
private static final VlanId V4 = VlanId.vlanId((short) 4);
private static final ConnectPoint CP4 = ConnectPoint.deviceConnectPoint("of:0000000000000004/4");
@@ -134,7 +140,15 @@
// A set of devices of which we have mastership
private static final Set<DeviceId> LOCAL_DEVICES = Sets.newHashSet(CP1.deviceId(), CP2.deviceId());
// A set of interfaces
- private static final Set<Interface> INTERFACES = Sets.newHashSet();
+ private static final InterfaceIpAddress IF_IP1 =
+ new InterfaceIpAddress(IpAddress.valueOf("10.0.1.254"), IpPrefix.valueOf("10.0.1.254/24"));
+ private static final InterfaceIpAddress IF_IP3 =
+ new InterfaceIpAddress(IpAddress.valueOf("10.0.3.254"), IpPrefix.valueOf("10.0.3.254/24"));
+ private static final Interface IF_CP1 = new Interface("if-cp1", CP1, Lists.newArrayList(IF_IP1, IF_IP3),
+ null, null, null, null, null);
+ private static final Interface IF_CP2 = new Interface("if-cp2", CP2, Lists.newArrayList(IF_IP1, IF_IP3),
+ null, null, null, null, null);
+ private static final Set<Interface> INTERFACES = Sets.newHashSet(IF_CP1, IF_CP2);
@Before
public void setUp() {
@@ -173,6 +187,9 @@
srManager.hostService = hostService;
srManager.cfgService = mockNetworkConfigRegistry;
srManager.routeService = new MockRouteService(ROUTE_STORE);
+ srManager.phasedRecoveryService = createMock(PhasedRecoveryService.class);
+ expect(srManager.phasedRecoveryService.isEnabled()).andReturn(true).anyTimes();
+ replay(srManager.phasedRecoveryService);
routeHandler = new RouteHandler(srManager);
@@ -194,7 +211,6 @@
assertEquals(CP1.port(), rtv1.portNumber);
assertEquals(1, SUBNET_TABLE.size());
- assertTrue(SUBNET_TABLE.get(CP1).contains(P1));
}
@Test
@@ -214,8 +230,46 @@
assertEquals(CP2.port(), rtv2.portNumber);
assertEquals(2, SUBNET_TABLE.size());
- assertTrue(SUBNET_TABLE.get(CP1).contains(P1));
- assertTrue(SUBNET_TABLE.get(CP2).contains(P1));
+ }
+
+ // Only one of two dual-homed next hops present.
+ // Expect one routing table to be programmed with direct flow.
+ // The other is not programmed, not even for subnet
+ @Test
+ public void initDhcpRouteSingleDualHomeNextHop() {
+ ROUTE_STORE.put(P1, Sets.newHashSet(DHCP_RR1));
+
+ routeHandler.init(CP1.deviceId());
+
+ assertEquals(1, ROUTING_TABLE.size());
+ MockRoutingTableValue rtv1 = ROUTING_TABLE.get(new MockRoutingTableKey(CP1.deviceId(), P1));
+ assertEquals(M1, rtv1.macAddress);
+ assertEquals(V1, rtv1.vlanId);
+ assertEquals(CP1.port(), rtv1.portNumber);
+
+ assertEquals(1, SUBNET_TABLE.size());
+ }
+
+ // Both dual-homed next hops present.
+ // Expect both routing table to be programmed with direct flow
+ @Test
+ public void initDhcpRouteBothDualHomeNextHop() {
+ ROUTE_STORE.put(P1, Sets.newHashSet(DHCP_RR3));
+
+ routeHandler.init(CP1.deviceId());
+
+ assertEquals(2, ROUTING_TABLE.size());
+ MockRoutingTableValue rtv1 = ROUTING_TABLE.get(new MockRoutingTableKey(CP1.deviceId(), P1));
+ assertEquals(M3, rtv1.macAddress);
+ assertEquals(V3, rtv1.vlanId);
+ assertEquals(CP1.port(), rtv1.portNumber);
+
+ MockRoutingTableValue rtv2 = ROUTING_TABLE.get(new MockRoutingTableKey(CP2.deviceId(), P1));
+ assertEquals(M3, rtv2.macAddress);
+ assertEquals(V3, rtv2.vlanId);
+ assertEquals(CP2.port(), rtv2.portNumber);
+
+ assertEquals(2, SUBNET_TABLE.size());
}
@Test
diff --git a/tools/package/runtime/bin/onos-diagnostics b/tools/package/runtime/bin/onos-diagnostics
index 0daa320..655bb40 100755
--- a/tools/package/runtime/bin/onos-diagnostics
+++ b/tools/package/runtime/bin/onos-diagnostics
@@ -118,6 +118,7 @@
"sr-next-pw"
"sr-next-xconnect"
"sr-next-mac-vlan"
+ "sr-pr-list"
"dhcp-relay"
"mcast-host-routes"
diff --git a/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java b/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java
index 279091c..b18a2fb 100644
--- a/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java
+++ b/utils/misc/src/main/java/org/onlab/util/PredictableExecutor.java
@@ -28,6 +28,7 @@
import java.util.Objects;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
@@ -136,6 +137,35 @@
execute(command, hintFunction.apply(command));
}
+ /**
+ * Submits a value-returning task for execution and returns a
+ * Future representing the pending results of the task. The
+ * Future's {@code get} method will return the task's result upon
+ * successful completion.
+ *
+ * @param command the {@link Runnable} task
+ * @param hint value to pick thread to run on.
+ * @return completable future representing the pending results
+ */
+ public CompletableFuture<Void> submit(Runnable command, int hint) {
+ int index = Math.abs(hint) % backends.size();
+ return CompletableFuture.runAsync(command, backends.get(index));
+ }
+
+ /**
+ * Submits a value-returning task for execution and returns a
+ * Future representing the pending results of the task. The
+ * Future's {@code get} method will return the task's result upon
+ * successful completion.
+ *
+ * @param command the {@link Runnable} task
+ * @param hintFunction Function to compute hint value
+ * @return completable future representing the pending results
+ */
+ public CompletableFuture<Void> submit(Runnable command, Function<Runnable, Integer> hintFunction) {
+ int hint = hintFunction.apply(command);
+ return submit(command, hint);
+ }
private static int hint(Runnable command) {
if (command instanceof PickyTask) {
diff --git a/utils/misc/src/main/java/org/onlab/util/Tools.java b/utils/misc/src/main/java/org/onlab/util/Tools.java
index e0ea0d5..596e34b 100644
--- a/utils/misc/src/main/java/org/onlab/util/Tools.java
+++ b/utils/misc/src/main/java/org/onlab/util/Tools.java
@@ -49,7 +49,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -80,6 +82,9 @@
private static final String INPUT_JSON_CANNOT_BE_NULL = "Input JSON cannot be null";
+ private static ScheduledExecutorService timer = Executors.newScheduledThreadPool(
+ Runtime.getRuntime().availableProcessors(), groupedThreads("onos/tool", "timer"));
+
/**
* Returns a thread factory that produces threads named according to the
* supplied name pattern.
@@ -699,6 +704,19 @@
}
/**
+ * Returns a future that completes normally after given time period.
+ *
+ * @param timeout amount of time to wait before completing the future
+ * @param unit Time unit
+ * @return a future that completes after given time period
+ */
+ public static CompletableFuture<Void> completeAfter(long timeout, TimeUnit unit) {
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ timer.schedule(() -> result.complete(null), timeout, unit);
+ return result;
+ }
+
+ /**
* Returns a future that's completed using the given {@code orderedExecutor} if the future is not blocked or the
* given {@code threadPoolExecutor} if the future is blocked.
* <p>