Handle host event in separate thread in mcast route service

Change-Id: Idfe083218adfac6a1a0a073f92fd3177fc4386ac
(cherry picked from commit d939eeb1ca0e53ed7d57f410b96f278d917242e7)
diff --git a/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/MulticastRouteManager.java b/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/MulticastRouteManager.java
index 9f15b6c..bc848d1 100644
--- a/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/MulticastRouteManager.java
+++ b/apps/mcast/impl/src/main/java/org/onosproject/mcast/impl/MulticastRouteManager.java
@@ -45,9 +45,12 @@
 import java.util.HashSet;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -71,9 +74,11 @@
     protected HostService hostService;
 
     private HostListener hostListener = new InternalHostListener();
+    private ExecutorService hostEventExecutor;
 
     @Activate
     public void activate() {
+        hostEventExecutor = Executors.newSingleThreadExecutor(groupedThreads("mcast-event-host", "%d", log));
         hostService.addListener(hostListener);
         eventDispatcher.addSink(McastEvent.class, listenerRegistry);
         store.setDelegate(delegate);
@@ -82,6 +87,7 @@
 
     @Deactivate
     public void deactivate() {
+        hostEventExecutor.shutdown();
         hostService.removeListener(hostListener);
         store.unsetDelegate(delegate);
         eventDispatcher.removeSink(McastEvent.class);
@@ -279,62 +285,64 @@
 
         @Override
         public void event(HostEvent event) {
-            HostId hostId = event.subject().id();
-            log.debug("Host event: {}", event);
-            Set<McastRoute> routesForSource = routesForSource(hostId);
-            Set<McastRoute> routesForSink = routesForSink(hostId);
-            switch (event.type()) {
-                case HOST_ADDED:
-                    //the host is added, if it already comes with some locations let's use them
-                    if (!routesForSource.isEmpty()) {
-                        eventAddSources(hostId, event.subject().locations(), routesForSource);
-                    }
-                    if (!routesForSink.isEmpty()) {
-                        eventAddSinks(hostId, event.subject().locations(), routesForSink);
-                    }
-                    break;
-                case HOST_MOVED:
-                    //both subjects must be null or the system is in an incoherent state
-                    if ((event.prevSubject() != null && event.subject() != null)) {
-                        //we compute the difference between old locations and new ones and remove the previous
-                        Set<HostLocation> removedConnectPoint = Sets.difference(event.prevSubject().locations(),
+            hostEventExecutor.execute(() -> {
+                HostId hostId = event.subject().id();
+                log.debug("Host event: {}", event);
+                Set<McastRoute> routesForSource = routesForSource(hostId);
+                Set<McastRoute> routesForSink = routesForSink(hostId);
+                switch (event.type()) {
+                    case HOST_ADDED:
+                        //the host is added, if it already comes with some locations let's use them
+                        if (!routesForSource.isEmpty()) {
+                            eventAddSources(hostId, event.subject().locations(), routesForSource);
+                        }
+                        if (!routesForSink.isEmpty()) {
+                            eventAddSinks(hostId, event.subject().locations(), routesForSink);
+                        }
+                        break;
+                    case HOST_MOVED:
+                        //both subjects must be null or the system is in an incoherent state
+                        if ((event.prevSubject() != null && event.subject() != null)) {
+                            //we compute the difference between old locations and new ones and remove the previous
+                            Set<HostLocation> removedConnectPoint = Sets.difference(event.prevSubject().locations(),
                                 event.subject().locations()).immutableCopy();
-                        if (!removedConnectPoint.isEmpty()) {
-                            if (!routesForSource.isEmpty()) {
-                                eventRemoveSources(hostId, removedConnectPoint, routesForSource);
+                            if (!removedConnectPoint.isEmpty()) {
+                                if (!routesForSource.isEmpty()) {
+                                    eventRemoveSources(hostId, removedConnectPoint, routesForSource);
+                                }
+                                if (!routesForSink.isEmpty()) {
+                                    eventRemoveSinks(hostId, removedConnectPoint, routesForSink);
+                                }
                             }
-                            if (!routesForSink.isEmpty()) {
-                                eventRemoveSinks(hostId, removedConnectPoint, routesForSink);
-                            }
-                        }
-                        Set<HostLocation> addedConnectPoints = Sets.difference(event.subject().locations(),
+                            Set<HostLocation> addedConnectPoints = Sets.difference(event.subject().locations(),
                                 event.prevSubject().locations()).immutableCopy();
-                        //if the host now has some new locations we add them to the sinks set
-                        if (!addedConnectPoints.isEmpty()) {
-                            if (!routesForSource.isEmpty()) {
-                                eventAddSources(hostId, addedConnectPoints, routesForSource);
-                            }
-                            if (!routesForSink.isEmpty()) {
-                                eventAddSinks(hostId, addedConnectPoints, routesForSink);
+                            //if the host now has some new locations we add them to the sinks set
+                            if (!addedConnectPoints.isEmpty()) {
+                                if (!routesForSource.isEmpty()) {
+                                    eventAddSources(hostId, addedConnectPoints, routesForSource);
+                                }
+                                if (!routesForSink.isEmpty()) {
+                                    eventAddSinks(hostId, addedConnectPoints, routesForSink);
+                                }
                             }
                         }
-                    }
-                    break;
-                case HOST_REMOVED:
-                    // Removing all the connect points for that specific host
-                    // even if the locations are 0 we keep
-                    // the host information in the route in case it shows up again
-                    if (!routesForSource.isEmpty()) {
-                        eventRemoveSources(hostId, event.subject().locations(), routesForSource);
-                    }
-                    if (!routesForSink.isEmpty()) {
-                        eventRemoveSinks(hostId, event.subject().locations(), routesForSink);
-                    }
-                    break;
-                case HOST_UPDATED:
-                default:
-                    log.debug("Host event {} not handled", event.type());
-            }
+                        break;
+                    case HOST_REMOVED:
+                        // Removing all the connect points for that specific host
+                        // even if the locations are 0 we keep
+                        // the host information in the route in case it shows up again
+                        if (!routesForSource.isEmpty()) {
+                            eventRemoveSources(hostId, event.subject().locations(), routesForSource);
+                        }
+                        if (!routesForSink.isEmpty()) {
+                            eventRemoveSinks(hostId, event.subject().locations(), routesForSink);
+                        }
+                        break;
+                    case HOST_UPDATED:
+                    default:
+                        log.debug("Host event {} not handled", event.type());
+                }
+            });
         }
     }