blob: 9544b578f66185285478ae99b3e99507b58787b4 [file] [log] [blame]
/*
 * Copyright 2020-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.segmentrouting.phasedrecovery.impl;
18
19import com.google.common.collect.Sets;
20import org.onlab.util.KryoNamespace;
21import org.onlab.util.Tools;
22import org.onosproject.cfg.ComponentConfigService;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
Charles Chan12a8a842020-02-14 13:23:57 -080025import org.onosproject.net.DeviceId;
26import org.onosproject.net.Port;
27import org.onosproject.net.PortNumber;
28import org.onosproject.net.device.DeviceAdminService;
29import org.onosproject.segmentrouting.SegmentRoutingService;
30import org.onosproject.segmentrouting.phasedrecovery.api.Phase;
31import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
32import org.onosproject.store.serializers.KryoNamespaces;
33import org.onosproject.store.service.ConsistentMap;
34import org.onosproject.store.service.Serializer;
35import org.onosproject.store.service.StorageService;
36import org.onosproject.store.service.Versioned;
37import org.osgi.service.component.ComponentContext;
38import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Modified;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
44import org.slf4j.Logger;
45import org.slf4j.LoggerFactory;
46
47import java.util.Dictionary;
48import java.util.Map;
49import java.util.Optional;
50import java.util.Set;
51import java.util.concurrent.CompletableFuture;
52import java.util.concurrent.Executors;
53import java.util.concurrent.ScheduledExecutorService;
54import java.util.concurrent.TimeUnit;
55import java.util.stream.Collectors;
56
57import static org.onlab.util.Tools.groupedThreads;
58import static org.onosproject.segmentrouting.phasedrecovery.api.OsgiPropertyConstants.PHASED_RECOVERY_DEFAULT;
59import static org.onosproject.segmentrouting.phasedrecovery.api.OsgiPropertyConstants.PROP_PHASED_RECOVERY;
60
61@Component(
62 immediate = true,
63 service = PhasedRecoveryService.class,
64 property = {
65 PROP_PHASED_RECOVERY + ":Boolean=" + PHASED_RECOVERY_DEFAULT
66 }
67)
68public class PhasedRecoveryManager implements PhasedRecoveryService {
69 private static final Logger log = LoggerFactory.getLogger(PhasedRecoveryManager.class);
70 private static final String APP_NAME = "org.onosproject.phasedrecovery";
71
72 // TODO Make these configurable via Component Config
73 // Amount of time delayed to wait for port description (in second)
74 private static final int PORT_CHECKER_INTERVAL = 1;
75 // Max number of retry for port checker
pierventreb3fe7922021-08-04 23:09:40 +020076 private static final int PORT_CHECKER_RETRIES = 15;
Charles Chan12a8a842020-02-14 13:23:57 -080077 // RoutingStableChecker interval (in second)
78 private static final int ROUTING_CHECKER_DELAY = 3;
79 // RoutingStableChecker timeout (in second)
80 private static final int ROUTING_CHECKER_TIMEOUT = 15;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 private CoreService coreService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 private ComponentConfigService compCfgService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 private DeviceAdminService deviceAdminService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Charles Chan12a8a842020-02-14 13:23:57 -080092 private StorageService storageService;
93
94 @Reference(cardinality = ReferenceCardinality.OPTIONAL)
95 volatile SegmentRoutingService srService;
96
97 /** Enabling phased recovery. */
98 boolean phasedRecovery = PHASED_RECOVERY_DEFAULT;
99
100 private ApplicationId appId;
101 private ConsistentMap<DeviceId, Phase> phasedRecoveryStore;
102 private ScheduledExecutorService executor = Executors.newScheduledThreadPool(
103 Runtime.getRuntime().availableProcessors(), groupedThreads("onos/sr/pr", "executor"));
104
105 @Activate
106 protected void activate(ComponentContext context) {
107 appId = coreService.registerApplication(APP_NAME);
108
109 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
110 .register(KryoNamespaces.API)
111 .register(Phase.class);
112 phasedRecoveryStore = storageService.<DeviceId, Phase>consistentMapBuilder()
113 .withName("onos-sr-phasedrecovery")
114 .withRelaxedReadConsistency()
115 .withSerializer(Serializer.using(serializer.build()))
116 .build();
117
118 compCfgService.registerProperties(getClass());
119 modified(context);
120 log.info("Started");
121 }
122
123 @Deactivate
124 protected void deactivate() {
125 phasedRecoveryStore.destroy();
126 compCfgService.unregisterProperties(getClass(), false);
127 log.info("Stopped");
128 }
129
130 @Modified
131 protected void modified(ComponentContext context) {
132 Dictionary<?, ?> properties = context.getProperties();
133 if (properties == null) {
134 return;
135 }
136
137 String strPhasedRecovery = Tools.get(properties, PROP_PHASED_RECOVERY);
138 boolean expectPhasedRecovery = Boolean.parseBoolean(strPhasedRecovery);
139 if (expectPhasedRecovery != phasedRecovery) {
140 phasedRecovery = expectPhasedRecovery;
141 log.info("{} phased recovery", phasedRecovery ? "Enabling" : "Disabling");
142 }
143 }
144
145 @Override
146 public boolean isEnabled() {
147 return phasedRecovery;
148 }
149
150 @Override
151 public boolean init(DeviceId deviceId) {
152 if (this.srService == null) {
153 log.info("SegmentRoutingService is not ready");
154 return false;
155 }
pierventre37dcf4c2021-09-16 18:43:06 +0200156
157 if (!srService.shouldProgram(deviceId)) {
158 log.info("Skip init not leading the phase recovery of {}", deviceId);
Charles Chan12a8a842020-02-14 13:23:57 -0800159 return false;
160 }
161
162 Phase phase = Optional.ofNullable(phasedRecoveryStore.putIfAbsent(deviceId, Phase.PENDING))
163 .map(Versioned::value).orElse(null);
164
165 if (phase != null) {
166 log.info("{} has been initialized already. Skipping.", deviceId);
167 return false;
168 } else {
pierventreb3fe7922021-08-04 23:09:40 +0200169 if (phasedRecovery) {
170 // Even in case of EDGE as next phase, it is better to drive the transition
171 // to the next phase through the port checker. If the device is reported by
172 // a non master instance the first time, ports wont be available until the next
173 // port reconciliation.
174 Phase nextPhase = this.srService.getPairDeviceId(deviceId).isPresent() ?
175 Phase.PAIR : Phase.EDGE;
Charles Chan12a8a842020-02-14 13:23:57 -0800176 // Wait for the PORT_STAT before entering next phase.
177 // Note: Unlikely, when the device init fails due to PORT_STATS timeout,
178 // it requires operator to manually move the device to the next phase by CLI command.
pierventreb3fe7922021-08-04 23:09:40 +0200179 executor.schedule(new PortChecker(deviceId, PORT_CHECKER_RETRIES, nextPhase),
Charles Chan12a8a842020-02-14 13:23:57 -0800180 PORT_CHECKER_INTERVAL, TimeUnit.SECONDS);
181 } else {
182 // We assume that all ports will be reported as enabled on devices that don't require phased recovery
183 setPhase(deviceId, Phase.EDGE);
184 }
185 return true;
186 }
187 }
188
189 @Override
190 public boolean reset(DeviceId deviceId) {
191 if (this.srService == null) {
192 log.info("SegmentRoutingService is not ready");
193 return false;
194 }
pierventre37dcf4c2021-09-16 18:43:06 +0200195
196 if (!srService.shouldProgram(deviceId)) {
197 log.info("Skip reset not leading the phase recovery of {}", deviceId);
198 return false;
199 }
Charles Chan12a8a842020-02-14 13:23:57 -0800200
201 Phase result = Optional.ofNullable(phasedRecoveryStore.remove(deviceId))
202 .map(Versioned::value).orElse(null);
203 if (result != null) {
204 log.info("{} is reset", deviceId);
205 }
206 return result != null;
207 }
208
209 @Override
210 public Map<DeviceId, Phase> getPhases() {
211 return phasedRecoveryStore.asJavaMap();
212 }
213
214 @Override
215 public Phase getPhase(DeviceId deviceId) {
216 return Optional.ofNullable(phasedRecoveryStore.get(deviceId)).map(Versioned::value).orElse(null);
217 }
218
219 @Override
220 public Phase setPhase(DeviceId deviceId, Phase newPhase) {
221 if (this.srService == null) {
222 log.info("SegmentRoutingService is not ready");
223 return null;
224 }
pierventre37dcf4c2021-09-16 18:43:06 +0200225
226 if (!srService.shouldProgram(deviceId)) {
227 log.info("Skip setPhase not leading the phase recovery of {}", deviceId);
Charles Chan12a8a842020-02-14 13:23:57 -0800228 return null;
229 }
230
231 return Optional.ofNullable(phasedRecoveryStore.compute(deviceId, (k, v) -> {
232 if (v == null && newPhase == Phase.PENDING) {
233 log.info("Initializing {}", deviceId);
234 return newPhase;
235 } else if (v == Phase.PENDING && newPhase == Phase.PAIR) {
236 srService.initHost(deviceId);
237 // RouteHandler init is intentionally skipped when phased recovery is on.
238 // Edge ports remain down in this phase. Therefore, no nexthop will be discovered on the given device.
239 // The flow on given device will be programmed later by hostHandler.processHostMovedEvent()
240 changePairPort(deviceId, true);
241 log.info("Transitioning {} from PENDING to PAIR", deviceId);
242 return newPhase;
243 } else if (v == Phase.PAIR && newPhase == Phase.INFRA) {
244 changeInfraPorts(deviceId, true);
245 srService.initRoute(deviceId);
246 log.info("Transitioning {} from PAIR to INFRA", deviceId);
pierventreb3fe7922021-08-04 23:09:40 +0200247 monitorRoutingStability(deviceId, newPhase);
Charles Chan12a8a842020-02-14 13:23:57 -0800248 return newPhase;
249 } else if (v == Phase.INFRA && newPhase == Phase.EDGE) {
250 changeEdgePorts(deviceId, true);
251 log.info("Transitioning {} from INFRA to EDGE", deviceId);
252 return newPhase;
253 } else if (v == Phase.PENDING && newPhase == Phase.EDGE) {
pierventreb3fe7922021-08-04 23:09:40 +0200254 // We want to enable ports in order - even for non paired devices
255 // newPhase is used to implement different behaviors in monitorRoutingStability
Charles Chan12a8a842020-02-14 13:23:57 -0800256 srService.initHost(deviceId);
pierventreb3fe7922021-08-04 23:09:40 +0200257 changeInfraPorts(deviceId, true);
Charles Chan12a8a842020-02-14 13:23:57 -0800258 srService.initRoute(deviceId);
pierventreb3fe7922021-08-04 23:09:40 +0200259 monitorRoutingStability(deviceId, newPhase);
Charles Chan12a8a842020-02-14 13:23:57 -0800260 log.info("Transitioning {} from PENDING to EDGE", deviceId);
261 return newPhase;
262 } else {
263 log.debug("Ignore illegal state transition on {} from {} to {}", deviceId, v, newPhase);
264 return v;
265 }
266 })).map(Versioned::value).orElse(null);
267 }
268
pierventreb3fe7922021-08-04 23:09:40 +0200269 // Use current phase to implement a different behavior for non-paired or infra
270 // devices. For these devices we are just missing to enable the edge ports, while
271 // for the paired leaves we will go the final phase.
272 private void monitorRoutingStability(DeviceId deviceId, Phase currentPhase) {
273 // restrict the state machine transitions
274 if (currentPhase != Phase.INFRA && currentPhase != Phase.EDGE) {
275 log.warn("Ignore monitorRoutingStability on {} for phase {}", deviceId, currentPhase);
276 return;
277 }
278
Charles Chan12a8a842020-02-14 13:23:57 -0800279 CompletableFuture<Void> checkerFuture = new CompletableFuture<>();
280 CompletableFuture<Void> timeoutFuture =
281 Tools.completeAfter(ROUTING_CHECKER_TIMEOUT, TimeUnit.SECONDS);
282 RoutingStabilityChecker checker = new RoutingStabilityChecker(checkerFuture);
283
284 checkerFuture.runAfterEitherAsync(timeoutFuture, () -> {
285 if (checkerFuture.isDone()) {
286 log.info("Routing stable. Move {} to the next phase", deviceId);
287 } else {
288 log.info("Timeout reached. Move {} to the next phase", deviceId);
289 // Mark the future as completed to signify the termination of periodical checker
290 checkerFuture.complete(null);
291 }
pierventreb3fe7922021-08-04 23:09:40 +0200292 // Paired devices. Else will be performed by infra devices
293 // or non paired devices that go directly from PENDING to EDGE
294 if (currentPhase == Phase.INFRA) {
295 setPhase(deviceId, Phase.EDGE);
296 } else {
297 changeEdgePorts(deviceId, true);
298 }
Charles Chan12a8a842020-02-14 13:23:57 -0800299 });
300
301 executor.schedule(checker, ROUTING_CHECKER_DELAY, TimeUnit.SECONDS);
302 }
303
304 @Override
305 public Set<PortNumber> changeAllPorts(DeviceId deviceId, boolean enabled) {
306 if (this.srService == null) {
307 log.warn("SegmentRoutingService is not ready. Unable to changeAllPorts({}) to {}", deviceId, enabled);
308 return Sets.newHashSet();
309 }
310 Set<PortNumber> portsToBeEnabled = deviceAdminService.getPorts(deviceId)
311 .stream().map(Port::number).collect(Collectors.toSet());
312 changePorts(deviceId, portsToBeEnabled, enabled);
313 return portsToBeEnabled;
314 }
315
316 @Override
317 public Set<PortNumber> changePairPort(DeviceId deviceId, boolean enabled) {
318 if (this.srService == null) {
319 log.warn("SegmentRoutingService is not ready. Unable to changePairPort({}) to {}", deviceId, enabled);
320 return Sets.newHashSet();
321 }
322 Set<PortNumber> portsToBeEnabled = this.srService.getPairLocalPort(deviceId)
323 .map(Sets::newHashSet).orElse(Sets.newHashSet());
324 changePorts(deviceId, portsToBeEnabled, enabled);
325 return portsToBeEnabled;
326 }
327
328 @Override
329 public Set<PortNumber> changeInfraPorts(DeviceId deviceId, boolean enabled) {
330 if (this.srService == null) {
331 log.warn("SegmentRoutingService is not ready. Unable to changeInfraPorts({}) to {}", deviceId, enabled);
332 return Sets.newHashSet();
333 }
334 Set<PortNumber> portsToBeEnabled = this.srService.getInfraPorts(deviceId);
335 changePorts(deviceId, portsToBeEnabled, enabled);
336 return portsToBeEnabled;
337 }
338
339 @Override
340 public Set<PortNumber> changeEdgePorts(DeviceId deviceId, boolean enabled) {
341 if (this.srService == null) {
342 log.warn("SegmentRoutingService is not ready. Unable to changeEdgePorts({}) to {}", deviceId, enabled);
343 return Sets.newHashSet();
344 }
345 Set<PortNumber> portsToBeEnabled = this.srService.getEdgePorts(deviceId);
346 changePorts(deviceId, portsToBeEnabled, enabled);
347 return portsToBeEnabled;
348 }
349
350 private void changePorts(DeviceId deviceId, Set<PortNumber> portNumbers, boolean enabled) {
pierventreb3fe7922021-08-04 23:09:40 +0200351 if (!portNumbers.isEmpty()) {
352 log.info("{} {} on {}", enabled ? "Enabled" : "Disabled", portNumbers, deviceId);
353 portNumbers.forEach(portNumber ->
354 deviceAdminService.changePortState(deviceId, portNumber, enabled));
355 }
Charles Chan12a8a842020-02-14 13:23:57 -0800356 }
357
358 private class PortChecker implements Runnable {
359 int retries;
360 DeviceId deviceId;
pierventreb3fe7922021-08-04 23:09:40 +0200361 Phase nextPhase;
Charles Chan12a8a842020-02-14 13:23:57 -0800362
pierventreb3fe7922021-08-04 23:09:40 +0200363 PortChecker(DeviceId deviceId, int retries, Phase nextPhase) {
Charles Chan12a8a842020-02-14 13:23:57 -0800364 this.deviceId = deviceId;
365 this.retries = retries;
pierventreb3fe7922021-08-04 23:09:40 +0200366 this.nextPhase = nextPhase;
Charles Chan12a8a842020-02-14 13:23:57 -0800367 }
368
369 @Override
370 public void run() {
371 retries -= 1;
372 if (retries < 0) {
373 log.warn("PORT_STATS timeout. Unable to initialize {}", deviceId);
374 return;
375 }
376
377 if (!deviceAdminService.getPorts(deviceId).isEmpty()) {
pierventreb3fe7922021-08-04 23:09:40 +0200378 log.info("{} reported PORT_STATS, moving to {}", deviceId, nextPhase);
379 setPhase(deviceId, nextPhase);
380 return;
Charles Chan12a8a842020-02-14 13:23:57 -0800381 }
382 log.info("{} still waiting for PORT_STATS", deviceId);
383 executor.schedule(this, PORT_CHECKER_INTERVAL, TimeUnit.SECONDS);
384 }
385 }
386
387 private class RoutingStabilityChecker implements Runnable {
388 private final CompletableFuture<Void> future;
389
390 RoutingStabilityChecker(CompletableFuture<Void> future) {
391 this.future = future;
392 }
393
394 @Override
395 public void run() {
396 // Do not continue if the future has been completed
397 if (future.isDone()) {
398 log.trace("RouteStabilityChecker is done. Stop checking");
399 return;
400 }
401
402 if (srService.isRoutingStable()) {
403 log.trace("Routing is stable");
404 future.complete(null);
405 } else {
406 log.trace("Routing is not yet stable");
407 executor.schedule(this, ROUTING_CHECKER_DELAY, TimeUnit.SECONDS);
408 }
409 }
410 }
pierventre37dcf4c2021-09-16 18:43:06 +0200411
412 // FIXME We should handle cluster events to resume the phase recovery
Charles Chan12a8a842020-02-14 13:23:57 -0800413}