/*
 * Copyright 2020-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.segmentrouting.phasedrecovery.impl;
18
19import com.google.common.collect.Sets;
20import org.onlab.util.KryoNamespace;
21import org.onlab.util.Tools;
22import org.onosproject.cfg.ComponentConfigService;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.mastership.MastershipService;
26import org.onosproject.net.DeviceId;
27import org.onosproject.net.Port;
28import org.onosproject.net.PortNumber;
29import org.onosproject.net.device.DeviceAdminService;
30import org.onosproject.segmentrouting.SegmentRoutingService;
31import org.onosproject.segmentrouting.phasedrecovery.api.Phase;
32import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
33import org.onosproject.store.serializers.KryoNamespaces;
34import org.onosproject.store.service.ConsistentMap;
35import org.onosproject.store.service.Serializer;
36import org.onosproject.store.service.StorageService;
37import org.onosproject.store.service.Versioned;
38import org.osgi.service.component.ComponentContext;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Modified;
43import org.osgi.service.component.annotations.Reference;
44import org.osgi.service.component.annotations.ReferenceCardinality;
45import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
47
48import java.util.Dictionary;
49import java.util.Map;
50import java.util.Optional;
51import java.util.Set;
52import java.util.concurrent.CompletableFuture;
53import java.util.concurrent.Executors;
54import java.util.concurrent.ScheduledExecutorService;
55import java.util.concurrent.TimeUnit;
56import java.util.stream.Collectors;
57
58import static org.onlab.util.Tools.groupedThreads;
59import static org.onosproject.segmentrouting.phasedrecovery.api.OsgiPropertyConstants.PHASED_RECOVERY_DEFAULT;
60import static org.onosproject.segmentrouting.phasedrecovery.api.OsgiPropertyConstants.PROP_PHASED_RECOVERY;
61
62@Component(
63 immediate = true,
64 service = PhasedRecoveryService.class,
65 property = {
66 PROP_PHASED_RECOVERY + ":Boolean=" + PHASED_RECOVERY_DEFAULT
67 }
68)
69public class PhasedRecoveryManager implements PhasedRecoveryService {
70 private static final Logger log = LoggerFactory.getLogger(PhasedRecoveryManager.class);
71 private static final String APP_NAME = "org.onosproject.phasedrecovery";
72
73 // TODO Make these configurable via Component Config
74 // Amount of time delayed to wait for port description (in second)
75 private static final int PORT_CHECKER_INTERVAL = 1;
76 // Max number of retry for port checker
pierventreb3fe7922021-08-04 23:09:40 +020077 private static final int PORT_CHECKER_RETRIES = 15;
Charles Chan12a8a842020-02-14 13:23:57 -080078 // RoutingStableChecker interval (in second)
79 private static final int ROUTING_CHECKER_DELAY = 3;
80 // RoutingStableChecker timeout (in second)
81 private static final int ROUTING_CHECKER_TIMEOUT = 15;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 private CoreService coreService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 private ComponentConfigService compCfgService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 private DeviceAdminService deviceAdminService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 private MastershipService mastershipService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 private StorageService storageService;
97
98 @Reference(cardinality = ReferenceCardinality.OPTIONAL)
99 volatile SegmentRoutingService srService;
100
101 /** Enabling phased recovery. */
102 boolean phasedRecovery = PHASED_RECOVERY_DEFAULT;
103
104 private ApplicationId appId;
105 private ConsistentMap<DeviceId, Phase> phasedRecoveryStore;
106 private ScheduledExecutorService executor = Executors.newScheduledThreadPool(
107 Runtime.getRuntime().availableProcessors(), groupedThreads("onos/sr/pr", "executor"));
108
109 @Activate
110 protected void activate(ComponentContext context) {
111 appId = coreService.registerApplication(APP_NAME);
112
113 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
114 .register(KryoNamespaces.API)
115 .register(Phase.class);
116 phasedRecoveryStore = storageService.<DeviceId, Phase>consistentMapBuilder()
117 .withName("onos-sr-phasedrecovery")
118 .withRelaxedReadConsistency()
119 .withSerializer(Serializer.using(serializer.build()))
120 .build();
121
122 compCfgService.registerProperties(getClass());
123 modified(context);
124 log.info("Started");
125 }
126
127 @Deactivate
128 protected void deactivate() {
129 phasedRecoveryStore.destroy();
130 compCfgService.unregisterProperties(getClass(), false);
131 log.info("Stopped");
132 }
133
134 @Modified
135 protected void modified(ComponentContext context) {
136 Dictionary<?, ?> properties = context.getProperties();
137 if (properties == null) {
138 return;
139 }
140
141 String strPhasedRecovery = Tools.get(properties, PROP_PHASED_RECOVERY);
142 boolean expectPhasedRecovery = Boolean.parseBoolean(strPhasedRecovery);
143 if (expectPhasedRecovery != phasedRecovery) {
144 phasedRecovery = expectPhasedRecovery;
145 log.info("{} phased recovery", phasedRecovery ? "Enabling" : "Disabling");
146 }
147 }
148
149 @Override
150 public boolean isEnabled() {
151 return phasedRecovery;
152 }
153
154 @Override
155 public boolean init(DeviceId deviceId) {
156 if (this.srService == null) {
157 log.info("SegmentRoutingService is not ready");
158 return false;
159 }
160 if (!mastershipService.isLocalMaster(deviceId)) {
161 log.info("Not master of {}", deviceId);
162 return false;
163 }
164
165 Phase phase = Optional.ofNullable(phasedRecoveryStore.putIfAbsent(deviceId, Phase.PENDING))
166 .map(Versioned::value).orElse(null);
167
168 if (phase != null) {
169 log.info("{} has been initialized already. Skipping.", deviceId);
170 return false;
171 } else {
pierventreb3fe7922021-08-04 23:09:40 +0200172 if (phasedRecovery) {
173 // Even in case of EDGE as next phase, it is better to drive the transition
174 // to the next phase through the port checker. If the device is reported by
175 // a non master instance the first time, ports wont be available until the next
176 // port reconciliation.
177 Phase nextPhase = this.srService.getPairDeviceId(deviceId).isPresent() ?
178 Phase.PAIR : Phase.EDGE;
Charles Chan12a8a842020-02-14 13:23:57 -0800179 // Wait for the PORT_STAT before entering next phase.
180 // Note: Unlikely, when the device init fails due to PORT_STATS timeout,
181 // it requires operator to manually move the device to the next phase by CLI command.
pierventreb3fe7922021-08-04 23:09:40 +0200182 executor.schedule(new PortChecker(deviceId, PORT_CHECKER_RETRIES, nextPhase),
Charles Chan12a8a842020-02-14 13:23:57 -0800183 PORT_CHECKER_INTERVAL, TimeUnit.SECONDS);
184 } else {
185 // We assume that all ports will be reported as enabled on devices that don't require phased recovery
186 setPhase(deviceId, Phase.EDGE);
187 }
188 return true;
189 }
190 }
191
192 @Override
193 public boolean reset(DeviceId deviceId) {
194 if (this.srService == null) {
195 log.info("SegmentRoutingService is not ready");
196 return false;
197 }
198 // FIXME Skip mastership checking since master will not be available when a device goes offline
199 // Improve this when persistent mastership is introduced
200
201 Phase result = Optional.ofNullable(phasedRecoveryStore.remove(deviceId))
202 .map(Versioned::value).orElse(null);
203 if (result != null) {
204 log.info("{} is reset", deviceId);
205 }
206 return result != null;
207 }
208
209 @Override
210 public Map<DeviceId, Phase> getPhases() {
211 return phasedRecoveryStore.asJavaMap();
212 }
213
214 @Override
215 public Phase getPhase(DeviceId deviceId) {
216 return Optional.ofNullable(phasedRecoveryStore.get(deviceId)).map(Versioned::value).orElse(null);
217 }
218
219 @Override
220 public Phase setPhase(DeviceId deviceId, Phase newPhase) {
221 if (this.srService == null) {
222 log.info("SegmentRoutingService is not ready");
223 return null;
224 }
225 if (!mastershipService.isLocalMaster(deviceId)) {
226 log.info("Not master of {}", deviceId);
227 return null;
228 }
229
230 return Optional.ofNullable(phasedRecoveryStore.compute(deviceId, (k, v) -> {
231 if (v == null && newPhase == Phase.PENDING) {
232 log.info("Initializing {}", deviceId);
233 return newPhase;
234 } else if (v == Phase.PENDING && newPhase == Phase.PAIR) {
235 srService.initHost(deviceId);
236 // RouteHandler init is intentionally skipped when phased recovery is on.
237 // Edge ports remain down in this phase. Therefore, no nexthop will be discovered on the given device.
238 // The flow on given device will be programmed later by hostHandler.processHostMovedEvent()
239 changePairPort(deviceId, true);
240 log.info("Transitioning {} from PENDING to PAIR", deviceId);
241 return newPhase;
242 } else if (v == Phase.PAIR && newPhase == Phase.INFRA) {
243 changeInfraPorts(deviceId, true);
244 srService.initRoute(deviceId);
245 log.info("Transitioning {} from PAIR to INFRA", deviceId);
pierventreb3fe7922021-08-04 23:09:40 +0200246 monitorRoutingStability(deviceId, newPhase);
Charles Chan12a8a842020-02-14 13:23:57 -0800247 return newPhase;
248 } else if (v == Phase.INFRA && newPhase == Phase.EDGE) {
249 changeEdgePorts(deviceId, true);
250 log.info("Transitioning {} from INFRA to EDGE", deviceId);
251 return newPhase;
252 } else if (v == Phase.PENDING && newPhase == Phase.EDGE) {
pierventreb3fe7922021-08-04 23:09:40 +0200253 // We want to enable ports in order - even for non paired devices
254 // newPhase is used to implement different behaviors in monitorRoutingStability
Charles Chan12a8a842020-02-14 13:23:57 -0800255 srService.initHost(deviceId);
pierventreb3fe7922021-08-04 23:09:40 +0200256 changeInfraPorts(deviceId, true);
Charles Chan12a8a842020-02-14 13:23:57 -0800257 srService.initRoute(deviceId);
pierventreb3fe7922021-08-04 23:09:40 +0200258 monitorRoutingStability(deviceId, newPhase);
Charles Chan12a8a842020-02-14 13:23:57 -0800259 log.info("Transitioning {} from PENDING to EDGE", deviceId);
260 return newPhase;
261 } else {
262 log.debug("Ignore illegal state transition on {} from {} to {}", deviceId, v, newPhase);
263 return v;
264 }
265 })).map(Versioned::value).orElse(null);
266 }
267
pierventreb3fe7922021-08-04 23:09:40 +0200268 // Use current phase to implement a different behavior for non-paired or infra
269 // devices. For these devices we are just missing to enable the edge ports, while
270 // for the paired leaves we will go the final phase.
271 private void monitorRoutingStability(DeviceId deviceId, Phase currentPhase) {
272 // restrict the state machine transitions
273 if (currentPhase != Phase.INFRA && currentPhase != Phase.EDGE) {
274 log.warn("Ignore monitorRoutingStability on {} for phase {}", deviceId, currentPhase);
275 return;
276 }
277
Charles Chan12a8a842020-02-14 13:23:57 -0800278 CompletableFuture<Void> checkerFuture = new CompletableFuture<>();
279 CompletableFuture<Void> timeoutFuture =
280 Tools.completeAfter(ROUTING_CHECKER_TIMEOUT, TimeUnit.SECONDS);
281 RoutingStabilityChecker checker = new RoutingStabilityChecker(checkerFuture);
282
283 checkerFuture.runAfterEitherAsync(timeoutFuture, () -> {
284 if (checkerFuture.isDone()) {
285 log.info("Routing stable. Move {} to the next phase", deviceId);
286 } else {
287 log.info("Timeout reached. Move {} to the next phase", deviceId);
288 // Mark the future as completed to signify the termination of periodical checker
289 checkerFuture.complete(null);
290 }
pierventreb3fe7922021-08-04 23:09:40 +0200291 // Paired devices. Else will be performed by infra devices
292 // or non paired devices that go directly from PENDING to EDGE
293 if (currentPhase == Phase.INFRA) {
294 setPhase(deviceId, Phase.EDGE);
295 } else {
296 changeEdgePorts(deviceId, true);
297 }
Charles Chan12a8a842020-02-14 13:23:57 -0800298 });
299
300 executor.schedule(checker, ROUTING_CHECKER_DELAY, TimeUnit.SECONDS);
301 }
302
303 @Override
304 public Set<PortNumber> changeAllPorts(DeviceId deviceId, boolean enabled) {
305 if (this.srService == null) {
306 log.warn("SegmentRoutingService is not ready. Unable to changeAllPorts({}) to {}", deviceId, enabled);
307 return Sets.newHashSet();
308 }
309 Set<PortNumber> portsToBeEnabled = deviceAdminService.getPorts(deviceId)
310 .stream().map(Port::number).collect(Collectors.toSet());
311 changePorts(deviceId, portsToBeEnabled, enabled);
312 return portsToBeEnabled;
313 }
314
315 @Override
316 public Set<PortNumber> changePairPort(DeviceId deviceId, boolean enabled) {
317 if (this.srService == null) {
318 log.warn("SegmentRoutingService is not ready. Unable to changePairPort({}) to {}", deviceId, enabled);
319 return Sets.newHashSet();
320 }
321 Set<PortNumber> portsToBeEnabled = this.srService.getPairLocalPort(deviceId)
322 .map(Sets::newHashSet).orElse(Sets.newHashSet());
323 changePorts(deviceId, portsToBeEnabled, enabled);
324 return portsToBeEnabled;
325 }
326
327 @Override
328 public Set<PortNumber> changeInfraPorts(DeviceId deviceId, boolean enabled) {
329 if (this.srService == null) {
330 log.warn("SegmentRoutingService is not ready. Unable to changeInfraPorts({}) to {}", deviceId, enabled);
331 return Sets.newHashSet();
332 }
333 Set<PortNumber> portsToBeEnabled = this.srService.getInfraPorts(deviceId);
334 changePorts(deviceId, portsToBeEnabled, enabled);
335 return portsToBeEnabled;
336 }
337
338 @Override
339 public Set<PortNumber> changeEdgePorts(DeviceId deviceId, boolean enabled) {
340 if (this.srService == null) {
341 log.warn("SegmentRoutingService is not ready. Unable to changeEdgePorts({}) to {}", deviceId, enabled);
342 return Sets.newHashSet();
343 }
344 Set<PortNumber> portsToBeEnabled = this.srService.getEdgePorts(deviceId);
345 changePorts(deviceId, portsToBeEnabled, enabled);
346 return portsToBeEnabled;
347 }
348
349 private void changePorts(DeviceId deviceId, Set<PortNumber> portNumbers, boolean enabled) {
pierventreb3fe7922021-08-04 23:09:40 +0200350 if (!portNumbers.isEmpty()) {
351 log.info("{} {} on {}", enabled ? "Enabled" : "Disabled", portNumbers, deviceId);
352 portNumbers.forEach(portNumber ->
353 deviceAdminService.changePortState(deviceId, portNumber, enabled));
354 }
Charles Chan12a8a842020-02-14 13:23:57 -0800355 }
356
357 private class PortChecker implements Runnable {
358 int retries;
359 DeviceId deviceId;
pierventreb3fe7922021-08-04 23:09:40 +0200360 Phase nextPhase;
Charles Chan12a8a842020-02-14 13:23:57 -0800361
pierventreb3fe7922021-08-04 23:09:40 +0200362 PortChecker(DeviceId deviceId, int retries, Phase nextPhase) {
Charles Chan12a8a842020-02-14 13:23:57 -0800363 this.deviceId = deviceId;
364 this.retries = retries;
pierventreb3fe7922021-08-04 23:09:40 +0200365 this.nextPhase = nextPhase;
Charles Chan12a8a842020-02-14 13:23:57 -0800366 }
367
368 @Override
369 public void run() {
370 retries -= 1;
371 if (retries < 0) {
372 log.warn("PORT_STATS timeout. Unable to initialize {}", deviceId);
373 return;
374 }
375
376 if (!deviceAdminService.getPorts(deviceId).isEmpty()) {
pierventreb3fe7922021-08-04 23:09:40 +0200377 log.info("{} reported PORT_STATS, moving to {}", deviceId, nextPhase);
378 setPhase(deviceId, nextPhase);
379 return;
Charles Chan12a8a842020-02-14 13:23:57 -0800380 }
381 log.info("{} still waiting for PORT_STATS", deviceId);
382 executor.schedule(this, PORT_CHECKER_INTERVAL, TimeUnit.SECONDS);
383 }
384 }
385
386 private class RoutingStabilityChecker implements Runnable {
387 private final CompletableFuture<Void> future;
388
389 RoutingStabilityChecker(CompletableFuture<Void> future) {
390 this.future = future;
391 }
392
393 @Override
394 public void run() {
395 // Do not continue if the future has been completed
396 if (future.isDone()) {
397 log.trace("RouteStabilityChecker is done. Stop checking");
398 return;
399 }
400
401 if (srService.isRoutingStable()) {
402 log.trace("Routing is stable");
403 future.complete(null);
404 } else {
405 log.trace("Routing is not yet stable");
406 executor.schedule(this, ROUTING_CHECKER_DELAY, TimeUnit.SECONDS);
407 }
408 }
409 }
410}