[CORD-2834] Handling of dual-homed sinks

Includes also McastUtils to move some code out of the McastHandler

Change-Id: I101637ee600c9d524f17e9f3fc29d63256844956
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
index b0875ca..930432d 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
@@ -18,6 +18,7 @@
 
 import org.onlab.packet.IpAddress;
 import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.HostId;
 
 import java.util.Objects;
 
@@ -31,6 +32,8 @@
 class McastCacheKey {
     // The group ip
     private final IpAddress mcastIp;
+    // The sink id
+    private final HostId sinkHost;
     // The sink connect point
     private final ConnectPoint sink;
 
@@ -39,6 +42,8 @@
      *
      * @param mcastIp multicast group IP address
      * @param sink connect point of the sink
+     *
+     * @deprecated in 1.12 ("Magpie") release.
      */
     public McastCacheKey(IpAddress mcastIp, ConnectPoint sink) {
         checkNotNull(mcastIp, "mcastIp cannot be null");
@@ -46,6 +51,22 @@
         checkArgument(mcastIp.isMulticast(), "mcastIp must be a multicast address");
         this.mcastIp = mcastIp;
         this.sink = sink;
+        this.sinkHost = null;
+    }
+
+    /**
+     * Constructs a key for multicast event cache.
+     *
+     * @param mcastIp multicast group IP address
+     * @param hostId id of the sink
+     */
+    public McastCacheKey(IpAddress mcastIp, HostId hostId) {
+        checkNotNull(mcastIp, "mcastIp cannot be null");
+        checkNotNull(hostId, "sink cannot be null");
+        checkArgument(mcastIp.isMulticast(), "mcastIp must be a multicast address");
+        this.mcastIp = mcastIp;
+        this.sinkHost = hostId;
+        this.sink = null;
     }
 
     /**
@@ -61,11 +82,22 @@
      * Returns the sink of this key.
      *
      * @return connect point of the sink
+     *
+     * @deprecated in 1.12 ("Magpie") release.
      */
     public ConnectPoint sink() {
         return sink;
     }
 
+    /**
+     * Returns the sink of this key.
+     *
+     * @return host id of the sink
+     */
+    public HostId sinkHost() {
+        return sinkHost;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -77,7 +109,8 @@
         McastCacheKey that =
                 (McastCacheKey) o;
         return (Objects.equals(this.mcastIp, that.mcastIp) &&
-                Objects.equals(this.sink, that.sink));
+                Objects.equals(this.sink, that.sink) &&
+                Objects.equals(this.sinkHost, that.sinkHost));
     }
 
     @Override
@@ -90,6 +123,7 @@
         return toStringHelper(getClass())
                 .add("mcastIp", mcastIp)
                 .add("sink", sink)
+                .add("sinkHost", sinkHost)
                 .toString();
     }
 }
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 968ad3b..4c83a38 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -20,40 +20,27 @@
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalCause;
 import com.google.common.cache.RemovalNotification;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import org.onlab.packet.Ethernet;
 import org.onlab.packet.IpAddress;
-import org.onlab.packet.IpPrefix;
-import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onlab.util.KryoNamespace;
-import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.mcast.api.McastEvent;
 import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.McastRouteData;
 import org.onosproject.mcast.api.McastRouteUpdate;
-import org.onosproject.net.config.basics.McastConfig;
+import org.onosproject.net.HostId;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Link;
 import org.onosproject.net.Path;
 import org.onosproject.net.PortNumber;
-import org.onosproject.net.flow.DefaultTrafficSelector;
-import org.onosproject.net.flow.DefaultTrafficTreatment;
-import org.onosproject.net.flow.TrafficSelector;
-import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.flow.criteria.Criteria;
-import org.onosproject.net.flow.criteria.VlanIdCriterion;
-import org.onosproject.net.flow.instructions.Instructions.OutputInstruction;
-import org.onosproject.net.flowobjective.DefaultFilteringObjective;
-import org.onosproject.net.flowobjective.DefaultForwardingObjective;
-import org.onosproject.net.flowobjective.DefaultNextObjective;
 import org.onosproject.net.flowobjective.DefaultObjectiveContext;
-import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.flowobjective.NextObjective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
@@ -62,14 +49,10 @@
 import org.onosproject.net.topology.TopologyService;
 import org.onosproject.segmentrouting.SRLinkWeigher;
 import org.onosproject.segmentrouting.SegmentRoutingManager;
-import org.onosproject.segmentrouting.SegmentRoutingService;
-import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
-import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
 import org.onosproject.segmentrouting.storekey.McastStoreKey;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.Serializer;
-import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.Versioned;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -92,9 +75,11 @@
 import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static org.onlab.util.Tools.groupedThreads;
 
-import static org.onosproject.mcast.api.McastEvent.Type.*;
-import static org.onosproject.net.flow.criteria.Criterion.Type.VLAN_VID;
-import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
+import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_REMOVED;
+import static org.onosproject.mcast.api.McastEvent.Type.SOURCES_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.SINKS_REMOVED;
+import static org.onosproject.mcast.api.McastEvent.Type.SINKS_ADDED;
+import static org.onosproject.mcast.api.McastEvent.Type.ROUTE_REMOVED;
 import static org.onosproject.segmentrouting.mcast.McastRole.EGRESS;
 import static org.onosproject.segmentrouting.mcast.McastRole.INGRESS;
 import static org.onosproject.segmentrouting.mcast.McastRole.TRANSIT;
@@ -103,17 +88,24 @@
  * Handles Multicast related events.
  */
 public class McastHandler {
+    // Logger instance
     private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
+    // Reference to srManager and most used internal objects
     private final SegmentRoutingManager srManager;
-    private final ApplicationId coreAppId;
-    private final StorageService storageService;
     private final TopologyService topologyService;
+    // Internal store of the Mcast nextobjectives
     private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
-    private final KryoNamespace.Builder mcastKryo;
+    // Internal store of the Mcast roles
     private final ConsistentMap<McastStoreKey, McastRole> mcastRoleStore;
+    // McastUtils
+    private final McastUtils mcastUtils;
 
     // Wait time for the cache
     private static final int WAIT_TIME_MS = 1000;
+
+    // Wait time for the removal of the old location
+    private static final int HOST_MOVED_DELAY_MS = 1000;
+
     /**
      * The mcastEventCache is implemented to avoid race condition by giving more time to the
      * underlying subsystems to process previous calls.
@@ -123,7 +115,7 @@
             .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
                 // Get group ip, sink and related event
                 IpAddress mcastIp = notification.getKey().mcastIp();
-                ConnectPoint sink = notification.getKey().sink();
+                HostId sink = notification.getKey().sinkHost();
                 McastEvent mcastEvent = notification.getValue();
                 RemovalCause cause = notification.getCause();
                 log.debug("mcastEventCache removal event. group={}, sink={}, mcastEvent={}, cause={}",
@@ -140,50 +132,42 @@
             }).build();
 
     private void enqueueMcastEvent(McastEvent mcastEvent) {
-        log.debug("Enqueue mcastEvent {}", mcastEvent);
+        // Retrieve, currentData, prevData and the group
         final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
-        final McastRouteUpdate prevUpdate = mcastEvent.prevSubject();
-        final IpAddress group = prevUpdate.route().group();
+        final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
+        final IpAddress group = mcastRoutePrevUpdate.route().group();
         // Let's create the keys of the cache
-        ImmutableSet.Builder<ConnectPoint> sinksBuilder = ImmutableSet.builder();
-        Set<ConnectPoint> sinks;
-        // For this event we will have a set of sinks
+        ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
         if (mcastEvent.type() == SOURCES_ADDED ||
                 mcastEvent.type() == SOURCES_REMOVED) {
             // FIXME To be addressed with multiple sources support
-            sinks = mcastRouteUpdate.sinks()
-                    .values()
-                    .stream()
-                    .flatMap(Collection::stream)
-                    .collect(Collectors.toSet());
-        } else {
-            Set<ConnectPoint> prevSinks = prevUpdate.sinks()
-                    .values()
-                    .stream()
-                    .flatMap(Collection::stream)
-                    .collect(Collectors.toSet());
-            if (mcastEvent.type() == ROUTE_REMOVED) {
-                // Get the old sinks, since current subject is null
-                sinks = prevSinks;
-            } else {
-                // Get new sinks
-                Set<ConnectPoint> newsinks = mcastRouteUpdate.sinks()
-                        .values()
-                        .stream()
-                        .flatMap(Collection::stream)
-                        .collect(Collectors.toSet());
-                // If it is a SINKS_ADDED event
-                if (mcastEvent.type() == SINKS_ADDED) {
-                    // Let's do the difference between current and prev subjects
-                    sinks = Sets.difference(newsinks, prevSinks);
-                } else {
-                    // Let's do the difference between prev and current subjects
-                    sinks = Sets.difference(prevSinks, newsinks);
+            sinksBuilder.addAll(Collections.emptySet());
+        } else if (mcastEvent.type() == SINKS_ADDED) {
+            // We need to process the host id one by one
+            mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
+                // Get the previous locations and verify if there are changes
+                Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
+                Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
+                        prevConnectPoints : Collections.emptySet());
+                if (!changes.isEmpty()) {
+                    sinksBuilder.add(hostId);
                 }
-            }
+            }));
+        } else if (mcastEvent.type() == SINKS_REMOVED) {
+            // We need to process the host id one by one
+            mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
+                // Get the current locations and verify if there are changes
+                Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
+                Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
+                        currentConnectPoints : Collections.emptySet());
+                if (!changes.isEmpty()) {
+                    sinksBuilder.add(hostId);
+                }
+            }));
+        } else if (mcastEvent.type() == ROUTE_REMOVED) {
+            // Current subject is null, just take the previous host ids
+            sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
         }
-        // Add all the sinks
-        sinksBuilder.addAll(sinks);
         // Push the elements in the cache
         sinksBuilder.build().forEach(sink -> {
             McastCacheKey cacheKey = new McastCacheKey(group, sink);
@@ -192,24 +176,22 @@
     }
 
     private void dequeueMcastEvent(McastEvent mcastEvent) {
-        log.debug("Dequeue mcastEvent {}", mcastEvent);
-        final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
-        final McastRouteUpdate prevUpdate = mcastEvent.prevSubject();
+        // Get new and old data
+        final McastRouteUpdate mcastUpdate = mcastEvent.subject();
+        final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
         // Get source, mcast group
         // FIXME To be addressed with multiple sources support
-        ConnectPoint prevSource = prevUpdate.sources()
+        final ConnectPoint source = mcastPrevUpdate.sources()
                 .stream()
                 .findFirst()
                 .orElse(null);
-        IpAddress mcastIp = prevUpdate.route().group();
-        Set<ConnectPoint> prevSinks = prevUpdate.sinks()
+        IpAddress mcastIp = mcastPrevUpdate.route().group();
+        // Get all the previous sinks
+        Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
                 .values()
                 .stream()
                 .flatMap(Collection::stream)
                 .collect(Collectors.toSet());
-        Set<ConnectPoint> newSinks;
-        // Sinks to handled by SINKS_ADDED and SINKS_REMOVED procedures
-        Set<ConnectPoint> sinks;
         // According to the event type let's call the proper method
         switch (mcastEvent.type()) {
             case SOURCES_ADDED:
@@ -230,27 +212,17 @@
                 break;
             case ROUTE_REMOVED:
                 // Process the route removed, just the first cached element will be processed
-                processRouteRemovedInternal(prevSource, mcastIp);
+                processRouteRemovedInternal(source, mcastIp);
                 break;
             case SINKS_ADDED:
-                // Get the only sinks to be processed (new ones)
-                newSinks = mcastRouteUpdate.sinks()
-                        .values()
-                        .stream()
-                        .flatMap(Collection::stream)
-                        .collect(Collectors.toSet());
-                sinks = Sets.difference(newSinks, prevSinks);
-                sinks.forEach(sink -> processSinkAddedInternal(prevSource, sink, mcastIp, null));
+                // FIXME To be addressed with multiple sources support
+                processSinksAddedInternal(source, mcastIp,
+                                          mcastUpdate.sinks(), prevSinks);
                 break;
             case SINKS_REMOVED:
-                // Get the only sinks to be processed (old ones)
-                newSinks = mcastRouteUpdate.sinks()
-                        .values()
-                        .stream()
-                        .flatMap(Collection::stream)
-                        .collect(Collectors.toSet());
-                sinks = Sets.difference(prevSinks, newSinks);
-                sinks.forEach(sink -> processSinkRemovedInternal(prevSource, sink, mcastIp));
+                // FIXME To be addressed with multiple sources support
+                processSinksRemovedInternal(source, mcastIp,
+                                            mcastUpdate.sinks(), prevSinks);
                 break;
             default:
                 break;
@@ -306,28 +278,28 @@
      * @param srManager Segment Routing manager
      */
     public McastHandler(SegmentRoutingManager srManager) {
-        coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
+        ApplicationId coreAppId = srManager.coreService.getAppId(CoreService.CORE_APP_NAME);
         this.srManager = srManager;
-        this.storageService = srManager.storageService;
         this.topologyService = srManager.topologyService;
-        mcastKryo = new KryoNamespace.Builder()
+        KryoNamespace.Builder mcastKryo = new KryoNamespace.Builder()
                 .register(KryoNamespaces.API)
                 .register(McastStoreKey.class)
                 .register(McastRole.class);
-        mcastNextObjStore = storageService
+        mcastNextObjStore = srManager.storageService
                 .<McastStoreKey, NextObjective>consistentMapBuilder()
                 .withName("onos-mcast-nextobj-store")
                 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-NextObj")))
                 .build();
-        mcastRoleStore = storageService
+        mcastRoleStore = srManager.storageService
                 .<McastStoreKey, McastRole>consistentMapBuilder()
                 .withName("onos-mcast-role-store")
                 .withSerializer(Serializer.using(mcastKryo.build("McastHandler-Role")))
                 .build();
+        // Let's create McastUtils object
+        mcastUtils = new McastUtils(srManager, coreAppId, log);
         // Init the executor service and the buckets corrector
         executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
-                                               MCAST_VERIFY_INTERVAL,
-                                               TimeUnit.SECONDS);
+                                               MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
         // Schedule the clean up, this will allow the processing of the expired events
         executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
                                             WAIT_TIME_MS, TimeUnit.MILLISECONDS);
@@ -337,28 +309,36 @@
      * Read initial multicast from mcast store.
      */
     public void init() {
-        srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
-            // FIXME To be addressed with multiple sources support
-            ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
-                    .stream()
-                    .findFirst()
-                    .orElse(null);
-            Set<ConnectPoint> sinks = srManager.multicastRouteService.sinks(mcastRoute);
-            // Filter out all the working sinks, we do not want to move them
-            sinks = sinks.stream()
-                    .filter(sink -> {
-                        McastStoreKey mcastKey = new McastStoreKey(mcastRoute.group(), sink.deviceId());
-                        Versioned<NextObjective> verMcastNext = mcastNextObjStore.get(mcastKey);
-                        return verMcastNext == null ||
-                                !getPorts(verMcastNext.value().next()).contains(sink.port());
-                    })
-                    .collect(Collectors.toSet());
-            // Compute the Mcast tree
-            Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
-            // Process the given sinks using the pre-computed paths
-            mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
-                                                                       mcastRoute.group(), paths));
-        });
+        lastMcastChange = Instant.now();
+        mcastLock();
+        try {
+            srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+                // FIXME To be addressed with multiple sources support
+                ConnectPoint source = srManager.multicastRouteService.sources(mcastRoute)
+                        .stream()
+                        .findFirst()
+                        .orElse(null);
+                // Get all the sinks and process them
+                McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
+                Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(), mcastRouteData.sinks());
+                // Filter out all the working sinks, we do not want to move them
+                sinks = sinks.stream()
+                        .filter(sink -> {
+                            McastStoreKey mcastKey = new McastStoreKey(mcastRoute.group(), sink.deviceId());
+                            Versioned<NextObjective> verMcastNext = mcastNextObjStore.get(mcastKey);
+                            return verMcastNext == null ||
+                                    !mcastUtils.getPorts(verMcastNext.value().next()).contains(sink.port());
+                        })
+                        .collect(Collectors.toSet());
+                // Compute the Mcast tree
+                Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(source.deviceId(), sinks);
+                // Process the given sinks using the pre-computed paths
+                mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
+                                                                           mcastRoute.group(), paths));
+            });
+        } finally {
+            mcastUnlock();
+        }
     }
 
     /**
@@ -398,7 +378,7 @@
             McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
 
             // Verify leadership on the operation
-            if (!isLeader(oldSource)) {
+            if (!mcastUtils.isLeader(oldSource)) {
                 log.debug("Skip {} due to lack of leadership", mcastIp);
                 return;
             }
@@ -410,18 +390,19 @@
                 return;
             }
             NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
-            Set<PortNumber> outputPorts = getPorts(nextObjective.next());
+            Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
 
             // This an optimization to avoid unnecessary removal and add
-            if (!assignedVlanFromNext(nextObjective).equals(assignedVlan(newSource))) {
+            if (!mcastUtils.assignedVlanFromNext(nextObjective)
+                    .equals(mcastUtils.assignedVlan(newSource))) {
                 // Let's remove old flows and groups
-                removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
+                removeGroupFromDevice(oldSource.deviceId(), mcastIp, mcastUtils.assignedVlan(oldSource));
                 // Push new flows and group
                 outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
-                                                                  mcastIp, assignedVlan(newSource)));
+                                                                  mcastIp, mcastUtils.assignedVlan(newSource)));
             }
-            addFilterToDevice(newSource.deviceId(), newSource.port(),
-                              assignedVlan(newSource), mcastIp, INGRESS);
+            mcastUtils.addFilterToDevice(newSource.deviceId(), newSource.port(),
+                              mcastUtils.assignedVlan(newSource), mcastIp, INGRESS);
             // Setup mcast roles
             mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
                                INGRESS);
@@ -448,7 +429,7 @@
             Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
 
             // Verify leadership on the operation
-            if (!isLeader(source)) {
+            if (!mcastUtils.isLeader(source)) {
                 log.debug("Skip {} due to lack of leadership", mcastIp);
                 return;
             }
@@ -456,24 +437,60 @@
             // If there are no egress devices, sinks could be only on the ingress
             if (!egressDevices.isEmpty()) {
                 egressDevices.forEach(
-                        deviceId -> removeGroupFromDevice(deviceId, mcastIp, assignedVlan(null))
+                        deviceId -> removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null))
                 );
             }
             // Transit could be empty if sinks are on the ingress
             if (!transitDevices.isEmpty()) {
                 transitDevices.forEach(
-                        deviceId -> removeGroupFromDevice(deviceId, mcastIp, assignedVlan(null))
+                        deviceId -> removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null))
                 );
             }
             // Ingress device should be not null
             if (ingressDevice != null) {
-                removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
+                removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
             }
         } finally {
             mcastUnlock();
         }
     }
 
+
+    /**
+     * Process sinks to be removed.
+     *
+     * @param source the source connect point
+     * @param mcastIp the ip address of the group
+     * @param newSinks the new sinks to be processed
+     * @param allPrevSinks all previous sinks
+     */
+    private void processSinksRemovedInternal(ConnectPoint source, IpAddress mcastIp,
+                                             Map<HostId, Set<ConnectPoint>> newSinks,
+                                             Set<ConnectPoint> allPrevSinks) {
+        lastMcastChange = Instant.now();
+        mcastLock();
+        // Let's instantiate the sinks to be removed
+        Set<ConnectPoint> sinksToBeRemoved = Sets.newHashSet();
+        try {
+            // Recover the dual-homed sinks
+            Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks);
+            sinksToBeRecovered.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
+            // Get the only sinks to be processed (old ones)
+            Set<ConnectPoint> allNewSinks = newSinks.values()
+                    .stream()
+                    .flatMap(Collection::stream)
+                    .collect(Collectors.toSet());
+            // Remove the previous one
+            sinksToBeRemoved.addAll(Sets.difference(allPrevSinks, allNewSinks));
+        } finally {
+            mcastUnlock();
+            // Let's schedule the removal of the previous sinks
+            executorService.schedule(
+                    () -> sinksToBeRemoved.forEach(sink -> processSinkRemovedInternal(source, sink, mcastIp)),
+                    HOST_MOVED_DELAY_MS, TimeUnit.MILLISECONDS);
+        }
+    }
+
     /**
      * Removes a path from source to sink for given multicast group.
      *
@@ -482,17 +499,17 @@
      * @param mcastIp multicast group IP address
      */
     private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
-                                          IpAddress mcastIp) {
+                                            IpAddress mcastIp) {
         lastMcastChange = Instant.now();
         mcastLock();
         try {
             // Verify leadership on the operation
-            if (!isLeader(source)) {
+            if (!mcastUtils.isLeader(source)) {
                 log.debug("Skip {} due to lack of leadership", mcastIp);
                 return;
             }
 
-            boolean isLast = false;
+            boolean isLast;
             // When source and sink are on the same device
             if (source.deviceId().equals(sink.deviceId())) {
                 // Source and sink are on even the same port. There must be something wrong.
@@ -501,7 +518,7 @@
                              mcastIp, sink, source);
                     return;
                 }
-                isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
+                isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
                 if (isLast) {
                     mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
                 }
@@ -509,7 +526,7 @@
             }
 
             // Process the egress device
-            isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
+            isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
             if (isLast) {
                 mcastRoleStore.remove(new McastStoreKey(mcastIp, sink.deviceId()));
             }
@@ -526,7 +543,8 @@
                                 link.src().deviceId(),
                                 link.src().port(),
                                 mcastIp,
-                                assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null)
+                                mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ?
+                                                     source : null)
                         );
                         if (isLast) {
                             mcastRoleStore.remove(new McastStoreKey(mcastIp, link.src().deviceId()));
@@ -539,6 +557,31 @@
         }
     }
 
+
+    /**
+     * Process sinks to be added.
+     *
+     * @param source the source connect point
+     * @param mcastIp the group IP
+     * @param newSinks the new sinks to be processed
+     * @param allPrevSinks all previous sinks
+     */
+    private void processSinksAddedInternal(ConnectPoint source, IpAddress mcastIp,
+                                           Map<HostId, Set<ConnectPoint>> newSinks,
+                                           Set<ConnectPoint> allPrevSinks) {
+        lastMcastChange = Instant.now();
+        mcastLock();
+        try {
+            // Get the only sinks to be processed (new ones)
+            Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
+            // Install new sinks
+            sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
+            sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
+        } finally {
+            mcastUnlock();
+        }
+    }
+
     /**
      * Establishes a path from source to sink for given multicast group.
      *
@@ -547,7 +590,7 @@
      * @param mcastIp multicast group IP address
      */
     private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
-            IpAddress mcastIp, List<Path> allPaths) {
+                                          IpAddress mcastIp, List<Path> allPaths) {
         lastMcastChange = Instant.now();
         mcastLock();
         try {
@@ -559,8 +602,8 @@
             }
 
             // Process the ingress device
-            addFilterToDevice(source.deviceId(), source.port(),
-                              assignedVlan(source), mcastIp, INGRESS);
+            mcastUtils.addFilterToDevice(source.deviceId(), source.port(),
+                              mcastUtils.assignedVlan(source), mcastIp, INGRESS);
 
             // When source and sink are on the same device
             if (source.deviceId().equals(sink.deviceId())) {
@@ -570,7 +613,7 @@
                              mcastIp, sink, source);
                     return;
                 }
-                addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(source));
+                addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
                 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()), INGRESS);
                 return;
             }
@@ -588,9 +631,10 @@
                 // Setup properly the transit
                 links.forEach(link -> {
                     addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
-                                    assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
-                    addFilterToDevice(link.dst().deviceId(), link.dst().port(),
-                                      assignedVlan(null), mcastIp, null);
+                                    mcastUtils.assignedVlan(link.src().deviceId()
+                                                                    .equals(source.deviceId()) ? source : null));
+                    mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
+                                      mcastUtils.assignedVlan(null), mcastIp, null);
                 });
 
                 // Setup mcast role for the transit
@@ -600,7 +644,7 @@
                                                             TRANSIT));
 
                 // Process the egress device
-                addPortToDevice(sink.deviceId(), sink.port(), mcastIp, assignedVlan(null));
+                addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
                 // Setup mcast role for egress
                 mcastRoleStore.put(new McastStoreKey(mcastIp, sink.deviceId()),
                                    EGRESS);
@@ -633,7 +677,7 @@
                         .stream().findAny().orElse(null);
                 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
                 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
-                ConnectPoint source = getSource(mcastIp);
+                ConnectPoint source = mcastUtils.getSource(mcastIp);
 
                 // Do not proceed if ingress device or source of this group are missing
                 // If sinks are in other leafs, we have ingress, transit, egress, and source
@@ -653,28 +697,50 @@
 
                 // Remove entire transit
                 transitDevices.forEach(transitDevice ->
-                                removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null)));
+                                removeGroupFromDevice(transitDevice, mcastIp,
+                                                      mcastUtils.assignedVlan(null)));
 
                 // Remove transit-facing ports on the ingress device
                 removeIngressTransitPorts(mcastIp, ingressDevice, source);
 
+                // TODO create a shared procedure with DEVICE_DOWN
                 // Compute mcast tree for the the egress devices
                 Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
 
-                // Construct a new path for each egress device
+                // We have to verify, if there are egresses without paths
+                Set<DeviceId> notRecovered = Sets.newHashSet();
                 mcastTree.forEach((egressDevice, paths) -> {
-                    // We try to enforce the sinks path on the mcast tree
+                    // Let's check if there is at least a path
                     Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
                                                        mcastIp, paths);
-                    // If a path is present, let's install it
-                    if (mcastPath.isPresent()) {
-                        installPath(mcastIp, source, mcastPath.get());
-                    } else {
+                    // No paths, we have to try with alternative location
+                    if (!mcastPath.isPresent()) {
+                        notRecovered.add(egressDevice);
+                        // We were not able to find an alternative path for this egress
                         log.warn("Fail to recover egress device {} from link failure {}",
                                  egressDevice, affectedLink);
-                        removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+                        removeGroupFromDevice(egressDevice, mcastIp,
+                                              mcastUtils.assignedVlan(null));
                     }
                 });
+
+                // Fast path, we can recover all the locations
+                if (notRecovered.isEmpty()) {
+                    // Construct a new path for each egress device
+                    mcastTree.forEach((egressDevice, paths) -> {
+                        // We try to enforce the sinks path on the mcast tree
+                        Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
+                                                           mcastIp, paths);
+                        // If a path is present, let's install it
+                        if (mcastPath.isPresent()) {
+                            installPath(mcastIp, source, mcastPath.get());
+                        }
+                    });
+                } else {
+                    // Let's try to recover using alternate
+                    recoverSinks(egressDevices, notRecovered, mcastIp,
+                                 ingressDevice, source, true);
+                }
             });
         } finally {
             mcastUnlock();
@@ -701,7 +767,7 @@
                         .stream().findAny().orElse(null);
                 Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
                 Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
-                ConnectPoint source = getSource(mcastIp);
+                ConnectPoint source = mcastUtils.getSource(mcastIp);
 
                 // Do not proceed if ingress device or source of this group are missing
                 // If sinks are in other leafs, we have ingress, transit, egress, and source
@@ -713,7 +779,7 @@
                 }
 
                 // Verify leadership on the operation
-                if (!isLeader(source)) {
+                if (!mcastUtils.isLeader(source)) {
                     log.debug("Skip {} due to lack of leadership", mcastIp);
                     return;
                 }
@@ -722,17 +788,19 @@
                 if (!transitDevices.isEmpty()) {
                     // Remove entire transit
                     transitDevices.forEach(transitDevice ->
-                                    removeGroupFromDevice(transitDevice, mcastIp, assignedVlan(null)));
+                                    removeGroupFromDevice(transitDevice, mcastIp,
+                                                          mcastUtils.assignedVlan(null)));
                 }
                 // If the ingress is down
                 if (ingressDevice.equals(deviceDown)) {
                     // Remove entire ingress
-                    removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
+                    removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
                     // If other sinks different from the ingress exist
                     if (!egressDevices.isEmpty()) {
                         // Remove all the remaining egress
                         egressDevices.forEach(
-                                egressDevice -> removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null))
+                                egressDevice -> removeGroupFromDevice(egressDevice, mcastIp,
+                                                                      mcastUtils.assignedVlan(null))
                         );
                     }
                 } else {
@@ -743,7 +811,7 @@
                     // One of the egress device is down
                     if (egressDevices.contains(deviceDown)) {
                         // Remove entire device down
-                        removeGroupFromDevice(deviceDown, mcastIp, assignedVlan(null));
+                        removeGroupFromDevice(deviceDown, mcastIp, mcastUtils.assignedVlan(null));
                         // Remove the device down from egress
                         egressDevices.remove(deviceDown);
                         // If there are no more egress and ingress does not have sinks
@@ -756,21 +824,39 @@
                     // Compute mcast tree for the the egress devices
                     Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, egressDevices);
 
-                    // Construct a new path for each egress device
+                    // We have to verify, if there are egresses without paths
+                    Set<DeviceId> notRecovered = Sets.newHashSet();
                     mcastTree.forEach((egressDevice, paths) -> {
+                        // Let's check if there is at least a path
                         Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
-                                                           mcastIp, null);
-                        // If there is a new path
-                        if (mcastPath.isPresent()) {
-                            // Let's install the new mcast path for this egress
-                            installPath(mcastIp, source, mcastPath.get());
-                        } else {
+                                                           mcastIp, paths);
+                        // No paths, we have to try with alternative location
+                        if (!mcastPath.isPresent()) {
+                            notRecovered.add(egressDevice);
                             // We were not able to find an alternative path for this egress
                             log.warn("Fail to recover egress device {} from device down {}",
                                      egressDevice, deviceDown);
-                            removeGroupFromDevice(egressDevice, mcastIp, assignedVlan(null));
+                            removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
                         }
                     });
+
+                    // Fast path, we can recover all the locations
+                    if (notRecovered.isEmpty()) {
+                        // Construct a new path for each egress device
+                        mcastTree.forEach((egressDevice, paths) -> {
+                            // We try to enforce the sinks path on the mcast tree
+                            Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
+                                                               mcastIp, paths);
+                            // If a path is present, let's install it
+                            if (mcastPath.isPresent()) {
+                                installPath(mcastIp, source, mcastPath.get());
+                            }
+                        });
+                    } else {
+                        // Let's try to recover using alternate
+                        recoverSinks(egressDevices, notRecovered, mcastIp,
+                                     ingressDevice, source, false);
+                    }
                 }
             });
         } finally {
@@ -779,6 +865,187 @@
     }
 
     /**
+     * Try to recover sinks using alternate locations.
+     *
+     * @param egressDevices the original egress devices
+     * @param notRecovered the devices not recovered
+     * @param mcastIp the group address
+     * @param ingressDevice the ingress device
+     * @param source the source connect point
+     * @param isLinkFailure true if it is a link failure, otherwise false
+     */
+    private void recoverSinks(Set<DeviceId> egressDevices, Set<DeviceId> notRecovered,
+                              IpAddress mcastIp, DeviceId ingressDevice, ConnectPoint source,
+                              boolean isLinkFailure) {
+        // Recovered devices
+        Set<DeviceId> recovered = Sets.difference(egressDevices, notRecovered);
+        // Total affected sinks
+        Set<ConnectPoint> totalAffectedSinks = Sets.newHashSet();
+        // Total sinks
+        Set<ConnectPoint> totalSinks = Sets.newHashSet();
+        // Let's compute all the affected sinks and all the sinks
+        notRecovered.forEach(deviceId -> {
+            totalAffectedSinks.addAll(
+                    mcastUtils.getAffectedSinks(deviceId, mcastIp)
+                            .values()
+                            .stream()
+                            .flatMap(Collection::stream)
+                            .filter(connectPoint -> connectPoint.deviceId().equals(deviceId))
+                            .collect(Collectors.toSet())
+            );
+            totalSinks.addAll(
+                    mcastUtils.getAffectedSinks(deviceId, mcastIp)
+                            .values()
+                            .stream()
+                            .flatMap(Collection::stream)
+                            .collect(Collectors.toSet())
+            );
+        });
+
+        // Sinks to be added
+        Set<ConnectPoint> sinksToBeAdded = Sets.difference(totalSinks, totalAffectedSinks);
+        // New egress devices, filtering out the source
+        Set<DeviceId> newEgressDevice = sinksToBeAdded.stream()
+                .map(ConnectPoint::deviceId)
+                .collect(Collectors.toSet());
+        // Let's add the devices recovered from the previous round
+        newEgressDevice.addAll(recovered);
+        // Let's do a copy of the new egresses and filter out the source
+        Set<DeviceId> copyNewEgressDevice = ImmutableSet.copyOf(newEgressDevice);
+        newEgressDevice = newEgressDevice.stream()
+                .filter(deviceId -> !deviceId.equals(ingressDevice))
+                .collect(Collectors.toSet());
+
+        // Re-compute mcast tree for the the egress devices
+        Map<DeviceId, List<Path>> mcastTree = computeMcastTree(ingressDevice, newEgressDevice);
+        // if the source was originally in the new locations, add new sinks
+        if (copyNewEgressDevice.contains(ingressDevice)) {
+            sinksToBeAdded.stream()
+                    .filter(connectPoint -> connectPoint.deviceId().equals(ingressDevice))
+                    .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, ImmutableList.of()));
+        }
+
+        // Construct a new path for each egress device
+        mcastTree.forEach((egressDevice, paths) -> {
+            // We try to enforce the sinks path on the mcast tree
+            Optional<Path> mcastPath = getPath(ingressDevice, egressDevice,
+                                               mcastIp, paths);
+            // If a path is present, let's install it
+            if (mcastPath.isPresent()) {
+                // Using recovery procedure
+                if (recovered.contains(egressDevice)) {
+                    installPath(mcastIp, source, mcastPath.get());
+                } else {
+                    // otherwise we need to threat as new sink
+                    sinksToBeAdded.stream()
+                            .filter(connectPoint -> connectPoint.deviceId().equals(egressDevice))
+                            .forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, paths));
+                }
+            } else {
+                // We were not able to find an alternative path for this egress
+                log.warn("Fail to recover egress device {} from {} failure",
+                         egressDevice, isLinkFailure ? "Link" : "Device");
+                removeGroupFromDevice(egressDevice, mcastIp, mcastUtils.assignedVlan(null));
+            }
+        });
+
+    }
+
+    /**
+     * Process new locations and return the set of sinks to be added
+     * in the context of the recovery.
+     *
+     * @param sinks the remaining locations
+     * @return the set of the sinks to be processed
+     */
+    private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
+                                                        Map<HostId, Set<ConnectPoint>> sinks) {
+        // Iterate over the sinks in order to build the set
+        // of the connect points to be served by this group
+        final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
+        sinks.forEach((hostId, connectPoints) -> {
+            // If it has more than 1 locations
+            if (connectPoints.size() > 1 || connectPoints.size() == 0) {
+                log.debug("Skip {} since sink {} has {} locations",
+                         mcastIp, hostId, connectPoints.size());
+                return;
+            }
+            sinksToBeProcessed.add(connectPoints.stream()
+                                           .findFirst().orElseGet(null));
+        });
+        return sinksToBeProcessed;
+    }
+
+    /**
+     * Process all the sinks related to a mcast group and return
+     * the ones to be processed.
+     *
+     * @param source the source connect point
+     * @param mcastIp the group address
+     * @param sinks the sinks to be evaluated
+     * @return the set of the sinks to be processed
+     */
+    private Set<ConnectPoint> processSinksToBeAdded(ConnectPoint source, IpAddress mcastIp,
+                                                    Map<HostId, Set<ConnectPoint>> sinks) {
+        // Iterate over the sinks in order to build the set
+        // of the connect points to be served by this group
+        final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
+        sinks.forEach(((hostId, connectPoints) -> {
+            // If it has more than 2 locations
+            if (connectPoints.size() > 2 || connectPoints.size() == 0) {
+                log.debug("Skip {} since sink {} has {} locations",
+                         mcastIp, hostId, connectPoints.size());
+                return;
+            }
+            // If it has one location, just use it
+            if (connectPoints.size() == 1) {
+                sinksToBeProcessed.add(connectPoints.stream()
+                                               .findFirst().orElseGet(null));
+                return;
+            }
+            // We prefer to reuse existing flows
+            ConnectPoint sinkToBeProcessed = connectPoints.stream()
+                    .filter(connectPoint -> {
+                        // Let's check if we are already serving that location
+                        McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId());
+                        if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
+                            return false;
+                        }
+                        // Get next and check with the port
+                        NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
+                        return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
+                    })
+                    .findFirst().orElse(null);
+            if (sinkToBeProcessed != null) {
+                sinksToBeProcessed.add(sinkToBeProcessed);
+                return;
+            }
+            // Otherwise we prefer to reuse existing egresses
+            Set<DeviceId> egresses = getDevice(mcastIp, EGRESS);
+            sinkToBeProcessed = connectPoints.stream()
+                    .filter(egresses::contains)
+                    .findFirst().orElse(null);
+            if (sinkToBeProcessed != null) {
+                sinksToBeProcessed.add(sinkToBeProcessed);
+                return;
+            }
+            // Otherwise we prefer a location co-located with the source (if it exists)
+            sinkToBeProcessed = connectPoints.stream()
+                    .filter(connectPoint -> connectPoint.deviceId().equals(source.deviceId()))
+                    .findFirst().orElse(null);
+            if (sinkToBeProcessed != null) {
+                sinksToBeProcessed.add(sinkToBeProcessed);
+                return;
+            }
+            // Finally, we randomly pick a new location
+            sinksToBeProcessed.add(connectPoints.stream()
+                                           .findFirst().orElseGet(null));
+        }));
+        // We have done, return the set
+        return sinksToBeProcessed;
+    }
+
+    /**
      * Utility method to remove all the ingress transit ports.
      *
      * @param mcastIp the group ip
@@ -791,7 +1058,7 @@
         ingressTransitPorts.forEach(ingressTransitPort -> {
             if (ingressTransitPort != null) {
                 boolean isLast = removePortFromDevice(ingressDevice, ingressTransitPort,
-                                                      mcastIp, assignedVlan(source));
+                                                      mcastIp, mcastUtils.assignedVlan(source));
                 if (isLast) {
                     mcastRoleStore.remove(new McastStoreKey(mcastIp, ingressDevice));
                 }
@@ -800,43 +1067,6 @@
     }
 
     /**
-     * Adds filtering objective for given device and port.
-     *
-     * @param deviceId device ID
-     * @param port ingress port number
-     * @param assignedVlan assigned VLAN ID
-     */
-    private void addFilterToDevice(DeviceId deviceId, PortNumber port,
-                                   VlanId assignedVlan, IpAddress mcastIp, McastRole mcastRole) {
-        // Do nothing if the port is configured as suppressed
-        ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
-        SegmentRoutingAppConfig appConfig = srManager.cfgService
-                .getConfig(srManager.appId(), SegmentRoutingAppConfig.class);
-        if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
-            log.info("Ignore suppressed port {}", connectPoint);
-            return;
-        }
-
-        MacAddress routerMac;
-        try {
-            routerMac = srManager.deviceConfiguration().getDeviceMac(deviceId);
-        } catch (DeviceConfigNotFoundException dcnfe) {
-            log.warn("Fail to push filtering objective since device is not configured. Abort");
-            return;
-        }
-
-        FilteringObjective.Builder filtObjBuilder =
-                filterObjBuilder(port, assignedVlan, mcastIp, routerMac, mcastRole);
-        ObjectiveContext context = new DefaultObjectiveContext(
-                (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
-                        deviceId, port.toLong(), assignedVlan),
-                (objective, error) ->
-                        log.warn("Failed to add filter on {}/{}, vlan {}: {}",
-                                deviceId, port.toLong(), assignedVlan, error));
-        srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add(context));
-    }
-
-    /**
      * Adds a port to given multicast group on given device. This involves the
      * update of L3 multicast group and multicast routing table entry.
      *
@@ -846,7 +1076,7 @@
      * @param assignedVlan assigned VLAN ID
      */
     private void addPortToDevice(DeviceId deviceId, PortNumber port,
-            IpAddress mcastIp, VlanId assignedVlan) {
+                                 IpAddress mcastIp, VlanId assignedVlan) {
         McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
         ImmutableSet.Builder<PortNumber> portBuilder = ImmutableSet.builder();
         NextObjective newNextObj;
@@ -854,7 +1084,7 @@
             // First time someone request this mcast group via this device
             portBuilder.add(port);
             // New nextObj
-            newNextObj = nextObjBuilder(mcastIp, assignedVlan,
+            newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
                                         portBuilder.build(), null).add();
             // Store the new port
             mcastNextObjStore.put(mcastStoreKey, newNextObj);
@@ -862,7 +1092,7 @@
             // This device already serves some subscribers of this mcast group
             NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
             // Stop if the port is already in the nextobj
-            Set<PortNumber> existingPorts = getPorts(nextObj.next());
+            Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
             if (existingPorts.contains(port)) {
                 log.info("NextObj for {}/{} already exists. Abort", deviceId, port);
                 return;
@@ -870,14 +1100,14 @@
             // Let's add the port and reuse the previous one
             portBuilder.addAll(existingPorts).add(port);
             // Reuse previous nextObj
-            newNextObj = nextObjBuilder(mcastIp, assignedVlan,
+            newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
                                         portBuilder.build(), nextObj.id()).addToExisting();
             // Store the final next objective and send only the difference to the driver
             mcastNextObjStore.put(mcastStoreKey, newNextObj);
             // Add just the new port
             portBuilder = ImmutableSet.builder();
             portBuilder.add(port);
-            newNextObj = nextObjBuilder(mcastIp, assignedVlan,
+            newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
                                         portBuilder.build(), nextObj.id()).addToExisting();
         }
         // Create, store and apply the new nextObj and fwdObj
@@ -887,8 +1117,8 @@
                 (objective, error) ->
                         log.warn("Failed to add {} on {}/{}, vlan {}: {}",
                                 mcastIp, deviceId, port.toLong(), assignedVlan, error));
-        ForwardingObjective fwdObj =
-                fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
+        ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan,
+                                                              newNextObj.id()).add(context);
         srManager.flowObjectiveService.next(deviceId, newNextObj);
         srManager.flowObjectiveService.forward(deviceId, fwdObj);
     }
@@ -905,7 +1135,7 @@
      * @return true if this is the last sink on this device
      */
     private boolean removePortFromDevice(DeviceId deviceId, PortNumber port,
-            IpAddress mcastIp, VlanId assignedVlan) {
+                                         IpAddress mcastIp, VlanId assignedVlan) {
         McastStoreKey mcastStoreKey =
                 new McastStoreKey(mcastIp, deviceId);
         // This device is not serving this multicast group
@@ -915,7 +1145,7 @@
         }
         NextObjective nextObj = mcastNextObjStore.get(mcastStoreKey).value();
 
-        Set<PortNumber> existingPorts = getPorts(nextObj.next());
+        Set<PortNumber> existingPorts = mcastUtils.getPorts(nextObj.next());
         // This port does not serve this multicast group
         if (!existingPorts.contains(port)) {
             log.warn("{} is not serving {} on port {}. Abort.", deviceId, mcastIp, port);
@@ -939,7 +1169,7 @@
                     (objective, error) ->
                             log.warn("Failed to remove {} on {}/{}, vlan {}: {}",
                                     mcastIp, deviceId, port.toLong(), assignedVlan, error));
-            fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
+            fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
             mcastNextObjStore.remove(mcastStoreKey);
         } else {
             // If this is not the last sink, update flows and groups
@@ -950,13 +1180,13 @@
                             log.warn("Failed to update {} on {}/{}, vlan {}: {}",
                                     mcastIp, deviceId, port.toLong(), assignedVlan, error));
             // Here we store the next objective with the remaining port
-            newNextObj = nextObjBuilder(mcastIp, assignedVlan,
+            newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
                                         existingPorts, nextObj.id()).removeFromExisting();
-            fwdObj = fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
+            fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, newNextObj.id()).add(context);
             mcastNextObjStore.put(mcastStoreKey, newNextObj);
         }
         // Let's modify the next objective removing the bucket
-        newNextObj = nextObjBuilder(mcastIp, assignedVlan,
+        newNextObj = mcastUtils.nextObjBuilder(mcastIp, assignedVlan,
                                     ImmutableSet.of(port), nextObj.id()).removeFromExisting();
         srManager.flowObjectiveService.next(deviceId, newNextObj);
         srManager.flowObjectiveService.forward(deviceId, fwdObj);
@@ -971,7 +1201,7 @@
      * @param assignedVlan assigned VLAN ID
      */
     private void removeGroupFromDevice(DeviceId deviceId, IpAddress mcastIp,
-            VlanId assignedVlan) {
+                                       VlanId assignedVlan) {
         McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, deviceId);
         // This device is not serving this multicast group
         if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
@@ -988,7 +1218,7 @@
                 (objective, error) ->
                         log.warn("Failed to remove {} on {}, vlan {}: {}",
                                 mcastIp, deviceId, assignedVlan, error));
-        ForwardingObjective fwdObj = fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
+        ForwardingObjective fwdObj = mcastUtils.fwdObjBuilder(mcastIp, assignedVlan, nextObj.id()).remove(context);
         srManager.flowObjectiveService.forward(deviceId, fwdObj);
         mcastNextObjStore.remove(mcastStoreKey);
         mcastRoleStore.remove(mcastStoreKey);
@@ -1006,9 +1236,9 @@
         // and a new filter objective on the destination port
         links.forEach(link -> {
             addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
-                            assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
-            addFilterToDevice(link.dst().deviceId(), link.dst().port(),
-                              assignedVlan(null), mcastIp, null);
+                            mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
+            mcastUtils.addFilterToDevice(link.dst().deviceId(), link.dst().port(),
+                              mcastUtils.assignedVlan(null), mcastIp, null);
         });
 
         // Setup mcast role for the transit
@@ -1019,137 +1249,6 @@
     }
 
     /**
-     * Creates a next objective builder for multicast.
-     *
-     * @param mcastIp multicast group
-     * @param assignedVlan assigned VLAN ID
-     * @param outPorts set of output port numbers
-     * @return next objective builder
-     */
-    private NextObjective.Builder nextObjBuilder(IpAddress mcastIp,
-            VlanId assignedVlan, Set<PortNumber> outPorts, Integer nextId) {
-        // If nextId is null allocate a new one
-        if (nextId == null) {
-            nextId = srManager.flowObjectiveService.allocateNextId();
-        }
-
-        TrafficSelector metadata =
-                DefaultTrafficSelector.builder()
-                        .matchVlanId(assignedVlan)
-                        .matchIPDst(mcastIp.toIpPrefix())
-                        .build();
-
-        NextObjective.Builder nextObjBuilder = DefaultNextObjective
-                .builder().withId(nextId)
-                .withType(NextObjective.Type.BROADCAST).fromApp(srManager.appId())
-                .withMeta(metadata);
-
-        outPorts.forEach(port -> {
-            TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
-            if (egressVlan().equals(VlanId.NONE)) {
-                tBuilder.popVlan();
-            }
-            tBuilder.setOutput(port);
-            nextObjBuilder.addTreatment(tBuilder.build());
-        });
-
-        return nextObjBuilder;
-    }
-
-    /**
-     * Creates a forwarding objective builder for multicast.
-     *
-     * @param mcastIp multicast group
-     * @param assignedVlan assigned VLAN ID
-     * @param nextId next ID of the L3 multicast group
-     * @return forwarding objective builder
-     */
-    private ForwardingObjective.Builder fwdObjBuilder(IpAddress mcastIp,
-            VlanId assignedVlan, int nextId) {
-        TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
-        IpPrefix mcastPrefix = mcastIp.toIpPrefix();
-
-        if (mcastIp.isIp4()) {
-            sbuilder.matchEthType(Ethernet.TYPE_IPV4);
-            sbuilder.matchIPDst(mcastPrefix);
-        } else {
-            sbuilder.matchEthType(Ethernet.TYPE_IPV6);
-            sbuilder.matchIPv6Dst(mcastPrefix);
-        }
-
-
-        TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
-        metabuilder.matchVlanId(assignedVlan);
-
-        ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder();
-        fwdBuilder.withSelector(sbuilder.build())
-                .withMeta(metabuilder.build())
-                .nextStep(nextId)
-                .withFlag(ForwardingObjective.Flag.SPECIFIC)
-                .fromApp(srManager.appId())
-                .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
-        return fwdBuilder;
-    }
-
-    /**
-     * Creates a filtering objective builder for multicast.
-     *
-     * @param ingressPort ingress port of the multicast stream
-     * @param assignedVlan assigned VLAN ID
-     * @param routerMac router MAC. This is carried in metadata and used from some switches that
-     *                  need to put unicast entry before multicast entry in TMAC table.
-     * @return filtering objective builder
-     */
-    private FilteringObjective.Builder filterObjBuilder(PortNumber ingressPort,
-            VlanId assignedVlan, IpAddress mcastIp, MacAddress routerMac, McastRole mcastRole) {
-        FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
-        // Let's add the in port matching and the priority
-        filtBuilder.withKey(Criteria.matchInPort(ingressPort))
-                .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
-        // According to the mcast role we match on the proper vlan
-        // If the role is null we are on the transit or on the egress
-        if (mcastRole == null) {
-            filtBuilder.addCondition(Criteria.matchVlanId(egressVlan()));
-        } else {
-            filtBuilder.addCondition(Criteria.matchVlanId(ingressVlan()));
-        }
-        // According to the IP type we set the proper match on the mac address
-        if (mcastIp.isIp4()) {
-            filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
-                    MacAddress.IPV4_MULTICAST_MASK));
-        } else {
-            filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV6_MULTICAST,
-                     MacAddress.IPV6_MULTICAST_MASK));
-        }
-        // We finally build the meta treatment
-        TrafficTreatment tt = DefaultTrafficTreatment.builder()
-                .pushVlan().setVlanId(assignedVlan)
-                .setEthDst(routerMac)
-                .build();
-        filtBuilder.withMeta(tt);
-        // Done, we return a permit filtering objective
-        return filtBuilder.permit().fromApp(srManager.appId());
-    }
-
-    /**
-     * Gets output ports information from treatments.
-     *
-     * @param treatments collection of traffic treatments
-     * @return set of output port numbers
-     */
-    private Set<PortNumber> getPorts(Collection<TrafficTreatment> treatments) {
-        ImmutableSet.Builder<PortNumber> builder = ImmutableSet.builder();
-        treatments.forEach(treatment -> {
-            treatment.allInstructions().stream()
-                    .filter(instr -> instr instanceof OutputInstruction)
-                    .forEach(instr -> {
-                        builder.add(((OutputInstruction) instr).port());
-                    });
-        });
-        return builder.build();
-    }
-
-    /**
      * Go through all the paths, looking for shared links to be used
      * in the final path computation.
      *
@@ -1167,13 +1266,13 @@
         // Verify the source can still reach all the egresses
         for (DeviceId egress : egresses) {
             // From the source we cannot reach all the sinks
-            // just continue and let's figured out after
+            // just continue and let's figure out after
             currentPaths = availablePaths.get(egress);
             if (currentPaths.isEmpty()) {
                 continue;
             }
             // Get the length of the first one available,
-            // update the min length and save the paths
+            // update the min length
             length = currentPaths.get(0).links().size();
             if (length < minLength) {
                 minLength = length;
@@ -1189,7 +1288,7 @@
         Set<Link> sharedLinks = Sets.newHashSet();
         Set<Link> currentSharedLinks;
         Set<Link> currentLinks;
-        DeviceId deviceToRemove = null;
+        DeviceId egressToRemove = null;
         // Let's find out the shared links
         while (index < minLength) {
             // Initialize the intersection with the paths related to the first egress
@@ -1215,7 +1314,7 @@
                 // If there are no shared paths exit and record the device to remove
                 // we have to retry with a subset of sinks
                 if (currentSharedLinks.isEmpty()) {
-                    deviceToRemove = egress;
+                    egressToRemove = egress;
                     index = minLength;
                     break;
                 }
@@ -1226,8 +1325,8 @@
         // If the shared links is empty and there are egress
         // let's retry another time with less sinks, we can
         // still build optimal subtrees
-        if (sharedLinks.isEmpty() && egresses.size() > 1 && deviceToRemove != null) {
-            egresses.remove(deviceToRemove);
+        if (sharedLinks.isEmpty() && egresses.size() > 1 && egressToRemove != null) {
+            egresses.remove(egressToRemove);
             sharedLinks = exploreMcastTree(egresses, availablePaths);
         }
         return sharedLinks;
@@ -1241,18 +1340,20 @@
      * @return the computed Mcast tree
      */
     private Map<ConnectPoint, List<Path>> computeSinkMcastTree(DeviceId source,
-                                                                Set<ConnectPoint> sinks) {
+                                                               Set<ConnectPoint> sinks) {
         // Get the egress devices, remove source from the egress if present
         Set<DeviceId> egresses = sinks.stream()
                 .map(ConnectPoint::deviceId)
                 .filter(deviceId -> !deviceId.equals(source))
                 .collect(Collectors.toSet());
         Map<DeviceId, List<Path>> mcastTree = computeMcastTree(source, egresses);
-        // Build final tree nad return it as it is
+        // Build final tree and return it as it is
         final Map<ConnectPoint, List<Path>> finalTree = Maps.newHashMap();
-        mcastTree.forEach((egress, paths) ->
-            sinks.stream().filter(sink -> sink.deviceId().equals(egress))
-                    .forEach(sink -> finalTree.put(sink, mcastTree.get(sink.deviceId()))));
+        // We need to put back the source if it was originally present
+        sinks.forEach(sink -> {
+            List<Path> sinkPaths = mcastTree.get(sink.deviceId());
+            finalTree.put(sink, sinkPaths != null ? sinkPaths : ImmutableList.of());
+        });
         return finalTree;
     }
 
@@ -1356,7 +1457,7 @@
                 }
                 // Get the output ports on the next
                 nextObj = mcastNextObjStore.get(mcastStoreKey).value();
-                existingPorts = getPorts(nextObj.next());
+                existingPorts = mcastUtils.getPorts(nextObj.next());
                 // And the src port on the link
                 srcPort = hop.src().port();
                 // the src port is not used as output, exit
@@ -1412,37 +1513,6 @@
     }
 
     /**
-     * Gets source connect point of given multicast group.
-     *
-     * @param mcastIp multicast IP
-     * @return source connect point or null if not found
-     */
-    // FIXME To be addressed with multiple sources support
-    private ConnectPoint getSource(IpAddress mcastIp) {
-        // FIXME we should support different types of routes
-        McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
-                .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
-                .findFirst().orElse(null);
-        return mcastRoute == null ? null : srManager.multicastRouteService.sources(mcastRoute)
-                .stream()
-                .findFirst().orElse(null);
-    }
-    /**
-     * Gets sinks of given multicast group.
-     *
-     * @param mcastIp multicast IP
-     * @return set of sinks or empty set if not found
-     */
-    private Set<ConnectPoint> getSinks(IpAddress mcastIp) {
-        // FIXME we should support different types of routes
-        McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
-                .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
-                .findFirst().orElse(null);
-        return mcastRoute == null ?
-                Collections.emptySet() : srManager.multicastRouteService.sinks(mcastRoute);
-    }
-
-    /**
      * Gets groups which is affected by the link down event.
      *
      * @param link link going down
@@ -1453,7 +1523,7 @@
         PortNumber port = link.src().port();
         return mcastNextObjStore.entrySet().stream()
                 .filter(entry -> entry.getKey().deviceId().equals(deviceId) &&
-                        getPorts(entry.getValue().value().next()).contains(port))
+                        mcastUtils.getPorts(entry.getValue().value().next()).contains(port))
                 .map(Entry::getKey).map(McastStoreKey::mcastIp)
                 .collect(Collectors.toSet());
     }
@@ -1472,59 +1542,6 @@
     }
 
     /**
-     * Gets ingress VLAN from McastConfig.
-     *
-     * @return ingress VLAN or VlanId.NONE if not configured
-     */
-    private VlanId ingressVlan() {
-        McastConfig mcastConfig =
-                srManager.cfgService.getConfig(coreAppId, McastConfig.class);
-        return (mcastConfig != null) ? mcastConfig.ingressVlan() : VlanId.NONE;
-    }
-
-    /**
-     * Gets egress VLAN from McastConfig.
-     *
-     * @return egress VLAN or VlanId.NONE if not configured
-     */
-    private VlanId egressVlan() {
-        McastConfig mcastConfig =
-                srManager.cfgService.getConfig(coreAppId, McastConfig.class);
-        return (mcastConfig != null) ? mcastConfig.egressVlan() : VlanId.NONE;
-    }
-
-    /**
-     * Gets assigned VLAN according to the value of egress VLAN.
-     * If connect point is specified, try to reuse the assigned VLAN on the connect point.
-     *
-     * @param cp connect point; Can be null if not specified
-     * @return assigned VLAN ID
-     */
-    private VlanId assignedVlan(ConnectPoint cp) {
-        // Use the egressVlan if it is tagged
-        if (!egressVlan().equals(VlanId.NONE)) {
-            return egressVlan();
-        }
-        // Reuse unicast VLAN if the port has subnet configured
-        if (cp != null) {
-            VlanId untaggedVlan = srManager.getInternalVlanId(cp);
-            return (untaggedVlan != null) ? untaggedVlan : INTERNAL_VLAN;
-        }
-        // Use DEFAULT_VLAN if none of the above matches
-        return SegmentRoutingManager.INTERNAL_VLAN;
-    }
-
-    /**
-     * Gets assigned VLAN according to the value in the meta.
-     *
-     * @param nextObjective nextObjective to analyze
-     * @return assigned VLAN ID
-     */
-    private VlanId assignedVlanFromNext(NextObjective nextObjective) {
-        return ((VlanIdCriterion) nextObjective.meta().getCriterion(VLAN_VID)).vlanId();
-    }
-
-    /**
      * Gets the spine-facing port on ingress device of given multicast group.
      *
      * @param mcastIp multicast IP
@@ -1537,7 +1554,7 @@
         if (ingressDevice != null) {
             NextObjective nextObj = mcastNextObjStore
                     .get(new McastStoreKey(mcastIp, ingressDevice)).value();
-            Set<PortNumber> ports = getPorts(nextObj.next());
+            Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
             // Let's find out all the ingress-transit ports
             for (PortNumber port : ports) {
                 // Spine-facing port should have no subnet and no xconnect
@@ -1570,7 +1587,7 @@
             if (versionedNextObj != null) {
                 NextObjective nextObj = versionedNextObj.value();
                 // Retrieves all the output ports
-                Set<PortNumber> ports = getPorts(nextObj.next());
+                Set<PortNumber> ports = mcastUtils.getPorts(nextObj.next());
                 // Tries to find at least one port that is not spine-facing
                 for (PortNumber port : ports) {
                     // Spine-facing port should have no subnet and no xconnect
@@ -1586,44 +1603,6 @@
     }
 
     /**
-     * Removes filtering objective for given device and port.
-     *
-     * @param deviceId device ID
-     * @param port ingress port number
-     * @param assignedVlan assigned VLAN ID
-     * @param mcastIp multicast IP address
-     */
-    private void removeFilterToDevice(DeviceId deviceId, PortNumber port,
-                                      VlanId assignedVlan, IpAddress mcastIp, McastRole mcastRole) {
-        // Do nothing if the port is configured as suppressed
-        ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
-        SegmentRoutingAppConfig appConfig = srManager.cfgService
-                .getConfig(srManager.appId(), SegmentRoutingAppConfig.class);
-        if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
-            log.info("Ignore suppressed port {}", connectPoint);
-            return;
-        }
-
-        MacAddress routerMac;
-        try {
-            routerMac = srManager.deviceConfiguration().getDeviceMac(deviceId);
-        } catch (DeviceConfigNotFoundException dcnfe) {
-            log.warn("Fail to push filtering objective since device is not configured. Abort");
-            return;
-        }
-
-        FilteringObjective.Builder filtObjBuilder =
-                filterObjBuilder(port, assignedVlan, mcastIp, routerMac, mcastRole);
-        ObjectiveContext context = new DefaultObjectiveContext(
-                (objective) -> log.debug("Successfully removed filter on {}/{}, vlan {}",
-                                         deviceId, port.toLong(), assignedVlan),
-                (objective, error) ->
-                        log.warn("Failed to remove filter on {}/{}, vlan {}: {}",
-                                 deviceId, port.toLong(), assignedVlan, error));
-        srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.remove(context));
-    }
-
-    /**
      * Updates filtering objective for given device and port.
      * It is called in general when the mcast config has been
      * changed.
@@ -1647,9 +1626,9 @@
                         .findFirst().orElse(null);
                 if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
                     if (install) {
-                        addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
+                        mcastUtils.addFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), INGRESS);
                     } else {
-                        removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
+                        mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
                     }
                 }
             });
@@ -1658,26 +1637,6 @@
         }
     }
 
-    private boolean isLeader(ConnectPoint source) {
-        // Continue only when we have the mastership on the operation
-        if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
-            // When the source is available we just check the mastership
-            if (srManager.deviceService.isAvailable(source.deviceId())) {
-                return false;
-            }
-            // Fallback with Leadership service
-            // source id is used a topic
-            NodeId leader = srManager.leadershipService.runForLeadership(
-                    source.deviceId().toString()).leaderNodeId();
-            // Verify if this node is the leader
-            if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
-                return false;
-            }
-        }
-        // Done
-        return true;
-    }
-
     /**
      * Performs bucket verification operation for all mcast groups in the devices.
      * Firstly, it verifies that mcast is stable before trying verification operation.
@@ -1710,8 +1669,10 @@
                         Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT);
                         Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS);
                         // Get source and sinks from Mcast Route Service and warn about errors
-                        ConnectPoint source = getSource(mcastIp);
-                        Set<ConnectPoint> sinks = getSinks(mcastIp);
+                        ConnectPoint source = mcastUtils.getSource(mcastIp);
+                        Set<ConnectPoint> sinks = mcastUtils.getSinks(mcastIp).values().stream()
+                                .flatMap(Collection::stream)
+                                .collect(Collectors.toSet());
 
                         // Do not proceed if ingress device or source of this group are missing
                         if (ingressDevice == null || source == null) {
@@ -1750,11 +1711,12 @@
                             if (mcastNextObjStore.containsKey(currentKey)) {
                                 NextObjective currentNext = mcastNextObjStore.get(currentKey).value();
                                 // Get current ports
-                                Set<PortNumber> currentPorts = getPorts(currentNext.next());
+                                Set<PortNumber> currentPorts = mcastUtils.getPorts(currentNext.next());
                                 // Rebuild the next objective
-                                currentNext = nextObjBuilder(
+                                currentNext = mcastUtils.nextObjBuilder(
                                         mcastIp,
-                                        assignedVlan(deviceId.equals(source.deviceId()) ? source : null),
+                                        mcastUtils.assignedVlan(deviceId.equals(source.deviceId()) ?
+                                                                        source : null),
                                         currentPorts,
                                         currentNext.id()
                                 ).verify();
@@ -1807,7 +1769,7 @@
     public Map<ConnectPoint, List<ConnectPoint>> getMcastPaths(IpAddress mcastIp) {
         Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
         // Get the source
-        ConnectPoint source = getSource(mcastIp);
+        ConnectPoint source = mcastUtils.getSource(mcastIp);
         // Source cannot be null, we don't know the starting point
         if (source != null) {
             // Init steps
@@ -1837,7 +1799,7 @@
             // Build egress connectpoints
             NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
             // Get Ports
-            Set<PortNumber> outputPorts = getPorts(nextObjective.next());
+            Set<PortNumber> outputPorts = mcastUtils.getPorts(nextObjective.next());
             // Build relative cps
             ImmutableSet.Builder<ConnectPoint> cpBuilder = ImmutableSet.builder();
             outputPorts.forEach(portNumber -> cpBuilder.add(new ConnectPoint(toVisit, portNumber)));
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
new file mode 100644
index 0000000..37f273b
--- /dev/null
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastUtils.java
@@ -0,0 +1,428 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.segmentrouting.mcast;
+
+import com.google.common.collect.ImmutableSet;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.HostId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.config.basics.McastConfig;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flow.criteria.VlanIdCriterion;
+import org.onosproject.net.flow.instructions.Instructions;
+import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
+import org.onosproject.net.flowobjective.DefaultNextObjective;
+import org.onosproject.net.flowobjective.DefaultObjectiveContext;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.flowobjective.NextObjective;
+import org.onosproject.net.flowobjective.ObjectiveContext;
+import org.onosproject.segmentrouting.SegmentRoutingManager;
+import org.onosproject.segmentrouting.SegmentRoutingService;
+import org.onosproject.segmentrouting.config.DeviceConfigNotFoundException;
+import org.onosproject.segmentrouting.config.SegmentRoutingAppConfig;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.onosproject.net.flow.criteria.Criterion.Type.VLAN_VID;
+import static org.onosproject.segmentrouting.SegmentRoutingManager.INTERNAL_VLAN;
+
+/**
+ * Utility class for Multicast Handler.
+ */
+class McastUtils {
+
+    // Internal reference to the log
+    private final Logger log;
+    // Internal reference to SR Manager
+    private SegmentRoutingManager srManager;
+    // Internal reference to the app id
+    private ApplicationId coreAppId;
+
+    /**
+     * Builds a new McastUtils object.
+     *
+     * @param srManager the SR manager
+     * @param coreAppId the core application id
+     * @param log log reference of the McastHandler
+     */
+    McastUtils(SegmentRoutingManager srManager, ApplicationId coreAppId, Logger log) {
+        this.srManager = srManager;
+        this.coreAppId = coreAppId;
+        this.log = log;
+    }
+
+    /**
+     * Given a connect point define a leader for it.
+     *
+     * @param source the source connect point
+     * @return true if this instance is the leader, otherwise false
+     */
+    boolean isLeader(ConnectPoint source) {
+        // Continue only when we have the mastership on the operation
+        if (!srManager.mastershipService.isLocalMaster(source.deviceId())) {
+            // When the source is available we just check the mastership
+            if (srManager.deviceService.isAvailable(source.deviceId())) {
+                return false;
+            }
+            // Fallback with Leadership service
+            // source id is used a topic
+            NodeId leader = srManager.leadershipService.runForLeadership(
+                    source.deviceId().toString()).leaderNodeId();
+            // Verify if this node is the leader
+            if (!srManager.clusterService.getLocalNode().id().equals(leader)) {
+                return false;
+            }
+        }
+        // Done
+        return true;
+    }
+
+    /**
+     * Get router mac using application config and the connect point.
+     *
+     * @param deviceId the device id
+     * @param port the port number
+     * @return the router mac if the port is configured, otherwise null
+     */
+    private MacAddress getRouterMac(DeviceId deviceId, PortNumber port) {
+        // Do nothing if the port is configured as suppressed
+        ConnectPoint connectPoint = new ConnectPoint(deviceId, port);
+        SegmentRoutingAppConfig appConfig = srManager.cfgService
+                .getConfig(srManager.appId(), SegmentRoutingAppConfig.class);
+        if (appConfig != null && appConfig.suppressSubnet().contains(connectPoint)) {
+            log.info("Ignore suppressed port {}", connectPoint);
+            return MacAddress.NONE;
+        }
+        // Get the router mac using the device configuration
+        MacAddress routerMac;
+        try {
+            routerMac = srManager.deviceConfiguration().getDeviceMac(deviceId);
+        } catch (DeviceConfigNotFoundException dcnfe) {
+            log.warn("Fail to push filtering objective since device is not configured. Abort");
+            return MacAddress.NONE;
+        }
+        return routerMac;
+    }
+
+    /**
+     * Adds filtering objective for given device and port.
+     *
+     * @param deviceId device ID
+     * @param port ingress port number
+     * @param assignedVlan assigned VLAN ID
+     * @param mcastIp the group address
+     * @param mcastRole the role of the device
+     */
+    void addFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan,
+                           IpAddress mcastIp, McastRole mcastRole) {
+
+        MacAddress routerMac = getRouterMac(deviceId, port);
+        if (routerMac.equals(MacAddress.NONE)) {
+            return;
+        }
+
+        FilteringObjective.Builder filtObjBuilder = filterObjBuilder(port, assignedVlan, mcastIp,
+                                                                     routerMac, mcastRole);
+        ObjectiveContext context = new DefaultObjectiveContext(
+                (objective) -> log.debug("Successfully add filter on {}/{}, vlan {}",
+                                         deviceId, port.toLong(), assignedVlan),
+                (objective, error) ->
+                        log.warn("Failed to add filter on {}/{}, vlan {}: {}",
+                                 deviceId, port.toLong(), assignedVlan, error));
+        srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.add(context));
+    }
+
+    /**
+     * Removes filtering objective for given device and port.
+     *
+     * @param deviceId device ID
+     * @param port ingress port number
+     * @param assignedVlan assigned VLAN ID
+     * @param mcastIp multicast IP address
+     * @param mcastRole the multicast role of the device
+     */
+    void removeFilterToDevice(DeviceId deviceId, PortNumber port, VlanId assignedVlan,
+                              IpAddress mcastIp, McastRole mcastRole) {
+
+        MacAddress routerMac = getRouterMac(deviceId, port);
+        if (routerMac.equals(MacAddress.NONE)) {
+            return;
+        }
+
+        FilteringObjective.Builder filtObjBuilder =
+                filterObjBuilder(port, assignedVlan, mcastIp, routerMac, mcastRole);
+        ObjectiveContext context = new DefaultObjectiveContext(
+                (objective) -> log.debug("Successfully removed filter on {}/{}, vlan {}",
+                                         deviceId, port.toLong(), assignedVlan),
+                (objective, error) ->
+                        log.warn("Failed to remove filter on {}/{}, vlan {}: {}",
+                                 deviceId, port.toLong(), assignedVlan, error));
+        srManager.flowObjectiveService.filter(deviceId, filtObjBuilder.remove(context));
+    }
+
+    /**
+     * Gets assigned VLAN according to the value in the meta.
+     *
+     * @param nextObjective nextObjective to analyze
+     * @return assigned VLAN ID
+     */
+    VlanId assignedVlanFromNext(NextObjective nextObjective) {
+        return ((VlanIdCriterion) nextObjective.meta().getCriterion(VLAN_VID)).vlanId();
+    }
+
+    /**
+     * Gets ingress VLAN from McastConfig.
+     *
+     * @return ingress VLAN or VlanId.NONE if not configured
+     */
+    private VlanId ingressVlan() {
+        McastConfig mcastConfig =
+                srManager.cfgService.getConfig(coreAppId, McastConfig.class);
+        return (mcastConfig != null) ? mcastConfig.ingressVlan() : VlanId.NONE;
+    }
+
+    /**
+     * Gets egress VLAN from McastConfig.
+     *
+     * @return egress VLAN or VlanId.NONE if not configured
+     */
+    private VlanId egressVlan() {
+        McastConfig mcastConfig =
+                srManager.cfgService.getConfig(coreAppId, McastConfig.class);
+        return (mcastConfig != null) ? mcastConfig.egressVlan() : VlanId.NONE;
+    }
+
+    /**
+     * Gets assigned VLAN according to the value of egress VLAN.
+     * If connect point is specified, try to reuse the assigned VLAN on the connect point.
+     *
+     * @param cp connect point; Can be null if not specified
+     * @return assigned VLAN ID
+     */
+    VlanId assignedVlan(ConnectPoint cp) {
+        // Use the egressVlan if it is tagged
+        if (!egressVlan().equals(VlanId.NONE)) {
+            return egressVlan();
+        }
+        // Reuse unicast VLAN if the port has subnet configured
+        if (cp != null) {
+            VlanId untaggedVlan = srManager.getInternalVlanId(cp);
+            return (untaggedVlan != null) ? untaggedVlan : INTERNAL_VLAN;
+        }
+        // Use DEFAULT_VLAN if none of the above matches
+        return SegmentRoutingManager.INTERNAL_VLAN;
+    }
+
+    /**
+     * Gets source connect point of given multicast group.
+     *
+     * @param mcastIp multicast IP
+     * @return source connect point or null if not found
+     */
+    // FIXME To be addressed with multiple sources support
+    ConnectPoint getSource(IpAddress mcastIp) {
+        // FIXME we should support different types of routes
+        McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
+                .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
+                .findFirst().orElse(null);
+        return mcastRoute == null ? null : srManager.multicastRouteService.sources(mcastRoute)
+                .stream()
+                .findFirst().orElse(null);
+    }
+
+    /**
+     * Gets sinks of given multicast group.
+     *
+     * @param mcastIp multicast IP
+     * @return map of sinks or empty map if not found
+     */
+    Map<HostId, Set<ConnectPoint>> getSinks(IpAddress mcastIp) {
+        // FIXME we should support different types of routes
+        McastRoute mcastRoute = srManager.multicastRouteService.getRoutes().stream()
+                .filter(mcastRouteInternal -> mcastRouteInternal.group().equals(mcastIp))
+                .findFirst().orElse(null);
+        return mcastRoute == null ?
+                Collections.emptyMap() :
+                srManager.multicastRouteService.routeData(mcastRoute).sinks();
+    }
+
+    /**
+     * Get sinks affected by this egress device.
+     *
+     * @param egressDevice the egress device
+     * @param mcastIp the mcast ip address
+     * @return the map of the sinks affected
+     */
+    Map<HostId, Set<ConnectPoint>> getAffectedSinks(DeviceId egressDevice,
+                                                            IpAddress mcastIp) {
+        return getSinks(mcastIp).entrySet()
+                .stream()
+                .filter(hostIdSetEntry -> hostIdSetEntry.getValue().stream()
+                        .map(ConnectPoint::deviceId)
+                        .anyMatch(deviceId -> deviceId.equals(egressDevice))
+                ).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+    }
+
+    /**
+     * Creates a next objective builder for multicast.
+     *
+     * @param mcastIp multicast group
+     * @param assignedVlan assigned VLAN ID
+     * @param outPorts set of output port numbers
+     * @param nextId the next id
+     * @return next objective builder
+     */
+    NextObjective.Builder nextObjBuilder(IpAddress mcastIp, VlanId assignedVlan,
+                                         Set<PortNumber> outPorts, Integer nextId) {
+        // If nextId is null allocate a new one
+        if (nextId == null) {
+            nextId = srManager.flowObjectiveService.allocateNextId();
+        }
+        // Build the meta selector with the fwd objective info
+        TrafficSelector metadata =
+                DefaultTrafficSelector.builder()
+                        .matchVlanId(assignedVlan)
+                        .matchIPDst(mcastIp.toIpPrefix())
+                        .build();
+        // Define the nextobjective type
+        NextObjective.Builder nextObjBuilder = DefaultNextObjective
+                .builder().withId(nextId)
+                .withType(NextObjective.Type.BROADCAST)
+                .fromApp(srManager.appId())
+                .withMeta(metadata);
+        // Add the output ports
+        outPorts.forEach(port -> {
+            TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+            if (egressVlan().equals(VlanId.NONE)) {
+                tBuilder.popVlan();
+            }
+            tBuilder.setOutput(port);
+            nextObjBuilder.addTreatment(tBuilder.build());
+        });
+        // Done return the complete builder
+        return nextObjBuilder;
+    }
+
+    /**
+     * Creates a forwarding objective builder for multicast.
+     *
+     * @param mcastIp multicast group
+     * @param assignedVlan assigned VLAN ID
+     * @param nextId next ID of the L3 multicast group
+     * @return forwarding objective builder
+     */
+    ForwardingObjective.Builder fwdObjBuilder(IpAddress mcastIp,
+                                                      VlanId assignedVlan, int nextId) {
+        TrafficSelector.Builder sbuilder = DefaultTrafficSelector.builder();
+        // Let's the matching on the group address
+        // TODO SSM support in future
+        if (mcastIp.isIp6()) {
+            sbuilder.matchEthType(Ethernet.TYPE_IPV6);
+            sbuilder.matchIPv6Dst(mcastIp.toIpPrefix());
+        } else {
+            sbuilder.matchEthType(Ethernet.TYPE_IPV4);
+            sbuilder.matchIPDst(mcastIp.toIpPrefix());
+        }
+        // Then build the meta selector
+        TrafficSelector.Builder metabuilder = DefaultTrafficSelector.builder();
+        metabuilder.matchVlanId(assignedVlan);
+        // Finally return the completed builder
+        ForwardingObjective.Builder fwdBuilder = DefaultForwardingObjective.builder();
+        fwdBuilder.withSelector(sbuilder.build())
+                .withMeta(metabuilder.build())
+                .nextStep(nextId)
+                .withFlag(ForwardingObjective.Flag.SPECIFIC)
+                .fromApp(srManager.appId())
+                .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
+        return fwdBuilder;
+    }
+
+    /**
+     * Creates a filtering objective builder for multicast.
+     *
+     * @param ingressPort ingress port of the multicast stream
+     * @param assignedVlan assigned VLAN ID
+     * @param mcastIp the group address
+     * @param routerMac router MAC. This is carried in metadata and used from some switches that
+     *                  need to put unicast entry before multicast entry in TMAC table.
+     * @param mcastRole the Multicast role
+     * @return filtering objective builder
+     */
+    private FilteringObjective.Builder filterObjBuilder(PortNumber ingressPort, VlanId assignedVlan,
+                                                IpAddress mcastIp, MacAddress routerMac, McastRole mcastRole) {
+        FilteringObjective.Builder filtBuilder = DefaultFilteringObjective.builder();
+        // Let's add the in port matching and the priority
+        filtBuilder.withKey(Criteria.matchInPort(ingressPort))
+                .withPriority(SegmentRoutingService.DEFAULT_PRIORITY);
+        // According to the mcast role we match on the proper vlan
+        // If the role is null we are on the transit or on the egress
+        if (mcastRole == null) {
+            filtBuilder.addCondition(Criteria.matchVlanId(egressVlan()));
+        } else {
+            filtBuilder.addCondition(Criteria.matchVlanId(ingressVlan()));
+        }
+        // According to the IP type we set the proper match on the mac address
+        if (mcastIp.isIp4()) {
+            filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV4_MULTICAST,
+                                                                MacAddress.IPV4_MULTICAST_MASK));
+        } else {
+            filtBuilder.addCondition(Criteria.matchEthDstMasked(MacAddress.IPV6_MULTICAST,
+                                                                MacAddress.IPV6_MULTICAST_MASK));
+        }
+        // We finally build the meta treatment
+        TrafficTreatment tt = DefaultTrafficTreatment.builder()
+                .pushVlan().setVlanId(assignedVlan)
+                .setEthDst(routerMac)
+                .build();
+        filtBuilder.withMeta(tt);
+        // Done, we return a permit filtering objective
+        return filtBuilder.permit().fromApp(srManager.appId());
+    }
+
+    /**
+     * Gets output ports information from treatments.
+     *
+     * @param treatments collection of traffic treatments
+     * @return set of output port numbers
+     */
+    Set<PortNumber> getPorts(Collection<TrafficTreatment> treatments) {
+        ImmutableSet.Builder<PortNumber> builder = ImmutableSet.builder();
+        treatments.forEach(treatment -> treatment.allInstructions().stream()
+                    .filter(instr -> instr instanceof Instructions.OutputInstruction)
+                    .forEach(instr -> builder.add(((Instructions.OutputInstruction) instr).port())));
+        return builder.build();
+    }
+}