blob: 9fdd7a23d56f9475c08f4c2b901576b9c6036043 [file] [log] [blame]
Jordan Halterman980a8c12017-09-22 18:01:19 -07001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.upgrade.impl;
17
18import java.util.Objects;
Jordan Halterman5ca07932017-10-07 13:28:22 -070019import java.util.Set;
Jordan Halterman980a8c12017-09-22 18:01:19 -070020import java.util.concurrent.atomic.AtomicReference;
Jordan Halterman5ca07932017-10-07 13:28:22 -070021import java.util.stream.Collectors;
Jordan Halterman980a8c12017-09-22 18:01:19 -070022
23import org.apache.felix.scr.annotations.Activate;
24import org.apache.felix.scr.annotations.Component;
25import org.apache.felix.scr.annotations.Deactivate;
26import org.apache.felix.scr.annotations.Reference;
27import org.apache.felix.scr.annotations.ReferenceCardinality;
28import org.apache.felix.scr.annotations.Service;
Jordan Halterman5ca07932017-10-07 13:28:22 -070029import org.onosproject.cluster.ClusterEvent;
30import org.onosproject.cluster.ClusterEventListener;
Jordan Halterman28183ee2017-10-17 17:29:10 -070031import org.onosproject.cluster.ClusterService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070032import org.onosproject.cluster.ControllerNode;
Jordan Halterman28183ee2017-10-17 17:29:10 -070033import org.onosproject.cluster.MembershipService;
Jordan Halterman5ca07932017-10-07 13:28:22 -070034import org.onosproject.cluster.NodeId;
Jordan Halterman980a8c12017-09-22 18:01:19 -070035import org.onosproject.core.Version;
36import org.onosproject.core.VersionService;
37import org.onosproject.event.AbstractListenerManager;
38import org.onosproject.store.serializers.KryoNamespaces;
39import org.onosproject.store.service.AtomicValue;
40import org.onosproject.store.service.AtomicValueEvent;
41import org.onosproject.store.service.AtomicValueEventListener;
42import org.onosproject.store.service.CoordinationService;
43import org.onosproject.store.service.Serializer;
44import org.onosproject.upgrade.Upgrade;
45import org.onosproject.upgrade.UpgradeAdminService;
46import org.onosproject.upgrade.UpgradeEvent;
47import org.onosproject.upgrade.UpgradeEventListener;
48import org.onosproject.upgrade.UpgradeService;
49import org.slf4j.Logger;
50
51import static org.slf4j.LoggerFactory.getLogger;
52
53/**
54 * Upgrade service implementation.
55 * <p>
56 * This implementation uses the {@link CoordinationService} to store upgrade state in a version-agnostic primitive.
57 * Upgrade state can be seen by current and future version nodes.
58 */
59@Component(immediate = true)
60@Service
61public class UpgradeManager
62 extends AbstractListenerManager<UpgradeEvent, UpgradeEventListener>
63 implements UpgradeService, UpgradeAdminService {
64
65 private final Logger log = getLogger(getClass());
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
68 protected VersionService versionService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
71 protected CoordinationService coordinationService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
Jordan Halterman28183ee2017-10-17 17:29:10 -070074 protected ClusterService clusterService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
77 protected MembershipService membershipService;
Jordan Halterman980a8c12017-09-22 18:01:19 -070078
79 private Version localVersion;
80 private AtomicValue<Upgrade> state;
81 private final AtomicReference<Upgrade> currentState = new AtomicReference<>();
Jordan Halterman5ca07932017-10-07 13:28:22 -070082 private final AtomicValueEventListener<Upgrade> stateListener = event -> handleUpgradeEvent(event);
83 private final ClusterEventListener clusterListener = event -> handleClusterEvent(event);
Jordan Halterman980a8c12017-09-22 18:01:19 -070084
85 @Activate
86 public void activate() {
87 state = coordinationService.<Upgrade>atomicValueBuilder()
88 .withName("onos-upgrade-state")
89 .withSerializer(Serializer.using(KryoNamespaces.API))
90 .build()
91 .asAtomicValue();
92 localVersion = versionService.version();
93
94 currentState.set(state.get());
95 if (currentState.get() == null) {
96 currentState.set(new Upgrade(localVersion, localVersion, Upgrade.Status.INACTIVE));
97 state.set(currentState.get());
98 }
99
100 Upgrade upgrade = currentState.get();
101
102 // If the upgrade state is not initialized, ensure this node matches the version of the cluster.
103 if (!upgrade.status().active() && !Objects.equals(upgrade.source(), localVersion)) {
104 log.error("Node version {} inconsistent with cluster version {}", localVersion, upgrade.source());
105 throw new IllegalStateException("Node version " + localVersion +
106 " inconsistent with cluster version " + upgrade.source());
107 }
108
109 // If the upgrade state is initialized then check the node version.
110 if (upgrade.status() == Upgrade.Status.INITIALIZED) {
111 // If the source version equals the target version, attempt to update the target version.
112 if (Objects.equals(upgrade.source(), upgrade.target()) && !Objects.equals(upgrade.target(), localVersion)) {
113 upgrade = new Upgrade(upgrade.source(), localVersion, upgrade.status());
114 currentState.set(upgrade);
115 state.set(upgrade);
116 }
117 }
118
119 // If the upgrade status is active, verify that the local version matches the upgrade version.
120 if (upgrade.status().active() && !Objects.equals(upgrade.source(), upgrade.target())) {
121 // If the upgrade source/target are not equal, validate that the node's version is consistent
122 // with versions in the upgrade. There are two possibilities: that a not-yet-upgraded node is being
123 // restarted, or that a node has been upgraded, so we need to check that this node is running either
124 // the source or target version.
125 if (!Objects.equals(localVersion, upgrade.source()) && !Objects.equals(localVersion, upgrade.target())) {
126 log.error("Cannot upgrade node to version {}; Upgrade to {} already in progress",
127 localVersion, upgrade.target());
128 throw new IllegalStateException("Cannot upgrade node to version " + localVersion + "; Upgrade to " +
129 upgrade.target() + " already in progress");
130 }
131 }
132
133 state.addListener(stateListener);
Jordan Halterman5ca07932017-10-07 13:28:22 -0700134 clusterService.addListener(clusterListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700135 log.info("Started");
136 }
137
138 @Deactivate
139 public void deactivate() {
140 state.removeListener(stateListener);
Jordan Halterman5ca07932017-10-07 13:28:22 -0700141 clusterService.removeListener(clusterListener);
Jordan Halterman980a8c12017-09-22 18:01:19 -0700142 log.info("Stopped");
143 }
144
145 @Override
146 public boolean isUpgrading() {
147 return getState().status().active();
148 }
149
150 @Override
151 public Upgrade getState() {
152 return currentState.get();
153 }
154
155 @Override
156 public Version getVersion() {
157 Upgrade upgrade = currentState.get();
158 return upgrade.status().upgraded()
159 ? upgrade.target()
160 : upgrade.source();
161 }
162
163 @Override
164 public boolean isLocalActive() {
165 return localVersion.equals(getVersion());
166 }
167
168 @Override
169 public boolean isLocalUpgraded() {
170 Upgrade upgrade = currentState.get();
171 return upgrade.status().active()
172 && !upgrade.source().equals(upgrade.target())
173 && localVersion.equals(upgrade.target());
174 }
175
176 @Override
177 public void initialize() {
178 Upgrade inactive = currentState.get();
179
180 // If the current upgrade status is active, fail initialization.
181 if (inactive.status().active()) {
182 throw new IllegalStateException("Upgrade already active");
183 }
184
185 // Set the upgrade status to INITIALIZING.
186 Upgrade initializing = new Upgrade(
187 localVersion,
188 localVersion,
189 Upgrade.Status.INITIALIZING);
190 if (!state.compareAndSet(inactive, initializing)) {
191 throw new IllegalStateException("Concurrent upgrade modification");
192 } else {
193 currentState.set(initializing);
194
195 // Set the upgrade status to INITIALIZED.
196 Upgrade initialized = new Upgrade(
197 initializing.source(),
198 initializing.target(),
199 Upgrade.Status.INITIALIZED);
200 if (!state.compareAndSet(initializing, initialized)) {
201 throw new IllegalStateException("Concurrent upgrade modification");
202 } else {
203 currentState.set(initialized);
204 }
205 }
206 }
207
208 @Override
209 public void upgrade() {
210 Upgrade initialized = currentState.get();
211
212 // If the current upgrade status is not INITIALIZED, throw an exception.
213 if (initialized.status() != Upgrade.Status.INITIALIZED) {
214 throw new IllegalStateException("Upgrade not initialized");
215 }
216
217 // Set the upgrade status to UPGRADING.
218 Upgrade upgrading = new Upgrade(
219 initialized.source(),
220 initialized.target(),
221 Upgrade.Status.UPGRADING);
222 if (!state.compareAndSet(initialized, upgrading)) {
223 throw new IllegalStateException("Concurrent upgrade modification");
224 } else {
225 currentState.set(upgrading);
226
227 // Set the upgrade status to UPGRADED.
228 Upgrade upgraded = new Upgrade(
229 upgrading.source(),
230 upgrading.target(),
231 Upgrade.Status.UPGRADED);
232 if (!state.compareAndSet(upgrading, upgraded)) {
233 throw new IllegalStateException("Concurrent upgrade modification");
234 } else {
235 currentState.set(upgraded);
236 }
237 }
238 }
239
240 @Override
241 public void commit() {
242 Upgrade upgraded = currentState.get();
243
244 // If the current upgrade status is not UPGRADED, throw an exception.
245 if (upgraded.status() != Upgrade.Status.UPGRADED) {
246 throw new IllegalStateException("Upgrade not performed");
247 }
248
249 // Determine whether any nodes have not been upgraded to the target version.
Jordan Halterman28183ee2017-10-17 17:29:10 -0700250 boolean upgradeComplete = membershipService.getGroups().size() == 1
251 && membershipService.getLocalGroup().version().equals(upgraded.target());
Jordan Halterman980a8c12017-09-22 18:01:19 -0700252
253 // If some nodes have not yet been upgraded, throw an exception.
254 if (!upgradeComplete) {
255 throw new IllegalStateException("Some nodes have not yet been upgraded to version " + upgraded.target());
256 }
257
258 // Set the upgrade status to COMMITTING.
259 Upgrade committing = new Upgrade(
260 upgraded.source(),
261 upgraded.target(),
262 Upgrade.Status.COMMITTING);
263 if (!state.compareAndSet(upgraded, committing)) {
264 throw new IllegalStateException("Concurrent upgrade modification");
265 } else {
266 currentState.set(committing);
267
268 // Set the upgrade status to COMMITTED.
269 Upgrade committed = new Upgrade(
270 committing.source(),
271 committing.target(),
272 Upgrade.Status.COMMITTED);
273 if (!state.compareAndSet(committing, committed)) {
274 throw new IllegalStateException("Concurrent upgrade modification");
275 } else {
276 currentState.set(committed);
277
278 // Set the upgrade status to INACTIVE.
279 Upgrade inactive = new Upgrade(
280 localVersion,
281 localVersion,
282 Upgrade.Status.INACTIVE);
283 if (!state.compareAndSet(committed, inactive)) {
284 throw new IllegalStateException("Concurrent upgrade modification");
285 } else {
286 currentState.set(inactive);
287 }
288 }
289 }
290 }
291
292 @Override
293 public void rollback() {
294 Upgrade upgraded = currentState.get();
295
296 // If the current upgrade status is not UPGRADED, throw an exception.
297 if (upgraded.status() != Upgrade.Status.UPGRADED) {
298 throw new IllegalStateException("Upgrade not performed");
299 }
300
301 // Set the upgrade status to ROLLING_BACK.
302 Upgrade rollingBack = new Upgrade(
303 upgraded.source(),
304 upgraded.target(),
305 Upgrade.Status.ROLLING_BACK);
306 if (!state.compareAndSet(upgraded, rollingBack)) {
307 throw new IllegalStateException("Concurrent upgrade modification");
308 } else {
309 currentState.set(rollingBack);
310
311 // Set the upgrade status to ROLLED_BACK.
312 Upgrade rolledBack = new Upgrade(
313 rollingBack.source(),
314 rollingBack.target(),
315 Upgrade.Status.ROLLED_BACK);
316 if (!state.compareAndSet(rollingBack, rolledBack)) {
317 throw new IllegalStateException("Concurrent upgrade modification");
318 } else {
319 currentState.set(rolledBack);
320 }
321 }
322 }
323
324 @Override
325 public void reset() {
326 Upgrade upgraded = currentState.get();
327
328 // If the current upgrade status is not INITIALIZED or ROLLED_BACK, throw an exception.
329 if (upgraded.status() != Upgrade.Status.INITIALIZED
330 && upgraded.status() != Upgrade.Status.ROLLED_BACK) {
331 throw new IllegalStateException("Upgrade not rolled back");
332 }
333
334 // Determine whether any nodes are still running the target version.
Jordan Halterman28183ee2017-10-17 17:29:10 -0700335 boolean rollbackComplete = membershipService.getGroups().size() == 1
336 && membershipService.getLocalGroup().version().equals(upgraded.source());
Jordan Halterman980a8c12017-09-22 18:01:19 -0700337
338 // If some nodes have not yet been downgraded, throw an exception.
339 if (!rollbackComplete) {
340 throw new IllegalStateException("Some nodes have not yet been downgraded to version " + upgraded.source());
341 }
342
343 // Set the upgrade status to RESETTING.
344 Upgrade resetting = new Upgrade(
345 upgraded.source(),
346 upgraded.target(),
347 Upgrade.Status.RESETTING);
348 if (!state.compareAndSet(upgraded, resetting)) {
349 throw new IllegalStateException("Concurrent upgrade modification");
350 } else {
351 currentState.set(resetting);
352
353 // Set the upgrade status to RESET.
354 Upgrade reset = new Upgrade(
355 resetting.source(),
356 resetting.target(),
357 Upgrade.Status.RESET);
358 if (!state.compareAndSet(resetting, reset)) {
359 throw new IllegalStateException("Concurrent upgrade modification");
360 } else {
361 currentState.set(reset);
362
363 // Set the upgrade status to INACTIVE.
364 Upgrade inactive = new Upgrade(
365 localVersion,
366 localVersion,
367 Upgrade.Status.INACTIVE);
368 if (!state.compareAndSet(reset, inactive)) {
369 throw new IllegalStateException("Concurrent upgrade modification");
370 } else {
371 currentState.set(inactive);
372 }
373 }
374 }
375 }
376
Jordan Halterman5ca07932017-10-07 13:28:22 -0700377 /**
378 * Handles a cluster event.
379 *
380 * @param event the cluster event
381 */
382 protected void handleClusterEvent(ClusterEvent event) {
383 // If an instance was deactivated, check whether we need to roll back the upgrade.
384 if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
385 Upgrade upgrade = state.get();
386 if (upgrade.status().upgraded()) {
387 // Get the upgraded subset of the cluster and check whether the down node is a member
388 // of the upgraded subset. If so, roll back the upgrade to tolerate the failure.
389 Set<NodeId> upgradedNodes = clusterService.getNodes().stream()
390 .map(ControllerNode::id)
391 .filter(id -> clusterService.getVersion(id).equals(upgrade.target()))
392 .collect(Collectors.toSet());
393 if (upgradedNodes.contains(event.subject().id())) {
394 rollback();
395 }
396 }
397 }
398 }
399
400 /**
401 * Handles an upgrade state event.
402 *
403 * @param event the upgrade value event
404 */
405 protected void handleUpgradeEvent(AtomicValueEvent<Upgrade> event) {
Jordan Halterman980a8c12017-09-22 18:01:19 -0700406 currentState.set(event.newValue());
407 switch (event.newValue().status()) {
408 case INITIALIZED:
409 post(new UpgradeEvent(UpgradeEvent.Type.INITIALIZED, event.newValue()));
410 break;
411 case UPGRADED:
412 post(new UpgradeEvent(UpgradeEvent.Type.UPGRADED, event.newValue()));
413 break;
414 case COMMITTED:
415 post(new UpgradeEvent(UpgradeEvent.Type.COMMITTED, event.newValue()));
416 break;
417 case ROLLED_BACK:
418 post(new UpgradeEvent(UpgradeEvent.Type.ROLLED_BACK, event.newValue()));
419 break;
420 case RESET:
421 post(new UpgradeEvent(UpgradeEvent.Type.RESET, event.newValue()));
422 break;
423 default:
424 break;
425 }
426 }
427}