[ONOS-7086] Detect failures in upgraded nodes and roll back upgrades

Change-Id: Ie804b5cd8cae0a1f0bbbe233dc34ae819c41673c
diff --git a/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
index ce55eda..410b137 100644
--- a/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
+++ b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
@@ -16,7 +16,9 @@
 package org.onosproject.upgrade.impl;
 
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -24,7 +26,10 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.UnifiedClusterService;
 import org.onosproject.core.Version;
 import org.onosproject.core.VersionService;
@@ -70,7 +75,8 @@
     private Version localVersion;
     private AtomicValue<Upgrade> state;
     private final AtomicReference<Upgrade> currentState = new AtomicReference<>();
-    private final AtomicValueEventListener<Upgrade> stateListener = event -> handleChange(event);
+    private final AtomicValueEventListener<Upgrade> stateListener = event -> handleUpgradeEvent(event);
+    private final ClusterEventListener clusterListener = event -> handleClusterEvent(event);
 
     @Activate
     public void activate() {
@@ -121,12 +127,14 @@
         }
 
         state.addListener(stateListener);
+        clusterService.addListener(clusterListener);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         state.removeListener(stateListener);
+        clusterService.removeListener(clusterListener);
         log.info("Stopped");
     }
 
@@ -372,7 +380,35 @@
         }
     }
 
-    private void handleChange(AtomicValueEvent<Upgrade> event) {
+    /**
+     * Handles a cluster event.
+     *
+     * @param event the cluster event
+     */
+    protected void handleClusterEvent(ClusterEvent event) {
+        // If an instance was deactivated, check whether we need to roll back the upgrade.
+        if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
+            Upgrade upgrade = state.get();
+            if (upgrade.status().upgraded()) {
+                // Get the upgraded subset of the cluster and check whether the down node is a member
+                // of the upgraded subset. If so, roll back the upgrade to tolerate the failure.
+                Set<NodeId> upgradedNodes = clusterService.getNodes().stream()
+                        .map(ControllerNode::id)
+                        .filter(id -> clusterService.getVersion(id).equals(upgrade.target()))
+                        .collect(Collectors.toSet());
+                if (upgradedNodes.contains(event.subject().id())) {
+                    rollback();
+                }
+            }
+        }
+    }
+
+    /**
+     * Handles an upgrade state event.
+     *
+     * @param event the upgrade value event
+     */
+    protected void handleUpgradeEvent(AtomicValueEvent<Upgrade> event) {
         currentState.set(event.newValue());
         switch (event.newValue().status()) {
             case INITIALIZED:
diff --git a/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java b/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
index 4137744..48337cd 100644
--- a/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
+++ b/core/net/src/test/java/org/onosproject/upgrade/impl/UpgradeManagerTest.java
@@ -23,6 +23,7 @@
 
 import org.junit.Test;
 import org.onlab.packet.IpAddress;
+import org.onosproject.cluster.ClusterEvent;
 import org.onosproject.cluster.UnifiedClusterServiceAdapter;
 import org.onosproject.cluster.ControllerNode;
 import org.onosproject.cluster.DefaultControllerNode;
@@ -74,6 +75,15 @@
             }
 
             @Override
+            public ControllerNode getNode(NodeId nodeId) {
+                return getNodes()
+                        .stream()
+                        .filter(node -> node.id().equals(nodeId))
+                        .findFirst()
+                        .orElse(null);
+            }
+
+            @Override
             public ControllerNode.State getState(NodeId nodeId) {
                 return ControllerNode.State.READY;
             }
@@ -237,4 +247,22 @@
         assertEquals(Upgrade.Status.INACTIVE, upgradeManager.getState().status());
     }
 
+    @Test
+    public void testCrashRollback() throws Exception {
+        UpgradeManager upgradeManager = createUpgradeManager(
+                Version.version("1.0.0"),
+                new Upgrade(Version.version("1.0.0"), Version.version("1.0.1"), Upgrade.Status.UPGRADED),
+                Arrays.asList(Version.version("1.0.0"), Version.version("1.0.0"), Version.version("1.0.1")));
+
+        assertFalse(upgradeManager.isLocalActive());
+
+        upgradeManager.handleClusterEvent(new ClusterEvent(
+                ClusterEvent.Type.INSTANCE_DEACTIVATED,
+                upgradeManager.clusterService.getNode(NodeId.nodeId("2"))));
+
+        assertEquals(Upgrade.Status.ROLLED_BACK, upgradeManager.getState().status());
+        assertTrue(upgradeManager.isLocalActive());
+        assertFalse(upgradeManager.isLocalUpgraded());
+    }
+
 }