/*
 * Copyright 2017-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| 16 | package org.onosproject.upgrade.impl; |
| 17 | |
| 18 | import java.util.Objects; |
| 19 | import java.util.concurrent.atomic.AtomicReference; |
| 20 | |
| 21 | import org.apache.felix.scr.annotations.Activate; |
| 22 | import org.apache.felix.scr.annotations.Component; |
| 23 | import org.apache.felix.scr.annotations.Deactivate; |
| 24 | import org.apache.felix.scr.annotations.Reference; |
| 25 | import org.apache.felix.scr.annotations.ReferenceCardinality; |
| 26 | import org.apache.felix.scr.annotations.Service; |
| 27 | import org.onosproject.cluster.ControllerNode; |
| 28 | import org.onosproject.cluster.UnifiedClusterService; |
| 29 | import org.onosproject.core.Version; |
| 30 | import org.onosproject.core.VersionService; |
| 31 | import org.onosproject.event.AbstractListenerManager; |
| 32 | import org.onosproject.store.serializers.KryoNamespaces; |
| 33 | import org.onosproject.store.service.AtomicValue; |
| 34 | import org.onosproject.store.service.AtomicValueEvent; |
| 35 | import org.onosproject.store.service.AtomicValueEventListener; |
| 36 | import org.onosproject.store.service.CoordinationService; |
| 37 | import org.onosproject.store.service.Serializer; |
| 38 | import org.onosproject.upgrade.Upgrade; |
| 39 | import org.onosproject.upgrade.UpgradeAdminService; |
| 40 | import org.onosproject.upgrade.UpgradeEvent; |
| 41 | import org.onosproject.upgrade.UpgradeEventListener; |
| 42 | import org.onosproject.upgrade.UpgradeService; |
| 43 | import org.slf4j.Logger; |
| 44 | |
| 45 | import static org.slf4j.LoggerFactory.getLogger; |
| 46 | |
| 47 | /** |
| 48 | * Upgrade service implementation. |
| 49 | * <p> |
| 50 | * This implementation uses the {@link CoordinationService} to store upgrade state in a version-agnostic primitive. |
| 51 | * Upgrade state can be seen by current and future version nodes. |
| 52 | */ |
| 53 | @Component(immediate = true) |
| 54 | @Service |
| 55 | public class UpgradeManager |
| 56 | extends AbstractListenerManager<UpgradeEvent, UpgradeEventListener> |
| 57 | implements UpgradeService, UpgradeAdminService { |
| 58 | |
| 59 | private final Logger log = getLogger(getClass()); |
| 60 | |
| 61 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 62 | protected VersionService versionService; |
| 63 | |
| 64 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 65 | protected CoordinationService coordinationService; |
| 66 | |
| 67 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 68 | protected UnifiedClusterService clusterService; |
| 69 | |
| 70 | private Version localVersion; |
| 71 | private AtomicValue<Upgrade> state; |
| 72 | private final AtomicReference<Upgrade> currentState = new AtomicReference<>(); |
| 73 | private final AtomicValueEventListener<Upgrade> stateListener = event -> handleChange(event); |
| 74 | |
| 75 | @Activate |
| 76 | public void activate() { |
| 77 | state = coordinationService.<Upgrade>atomicValueBuilder() |
| 78 | .withName("onos-upgrade-state") |
| 79 | .withSerializer(Serializer.using(KryoNamespaces.API)) |
| 80 | .build() |
| 81 | .asAtomicValue(); |
| 82 | localVersion = versionService.version(); |
| 83 | |
| 84 | currentState.set(state.get()); |
| 85 | if (currentState.get() == null) { |
| 86 | currentState.set(new Upgrade(localVersion, localVersion, Upgrade.Status.INACTIVE)); |
| 87 | state.set(currentState.get()); |
| 88 | } |
| 89 | |
| 90 | Upgrade upgrade = currentState.get(); |
| 91 | |
| 92 | // If the upgrade state is not initialized, ensure this node matches the version of the cluster. |
| 93 | if (!upgrade.status().active() && !Objects.equals(upgrade.source(), localVersion)) { |
| 94 | log.error("Node version {} inconsistent with cluster version {}", localVersion, upgrade.source()); |
| 95 | throw new IllegalStateException("Node version " + localVersion + |
| 96 | " inconsistent with cluster version " + upgrade.source()); |
| 97 | } |
| 98 | |
| 99 | // If the upgrade state is initialized then check the node version. |
| 100 | if (upgrade.status() == Upgrade.Status.INITIALIZED) { |
| 101 | // If the source version equals the target version, attempt to update the target version. |
| 102 | if (Objects.equals(upgrade.source(), upgrade.target()) && !Objects.equals(upgrade.target(), localVersion)) { |
| 103 | upgrade = new Upgrade(upgrade.source(), localVersion, upgrade.status()); |
| 104 | currentState.set(upgrade); |
| 105 | state.set(upgrade); |
| 106 | } |
| 107 | } |
| 108 | |
| 109 | // If the upgrade status is active, verify that the local version matches the upgrade version. |
| 110 | if (upgrade.status().active() && !Objects.equals(upgrade.source(), upgrade.target())) { |
| 111 | // If the upgrade source/target are not equal, validate that the node's version is consistent |
| 112 | // with versions in the upgrade. There are two possibilities: that a not-yet-upgraded node is being |
| 113 | // restarted, or that a node has been upgraded, so we need to check that this node is running either |
| 114 | // the source or target version. |
| 115 | if (!Objects.equals(localVersion, upgrade.source()) && !Objects.equals(localVersion, upgrade.target())) { |
| 116 | log.error("Cannot upgrade node to version {}; Upgrade to {} already in progress", |
| 117 | localVersion, upgrade.target()); |
| 118 | throw new IllegalStateException("Cannot upgrade node to version " + localVersion + "; Upgrade to " + |
| 119 | upgrade.target() + " already in progress"); |
| 120 | } |
| 121 | } |
| 122 | |
| 123 | state.addListener(stateListener); |
| 124 | log.info("Started"); |
| 125 | } |
| 126 | |
| 127 | @Deactivate |
| 128 | public void deactivate() { |
| 129 | state.removeListener(stateListener); |
| 130 | log.info("Stopped"); |
| 131 | } |
| 132 | |
| 133 | @Override |
| 134 | public boolean isUpgrading() { |
| 135 | return getState().status().active(); |
| 136 | } |
| 137 | |
| 138 | @Override |
| 139 | public Upgrade getState() { |
| 140 | return currentState.get(); |
| 141 | } |
| 142 | |
| 143 | @Override |
| 144 | public Version getVersion() { |
| 145 | Upgrade upgrade = currentState.get(); |
| 146 | return upgrade.status().upgraded() |
| 147 | ? upgrade.target() |
| 148 | : upgrade.source(); |
| 149 | } |
| 150 | |
| 151 | @Override |
| 152 | public boolean isLocalActive() { |
| 153 | return localVersion.equals(getVersion()); |
| 154 | } |
| 155 | |
| 156 | @Override |
| 157 | public boolean isLocalUpgraded() { |
| 158 | Upgrade upgrade = currentState.get(); |
| 159 | return upgrade.status().active() |
| 160 | && !upgrade.source().equals(upgrade.target()) |
| 161 | && localVersion.equals(upgrade.target()); |
| 162 | } |
| 163 | |
| 164 | @Override |
| 165 | public void initialize() { |
| 166 | Upgrade inactive = currentState.get(); |
| 167 | |
| 168 | // If the current upgrade status is active, fail initialization. |
| 169 | if (inactive.status().active()) { |
| 170 | throw new IllegalStateException("Upgrade already active"); |
| 171 | } |
| 172 | |
| 173 | // Set the upgrade status to INITIALIZING. |
| 174 | Upgrade initializing = new Upgrade( |
| 175 | localVersion, |
| 176 | localVersion, |
| 177 | Upgrade.Status.INITIALIZING); |
| 178 | if (!state.compareAndSet(inactive, initializing)) { |
| 179 | throw new IllegalStateException("Concurrent upgrade modification"); |
| 180 | } else { |
| 181 | currentState.set(initializing); |
| 182 | |
| 183 | // Set the upgrade status to INITIALIZED. |
| 184 | Upgrade initialized = new Upgrade( |
| 185 | initializing.source(), |
| 186 | initializing.target(), |
| 187 | Upgrade.Status.INITIALIZED); |
| 188 | if (!state.compareAndSet(initializing, initialized)) { |
| 189 | throw new IllegalStateException("Concurrent upgrade modification"); |
| 190 | } else { |
| 191 | currentState.set(initialized); |
| 192 | } |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | @Override |
| 197 | public void upgrade() { |
| 198 | Upgrade initialized = currentState.get(); |
| 199 | |
| 200 | // If the current upgrade status is not INITIALIZED, throw an exception. |
| 201 | if (initialized.status() != Upgrade.Status.INITIALIZED) { |
| 202 | throw new IllegalStateException("Upgrade not initialized"); |
| 203 | } |
| 204 | |
| 205 | // Set the upgrade status to UPGRADING. |
| 206 | Upgrade upgrading = new Upgrade( |
| 207 | initialized.source(), |
| 208 | initialized.target(), |
| 209 | Upgrade.Status.UPGRADING); |
| 210 | if (!state.compareAndSet(initialized, upgrading)) { |
| 211 | throw new IllegalStateException("Concurrent upgrade modification"); |
| 212 | } else { |
| 213 | currentState.set(upgrading); |
| 214 | |
| 215 | // Set the upgrade status to UPGRADED. |
| 216 | Upgrade upgraded = new Upgrade( |
| 217 | upgrading.source(), |
| 218 | upgrading.target(), |
| 219 | Upgrade.Status.UPGRADED); |
| 220 | if (!state.compareAndSet(upgrading, upgraded)) { |
| 221 | throw new IllegalStateException("Concurrent upgrade modification"); |
| 222 | } else { |
| 223 | currentState.set(upgraded); |
| 224 | } |
| 225 | } |
| 226 | } |
| 227 | |
| 228 | @Override |
| 229 | public void commit() { |
| 230 | Upgrade upgraded = currentState.get(); |
| 231 | |
| 232 | // If the current upgrade status is not UPGRADED, throw an exception. |
| 233 | if (upgraded.status() != Upgrade.Status.UPGRADED) { |
| 234 | throw new IllegalStateException("Upgrade not performed"); |
| 235 | } |
| 236 | |
| 237 | // Determine whether any nodes have not been upgraded to the target version. |
| 238 | boolean upgradeComplete = clusterService.getNodes() |
| 239 | .stream() |
| 240 | .allMatch(node -> { |
| 241 | ControllerNode.State state = clusterService.getState(node.id()); |
| 242 | Version version = clusterService.getVersion(node.id()); |
| 243 | return state.isActive() && version != null && version.equals(upgraded.target()); |
| 244 | }); |
| 245 | |
| 246 | // If some nodes have not yet been upgraded, throw an exception. |
| 247 | if (!upgradeComplete) { |
| 248 | throw new IllegalStateException("Some nodes have not yet been upgraded to version " + upgraded.target()); |
| 249 | } |
| 250 | |
| 251 | // Set the upgrade status to COMMITTING. |
| 252 | Upgrade committing = new Upgrade( |
| 253 | upgraded.source(), |
| 254 | upgraded.target(), |
| 255 | Upgrade.Status.COMMITTING); |
| 256 | if (!state.compareAndSet(upgraded, committing)) { |
| 257 | throw new IllegalStateException("Concurrent upgrade modification"); |
| 258 | } else { |
| 259 | currentState.set(committing); |
| 260 | |
| 261 | // Set the upgrade status to COMMITTED. |
| 262 | Upgrade committed = new Upgrade( |
| 263 | committing.source(), |
| 264 | committing.target(), |
| 265 | Upgrade.Status.COMMITTED); |
| 266 | if (!state.compareAndSet(committing, committed)) { |
| 267 | throw new IllegalStateException("Concurrent upgrade modification"); |
| 268 | } else { |
| 269 | currentState.set(committed); |
| 270 | |
| 271 | // Set the upgrade status to INACTIVE. |
| 272 | Upgrade inactive = new Upgrade( |
| 273 | localVersion, |
| 274 | localVersion, |
| 275 | Upgrade.Status.INACTIVE); |
| 276 | if (!state.compareAndSet(committed, inactive)) { |
| 277 | throw new IllegalStateException("Concurrent upgrade modification"); |
| 278 | } else { |
| 279 | currentState.set(inactive); |
| 280 | } |
| 281 | } |
| 282 | } |
| 283 | } |
| 284 | |
| 285 | @Override |
| 286 | public void rollback() { |
| 287 | Upgrade upgraded = currentState.get(); |
| 288 | |
| 289 | // If the current upgrade status is not UPGRADED, throw an exception. |
| 290 | if (upgraded.status() != Upgrade.Status.UPGRADED) { |
| 291 | throw new IllegalStateException("Upgrade not performed"); |
| 292 | } |
| 293 | |
| 294 | // Set the upgrade status to ROLLING_BACK. |
| 295 | Upgrade rollingBack = new Upgrade( |
| 296 | upgraded.source(), |
| 297 | upgraded.target(), |
| 298 | Upgrade.Status.ROLLING_BACK); |
| 299 | if (!state.compareAndSet(upgraded, rollingBack)) { |
| 300 | throw new IllegalStateException("Concurrent upgrade modification"); |
| 301 | } else { |
| 302 | currentState.set(rollingBack); |
| 303 | |
| 304 | // Set the upgrade status to ROLLED_BACK. |
| 305 | Upgrade rolledBack = new Upgrade( |
| 306 | rollingBack.source(), |
| 307 | rollingBack.target(), |
| 308 | Upgrade.Status.ROLLED_BACK); |
| 309 | if (!state.compareAndSet(rollingBack, rolledBack)) { |
| 310 | throw new IllegalStateException("Concurrent upgrade modification"); |
| 311 | } else { |
| 312 | currentState.set(rolledBack); |
| 313 | } |
| 314 | } |
| 315 | } |
| 316 | |
| 317 | @Override |
| 318 | public void reset() { |
| 319 | Upgrade upgraded = currentState.get(); |
| 320 | |
| 321 | // If the current upgrade status is not INITIALIZED or ROLLED_BACK, throw an exception. |
| 322 | if (upgraded.status() != Upgrade.Status.INITIALIZED |
| 323 | && upgraded.status() != Upgrade.Status.ROLLED_BACK) { |
| 324 | throw new IllegalStateException("Upgrade not rolled back"); |
| 325 | } |
| 326 | |
| 327 | // Determine whether any nodes are still running the target version. |
| 328 | boolean rollbackComplete = clusterService.getNodes() |
| 329 | .stream() |
| 330 | .allMatch(node -> { |
| 331 | ControllerNode.State state = clusterService.getState(node.id()); |
| 332 | Version version = clusterService.getVersion(node.id()); |
| 333 | return state.isActive() && version != null && version.equals(upgraded.source()); |
| 334 | }); |
| 335 | |
| 336 | // If some nodes have not yet been downgraded, throw an exception. |
| 337 | if (!rollbackComplete) { |
| 338 | throw new IllegalStateException("Some nodes have not yet been downgraded to version " + upgraded.source()); |
| 339 | } |
| 340 | |
| 341 | // Set the upgrade status to RESETTING. |
| 342 | Upgrade resetting = new Upgrade( |
| 343 | upgraded.source(), |
| 344 | upgraded.target(), |
| 345 | Upgrade.Status.RESETTING); |
| 346 | if (!state.compareAndSet(upgraded, resetting)) { |
| 347 | throw new IllegalStateException("Concurrent upgrade modification"); |
| 348 | } else { |
| 349 | currentState.set(resetting); |
| 350 | |
| 351 | // Set the upgrade status to RESET. |
| 352 | Upgrade reset = new Upgrade( |
| 353 | resetting.source(), |
| 354 | resetting.target(), |
| 355 | Upgrade.Status.RESET); |
| 356 | if (!state.compareAndSet(resetting, reset)) { |
| 357 | throw new IllegalStateException("Concurrent upgrade modification"); |
| 358 | } else { |
| 359 | currentState.set(reset); |
| 360 | |
| 361 | // Set the upgrade status to INACTIVE. |
| 362 | Upgrade inactive = new Upgrade( |
| 363 | localVersion, |
| 364 | localVersion, |
| 365 | Upgrade.Status.INACTIVE); |
| 366 | if (!state.compareAndSet(reset, inactive)) { |
| 367 | throw new IllegalStateException("Concurrent upgrade modification"); |
| 368 | } else { |
| 369 | currentState.set(inactive); |
| 370 | } |
| 371 | } |
| 372 | } |
| 373 | } |
| 374 | |
| 375 | private void handleChange(AtomicValueEvent<Upgrade> event) { |
| 376 | currentState.set(event.newValue()); |
| 377 | switch (event.newValue().status()) { |
| 378 | case INITIALIZED: |
| 379 | post(new UpgradeEvent(UpgradeEvent.Type.INITIALIZED, event.newValue())); |
| 380 | break; |
| 381 | case UPGRADED: |
| 382 | post(new UpgradeEvent(UpgradeEvent.Type.UPGRADED, event.newValue())); |
| 383 | break; |
| 384 | case COMMITTED: |
| 385 | post(new UpgradeEvent(UpgradeEvent.Type.COMMITTED, event.newValue())); |
| 386 | break; |
| 387 | case ROLLED_BACK: |
| 388 | post(new UpgradeEvent(UpgradeEvent.Type.ROLLED_BACK, event.newValue())); |
| 389 | break; |
| 390 | case RESET: |
| 391 | post(new UpgradeEvent(UpgradeEvent.Type.RESET, event.newValue())); |
| 392 | break; |
| 393 | default: |
| 394 | break; |
| 395 | } |
| 396 | } |
| 397 | } |