[CORD-2748] Multicast flows are not properly updated when changing source
Change-Id: Ia4a9e9f132156e4b9a7eb56dd943f1fd13bc1560
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java
index 3bb6a52..95e264d 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/McastHandler.java
@@ -228,6 +228,24 @@
}
/**
+ * Processes the SOURCE_UPDATED event.
+ *
+ * @param event McastEvent with SOURCE_UPDATED type
+ */
+ protected void processSourceUpdated(McastEvent event) {
+ log.info("processSourceUpdated {}", event);
+ // Get old and new data
+ McastRouteInfo mcastRouteInfo = event.subject();
+ ConnectPoint newSource = mcastRouteInfo.source().orElse(null);
+ mcastRouteInfo = event.prevSubject();
+ ConnectPoint oldSource = mcastRouteInfo.source().orElse(null);
+ // and group ip
+ IpAddress mcastIp = mcastRouteInfo.route().group();
+ // Process the update event
+ processSourceUpdatedInternal(mcastIp, newSource, oldSource);
+ }
+
+ /**
* Processes the SINK_ADDED event.
*
* @param event McastEvent with SINK_ADDED type
@@ -279,12 +297,59 @@
}
// Get group ip and ingress connect point
IpAddress mcastIp = mcastRouteInfo.route().group();
- ConnectPoint source = mcastRouteInfo.source().get();
+ ConnectPoint source = mcastRouteInfo.source().orElse(null);
processRouteRemovedInternal(source, mcastIp);
}
/**
+ * Process the SOURCE_UPDATED event.
+ *
+ * @param newSource the updated srouce info
+ * @param oldSource the outdated source info
+ */
+ private void processSourceUpdatedInternal(IpAddress mcastIp,
+ ConnectPoint newSource,
+ ConnectPoint oldSource) {
+ lastMcastChange = Instant.now();
+ mcastLock();
+ try {
+ log.debug("Processing source updated for group {}", mcastIp);
+
+ // Build key for the store and retrieve old data
+ McastStoreKey mcastStoreKey = new McastStoreKey(mcastIp, oldSource.deviceId());
+
+ // Verify leadership on the operation
+ if (!isLeader(oldSource)) {
+ log.debug("Skip {} due to lack of leadership", mcastIp);
+ return;
+ }
+
+ // This device is not serving this multicast group
+ if (!mcastRoleStore.containsKey(mcastStoreKey) ||
+ !mcastNextObjStore.containsKey(mcastStoreKey)) {
+ log.warn("{} is not serving {}. Abort.", oldSource.deviceId(), mcastIp);
+ return;
+ }
+ NextObjective nextObjective = mcastNextObjStore.get(mcastStoreKey).value();
+ Set<PortNumber> outputPorts = getPorts(nextObjective.next());
+
+ // Let's remove old flows and groups
+ removeGroupFromDevice(oldSource.deviceId(), mcastIp, assignedVlan(oldSource));
+ // Push new flows and group
+ outputPorts.forEach(portNumber -> addPortToDevice(newSource.deviceId(), portNumber,
+ mcastIp, assignedVlan(newSource)));
+ addFilterToDevice(newSource.deviceId(), newSource.port(),
+ assignedVlan(newSource), mcastIp);
+ // Setup mcast roles
+ mcastRoleStore.put(new McastStoreKey(mcastIp, newSource.deviceId()),
+ McastRole.INGRESS);
+ } finally {
+ mcastUnlock();
+ }
+ }
+
+ /**
* Removes the entire mcast tree related to this group.
*
* @param mcastIp multicast group IP address
@@ -293,7 +358,7 @@
lastMcastChange = Instant.now();
mcastLock();
try {
- log.debug("Processing route down for group {}", mcastIp);
+ log.debug("Processing route removed for group {}", mcastIp);
// Find out the ingress, transit and egress device of the affected group
DeviceId ingressDevice = getDevice(mcastIp, McastRole.INGRESS)
@@ -322,7 +387,6 @@
if (ingressDevice != null) {
removeGroupFromDevice(ingressDevice, mcastIp, assignedVlan(source));
}
-
} finally {
mcastUnlock();
}
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index a47101d..41bb452 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -1533,6 +1533,9 @@
case SOURCE_ADDED:
mcastHandler.processSourceAdded(event);
break;
+ case SOURCE_UPDATED:
+ mcastHandler.processSourceUpdated(event);
+ break;
case SINK_ADDED:
mcastHandler.processSinkAdded(event);
break;
diff --git a/core/api/src/main/java/org/onosproject/net/mcast/McastEvent.java b/core/api/src/main/java/org/onosproject/net/mcast/McastEvent.java
index 1e1dd95..caa0284 100644
--- a/core/api/src/main/java/org/onosproject/net/mcast/McastEvent.java
+++ b/core/api/src/main/java/org/onosproject/net/mcast/McastEvent.java
@@ -17,6 +17,8 @@
import org.onosproject.event.AbstractEvent;
+import java.util.Objects;
+
import static com.google.common.base.MoreObjects.toStringHelper;
/**
@@ -46,6 +48,11 @@
SOURCE_ADDED,
/**
+ * A source for a mcast route has been updated.
+ */
+ SOURCE_UPDATED,
+
+ /**
* A sink for a mcast route (ie. the subject) has been added.
*/
SINK_ADDED,
@@ -56,15 +63,74 @@
SINK_REMOVED
}
+ // Used when an update event happens
+ private McastRouteInfo prevSubject;
+
+ /**
+ * Creates a McastEvent of a given type using the subject.
+ *
+ * @param type the event type
+ * @param subject the subject of the event type
+ */
public McastEvent(McastEvent.Type type, McastRouteInfo subject) {
super(type, subject);
}
+ /**
+ * Creates a McastEvent of a given type using the subject and
+ * the previous subject.
+ *
+ * @param type the event type
+ * @param subject the subject of the event
+ * @param prevSubject the previous subject of the event
+ */
+ public McastEvent(McastEvent.Type type, McastRouteInfo subject,
+ McastRouteInfo prevSubject) {
+ super(type, subject);
+ // For now we have just this kind of updates
+ if (type == Type.SOURCE_UPDATED) {
+ this.prevSubject = prevSubject;
+ }
+ }
+
+ /**
+ * Gets the previous subject in this mcast event.
+ *
+ * @return the previous subject, or null if previous subject is not
+ * specified.
+ */
+ public McastRouteInfo prevSubject() {
+ return this.prevSubject;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type(), subject(), prevSubject);
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (this == other) {
+ return true;
+ }
+
+ if (!(other instanceof McastEvent)) {
+ return false;
+ }
+
+ McastEvent that = (McastEvent) other;
+
+ return Objects.equals(this.subject(), that.subject()) &&
+ Objects.equals(this.type(), that.type()) &&
+ Objects.equals(this.prevSubject, that.prevSubject);
+ }
@Override
public String toString() {
return toStringHelper(this)
.add("type", type())
- .add("info", subject()).toString();
+ .add("info", subject())
+ .add("prevInfo", prevSubject())
+ .toString();
}
}
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java
index b9fb869..83dee45 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/DistributedMcastStore.java
@@ -135,11 +135,28 @@
checkNotNull(newData);
checkNotNull(oldData);
+ // They are not equal
if (!Objects.equal(oldData.source(), newData.source())) {
- notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
- mcastRouteInfo(route,
- newData.sinks(),
- newData.source())));
+ // Both not null, it is an update event
+ if (oldData.source() != null && newData.source() != null) {
+ // Broadcast old and new data
+ notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_UPDATED,
+ mcastRouteInfo(route,
+ newData.sinks(),
+ newData.source()),
+ mcastRouteInfo(route,
+ oldData.sinks(),
+ oldData.source())));
+ } else if (oldData.source() == null && newData.source() != null) {
+ // It is a source added event, broadcast new data
+ notifyDelegate(new McastEvent(McastEvent.Type.SOURCE_ADDED,
+ mcastRouteInfo(route,
+ newData.sinks(),
+ newData.source())));
+ } else {
+ // Scenario not managed for now
+ log.warn("Unhandled scenario {} - new {} - old {}", event.type());
+ }
} else {
Sets.difference(newData.sinks(), oldData.sinks()).forEach(sink ->
notifyDelegate(new McastEvent(McastEvent.Type.SINK_ADDED,
diff --git a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java
index b1214830..565a81d 100644
--- a/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java
+++ b/core/store/dist/src/main/java/org/onosproject/store/mcast/impl/MulticastData.java
@@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
/**
@@ -101,4 +102,12 @@
Objects.equals(sinks(), other.sinks()) &&
Objects.equals(isEmpty, other.isEmpty);
}
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("source", source())
+ .add("sinks", sinks())
+ .toString();
+ }
}