Handle host event in separate thread in mcast route service
Change-Id: Idfe083218adfac6a1a0a073f92fd3177fc4386ac
diff --git a/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/MulticastRouteManager.java b/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/MulticastRouteManager.java
index 9f15b6c..bc848d1 100644
--- a/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/MulticastRouteManager.java
+++ b/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/MulticastRouteManager.java
@@ -45,9 +45,12 @@
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -71,9 +74,11 @@
protected HostService hostService;
private HostListener hostListener = new InternalHostListener();
+ private ExecutorService hostEventExecutor;
@Activate
public void activate() {
+ hostEventExecutor = Executors.newSingleThreadExecutor(groupedThreads("mcast-event-host", "%d", log));
hostService.addListener(hostListener);
eventDispatcher.addSink(McastEvent.class, listenerRegistry);
store.setDelegate(delegate);
@@ -82,6 +87,7 @@
@Deactivate
public void deactivate() {
+ hostEventExecutor.shutdown();
hostService.removeListener(hostListener);
store.unsetDelegate(delegate);
eventDispatcher.removeSink(McastEvent.class);
@@ -279,62 +285,64 @@
@Override
public void event(HostEvent event) {
- HostId hostId = event.subject().id();
- log.debug("Host event: {}", event);
- Set<McastRoute> routesForSource = routesForSource(hostId);
- Set<McastRoute> routesForSink = routesForSink(hostId);
- switch (event.type()) {
- case HOST_ADDED:
- //the host is added, if it already comes with some locations let's use them
- if (!routesForSource.isEmpty()) {
- eventAddSources(hostId, event.subject().locations(), routesForSource);
- }
- if (!routesForSink.isEmpty()) {
- eventAddSinks(hostId, event.subject().locations(), routesForSink);
- }
- break;
- case HOST_MOVED:
- //both subjects must be null or the system is in an incoherent state
- if ((event.prevSubject() != null && event.subject() != null)) {
- //we compute the difference between old locations and new ones and remove the previous
- Set<HostLocation> removedConnectPoint = Sets.difference(event.prevSubject().locations(),
+ hostEventExecutor.execute(() -> {
+ HostId hostId = event.subject().id();
+ log.debug("Host event: {}", event);
+ Set<McastRoute> routesForSource = routesForSource(hostId);
+ Set<McastRoute> routesForSink = routesForSink(hostId);
+ switch (event.type()) {
+ case HOST_ADDED:
+ //the host is added, if it already comes with some locations let's use them
+ if (!routesForSource.isEmpty()) {
+ eventAddSources(hostId, event.subject().locations(), routesForSource);
+ }
+ if (!routesForSink.isEmpty()) {
+ eventAddSinks(hostId, event.subject().locations(), routesForSink);
+ }
+ break;
+ case HOST_MOVED:
+ //both subjects must be null or the system is in an incoherent state
+ if ((event.prevSubject() != null && event.subject() != null)) {
+ //we compute the difference between old locations and new ones and remove the previous
+ Set<HostLocation> removedConnectPoint = Sets.difference(event.prevSubject().locations(),
event.subject().locations()).immutableCopy();
- if (!removedConnectPoint.isEmpty()) {
- if (!routesForSource.isEmpty()) {
- eventRemoveSources(hostId, removedConnectPoint, routesForSource);
+ if (!removedConnectPoint.isEmpty()) {
+ if (!routesForSource.isEmpty()) {
+ eventRemoveSources(hostId, removedConnectPoint, routesForSource);
+ }
+ if (!routesForSink.isEmpty()) {
+ eventRemoveSinks(hostId, removedConnectPoint, routesForSink);
+ }
}
- if (!routesForSink.isEmpty()) {
- eventRemoveSinks(hostId, removedConnectPoint, routesForSink);
- }
- }
- Set<HostLocation> addedConnectPoints = Sets.difference(event.subject().locations(),
+ Set<HostLocation> addedConnectPoints = Sets.difference(event.subject().locations(),
event.prevSubject().locations()).immutableCopy();
- //if the host now has some new locations we add them to the sinks set
- if (!addedConnectPoints.isEmpty()) {
- if (!routesForSource.isEmpty()) {
- eventAddSources(hostId, addedConnectPoints, routesForSource);
- }
- if (!routesForSink.isEmpty()) {
- eventAddSinks(hostId, addedConnectPoints, routesForSink);
+ //if the host now has some new locations we add them to the sinks set
+ if (!addedConnectPoints.isEmpty()) {
+ if (!routesForSource.isEmpty()) {
+ eventAddSources(hostId, addedConnectPoints, routesForSource);
+ }
+ if (!routesForSink.isEmpty()) {
+ eventAddSinks(hostId, addedConnectPoints, routesForSink);
+ }
}
}
- }
- break;
- case HOST_REMOVED:
- // Removing all the connect points for that specific host
- // even if the locations are 0 we keep
- // the host information in the route in case it shows up again
- if (!routesForSource.isEmpty()) {
- eventRemoveSources(hostId, event.subject().locations(), routesForSource);
- }
- if (!routesForSink.isEmpty()) {
- eventRemoveSinks(hostId, event.subject().locations(), routesForSink);
- }
- break;
- case HOST_UPDATED:
- default:
- log.debug("Host event {} not handled", event.type());
- }
+ break;
+ case HOST_REMOVED:
+ // Removing all the connect points for that specific host
+ // even if the locations are 0 we keep
+ // the host information in the route in case it shows up again
+ if (!routesForSource.isEmpty()) {
+ eventRemoveSources(hostId, event.subject().locations(), routesForSource);
+ }
+ if (!routesForSink.isEmpty()) {
+ eventRemoveSinks(hostId, event.subject().locations(), routesForSink);
+ }
+ break;
+ case HOST_UPDATED:
+ default:
+ log.debug("Host event {} not handled", event.type());
+ }
+ });
}
}