/*
 * Copyright 2020-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.segmentrouting.phasedrecovery.impl;
18
19import com.google.common.collect.Sets;
20import org.onlab.util.KryoNamespace;
21import org.onlab.util.Tools;
22import org.onosproject.cfg.ComponentConfigService;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.mastership.MastershipService;
26import org.onosproject.net.DeviceId;
27import org.onosproject.net.Port;
28import org.onosproject.net.PortNumber;
29import org.onosproject.net.device.DeviceAdminService;
30import org.onosproject.segmentrouting.SegmentRoutingService;
31import org.onosproject.segmentrouting.phasedrecovery.api.Phase;
32import org.onosproject.segmentrouting.phasedrecovery.api.PhasedRecoveryService;
33import org.onosproject.store.serializers.KryoNamespaces;
34import org.onosproject.store.service.ConsistentMap;
35import org.onosproject.store.service.Serializer;
36import org.onosproject.store.service.StorageService;
37import org.onosproject.store.service.Versioned;
38import org.osgi.service.component.ComponentContext;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Modified;
43import org.osgi.service.component.annotations.Reference;
44import org.osgi.service.component.annotations.ReferenceCardinality;
45import org.slf4j.Logger;
46import org.slf4j.LoggerFactory;
47
48import java.util.Dictionary;
49import java.util.Map;
50import java.util.Optional;
51import java.util.Set;
52import java.util.concurrent.CompletableFuture;
53import java.util.concurrent.Executors;
54import java.util.concurrent.ScheduledExecutorService;
55import java.util.concurrent.TimeUnit;
56import java.util.stream.Collectors;
57
58import static org.onlab.util.Tools.groupedThreads;
59import static org.onosproject.segmentrouting.phasedrecovery.api.OsgiPropertyConstants.PHASED_RECOVERY_DEFAULT;
60import static org.onosproject.segmentrouting.phasedrecovery.api.OsgiPropertyConstants.PROP_PHASED_RECOVERY;
61
62@Component(
63 immediate = true,
64 service = PhasedRecoveryService.class,
65 property = {
66 PROP_PHASED_RECOVERY + ":Boolean=" + PHASED_RECOVERY_DEFAULT
67 }
68)
69public class PhasedRecoveryManager implements PhasedRecoveryService {
70 private static final Logger log = LoggerFactory.getLogger(PhasedRecoveryManager.class);
71 private static final String APP_NAME = "org.onosproject.phasedrecovery";
72
73 // TODO Make these configurable via Component Config
74 // Amount of time delayed to wait for port description (in second)
75 private static final int PORT_CHECKER_INTERVAL = 1;
76 // Max number of retry for port checker
77 private static final int PORT_CHECKER_RETRIES = 5;
78 // RoutingStableChecker interval (in second)
79 private static final int ROUTING_CHECKER_DELAY = 3;
80 // RoutingStableChecker timeout (in second)
81 private static final int ROUTING_CHECKER_TIMEOUT = 15;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 private CoreService coreService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 private ComponentConfigService compCfgService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 private DeviceAdminService deviceAdminService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 private MastershipService mastershipService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 private StorageService storageService;
97
98 @Reference(cardinality = ReferenceCardinality.OPTIONAL)
99 volatile SegmentRoutingService srService;
100
101 /** Enabling phased recovery. */
102 boolean phasedRecovery = PHASED_RECOVERY_DEFAULT;
103
104 private ApplicationId appId;
105 private ConsistentMap<DeviceId, Phase> phasedRecoveryStore;
106 private ScheduledExecutorService executor = Executors.newScheduledThreadPool(
107 Runtime.getRuntime().availableProcessors(), groupedThreads("onos/sr/pr", "executor"));
108
109 @Activate
110 protected void activate(ComponentContext context) {
111 appId = coreService.registerApplication(APP_NAME);
112
113 KryoNamespace.Builder serializer = KryoNamespace.newBuilder()
114 .register(KryoNamespaces.API)
115 .register(Phase.class);
116 phasedRecoveryStore = storageService.<DeviceId, Phase>consistentMapBuilder()
117 .withName("onos-sr-phasedrecovery")
118 .withRelaxedReadConsistency()
119 .withSerializer(Serializer.using(serializer.build()))
120 .build();
121
122 compCfgService.registerProperties(getClass());
123 modified(context);
124 log.info("Started");
125 }
126
127 @Deactivate
128 protected void deactivate() {
129 phasedRecoveryStore.destroy();
130 compCfgService.unregisterProperties(getClass(), false);
131 log.info("Stopped");
132 }
133
134 @Modified
135 protected void modified(ComponentContext context) {
136 Dictionary<?, ?> properties = context.getProperties();
137 if (properties == null) {
138 return;
139 }
140
141 String strPhasedRecovery = Tools.get(properties, PROP_PHASED_RECOVERY);
142 boolean expectPhasedRecovery = Boolean.parseBoolean(strPhasedRecovery);
143 if (expectPhasedRecovery != phasedRecovery) {
144 phasedRecovery = expectPhasedRecovery;
145 log.info("{} phased recovery", phasedRecovery ? "Enabling" : "Disabling");
146 }
147 }
148
149 @Override
150 public boolean isEnabled() {
151 return phasedRecovery;
152 }
153
154 @Override
155 public boolean init(DeviceId deviceId) {
156 if (this.srService == null) {
157 log.info("SegmentRoutingService is not ready");
158 return false;
159 }
160 if (!mastershipService.isLocalMaster(deviceId)) {
161 log.info("Not master of {}", deviceId);
162 return false;
163 }
164
165 Phase phase = Optional.ofNullable(phasedRecoveryStore.putIfAbsent(deviceId, Phase.PENDING))
166 .map(Versioned::value).orElse(null);
167
168 if (phase != null) {
169 log.info("{} has been initialized already. Skipping.", deviceId);
170 return false;
171 } else {
172 Phase nextPhase = (phasedRecovery && this.srService.getPairDeviceId(deviceId).isPresent()) ?
173 Phase.PAIR : Phase.EDGE;
174 if (nextPhase == Phase.PAIR) {
175 // Wait for the PORT_STAT before entering next phase.
176 // Note: Unlikely, when the device init fails due to PORT_STATS timeout,
177 // it requires operator to manually move the device to the next phase by CLI command.
178 executor.schedule(new PortChecker(deviceId, PORT_CHECKER_RETRIES),
179 PORT_CHECKER_INTERVAL, TimeUnit.SECONDS);
180 } else {
181 // We assume that all ports will be reported as enabled on devices that don't require phased recovery
182 setPhase(deviceId, Phase.EDGE);
183 }
184 return true;
185 }
186 }
187
188 @Override
189 public boolean reset(DeviceId deviceId) {
190 if (this.srService == null) {
191 log.info("SegmentRoutingService is not ready");
192 return false;
193 }
194 // FIXME Skip mastership checking since master will not be available when a device goes offline
195 // Improve this when persistent mastership is introduced
196
197 Phase result = Optional.ofNullable(phasedRecoveryStore.remove(deviceId))
198 .map(Versioned::value).orElse(null);
199 if (result != null) {
200 log.info("{} is reset", deviceId);
201 }
202 return result != null;
203 }
204
205 @Override
206 public Map<DeviceId, Phase> getPhases() {
207 return phasedRecoveryStore.asJavaMap();
208 }
209
210 @Override
211 public Phase getPhase(DeviceId deviceId) {
212 return Optional.ofNullable(phasedRecoveryStore.get(deviceId)).map(Versioned::value).orElse(null);
213 }
214
215 @Override
216 public Phase setPhase(DeviceId deviceId, Phase newPhase) {
217 if (this.srService == null) {
218 log.info("SegmentRoutingService is not ready");
219 return null;
220 }
221 if (!mastershipService.isLocalMaster(deviceId)) {
222 log.info("Not master of {}", deviceId);
223 return null;
224 }
225
226 return Optional.ofNullable(phasedRecoveryStore.compute(deviceId, (k, v) -> {
227 if (v == null && newPhase == Phase.PENDING) {
228 log.info("Initializing {}", deviceId);
229 return newPhase;
230 } else if (v == Phase.PENDING && newPhase == Phase.PAIR) {
231 srService.initHost(deviceId);
232 // RouteHandler init is intentionally skipped when phased recovery is on.
233 // Edge ports remain down in this phase. Therefore, no nexthop will be discovered on the given device.
234 // The flow on given device will be programmed later by hostHandler.processHostMovedEvent()
235 changePairPort(deviceId, true);
236 log.info("Transitioning {} from PENDING to PAIR", deviceId);
237 return newPhase;
238 } else if (v == Phase.PAIR && newPhase == Phase.INFRA) {
239 changeInfraPorts(deviceId, true);
240 srService.initRoute(deviceId);
241 log.info("Transitioning {} from PAIR to INFRA", deviceId);
242 monitorRoutingStability(deviceId);
243 return newPhase;
244 } else if (v == Phase.INFRA && newPhase == Phase.EDGE) {
245 changeEdgePorts(deviceId, true);
246 log.info("Transitioning {} from INFRA to EDGE", deviceId);
247 return newPhase;
248 } else if (v == Phase.PENDING && newPhase == Phase.EDGE) {
249 changeAllPorts(deviceId, true);
250 srService.initHost(deviceId);
251 srService.initRoute(deviceId);
252 log.info("Transitioning {} from PENDING to EDGE", deviceId);
253 return newPhase;
254 } else {
255 log.debug("Ignore illegal state transition on {} from {} to {}", deviceId, v, newPhase);
256 return v;
257 }
258 })).map(Versioned::value).orElse(null);
259 }
260
261 private void monitorRoutingStability(DeviceId deviceId) {
262 CompletableFuture<Void> checkerFuture = new CompletableFuture<>();
263 CompletableFuture<Void> timeoutFuture =
264 Tools.completeAfter(ROUTING_CHECKER_TIMEOUT, TimeUnit.SECONDS);
265 RoutingStabilityChecker checker = new RoutingStabilityChecker(checkerFuture);
266
267 checkerFuture.runAfterEitherAsync(timeoutFuture, () -> {
268 if (checkerFuture.isDone()) {
269 log.info("Routing stable. Move {} to the next phase", deviceId);
270 } else {
271 log.info("Timeout reached. Move {} to the next phase", deviceId);
272 // Mark the future as completed to signify the termination of periodical checker
273 checkerFuture.complete(null);
274 }
275 setPhase(deviceId, Phase.EDGE);
276 });
277
278 executor.schedule(checker, ROUTING_CHECKER_DELAY, TimeUnit.SECONDS);
279 }
280
281 @Override
282 public Set<PortNumber> changeAllPorts(DeviceId deviceId, boolean enabled) {
283 if (this.srService == null) {
284 log.warn("SegmentRoutingService is not ready. Unable to changeAllPorts({}) to {}", deviceId, enabled);
285 return Sets.newHashSet();
286 }
287 Set<PortNumber> portsToBeEnabled = deviceAdminService.getPorts(deviceId)
288 .stream().map(Port::number).collect(Collectors.toSet());
289 changePorts(deviceId, portsToBeEnabled, enabled);
290 return portsToBeEnabled;
291 }
292
293 @Override
294 public Set<PortNumber> changePairPort(DeviceId deviceId, boolean enabled) {
295 if (this.srService == null) {
296 log.warn("SegmentRoutingService is not ready. Unable to changePairPort({}) to {}", deviceId, enabled);
297 return Sets.newHashSet();
298 }
299 Set<PortNumber> portsToBeEnabled = this.srService.getPairLocalPort(deviceId)
300 .map(Sets::newHashSet).orElse(Sets.newHashSet());
301 changePorts(deviceId, portsToBeEnabled, enabled);
302 return portsToBeEnabled;
303 }
304
305 @Override
306 public Set<PortNumber> changeInfraPorts(DeviceId deviceId, boolean enabled) {
307 if (this.srService == null) {
308 log.warn("SegmentRoutingService is not ready. Unable to changeInfraPorts({}) to {}", deviceId, enabled);
309 return Sets.newHashSet();
310 }
311 Set<PortNumber> portsToBeEnabled = this.srService.getInfraPorts(deviceId);
312 changePorts(deviceId, portsToBeEnabled, enabled);
313 return portsToBeEnabled;
314 }
315
316 @Override
317 public Set<PortNumber> changeEdgePorts(DeviceId deviceId, boolean enabled) {
318 if (this.srService == null) {
319 log.warn("SegmentRoutingService is not ready. Unable to changeEdgePorts({}) to {}", deviceId, enabled);
320 return Sets.newHashSet();
321 }
322 Set<PortNumber> portsToBeEnabled = this.srService.getEdgePorts(deviceId);
323 changePorts(deviceId, portsToBeEnabled, enabled);
324 return portsToBeEnabled;
325 }
326
327 private void changePorts(DeviceId deviceId, Set<PortNumber> portNumbers, boolean enabled) {
328 log.info("{} {} on {}", enabled ? "Enabled" : "Disabled", portNumbers, deviceId);
329 portNumbers.forEach(portNumber ->
330 deviceAdminService.changePortState(deviceId, portNumber, enabled));
331 }
332
333 private class PortChecker implements Runnable {
334 int retries;
335 DeviceId deviceId;
336
337 PortChecker(DeviceId deviceId, int retries) {
338 this.deviceId = deviceId;
339 this.retries = retries;
340 }
341
342 @Override
343 public void run() {
344 retries -= 1;
345 if (retries < 0) {
346 log.warn("PORT_STATS timeout. Unable to initialize {}", deviceId);
347 return;
348 }
349
350 if (!deviceAdminService.getPorts(deviceId).isEmpty()) {
351 log.info("{} reported PORT_STATS", deviceId);
352 setPhase(deviceId, Phase.PAIR);
353 }
354 log.info("{} still waiting for PORT_STATS", deviceId);
355 executor.schedule(this, PORT_CHECKER_INTERVAL, TimeUnit.SECONDS);
356 }
357 }
358
359 private class RoutingStabilityChecker implements Runnable {
360 private final CompletableFuture<Void> future;
361
362 RoutingStabilityChecker(CompletableFuture<Void> future) {
363 this.future = future;
364 }
365
366 @Override
367 public void run() {
368 // Do not continue if the future has been completed
369 if (future.isDone()) {
370 log.trace("RouteStabilityChecker is done. Stop checking");
371 return;
372 }
373
374 if (srService.isRoutingStable()) {
375 log.trace("Routing is stable");
376 future.complete(null);
377 } else {
378 log.trace("Routing is not yet stable");
379 executor.schedule(this, ROUTING_CHECKER_DELAY, TimeUnit.SECONDS);
380 }
381 }
382 }
383}