CORD-348 Fabric multicast support - error handling

Automatically fail over to a backup spine if
- ingress - transit link down
- transit - egress link down
- transit device down

Can recover from the following fatal errors with human intervention
- ingress switch down
- egress switch down
- all links to spine down

Scan through McastRouteStore when
- SR activate
- link up

Also includes the following features
- Use flow objective context in McastHandler
- Update Mcast VLAN config sample

Change-Id: I75007d9efd7646e7c4e57fa6d3fc6943543153cf
diff --git a/src/main/java/org/onosproject/segmentrouting/McastHandler.java b/src/main/java/org/onosproject/segmentrouting/McastHandler.java
index 74db457..8139e27 100644
--- a/src/main/java/org/onosproject/segmentrouting/McastHandler.java
+++ b/src/main/java/org/onosproject/segmentrouting/McastHandler.java
@@ -42,25 +42,33 @@
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
 import org.onosproject.net.flowobjective.DefaultForwardingObjective;
 import org.onosproject.net.flowobjective.DefaultNextObjective;
+import org.onosproject.net.flowobjective.DefaultObjectiveContext;
 import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.mcast.McastEvent;
 import org.onosproject.net.mcast.McastRouteInfo;
 import org.onosproject.net.topology.TopologyService;
-import org.onosproject.segmentrouting.storekey.McastNextObjectiveStoreKey;
+import org.onosproject.segmentrouting.storekey.McastStoreKey;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
 
 /**
  * Handles multicast-related events.
@@ -71,8 +79,27 @@
     private final ApplicationId coreAppId;
     private StorageService storageService;
     private TopologyService topologyService;
-    private final KryoNamespace.Builder kryoBuilder;
-    private final ConsistentMap<McastNextObjectiveStoreKey, NextObjective> mcastNextObjStore;
+    private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
+    private final KryoNamespace.Builder mcastKryo;
+    private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
+
+    /**
+     * Role a device plays in the tree of a multicast group; kept per (group, device) in mcastRoleStore.
+     */
+    public enum McastRole {
+        /**
+         * The device is the ingress device of this group, i.e. the device the source is attached to.
+         */
+        INGRESS,
+        /**
+         * The device is the transit device of this group, i.e. the first hop after the ingress device.
+         */
+        TRANSIT,
+        /**
+         * The device is the egress device of this group, i.e. a sink on another device than the source.
+         */
+        EGRESS
+    }
 
     /**
      * Constructs the McastEventHandler.
@@ -81,19 +108,36 @@
      */
     public McastHandler(SegmentRoutingManager srManager) {
         coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
-
         this.srManager = srManager;
         this.storageService = srManager.storageService;
         this.topologyService = srManager.topologyService;
-
-        kryoBuilder = new KryoNamespace.Builder()
+        mcastKryo = new KryoNamespace.Builder()
                 .register(KryoNamespaces.API)
-                .register(McastNextObjectiveStoreKey.class);
+                .register(McastStoreKey.class)
+                .register(McastRole.class);
         mcastNextObjStore = storageService
-                .<McastNextObjectiveStoreKey, NextObjective>consistentMapBuilder()
+                .<McastStoreKey, NextObjective>consistentMapBuilder()
                 .withName("onos-mcast-nextobj-store")
-                .withSerializer(Serializer.using(kryoBuilder.build()))
+                .withSerializer(Serializer.using(mcastKryo.build()))
                 .build();
+        mcastRoleStore = storageService
+                .<McastStoreKey, McastRole>consistentMapBuilder()
+                .withName("onos-mcast-role-store")
+                .withSerializer(Serializer.using(mcastKryo.build()))
+                .build();
+    }
+
+    /**
+     * Replays every (source, sink) pair of the routes already present in the mcast store.
+     */
+    public void init() {
+        srManager.multicastRouteService.getRoutes().forEach(route -> {
+            ConnectPoint src = srManager.multicastRouteService.fetchSource(route);
+            Set<ConnectPoint> routeSinks = srManager.multicastRouteService.fetchSinks(route);
+            for (ConnectPoint routeSink : routeSinks) {
+                processSinkAddedInternal(src, routeSink, route.group());
+            }
+        });
     }
 
     /**
@@ -166,6 +210,9 @@
 
         // Process the egress device
         boolean isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan);
+        if (isLast) {
+            mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
+        }
 
         // If this is the last sink on the device, also update upstream
         Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
@@ -176,6 +223,7 @@
                 if (isLast) {
                     isLast = removePortFromDevice(link.src().deviceId(), link.src().port(),
                             mcastIp, assignedVlan);
+                    mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
                 }
             }
         }
@@ -192,6 +240,9 @@
             IpAddress mcastIp) {
         VlanId assignedVlan = assignedVlan();
 
+        // Process the ingress device
+        addFilterToDevice(source.deviceId(), source.port(), assignedVlan);
+
         // When source and sink are on the same device
         if (source.deviceId().equals(sink.deviceId())) {
             // Source and sink are on even the same port. There must be something wrong.
@@ -200,25 +251,91 @@
                 return;
             }
             addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan);
+            mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), McastRole.INGRESS);
             return;
         }
 
-        // Process the ingress device
-        addFilterToDevice(source.deviceId(), source.port(), assignedVlan);
-
         // Find a path. If present, create/update groups and flows for each hop
         Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
         if (mcastPath.isPresent()) {
-            mcastPath.get().links().forEach(link -> {
+            List<Link> links = mcastPath.get().links();
+            checkState(links.size() == 2,
+                    "Path in leaf-spine topology should always be two hops: ", links);
+
+            links.forEach(link -> {
                 addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan);
                 addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp, assignedVlan);
             });
+
             // Process the egress device
             addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan);
+
+            // Setup mcast roles
+            mcastRoleStore.put(new McastStoreKey(mcastIp, source.deviceId()),
+                    McastRole.INGRESS);
+            mcastRoleStore.put(new McastStoreKey(mcastIp, links.get(0).dst().deviceId()),
+                    McastRole.TRANSIT);
+            mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
+                    McastRole.EGRESS);
+        } else {
+            log.warn("Unable to find a path from {} to {}. Abort sinkAdded",
+                    source.deviceId(), sink.deviceId());
         }
     }
 
     /**
+     * Processes the LINK_DOWN event: drops the transit group of each affected group and rebuilds its paths.
+     *
+     * @param affectedLink Link that is going down
+     */
+    protected void processLinkDown(Link affectedLink) {
+        VlanId assignedVlan = assignedVlan();
+
+        getAffectedGroups(affectedLink).forEach(mcastIp -> {
+            // Look up ingress, transit and egress devices; getDevice() yields an empty set, never null
+            DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
+                    .stream().findAny().orElse(null);
+            DeviceId transitDevice = getDevice(mcastIp, McastRole.TRANSIT)
+                    .stream().findAny().orElse(null);
+            Set<DeviceId> egressDevices = getDevice(mcastIp, McastRole.EGRESS);
+            if (ingressDevice == null || transitDevice == null || egressDevices == null) {
+                log.warn("Missing ingress {}, transit {}, or egress {} devices",
+                        ingressDevice, transitDevice, egressDevices);
+                return;
+            }
+
+            // Remove the whole group from the current transit device (also clears its stored role)
+            removeGroupFromDevice(transitDevice, mcastIp, assignedVlan);
+
+            // Remove the transit-facing port on the ingress device, if one can be identified
+            PortNumber ingressTransitPort = ingressTransitPort(mcastIp);
+            if (ingressTransitPort != null) {
+                removePortFromDevice(ingressDevice, ingressTransitPort, mcastIp, assignedVlan);
+                mcastRoleStore.remove(new McastStoreKey(mcastIp, transitDevice));
+            }
+
+            // Construct a new path from the ingress device to each egress device
+            egressDevices.forEach(egressDevice -> {
+                Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
+                if (mcastPath.isPresent()) {
+                    List<Link> links = mcastPath.get().links();
+                    links.forEach(link -> {
+                        addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp, assignedVlan);
+                        addFilterToDevice(link.dst().deviceId(), link.dst().port(), assignedVlan);
+                    });
+                    // Record the destination of the first link of the new path as the transit device
+                    mcastRoleStore.put(new McastStoreKey(mcastIp,
+                            links.get(0).dst().deviceId()), McastRole.TRANSIT);
+                } else {
+                    log.warn("Fail to recover egress device {} from link failure {}",
+                            egressDevice, affectedLink);
+                    removeGroupFromDevice(egressDevice, mcastIp, assignedVlan);
+                }
+            });
+        });
+    }
+
+    /**
      * Adds filtering objective for given device and port.
      *
      * @param deviceId device ID
@@ -228,7 +345,8 @@
     private void addFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan) {
         // Do nothing if the port is configured as suppressed
         ConnectPoint connectPt = new ConnectPoint(deviceId, port);
-        if (srManager.deviceConfiguration.suppressSubnet().contains(connectPt) ||
+        if (srManager.deviceConfiguration == null ||
+                srManager.deviceConfiguration.suppressSubnet().contains(connectPt) ||
                 srManager.deviceConfiguration.suppressHost().contains(connectPt)) {
             log.info("Ignore suppressed port {}", connectPt);
             return;
@@ -236,8 +354,13 @@
 
         FilteringObjective.Builder filtObjBuilder =
                 filterObjBuilder(deviceId, port, assignedVlan);
-        srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add());
-        // TODO add objective context
+        ObjectiveContext context = new DefaultObjectiveContext(
+                (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
+                        deviceId, port.toLong(), assignedVlan),
+                (objective, error) ->
+                        log.warn("Failed to add filter on {}/{}, vlan {}: {}",
+                                deviceId, port.toLong(), assignedVlan, error));
+        srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add(context));
     }
 
     /**
@@ -251,17 +374,14 @@
      */
     private void addPortToDevice(DeviceId deviceId, PortNumber port,
             IpAddress mcastIp, VlanId assignedVlan) {
-        log.info("Add port {} to {}. mcastIp={}, assignedVlan={}",
-                port, deviceId, mcastIp, assignedVlan);
-        McastNextObjectiveStoreKey mcastNextObjectiveStoreKey =
-                new McastNextObjectiveStoreKey(mcastIp, deviceId);
+        McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
         ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
-        if (!mcastNextObjStore.containsKey(mcastNextObjectiveStoreKey)) {
+        if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
             // First time someone request this mcast group via this device
             portBuilder.add(port);
         } else {
             // This device already serves some subscribers of this mcast group
-            NextObjective nextObj = mcastNextObjStore.get(mcastNextObjectiveStoreKey).value();
+            NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
             // Stop if the port is already in the nextobj
             Set<PortNumber> existingPorts = getPorts(nextObj.next());
             if (existingPorts.contains(port)) {
@@ -271,14 +391,19 @@
             portBuilder.addAll(existingPorts).add(port).build();
         }
         // Create, store and apply the new nextObj and fwdObj
+        ObjectiveContext context = new DefaultObjectiveContext(
+                (objective) -> log.debug("Successfully add {} on {}/{}, vlan {}",
+                        mcastIp, deviceId, port.toLong(), assignedVlan),
+                (objective, error) ->
+                        log.warn("Failed to add {} on {}/{}, vlan {}: {}",
+                                mcastIp, deviceId, port.toLong(), assignedVlan, error));
         NextObjective newNextObj =
                 nextObjBuilder(mcastIp, assignedVlan, portBuilder.build()).add();
         ForwardingObjective fwdObj =
-                fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add();
-        mcastNextObjStore.put(mcastNextObjectiveStoreKey, newNextObj);
+                fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
+        mcastNextObjStore.put(mcastStoreKey, newNextObj);
         srManager.flowObjectiveService.next(deviceId, newNextObj);
         srManager.flowObjectiveService.forward(deviceId, fwdObj);
-        // TODO add objective callback
     }
 
     /**
@@ -294,19 +419,17 @@
      */
     private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
             IpAddress mcastIp, VlanId assignedVlan) {
-        log.info("Remove port {} from {}. mcastIp={}, assignedVlan={}",
-                port, deviceId, mcastIp, assignedVlan);
-        McastNextObjectiveStoreKey mcastNextObjectiveStoreKey =
-                new McastNextObjectiveStoreKey(mcastIp, deviceId);
+        McastStoreKey mcastStoreKey =
+                new McastStoreKey(mcastIp, deviceId);
         // This device is not serving this multicast group
-        if (!mcastNextObjStore.containsKey(mcastNextObjectiveStoreKey)) {
+        if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
             log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
             return false;
         }
-        NextObjective nextObj = mcastNextObjStore.get(mcastNextObjectiveStoreKey).value();
+        NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
 
         Set<PortNumber> existingPorts = getPorts(nextObj.next());
-        // This device does not serve this multicast group
+        // This port does not serve this multicast group
         if (!existingPorts.contains(port)) {
             log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
             return false;
@@ -322,22 +445,91 @@
             // NOTE: Rely on GroupStore garbage collection rather than explicitly
             //       remove L3MG since there might be other flows/groups refer to
             //       the same L2IG
-            fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove();
-            mcastNextObjStore.remove(mcastNextObjectiveStoreKey);
+            ObjectiveContext context = new DefaultObjectiveContext(
+                    (objective) -> log.debug("Successfully remove {} on {}/{}, vlan {}",
+                            mcastIp, deviceId, port.toLong(), assignedVlan),
+                    (objective, error) ->
+                            log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
+                                    mcastIp, deviceId, port.toLong(), assignedVlan, error));
+            fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
+            mcastNextObjStore.remove(mcastStoreKey);
             srManager.flowObjectiveService.forward(deviceId, fwdObj);
         } else {
             // If this is not the last sink, update flows and groups
+            ObjectiveContext context = new DefaultObjectiveContext(
+                    (objective) -> log.debug("Successfully update {} on {}/{}, vlan {}",
+                            mcastIp, deviceId, port.toLong(), assignedVlan),
+                    (objective, error) ->
+                            log.warn("Failed to update {} on {}/{}, vlan {}: {}",
+                                    mcastIp, deviceId, port.toLong(), assignedVlan, error));
             newNextObj = nextObjBuilder(mcastIp, assignedVlan, existingPorts).add();
             fwdObj = fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add();
-            mcastNextObjStore.put(mcastNextObjectiveStoreKey, newNextObj);
+            mcastNextObjStore.put(mcastStoreKey, newNextObj);
             srManager.flowObjectiveService.next(deviceId, newNextObj);
             srManager.flowObjectiveService.forward(deviceId, fwdObj);
         }
-        // TODO add objective callback
-
         return existingPorts.isEmpty();
     }
 
+
+    /**
+     * Withdraws the whole multicast group from one device and forgets its stored state.
+     *
+     * @param deviceId device ID
+     * @param mcastIp multicast group to be removed
+     * @param assignedVlan assigned VLAN ID
+     */
+    private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
+            VlanId assignedVlan) {
+        McastStoreKey key = new McastStoreKey(mcastIp, deviceId);
+        if (!mcastNextObjStore.containsKey(key)) {
+            // No next objective is stored for this group on this device
+            log.warn("{} is not serving {}. Abort.", deviceId, mcastIp);
+            return;
+        }
+        // NOTE: Only the forwarding objective is removed. The L3MG is left to
+        //       GroupStore garbage collection since other flows/groups might
+        //       still refer to the same L2IG
+        ObjectiveContext removalContext = new DefaultObjectiveContext(
+                (obj) -> log.debug("Successfully remove {} on {}, vlan {}",
+                        mcastIp, deviceId, assignedVlan),
+                (obj, err) -> log.warn("Failed to remove {} on {}, vlan {}: {}",
+                        mcastIp, deviceId, assignedVlan, err));
+        NextObjective installedNextObj = mcastNextObjStore.get(key).value();
+        ForwardingObjective removal =
+                fwdObjBuilder(mcastIp, assignedVlan, installedNextObj.id()).remove(removalContext);
+        srManager.flowObjectiveService.forward(deviceId, removal);
+        mcastNextObjStore.remove(key);
+        mcastRoleStore.remove(key);
+    }
+
+    /**
+     * Remove all groups on given device.
+     *
+     * @param deviceId device ID
+     */
+    public void removeDevice(DeviceId deviceId) {
+        Iterator<Map.Entry<McastStoreKey, Versioned<NextObjective>>> itNextObj =
+                mcastNextObjStore.entrySet().iterator();
+        while (itNextObj.hasNext()) {
+            McastStoreKey key = itNextObj.next().getKey();
+            if (key.deviceId().equals(deviceId)) {
+                removeGroupFromDevice(key.deviceId(), key.mcastIp(), assignedVlan());
+                // ConsistentMap.entrySet() is an unmodifiable snapshot; delete via the store instead
+                mcastNextObjStore.remove(key);
+            }
+        }
+
+        Iterator<Map.Entry<McastStoreKey, Versioned<McastRole>>> itRole =
+                mcastRoleStore.entrySet().iterator();
+        while (itRole.hasNext()) {
+            McastStoreKey key = itRole.next().getKey();
+            if (key.deviceId().equals(deviceId)) {
+                mcastRoleStore.remove(key);
+            }
+        }
+    }
+
     /**
      * Creates a next objective builder for multicast.
      *
@@ -456,16 +648,15 @@
     private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp) {
         List<Path> allPaths = Lists.newArrayList(
                 topologyService.getPaths(topologyService.currentTopology(), src, dst));
+        log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
         if (allPaths.isEmpty()) {
-            log.warn("Fail to find a path from {} to {}. Abort.", src, dst);
             return Optional.empty();
         }
 
         // If one of the available path is used before, use the same path
-        McastNextObjectiveStoreKey mcastNextObjectiveStoreKey =
-                new McastNextObjectiveStoreKey(mcastIp, src);
-        if (mcastNextObjStore.containsKey(mcastNextObjectiveStoreKey)) {
-            NextObjective nextObj = mcastNextObjStore.get(mcastNextObjectiveStoreKey).value();
+        McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, src);
+        if (mcastNextObjStore.containsKey(mcastStoreKey)) {
+            NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
             Set<PortNumber> existingPorts = getPorts(nextObj.next());
             for (Path path : allPaths) {
                 PortNumber srcPort = path.links().get(0).src().port();
@@ -480,6 +671,37 @@
     }
 
     /**
+     * Looks up the device(s) recorded with the given role for a multicast group.
+     *
+     * @param mcastIp multicast IP
+     * @param role multicast role
+     * @return set of device ID or empty set if not found
+     */
+    private Set<DeviceId> getDevice(IpAddress mcastIp, McastRole role) {
+        return mcastRoleStore.entrySet().stream()
+                .filter(roleEntry -> roleEntry.getKey().mcastIp().equals(mcastIp))
+                .filter(roleEntry -> roleEntry.getValue().value() == role)
+                .map(roleEntry -> roleEntry.getKey().deviceId())
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Gets groups whose next objective on the link's source device outputs to the link's source port.
+     *
+     * @param link link going down
+     * @return a set of multicast IpAddress
+     */
+    private Set<IpAddress> getAffectedGroups(Link link) {
+        DeviceId srcDevice = link.src().deviceId();
+        PortNumber srcPort = link.src().port();
+        return mcastNextObjStore.entrySet().stream()
+                .filter(objEntry -> objEntry.getKey().deviceId().equals(srcDevice))
+                .filter(objEntry -> getPorts(objEntry.getValue().value().next()).contains(srcPort))
+                .map(objEntry -> objEntry.getKey().mcastIp())
+                .collect(Collectors.toSet());
+    }
+
+    /**
      * Gets egress VLAN from McastConfig.
      *
      * @return egress VLAN or VlanId.NONE if not configured
@@ -500,4 +722,34 @@
                 VlanId.vlanId(SegmentRoutingManager.ASSIGNED_VLAN_NO_SUBNET) :
                 egressVlan();
     }
+
+    /**
+     * Gets the spine-facing port on ingress device of given multicast group.
+     *
+     * @param mcastIp multicast IP
+     * @return spine-facing port on ingress device, or null if none can be determined
+     */
+    private PortNumber ingressTransitPort(IpAddress mcastIp) {
+        DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS).stream().findAny().orElse(null);
+        if (ingressDevice == null || srManager.deviceConfiguration == null) {
+            return null;
+        }
+        // The role store and the next objective store are separate maps, so the entry may be missing
+        McastStoreKey ingressKey = new McastStoreKey(mcastIp, ingressDevice);
+        if (!mcastNextObjStore.containsKey(ingressKey)) {
+            log.warn("{} is not serving {}. Abort.", ingressDevice, mcastIp);
+            return null;
+        }
+        NextObjective nextObj = mcastNextObjStore.get(ingressKey).value();
+        for (PortNumber port : getPorts(nextObj.next())) {
+            // Spine-facing port should have no subnet and no xconnect
+            if (srManager.deviceConfiguration.getPortSubnet(ingressDevice, port) == null &&
+                    srManager.deviceConfiguration.getXConnects().values().stream()
+                            .allMatch(connectPoints -> connectPoints.stream()
+                                    .noneMatch(connectPoint -> connectPoint.port().equals(port)))) {
+                return port;
+            }
+        }
+        return null;
+    }
 }