| /* |
| * Copyright 2017-present Open Networking Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.onosproject.upgrade.impl; |
| |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| |
| import org.apache.felix.scr.annotations.Activate; |
| import org.apache.felix.scr.annotations.Component; |
| import org.apache.felix.scr.annotations.Deactivate; |
| import org.apache.felix.scr.annotations.Reference; |
| import org.apache.felix.scr.annotations.ReferenceCardinality; |
| import org.apache.felix.scr.annotations.Service; |
| import org.onosproject.cluster.ClusterEvent; |
| import org.onosproject.cluster.ClusterEventListener; |
| import org.onosproject.cluster.ClusterService; |
| import org.onosproject.cluster.ControllerNode; |
| import org.onosproject.cluster.MembershipService; |
| import org.onosproject.cluster.NodeId; |
| import org.onosproject.core.Version; |
| import org.onosproject.core.VersionService; |
| import org.onosproject.event.AbstractListenerManager; |
| import org.onosproject.store.serializers.KryoNamespaces; |
| import org.onosproject.store.service.AtomicValue; |
| import org.onosproject.store.service.AtomicValueEvent; |
| import org.onosproject.store.service.AtomicValueEventListener; |
| import org.onosproject.store.service.CoordinationService; |
| import org.onosproject.store.service.Serializer; |
| import org.onosproject.upgrade.Upgrade; |
| import org.onosproject.upgrade.UpgradeAdminService; |
| import org.onosproject.upgrade.UpgradeEvent; |
| import org.onosproject.upgrade.UpgradeEventListener; |
| import org.onosproject.upgrade.UpgradeService; |
| import org.slf4j.Logger; |
| |
| import static org.onosproject.security.AppGuard.checkPermission; |
| import static org.onosproject.security.AppPermission.Type.CLUSTER_EVENT; |
| import static org.onosproject.security.AppPermission.Type.UPGRADE_EVENT; |
| import static org.onosproject.security.AppPermission.Type.UPGRADE_READ; |
| import static org.onosproject.security.AppPermission.Type.UPGRADE_WRITE; |
| import static org.slf4j.LoggerFactory.getLogger; |
| |
| /** |
| * Upgrade service implementation. |
| * <p> |
| * This implementation uses the {@link CoordinationService} to store upgrade state in a version-agnostic primitive. |
| * Upgrade state can be seen by current and future version nodes. |
| */ |
| @Component(immediate = true) |
| @Service |
| public class UpgradeManager |
| extends AbstractListenerManager<UpgradeEvent, UpgradeEventListener> |
| implements UpgradeService, UpgradeAdminService { |
| |
| private final Logger log = getLogger(getClass()); |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| protected VersionService versionService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| protected CoordinationService coordinationService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| protected ClusterService clusterService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| protected MembershipService membershipService; |
| |
| private Version localVersion; |
| private AtomicValue<Upgrade> state; |
| private final AtomicReference<Upgrade> currentState = new AtomicReference<>(); |
| private final AtomicValueEventListener<Upgrade> stateListener = this::handleUpgradeEvent; |
| private final ClusterEventListener clusterListener = this::handleClusterEvent; |
| |
| @Activate |
| public void activate() { |
| eventDispatcher.addSink(UpgradeEvent.class, listenerRegistry); |
| |
| state = coordinationService.<Upgrade>atomicValueBuilder() |
| .withName("onos-upgrade-state") |
| .withSerializer(Serializer.using(KryoNamespaces.API)) |
| .build() |
| .asAtomicValue(); |
| localVersion = versionService.version(); |
| |
| currentState.set(state.get()); |
| if (getState() == null) { |
| initializeState(new Upgrade(localVersion, localVersion, Upgrade.Status.INACTIVE)); |
| } |
| |
| Upgrade upgrade = getState(); |
| |
| // If the upgrade state is initialized then check the node version. |
| if (upgrade.status() == Upgrade.Status.INITIALIZED) { |
| // If the source version equals the target version, attempt to update the target version. |
| if (Objects.equals(upgrade.source(), upgrade.target()) && !Objects.equals(upgrade.target(), localVersion)) { |
| checkPermission(UPGRADE_WRITE); |
| upgrade = new Upgrade(upgrade.source(), localVersion, upgrade.status()); |
| initializeState(upgrade); |
| } |
| } |
| |
| // If the upgrade status is active, verify that the local version matches the upgrade version. |
| if (upgrade.status().active() && !Objects.equals(upgrade.source(), upgrade.target())) { |
| // If the upgrade source/target are not equal, validate that the node's version is consistent |
| // with versions in the upgrade. There are two possibilities: that a not-yet-upgraded node is being |
| // restarted, or that a node has been upgraded, so we need to check that this node is running either |
| // the source or target version. |
| if (!Objects.equals(localVersion, upgrade.source()) && !Objects.equals(localVersion, upgrade.target())) { |
| log.error("Cannot upgrade node to version {}; Upgrade to {} already in progress", |
| localVersion, upgrade.target()); |
| throw new IllegalStateException("Cannot upgrade node to version " + localVersion + "; Upgrade to " + |
| upgrade.target() + " already in progress"); |
| } |
| } |
| |
| state.addListener(stateListener); |
| clusterService.addListener(clusterListener); |
| log.info("Started"); |
| } |
| |
| @Deactivate |
| public void deactivate() { |
| eventDispatcher.removeSink(UpgradeEvent.class); |
| state.removeListener(stateListener); |
| clusterService.removeListener(clusterListener); |
| log.info("Stopped"); |
| } |
| |
| /** |
| * Initializes the state when the cluster starts. |
| * <p> |
| * This method must be called when updating the state in order to check the permissions |
| * |
| * @param newState new state |
| */ |
| private void initializeState(Upgrade newState) { |
| checkPermission(UPGRADE_WRITE); |
| currentState.set(newState); |
| state.set(newState); |
| } |
| |
| /** |
| * Changes the current state to new one. |
| * <p> |
| * This method must be called when changing between states in order to check the permissions and |
| * to avoid concurrent state modifications |
| * |
| * @param oldState current upgrade state |
| * @param newState new upgrade state |
| * |
| * @throws IllegalStateException if an upgrade is already in progress |
| */ |
| private void changeState(Upgrade oldState, Upgrade newState) { |
| checkPermission(UPGRADE_WRITE); |
| if (!state.compareAndSet(oldState, newState)) { |
| throw new IllegalStateException("Concurrent upgrade modification"); |
| } else { |
| currentState.set(newState); |
| } |
| } |
| |
| @Override |
| public Upgrade getState() { |
| checkPermission(UPGRADE_READ); |
| return currentState.get(); |
| } |
| |
| @Override |
| public boolean isUpgrading() { |
| return getState().status().active(); |
| } |
| |
| @Override |
| public Version getVersion() { |
| Upgrade upgrade = getState(); |
| return upgrade.status().upgraded() |
| ? upgrade.target() |
| : upgrade.source(); |
| } |
| |
| @Override |
| public boolean isLocalActive() { |
| return localVersion.equals(getVersion()); |
| } |
| |
| @Override |
| public boolean isLocalUpgraded() { |
| Upgrade upgrade = getState(); |
| return upgrade.status().active() |
| && !upgrade.source().equals(upgrade.target()) |
| && localVersion.equals(upgrade.target()); |
| } |
| |
| @Override |
| public void initialize() { |
| Upgrade inactive = getState(); |
| |
| // If the current upgrade status is active, fail initialization. |
| if (inactive.status().active()) { |
| throw new IllegalStateException("Upgrade already active"); |
| } |
| |
| // Set the upgrade status to INITIALIZING. |
| Upgrade initializing = new Upgrade( |
| localVersion, |
| localVersion, |
| Upgrade.Status.INITIALIZING); |
| changeState(inactive, initializing); |
| |
| // Set the upgrade status to INITIALIZED. |
| Upgrade initialized = new Upgrade( |
| initializing.source(), |
| initializing.target(), |
| Upgrade.Status.INITIALIZED); |
| changeState(initializing, initialized); |
| } |
| |
| @Override |
| public void upgrade() { |
| Upgrade initialized = getState(); |
| |
| // If the current upgrade status is not INITIALIZED, throw an exception. |
| if (initialized.status() != Upgrade.Status.INITIALIZED) { |
| throw new IllegalStateException("Upgrade not initialized"); |
| } |
| |
| // Set the upgrade status to UPGRADING. |
| Upgrade upgrading = new Upgrade( |
| initialized.source(), |
| initialized.target(), |
| Upgrade.Status.UPGRADING); |
| changeState(initialized, upgrading); |
| |
| // Set the upgrade status to UPGRADED. |
| Upgrade upgraded = new Upgrade( |
| upgrading.source(), |
| upgrading.target(), |
| Upgrade.Status.UPGRADED); |
| changeState(upgrading, upgraded); |
| } |
| |
| @Override |
| public void commit() { |
| Upgrade upgraded = getState(); |
| |
| // If the current upgrade status is not UPGRADED, throw an exception. |
| if (upgraded.status() != Upgrade.Status.UPGRADED) { |
| throw new IllegalStateException("Upgrade not performed"); |
| } |
| |
| // Determine whether any nodes have not been upgraded to the target version. |
| boolean upgradeComplete = membershipService.getGroups().size() == 1 |
| && membershipService.getLocalGroup().version().equals(upgraded.target()); |
| |
| // If some nodes have not yet been upgraded, throw an exception. |
| if (!upgradeComplete) { |
| throw new IllegalStateException("Some nodes have not yet been upgraded to version " + upgraded.target()); |
| } |
| |
| // Set the upgrade status to COMMITTING. |
| Upgrade committing = new Upgrade( |
| upgraded.source(), |
| upgraded.target(), |
| Upgrade.Status.COMMITTING); |
| changeState(upgraded, committing); |
| |
| // Set the upgrade status to COMMITTED. |
| Upgrade committed = new Upgrade( |
| committing.source(), |
| committing.target(), |
| Upgrade.Status.COMMITTED); |
| changeState(committing, committed); |
| |
| // Set the upgrade status to INACTIVE. |
| Upgrade inactive = new Upgrade( |
| localVersion, |
| localVersion, |
| Upgrade.Status.INACTIVE); |
| changeState(committed, inactive); |
| } |
| |
| @Override |
| public void rollback() { |
| Upgrade upgraded = getState(); |
| |
| // If the current upgrade status is not UPGRADED, throw an exception. |
| if (upgraded.status() != Upgrade.Status.UPGRADED) { |
| throw new IllegalStateException("Upgrade not performed"); |
| } |
| |
| // Set the upgrade status to ROLLING_BACK. |
| Upgrade rollingBack = new Upgrade( |
| upgraded.source(), |
| upgraded.target(), |
| Upgrade.Status.ROLLING_BACK); |
| changeState(upgraded, rollingBack); |
| |
| // Set the upgrade status to ROLLED_BACK. |
| Upgrade rolledBack = new Upgrade( |
| rollingBack.source(), |
| rollingBack.target(), |
| Upgrade.Status.ROLLED_BACK); |
| changeState(rollingBack, rolledBack); |
| } |
| |
| @Override |
| public void reset() { |
| Upgrade upgraded = getState(); |
| |
| // If the current upgrade status is not INITIALIZED or ROLLED_BACK, throw an exception. |
| if (upgraded.status() != Upgrade.Status.INITIALIZED |
| && upgraded.status() != Upgrade.Status.ROLLED_BACK) { |
| throw new IllegalStateException("Upgrade not rolled back"); |
| } |
| |
| // Determine whether any nodes are still running the target version. |
| boolean rollbackComplete = membershipService.getGroups().size() == 1 |
| && membershipService.getLocalGroup().version().equals(upgraded.source()); |
| |
| // If some nodes have not yet been downgraded, throw an exception. |
| if (!rollbackComplete) { |
| throw new IllegalStateException("Some nodes have not yet been downgraded to version " + upgraded.source()); |
| } |
| |
| // Set the upgrade status to RESETTING. |
| Upgrade resetting = new Upgrade( |
| upgraded.source(), |
| upgraded.target(), |
| Upgrade.Status.RESETTING); |
| changeState(upgraded, resetting); |
| |
| // Set the upgrade status to RESET. |
| Upgrade reset = new Upgrade( |
| resetting.source(), |
| resetting.target(), |
| Upgrade.Status.RESET); |
| changeState(resetting, reset); |
| |
| // Set the upgrade status to INACTIVE. |
| Upgrade inactive = new Upgrade( |
| localVersion, |
| localVersion, |
| Upgrade.Status.INACTIVE); |
| changeState(reset, inactive); |
| } |
| |
| /** |
| * Handles a cluster event. |
| * |
| * @param event the cluster event |
| */ |
| protected void handleClusterEvent(ClusterEvent event) { |
| checkPermission(CLUSTER_EVENT); |
| // If an instance was deactivated, check whether we need to roll back the upgrade. |
| if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) { |
| Upgrade upgrade = getState(); |
| if (upgrade.status().upgraded()) { |
| // Get the upgraded subset of the cluster and check whether the down node is a member |
| // of the upgraded subset. If so, roll back the upgrade to tolerate the failure. |
| Set<NodeId> upgradedNodes = clusterService.getNodes().stream() |
| .map(ControllerNode::id) |
| .filter(id -> clusterService.getVersion(id).equals(upgrade.target())) |
| .collect(Collectors.toSet()); |
| if (upgradedNodes.contains(event.subject().id())) { |
| log.warn("Upgrade failure detected: rolling back upgrade"); |
| rollback(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Handles an upgrade state event. |
| * |
| * @param event the upgrade value event |
| */ |
| protected void handleUpgradeEvent(AtomicValueEvent<Upgrade> event) { |
| checkPermission(UPGRADE_EVENT); |
| currentState.set(event.newValue()); |
| switch (event.newValue().status()) { |
| case INITIALIZED: |
| post(new UpgradeEvent(UpgradeEvent.Type.INITIALIZED, event.newValue())); |
| break; |
| case UPGRADED: |
| post(new UpgradeEvent(UpgradeEvent.Type.UPGRADED, event.newValue())); |
| break; |
| case COMMITTED: |
| post(new UpgradeEvent(UpgradeEvent.Type.COMMITTED, event.newValue())); |
| break; |
| case ROLLED_BACK: |
| post(new UpgradeEvent(UpgradeEvent.Type.ROLLED_BACK, event.newValue())); |
| break; |
| case RESET: |
| post(new UpgradeEvent(UpgradeEvent.Type.RESET, event.newValue())); |
| break; |
| default: |
| break; |
| } |
| } |
| } |