blob: 410b137494c8bdf50b4653821e3158a60970fc8c [file] [log] [blame]
/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.upgrade.impl;
17
18import java.util.Objects;
Jordan Halterman5ca07932017-10-07 13:28:22 -070019import java.util.Set;
Jordan Halterman980a8c12017-09-22 18:01:19 -070020import java.util.concurrent.atomic.AtomicReference;
Jordan Halterman5ca07932017-10-07 13:28:22 -070021import java.util.stream.Collectors;
Jordan Halterman980a8c12017-09-22 18:01:19 -070022
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
Jordan Halterman5ca07932017-10-07 13:28:22 -070029import org.onosproject.cluster.ClusterEvent;
30import org.onosproject.cluster.ClusterEventListener;
Jordan Halterman980a8c12017-09-22 18:01:19 -070031import org.onosproject.cluster.ControllerNode;
Jordan Halterman5ca07932017-10-07 13:28:22 -070032import org.onosproject.cluster.NodeId;
Jordan Halterman980a8c12017-09-22 18:01:19 -070033import org.onosproject.cluster.UnifiedClusterService;
34import org.onosproject.core.Version;
35import org.onosproject.core.VersionService;
36import org.onosproject.event.AbstractListenerManager;
37import org.onosproject.store.serializers.KryoNamespaces;
38import org.onosproject.store.service.AtomicValue;
39import org.onosproject.store.service.AtomicValueEvent;
40import org.onosproject.store.service.AtomicValueEventListener;
41import org.onosproject.store.service.CoordinationService;
42import org.onosproject.store.service.Serializer;
43import org.onosproject.upgrade.Upgrade;
44import org.onosproject.upgrade.UpgradeAdminService;
45import org.onosproject.upgrade.UpgradeEvent;
46import org.onosproject.upgrade.UpgradeEventListener;
47import org.onosproject.upgrade.UpgradeService;
48import org.slf4j.Logger;
49
50import static org.slf4j.LoggerFactory.getLogger;
51
/**
 * Upgrade service implementation.
 * <p>
 * This implementation uses the {@link CoordinationService} to store upgrade state in a version-agnostic primitive.
 * Upgrade state can be seen by current and future version nodes.
 */
58@Component(immediate = true)
59@Service
60public class UpgradeManager
61 extends AbstractListenerManager<UpgradeEvent, UpgradeEventListener>
62 implements UpgradeService, UpgradeAdminService {
63
64 private final Logger log = getLogger(getClass());
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
67 protected VersionService versionService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
70 protected CoordinationService coordinationService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
73 protected UnifiedClusterService clusterService;
74
75 private Version localVersion;
76 private AtomicValue<Upgrade> state;
77 private final AtomicReference<Upgrade> currentState = new AtomicReference<>();
Jordan Halterman5ca07932017-10-07 13:28:22 -070078 private final AtomicValueEventListener<Upgrade> stateListener = event -> handleUpgradeEvent(event);
79 private final ClusterEventListener clusterListener = event -> handleClusterEvent(event);
Jordan Halterman980a8c12017-09-22 18:01:19 -070080
81 @Activate
82 public void activate() {
83 state = coordinationService.<Upgrade>atomicValueBuilder()
84 .withName("onos-upgrade-state")
85 .withSerializer(Serializer.using(KryoNamespaces.API))
86 .build()
87 .asAtomicValue();
88 localVersion = versionService.version();
89
90 currentState.set(state.get());
91 if (currentState.get() == null) {
92 currentState.set(new Upgrade(localVersion, localVersion, Upgrade.Status.INACTIVE));
93 state.set(currentState.get());
94 }
95
96 Upgrade upgrade = currentState.get();
97
98 // If the upgrade state is not initialized, ensure this node matches the version of the cluster.
99 if (!upgrade.status().active() && !Objects.equals(upgrade.source(), localVersion)) {
100 log.error("Node version {} inconsistent with cluster version {}", localVersion, upgrade.source());
101 throw new IllegalStateException("Node version " + localVersion +
102 " inconsistent with cluster version " + upgrade.source());
103 }
104
105 // If the upgrade state is initialized then check the node version.
106 if (upgrade.status() == Upgrade.Status.INITIALIZED) {
107 // If the source version equals the target version, attempt to update the target version.
108 if (Objects.equals(upgrade.source(), upgrade.target()) && !Objects.equals(upgrade.target(), localVersion)) {
109 upgrade = new Upgrade(upgrade.source(), localVersion, upgrade.status());
110 currentState.set(upgrade);
111 state.set(upgrade);
112 }
113 }
114
115 // If the upgrade status is active, verify that the local version matches the upgrade version.
116 if (upgrade.status().active() && !Objects.equals(upgrade.source(), upgrade.target())) {
117 // If the upgrade source/target are not equal, validate that the node's version is consistent
118 // with versions in the upgrade. There are two possibilities: that a not-yet-upgraded node is being
119 // restarted, or that a node has been upgraded, so we need to check that this node is running either
120 // the source or target version.
121 if (!Objects.equals(localVersion, upgrade.source()) && !Objects.equals(localVersion, upgrade.target())) {
122 log.error("Cannot upgrade node to version {}; Upgrade to {} already in progress",
123 localVersion, upgrade.target());
124 throw new IllegalStateException("Cannot upgrade node to version " + localVersion + "; Upgrade to " +
125 upgrade.target() + " already in progress");
126 }
127 }
128
129 state.addListener(stateListener);
Jordan Halterman5ca07932017-10-07 13:28:22 -0700130 clusterService.addListener(clusterListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700131 log.info("Started");
132 }
133
134 @Deactivate
135 public void deactivate() {
136 state.removeListener(stateListener);
Jordan Halterman5ca07932017-10-07 13:28:22 -0700137 clusterService.removeListener(clusterListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700138 log.info("Stopped");
139 }
140
141 @Override
142 public boolean isUpgrading() {
143 return getState().status().active();
144 }
145
146 @Override
147 public Upgrade getState() {
148 return currentState.get();
149 }
150
151 @Override
152 public Version getVersion() {
153 Upgrade upgrade = currentState.get();
154 return upgrade.status().upgraded()
155 ? upgrade.target()
156 : upgrade.source();
157 }
158
159 @Override
160 public boolean isLocalActive() {
161 return localVersion.equals(getVersion());
162 }
163
164 @Override
165 public boolean isLocalUpgraded() {
166 Upgrade upgrade = currentState.get();
167 return upgrade.status().active()
168 && !upgrade.source().equals(upgrade.target())
169 && localVersion.equals(upgrade.target());
170 }
171
172 @Override
173 public void initialize() {
174 Upgrade inactive = currentState.get();
175
176 // If the current upgrade status is active, fail initialization.
177 if (inactive.status().active()) {
178 throw new IllegalStateException("Upgrade already active");
179 }
180
181 // Set the upgrade status to INITIALIZING.
182 Upgrade initializing = new Upgrade(
183 localVersion,
184 localVersion,
185 Upgrade.Status.INITIALIZING);
186 if (!state.compareAndSet(inactive, initializing)) {
187 throw new IllegalStateException("Concurrent upgrade modification");
188 } else {
189 currentState.set(initializing);
190
191 // Set the upgrade status to INITIALIZED.
192 Upgrade initialized = new Upgrade(
193 initializing.source(),
194 initializing.target(),
195 Upgrade.Status.INITIALIZED);
196 if (!state.compareAndSet(initializing, initialized)) {
197 throw new IllegalStateException("Concurrent upgrade modification");
198 } else {
199 currentState.set(initialized);
200 }
201 }
202 }
203
204 @Override
205 public void upgrade() {
206 Upgrade initialized = currentState.get();
207
208 // If the current upgrade status is not INITIALIZED, throw an exception.
209 if (initialized.status() != Upgrade.Status.INITIALIZED) {
210 throw new IllegalStateException("Upgrade not initialized");
211 }
212
213 // Set the upgrade status to UPGRADING.
214 Upgrade upgrading = new Upgrade(
215 initialized.source(),
216 initialized.target(),
217 Upgrade.Status.UPGRADING);
218 if (!state.compareAndSet(initialized, upgrading)) {
219 throw new IllegalStateException("Concurrent upgrade modification");
220 } else {
221 currentState.set(upgrading);
222
223 // Set the upgrade status to UPGRADED.
224 Upgrade upgraded = new Upgrade(
225 upgrading.source(),
226 upgrading.target(),
227 Upgrade.Status.UPGRADED);
228 if (!state.compareAndSet(upgrading, upgraded)) {
229 throw new IllegalStateException("Concurrent upgrade modification");
230 } else {
231 currentState.set(upgraded);
232 }
233 }
234 }
235
236 @Override
237 public void commit() {
238 Upgrade upgraded = currentState.get();
239
240 // If the current upgrade status is not UPGRADED, throw an exception.
241 if (upgraded.status() != Upgrade.Status.UPGRADED) {
242 throw new IllegalStateException("Upgrade not performed");
243 }
244
245 // Determine whether any nodes have not been upgraded to the target version.
246 boolean upgradeComplete = clusterService.getNodes()
247 .stream()
248 .allMatch(node -> {
249 ControllerNode.State state = clusterService.getState(node.id());
250 Version version = clusterService.getVersion(node.id());
251 return state.isActive() && version != null && version.equals(upgraded.target());
252 });
253
254 // If some nodes have not yet been upgraded, throw an exception.
255 if (!upgradeComplete) {
256 throw new IllegalStateException("Some nodes have not yet been upgraded to version " + upgraded.target());
257 }
258
259 // Set the upgrade status to COMMITTING.
260 Upgrade committing = new Upgrade(
261 upgraded.source(),
262 upgraded.target(),
263 Upgrade.Status.COMMITTING);
264 if (!state.compareAndSet(upgraded, committing)) {
265 throw new IllegalStateException("Concurrent upgrade modification");
266 } else {
267 currentState.set(committing);
268
269 // Set the upgrade status to COMMITTED.
270 Upgrade committed = new Upgrade(
271 committing.source(),
272 committing.target(),
273 Upgrade.Status.COMMITTED);
274 if (!state.compareAndSet(committing, committed)) {
275 throw new IllegalStateException("Concurrent upgrade modification");
276 } else {
277 currentState.set(committed);
278
279 // Set the upgrade status to INACTIVE.
280 Upgrade inactive = new Upgrade(
281 localVersion,
282 localVersion,
283 Upgrade.Status.INACTIVE);
284 if (!state.compareAndSet(committed, inactive)) {
285 throw new IllegalStateException("Concurrent upgrade modification");
286 } else {
287 currentState.set(inactive);
288 }
289 }
290 }
291 }
292
293 @Override
294 public void rollback() {
295 Upgrade upgraded = currentState.get();
296
297 // If the current upgrade status is not UPGRADED, throw an exception.
298 if (upgraded.status() != Upgrade.Status.UPGRADED) {
299 throw new IllegalStateException("Upgrade not performed");
300 }
301
302 // Set the upgrade status to ROLLING_BACK.
303 Upgrade rollingBack = new Upgrade(
304 upgraded.source(),
305 upgraded.target(),
306 Upgrade.Status.ROLLING_BACK);
307 if (!state.compareAndSet(upgraded, rollingBack)) {
308 throw new IllegalStateException("Concurrent upgrade modification");
309 } else {
310 currentState.set(rollingBack);
311
312 // Set the upgrade status to ROLLED_BACK.
313 Upgrade rolledBack = new Upgrade(
314 rollingBack.source(),
315 rollingBack.target(),
316 Upgrade.Status.ROLLED_BACK);
317 if (!state.compareAndSet(rollingBack, rolledBack)) {
318 throw new IllegalStateException("Concurrent upgrade modification");
319 } else {
320 currentState.set(rolledBack);
321 }
322 }
323 }
324
325 @Override
326 public void reset() {
327 Upgrade upgraded = currentState.get();
328
329 // If the current upgrade status is not INITIALIZED or ROLLED_BACK, throw an exception.
330 if (upgraded.status() != Upgrade.Status.INITIALIZED
331 && upgraded.status() != Upgrade.Status.ROLLED_BACK) {
332 throw new IllegalStateException("Upgrade not rolled back");
333 }
334
335 // Determine whether any nodes are still running the target version.
336 boolean rollbackComplete = clusterService.getNodes()
337 .stream()
338 .allMatch(node -> {
339 ControllerNode.State state = clusterService.getState(node.id());
340 Version version = clusterService.getVersion(node.id());
341 return state.isActive() && version != null && version.equals(upgraded.source());
342 });
343
344 // If some nodes have not yet been downgraded, throw an exception.
345 if (!rollbackComplete) {
346 throw new IllegalStateException("Some nodes have not yet been downgraded to version " + upgraded.source());
347 }
348
349 // Set the upgrade status to RESETTING.
350 Upgrade resetting = new Upgrade(
351 upgraded.source(),
352 upgraded.target(),
353 Upgrade.Status.RESETTING);
354 if (!state.compareAndSet(upgraded, resetting)) {
355 throw new IllegalStateException("Concurrent upgrade modification");
356 } else {
357 currentState.set(resetting);
358
359 // Set the upgrade status to RESET.
360 Upgrade reset = new Upgrade(
361 resetting.source(),
362 resetting.target(),
363 Upgrade.Status.RESET);
364 if (!state.compareAndSet(resetting, reset)) {
365 throw new IllegalStateException("Concurrent upgrade modification");
366 } else {
367 currentState.set(reset);
368
369 // Set the upgrade status to INACTIVE.
370 Upgrade inactive = new Upgrade(
371 localVersion,
372 localVersion,
373 Upgrade.Status.INACTIVE);
374 if (!state.compareAndSet(reset, inactive)) {
375 throw new IllegalStateException("Concurrent upgrade modification");
376 } else {
377 currentState.set(inactive);
378 }
379 }
380 }
381 }
382
Jordan Halterman5ca07932017-10-07 13:28:22 -0700383 /**
384 * Handles a cluster event.
385 *
386 * @param event the cluster event
387 */
388 protected void handleClusterEvent(ClusterEvent event) {
389 // If an instance was deactivated, check whether we need to roll back the upgrade.
390 if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
391 Upgrade upgrade = state.get();
392 if (upgrade.status().upgraded()) {
393 // Get the upgraded subset of the cluster and check whether the down node is a member
394 // of the upgraded subset. If so, roll back the upgrade to tolerate the failure.
395 Set<NodeId> upgradedNodes = clusterService.getNodes().stream()
396 .map(ControllerNode::id)
397 .filter(id -> clusterService.getVersion(id).equals(upgrade.target()))
398 .collect(Collectors.toSet());
399 if (upgradedNodes.contains(event.subject().id())) {
400 rollback();
401 }
402 }
403 }
404 }
405
406 /**
407 * Handles an upgrade state event.
408 *
409 * @param event the upgrade value event
410 */
411 protected void handleUpgradeEvent(AtomicValueEvent<Upgrade> event) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700412 currentState.set(event.newValue());
413 switch (event.newValue().status()) {
414 case INITIALIZED:
415 post(new UpgradeEvent(UpgradeEvent.Type.INITIALIZED, event.newValue()));
416 break;
417 case UPGRADED:
418 post(new UpgradeEvent(UpgradeEvent.Type.UPGRADED, event.newValue()));
419 break;
420 case COMMITTED:
421 post(new UpgradeEvent(UpgradeEvent.Type.COMMITTED, event.newValue()));
422 break;
423 case ROLLED_BACK:
424 post(new UpgradeEvent(UpgradeEvent.Type.ROLLED_BACK, event.newValue()));
425 break;
426 case RESET:
427 post(new UpgradeEvent(UpgradeEvent.Type.RESET, event.newValue()));
428 break;
429 default:
430 break;
431 }
432 }
433}