Fix HOST event handling in MulticastRouteManager
Change-Id: I721470bd1879c1dc252346a0f4f085ca80f54156
diff --git a/apps/mcast/api/src/main/java/org/onosproject/mcast/api/McastRouteData.java b/apps/mcast/api/src/main/java/org/onosproject/mcast/api/McastRouteData.java
index 9965818..e95c417 100644
--- a/apps/mcast/api/src/main/java/org/onosproject/mcast/api/McastRouteData.java
+++ b/apps/mcast/api/src/main/java/org/onosproject/mcast/api/McastRouteData.java
@@ -97,9 +97,7 @@
*/
public void addSources(Set<ConnectPoint> sources) {
checkArgument(!sources.contains(null));
- sources.forEach(source -> {
- this.sources.put(source, true);
- });
+ sources.forEach(source -> this.sources.put(source, true));
}
/**
@@ -129,7 +127,12 @@
public void addSinks(HostId hostId, Set<ConnectPoint> sinks) {
checkNotNull(hostId);
checkArgument(!sinks.contains(null));
- this.sinks.put(hostId, sinks);
+ //if existing we add to current set, otherwise we put them all
+ if (this.sinks.containsKey(hostId)) {
+ this.sinks.get(hostId).addAll(sinks);
+ } else {
+ this.sinks.put(hostId, sinks);
+ }
}
/**
@@ -168,7 +171,10 @@
public void removeSinks(HostId hostId, Set<ConnectPoint> sinks) {
checkNotNull(hostId);
checkArgument(!sinks.contains(null));
- this.sinks.get(hostId).removeAll(sinks);
+ //if existing we remove from current set, otherwise just skip them
+ if (this.sinks.containsKey(hostId)) {
+ this.sinks.get(hostId).removeAll(sinks);
+ }
}
/**
diff --git a/apps/mcast/api/src/main/java/org/onosproject/mcast/api/McastStore.java b/apps/mcast/api/src/main/java/org/onosproject/mcast/api/McastStore.java
index 53d444b..aa429b2 100644
--- a/apps/mcast/api/src/main/java/org/onosproject/mcast/api/McastStore.java
+++ b/apps/mcast/api/src/main/java/org/onosproject/mcast/api/McastStore.java
@@ -23,7 +23,7 @@
import java.util.Set;
/**
- * Entity responsible for storing multicast state information.
+ * Entity responsible for storing Multicast state information.
*/
@Beta
public interface McastStore extends Store<McastEvent, McastStoreDelegate> {
@@ -31,21 +31,21 @@
/**
* Updates the store with the route information.
*
- * @param route a multicast route
+ * @param route a Multicast route
*/
void storeRoute(McastRoute route);
/**
- * Updates the store with the route information.
+ * Removes from the store the route information.
*
- * @param route a multicast route
+ * @param route a Multicast route
*/
void removeRoute(McastRoute route);
/**
* Add to the store with source information for the given route.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @param sources a set of sources
*/
void storeSources(McastRoute route, Set<ConnectPoint> sources);
@@ -53,7 +53,7 @@
/**
* Removes from the store all the sources information for a given route.
*
- * @param route a multicast route
+ * @param route a Multicast route
*/
void removeSources(McastRoute route);
@@ -61,7 +61,7 @@
* Removes from the store the source information for the given route.
* value.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @param sources a set of sources
*/
void removeSources(McastRoute route, Set<ConnectPoint> sources);
@@ -70,7 +70,7 @@
* Updates the store with a host based sink information for a given route. There may be
* multiple sink connect points for the given host.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @param hostId the host sink
* @param sinks the sinks
*/
@@ -81,7 +81,7 @@
* The sinks stored with this method are not tied with any host.
* Traffic will be sent to all of them.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @param sinks set of specific connect points
*/
void addSinks(McastRoute route, Set<ConnectPoint> sinks);
@@ -89,73 +89,72 @@
/**
* Removes from the store all the sink information for a given route.
*
- * @param route a multicast route
+ * @param route a Multicast route
*/
void removeSinks(McastRoute route);
/**
* Removes from the store the complete set of sink information for a given host for a given route.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @param hostId a specific host
*/
void removeSink(McastRoute route, HostId hostId);
/**
- * Removes from the store the given set of sink information for a given host for a given route.
+ * Removes a set of sink connect points for a given host the route.
*
- * @param route a multicast route
- * @param hostId the host
- * @param sinks a set of multicast sink connect points
+ * @param route the multicast route
+ * @param hostId a sink host
+ * @param connectPoints a given set of connect points to remove
*/
- void removeSinks(McastRoute route, HostId hostId, Set<ConnectPoint> sinks);
+ void removeSinks(McastRoute route, HostId hostId, Set<ConnectPoint> connectPoints);
/**
* Removes from the store the set of non host bind sink information for a given route.
*
- * @param route a multicast route
- * @param sinks a set of multicast sinks
+ * @param route a Multicast route
+ * @param sinks a set of Multicast sinks
*/
void removeSinks(McastRoute route, Set<ConnectPoint> sinks);
/**
- * Obtains the sources for a multicast route.
+ * Obtains the sources for a Multicast route.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @return a connect point
*/
Set<ConnectPoint> sourcesFor(McastRoute route);
/**
- * Obtains the sinks for a multicast route.
+ * Obtains the sinks for a Multicast route.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @return a set of sinks
*/
Set<ConnectPoint> sinksFor(McastRoute route);
/**
- * Obtains the sinks for a given host for a given multicast route.
+ * Obtains the sinks for a given host for a given Multicast route.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @param hostId the host
* @return a set of sinks
*/
Set<ConnectPoint> sinksFor(McastRoute route, HostId hostId);
-
/**
- * Gets the set of all known multicast routes.
+ * Gets the set of all known Multicast routes.
*
- * @return set of multicast routes.
+ * @return set of Multicast routes.
*/
Set<McastRoute> getRoutes();
/**
- * Gets the multicast data for a given route.
+ * Gets the Multicast data for a given route.
*
* @param route the route
- * @return set of multicast routes.
+ * @return set of Multicast routes.
*/
McastRouteData getRouteData(McastRoute route);
}
diff --git a/apps/mcast/api/src/main/java/org/onosproject/mcast/api/MulticastRouteService.java b/apps/mcast/api/src/main/java/org/onosproject/mcast/api/MulticastRouteService.java
index d9b1721..37ebcd5 100644
--- a/apps/mcast/api/src/main/java/org/onosproject/mcast/api/MulticastRouteService.java
+++ b/apps/mcast/api/src/main/java/org/onosproject/mcast/api/MulticastRouteService.java
@@ -21,11 +21,10 @@
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.HostId;
-import java.util.Optional;
import java.util.Set;
/**
- * A service interface for maintaining multicast information.
+ * A service interface for maintaining Multicast information.
*/
@Beta
public interface MulticastRouteService
@@ -34,73 +33,73 @@
/**
* Adds an empty route to the information base for the given group IP.
*
- * @param route a multicast route
+ * @param route a Multicast route
*/
void add(McastRoute route);
/**
* Removes a route from the information base.
*
- * @param route a multicast route
+ * @param route a Multicast route
*/
void remove(McastRoute route);
/**
- * Gets all multicast routes in the system.
+ * Gets all Multicast routes in the system.
*
- * @return set of multicast routes
+ * @return set of Multicast routes
*/
Set<McastRoute> getRoutes();
/**
- * Gets a multicast route in the system.
+ * Gets a Multicast route in the system.
*
- * @param groupIp multicast group IP address
- * @param sourceIp multicasto source Ip address
- * @return set of multicast routes
+ * @param groupIp Multicast group IP address
+ * @param sourceIp Multicasto source Ip address
+ * @return set of Multicast routes
*/
- Optional<McastRoute> getRoute(IpAddress groupIp, IpAddress sourceIp);
+ Set<McastRoute> getRoute(IpAddress groupIp, IpAddress sourceIp);
/**
- * Adds a set of source to the route from where the
+ * Adds a set of sources connect points to the route from where the
* data stream is originating.
*
- * @param route the multicast route
+ * @param route the Multicast route
* @param sources a set of sources
*/
void addSources(McastRoute route, Set<ConnectPoint> sources);
/**
- * Removes all the sources from the route.
+ * Removes all the sources connect points from the route.
*
- * @param route the multicast route
+ * @param route the Multicast route
*/
void removeSources(McastRoute route);
/**
- * Removes a set of sources from the route.
+ * Removes a set of sources connect points from the route.
*
- * @param route the multicast route
+ * @param route the Multicast route
* @param sources a set of sources
*/
void removeSources(McastRoute route, Set<ConnectPoint> sources);
/**
- * Adds a set of sink to the route to which a data stream should be
+ * Adds a sink to the route to which a data stream should be
* sent to.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @param hostId a sink host
*/
void addSink(McastRoute route, HostId hostId);
/**
* Adds a set of sink to the route to which a data stream should be
- * sent to. If this method is used this the connect points will all
- * be used a sink for that Mcast Tree. For dual-homed sinks please use
- * {@link #addSink(McastRoute route, HostId hostId) addSink}.
+ * sent to. If this method is used the connect points will all be
+ * used as different sinks for that Mcast Tree. For dual-homed sinks
+ * please use {@link #addSink(McastRoute route, HostId hostId) addSink}.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @param sinks a set of sink connect point
*/
void addSink(McastRoute route, Set<ConnectPoint> sinks);
@@ -108,40 +107,31 @@
/**
* Removes all the sinks from the route.
*
- * @param route the multicast route
+ * @param route the Multicast route
*/
void removeSinks(McastRoute route);
/**
- * Removes a sink host from the route.
+ * Removes a sink from the route.
*
- * @param route the multicast route
+ * @param route the Multicast route
* @param hostId a sink host
*/
void removeSink(McastRoute route, HostId hostId);
/**
- * Removes a set of sink connect points for a given host the route.
- *
- * @param route the multicast route
- * @param hostId a sink host
- * @param connectPoints a given set of connect points to remove
- */
- void removeSinks(McastRoute route, HostId hostId, Set<ConnectPoint> connectPoints);
-
- /**
* Removes a set of sinks to the route to which a data stream should be
* sent to. If this method is used the mcast tree does not work
* for any other sink until it's added. For dual-homed sinks please use
* {@link #removeSink(McastRoute route, HostId hostId) removeSink}.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @param sink a sink connect point
*/
void removeSinks(McastRoute route, Set<ConnectPoint> sink);
/**
- * Return the Data for this route.
+ * Return the Multicast data for this route.
*
* @param route route
* @return the mcast route data
@@ -149,9 +139,9 @@
McastRouteData routeData(McastRoute route);
/**
- * Find the data source association for this multicast route.
+ * Find the data source association for this Multicast route.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @return a connect point
*/
Set<ConnectPoint> sources(McastRoute route);
@@ -159,24 +149,24 @@
/**
* Find the list of sinks for this route.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @return a list of connect points
*/
Set<ConnectPoint> sinks(McastRoute route);
/**
- * Find the list of sinks for a given host for this route.
+ * Find the set of connect points for a given sink for this route.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @param hostId the host
* @return a list of connect points
*/
Set<ConnectPoint> sinks(McastRoute route, HostId hostId);
/**
- * Obtains all the non host specific sinks for a multicast route.
+ * Obtains all the non host specific sinks for a Multicast route.
*
- * @param route a multicast route
+ * @param route a Multicast route
* @return a set of sinks
*/
Set<ConnectPoint> nonHostSinks(McastRoute route);
diff --git a/apps/mcast/cli/src/main/java/org/onosproject/mcast/cli/McastHostDeleteCommand.java b/apps/mcast/cli/src/main/java/org/onosproject/mcast/cli/McastHostDeleteCommand.java
index 6d872e8..88250bb 100644
--- a/apps/mcast/cli/src/main/java/org/onosproject/mcast/cli/McastHostDeleteCommand.java
+++ b/apps/mcast/cli/src/main/java/org/onosproject/mcast/cli/McastHostDeleteCommand.java
@@ -21,13 +21,8 @@
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.mcast.api.McastRoute;
import org.onosproject.mcast.api.MulticastRouteService;
-import org.onosproject.net.ConnectPoint;
import org.onosproject.net.HostId;
-import java.util.Arrays;
-import java.util.Set;
-import java.util.stream.Collectors;
-
/**
* Deletes a multicast route.
*/
@@ -60,13 +55,6 @@
valueToShowInHelp = "00:00:00:00:00:00/None")
String host = null;
- @Option(name = "-cps", aliases = "--connectPoint",
- description = "Egress port of:XXXXXXXXXX/XX",
- valueToShowInHelp = "of:0000000000000001/1",
- multiValued = true)
- String[] egressList = null;
-
-
@Override
protected void execute() {
MulticastRouteService mcastRouteManager = get(MulticastRouteService.class);
@@ -89,12 +77,7 @@
print(U_FORMAT_MAPPING, mRoute.type(), mRoute.group(), mRoute.source());
return;
}
- if (host != null && egressList != null) {
- Set<ConnectPoint> sinksSet = Arrays.stream(egressList)
- .map(ConnectPoint::deviceConnectPoint)
- .collect(Collectors.toSet());
- mcastRouteManager.removeSinks(mRoute, hostId, sinksSet);
- } else if (host != null) {
+ if (host != null) {
mcastRouteManager.removeSink(mRoute, hostId);
}
print(U_FORMAT_MAPPING, mRoute.type(), mRoute.group(), mRoute.source());
diff --git a/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/DistributedMcastRoutesStore.java b/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/DistributedMcastRoutesStore.java
index 8bdf42e..8c90a1d 100644
--- a/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/DistributedMcastRoutesStore.java
+++ b/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/DistributedMcastRoutesStore.java
@@ -46,7 +46,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -75,9 +74,6 @@
private MapEventListener<McastRoute, McastRouteData> mcastRouteListener =
new McastRouteListener();
- private ScheduledExecutorService executor;
-
-
@Activate
public void activate() {
mcastRib = storageService.<McastRoute, McastRouteData>consistentMapBuilder()
@@ -117,7 +113,7 @@
@Override
public void storeSources(McastRoute route, Set<ConnectPoint> sources) {
- McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
+ mcastRoutes.compute(route, (k, v) -> {
v.addSources(sources);
return v;
});
@@ -125,7 +121,7 @@
@Override
public void removeSources(McastRoute route) {
- McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
+ mcastRoutes.compute(route, (k, v) -> {
v.removeSources();
return v;
});
@@ -133,7 +129,7 @@
@Override
public void removeSources(McastRoute route, Set<ConnectPoint> sources) {
- McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
+ mcastRoutes.compute(route, (k, v) -> {
v.removeSources(sources);
return v;
});
@@ -142,7 +138,7 @@
@Override
public void addSink(McastRoute route, HostId hostId, Set<ConnectPoint> sinks) {
- McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
+ mcastRoutes.compute(route, (k, v) -> {
v.addSinks(hostId, sinks);
return v;
});
@@ -150,7 +146,7 @@
@Override
public void addSinks(McastRoute route, Set<ConnectPoint> sinks) {
- McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
+ mcastRoutes.compute(route, (k, v) -> {
v.addSinks(HostId.NONE, sinks);
return v;
});
@@ -159,7 +155,7 @@
@Override
public void removeSinks(McastRoute route) {
- McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
+ mcastRoutes.compute(route, (k, v) -> {
v.removeSinks();
return v;
});
@@ -167,7 +163,7 @@
@Override
public void removeSink(McastRoute route, HostId hostId) {
- McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
+ mcastRoutes.compute(route, (k, v) -> {
v.removeSinks(hostId);
return v;
});
@@ -175,7 +171,7 @@
@Override
public void removeSinks(McastRoute route, HostId hostId, Set<ConnectPoint> sinks) {
- McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
+ mcastRoutes.compute(route, (k, v) -> {
v.removeSinks(hostId, sinks);
return v;
});
@@ -183,7 +179,7 @@
@Override
public void removeSinks(McastRoute route, Set<ConnectPoint> sinks) {
- McastRouteData data = mcastRoutes.compute(route, (k, v) -> {
+ mcastRoutes.compute(route, (k, v) -> {
v.removeSinks(HostId.NONE, sinks);
return v;
});
diff --git a/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/MulticastRouteManager.java b/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/MulticastRouteManager.java
index 6ed8b4b..43b145c 100644
--- a/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/MulticastRouteManager.java
+++ b/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/MulticastRouteManager.java
@@ -15,7 +15,9 @@
*/
package org.onosproject.mcast.impl;
+import com.google.common.base.Objects;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -34,6 +36,7 @@
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.Host;
import org.onosproject.net.HostId;
+import org.onosproject.net.HostLocation;
import org.onosproject.net.host.HostEvent;
import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
@@ -42,6 +45,7 @@
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.slf4j.LoggerFactory.getLogger;
@@ -104,11 +108,13 @@
}
@Override
- public Optional<McastRoute> getRoute(IpAddress groupIp, IpAddress sourceIp) {
- return store.getRoutes().stream().filter(route ->
- route.group().equals(groupIp) &&
- route.source().isPresent() &&
- route.source().get().equals(sourceIp)).findAny();
+ public Set<McastRoute> getRoute(IpAddress groupIp, IpAddress sourceIp) {
+ // Let's transform it into an optional
+ final Optional<IpAddress> source = Optional.ofNullable(sourceIp);
+ return store.getRoutes().stream()
+ .filter(route -> route.group().equals(groupIp) &&
+ Objects.equal(route.source(), source))
+ .collect(Collectors.toSet());
}
@Override
@@ -178,19 +184,10 @@
}
@Override
- public void removeSinks(McastRoute route, HostId hostId, Set<ConnectPoint> connectPoints) {
- checkNotNull(route, "Route cannot be null");
- if (checkRoute(route)) {
- store.removeSinks(route, hostId, connectPoints);
- }
-
- }
-
- @Override
public void removeSinks(McastRoute route, Set<ConnectPoint> connectPoints) {
checkNotNull(route, "Route cannot be null");
if (checkRoute(route)) {
- store.removeSinks(route, HostId.NONE, connectPoints);
+ store.removeSinks(route, connectPoints);
}
}
@@ -227,7 +224,7 @@
private class InternalMcastStoreDelegate implements McastStoreDelegate {
@Override
public void notify(McastEvent event) {
- log.debug("Event: {}", event);
+ log.debug("Notify event: {}", event);
post(event);
}
}
@@ -246,39 +243,61 @@
@Override
public void event(HostEvent event) {
HostId hostId = event.subject().id();
- Set<ConnectPoint> sinks = new HashSet<>();
- log.debug("{} event", event);
- //FIXME ther must be a better way
- event.subject().locations().forEach(hostLocation -> sinks.add(
- ConnectPoint.deviceConnectPoint(hostLocation.deviceId() + "/" + hostLocation.port())));
+ log.debug("Host event: {}", event);
switch (event.type()) {
case HOST_ADDED:
- case HOST_UPDATED:
+ //the host is added, if it already comes with some locations let's use them
+ eventAddSinks(hostId, event.subject().locations());
+ break;
case HOST_MOVED:
- if ((event.prevSubject() == null && event.subject() != null)
- || (event.prevSubject().locations().size() > event.subject().locations().size())) {
- store.getRoutes().stream().filter(mcastRoute -> {
- return store.getRouteData(mcastRoute).sinks().get(hostId) != null;
- }).forEach(route -> {
- store.removeSinks(route, hostId, sinks);
- });
- } else if (event.prevSubject().locations().size() < event.subject().locations().size()) {
- store.getRoutes().stream().filter(mcastRoute -> {
- return store.getRouteData(mcastRoute).sinks().get(hostId) != null;
- }).forEach(route -> {
- store.addSink(route, hostId, sinks);
- });
+ //both subjects must be null or the system is in an incoherent state
+ if ((event.prevSubject() != null && event.subject() != null)) {
+ //we compute the difference between old locations and new ones and remove the previous
+ Set<HostLocation> removedSinks = Sets.difference(event.prevSubject().locations(),
+ event.subject().locations()).immutableCopy();
+ if (!removedSinks.isEmpty()) {
+ eventRemoveSinks(hostId, removedSinks);
+ }
+ Set<HostLocation> addedSinks = Sets.difference(event.subject().locations(),
+ event.prevSubject().locations()).immutableCopy();
+ //if the host now has some new locations we add them to the sinks set
+ if (!addedSinks.isEmpty()) {
+ eventAddSinks(hostId, addedSinks);
+ }
}
break;
case HOST_REMOVED:
- store.getRoutes().stream().filter(mcastRoute -> {
- return store.getRouteData(mcastRoute).sinks().get(hostId) != null;
- }).forEach(route -> {
- store.removeSink(route, hostId);
- });
+ // Removing all the sinks for that specific host
+ // even if the locations are 0 we keep
+ // the host information in the route in case it shows up again
+ eventRemoveSinks(event.subject().id(), event.subject().locations());
+ break;
+ case HOST_UPDATED:
default:
- log.debug("Host event {} not supported", event.type());
+ log.debug("Host event {} not handled", event.type());
}
}
}
+
+ //Adds sinks for a given host event
+ private void eventRemoveSinks(HostId hostId, Set<HostLocation> removedSinks) {
+ Set<ConnectPoint> sinks = new HashSet<>();
+ // Build sink using host location
+ sinks.addAll(removedSinks);
+ // Filter by host id and then remove from each route the provided sinks
+ store.getRoutes().stream().filter(mcastRoute -> store.getRouteData(mcastRoute)
+ .sinks().get(hostId) != null)
+ .forEach(route -> store.removeSinks(route, hostId, sinks));
+ }
+
+ //Removes the sinks for a given host event
+ private void eventAddSinks(HostId hostId, Set<HostLocation> addedSinks) {
+ Set<ConnectPoint> sinks = new HashSet<>();
+ // Build sink using host location
+ sinks.addAll(addedSinks);
+ // Filter by host id and then add to each route the provided sinks
+ store.getRoutes().stream().filter(mcastRoute -> store.getRouteData(mcastRoute)
+ .sinks().get(hostId) != null)
+ .forEach(route -> store.addSink(route, hostId, sinks));
+ }
}