[CORD-2748] Multicast flows are not properly updated when changing source

Change-Id: Ia4a9e9f132156e4b9a7eb56dd943f1fd13bc1560
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java
index 3bb6a52..95e264d 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java
@@ -228,6 +228,24 @@
     }
 
     /**
+     * Processes the SOURCE_UPDATED event.
+     *
+     * @param event McastEvent with SOURCE_UPDATED type
+     */
+    protected void processSourceUpdated(McastEvent event) {
+        log.info("processSourceUpdated {}", event);
+        // Get old and new data
+        McastRouteInfo mcastRouteInfo = event.subject();
+        ConnectPoint newSource = mcastRouteInfo.source().orElse(null);
+        mcastRouteInfo = event.prevSubject();
+        ConnectPoint oldSource = mcastRouteInfo.source().orElse(null);
+        // and group ip
+        IpAddress mcastIp = mcastRouteInfo.route().group();
+        // Process the update event
+        processSourceUpdatedInternal(mcastIp, newSource, oldSource);
+    }
+
+    /**
      * Processes the SINK_ADDED event.
      *
      * @param event McastEvent with SINK_ADDED type
@@ -279,12 +297,59 @@
         }
         // Get group ip and ingress connect point
         IpAddress mcastIp = mcastRouteInfo.route().group();
-        ConnectPoint source = mcastRouteInfo.source().get();
+        ConnectPoint source = mcastRouteInfo.source().orElse(null);
 
         processRouteRemovedInternal(source, mcastIp);
     }
 
     /**
+     * Process the SOURCE_UPDATED event.
+     *
+     * @param newSource the updated srouce info
+     * @param oldSource the outdated source info
+     */
+    private void processSourceUpdatedInternal(IpAddress mcastIp,
+                                              ConnectPoint newSource,
+                                              ConnectPoint oldSource) {
+        lastMcastChange = Instant.now();
+        mcastLock();
+        try {
+            log.debug("Processing source updated for group {}", mcastIp);
+
+            // Build key for the store and retrieve old data
+            McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
+
+            // Verify leadership on the operation
+            if (!isLeader(oldSource)) {
+                log.debug("Skip {} due to lack of leadership", mcastIp);
+                return;
+            }
+
+            // This device is not serving this multicast group
+            if (!mcastRoleStore.containsKey(mcastStoreKey) ||
+                    !mcastNextObjStore.containsKey(mcastStoreKey)) {
+                log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
+                return;
+            }
+            NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
+            Set<PortNumber> outputPorts = getPorts(nextObjective.next());
+
+            // Let's remove old flows and groups
+            removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
+            // Push new flows and group
+            outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
+                                                              mcastIp, assignedVlan(newSource)));
+            addFilterToDevice(newSource.deviceId(), newSource.port(),
+                              assignedVlan(newSource), mcastIp);
+            // Setup mcast roles
+            mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
+                               McastRole.INGRESS);
+        } finally {
+            mcastUnlock();
+        }
+    }
+
+    /**
      * Removes the entire mcast tree related to this group.
      *
      * @param mcastIp multicast group IP address
@@ -293,7 +358,7 @@
         lastMcastChange = Instant.now();
         mcastLock();
         try {
-            log.debug("Processing route down for group {}", mcastIp);
+            log.debug("Processing route removed for group {}", mcastIp);
 
             // Find out the ingress, transit and egress device of the affected group
             DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
@@ -322,7 +387,6 @@
             if (ingressDevice != null) {
                 removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
             }
-
         } finally {
             mcastUnlock();
         }
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index a47101d..41bb452 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -1533,6 +1533,9 @@
                 case SOURCE_ADDED:
                     mcastHandler.processSourceAdded(event);
                     break;
+                case SOURCE_UPDATED:
+                    mcastHandler.processSourceUpdated(event);
+                    break;
                 case SINK_ADDED:
                     mcastHandler.processSinkAdded(event);
                     break;
diff --git a/core/api/src/main/java/org/onosproject/net/mcast/McastEvent.java b/core/api/src/main/java/org/onosproject/net/mcast/McastEvent.java
index 1e1dd95..caa0284 100644
--- a/core/api/src/main/java/org/onosproject/net/mcast/McastEvent.java
+++ b/core/api/src/main/java/org/onosproject/net/mcast/McastEvent.java
@@ -17,6 +17,8 @@
 
 import org.onosproject.event.AbstractEvent;
 
+import java.util.Objects;
+
 import static com.google.common.base.MoreObjects.toStringHelper;
 
 /**
@@ -46,6 +48,11 @@
         SOURCE_ADDED,
 
         /**
+         * A source for a mcast route has been updated.
+         */
+        SOURCE_UPDATED,
+
+        /**
          * A sink for a mcast route (ie. the subject) has been added.
          */
         SINK_ADDED,
@@ -56,15 +63,74 @@
         SINK_REMOVED
     }
 
+    // Used when an update event happens
+    private McastRouteInfo prevSubject;
+
+    /**
+     * Creates a McastEvent of a given type using the subject.
+     *
+     * @param type the event type
+     * @param subject the subject of the event type
+     */
     public McastEvent(McastEvent.Type type, McastRouteInfo subject) {
         super(type, subject);
     }
 
+    /**
+     * Creates a McastEvent of a given type using the subject and
+     * the previous subject.
+     *
+     * @param type the event type
+     * @param subject the subject of the event
+     * @param prevSubject the previous subject of the event
+     */
+    public McastEvent(McastEvent.Type type, McastRouteInfo subject,
+                      McastRouteInfo prevSubject) {
+        super(type, subject);
+        // For now we have just this kind of updates
+        if (type == Type.SOURCE_UPDATED) {
+            this.prevSubject = prevSubject;
+        }
+    }
+
+    /**
+     * Gets the previous subject in this mcast event.
+     *
+     * @return the previous subject, or null if previous subject is not
+     *         specified.
+     */
+    public McastRouteInfo prevSubject() {
+        return this.prevSubject;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(type(), subject(), prevSubject);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+
+        if (!(other instanceof McastEvent)) {
+            return false;
+        }
+
+        McastEvent that = (McastEvent) other;
+
+        return Objects.equals(this.subject(), that.subject()) &&
+                Objects.equals(this.type(), that.type()) &&
+                Objects.equals(this.prevSubject, that.prevSubject);
+    }
 
     @Override
     public String toString() {
         return toStringHelper(this)
                 .add("type", type())
-                .add("info", subject()).toString();
+                .add("info", subject())
+                .add("prevInfo", prevSubject())
+                .toString();
     }
 }
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java
index b9fb869..83dee45 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java
@@ -135,11 +135,28 @@
                     checkNotNull(newData);
                     checkNotNull(oldData);
 
+                    // They are not equal
                     if (!Objects.equal(oldData.source(), newData.source())) {
-                        notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
-                                                      mcastRouteInfo(route,
-                                                                     newData.sinks(),
-                                                                     newData.source())));
+                        // Both not null, it is an update event
+                        if (oldData.source() != null && newData.source() != null) {
+                            // Broadcast old and new data
+                            notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_UPDATED,
+                                                          mcastRouteInfo(route,
+                                                                         newData.sinks(),
+                                                                         newData.source()),
+                                                          mcastRouteInfo(route,
+                                                                         oldData.sinks(),
+                                                                         oldData.source())));
+                        } else if (oldData.source() == null && newData.source() != null) {
+                            // It is a source added event, broadcast new data
+                            notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
+                                                          mcastRouteInfo(route,
+                                                                         newData.sinks(),
+                                                                         newData.source())));
+                        } else {
+                            // Scenario not managed for now
+                            log.warn("Unhandled scenario {} - new {} - old {}", event.type());
+                        }
                     } else {
                         Sets.difference(newData.sinks(), oldData.sinks()).forEach(sink ->
                             notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java
index b1214830..565a81d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static com.google.common.base.MoreObjects.toStringHelper;
 import static com.google.common.base.Preconditions.checkNotNull;
 
 /**
@@ -101,4 +102,12 @@
                 Objects.equals(sinks(), other.sinks()) &&
                 Objects.equals(isEmpty, other.isEmpty);
     }
+
+    @Override
+    public String toString() {
+        return toStringHelper(this)
+                .add("source", source())
+                .add("sinks", sinks())
+                .toString();
+    }
 }