| /* |
| * Copyright 2020-present Open Networking Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.onosproject.segmentrouting.phasedrecovery.impl; |
| |
| import com.google.common.collect.Sets; |
| import org.onlab.util.KryoNamespace; |
| import org.onlab.util.Tools; |
| import org.onosproject.cfg.ComponentConfigService; |
| import org.onosproject.core.ApplicationId; |
| import org.onosproject.core.CoreService; |
| import org.onosproject.net.DeviceId; |
| import org.onosproject.net.Port; |
| import org.onosproject.net.PortNumber; |
| import org.onosproject.net.device.DeviceAdminService; |
| import org.onosproject.segmentrouting.SegmentRoutingService; |
| import org.onosproject.segmentrouting.phasedrecovery.api.Phase; |
| import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService; |
| import org.onosproject.store.serializers.KryoNamespaces; |
| import org.onosproject.store.service.ConsistentMap; |
| import org.onosproject.store.service.Serializer; |
| import org.onosproject.store.service.StorageService; |
| import org.onosproject.store.service.Versioned; |
| import org.osgi.service.component.ComponentContext; |
| import org.osgi.service.component.annotations.Activate; |
| import org.osgi.service.component.annotations.Component; |
| import org.osgi.service.component.annotations.Deactivate; |
| import org.osgi.service.component.annotations.Modified; |
| import org.osgi.service.component.annotations.Reference; |
| import org.osgi.service.component.annotations.ReferenceCardinality; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.util.Dictionary; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.TimeUnit; |
| import java.util.stream.Collectors; |
| |
| import static org.onlab.util.Tools.groupedThreads; |
| import static org.onosproject.segmentrouting.phasedrecovery.api.OsgiPropertyConstants.PHASED_RECOVERY_DEFAULT; |
| import static org.onosproject.segmentrouting.phasedrecovery.api.OsgiPropertyConstants.PROP_PHASED_RECOVERY; |
| |
| @Component( |
| immediate = true, |
| service = PhasedRecoveryService.class, |
| property = { |
| PROP_PHASED_RECOVERY + ":Boolean=" + PHASED_RECOVERY_DEFAULT |
| } |
| ) |
| public class PhasedRecoveryManager implements PhasedRecoveryService { |
| private static final Logger log = LoggerFactory.getLogger(PhasedRecoveryManager.class); |
| private static final String APP_NAME = "org.onosproject.phasedrecovery"; |
| |
| // TODO Make these configurable via Component Config |
| // Amount of time delayed to wait for port description (in second) |
| private static final int PORT_CHECKER_INTERVAL = 1; |
| // Max number of retry for port checker |
| private static final int PORT_CHECKER_RETRIES = 15; |
| // RoutingStableChecker interval (in second) |
| private static final int ROUTING_CHECKER_DELAY = 3; |
| // RoutingStableChecker timeout (in second) |
| private static final int ROUTING_CHECKER_TIMEOUT = 15; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private CoreService coreService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private ComponentConfigService compCfgService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private DeviceAdminService deviceAdminService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| private StorageService storageService; |
| |
| @Reference(cardinality = ReferenceCardinality.OPTIONAL) |
| volatile SegmentRoutingService srService; |
| |
| /** Enabling phased recovery. */ |
| boolean phasedRecovery = PHASED_RECOVERY_DEFAULT; |
| |
| private ApplicationId appId; |
| private ConsistentMap<DeviceId, Phase> phasedRecoveryStore; |
| private ScheduledExecutorService executor = Executors.newScheduledThreadPool( |
| Runtime.getRuntime().availableProcessors(), groupedThreads("onos/sr/pr", "executor")); |
| |
| @Activate |
| protected void activate(ComponentContext context) { |
| appId = coreService.registerApplication(APP_NAME); |
| |
| KryoNamespace.Builder serializer = KryoNamespace.newBuilder() |
| .register(KryoNamespaces.API) |
| .register(Phase.class); |
| phasedRecoveryStore = storageService.<DeviceId, Phase>consistentMapBuilder() |
| .withName("onos-sr-phasedrecovery") |
| .withRelaxedReadConsistency() |
| .withSerializer(Serializer.using(serializer.build())) |
| .build(); |
| |
| compCfgService.registerProperties(getClass()); |
| modified(context); |
| log.info("Started"); |
| } |
| |
| @Deactivate |
| protected void deactivate() { |
| phasedRecoveryStore.destroy(); |
| compCfgService.unregisterProperties(getClass(), false); |
| log.info("Stopped"); |
| } |
| |
| @Modified |
| protected void modified(ComponentContext context) { |
| Dictionary<?, ?> properties = context.getProperties(); |
| if (properties == null) { |
| return; |
| } |
| |
| String strPhasedRecovery = Tools.get(properties, PROP_PHASED_RECOVERY); |
| boolean expectPhasedRecovery = Boolean.parseBoolean(strPhasedRecovery); |
| if (expectPhasedRecovery != phasedRecovery) { |
| phasedRecovery = expectPhasedRecovery; |
| log.info("{} phased recovery", phasedRecovery ? "Enabling" : "Disabling"); |
| } |
| } |
| |
| @Override |
| public boolean isEnabled() { |
| return phasedRecovery; |
| } |
| |
| @Override |
| public boolean init(DeviceId deviceId) { |
| if (this.srService == null) { |
| log.info("SegmentRoutingService is not ready"); |
| return false; |
| } |
| |
| if (!srService.shouldProgram(deviceId)) { |
| log.info("Skip init not leading the phase recovery of {}", deviceId); |
| return false; |
| } |
| |
| Phase phase = Optional.ofNullable(phasedRecoveryStore.putIfAbsent(deviceId, Phase.PENDING)) |
| .map(Versioned::value).orElse(null); |
| |
| if (phase != null) { |
| log.info("{} has been initialized already. Skipping.", deviceId); |
| return false; |
| } else { |
| if (phasedRecovery) { |
| // Even in case of EDGE as next phase, it is better to drive the transition |
| // to the next phase through the port checker. If the device is reported by |
| // a non master instance the first time, ports wont be available until the next |
| // port reconciliation. |
| Phase nextPhase = this.srService.getPairDeviceId(deviceId).isPresent() ? |
| Phase.PAIR : Phase.EDGE; |
| // Wait for the PORT_STAT before entering next phase. |
| // Note: Unlikely, when the device init fails due to PORT_STATS timeout, |
| // it requires operator to manually move the device to the next phase by CLI command. |
| executor.schedule(new PortChecker(deviceId, PORT_CHECKER_RETRIES, nextPhase), |
| PORT_CHECKER_INTERVAL, TimeUnit.SECONDS); |
| } else { |
| // We assume that all ports will be reported as enabled on devices that don't require phased recovery |
| setPhase(deviceId, Phase.EDGE); |
| } |
| return true; |
| } |
| } |
| |
| @Override |
| public boolean reset(DeviceId deviceId) { |
| if (this.srService == null) { |
| log.info("SegmentRoutingService is not ready"); |
| return false; |
| } |
| |
| if (!srService.shouldProgram(deviceId)) { |
| log.info("Skip reset not leading the phase recovery of {}", deviceId); |
| return false; |
| } |
| |
| Phase result = Optional.ofNullable(phasedRecoveryStore.remove(deviceId)) |
| .map(Versioned::value).orElse(null); |
| if (result != null) { |
| log.info("{} is reset", deviceId); |
| } |
| return result != null; |
| } |
| |
| @Override |
| public Map<DeviceId, Phase> getPhases() { |
| return phasedRecoveryStore.asJavaMap(); |
| } |
| |
| @Override |
| public Phase getPhase(DeviceId deviceId) { |
| return Optional.ofNullable(phasedRecoveryStore.get(deviceId)).map(Versioned::value).orElse(null); |
| } |
| |
| @Override |
| public Phase setPhase(DeviceId deviceId, Phase newPhase) { |
| if (this.srService == null) { |
| log.info("SegmentRoutingService is not ready"); |
| return null; |
| } |
| |
| if (!srService.shouldProgram(deviceId)) { |
| log.info("Skip setPhase not leading the phase recovery of {}", deviceId); |
| return null; |
| } |
| |
| return Optional.ofNullable(phasedRecoveryStore.compute(deviceId, (k, v) -> { |
| if (v == null && newPhase == Phase.PENDING) { |
| log.info("Initializing {}", deviceId); |
| return newPhase; |
| } else if (v == Phase.PENDING && newPhase == Phase.PAIR) { |
| srService.initHost(deviceId); |
| // RouteHandler init is intentionally skipped when phased recovery is on. |
| // Edge ports remain down in this phase. Therefore, no nexthop will be discovered on the given device. |
| // The flow on given device will be programmed later by hostHandler.processHostMovedEvent() |
| changePairPort(deviceId, true); |
| log.info("Transitioning {} from PENDING to PAIR", deviceId); |
| return newPhase; |
| } else if (v == Phase.PAIR && newPhase == Phase.INFRA) { |
| changeInfraPorts(deviceId, true); |
| srService.initRoute(deviceId); |
| log.info("Transitioning {} from PAIR to INFRA", deviceId); |
| monitorRoutingStability(deviceId, newPhase); |
| return newPhase; |
| } else if (v == Phase.INFRA && newPhase == Phase.EDGE) { |
| changeEdgePorts(deviceId, true); |
| log.info("Transitioning {} from INFRA to EDGE", deviceId); |
| return newPhase; |
| } else if (v == Phase.PENDING && newPhase == Phase.EDGE) { |
| // We want to enable ports in order - even for non paired devices |
| // newPhase is used to implement different behaviors in monitorRoutingStability |
| srService.initHost(deviceId); |
| changeInfraPorts(deviceId, true); |
| srService.initRoute(deviceId); |
| monitorRoutingStability(deviceId, newPhase); |
| log.info("Transitioning {} from PENDING to EDGE", deviceId); |
| return newPhase; |
| } else { |
| log.debug("Ignore illegal state transition on {} from {} to {}", deviceId, v, newPhase); |
| return v; |
| } |
| })).map(Versioned::value).orElse(null); |
| } |
| |
| // Use current phase to implement a different behavior for non-paired or infra |
| // devices. For these devices we are just missing to enable the edge ports, while |
| // for the paired leaves we will go the final phase. |
| private void monitorRoutingStability(DeviceId deviceId, Phase currentPhase) { |
| // restrict the state machine transitions |
| if (currentPhase != Phase.INFRA && currentPhase != Phase.EDGE) { |
| log.warn("Ignore monitorRoutingStability on {} for phase {}", deviceId, currentPhase); |
| return; |
| } |
| |
| CompletableFuture<Void> checkerFuture = new CompletableFuture<>(); |
| CompletableFuture<Void> timeoutFuture = |
| Tools.completeAfter(ROUTING_CHECKER_TIMEOUT, TimeUnit.SECONDS); |
| RoutingStabilityChecker checker = new RoutingStabilityChecker(checkerFuture); |
| |
| checkerFuture.runAfterEitherAsync(timeoutFuture, () -> { |
| if (checkerFuture.isDone()) { |
| log.info("Routing stable. Move {} to the next phase", deviceId); |
| } else { |
| log.info("Timeout reached. Move {} to the next phase", deviceId); |
| // Mark the future as completed to signify the termination of periodical checker |
| checkerFuture.complete(null); |
| } |
| // Paired devices. Else will be performed by infra devices |
| // or non paired devices that go directly from PENDING to EDGE |
| if (currentPhase == Phase.INFRA) { |
| setPhase(deviceId, Phase.EDGE); |
| } else { |
| changeEdgePorts(deviceId, true); |
| } |
| }); |
| |
| executor.schedule(checker, ROUTING_CHECKER_DELAY, TimeUnit.SECONDS); |
| } |
| |
| @Override |
| public Set<PortNumber> changeAllPorts(DeviceId deviceId, boolean enabled) { |
| if (this.srService == null) { |
| log.warn("SegmentRoutingService is not ready. Unable to changeAllPorts({}) to {}", deviceId, enabled); |
| return Sets.newHashSet(); |
| } |
| Set<PortNumber> portsToBeEnabled = deviceAdminService.getPorts(deviceId) |
| .stream().map(Port::number).collect(Collectors.toSet()); |
| changePorts(deviceId, portsToBeEnabled, enabled); |
| return portsToBeEnabled; |
| } |
| |
| @Override |
| public Set<PortNumber> changePairPort(DeviceId deviceId, boolean enabled) { |
| if (this.srService == null) { |
| log.warn("SegmentRoutingService is not ready. Unable to changePairPort({}) to {}", deviceId, enabled); |
| return Sets.newHashSet(); |
| } |
| Set<PortNumber> portsToBeEnabled = this.srService.getPairLocalPort(deviceId) |
| .map(Sets::newHashSet).orElse(Sets.newHashSet()); |
| changePorts(deviceId, portsToBeEnabled, enabled); |
| return portsToBeEnabled; |
| } |
| |
| @Override |
| public Set<PortNumber> changeInfraPorts(DeviceId deviceId, boolean enabled) { |
| if (this.srService == null) { |
| log.warn("SegmentRoutingService is not ready. Unable to changeInfraPorts({}) to {}", deviceId, enabled); |
| return Sets.newHashSet(); |
| } |
| Set<PortNumber> portsToBeEnabled = this.srService.getInfraPorts(deviceId); |
| changePorts(deviceId, portsToBeEnabled, enabled); |
| return portsToBeEnabled; |
| } |
| |
| @Override |
| public Set<PortNumber> changeEdgePorts(DeviceId deviceId, boolean enabled) { |
| if (this.srService == null) { |
| log.warn("SegmentRoutingService is not ready. Unable to changeEdgePorts({}) to {}", deviceId, enabled); |
| return Sets.newHashSet(); |
| } |
| Set<PortNumber> portsToBeEnabled = this.srService.getEdgePorts(deviceId); |
| changePorts(deviceId, portsToBeEnabled, enabled); |
| return portsToBeEnabled; |
| } |
| |
| private void changePorts(DeviceId deviceId, Set<PortNumber> portNumbers, boolean enabled) { |
| if (!portNumbers.isEmpty()) { |
| log.info("{} {} on {}", enabled ? "Enabled" : "Disabled", portNumbers, deviceId); |
| portNumbers.forEach(portNumber -> |
| deviceAdminService.changePortState(deviceId, portNumber, enabled)); |
| } |
| } |
| |
| private class PortChecker implements Runnable { |
| int retries; |
| DeviceId deviceId; |
| Phase nextPhase; |
| |
| PortChecker(DeviceId deviceId, int retries, Phase nextPhase) { |
| this.deviceId = deviceId; |
| this.retries = retries; |
| this.nextPhase = nextPhase; |
| } |
| |
| @Override |
| public void run() { |
| retries -= 1; |
| if (retries < 0) { |
| log.warn("PORT_STATS timeout. Unable to initialize {}", deviceId); |
| return; |
| } |
| |
| if (!deviceAdminService.getPorts(deviceId).isEmpty()) { |
| log.info("{} reported PORT_STATS, moving to {}", deviceId, nextPhase); |
| setPhase(deviceId, nextPhase); |
| return; |
| } |
| log.info("{} still waiting for PORT_STATS", deviceId); |
| executor.schedule(this, PORT_CHECKER_INTERVAL, TimeUnit.SECONDS); |
| } |
| } |
| |
| private class RoutingStabilityChecker implements Runnable { |
| private final CompletableFuture<Void> future; |
| |
| RoutingStabilityChecker(CompletableFuture<Void> future) { |
| this.future = future; |
| } |
| |
| @Override |
| public void run() { |
| // Do not continue if the future has been completed |
| if (future.isDone()) { |
| log.trace("RouteStabilityChecker is done. Stop checking"); |
| return; |
| } |
| |
| if (srService.isRoutingStable()) { |
| log.trace("Routing is stable"); |
| future.complete(null); |
| } else { |
| log.trace("Routing is not yet stable"); |
| executor.schedule(this, ROUTING_CHECKER_DELAY, TimeUnit.SECONDS); |
| } |
| } |
| } |
| |
| // FIXME We should handle cluster events to resume the phase recovery |
| } |