[ONOS-7086] Detect failures in upgraded nodes and roll back upgrades

Change-Id: Ie804b5cd8cae0a1f0bbbe233dc34ae819c41673c
diff --git a/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
index ce55eda..410b137 100644
--- a/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
+++ b/core/net/src/main/java/org/onosproject/upgrade/impl/UpgradeManager.java
@@ -16,7 +16,9 @@
 package org.onosproject.upgrade.impl;
 
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -24,7 +26,10 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
 import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.UnifiedClusterService;
 import org.onosproject.core.Version;
 import org.onosproject.core.VersionService;
@@ -70,7 +75,8 @@
     private Version localVersion;
     private AtomicValue<Upgrade> state;
     private final AtomicReference<Upgrade> currentState = new AtomicReference<>();
-    private final AtomicValueEventListener<Upgrade> stateListener = event -> handleChange(event);
+    private final AtomicValueEventListener<Upgrade> stateListener = this::handleUpgradeEvent;
+    private final ClusterEventListener clusterListener = this::handleClusterEvent;
 
     @Activate
     public void activate() {
@@ -121,12 +127,14 @@
         }
 
         state.addListener(stateListener);
+        clusterService.addListener(clusterListener);
         log.info("Started");
     }
 
     @Deactivate
     public void deactivate() {
         state.removeListener(stateListener);
+        clusterService.removeListener(clusterListener);
         log.info("Stopped");
     }
 
@@ -372,7 +380,35 @@
         }
     }
 
-    private void handleChange(AtomicValueEvent<Upgrade> event) {
+    /**
+     * Handles a cluster event, rolling back the upgrade when an upgraded node is deactivated.
+     *
+     * @param event the cluster event
+     */
+    protected void handleClusterEvent(ClusterEvent event) {
+        // Only the deactivation of an instance can require rolling back the upgrade.
+        if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
+            Upgrade upgrade = state.get();
+            if (upgrade.status().upgraded()) {
+                // Roll back if the down node is in the upgraded subset of the cluster. The comparison is
+                // null-safe so a node with no reported version cannot abort failure detection with an NPE.
+                Set<NodeId> upgradedNodes = clusterService.getNodes().stream()
+                        .map(ControllerNode::id)
+                        .filter(id -> Objects.equals(upgrade.target(), clusterService.getVersion(id)))
+                        .collect(Collectors.toSet());
+                if (upgradedNodes.contains(event.subject().id())) {
+                    rollback();
+                }
+            }
+        }
+    }
+
+    /**
+     * Handles an upgrade state event.
+     *
+     * @param event the upgrade value event
+     */
+    protected void handleUpgradeEvent(AtomicValueEvent<Upgrade> event) {
         currentState.set(event.newValue());
         switch (event.newValue().status()) {
             case INITIALIZED: