/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.upgrade.impl;
17
18import java.util.Objects;
19import java.util.concurrent.atomic.AtomicReference;
20
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
26import org.apache.felix.scr.annotations.Service;
27import org.onosproject.cluster.ControllerNode;
28import org.onosproject.cluster.UnifiedClusterService;
29import org.onosproject.core.Version;
30import org.onosproject.core.VersionService;
31import org.onosproject.event.AbstractListenerManager;
32import org.onosproject.store.serializers.KryoNamespaces;
33import org.onosproject.store.service.AtomicValue;
34import org.onosproject.store.service.AtomicValueEvent;
35import org.onosproject.store.service.AtomicValueEventListener;
36import org.onosproject.store.service.CoordinationService;
37import org.onosproject.store.service.Serializer;
38import org.onosproject.upgrade.Upgrade;
39import org.onosproject.upgrade.UpgradeAdminService;
40import org.onosproject.upgrade.UpgradeEvent;
41import org.onosproject.upgrade.UpgradeEventListener;
42import org.onosproject.upgrade.UpgradeService;
43import org.slf4j.Logger;
44
45import static org.slf4j.LoggerFactory.getLogger;
46
47/**
48 * Upgrade service implementation.
49 * <p>
50 * This implementation uses the {@link CoordinationService} to store upgrade state in a version-agnostic primitive.
51 * Upgrade state can be seen by current and future version nodes.
52 */
53@Component(immediate = true)
54@Service
55public class UpgradeManager
56 extends AbstractListenerManager<UpgradeEvent, UpgradeEventListener>
57 implements UpgradeService, UpgradeAdminService {
58
59 private final Logger log = getLogger(getClass());
60
61 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
62 protected VersionService versionService;
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
65 protected CoordinationService coordinationService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 protected UnifiedClusterService clusterService;
69
70 private Version localVersion;
71 private AtomicValue<Upgrade> state;
72 private final AtomicReference<Upgrade> currentState = new AtomicReference<>();
73 private final AtomicValueEventListener<Upgrade> stateListener = event -> handleChange(event);
74
75 @Activate
76 public void activate() {
77 state = coordinationService.<Upgrade>atomicValueBuilder()
78 .withName("onos-upgrade-state")
79 .withSerializer(Serializer.using(KryoNamespaces.API))
80 .build()
81 .asAtomicValue();
82 localVersion = versionService.version();
83
84 currentState.set(state.get());
85 if (currentState.get() == null) {
86 currentState.set(new Upgrade(localVersion, localVersion, Upgrade.Status.INACTIVE));
87 state.set(currentState.get());
88 }
89
90 Upgrade upgrade = currentState.get();
91
92 // If the upgrade state is not initialized, ensure this node matches the version of the cluster.
93 if (!upgrade.status().active() && !Objects.equals(upgrade.source(), localVersion)) {
94 log.error("Node version {} inconsistent with cluster version {}", localVersion, upgrade.source());
95 throw new IllegalStateException("Node version " + localVersion +
96 " inconsistent with cluster version " + upgrade.source());
97 }
98
99 // If the upgrade state is initialized then check the node version.
100 if (upgrade.status() == Upgrade.Status.INITIALIZED) {
101 // If the source version equals the target version, attempt to update the target version.
102 if (Objects.equals(upgrade.source(), upgrade.target()) && !Objects.equals(upgrade.target(), localVersion)) {
103 upgrade = new Upgrade(upgrade.source(), localVersion, upgrade.status());
104 currentState.set(upgrade);
105 state.set(upgrade);
106 }
107 }
108
109 // If the upgrade status is active, verify that the local version matches the upgrade version.
110 if (upgrade.status().active() && !Objects.equals(upgrade.source(), upgrade.target())) {
111 // If the upgrade source/target are not equal, validate that the node's version is consistent
112 // with versions in the upgrade. There are two possibilities: that a not-yet-upgraded node is being
113 // restarted, or that a node has been upgraded, so we need to check that this node is running either
114 // the source or target version.
115 if (!Objects.equals(localVersion, upgrade.source()) && !Objects.equals(localVersion, upgrade.target())) {
116 log.error("Cannot upgrade node to version {}; Upgrade to {} already in progress",
117 localVersion, upgrade.target());
118 throw new IllegalStateException("Cannot upgrade node to version " + localVersion + "; Upgrade to " +
119 upgrade.target() + " already in progress");
120 }
121 }
122
123 state.addListener(stateListener);
124 log.info("Started");
125 }
126
127 @Deactivate
128 public void deactivate() {
129 state.removeListener(stateListener);
130 log.info("Stopped");
131 }
132
133 @Override
134 public boolean isUpgrading() {
135 return getState().status().active();
136 }
137
138 @Override
139 public Upgrade getState() {
140 return currentState.get();
141 }
142
143 @Override
144 public Version getVersion() {
145 Upgrade upgrade = currentState.get();
146 return upgrade.status().upgraded()
147 ? upgrade.target()
148 : upgrade.source();
149 }
150
151 @Override
152 public boolean isLocalActive() {
153 return localVersion.equals(getVersion());
154 }
155
156 @Override
157 public boolean isLocalUpgraded() {
158 Upgrade upgrade = currentState.get();
159 return upgrade.status().active()
160 && !upgrade.source().equals(upgrade.target())
161 && localVersion.equals(upgrade.target());
162 }
163
164 @Override
165 public void initialize() {
166 Upgrade inactive = currentState.get();
167
168 // If the current upgrade status is active, fail initialization.
169 if (inactive.status().active()) {
170 throw new IllegalStateException("Upgrade already active");
171 }
172
173 // Set the upgrade status to INITIALIZING.
174 Upgrade initializing = new Upgrade(
175 localVersion,
176 localVersion,
177 Upgrade.Status.INITIALIZING);
178 if (!state.compareAndSet(inactive, initializing)) {
179 throw new IllegalStateException("Concurrent upgrade modification");
180 } else {
181 currentState.set(initializing);
182
183 // Set the upgrade status to INITIALIZED.
184 Upgrade initialized = new Upgrade(
185 initializing.source(),
186 initializing.target(),
187 Upgrade.Status.INITIALIZED);
188 if (!state.compareAndSet(initializing, initialized)) {
189 throw new IllegalStateException("Concurrent upgrade modification");
190 } else {
191 currentState.set(initialized);
192 }
193 }
194 }
195
196 @Override
197 public void upgrade() {
198 Upgrade initialized = currentState.get();
199
200 // If the current upgrade status is not INITIALIZED, throw an exception.
201 if (initialized.status() != Upgrade.Status.INITIALIZED) {
202 throw new IllegalStateException("Upgrade not initialized");
203 }
204
205 // Set the upgrade status to UPGRADING.
206 Upgrade upgrading = new Upgrade(
207 initialized.source(),
208 initialized.target(),
209 Upgrade.Status.UPGRADING);
210 if (!state.compareAndSet(initialized, upgrading)) {
211 throw new IllegalStateException("Concurrent upgrade modification");
212 } else {
213 currentState.set(upgrading);
214
215 // Set the upgrade status to UPGRADED.
216 Upgrade upgraded = new Upgrade(
217 upgrading.source(),
218 upgrading.target(),
219 Upgrade.Status.UPGRADED);
220 if (!state.compareAndSet(upgrading, upgraded)) {
221 throw new IllegalStateException("Concurrent upgrade modification");
222 } else {
223 currentState.set(upgraded);
224 }
225 }
226 }
227
228 @Override
229 public void commit() {
230 Upgrade upgraded = currentState.get();
231
232 // If the current upgrade status is not UPGRADED, throw an exception.
233 if (upgraded.status() != Upgrade.Status.UPGRADED) {
234 throw new IllegalStateException("Upgrade not performed");
235 }
236
237 // Determine whether any nodes have not been upgraded to the target version.
238 boolean upgradeComplete = clusterService.getNodes()
239 .stream()
240 .allMatch(node -> {
241 ControllerNode.State state = clusterService.getState(node.id());
242 Version version = clusterService.getVersion(node.id());
243 return state.isActive() && version != null && version.equals(upgraded.target());
244 });
245
246 // If some nodes have not yet been upgraded, throw an exception.
247 if (!upgradeComplete) {
248 throw new IllegalStateException("Some nodes have not yet been upgraded to version " + upgraded.target());
249 }
250
251 // Set the upgrade status to COMMITTING.
252 Upgrade committing = new Upgrade(
253 upgraded.source(),
254 upgraded.target(),
255 Upgrade.Status.COMMITTING);
256 if (!state.compareAndSet(upgraded, committing)) {
257 throw new IllegalStateException("Concurrent upgrade modification");
258 } else {
259 currentState.set(committing);
260
261 // Set the upgrade status to COMMITTED.
262 Upgrade committed = new Upgrade(
263 committing.source(),
264 committing.target(),
265 Upgrade.Status.COMMITTED);
266 if (!state.compareAndSet(committing, committed)) {
267 throw new IllegalStateException("Concurrent upgrade modification");
268 } else {
269 currentState.set(committed);
270
271 // Set the upgrade status to INACTIVE.
272 Upgrade inactive = new Upgrade(
273 localVersion,
274 localVersion,
275 Upgrade.Status.INACTIVE);
276 if (!state.compareAndSet(committed, inactive)) {
277 throw new IllegalStateException("Concurrent upgrade modification");
278 } else {
279 currentState.set(inactive);
280 }
281 }
282 }
283 }
284
285 @Override
286 public void rollback() {
287 Upgrade upgraded = currentState.get();
288
289 // If the current upgrade status is not UPGRADED, throw an exception.
290 if (upgraded.status() != Upgrade.Status.UPGRADED) {
291 throw new IllegalStateException("Upgrade not performed");
292 }
293
294 // Set the upgrade status to ROLLING_BACK.
295 Upgrade rollingBack = new Upgrade(
296 upgraded.source(),
297 upgraded.target(),
298 Upgrade.Status.ROLLING_BACK);
299 if (!state.compareAndSet(upgraded, rollingBack)) {
300 throw new IllegalStateException("Concurrent upgrade modification");
301 } else {
302 currentState.set(rollingBack);
303
304 // Set the upgrade status to ROLLED_BACK.
305 Upgrade rolledBack = new Upgrade(
306 rollingBack.source(),
307 rollingBack.target(),
308 Upgrade.Status.ROLLED_BACK);
309 if (!state.compareAndSet(rollingBack, rolledBack)) {
310 throw new IllegalStateException("Concurrent upgrade modification");
311 } else {
312 currentState.set(rolledBack);
313 }
314 }
315 }
316
317 @Override
318 public void reset() {
319 Upgrade upgraded = currentState.get();
320
321 // If the current upgrade status is not INITIALIZED or ROLLED_BACK, throw an exception.
322 if (upgraded.status() != Upgrade.Status.INITIALIZED
323 && upgraded.status() != Upgrade.Status.ROLLED_BACK) {
324 throw new IllegalStateException("Upgrade not rolled back");
325 }
326
327 // Determine whether any nodes are still running the target version.
328 boolean rollbackComplete = clusterService.getNodes()
329 .stream()
330 .allMatch(node -> {
331 ControllerNode.State state = clusterService.getState(node.id());
332 Version version = clusterService.getVersion(node.id());
333 return state.isActive() && version != null && version.equals(upgraded.source());
334 });
335
336 // If some nodes have not yet been downgraded, throw an exception.
337 if (!rollbackComplete) {
338 throw new IllegalStateException("Some nodes have not yet been downgraded to version " + upgraded.source());
339 }
340
341 // Set the upgrade status to RESETTING.
342 Upgrade resetting = new Upgrade(
343 upgraded.source(),
344 upgraded.target(),
345 Upgrade.Status.RESETTING);
346 if (!state.compareAndSet(upgraded, resetting)) {
347 throw new IllegalStateException("Concurrent upgrade modification");
348 } else {
349 currentState.set(resetting);
350
351 // Set the upgrade status to RESET.
352 Upgrade reset = new Upgrade(
353 resetting.source(),
354 resetting.target(),
355 Upgrade.Status.RESET);
356 if (!state.compareAndSet(resetting, reset)) {
357 throw new IllegalStateException("Concurrent upgrade modification");
358 } else {
359 currentState.set(reset);
360
361 // Set the upgrade status to INACTIVE.
362 Upgrade inactive = new Upgrade(
363 localVersion,
364 localVersion,
365 Upgrade.Status.INACTIVE);
366 if (!state.compareAndSet(reset, inactive)) {
367 throw new IllegalStateException("Concurrent upgrade modification");
368 } else {
369 currentState.set(inactive);
370 }
371 }
372 }
373 }
374
375 private void handleChange(AtomicValueEvent<Upgrade> event) {
376 currentState.set(event.newValue());
377 switch (event.newValue().status()) {
378 case INITIALIZED:
379 post(new UpgradeEvent(UpgradeEvent.Type.INITIALIZED, event.newValue()));
380 break;
381 case UPGRADED:
382 post(new UpgradeEvent(UpgradeEvent.Type.UPGRADED, event.newValue()));
383 break;
384 case COMMITTED:
385 post(new UpgradeEvent(UpgradeEvent.Type.COMMITTED, event.newValue()));
386 break;
387 case ROLLED_BACK:
388 post(new UpgradeEvent(UpgradeEvent.Type.ROLLED_BACK, event.newValue()));
389 break;
390 case RESET:
391 post(new UpgradeEvent(UpgradeEvent.Type.RESET, event.newValue()));
392 break;
393 default:
394 break;
395 }
396 }
397}