Removes the Multicast event cache and repurposes the Multicast workers

Additionally, reduces the locking inside McastHandler

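The event cache and the ReentrantLock that serialized event processing
are replaced by a dedicated single-threaded worker executor: tasks
submitted to it run one at a time, in submission order, so no explicit
locking is needed anymore. The last-change timestamps become
AtomicReferences, since they are now touched by both the worker and the
bucket-corrector threads. A minimal sketch of the resulting pattern
(illustrative names only, not the actual ONOS classes):

    import java.time.Instant;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.atomic.AtomicReference;

    class SingleWorkerSketch {
        // One thread: submitted tasks run sequentially, in order.
        private final ScheduledExecutorService worker =
                Executors.newScheduledThreadPool(1);
        // Written by the worker thread, read by the corrector thread.
        private final AtomicReference<Instant> lastChange =
                new AtomicReference<>(Instant.now());

        void process(Runnable event) {
            worker.execute(() -> {
                lastChange.set(Instant.now());
                event.run(); // no lock: the single worker serializes events
            });
        }

        // True if nothing changed in the last 'seconds' seconds.
        boolean stableFor(long seconds) {
            return Instant.now().getEpochSecond()
                    - lastChange.get().getEpochSecond() > seconds;
        }
    }
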
Change-Id: Id8dd9aff5322388e3aeec8ebc67465719719514f
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
deleted file mode 100644
index 55d2131..0000000
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastCacheKey.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.onosproject.segmentrouting.mcast;
-
-import org.onlab.packet.IpAddress;
-import org.onosproject.net.ConnectPoint;
-import org.onosproject.net.HostId;
-
-import java.util.Objects;
-
-import static com.google.common.base.MoreObjects.toStringHelper;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-/**
- * Key of the multicast event cache.
- */
-class McastCacheKey {
-    // The group ip
-    private final IpAddress mcastIp;
-    // The sink id
-    private final HostId sinkHost;
-    // The sink connect point
-    private final ConnectPoint sink;
-
-    /**
-     * Constructs a key for multicast event cache.
-     *
-     * @param mcastIp multicast group IP address
-     * @param sink connect point of the sink
-     *
-     * @deprecated in 1.12 ("Magpie") release.
-     */
-    @Deprecated
-    public McastCacheKey(IpAddress mcastIp, ConnectPoint sink) {
-        checkNotNull(mcastIp, "mcastIp cannot be null");
-        checkNotNull(sink, "sink cannot be null");
-        checkArgument(mcastIp.isMulticast(), "mcastIp must be a multicast address");
-        this.mcastIp = mcastIp;
-        this.sink = sink;
-        this.sinkHost = null;
-    }
-
-    /**
-     * Constructs a key for multicast event cache.
-     *
-     * @param mcastIp multicast group IP address
-     * @param hostId id of the sink
-     */
-    public McastCacheKey(IpAddress mcastIp, HostId hostId) {
-        checkNotNull(mcastIp, "mcastIp cannot be null");
-        checkNotNull(hostId, "sink cannot be null");
-        checkArgument(mcastIp.isMulticast(), "mcastIp must be a multicast address");
-        this.mcastIp = mcastIp;
-        this.sinkHost = hostId;
-        this.sink = null;
-    }
-
-    /**
-     * Returns the multicast IP address of this key.
-     *
-     * @return multicast IP
-     */
-    public IpAddress mcastIp() {
-        return mcastIp;
-    }
-
-    /**
-     * Returns the sink of this key.
-     *
-     * @return connect point of the sink
-     *
-     * @deprecated in 1.12 ("Magpie") release.
-     */
-    @Deprecated
-    public ConnectPoint sink() {
-        return sink;
-    }
-
-    /**
-     * Returns the sink of this key.
-     *
-     * @return host id of the sink
-     */
-    public HostId sinkHost() {
-        return sinkHost;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (!(o instanceof McastCacheKey)) {
-            return false;
-        }
-        McastCacheKey that =
-                (McastCacheKey) o;
-        return (Objects.equals(this.mcastIp, that.mcastIp) &&
-                Objects.equals(this.sink, that.sink) &&
-                Objects.equals(this.sinkHost, that.sinkHost));
-    }
-
-    @Override
-    public int hashCode() {
-         return Objects.hash(mcastIp, sink);
-    }
-
-    @Override
-    public String toString() {
-        return toStringHelper(getClass())
-                .add("mcastIp", mcastIp)
-                .add("sink", sink)
-                .add("sinkHost", sinkHost)
-                .toString();
-    }
-}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 52618f4..5533413 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -17,10 +17,6 @@
 package org.onosproject.segmentrouting.mcast;
 
 import com.google.common.base.Objects;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalCause;
-import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -76,8 +72,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static java.util.concurrent.Executors.newScheduledThreadPool;
@@ -98,6 +93,7 @@
  * Handles Multicast related events.
  */
 public class McastHandler {
+    // Internal elements
     private static final Logger log = LoggerFactory.getLogger(McastHandler.class);
     private final SegmentRoutingManager srManager;
     private final TopologyService topologyService;
@@ -105,152 +101,21 @@
     private final ConsistentMap<McastStoreKey, NextObjective> mcastNextObjStore;
     private final ConsistentMap<McastRoleStoreKey, McastRole> mcastRoleStore;
     private final DistributedSet<McastFilteringObjStoreKey> mcastFilteringObjStore;
-
-    // Wait time for the cache
-    private static final int WAIT_TIME_MS = 1000;
-
-    //The mcastEventCache is implemented to avoid race condition by giving more time
-    // to the underlying subsystems to process previous calls.
-    private Cache<McastCacheKey, McastEvent> mcastEventCache = CacheBuilder.newBuilder()
-            .expireAfterWrite(WAIT_TIME_MS, TimeUnit.MILLISECONDS)
-            .removalListener((RemovalNotification<McastCacheKey, McastEvent> notification) -> {
-                IpAddress mcastIp = notification.getKey().mcastIp();
-                HostId sink = notification.getKey().sinkHost();
-                McastEvent mcastEvent = notification.getValue();
-                RemovalCause cause = notification.getCause();
-                // If it expires or it has been replaced, we deque the event - no when evicted
-                switch (notification.getCause()) {
-                    case REPLACED:
-                    case EXPIRED:
-                        dequeueMcastEvent(mcastEvent);
-                        break;
-                    default:
-                        break;
-                }
-            }).build();
-
-    private void enqueueMcastEvent(McastEvent mcastEvent) {
-        final McastRouteUpdate mcastRouteUpdate = mcastEvent.subject();
-        final McastRouteUpdate mcastRoutePrevUpdate = mcastEvent.prevSubject();
-        final IpAddress group = mcastRoutePrevUpdate.route().group();
-        ImmutableSet.Builder<HostId> sinksBuilder = ImmutableSet.builder();
-        if (mcastEvent.type() == SOURCES_ADDED ||
-                mcastEvent.type() == SOURCES_REMOVED) {
-            // Current subject and prev just differ for the source connect points
-            sinksBuilder.addAll(mcastRouteUpdate.sinks().keySet());
-        } else if (mcastEvent.type() == SINKS_ADDED) {
-            mcastRouteUpdate.sinks().forEach(((hostId, connectPoints) -> {
-                // Get the previous locations and verify if there are changes
-                Set<ConnectPoint> prevConnectPoints = mcastRoutePrevUpdate.sinks().get(hostId);
-                Set<ConnectPoint> changes = Sets.difference(connectPoints, prevConnectPoints != null ?
-                        prevConnectPoints : Collections.emptySet());
-                if (!changes.isEmpty()) {
-                    sinksBuilder.add(hostId);
-                }
-            }));
-        } else if (mcastEvent.type() == SINKS_REMOVED) {
-            mcastRoutePrevUpdate.sinks().forEach(((hostId, connectPoints) -> {
-                // Get the current locations and verify if there are changes
-                Set<ConnectPoint> currentConnectPoints = mcastRouteUpdate.sinks().get(hostId);
-                Set<ConnectPoint> changes = Sets.difference(connectPoints, currentConnectPoints != null ?
-                        currentConnectPoints : Collections.emptySet());
-                if (!changes.isEmpty()) {
-                    sinksBuilder.add(hostId);
-                }
-            }));
-        } else if (mcastEvent.type() == ROUTE_REMOVED) {
-            // Current subject is null, just take the previous host ids
-            sinksBuilder.addAll(mcastRoutePrevUpdate.sinks().keySet());
-        }
-        sinksBuilder.build().forEach(sink -> {
-            McastCacheKey cacheKey = new McastCacheKey(group, sink);
-            mcastEventCache.put(cacheKey, mcastEvent);
-        });
-    }
-
-    private void dequeueMcastEvent(McastEvent mcastEvent) {
-        final McastRouteUpdate mcastUpdate = mcastEvent.subject();
-        final McastRouteUpdate mcastPrevUpdate = mcastEvent.prevSubject();
-        IpAddress mcastIp = mcastPrevUpdate.route().group();
-        Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
-                .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
-        Set<ConnectPoint> prevSources = mcastPrevUpdate.sources()
-                .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
-        Set<ConnectPoint> sources;
-        switch (mcastEvent.type()) {
-            case SOURCES_ADDED:
-                sources = mcastUpdate.sources()
-                        .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
-                Set<ConnectPoint> sourcesToBeAdded = Sets.difference(sources, prevSources);
-                processSourcesAddedInternal(sourcesToBeAdded, mcastIp, mcastUpdate.sinks());
-                break;
-            case SOURCES_REMOVED:
-                sources = mcastUpdate.sources()
-                        .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
-                Set<ConnectPoint> sourcesToBeRemoved = Sets.difference(prevSources, sources);
-                processSourcesRemovedInternal(sourcesToBeRemoved, sources, mcastIp, mcastUpdate.sinks());
-                break;
-            case ROUTE_REMOVED:
-                processRouteRemovedInternal(prevSources, mcastIp);
-                break;
-            case SINKS_ADDED:
-                processSinksAddedInternal(prevSources, mcastIp, mcastUpdate.sinks(), prevSinks);
-                break;
-            case SINKS_REMOVED:
-                processSinksRemovedInternal(prevSources, mcastIp, mcastUpdate.sinks(), mcastPrevUpdate.sinks());
-                break;
-            default:
-                break;
-        }
-    }
-
-    // Mcast lock to serialize local operations
-    private final Lock mcastLock = new ReentrantLock();
-    private void mcastLock() {
-        mcastLock.lock();
-    }
-    private void mcastUnlock() {
-        mcastLock.unlock();
-    }
     // Stability threshold for Mcast. Seconds
     private static final long MCAST_STABLITY_THRESHOLD = 5;
-    // Last change done
-    private Instant lastMcastChange = Instant.now();
-    // Last bucker corrector execution
-    private Instant lastBktCorrExecution = Instant.now();
-
-    /**
-     * Determines if mcast in the network has been stable in the last
-     * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
-     * to the last mcast change timestamp.
-     *
-     * @return true if stable
-     */
-    private boolean isMcastStable() {
-        long last = (long) (lastMcastChange.toEpochMilli() / 1000.0);
-        long now = (long) (Instant.now().toEpochMilli() / 1000.0);
-        log.trace("Multicast stable since {}s", now - last);
-        return (now - last) > MCAST_STABLITY_THRESHOLD;
-    }
-
-    /**
-     * Assures there are always MCAST_VERIFY_INTERVAL seconds between each execution,
-     * by comparing the current time with the last corrector execution.
-     *
-     * @return true if stable
-     */
-    private boolean wasBktCorrRunning() {
-        long last = (long) (lastBktCorrExecution.toEpochMilli() / 1000.0);
-        long now = (long) (Instant.now().toEpochMilli() / 1000.0);
-        log.trace("McastBucketCorrector executed {}s ago", now - last);
-        return (now - last) < MCAST_VERIFY_INTERVAL;
-    }
-
     // Verify interval for Mcast bucket corrector
     private static final long MCAST_VERIFY_INTERVAL = 30;
-    // Executor for mcast bucket corrector and for cache
-    private ScheduledExecutorService executorService
-            = newScheduledThreadPool(1, groupedThreads("mcastWorker", "mcastWorker-%d", log));
+    // Maximum number of verify operations that can be in flight at the same time
+    private static final int MAX_VERIFY_ON_FLIGHT = 10;
+    // Timestamp of the last mcast change
+    private AtomicReference<Instant> lastMcastChange = new AtomicReference<>(Instant.now());
+    // Timestamp of the last bucket corrector execution
+    private AtomicReference<Instant> lastBktCorrExecution = new AtomicReference<>(Instant.now());
+    // Executors for the mcast bucket corrector and for the event processing
+    private ScheduledExecutorService mcastCorrector
+            = newScheduledThreadPool(1, groupedThreads("onos", "m-corrector", log));
+    private ScheduledExecutorService mcastWorker
+            = newScheduledThreadPool(1, groupedThreads("onos", "m-worker-%d", log));
 
     /**
      * Constructs the McastEventHandler.
@@ -288,64 +153,88 @@
                 .build()
                 .asDistributedSet();
         mcastUtils = new McastUtils(srManager, coreAppId, log);
-        // Init the executor service, the buckets corrector and schedule the clean up
-        executorService.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
-                                               MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
-        executorService.scheduleAtFixedRate(mcastEventCache::cleanUp, 0,
-                                            WAIT_TIME_MS, TimeUnit.MILLISECONDS);
+        // Init the executor for the buckets corrector
+        mcastCorrector.scheduleWithFixedDelay(new McastBucketCorrector(), 10,
+                                              MCAST_VERIFY_INTERVAL, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Determines if mcast in the network has been stable in the last
+     * MCAST_STABLITY_THRESHOLD seconds, by comparing the current time
+     * to the last mcast change timestamp.
+     *
+     * @return true if stable
+     */
+    private boolean isMcastStable() {
+        long last = (long) (lastMcastChange.get().toEpochMilli() / 1000.0);
+        long now = (long) (Instant.now().toEpochMilli() / 1000.0);
+        log.trace("Multicast stable since {}s", now - last);
+        return (now - last) > MCAST_STABLITY_THRESHOLD;
+    }
+
+    /**
+     * Ensures there are always MCAST_VERIFY_INTERVAL seconds between executions,
+     * by comparing the current time with the last corrector execution.
+     *
+     * @return true if the last execution was less than MCAST_VERIFY_INTERVAL seconds ago
+     */
+    private boolean wasBktCorrRunning() {
+        long last = (long) (lastBktCorrExecution.get().toEpochMilli() / 1000.0);
+        long now = (long) (Instant.now().toEpochMilli() / 1000.0);
+        log.trace("McastBucketCorrector executed {}s ago", now - last);
+        return (now - last) < MCAST_VERIFY_INTERVAL;
     }
 
     /**
      * Read initial multicast configuration from mcast store.
      */
     public void init() {
-        lastMcastChange = Instant.now();
-        mcastLock();
-        try {
-            srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
-                log.debug("Init group {}", mcastRoute.group());
-                if (!mcastUtils.isLeader(mcastRoute.group())) {
-                    log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+        mcastWorker.execute(this::initInternal);
+    }
+
+    private void initInternal() {
+        lastMcastChange.set(Instant.now());
+        srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+            log.debug("Init group {}", mcastRoute.group());
+            if (!mcastUtils.isLeader(mcastRoute.group())) {
+                log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+                return;
+            }
+            McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
+            // For each source process the mcast tree
+            srManager.multicastRouteService.sources(mcastRoute).forEach(source -> {
+                Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
+                Set<DeviceId> visited = Sets.newHashSet();
+                List<ConnectPoint> currentPath = Lists.newArrayList(source);
+                mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
+                                           currentPath, mcastRoute.group(), source);
+                // Get all the sinks and process them
+                Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(),
+                                                                mcastRouteData.sinks());
+                // Filter out all the working sinks, we do not want to move them
+                // TODO we need a better way to distinguish flows coming from different sources
+                sinks = sinks.stream()
+                        .filter(sink -> !mcastPaths.containsKey(sink) ||
+                                !isSinkForSource(mcastRoute.group(), sink, source))
+                        .collect(Collectors.toSet());
+                if (sinks.isEmpty()) {
+                    log.debug("Skip {} for source {} nothing to do", mcastRoute.group(), source);
                     return;
                 }
-                McastRouteData mcastRouteData = srManager.multicastRouteService.routeData(mcastRoute);
-                // For each source process the mcast tree
-                srManager.multicastRouteService.sources(mcastRoute).forEach(source -> {
-                    Map<ConnectPoint, List<ConnectPoint>> mcastPaths = Maps.newHashMap();
-                    Set<DeviceId> visited = Sets.newHashSet();
-                    List<ConnectPoint> currentPath = Lists.newArrayList(source);
-                    mcastUtils.buildMcastPaths(mcastNextObjStore.asJavaMap(), source.deviceId(), visited, mcastPaths,
-                                    currentPath, mcastRoute.group(), source);
-                    // Get all the sinks and process them
-                    Set<ConnectPoint> sinks = processSinksToBeAdded(source, mcastRoute.group(),
-                                                                    mcastRouteData.sinks());
-                    // Filter out all the working sinks, we do not want to move them
-                    // TODO we need a better way to distinguish flows coming from different sources
-                    sinks = sinks.stream()
-                            .filter(sink -> !mcastPaths.containsKey(sink) ||
-                                    !isSinkForSource(mcastRoute.group(), sink, source))
-                            .collect(Collectors.toSet());
-                    if (sinks.isEmpty()) {
-                        log.debug("Skip {} for source {} nothing to do", mcastRoute.group(), source);
-                        return;
-                    }
-                    Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(mcastRoute.group(),
-                                                                                  source.deviceId(), sinks);
-                    mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
-                                                                               mcastRoute.group(), paths));
-                });
+                Map<ConnectPoint, List<Path>> mcastTree = computeSinkMcastTree(mcastRoute.group(),
+                                                                               source.deviceId(), sinks);
+                mcastTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink,
+                                                                            mcastRoute.group(), paths));
             });
-        } finally {
-            mcastUnlock();
-        }
+        });
     }
 
     /**
      * Clean up when deactivating the application.
      */
     public void terminate() {
-        mcastEventCache.invalidateAll();
-        executorService.shutdown();
+        mcastCorrector.shutdown();
+        mcastWorker.shutdown();
         mcastNextObjStore.destroy();
         mcastRoleStore.destroy();
         mcastFilteringObjStore.destroy();
@@ -357,14 +246,46 @@
      * Processes the SOURCE_ADDED, SOURCE_UPDATED, SINK_ADDED,
      * SINK_REMOVED, ROUTE_ADDED and ROUTE_REMOVED events.
      *
-     * @param event McastEvent with SOURCE_ADDED type
+     * @param event the multicast event to be processed
      */
     public void processMcastEvent(McastEvent event) {
-        // If it is a route added, we do not enqueue
+        mcastWorker.execute(() -> processMcastEventInternal(event));
+    }
+
+    private void processMcastEventInternal(McastEvent event) {
+        lastMcastChange.set(Instant.now());
+        // Note: the current subject is null for ROUTE_REMOVED events
+        final McastRouteUpdate mcastUpdate = event.subject();
+        final McastRouteUpdate mcastPrevUpdate = event.prevSubject();
+        IpAddress mcastIp = mcastPrevUpdate.route().group();
+        Set<ConnectPoint> prevSinks = mcastPrevUpdate.sinks()
+                .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+        Set<ConnectPoint> prevSources = mcastPrevUpdate.sources()
+                .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+        Set<ConnectPoint> sources;
+        // Events handling
         if (event.type() == ROUTE_ADDED) {
-            processRouteAddedInternal(event.subject().route().group());
+            processRouteAddedInternal(mcastUpdate.route().group());
+        } else if (event.type() == ROUTE_REMOVED) {
+            processRouteRemovedInternal(prevSources, mcastIp);
+        } else if (event.type() == SOURCES_ADDED) {
+            // Current subject and prev differ only in the source connect points
+            sources = mcastUpdate.sources()
+                    .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+            Set<ConnectPoint> sourcesToBeAdded = Sets.difference(sources, prevSources);
+            processSourcesAddedInternal(sourcesToBeAdded, mcastIp, mcastUpdate.sinks());
+        } else if (event.type() == SOURCES_REMOVED) {
+            // Current subject and prev differ only in the source connect points
+            sources = mcastUpdate.sources()
+                    .values().stream().flatMap(Collection::stream).collect(Collectors.toSet());
+            Set<ConnectPoint> sourcesToBeRemoved = Sets.difference(prevSources, sources);
+            processSourcesRemovedInternal(sourcesToBeRemoved, sources, mcastIp, mcastUpdate.sinks());
+        } else if (event.type() == SINKS_ADDED) {
+            processSinksAddedInternal(prevSources, mcastIp, mcastUpdate.sinks(), prevSinks);
+        } else if (event.type() == SINKS_REMOVED) {
+            processSinksRemovedInternal(prevSources, mcastIp, mcastUpdate.sinks(), mcastPrevUpdate.sinks());
         } else {
-            enqueueMcastEvent(event);
+            log.warn("Event {} not handled", event);
         }
     }
 
@@ -377,23 +298,22 @@
      */
     private void processSourcesAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
                                              Map<HostId, Set<ConnectPoint>> sinks) {
-        lastMcastChange = Instant.now();
-        mcastLock();
-        try {
-            log.info("Processing sources added {} for group {}", sources, mcastIp);
-            if (!mcastUtils.isLeader(mcastIp)) {
-                log.debug("Skip {} due to lack of leadership", mcastIp);
-                return;
-            }
-            sources.forEach(source -> {
-                Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, sinks);
-                Map<ConnectPoint, List<Path>> mcasTree = computeSinkMcastTree(mcastIp, source.deviceId(),
-                                                                              sinksToBeAdded);
-                mcasTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
-            });
-        } finally {
-            mcastUnlock();
+        lastMcastChange.set(Instant.now());
+        log.info("Processing sources added {} for group {}", sources, mcastIp);
+        if (!mcastUtils.isLeader(mcastIp)) {
+            log.debug("Skip {} due to lack of leadership", mcastIp);
+            return;
         }
+        if (sources.isEmpty()) {
+            log.debug("Skip {} due to empty sources to be added", mcastIp);
+            return;
+        }
+        sources.forEach(source -> {
+            Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, sinks);
+            Map<ConnectPoint, List<Path>> mcastTree = computeSinkMcastTree(mcastIp, source.deviceId(),
+                                                                           sinksToBeAdded);
+            mcastTree.forEach((sink, paths) -> processSinkAddedInternal(source, sink, mcastIp, paths));
+        });
     }
 
     /**
@@ -408,119 +328,114 @@
                                                Set<ConnectPoint> remainingSources,
                                                IpAddress mcastIp,
                                                Map<HostId, Set<ConnectPoint>> sinks) {
-        lastMcastChange = Instant.now();
-        mcastLock();
-        try {
-            log.info("Processing sources removed {} for group {}", sourcesToBeRemoved, mcastIp);
-            if (!mcastUtils.isLeader(mcastIp)) {
-                log.debug("Skip {} due to lack of leadership", mcastIp);
-                return;
-            }
-            if (remainingSources.isEmpty()) {
-                log.debug("There are no more sources for {}", mcastIp);
-                processRouteRemovedInternal(sourcesToBeRemoved, mcastIp);
-                return;
-            }
-            // Skip offline devices
-            Set<ConnectPoint> candidateSources = sourcesToBeRemoved.stream()
-                    .filter(source -> srManager.deviceService.isAvailable(source.deviceId()))
+        lastMcastChange.set(Instant.now());
+        log.info("Processing sources removed {} for group {}", sourcesToBeRemoved, mcastIp);
+        if (!mcastUtils.isLeader(mcastIp)) {
+            log.debug("Skip {} due to lack of leadership", mcastIp);
+            return;
+        }
+        if (remainingSources.isEmpty()) {
+            log.debug("There are no more sources for {}", mcastIp);
+            processRouteRemovedInternal(sourcesToBeRemoved, mcastIp);
+            return;
+        }
+        // Skip offline devices
+        Set<ConnectPoint> candidateSources = sourcesToBeRemoved.stream()
+                .filter(source -> srManager.deviceService.isAvailable(source.deviceId()))
+                .collect(Collectors.toSet());
+        if (candidateSources.isEmpty()) {
+            log.debug("Skip {} due to empty sources to be removed", mcastIp);
+            return;
+        }
+        // Let's heal the trees
+        Set<Link> remainingLinks = Sets.newHashSet();
+        Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
+        Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
+        Set<ConnectPoint> totalSources = Sets.newHashSet(candidateSources);
+        totalSources.addAll(remainingSources);
+        // Calculate all the links used by the sources
+        totalSources.forEach(source -> {
+            Set<ConnectPoint> currentSinks = sinks.values()
+                    .stream().flatMap(Collection::stream)
+                    .filter(sink -> isSinkForSource(mcastIp, sink, source))
                     .collect(Collectors.toSet());
-            if (candidateSources.isEmpty()) {
-                log.debug("Skip {} due to empty sources to be removed", mcastIp);
-                return;
-            }
-            // Let's heal the trees
-            Set<Link> remainingLinks = Sets.newHashSet();
-            Map<ConnectPoint, Set<Link>> candidateLinks = Maps.newHashMap();
-            Map<ConnectPoint, Set<ConnectPoint>> candidateSinks = Maps.newHashMap();
-            Set<ConnectPoint> totalSources = Sets.newHashSet(candidateSources);
-            totalSources.addAll(remainingSources);
-            // Calculate all the links used by the sources
-            totalSources.forEach(source -> {
-                Set<ConnectPoint> currentSinks = sinks.values()
-                        .stream().flatMap(Collection::stream)
-                        .filter(sink -> isSinkForSource(mcastIp, sink, source))
-                        .collect(Collectors.toSet());
-                candidateSinks.put(source, currentSinks);
-                currentSinks.forEach(currentSink -> {
-                    Optional<Path> currentPath = getPath(source.deviceId(), currentSink.deviceId(),
-                                                         mcastIp, null, source);
-                    if (currentPath.isPresent()) {
-                        if (!candidateSources.contains(source)) {
-                            remainingLinks.addAll(currentPath.get().links());
-                        } else {
-                            candidateLinks.put(source, Sets.newHashSet(currentPath.get().links()));
-                        }
+            candidateSinks.put(source, currentSinks);
+            currentSinks.forEach(currentSink -> {
+                Optional<Path> currentPath = getPath(source.deviceId(), currentSink.deviceId(),
+                                                     mcastIp, null, source);
+                if (currentPath.isPresent()) {
+                    if (!candidateSources.contains(source)) {
+                        remainingLinks.addAll(currentPath.get().links());
+                    } else {
+                        candidateLinks.put(source, Sets.newHashSet(currentPath.get().links()));
                     }
-                });
-            });
-            // Clean transit links
-            candidateLinks.forEach((source, currentCandidateLinks) -> {
-                Set<Link> linksToBeRemoved = Sets.difference(currentCandidateLinks, remainingLinks)
-                        .immutableCopy();
-                if (!linksToBeRemoved.isEmpty()) {
-                    currentCandidateLinks.forEach(link -> {
-                        DeviceId srcLink = link.src().deviceId();
-                        // Remove ports only on links to be removed
-                        if (linksToBeRemoved.contains(link)) {
-                            removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
-                                                 mcastUtils.assignedVlan(srcLink.equals(source.deviceId()) ?
-                                                                                 source : null));
-                        }
-                        // Remove role on the candidate links
-                        mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, srcLink, source));
-                    });
                 }
             });
-            // Clean ingress and egress
-            candidateSources.forEach(source -> {
-                Set<ConnectPoint> currentSinks = candidateSinks.get(source);
-                currentSinks.forEach(currentSink -> {
-                    VlanId assignedVlan = mcastUtils.assignedVlan(source.deviceId().equals(currentSink.deviceId()) ?
-                                                                          source : null);
-                    // Sinks co-located with the source
-                    if (source.deviceId().equals(currentSink.deviceId())) {
-                        if (source.port().equals(currentSink.port())) {
-                            log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
-                                     mcastIp, currentSink, source);
-                            return;
-                        }
-                        // We need to check against the other sources and if it is
-                        // necessary remove the port from the device - no overlap
-                        Set<VlanId> otherVlans = remainingSources.stream()
-                                // Only sources co-located and having this sink
-                                .filter(remainingSource -> remainingSource.deviceId()
-                                        .equals(source.deviceId()) && candidateSinks.get(remainingSource)
-                                        .contains(currentSink))
-                                .map(remainingSource -> mcastUtils.assignedVlan(
-                                        remainingSource.deviceId().equals(currentSink.deviceId()) ?
-                                                remainingSource : null)).collect(Collectors.toSet());
-                        if (!otherVlans.contains(assignedVlan)) {
-                            removePortFromDevice(currentSink.deviceId(), currentSink.port(),
-                                                 mcastIp, assignedVlan);
-                        }
-                        mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
-                                                                    source));
+        });
+        // Clean transit links
+        candidateLinks.forEach((source, currentCandidateLinks) -> {
+            Set<Link> linksToBeRemoved = Sets.difference(currentCandidateLinks, remainingLinks)
+                    .immutableCopy();
+            if (!linksToBeRemoved.isEmpty()) {
+                currentCandidateLinks.forEach(link -> {
+                    DeviceId srcLink = link.src().deviceId();
+                    // Remove ports only on links to be removed
+                    if (linksToBeRemoved.contains(link)) {
+                        removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
+                                             mcastUtils.assignedVlan(srcLink.equals(source.deviceId()) ?
+                                                                             source : null));
+                    }
+                    // Remove role on the candidate links
+                    mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, srcLink, source));
+                });
+            }
+        });
+        // Clean ingress and egress
+        candidateSources.forEach(source -> {
+            Set<ConnectPoint> currentSinks = candidateSinks.get(source);
+            currentSinks.forEach(currentSink -> {
+                VlanId assignedVlan = mcastUtils.assignedVlan(source.deviceId().equals(currentSink.deviceId()) ?
+                                                                      source : null);
+                // Sinks co-located with the source
+                if (source.deviceId().equals(currentSink.deviceId())) {
+                    if (source.port().equals(currentSink.port())) {
+                        log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
+                                 mcastIp, currentSink, source);
                         return;
                     }
+                    // We need to check against the other sources and if it is
+                    // necessary remove the port from the device - no overlap
                     Set<VlanId> otherVlans = remainingSources.stream()
-                            .filter(remainingSource -> candidateSinks.get(remainingSource)
+                            // Only sources co-located and having this sink
+                            .filter(remainingSource -> remainingSource.deviceId()
+                                    .equals(source.deviceId()) && candidateSinks.get(remainingSource)
                                     .contains(currentSink))
                             .map(remainingSource -> mcastUtils.assignedVlan(
                                     remainingSource.deviceId().equals(currentSink.deviceId()) ?
                                             remainingSource : null)).collect(Collectors.toSet());
-                    // Sinks on other leaves
                     if (!otherVlans.contains(assignedVlan)) {
                         removePortFromDevice(currentSink.deviceId(), currentSink.port(),
                                              mcastIp, assignedVlan);
                     }
                     mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
                                                                 source));
-                });
+                    return;
+                }
+                Set<VlanId> otherVlans = remainingSources.stream()
+                        .filter(remainingSource -> candidateSinks.get(remainingSource)
+                                .contains(currentSink))
+                        .map(remainingSource -> mcastUtils.assignedVlan(
+                                remainingSource.deviceId().equals(currentSink.deviceId()) ?
+                                        remainingSource : null)).collect(Collectors.toSet());
+                // Sinks on other leaves
+                if (!otherVlans.contains(assignedVlan)) {
+                    removePortFromDevice(currentSink.deviceId(), currentSink.port(),
+                                         mcastIp, assignedVlan);
+                }
+                mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, currentSink.deviceId(),
+                                                            source));
             });
-        } finally {
-            mcastUnlock();
-        }
+        });
     }
 
     /**
@@ -529,15 +444,10 @@
      * @param mcastIp the group address
      */
     private void processRouteAddedInternal(IpAddress mcastIp) {
-        lastMcastChange = Instant.now();
-        mcastLock();
-        try {
-            log.info("Processing route added for Multicast group {}", mcastIp);
-            // Just elect a new leader
-            mcastUtils.isLeader(mcastIp);
-        } finally {
-            mcastUnlock();
-        }
+        lastMcastChange.set(Instant.now());
+        log.info("Processing route added for Multicast group {}", mcastIp);
+        // Just elect a new leader
+        mcastUtils.isLeader(mcastIp);
     }
 
     /**
@@ -546,44 +456,39 @@
      * @param mcastIp multicast group IP address
      */
     private void processRouteRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp) {
-        lastMcastChange = Instant.now();
-        mcastLock();
-        try {
-            log.info("Processing route removed for group {}", mcastIp);
-            if (!mcastUtils.isLeader(mcastIp)) {
-                log.debug("Skip {} due to lack of leadership", mcastIp);
-                mcastUtils.withdrawLeader(mcastIp);
-                return;
-            }
-            sources.forEach(source -> {
-                // Find out the ingress, transit and egress device of the affected group
-                DeviceId ingressDevice = getDevice(mcastIp, INGRESS, source)
-                        .stream().findFirst().orElse(null);
-                Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
-                Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
-                // If there are no egress and transit devices, sinks could be only on the ingress
-                if (!egressDevices.isEmpty()) {
-                    egressDevices.forEach(deviceId -> {
-                        removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
-                        mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
-                    });
-                }
-                if (!transitDevices.isEmpty()) {
-                    transitDevices.forEach(deviceId -> {
-                        removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
-                        mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
-                    });
-                }
-                if (ingressDevice != null) {
-                    removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
-                    mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
-                }
-            });
-            // Finally, withdraw the leadership
+        lastMcastChange.set(Instant.now());
+        log.info("Processing route removed for group {}", mcastIp);
+        if (!mcastUtils.isLeader(mcastIp)) {
+            log.debug("Skip {} due to lack of leadership", mcastIp);
             mcastUtils.withdrawLeader(mcastIp);
-        } finally {
-            mcastUnlock();
+            return;
         }
+        sources.forEach(source -> {
+            // Find out the ingress, transit and egress device of the affected group
+            DeviceId ingressDevice = getDevice(mcastIp, INGRESS, source)
+                    .stream().findFirst().orElse(null);
+            Set<DeviceId> transitDevices = getDevice(mcastIp, TRANSIT, source);
+            Set<DeviceId> egressDevices = getDevice(mcastIp, EGRESS, source);
+            // If there are no egress and transit devices, sinks could be only on the ingress
+            if (!egressDevices.isEmpty()) {
+                egressDevices.forEach(deviceId -> {
+                    removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
+                    mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
+                });
+            }
+            if (!transitDevices.isEmpty()) {
+                transitDevices.forEach(deviceId -> {
+                    removeGroupFromDevice(deviceId, mcastIp, mcastUtils.assignedVlan(null));
+                    mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, deviceId, source));
+                });
+            }
+            if (ingressDevice != null) {
+                removeGroupFromDevice(ingressDevice, mcastIp, mcastUtils.assignedVlan(source));
+                mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, ingressDevice, source));
+            }
+        });
+        // Finally, withdraw the leadership
+        mcastUtils.withdrawLeader(mcastIp);
     }
 
     /**
@@ -597,40 +502,39 @@
     private void processSinksRemovedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
                                              Map<HostId, Set<ConnectPoint>> newSinks,
                                              Map<HostId, Set<ConnectPoint>> prevSinks) {
-        lastMcastChange = Instant.now();
-        mcastLock();
-        try {
-            log.info("Processing sinks removed for group {} and for sources {}",
-                      mcastIp, sources);
-            if (!mcastUtils.isLeader(mcastIp)) {
-                log.debug("Skip {} due to lack of leadership", mcastIp);
-                return;
-            }
-            Map<ConnectPoint, Map<ConnectPoint, Optional<Path>>> treesToBeRemoved = Maps.newHashMap();
-            Map<ConnectPoint, Set<ConnectPoint>> treesToBeAdded = Maps.newHashMap();
-            sources.forEach(source -> {
-                // Save the path associated to the sinks to be removed
-                Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
-                                                                             newSinks, source);
-                Map<ConnectPoint, Optional<Path>> treeToBeRemoved = Maps.newHashMap();
-                sinksToBeRemoved.forEach(sink -> treeToBeRemoved.put(sink, getPath(source.deviceId(),
-                                                                                   sink.deviceId(), mcastIp,
-                                                                                   null, source)));
-                treesToBeRemoved.put(source, treeToBeRemoved);
-                // Recover the dual-homed sinks
-                Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
-                                                                                 prevSinks, source);
-                treesToBeAdded.put(source, sinksToBeRecovered);
-            });
-            // Remove the sinks taking into account the multiple sources and the original paths
-            treesToBeRemoved.forEach((source, tree) ->
-                tree.forEach((sink, path) -> processSinkRemovedInternal(source, sink, mcastIp, path)));
-            // Add new sinks according to the recovery procedure
-            treesToBeAdded.forEach((source, sinks) ->
-                sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null)));
-        } finally {
-            mcastUnlock();
+        lastMcastChange.set(Instant.now());
+        log.info("Processing sinks removed for group {} and for sources {}",
+                  mcastIp, sources);
+        if (!mcastUtils.isLeader(mcastIp)) {
+            log.debug("Skip {} due to lack of leadership", mcastIp);
+            return;
         }
+        Map<ConnectPoint, Map<ConnectPoint, Optional<Path>>> treesToBeRemoved = Maps.newHashMap();
+        Map<ConnectPoint, Set<ConnectPoint>> treesToBeAdded = Maps.newHashMap();
+        sources.forEach(source -> {
+            // Save the path associated to the sinks to be removed
+            Set<ConnectPoint> candidateSinks = processSinksToBeRemoved(mcastIp, prevSinks,
+                                                                         newSinks, source);
+            // Skip offline devices
+            Set<ConnectPoint> sinksToBeRemoved = candidateSinks.stream()
+                    .filter(sink -> srManager.deviceService.isAvailable(sink.deviceId()))
+                    .collect(Collectors.toSet());
+            Map<ConnectPoint, Optional<Path>> treeToBeRemoved = Maps.newHashMap();
+            sinksToBeRemoved.forEach(sink -> treeToBeRemoved.put(sink, getPath(source.deviceId(),
+                                                                               sink.deviceId(), mcastIp,
+                                                                               null, source)));
+            treesToBeRemoved.put(source, treeToBeRemoved);
+            // Recover the dual-homed sinks
+            Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
+                                                                             prevSinks, source);
+            treesToBeAdded.put(source, sinksToBeRecovered);
+        });
+        // Remove the sinks taking into account the multiple sources and the original paths
+        treesToBeRemoved.forEach((source, tree) ->
+            tree.forEach((sink, path) -> processSinkRemovedInternal(source, sink, mcastIp, path)));
+        // Add new sinks according to the recovery procedure
+        treesToBeAdded.forEach((source, sinks) ->
+            sinks.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null)));
     }
 
     /**
@@ -643,49 +547,45 @@
      */
     private void processSinkRemovedInternal(ConnectPoint source, ConnectPoint sink,
                                             IpAddress mcastIp, Optional<Path> mcastPath) {
-        lastMcastChange = Instant.now();
-        mcastLock();
-        try {
-            log.info("Processing sink removed {} for group {} and for source {}", sink, mcastIp, source);
-            boolean isLast;
-            // When source and sink are on the same device
-            if (source.deviceId().equals(sink.deviceId())) {
-                // Source and sink are on even the same port. There must be something wrong.
-                if (source.port().equals(sink.port())) {
-                    log.warn("Skip {} since sink {} is on the same port of source {}. Abort", mcastIp, sink, source);
-                    return;
-                }
-                isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
-                if (isLast) {
-                    mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
-                }
+        lastMcastChange.set(Instant.now());
+        log.info("Processing sink removed {} for group {} and for source {}", sink, mcastIp, source);
+        boolean isLast;
+        // When source and sink are on the same device
+        if (source.deviceId().equals(sink.deviceId())) {
+            // Source and sink are on even the same port. There must be something wrong.
+            if (source.port().equals(sink.port())) {
+                log.warn("Skip {} since sink {} is on the same port of source {}. Abort", mcastIp, sink, source);
                 return;
             }
-            // Process the egress device
-            isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
+            isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
             if (isLast) {
                 mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
             }
-            // If this is the last sink on the device, also update upstream
-            if (mcastPath.isPresent()) {
-                List<Link> links = Lists.newArrayList(mcastPath.get().links());
-                Collections.reverse(links);
-                for (Link link : links) {
+            return;
+        }
+        // Process the egress device
+        isLast = removePortFromDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
+        if (isLast) {
+            mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, sink.deviceId(), source));
+        }
+        // If this is the last sink on the device, also update upstream
+        if (mcastPath.isPresent()) {
+            List<Link> links = Lists.newArrayList(mcastPath.get().links());
+            Collections.reverse(links);
+            for (Link link : links) {
+                if (isLast) {
+                    isLast = removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
+                    mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
                     if (isLast) {
-                        isLast = removePortFromDevice(link.src().deviceId(), link.src().port(), mcastIp,
-                        mcastUtils.assignedVlan(link.src().deviceId().equals(source.deviceId()) ? source : null));
-                        if (isLast) {
-                            mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source));
-                        }
+                        mcastRoleStore.remove(new McastRoleStoreKey(mcastIp, link.src().deviceId(), source));
                     }
                 }
             }
-        } finally {
-            mcastUnlock();
+        } else {
+            log.warn("Unable to find a path from {} to {}. Abort sinkRemoved", source.deviceId(), sink.deviceId());
         }
     }
 
-
     /**
      * Process sinks to be added.
      *
@@ -697,22 +597,17 @@
     private void processSinksAddedInternal(Set<ConnectPoint> sources, IpAddress mcastIp,
                                            Map<HostId, Set<ConnectPoint>> newSinks,
                                            Set<ConnectPoint> allPrevSinks) {
-        lastMcastChange = Instant.now();
-        mcastLock();
-        try {
-            log.info("Processing sinks added for group {} and for sources {}", mcastIp, sources);
-            if (!mcastUtils.isLeader(mcastIp)) {
-                log.debug("Skip {} due to lack of leadership", mcastIp);
-                return;
-            }
-            sources.forEach(source -> {
-                Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
-                sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
-                sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
-            });
-        } finally {
-            mcastUnlock();
+        lastMcastChange.set(Instant.now());
+        log.info("Processing sinks added for group {} and for sources {}", mcastIp, sources);
+        if (!mcastUtils.isLeader(mcastIp)) {
+            log.debug("Skip {} due to lack of leadership", mcastIp);
+            return;
         }
+        sources.forEach(source -> {
+            Set<ConnectPoint> sinksToBeAdded = processSinksToBeAdded(source, mcastIp, newSinks);
+            sinksToBeAdded = Sets.difference(sinksToBeAdded, allPrevSinks);
+            sinksToBeAdded.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
+        });
     }
 
     /**
@@ -724,69 +619,70 @@
      */
     private void processSinkAddedInternal(ConnectPoint source, ConnectPoint sink,
                                           IpAddress mcastIp, List<Path> allPaths) {
-        lastMcastChange = Instant.now();
-        mcastLock();
-        try {
-            log.info("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
-            // Process the ingress device
-            McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
-                                                               mcastUtils.assignedVlan(source), mcastIp.isIp4());
-            addFilterToDevice(mcastFilterObjStoreKey, mcastIp, INGRESS);
-            if (source.deviceId().equals(sink.deviceId())) {
-                if (source.port().equals(sink.port())) {
-                    log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
-                             mcastIp, sink, source);
-                    return;
-                }
-                addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
-                mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), INGRESS);
+        lastMcastChange.set(Instant.now());
+        log.info("Processing sink added {} for group {} and for source {}", sink, mcastIp, source);
+        // Process the ingress device
+        McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
+                                                           mcastUtils.assignedVlan(source), mcastIp.isIp4());
+        addFilterToDevice(mcastFilterObjStoreKey, mcastIp, INGRESS);
+        if (source.deviceId().equals(sink.deviceId())) {
+            if (source.port().equals(sink.port())) {
+                log.warn("Skip {} since sink {} is on the same port of source {}. Abort",
+                         mcastIp, sink, source);
                 return;
             }
-            // Find a path. If present, create/update groups and flows for each hop
-            Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp, allPaths, source);
-            if (mcastPath.isPresent()) {
-                List<Link> links = mcastPath.get().links();
-                // Setup mcast role for ingress
-                mcastRoleStore.put(new McastRoleStoreKey(mcastIp, source.deviceId(), source), INGRESS);
-                // Setup properly the transit forwarding
-                links.forEach(link -> {
-                    addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
-                                    mcastUtils.assignedVlan(link.src().deviceId()
-                                                                    .equals(source.deviceId()) ? source : null));
-                    McastFilteringObjStoreKey filteringKey = new McastFilteringObjStoreKey(link.dst(),
-                                                                       mcastUtils.assignedVlan(null), mcastIp.isIp4());
-                    addFilterToDevice(filteringKey, mcastIp, null);
-                });
-                // Setup mcast role for the transit
-                links.stream()
-                        .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
-                        .forEach(link -> {
-                            log.trace("Transit links {}", link);
-                            mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
-                                    source), TRANSIT);
-                        });
-                // Process the egress device
-                addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
-                // Setup mcast role for egress
-                mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), EGRESS);
-            } else {
-                log.warn("Unable to find a path from {} to {}. Abort sinkAdded", source.deviceId(), sink.deviceId());
-            }
-        } finally {
-            mcastUnlock();
+            addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(source));
+            mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), INGRESS);
+            return;
+        }
+        // Find a path. If present, create/update groups and flows for each hop
+        Optional<Path> mcastPath = getPath(source.deviceId(), sink.deviceId(), mcastIp, allPaths, source);
+        if (mcastPath.isPresent()) {
+            List<Link> links = mcastPath.get().links();
+            // Set up the mcast role for ingress
+            mcastRoleStore.put(new McastRoleStoreKey(mcastIp, source.deviceId(), source), INGRESS);
+            // Set up the transit forwarding properly
+            links.forEach(link -> {
+                addPortToDevice(link.src().deviceId(), link.src().port(), mcastIp,
+                                mcastUtils.assignedVlan(link.src().deviceId()
+                                                                .equals(source.deviceId()) ? source : null));
+                McastFilteringObjStoreKey filteringKey = new McastFilteringObjStoreKey(link.dst(),
+                                                                   mcastUtils.assignedVlan(null), mcastIp.isIp4());
+                addFilterToDevice(filteringKey, mcastIp, null);
+            });
+            // Set up the mcast role for the transit
+            links.stream()
+                    .filter(link -> !link.dst().deviceId().equals(sink.deviceId()))
+                    .forEach(link -> {
+                        log.trace("Transit links {}", link);
+                        mcastRoleStore.put(new McastRoleStoreKey(mcastIp, link.dst().deviceId(),
+                                source), TRANSIT);
+                    });
+            // Process the egress device
+            addPortToDevice(sink.deviceId(), sink.port(), mcastIp, mcastUtils.assignedVlan(null));
+            // Set up the mcast role for egress
+            mcastRoleStore.put(new McastRoleStoreKey(mcastIp, sink.deviceId(), source), EGRESS);
+        } else {
+            log.warn("Unable to find a path from {} to {}. Abort sinkAdded", source.deviceId(), sink.deviceId());
         }
     }
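
Note that lastMcastChange is now written with set() rather than plain assignment: with the
lock gone, an AtomicReference<Instant> lets the worker publish the timestamp safely while
other threads (e.g. the bucket corrector) read it lock-free. A sketch under that
assumption; the threshold constant name is hypothetical:

    import java.time.Duration;
    import java.time.Instant;
    import java.util.concurrent.atomic.AtomicReference;

    private final AtomicReference<Instant> lastMcastChange =
            new AtomicReference<>(Instant.now());

    private boolean isMcastStable() {
        // Lock-free read of the last mutation time; STABILITY_THRESHOLD_SEC
        // stands in for the configured quiet period.
        Instant last = lastMcastChange.get();
        return Duration.between(last, Instant.now()).getSeconds() >= STABILITY_THRESHOLD_SEC;
    }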
 
     /**
-     * Processes PORT_UPDATED event.
+     * Processes the PORT_UPDATED event.
      *
      * @param affectedDevice Affected device
      * @param affectedPort Affected port
      */
     public void processPortUpdate(Device affectedDevice, Port affectedPort) {
+        mcastWorker.execute(() -> processPortUpdateInternal(affectedDevice, affectedPort));
+    }
+
+    private void processPortUpdateInternal(Device affectedDevice, Port affectedPort) {
         // Clean the filtering obj store. Edge port case.
+        lastMcastChange.set(Instant.now());
         ConnectPoint portDown = new ConnectPoint(affectedDevice.id(), affectedPort.number());
         if (!affectedPort.isEnabled()) {
+            log.info("Processing port down {}", portDown);
             updateFilterObjStoreByPort(portDown);
         }
     }
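
updateFilterObjStoreByPort() is not part of this hunk; conceptually it prunes the
filtering state anchored on the downed edge port so it can be re-installed cleanly later.
A hypothetical sketch, assuming the store behaves as a Set of McastFilteringObjStoreKey
and the key exposes its connect point through an ingressCP() accessor:

    // Illustrative only: drop every filtering key whose ingress connect point
    // matches the port that went down.
    private void updateFilterObjStoreByPort(ConnectPoint portDown) {
        mcastFilteringObjStore.removeIf(key -> key.ingressCP().equals(portDown));
    }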
@@ -797,19 +693,18 @@
      * @param linkDown Link that is going down
      */
     public void processLinkDown(Link linkDown) {
-        lastMcastChange = Instant.now();
-        mcastLock();
-        try {
-            // Get mcast groups affected by the link going down
-            Set<IpAddress> affectedGroups = getAffectedGroups(linkDown);
-            log.info("Processing link down {} for groups {}", linkDown, affectedGroups);
-            affectedGroups.forEach(mcastIp -> {
-                log.debug("Processing link down {} for group {}", linkDown, mcastIp);
-                recoverFailure(mcastIp, linkDown);
-            });
-        } finally {
-            mcastUnlock();
-        }
+        mcastWorker.execute(() -> processLinkDownInternal(linkDown));
+    }
+
+    private void processLinkDownInternal(Link linkDown) {
+        lastMcastChange.set(Instant.now());
+        // Get mcast groups affected by the link going down
+        Set<IpAddress> affectedGroups = getAffectedGroups(linkDown);
+        log.info("Processing link down {} for groups {}", linkDown, affectedGroups);
+        affectedGroups.forEach(mcastIp -> {
+            log.debug("Processing link down {} for group {}", linkDown, mcastIp);
+            recoverFailure(mcastIp, linkDown);
+        });
     }
 
     /**
@@ -818,20 +713,19 @@
      * @param deviceDown device going down
      */
     public void processDeviceDown(DeviceId deviceDown) {
-        lastMcastChange = Instant.now();
-        mcastLock();
-        try {
-            // Get the mcast groups affected by the device going down
-            Set<IpAddress> affectedGroups = getAffectedGroups(deviceDown);
-            log.info("Processing device down {} for groups {}", deviceDown, affectedGroups);
-            updateFilterObjStoreByDevice(deviceDown);
-            affectedGroups.forEach(mcastIp -> {
-                log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
-                recoverFailure(mcastIp, deviceDown);
-            });
-        } finally {
-            mcastUnlock();
-        }
+        mcastWorker.execute(() -> processDeviceDownInternal(deviceDown));
+    }
+
+    private void processDeviceDownInternal(DeviceId deviceDown) {
+        lastMcastChange.set(Instant.now());
+        // Get the mcast groups affected by the device going down
+        Set<IpAddress> affectedGroups = getAffectedGroups(deviceDown);
+        log.info("Processing device down {} for groups {}", deviceDown, affectedGroups);
+        updateFilterObjStoreByDevice(deviceDown);
+        affectedGroups.forEach(mcastIp -> {
+            log.debug("Processing device down {} for group {}", deviceDown, mcastIp);
+            recoverFailure(mcastIp, deviceDown);
+        });
     }
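
Both failure paths funnel into getAffectedGroups() and recoverFailure(), now executed on
the worker instead of under the lock. A hedged sketch of how the link-down variant of
getAffectedGroups() can be expressed, assuming next-objective state is keyed per
(group, device) in mcastNextObjStore and that the objective's output ports can be
inspected via a helper (both names are assumptions here):

    import java.util.stream.Collectors;

    // Illustrative only: a group is affected when it has a next objective on
    // the link's source device whose output ports include the failed port.
    private Set<IpAddress> getAffectedGroups(Link linkDown) {
        DeviceId device = linkDown.src().deviceId();
        PortNumber port = linkDown.src().port();
        return mcastNextObjStore.entrySet().stream()
                .filter(e -> e.getKey().deviceId().equals(device)
                        && mcastUtils.getPorts(e.getValue().value().next()).contains(port))
                .map(e -> e.getKey().mcastIp())
                .collect(Collectors.toSet());
    }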
 
     /**
@@ -1804,33 +1698,33 @@
      */
     public void updateFilterToDevice(DeviceId deviceId, PortNumber portNum,
                                         VlanId vlanId, boolean install) {
-        lastMcastChange = Instant.now();
-        mcastLock();
-        try {
-            // Iterates over the route and updates properly the filtering objective on the source device.
-            srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
-                log.debug("Update filter for {}", mcastRoute.group());
-                if (!mcastUtils.isLeader(mcastRoute.group())) {
-                    log.debug("Skip {} due to lack of leadership", mcastRoute.group());
-                    return;
-                }
-                // Get the sources and for each one update properly the filtering objectives
-                Set<ConnectPoint> sources = srManager.multicastRouteService.sources(mcastRoute);
-                sources.forEach(source -> {
-                    if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
-                        if (install) {
-                            McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
-                                    vlanId, mcastRoute.group().isIp4());
-                            addFilterToDevice(mcastFilterObjStoreKey, mcastRoute.group(), INGRESS);
-                        } else {
-                            mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
-                        }
+        mcastWorker.execute(() -> updateFilterToDeviceInternal(deviceId, portNum, vlanId, install));
+    }
+
+    private void updateFilterToDeviceInternal(DeviceId deviceId, PortNumber portNum,
+                                              VlanId vlanId, boolean install) {
+        lastMcastChange.set(Instant.now());
+        // Iterate over the routes and update the filtering objective on the source device as needed.
+        srManager.multicastRouteService.getRoutes().forEach(mcastRoute -> {
+            log.debug("Update filter for {}", mcastRoute.group());
+            if (!mcastUtils.isLeader(mcastRoute.group())) {
+                log.debug("Skip {} due to lack of leadership", mcastRoute.group());
+                return;
+            }
+            // Get the sources and update the filtering objectives for each of them
+            Set<ConnectPoint> sources = srManager.multicastRouteService.sources(mcastRoute);
+            sources.forEach(source -> {
+                if (source.deviceId().equals(deviceId) && source.port().equals(portNum)) {
+                    if (install) {
+                        McastFilteringObjStoreKey mcastFilterObjStoreKey = new McastFilteringObjStoreKey(source,
+                                                                             vlanId, mcastRoute.group().isIp4());
+                        addFilterToDevice(mcastFilterObjStoreKey, mcastRoute.group(), INGRESS);
+                    } else {
+                        mcastUtils.removeFilterToDevice(deviceId, portNum, vlanId, mcastRoute.group(), null);
                     }
-                });
+                }
             });
-        } finally {
-            mcastUnlock();
-        }
+        });
     }
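
The isLeader() gate now appears at the top of each internal handler: only the instance
that owns leadership for a group programs it, which keeps the handlers coordination-free
across the cluster as well. A sketch of how such a check can be built on the ONOS
LeadershipService (the real check lives in McastUtils and may differ):

    // Illustrative only: run for leadership on a per-group topic and compare
    // the elected leader with the local node.
    private boolean isLeader(IpAddress mcastIp) {
        NodeId localId = srManager.clusterService.getLocalNode().id();
        NodeId leader = srManager.leadershipService
                .runForLeadership(mcastIp.toString()).leaderNodeId();
        return localId.equals(leader);
    }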
 
     /**
@@ -1931,33 +1825,32 @@
      * the operation is totally delegated to the driver.
      */
     private final class McastBucketCorrector implements Runnable {
-        // Internal params
-        private static final int MAX_VERIFY_ON_FLIGHT = 10;
         private final AtomicInteger verifyOnFlight = new AtomicInteger(0);
         // Define the context used for the back pressure mechanism
         private final ObjectiveContext context = new DefaultObjectiveContext(
                 (objective) -> {
                     synchronized (verifyOnFlight) {
-                        verifyOnFlight.decrementAndGet();
+                        log.trace("Verify {} done", objective.id());
+                        verifyOnFlight.updateAndGet(i -> i > 0 ? i - 1 : i);
                         verifyOnFlight.notify();
                     }
                 },
                 (objective, error) -> {
                     synchronized (verifyOnFlight) {
-                        verifyOnFlight.decrementAndGet();
+                        log.trace("Verify {} error {}", objective.id(), error);
+                        verifyOnFlight.updateAndGet(i -> i > 0 ? i - 1 : i);
                         verifyOnFlight.notify();
                     }
                 });
 
         @Override
         public void run() {
-            if (!isMcastStable() || wasBktCorrRunning()) {
-                return;
-            }
-            mcastLock();
             try {
                 // Iterates over the routes and verify the related next objectives
                 for (McastRoute mcastRoute : srManager.multicastRouteService.getRoutes()) {
+                    if (!isMcastStable() || wasBktCorrRunning()) {
+                        return;
+                    }
                     IpAddress mcastIp = mcastRoute.group();
                     log.trace("Running mcast buckets corrector for mcast group: {}", mcastIp);
                     // Verify leadership on the operation
@@ -1995,9 +1888,7 @@
                         }
                         // Create the set of the devices to be processed
                         ImmutableSet.Builder<DeviceId> devicesBuilder = ImmutableSet.builder();
-                        if (!ingressDevices.isEmpty()) {
-                            devicesBuilder.addAll(ingressDevices);
-                        }
+                        devicesBuilder.addAll(ingressDevices);
                         if (!transitDevices.isEmpty()) {
                             devicesBuilder.addAll(transitDevices);
                         }
@@ -2040,12 +1931,17 @@
                             }
                         }
                     }
+                    // Wait for this group's in-flight verifications before starting the next one
+                    synchronized (verifyOnFlight) {
+                        while (verifyOnFlight.get() > 0) {
+                            verifyOnFlight.wait();
+                        }
+                    }
                 }
             } catch (InterruptedException e) {
                 log.warn("BktCorr has been interrupted");
             } finally {
-                lastBktCorrExecution = Instant.now();
-                mcastUnlock();
+                lastBktCorrExecution.set(Instant.now());
             }
         }
     }
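
The corrector's back pressure is now per group: instead of capping a global
MAX_VERIFY_ON_FLIGHT, the run loop blocks on verifyOnFlight until every outstanding verify
for the current group has called back. The submit side is outside these hunks, but it
pairs with the wait loop roughly as in this sketch, assuming the verify is issued through
FlowObjectiveService (the builder and device variables are illustrative):

    // Illustrative only: count the verify before submission; the
    // DefaultObjectiveContext above decrements the counter and notifies
    // the waiting corrector when the objective completes or fails.
    NextObjective verifyObjective = nextObjBuilder.verify(context);
    verifyOnFlight.incrementAndGet();
    srManager.flowObjectiveService.next(deviceId, verifyObjective);

Waiting per group keeps one slow device from accumulating verifies across the whole route
set, while the isMcastStable()/wasBktCorrRunning() check moved inside the loop lets the
corrector yield early when new events arrive.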