[CORD-2748] Multicast flows are not properly updated when changing source

Change-Id: Ia4a9e9f132156e4b9a7eb56dd943f1fd13bc1560
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java
index 3bb6a52..95e264d 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java
@@ -228,6 +228,24 @@
     }
 
     /**
+     * Handles a SOURCE_UPDATED event by handing the old source, the new source
+     * and the group address over to the internal update routine.
+     *
+     * @param event McastEvent with SOURCE_UPDATED type
+     */
+    protected void processSourceUpdated(McastEvent event) {
+        log.info("processSourceUpdated {}", event);
+        // The current subject carries the new source (null when absent)
+        ConnectPoint updatedSource = event.subject().source().orElse(null);
+        // The previous subject carries the old source (null when absent) and the group ip
+        McastRouteInfo previousInfo = event.prevSubject();
+        ConnectPoint outdatedSource = previousInfo.source().orElse(null);
+        IpAddress groupIp = previousInfo.route().group();
+
+        processSourceUpdatedInternal(groupIp, updatedSource, outdatedSource);
+    }
+
+    /**
      * Processes the SINK_ADDED event.
      *
      * @param event McastEvent with SINK_ADDED type
@@ -279,12 +297,59 @@
         }
         // Get group ip and ingress connect point
         IpAddress mcastIp = mcastRouteInfo.route().group();
-        ConnectPoint source = mcastRouteInfo.source().get();
+        ConnectPoint source = mcastRouteInfo.source().orElse(null);
 
         processRouteRemovedInternal(source, mcastIp);
     }
 
     /**
+     * Processes the SOURCE_UPDATED event for a multicast group.
+     * <p>
+     * When this instance is leader for the old source and the old source device
+     * has both a role and a next objective stored for the group, the group is
+     * removed from the old source device, the output ports of the stored next
+     * objective are added on the new source device, the filter is installed on
+     * the new source port and the new source device is recorded as INGRESS.
+     * The update is skipped with a warning when either source is null.
+     *
+     * @param mcastIp multicast group IP address
+     * @param newSource the updated source info
+     * @param oldSource the outdated source info
+     */
+    private void processSourceUpdatedInternal(IpAddress mcastIp,
+                                              ConnectPoint newSource,
+                                              ConnectPoint oldSource) {
+        lastMcastChange = Instant.now();
+        // The caller resolves an absent source to null; both connect points are
+        // dereferenced below, so bail out before taking the lock
+        if (newSource == null || oldSource == null) {
+            log.warn("Skip source update for {}: old source {}, new source {}",
+                     mcastIp, oldSource, newSource);
+            return;
+        }
+        mcastLock();
+        try {
+            log.debug("Processing source updated for group {}", mcastIp);
+
+            // Build key for the store and retrieve old data
+            McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
+
+            // Verify leadership on the operation
+            if (!isLeader(oldSource)) {
+                log.debug("Skip {} due to lack of leadership", mcastIp);
+                return;
+            }
+
+            // This device is not serving this multicast group
+            if (!mcastRoleStore.containsKey(mcastStoreKey) ||
+                    !mcastNextObjStore.containsKey(mcastStoreKey)) {
+                log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
+                return;
+            }
+            // Read the output ports before the old group is removed
+            NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
+            Set<PortNumber> outputPorts = getPorts(nextObjective.next());
+
+            // Let's remove old flows and groups
+            removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
+            // Push new flows and group
+            outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
+                                                              mcastIp, assignedVlan(newSource)));
+            addFilterToDevice(newSource.deviceId(), newSource.port(),
+                              assignedVlan(newSource), mcastIp);
+            // Setup mcast roles
+            mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
+                               McastRole.INGRESS);
+        } finally {
+            mcastUnlock();
+        }
+    }
+
+    /**
      * Removes the entire mcast tree related to this group.
      *
      * @param mcastIp multicast group IP address
@@ -293,7 +358,7 @@
         lastMcastChange = Instant.now();
         mcastLock();
         try {
-            log.debug("Processing route down for group {}", mcastIp);
+            log.debug("Processing route removed for group {}", mcastIp);
 
             // Find out the ingress, transit and egress device of the affected group
             DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
@@ -322,7 +387,6 @@
             if (ingressDevice != null) {
                 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
             }
-
         } finally {
             mcastUnlock();
         }
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index a47101d..41bb452 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -1533,6 +1533,9 @@
                 case SOURCE_ADDED:
                     mcastHandler.processSourceAdded(event);
                     break;
+                case SOURCE_UPDATED:
+                    mcastHandler.processSourceUpdated(event);
+                    break;
                 case SINK_ADDED:
                     mcastHandler.processSinkAdded(event);
                     break;