[CORD-2838][CORD-2833] Revisit McastHandler and handle shortest paths with pair links

Also includes a refactoring of the path computation

Change-Id: Iff63780a3bb3e895e55c52211290c19d993e1905
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index a791ecb..968ad3b 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -33,6 +33,9 @@
 import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.mcast.api.McastEvent;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.McastRouteUpdate;
 import org.onosproject.net.config.basics.McastConfig;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
@@ -54,11 +57,10 @@
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
-import org.onosproject.net.mcast.McastEvent;
-import org.onosproject.net.mcast.McastRoute;
-import org.onosproject.net.mcast.McastRouteInfo;
+import org.onosproject.net.topology.LinkWeigher;
 import org.onosproject.net.topology.Topology;
 import org.onosproject.net.topology.TopologyService;
+import org.onosproject.segmentrouting.SRLinkWeigher;
 import org.onosproject.segmentrouting.SegmentRoutingManager;
 import org.onosproject.segmentrouting.SegmentRoutingService;
 import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
@@ -78,6 +80,7 @@
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
@@ -88,10 +91,9 @@
 
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
+
+import static org.onosproject.mcast.api.McastEvent.Type.*;
 import static org.onosproject.net.flow.criteria.Criterion.Type.VLAN_VID;
-import static org.onosproject.net.mcast.McastEvent.Type.ROUTE_REMOVED;
-import static org.onosproject.net.mcast.McastEvent.Type.SOURCE_ADDED;
-import static org.onosproject.net.mcast.McastEvent.Type.SOURCE_UPDATED;
 import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
 import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
 import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
@@ -139,63 +141,116 @@
 
     private void enqueueMcastEvent(McastEvent mcastEvent) {
         log.debug("Enqueue mcastEvent {}", mcastEvent);
-        final McastRouteInfo mcastRouteInfo = mcastEvent.subject();
+        final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
+        final McastRouteUpdate prevUpdate = mcastEvent.prevSubject();
+        final IpAddress group = prevUpdate.route().group();
         // Let's create the keys of the cache
         ImmutableSet.Builder<ConnectPoint> sinksBuilder = ImmutableSet.builder();
+        Set<ConnectPoint> sinks;
         // For this event we will have a set of sinks
-        if (mcastEvent.type() == SOURCE_ADDED ||
-                mcastEvent.type() == SOURCE_UPDATED ||
-                mcastEvent.type() == ROUTE_REMOVED) {
-            // Add all the sinks
-            sinksBuilder.addAll(mcastRouteInfo.sinks());
+        if (mcastEvent.type() == SOURCES_ADDED ||
+                mcastEvent.type() == SOURCES_REMOVED) {
+            // FIXME To be addressed with multiple sources support
+            sinks = mcastRouteUpdate.sinks()
+                    .values()
+                    .stream()
+                    .flatMap(Collection::stream)
+                    .collect(Collectors.toSet());
         } else {
-            // We have just one sink in this case
-            ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
-            // It is always true, unless something of bad happened
-            // in the mcast route store
-            if (sink != null) {
-                sinksBuilder.add(sink);
+            Set<ConnectPoint> prevSinks = prevUpdate.sinks()
+                    .values()
+                    .stream()
+                    .flatMap(Collection::stream)
+                    .collect(Collectors.toSet());
+            if (mcastEvent.type() == ROUTE_REMOVED) {
+                // Get the old sinks, since current subject is null
+                sinks = prevSinks;
+            } else {
+                // Get new sinks
+                Set<ConnectPoint> newsinks = mcastRouteUpdate.sinks()
+                        .values()
+                        .stream()
+                        .flatMap(Collection::stream)
+                        .collect(Collectors.toSet());
+                // If it is a SINKS_ADDED event
+                if (mcastEvent.type() == SINKS_ADDED) {
+                    // Let's do the difference between current and prev subjects
+                    sinks = Sets.difference(newsinks, prevSinks);
+                } else {
+                    // Let's do the difference between prev and current subjects
+                    sinks = Sets.difference(prevSinks, newsinks);
+                }
             }
         }
+        // Add all the sinks
+        sinksBuilder.addAll(sinks);
         // Push the elements in the cache
         sinksBuilder.build().forEach(sink -> {
-            McastCacheKey cacheKey = new McastCacheKey(mcastRouteInfo.route().group(),
-                                                       sink);
+            McastCacheKey cacheKey = new McastCacheKey(group, sink);
             mcastEventCache.put(cacheKey, mcastEvent);
         });
     }
 
     private void dequeueMcastEvent(McastEvent mcastEvent) {
         log.debug("Dequeue mcastEvent {}", mcastEvent);
-        final McastRouteInfo mcastRouteInfo = mcastEvent.subject();
+        final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
+        final McastRouteUpdate prevUpdate = mcastEvent.prevSubject();
         // Get source, mcast group
-        ConnectPoint source = mcastRouteInfo.source().orElse(null);
-        IpAddress mcastIp = mcastRouteInfo.route().group();
+        // FIXME To be addressed with multiple sources support
+        ConnectPoint prevSource = prevUpdate.sources()
+                .stream()
+                .findFirst()
+                .orElse(null);
+        IpAddress mcastIp = prevUpdate.route().group();
+        Set<ConnectPoint> prevSinks = prevUpdate.sinks()
+                .values()
+                .stream()
+                .flatMap(Collection::stream)
+                .collect(Collectors.toSet());
+        Set<ConnectPoint> newSinks;
+        // Sinks to be handled by SINKS_ADDED and SINKS_REMOVED procedures
+        Set<ConnectPoint> sinks;
         // According to the event type let's call the proper method
         switch (mcastEvent.type()) {
-            case SOURCE_ADDED:
-                // Get all the sinks and process
-                Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
-                sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp));
+            case SOURCES_ADDED:
+                // FIXME To be addressed with multiple sources support
+                // Get all the sinks
+                //Set<ConnectPoint> sinks = mcastRouteInfo.sinks();
+                // Compute the Mcast tree
+                //Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
+                // Process the given sinks using the pre-computed paths
+                //mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
                 break;
-            case SOURCE_UPDATED:
+            case SOURCES_REMOVED:
+                // FIXME To be addressed with multiple sources support
                 // Get old source
-                ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null);
+                //ConnectPoint oldSource = mcastEvent.prevSubject().source().orElse(null);
                 // Just the first cached element will be processed
-                processSourceUpdatedInternal(mcastIp, source, oldSource);
+                //processSourceUpdatedInternal(mcastIp, source, oldSource);
                 break;
             case ROUTE_REMOVED:
                 // Process the route removed, just the first cached element will be processed
-                processRouteRemovedInternal(source, mcastIp);
+                processRouteRemovedInternal(prevSource, mcastIp);
                 break;
-            case SINK_ADDED:
-                // Get the only sink and process
-                ConnectPoint sink = mcastRouteInfo.sink().orElse(null);
-                processSinkAddedInternal(source, sink, mcastIp);
+            case SINKS_ADDED:
+                // Get the only sinks to be processed (new ones)
+                newSinks = mcastRouteUpdate.sinks()
+                        .values()
+                        .stream()
+                        .flatMap(Collection::stream)
+                        .collect(Collectors.toSet());
+                sinks = Sets.difference(newSinks, prevSinks);
+                sinks.forEach(sink -> processSinkAddedInternal(prevSource, sink, mcastIp, null));
                 break;
-            case SINK_REMOVED:
-                sink = mcastRouteInfo.sink().orElse(null);
-                processSinkRemovedInternal(source, sink, mcastIp);
+            case SINKS_REMOVED:
+                // Get the only sinks to be processed (old ones)
+                newSinks = mcastRouteUpdate.sinks()
+                        .values()
+                        .stream()
+                        .flatMap(Collection::stream)
+                        .collect(Collectors.toSet());
+                sinks = Sets.difference(prevSinks, newSinks);
+                sinks.forEach(sink -> processSinkRemovedInternal(prevSource, sink, mcastIp));
                 break;
             default:
                 break;
@@ -283,11 +338,26 @@
      */
     public void init() {
         srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
-            ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
-            Set<ConnectPoint> sinks = srManager.multicastRouteService.fetchSinks(mcastRoute);
-            sinks.forEach(sink -> {
-                processSinkAddedInternal(source, sink, mcastRoute.group());
-            });
+            // FIXME To be addressed with multiple sources support
+            ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
+                    .stream()
+                    .findFirst()
+                    .orElse(null);
+            Set<ConnectPoint> sinks = srManager.multicastRouteService.sinks(mcastRoute);
+            // Filter out all the working sinks, we do not want to move them
+            sinks = sinks.stream()
+                    .filter(sink -> {
+                        McastStoreKey mcastKey = new McastStoreKey(mcastRoute.group(), sink.deviceId());
+                        Versioned<NextObjective> verMcastNext = mcastNextObjStore.get(mcastKey);
+                        return verMcastNext == null ||
+                                !getPorts(verMcastNext.value().next()).contains(sink.port());
+                    })
+                    .collect(Collectors.toSet());
+            // Compute the Mcast tree
+            Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
+            // Process the given sinks using the pre-computed paths
+            mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
+                                                                       mcastRoute.group(), paths));
         });
     }
 
@@ -306,12 +376,6 @@
      */
     public void processMcastEvent(McastEvent event) {
         log.info("process {}", event);
-        // Verify if it is a complete event
-        McastRouteInfo mcastRouteInfo = event.subject();
-        if (!mcastRouteInfo.isComplete()) {
-            log.info("Incompleted McastRouteInfo. Abort {}", event.type());
-            return;
-        }
         // Just enqueue for now
         enqueueMcastEvent(event);
     }
@@ -451,7 +515,8 @@
             }
 
             // If this is the last sink on the device, also update upstream
-            Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
+            Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
+                                               mcastIp, null);
             if (mcastPath.isPresent()) {
                 List<Link> links = Lists.newArrayList(mcastPath.get().links());
                 Collections.reverse(links);
@@ -482,7 +547,7 @@
      * @param mcastIp multicast group IP address
      */
     private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
-            IpAddress mcastIp) {
+            IpAddress mcastIp, List<Path> allPaths) {
         lastMcastChange = Instant.now();
         mcastLock();
         try {
@@ -511,7 +576,8 @@
             }
 
             // Find a path. If present, create/update groups and flows for each hop
-            Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp);
+            Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(),
+                                               mcastIp, allPaths);
             if (mcastPath.isPresent()) {
                 List<Link> links = mcastPath.get().links();
 
@@ -581,7 +647,7 @@
                 // Continue only when this instance is the master of source device
                 if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
                     log.debug("Skip {} due to lack of mastership of the source device {}",
-                             source.deviceId());
+                             mcastIp, source.deviceId());
                     return;
                 }
 
@@ -592,9 +658,15 @@
                 // Remove transit-facing ports on the ingress device
                 removeIngressTransitPorts(mcastIp, ingressDevice, source);
 
+                // Compute mcast tree for the egress devices
+                Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
+
                 // Construct a new path for each egress device
-                egressDevices.forEach(egressDevice -> {
-                    Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
+                mcastTree.forEach((egressDevice, paths) -> {
+                    // We try to enforce the sinks path on the mcast tree
+                    Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
+                                                       mcastIp, paths);
+                    // If a path is present, let's install it
                     if (mcastPath.isPresent()) {
                         installPath(mcastIp, source, mcastPath.get());
                     } else {
@@ -680,9 +752,14 @@
                             return;
                         }
                     }
+
+                    // Compute mcast tree for the egress devices
+                    Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
+
                     // Construct a new path for each egress device
-                    egressDevices.forEach(egressDevice -> {
-                        Optional<Path> mcastPath = getPath(ingressDevice, egressDevice, mcastIp);
+                    mcastTree.forEach((egressDevice, paths) -> {
+                        Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
+                                                           mcastIp, null);
                         // If there is a new path
                         if (mcastPath.isPresent()) {
                             // Let's install the new mcast path for this egress
@@ -1072,31 +1149,157 @@
         return builder.build();
     }
 
-    // Utility method to verify is a link is a pair-link
-    private boolean isPairLink(Link link) {
-        // Take src id, src port, dst id and dst port
-        final DeviceId srcId = link.src().deviceId();
-        final PortNumber srcPort = link.src().port();
-        final DeviceId dstId = link.dst().deviceId();
-        final PortNumber dstPort = link.dst().port();
-        // init as true
-        boolean isPairLink = true;
-        try {
-            // If one of this condition is not true; it is not a pair link
-            if (!(srManager.deviceConfiguration().isEdgeDevice(srcId) &&
-                  srManager.deviceConfiguration().isEdgeDevice(dstId) &&
-                  srManager.deviceConfiguration().getPairDeviceId(srcId).equals(dstId) &&
-                  srManager.deviceConfiguration().getPairLocalPort(srcId).equals(srcPort) &&
-                  srManager.deviceConfiguration().getPairLocalPort(dstId).equals(dstPort))) {
-                    isPairLink = false;
-                }
-        } catch (DeviceConfigNotFoundException e) {
-            // Configuration not provided
-            log.warn("Could not check if the link {} is pairlink "
-                             + "config not yet provided", link);
-            isPairLink = false;
+    /**
+     * Walks the available paths hop by hop and collects the links that are
+     * shared by all the egresses, to be used in the final path computation.
+     *
+     * @param egresses egress devices; an egress may be removed from this set on retry
+     * @param availablePaths all the available paths towards each egress
+     * @return shared links between egress devices, possibly an empty set
+     */
+    private Set<Link> exploreMcastTree(Set<DeviceId> egresses,
+                                       Map<DeviceId, List<Path>> availablePaths) {
+        // Length (in links) of the shortest first path among the egresses
+        int minLength = Integer.MAX_VALUE;
+        int length;
+        // Current paths
+        List<Path> currentPaths;
+        // Scan the egresses to find the minimum path length
+        for (DeviceId egress : egresses) {
+            // An egress without paths cannot be reached from the source;
+            // just skip it here, the loop below deals with it
+            currentPaths = availablePaths.get(egress);
+            if (currentPaths.isEmpty()) {
+                continue;
+            }
+            // Get the length of the first path available
+            // and update the min length if it is shorter
+            length = currentPaths.get(0).links().size();
+            if (length < minLength) {
+                minLength = length;
+            }
         }
-        return isPairLink;
+        // If there are no paths, there is nothing to share
+        if (minLength == Integer.MAX_VALUE) {
+            return Collections.emptySet();
+        }
+        // Position of the link being compared along the paths
+        int index = 0;
+        // Define the sets for the intersection
+        Set<Link> sharedLinks = Sets.newHashSet();
+        Set<Link> currentSharedLinks;
+        Set<Link> currentLinks;
+        DeviceId deviceToRemove = null;
+        // Let's find out the shared links
+        while (index < minLength) {
+            // Initialize the intersection with the paths related to the first egress
+            currentPaths = availablePaths.get(
+                    egresses.stream()
+                            .findFirst()
+                            .orElse(null)
+            );
+            currentSharedLinks = Sets.newHashSet();
+            // Iterate over the paths and take the "index" links
+            for (Path path : currentPaths) {
+                currentSharedLinks.add(path.links().get(index));
+            }
+            // Iterate over all the egresses, the first one included
+            for (DeviceId egress : egresses) {
+                // Iterate over the paths and take the "index" links
+                currentLinks = Sets.newHashSet();
+                for (Path path : availablePaths.get(egress)) {
+                    currentLinks.add(path.links().get(index));
+                }
+                // Do intersection
+                currentSharedLinks = Sets.intersection(currentSharedLinks, currentLinks);
+                // If there are no shared links, record the device to remove and exit:
+                // we have to retry with a subset of the egresses
+                if (currentSharedLinks.isEmpty()) {
+                    deviceToRemove = egress;
+                    index = minLength;
+                    break;
+                }
+            }
+            sharedLinks.addAll(currentSharedLinks);
+            index++;
+        }
+        // If no shared links were found and more than one egress is left,
+        // let's retry another time without the recorded egress: we can
+        // still build optimal subtrees
+        if (sharedLinks.isEmpty() && egresses.size() > 1 && deviceToRemove != null) {
+            egresses.remove(deviceToRemove);
+            sharedLinks = exploreMcastTree(egresses, availablePaths);
+        }
+        return sharedLinks;
+    }
+
+    /**
+     * Build Mcast tree having as root the given source and as leaves the given egress points.
+     *
+     * @param source source of the tree
+     * @param sinks leaves of the tree
+     * @return the computed Mcast tree; one entry per sink, with no paths for sinks on the source
+     */
+    private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source,
+                                                                Set<ConnectPoint> sinks) {
+        // Get the egress devices, remove source from the egress if present
+        Set<DeviceId> egresses = sinks.stream()
+                .map(ConnectPoint::deviceId)
+                .filter(deviceId -> !deviceId.equals(source))
+                .collect(Collectors.toSet());
+        Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
+        // Build the final tree starting from the sinks, so that none of them is lost
+        final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
+        // Sinks on the source device are not in the tree; they get an empty list (no path needed)
+        sinks.forEach(sink -> finalTree.put(sink,
+                mcastTree.getOrDefault(sink.deviceId(), Collections.emptyList())));
+        return finalTree;
+    }
+
+    /**
+     * Build Mcast tree having as root the given source and as leaves the given egress.
+     *
+     * @param source source of the tree
+     * @param egresses leaves of the tree
+     * @return the paths towards each egress, computed enforcing the shared links
+     */
+    private Map<DeviceId, List<Path>> computeMcastTree(DeviceId source,
+                                                       Set<DeviceId> egresses) {
+        // Paths towards each egress, keyed by the egress device
+        Map<DeviceId, List<Path>> availablePaths = Maps.newHashMap();
+        // First pass: no links to enforce
+        egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
+                                                                       Collections.emptySet())));
+        // Look for shared links amongst the egresses; a copy is passed as the set can be modified
+        Set<Link> linksToEnforce = exploreMcastTree(Sets.newHashSet(egresses), availablePaths);
+        // Remove all the paths from the previous computation
+        availablePaths.clear();
+        // Second pass: build the final paths enforcing the shared links between egress devices
+        egresses.forEach(egress -> availablePaths.put(egress, getPaths(source, egress,
+                                                                       linksToEnforce)));
+        return availablePaths;
+    }
+
+    /**
+     * Gets the paths from src to dst computed using the custom link weigher.
+     *
+     * @param src source device ID
+     * @param dst destination device ID
+     * @param linksToEnforce links passed to the link weigher built for this computation
+     * @return list of paths from src to dst
+     */
+    private List<Path> getPaths(DeviceId src, DeviceId dst, Set<Link> linksToEnforce) {
+        // Takes a snapshot of the topology
+        final Topology currentTopology = topologyService.currentTopology();
+        // Build a specific link weigher for this path computation
+        final LinkWeigher linkWeigher = new SRLinkWeigher(srManager, src, linksToEnforce);
+        // Use the custom link weigher to compute the list of valid paths
+        List<Path> allPaths = Lists.newArrayList(
+                topologyService.getPaths(currentTopology, src, dst, linkWeigher)
+        );
+        // Log how many paths were found; the returned list may be empty
+        log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
+        return allPaths;
+    }
 
     /**
@@ -1107,18 +1310,17 @@
      * @param src source device ID
      * @param dst destination device ID
      * @param mcastIp multicast group
+     * @param allPaths paths list
      * @return an optional path from src to dst
      */
-    private Optional<Path> getPath(DeviceId src, DeviceId dst, IpAddress mcastIp) {
-        // Takes a snapshot of the topology
-        final Topology currentTopology = topologyService.currentTopology();
-        List<Path> allPaths = Lists.newArrayList(
-                topologyService.getPaths(currentTopology, src, dst)
-        );
-        // Create list of valid paths
-        allPaths.removeIf(path -> path.links().stream().anyMatch(this::isPairLink));
-        // If there are no valid paths, just exit
-        log.debug("{} path(s) found from {} to {}", allPaths.size(), src, dst);
+    private Optional<Path> getPath(DeviceId src, DeviceId dst,
+                                   IpAddress mcastIp, List<Path> allPaths) {
+        // Firstly we get all the valid paths, if the supplied are null
+        if (allPaths == null) {
+            allPaths = getPaths(src, dst, Collections.emptySet());
+        }
+
+        // If there are no paths just exit
         if (allPaths.isEmpty()) {
             return Optional.empty();
         }
@@ -1205,7 +1407,7 @@
         return mcastRoleStore.entrySet().stream()
                 .filter(entry -> entry.getKey().mcastIp().equals(mcastIp) &&
                         entry.getValue().value() == role)
-                .map(Map.Entry::getKey).map(McastStoreKey::deviceId)
+                .map(Entry::getKey).map(McastStoreKey::deviceId)
                 .collect(Collectors.toSet());
     }
 
@@ -1215,11 +1417,15 @@
      * @param mcastIp multicast IP
      * @return source connect point or null if not found
      */
+    // FIXME To be addressed with multiple sources support
     private ConnectPoint getSource(IpAddress mcastIp) {
-        return srManager.multicastRouteService.getRoutes().stream()
-                .filter(mcastRoute -> mcastRoute.group().equals(mcastIp))
-                .map(mcastRoute -> srManager.multicastRouteService.fetchSource(mcastRoute))
-                .findAny().orElse(null);
+        // FIXME we should support different types of routes
+        McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
+                .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
+                .findFirst().orElse(null);
+        return mcastRoute == null ? null : srManager.multicastRouteService.sources(mcastRoute)
+                .stream()
+                .findFirst().orElse(null);
     }
     /**
      * Gets sinks of given multicast group.
@@ -1228,10 +1434,12 @@
      * @return set of sinks or empty set if not found
      */
     private Set<ConnectPoint> getSinks(IpAddress mcastIp) {
-        return srManager.multicastRouteService.getRoutes().stream()
-                .filter(mcastRoute -> mcastRoute.group().equals(mcastIp))
-                .map(mcastRoute -> srManager.multicastRouteService.fetchSinks(mcastRoute))
-                .findAny().orElse(Collections.emptySet());
+        // FIXME we should support different types of routes
+        McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
+                .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
+                .findFirst().orElse(null);
+        return mcastRoute == null ?
+                Collections.emptySet() : srManager.multicastRouteService.sinks(mcastRoute);
     }
 
     /**
@@ -1246,7 +1454,7 @@
         return mcastNextObjStore.entrySet().stream()
                 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
                         getPorts(entry.getValue().value().next()).contains(port))
-                .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
+                .map(Entry::getKey).map(McastStoreKey::mcastIp)
                 .collect(Collectors.toSet());
     }
 
@@ -1259,7 +1467,7 @@
     private Set<IpAddress> getAffectedGroups(DeviceId deviceId) {
         return mcastNextObjStore.entrySet().stream()
                 .filter(entry -> entry.getKey().deviceId().equals(deviceId))
-                .map(Map.Entry::getKey).map(McastStoreKey::mcastIp)
+                .map(Entry::getKey).map(McastStoreKey::mcastIp)
                 .collect(Collectors.toSet());
     }
 
@@ -1433,7 +1641,10 @@
             // Iterates over the route and updates properly the filtering objective
             // on the source device.
             srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
-                ConnectPoint source = srManager.multicastRouteService.fetchSource(mcastRoute);
+                // FIXME To be addressed with multiple sources support
+                ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
+                        .stream()
+                        .findFirst().orElse(null);
                 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
                     if (install) {
                         addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
@@ -1570,12 +1781,12 @@
         if (mcastIp != null) {
             return mcastNextObjStore.entrySet().stream()
                     .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
-                    .collect(Collectors.toMap(Map.Entry::getKey,
+                    .collect(Collectors.toMap(Entry::getKey,
                                               entry -> entry.getValue().value().id()));
         }
         // Otherwise take all the groups
         return mcastNextObjStore.entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey,
+                .collect(Collectors.toMap(Entry::getKey,
                                           entry -> entry.getValue().value().id()));
     }
 
@@ -1584,12 +1795,12 @@
         if (mcastIp != null) {
             return mcastRoleStore.entrySet().stream()
                     .filter(mcastEntry -> mcastIp.equals(mcastEntry.getKey().mcastIp()))
-                    .collect(Collectors.toMap(Map.Entry::getKey,
+                    .collect(Collectors.toMap(Entry::getKey,
                                               entry -> entry.getValue().value()));
         }
         // Otherwise take all the groups
         return mcastRoleStore.entrySet().stream()
-                .collect(Collectors.toMap(Map.Entry::getKey,
+                .collect(Collectors.toMap(Entry::getKey,
                                           entry -> entry.getValue().value()));
     }