Optimize Multicast sinks processing

Change-Id: I5e4c48076edbadc38943d9ee05359341ef55aaca
(cherry picked from commit a17ad9e3f8fe1d2925e6d9fd84d3d96ef79cc78d)
diff --git a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
index 4c83a38..2c82cf5 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/mcast/McastHandler.java
@@ -222,7 +222,7 @@
             case SINKS_REMOVED:
                 // FIXME To be addressed with multiple sources support
                 processSinksRemovedInternal(source, mcastIp,
-                                            mcastUpdate.sinks(), prevSinks);
+                                            mcastUpdate.sinks(), mcastPrevUpdate.sinks());
                 break;
             default:
                 break;
@@ -462,32 +462,24 @@
      * @param source the source connect point
      * @param mcastIp the ip address of the group
      * @param newSinks the new sinks to be processed
-     * @param allPrevSinks all previous sinks
+     * @param prevSinks the previous sinks
      */
     private void processSinksRemovedInternal(ConnectPoint source, IpAddress mcastIp,
                                              Map<HostId, Set<ConnectPoint>> newSinks,
-                                             Set<ConnectPoint> allPrevSinks) {
+                                             Map<HostId, Set<ConnectPoint>> prevSinks) {
         lastMcastChange = Instant.now();
         mcastLock();
-        // Let's instantiate the sinks to be removed
-        Set<ConnectPoint> sinksToBeRemoved = Sets.newHashSet();
         try {
+            // Remove the previous ones
+            Set<ConnectPoint> sinksToBeRemoved = processSinksToBeRemoved(mcastIp, prevSinks,
+                                                                         newSinks);
+            sinksToBeRemoved.forEach(sink -> processSinkRemovedInternal(source, sink, mcastIp));
             // Recover the dual-homed sinks
-            Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks);
+            Set<ConnectPoint> sinksToBeRecovered = processSinksToBeRecovered(mcastIp, newSinks,
+                                                                             prevSinks);
             sinksToBeRecovered.forEach(sink -> processSinkAddedInternal(source, sink, mcastIp, null));
-            // Get the only sinks to be processed (old ones)
-            Set<ConnectPoint> allNewSinks = newSinks.values()
-                    .stream()
-                    .flatMap(Collection::stream)
-                    .collect(Collectors.toSet());
-            // Remove the previous one
-            sinksToBeRemoved.addAll(Sets.difference(allPrevSinks, allNewSinks));
         } finally {
             mcastUnlock();
-            // Let's schedule the removal of the previous sinks
-            executorService.schedule(
-                    () -> sinksToBeRemoved.forEach(sink -> processSinkRemovedInternal(source, sink, mcastIp)),
-                    HOST_MOVED_DELAY_MS, TimeUnit.MILLISECONDS);
         }
     }
 
@@ -952,26 +944,65 @@
     }
 
     /**
+     * Process all the sinks related to a mcast group and return
+     * the ones to be removed.
+     *
+     * @param mcastIp the group address
+     * @param prevsinks the previous sinks to be evaluated
+     * @param newSinks the new sinks to be evaluted
+     * @return the set of the sinks to be removed
+     */
+    private Set<ConnectPoint> processSinksToBeRemoved(IpAddress mcastIp,
+                                                      Map<HostId, Set<ConnectPoint>> prevsinks,
+                                                      Map<HostId, Set<ConnectPoint>> newSinks) {
+        // Iterate over the sinks in order to build the set
+        // of the connect points to be removed from this group
+        final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
+        prevsinks.forEach(((hostId, connectPoints) -> {
+            // We have to check with the existing flows
+            ConnectPoint sinkToBeProcessed = connectPoints.stream()
+                    .filter(connectPoint -> isSink(mcastIp, connectPoint))
+                    .findFirst().orElse(null);
+            if (sinkToBeProcessed != null) {
+                // If the host has been removed or location has been removed
+                if (!newSinks.containsKey(hostId) ||
+                        !newSinks.get(hostId).contains(sinkToBeProcessed)) {
+                    sinksToBeProcessed.add(sinkToBeProcessed);
+                }
+            }
+        }));
+        // We have done, return the set
+        return sinksToBeProcessed;
+    }
+
+    /**
      * Process new locations and return the set of sinks to be added
      * in the context of the recovery.
      *
-     * @param sinks the remaining locations
+     * @param newSinks the remaining sinks
+     * @param prevSinks the previous sinks
      * @return the set of the sinks to be processed
      */
     private Set<ConnectPoint> processSinksToBeRecovered(IpAddress mcastIp,
-                                                        Map<HostId, Set<ConnectPoint>> sinks) {
+                                                        Map<HostId, Set<ConnectPoint>> newSinks,
+                                                        Map<HostId, Set<ConnectPoint>> prevSinks) {
         // Iterate over the sinks in order to build the set
         // of the connect points to be served by this group
         final Set<ConnectPoint> sinksToBeProcessed = Sets.newHashSet();
-        sinks.forEach((hostId, connectPoints) -> {
+        newSinks.forEach((hostId, connectPoints) -> {
             // If it has more than 1 locations
             if (connectPoints.size() > 1 || connectPoints.size() == 0) {
                 log.debug("Skip {} since sink {} has {} locations",
                          mcastIp, hostId, connectPoints.size());
                 return;
             }
-            sinksToBeProcessed.add(connectPoints.stream()
-                                           .findFirst().orElseGet(null));
+            // If previously it had two locations, we need to recover it
+            // Filter out if the remaining location is already served
+            if (prevSinks.containsKey(hostId) && prevSinks.get(hostId).size() == 2) {
+                sinksToBeProcessed.add(connectPoints.stream()
+                                               .filter(connectPoint -> !isSink(mcastIp, connectPoint))
+                                               .findFirst().orElseGet(null));
+            }
         });
         return sinksToBeProcessed;
     }
@@ -1005,16 +1036,7 @@
             }
             // We prefer to reuse existing flows
             ConnectPoint sinkToBeProcessed = connectPoints.stream()
-                    .filter(connectPoint -> {
-                        // Let's check if we are already serving that location
-                        McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId());
-                        if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
-                            return false;
-                        }
-                        // Get next and check with the port
-                        NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
-                        return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
-                    })
+                    .filter(connectPoint -> isSink(mcastIp, connectPoint))
                     .findFirst().orElse(null);
             if (sinkToBeProcessed != null) {
                 sinksToBeProcessed.add(sinkToBeProcessed);
@@ -1023,7 +1045,7 @@
             // Otherwise we prefer to reuse existing egresses
             Set<DeviceId> egresses = getDevice(mcastIp, EGRESS);
             sinkToBeProcessed = connectPoints.stream()
-                    .filter(egresses::contains)
+                    .filter(connectPoint -> egresses.contains(connectPoint.deviceId()))
                     .findFirst().orElse(null);
             if (sinkToBeProcessed != null) {
                 sinksToBeProcessed.add(sinkToBeProcessed);
@@ -1603,6 +1625,24 @@
     }
 
     /**
+     * Verify if a given connect point is sink for this group.
+     *
+     * @param mcastIp group address
+     * @param connectPoint connect point to be verified
+     * @return true if the connect point is sink of the group
+     */
+    private boolean isSink(IpAddress mcastIp, ConnectPoint connectPoint) {
+        // Let's check if we are already serving that location
+        McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, connectPoint.deviceId());
+        if (!mcastNextObjStore.containsKey(mcastStoreKey)) {
+            return false;
+        }
+        // Get next and check with the port
+        NextObjective mcastNext = mcastNextObjStore.get(mcastStoreKey).value();
+        return mcastUtils.getPorts(mcastNext.next()).contains(connectPoint.port());
+    }
+
+    /**
      * Updates filtering objective for given device and port.
      * It is called in general when the mcast config has been
      * changed.