[CORD-2534] McastHandler does not handle device down

Rationale: McastHandler handles only link down events
while just removes flows, groups, and internal state
related to the device going down. SegmentRoutingManager
skips link down if they are related to a device going
down.

Change-Id: I96c9c00c86d2cae9c2a8162710c61ef72c115eb0
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java
index b0be4e3..9aca787 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/McastHandler.java
@@ -25,6 +25,7 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.config.basics.McastConfig;
@@ -56,6 +57,7 @@
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -303,6 +305,10 @@
      */
     protected void processLinkDown(Link affectedLink) {
         getAffectedGroups(affectedLink).forEach(mcastIp -> {
+            // TODO Optimize when the group editing is in place
+            log.debug("Processing link down {} for group {}",
+                      affectedLink, mcastIp);
+
             // Find out the ingress, transit and egress device of affected group
             DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
                     .stream().findAny().orElse(null);
@@ -340,15 +346,7 @@
             egressDevices.forEach(egressDevice -> {
                 Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
                 if (mcastPath.isPresent()) {
-                    List<Link> links = mcastPath.get().links();
-                    links.forEach(link -> {
-                        addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
-                                assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
-                        addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null), mcastIp);
-                    });
-                    // Setup new transit mcast role
-                    mcastRoleStore.put(new McastStoreKey(mcastIp,
-                            links.get(0).dst().deviceId()), McastRole.TRANSIT);
+                    installPath(mcastIp, source, mcastPath.get());
                 } else {
                     log.warn("Fail to recover egress device {} from link failure {}",
                             egressDevice, affectedLink);
@@ -359,6 +357,111 @@
     }
 
     /**
+     * Process the DEVICE_DOWN event.
+     *
+     * @param deviceDown device going down
+     */
+    protected void processDeviceDown(DeviceId deviceDown) {
+        // Get the mcast groups affected by the device going down
+        getAffectedGroups(deviceDown).forEach(mcastIp -> {
+            // TODO Optimize when the group editing is in place
+            log.debug("Processing device down {} for group {}",
+                      deviceDown, mcastIp);
+
+            // Find out the ingress, transit and egress device of affected group
+            DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
+                    .stream().findAny().orElse(null);
+            DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
+                    .stream().findAny().orElse(null);
+            Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
+            ConnectPoint source = getSource(mcastIp);
+
+            // Do not proceed if ingress device or source of this group are missing
+            // If sinks are in other leafs, we have ingress, transit, egress, and source
+            // If sinks are in the same leaf, we have just ingress and source
+            if (ingressDevice == null || source == null) {
+                log.warn("Missing ingress {} or source {} for group {}",
+                         ingressDevice, source, mcastIp);
+                return;
+            }
+
+            // Continue only when we have the mastership on the operation
+            if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+                // When the source is available we just check the mastership
+                if (srManager.deviceService.isAvailable(source.deviceId())) {
+                    log.info("Skip {} due to lack of mastership of the source device {}",
+                             mcastIp, source.deviceId());
+                    return;
+                }
+                // Fallback with Leadership service
+                // source id is used a topic
+                NodeId leader = srManager.leadershipService.runForLeadership(
+                        source.deviceId().toString()).leaderNodeId();
+                // Verify if this node is the leader
+                if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
+                    log.info("Skip {} due to lack of leadership on the topic {}",
+                             mcastIp, source.deviceId());
+                    return;
+                }
+            }
+
+            // If it exists, we have to remove it in any case
+            if (transitDevice != null) {
+                // Remove entire transit
+                removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null));
+            }
+            // If the ingress is down
+            if (ingressDevice.equals(deviceDown)) {
+                // Remove entire ingress
+                removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
+                // If other sinks different from the ingress exist
+                if (!egressDevices.isEmpty()) {
+                    // Remove all the remaining egress
+                    egressDevices.forEach(
+                            egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
+                    );
+                }
+            } else {
+                // Egress or transit could be down at this point
+                // Get the ingress-transit port if it exists
+                PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
+                if (ingressTransitPort != null) {
+                    // Remove transit-facing port on ingress device
+                    removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan(source));
+                }
+                // One of the egress device is down
+                if (egressDevices.contains(deviceDown)) {
+                    // Remove entire device down
+                    removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
+                    // Remove the device down from egress
+                    egressDevices.remove(deviceDown);
+                    // If there are no more egress and ingress does not have sinks
+                    if (egressDevices.isEmpty() && !hasSinks(ingressDevice, mcastIp)) {
+                        // Remove entire ingress
+                        mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
+                        // We have done
+                        return;
+                    }
+                }
+                // Construct a new path for each egress device
+                egressDevices.forEach(egressDevice -> {
+                    Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
+                    // If there is a new path
+                    if (mcastPath.isPresent()) {
+                        // Let's install the new mcast path for this egress
+                        installPath(mcastIp, source, mcastPath.get());
+                    } else {
+                        // We were not able to find an alternative path for this egress
+                        log.warn("Fail to recover egress device {} from device down {}",
+                                 egressDevice, deviceDown);
+                        removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+                    }
+                });
+            }
+        });
+    }
+
+    /**
      * Adds filtering objective for given device and port.
      *
      * @param deviceId device ID
@@ -494,7 +597,6 @@
         return existingPorts.isEmpty();
     }
 
-
     /**
      * Removes entire group on given device.
      *
@@ -526,28 +628,20 @@
         mcastRoleStore.remove(mcastStoreKey);
     }
 
-    /**
-     * Remove all groups on given device.
-     *
-     * @param deviceId device ID
-     */
-    public void removeDevice(DeviceId deviceId) {
-        mcastNextObjStore.entrySet().stream()
-                .filter(entry -> entry.getKey().deviceId().equals(deviceId))
-                .forEach(entry -> {
-                    ConnectPoint source = getSource(entry.getKey().mcastIp());
-                    removeGroupFromDevice(entry.getKey().deviceId(), entry.getKey().mcastIp(),
-                            assignedVlan(source != null && deviceId.equals(source.deviceId()) ? source : null));
-                    mcastNextObjStore.remove(entry.getKey());
-                });
-        log.debug("{} is removed from mcastNextObjStore", deviceId);
-
-        mcastRoleStore.entrySet().stream()
-                .filter(entry -> entry.getKey().deviceId().equals(deviceId))
-                .forEach(entry -> {
-                    mcastRoleStore.remove(entry.getKey());
-                });
-        log.debug("{} is removed from mcastRoleStore", deviceId);
+    private void installPath(IpAddress mcastIp, ConnectPoint source, Path mcastPath) {
+        // Get Links
+        List<Link> links = mcastPath.links();
+        // For each link, modify the next on the source device adding the src port
+        // and a new filter objective on the destination port
+        links.forEach(link -> {
+            addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
+                            assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
+            addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan(null),
+                              mcastIp);
+        });
+        // Setup new transit mcast role
+        mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
+                           McastRole.TRANSIT);
     }
 
     /**
@@ -750,6 +844,19 @@
     }
 
     /**
+     * Gets groups which are affected by the device down event.
+     *
+     * @param deviceId device going down
+     * @return a set of multicast IpAddress
+     */
+    private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
+        return mcastNextObjStore.entrySet().stream()
+                .filter(entry -> entry.getKey().deviceId().equals(deviceId))
+                .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
+                .collect(Collectors.toSet());
+    }
+
+    /**
      * Gets egress VLAN from McastConfig.
      *
      * @return egress VLAN or VlanId.NONE if not configured
@@ -808,6 +915,40 @@
     }
 
     /**
+     * Verify if the given device has sinks
+     * for the multicast group.
+     *
+     * @param deviceId device Id
+     * @param mcastIp multicast IP
+     * @return true if the device has sink for the group.
+     * False otherwise.
+     */
+    private boolean hasSinks(DeviceId deviceId, IpAddress mcastIp) {
+        if (deviceId != null) {
+            // Get the nextobjective
+            Versioned<NextObjective> versionedNextObj = mcastNextObjStore.get(
+                    new McastStoreKey(mcastIp, deviceId)
+            );
+            // If it exists
+            if (versionedNextObj != null) {
+                NextObjective nextObj = versionedNextObj.value();
+                // Retrieves all the output ports
+                Set<PortNumber> ports = getPorts(nextObj.next());
+                // Tries to find at least one port that is not spine-facing
+                for (PortNumber port : ports) {
+                    // Spine-facing port should have no subnet and no xconnect
+                    if (srManager.deviceConfiguration != null &&
+                            (!srManager.deviceConfiguration.getPortSubnets(deviceId, port).isEmpty() ||
+                            srManager.xConnectHandler.hasXConnect(new ConnectPoint(deviceId, port)))) {
+                        return true;
+                    }
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
      * Removes filtering objective for given device and port.
      *
      * @param deviceId device ID
diff --git a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index b346173..7bfa29b 100644
--- a/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -40,6 +40,8 @@
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.Event;
@@ -196,6 +198,12 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     public InterfaceService interfaceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    public ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    public LeadershipService leadershipService;
+
     @Property(name = "activeProbing", boolValue = true,
             label = "Enable active probing to discover dual-homed hosts.")
     boolean activeProbing = true;
@@ -1173,7 +1181,7 @@
         defaultRoutingHandler
             .populateRoutingRulesForLinkStatusChange(null, null, device.id());
         defaultRoutingHandler.purgeEcmpGraph(device.id());
-        mcastHandler.removeDevice(device.id());
+        mcastHandler.processDeviceDown(device.id());
         xConnectHandler.removeDevice(device.id());
     }