Handle host, route and mcast events in separate executors

Change-Id: Ic9c4964533354e965691e6db5fa323df91cb4124
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index c17df68..7a857ac 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -135,7 +135,6 @@
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -241,20 +240,26 @@
     private SegmentRoutingNeighbourDispatcher neighbourHandler = null;
     private DefaultL2TunnelHandler l2TunnelHandler = null;
     private TopologyHandler topologyHandler = null;
-    private InternalEventHandler eventHandler = new InternalEventHandler();
     private final InternalHostListener hostListener = new InternalHostListener();
     private final InternalConfigListener cfgListener = new InternalConfigListener(this);
     private final InternalMcastListener mcastListener = new InternalMcastListener();
     private final InternalRouteEventListener routeListener = new InternalRouteEventListener();
     private final InternalTopologyListener topologyListener = new InternalTopologyListener();
 
-    private ScheduledExecutorService executorService = Executors
-            .newScheduledThreadPool(1, groupedThreads("SegmentRoutingManager", "event-%d", log));
+    // Handles device, link, topology and network config events
+    private ScheduledExecutorService mainEventExecutor = Executors
+            .newScheduledThreadPool(1, groupedThreads("sr-event-main", "%d", log));
 
-    @SuppressWarnings("rawtypes")
-    private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<>();
-    Map<DeviceId, DefaultGroupHandler> groupHandlerMap =
-            new ConcurrentHashMap<>();
+    // Handles host, route, mcast events
+    private ScheduledExecutorService hostEventExecutor = Executors
+            .newScheduledThreadPool(1, groupedThreads("sr-event-host", "%d", log));
+    private ScheduledExecutorService routeEventExecutor = Executors
+            .newScheduledThreadPool(1, groupedThreads("sr-event-route", "%d", log));
+    private ScheduledExecutorService mcastEventExecutor = Executors
+            .newScheduledThreadPool(1, groupedThreads("sr-event-mcast", "%d", log));
+
+
+    Map<DeviceId, DefaultGroupHandler> groupHandlerMap = new ConcurrentHashMap<>();
     /**
      * Per device next objective ID store with (device id + destination set) as key.
      * Used to keep track on MPLS group information.
@@ -997,175 +1002,149 @@
         }
     }
 
-    @SuppressWarnings("rawtypes")
-    private void scheduleEventHandlerIfNotScheduled(Event event) {
-        synchronized (THREAD_SCHED_LOCK) {
-            eventQueue.add(event);
-            numOfEventsQueued++;
-
-            if ((numOfHandlerScheduled - numOfHandlerExecution) == 0) {
-                //No pending scheduled event handling threads. So start a new one.
-                executorService.schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
-                numOfHandlerScheduled++;
-            }
-            log.trace("numOfEventsQueued {}, numOfEventHandlerScheduled {}",
-                      numOfEventsQueued,
-                      numOfHandlerScheduled);
-        }
-    }
-
     private class InternalEventHandler implements Runnable {
+        private Event event;
+
+        InternalEventHandler(Event event) {
+            this.event = event;
+        }
+
         @Override
         public void run() {
-            while (true) {
-                try {
-                    @SuppressWarnings("rawtypes")
-                    Event event;
-                    synchronized (THREAD_SCHED_LOCK) {
-                        if (!eventQueue.isEmpty()) {
-                            event = eventQueue.poll();
-                            numOfEventsExecuted++;
-                        } else {
-                            numOfHandlerExecution++;
-                            log.debug("numOfHandlerExecution {} numOfEventsExecuted {}",
-                                      numOfHandlerExecution, numOfEventsExecuted);
-                            break;
-                        }
-                    }
-                    // TODO We should also change SR routing and PW to listen to TopologyEvents
-                    if (event.type() == LinkEvent.Type.LINK_ADDED ||
-                            event.type() == LinkEvent.Type.LINK_UPDATED) {
-                        linkHandler.processLinkAdded((Link) event.subject());
-                    } else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
-                        linkHandler.processLinkRemoved((Link) event.subject());
-                    } else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
-                            event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
-                            event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
-                        DeviceId deviceId = ((Device) event.subject()).id();
-                        if (deviceService.isAvailable(deviceId)) {
-                            log.info("** DEVICE UP Processing device event {} "
-                                    + "for available device {}",
-                                     event.type(), ((Device) event.subject()).id());
-                            processDeviceAdded((Device) event.subject());
-                        } else {
-                            log.info(" ** DEVICE DOWN Processing device event {}"
-                                    + " for unavailable device {}",
-                                     event.type(), ((Device) event.subject()).id());
-                            processDeviceRemoved((Device) event.subject());
-                        }
-                    } else if (event.type() == DeviceEvent.Type.PORT_ADDED) {
-                        // typically these calls come when device is added first time
-                        // so port filtering rules are handled at the device_added event.
-                        // port added calls represent all ports on the device,
-                        // enabled or not.
-                        log.trace("** PORT ADDED {}/{} -> {}",
-                                  ((DeviceEvent) event).subject().id(),
-                                  ((DeviceEvent) event).port().number(),
-                                  event.type());
-                    } else if (event.type() == DeviceEvent.Type.PORT_UPDATED) {
-                        // these calls happen for every subsequent event
-                        // ports enabled, disabled, switch goes away, comes back
-                        log.info("** PORT UPDATED {}/{} -> {}",
-                                 event.subject(),
-                                 ((DeviceEvent) event).port(),
-                                 event.type());
-                        processPortUpdated(((Device) event.subject()),
-                                           ((DeviceEvent) event).port());
-                    } else if (event.type() == TopologyEvent.Type.TOPOLOGY_CHANGED) {
-                        // Process topology event, needed for all modules relying on
-                        // topology service for path computation
-                        TopologyEvent topologyEvent = (TopologyEvent) event;
-                        log.info("Processing topology event {}, topology age {}, reasons {}",
-                                 event.type(), topologyEvent.subject().time(),
-                                 topologyEvent.reasons().size());
-                        topologyHandler.processTopologyChange(topologyEvent.reasons());
-                    } else if (event.type() == HostEvent.Type.HOST_ADDED) {
-                        hostHandler.processHostAddedEvent((HostEvent) event);
-                    } else if (event.type() == HostEvent.Type.HOST_MOVED) {
-                        hostHandler.processHostMovedEvent((HostEvent) event);
-                        routeHandler.processHostMovedEvent((HostEvent) event);
-                    } else if (event.type() == HostEvent.Type.HOST_REMOVED) {
-                        hostHandler.processHostRemovedEvent((HostEvent) event);
-                    } else if (event.type() == HostEvent.Type.HOST_UPDATED) {
-                        hostHandler.processHostUpdatedEvent((HostEvent) event);
-                    } else if (event.type() == RouteEvent.Type.ROUTE_ADDED) {
-                        routeHandler.processRouteAdded((RouteEvent) event);
-                    } else if (event.type() == RouteEvent.Type.ROUTE_UPDATED) {
-                        routeHandler.processRouteUpdated((RouteEvent) event);
-                    } else if (event.type() == RouteEvent.Type.ROUTE_REMOVED) {
-                        routeHandler.processRouteRemoved((RouteEvent) event);
-                    } else if (event.type() == RouteEvent.Type.ALTERNATIVE_ROUTES_CHANGED) {
-                        routeHandler.processAlternativeRoutesChanged((RouteEvent) event);
-                    } else if (event.type() == McastEvent.Type.SOURCES_ADDED ||
-                            event.type() == McastEvent.Type.SOURCES_REMOVED ||
-                            event.type() == McastEvent.Type.SINKS_ADDED ||
-                            event.type() == McastEvent.Type.SINKS_REMOVED ||
-                            event.type() == McastEvent.Type.ROUTE_ADDED ||
-                            event.type() == McastEvent.Type.ROUTE_REMOVED) {
-                        mcastHandler.processMcastEvent((McastEvent) event);
-                    } else if (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED) {
-                        NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
-                        Class configClass = netcfgEvent.configClass();
-                        if (configClass.equals(SegmentRoutingAppConfig.class)) {
-                            appCfgHandler.processAppConfigAdded(netcfgEvent);
-                            log.info("App config event .. configuring network");
-                            cfgListener.configureNetwork();
-                        } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
-                            log.info("Segment Routing Device Config added for {}", event.subject());
-                            cfgListener.configureNetwork();
-                        } else if (configClass.equals(XConnectConfig.class)) {
-                            xConnectHandler.processXConnectConfigAdded(netcfgEvent);
-                        } else if (configClass.equals(InterfaceConfig.class)) {
-                            log.info("Interface Config added for {}", event.subject());
-                            cfgListener.configureNetwork();
-                        } else {
-                            log.error("Unhandled config class: {}", configClass);
-                        }
-                    } else if (event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) {
-                        NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
-                        Class configClass = netcfgEvent.configClass();
-                        if (configClass.equals(SegmentRoutingAppConfig.class)) {
-                            appCfgHandler.processAppConfigUpdated(netcfgEvent);
-                            log.info("App config event .. configuring network");
-                            cfgListener.configureNetwork();
-                        } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
-                            log.info("Segment Routing Device Config updated for {}", event.subject());
-                            createOrUpdateDeviceConfiguration();
-                        } else if (configClass.equals(XConnectConfig.class)) {
-                            xConnectHandler.processXConnectConfigUpdated(netcfgEvent);
-                        } else if (configClass.equals(InterfaceConfig.class)) {
-                            log.info("Interface Config updated for {}", event.subject());
-                            createOrUpdateDeviceConfiguration();
-                            updateInterface((InterfaceConfig) netcfgEvent.config().get(),
-                                    (InterfaceConfig) netcfgEvent.prevConfig().get());
-                        } else {
-                            log.error("Unhandled config class: {}", configClass);
-                        }
-                    } else if (event.type() == NetworkConfigEvent.Type.CONFIG_REMOVED) {
-                        NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
-                        Class configClass = netcfgEvent.configClass();
-                        if (configClass.equals(SegmentRoutingAppConfig.class)) {
-                            appCfgHandler.processAppConfigRemoved(netcfgEvent);
-                            log.info("App config event .. configuring network");
-                            cfgListener.configureNetwork();
-                        } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
-                            // TODO Handle sr device config removal
-                            log.info("SegmentRoutingDeviceConfig removal is not handled in current implementation");
-                        } else if (configClass.equals(XConnectConfig.class)) {
-                            xConnectHandler.processXConnectConfigRemoved(netcfgEvent);
-                        } else if (configClass.equals(InterfaceConfig.class)) {
-                            // TODO Handle interface removal
-                            log.info("InterfaceConfig removal is not handled in current implementation");
-                        } else {
-                            log.error("Unhandled config class: {}", configClass);
-                        }
+            try {
+                // TODO We should also change SR routing and PW to listen to TopologyEvents
+                if (event.type() == LinkEvent.Type.LINK_ADDED ||
+                        event.type() == LinkEvent.Type.LINK_UPDATED) {
+                    linkHandler.processLinkAdded((Link) event.subject());
+                } else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
+                    linkHandler.processLinkRemoved((Link) event.subject());
+                } else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
+                        event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
+                        event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
+                    DeviceId deviceId = ((Device) event.subject()).id();
+                    if (deviceService.isAvailable(deviceId)) {
+                        log.info("** DEVICE UP Processing device event {} "
+                                + "for available device {}",
+                                 event.type(), ((Device) event.subject()).id());
+                        processDeviceAdded((Device) event.subject());
                     } else {
-                        log.warn("Unhandled event type: {}", event.type());
+                        log.info(" ** DEVICE DOWN Processing device event {}"
+                                + " for unavailable device {}",
+                                 event.type(), ((Device) event.subject()).id());
+                        processDeviceRemoved((Device) event.subject());
                     }
-                } catch (Exception e) {
-                    log.error("SegmentRouting event handler thread thrown an exception: {}",
-                              e.getMessage(), e);
+                } else if (event.type() == DeviceEvent.Type.PORT_ADDED) {
+                    // typically these calls come when device is added first time
+                    // so port filtering rules are handled at the device_added event.
+                    // port added calls represent all ports on the device,
+                    // enabled or not.
+                    log.trace("** PORT ADDED {}/{} -> {}",
+                              ((DeviceEvent) event).subject().id(),
+                              ((DeviceEvent) event).port().number(),
+                              event.type());
+                } else if (event.type() == DeviceEvent.Type.PORT_UPDATED) {
+                    // these calls happen for every subsequent event
+                    // ports enabled, disabled, switch goes away, comes back
+                    log.info("** PORT UPDATED {}/{} -> {}",
+                             event.subject(),
+                             ((DeviceEvent) event).port(),
+                             event.type());
+                    processPortUpdated(((Device) event.subject()),
+                                       ((DeviceEvent) event).port());
+                } else if (event.type() == TopologyEvent.Type.TOPOLOGY_CHANGED) {
+                    // Process topology event, needed for all modules relying on
+                    // topology service for path computation
+                    TopologyEvent topologyEvent = (TopologyEvent) event;
+                    log.info("Processing topology event {}, topology age {}, reasons {}",
+                             event.type(), topologyEvent.subject().time(),
+                             topologyEvent.reasons().size());
+                    topologyHandler.processTopologyChange(topologyEvent.reasons());
+                } else if (event.type() == HostEvent.Type.HOST_ADDED) {
+                    hostHandler.processHostAddedEvent((HostEvent) event);
+                } else if (event.type() == HostEvent.Type.HOST_MOVED) {
+                    hostHandler.processHostMovedEvent((HostEvent) event);
+                    routeHandler.processHostMovedEvent((HostEvent) event);
+                } else if (event.type() == HostEvent.Type.HOST_REMOVED) {
+                    hostHandler.processHostRemovedEvent((HostEvent) event);
+                } else if (event.type() == HostEvent.Type.HOST_UPDATED) {
+                    hostHandler.processHostUpdatedEvent((HostEvent) event);
+                } else if (event.type() == RouteEvent.Type.ROUTE_ADDED) {
+                    routeHandler.processRouteAdded((RouteEvent) event);
+                } else if (event.type() == RouteEvent.Type.ROUTE_UPDATED) {
+                    routeHandler.processRouteUpdated((RouteEvent) event);
+                } else if (event.type() == RouteEvent.Type.ROUTE_REMOVED) {
+                    routeHandler.processRouteRemoved((RouteEvent) event);
+                } else if (event.type() == RouteEvent.Type.ALTERNATIVE_ROUTES_CHANGED) {
+                    routeHandler.processAlternativeRoutesChanged((RouteEvent) event);
+                } else if (event.type() == McastEvent.Type.SOURCES_ADDED ||
+                        event.type() == McastEvent.Type.SOURCES_REMOVED ||
+                        event.type() == McastEvent.Type.SINKS_ADDED ||
+                        event.type() == McastEvent.Type.SINKS_REMOVED ||
+                        event.type() == McastEvent.Type.ROUTE_ADDED ||
+                        event.type() == McastEvent.Type.ROUTE_REMOVED) {
+                    mcastHandler.processMcastEvent((McastEvent) event);
+                } else if (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED) {
+                    NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
+                    Class configClass = netcfgEvent.configClass();
+                    if (configClass.equals(SegmentRoutingAppConfig.class)) {
+                        appCfgHandler.processAppConfigAdded(netcfgEvent);
+                        log.info("App config event .. configuring network");
+                        cfgListener.configureNetwork();
+                    } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
+                        log.info("Segment Routing Device Config added for {}", event.subject());
+                        cfgListener.configureNetwork();
+                    } else if (configClass.equals(XConnectConfig.class)) {
+                        xConnectHandler.processXConnectConfigAdded(netcfgEvent);
+                    } else if (configClass.equals(InterfaceConfig.class)) {
+                        log.info("Interface Config added for {}", event.subject());
+                        cfgListener.configureNetwork();
+                    } else {
+                        log.error("Unhandled config class: {}", configClass);
+                    }
+                } else if (event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) {
+                    NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
+                    Class configClass = netcfgEvent.configClass();
+                    if (configClass.equals(SegmentRoutingAppConfig.class)) {
+                        appCfgHandler.processAppConfigUpdated(netcfgEvent);
+                        log.info("App config event .. configuring network");
+                        cfgListener.configureNetwork();
+                    } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
+                        log.info("Segment Routing Device Config updated for {}", event.subject());
+                        createOrUpdateDeviceConfiguration();
+                    } else if (configClass.equals(XConnectConfig.class)) {
+                        xConnectHandler.processXConnectConfigUpdated(netcfgEvent);
+                    } else if (configClass.equals(InterfaceConfig.class)) {
+                        log.info("Interface Config updated for {}", event.subject());
+                        createOrUpdateDeviceConfiguration();
+                        updateInterface((InterfaceConfig) netcfgEvent.config().get(),
+                                (InterfaceConfig) netcfgEvent.prevConfig().get());
+                    } else {
+                        log.error("Unhandled config class: {}", configClass);
+                    }
+                } else if (event.type() == NetworkConfigEvent.Type.CONFIG_REMOVED) {
+                    NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
+                    Class configClass = netcfgEvent.configClass();
+                    if (configClass.equals(SegmentRoutingAppConfig.class)) {
+                        appCfgHandler.processAppConfigRemoved(netcfgEvent);
+                        log.info("App config event .. configuring network");
+                        cfgListener.configureNetwork();
+                    } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
+                        // TODO Handle sr device config removal
+                        log.info("SegmentRoutingDeviceConfig removal is not handled in current implementation");
+                    } else if (configClass.equals(XConnectConfig.class)) {
+                        xConnectHandler.processXConnectConfigRemoved(netcfgEvent);
+                    } else if (configClass.equals(InterfaceConfig.class)) {
+                        // TODO Handle interface removal
+                        log.info("InterfaceConfig removal is not handled in current implementation");
+                    } else {
+                        log.error("Unhandled config class: {}", configClass);
+                    }
+                } else {
+                    log.warn("Unhandled event type: {}", event.type());
                 }
+            } catch (Exception e) {
+                log.error("SegmentRouting event handler thread thrown an exception: {}",
+                          e.getMessage(), e);
             }
         }
     }
@@ -1386,8 +1365,7 @@
             if (!programmingScheduled.get()) {
                 log.info("Buffering config calls for {} secs", PROGRAM_DELAY);
                 programmingScheduled.set(true);
-                executorService.schedule(new ConfigChange(), PROGRAM_DELAY,
-                                         TimeUnit.SECONDS);
+                mainEventExecutor.schedule(new ConfigChange(), PROGRAM_DELAY, TimeUnit.SECONDS);
             }
             mcastHandler.init();
         }
@@ -1401,7 +1379,7 @@
                 case CONFIG_UPDATED:
                 case CONFIG_REMOVED:
                     log.trace("Schedule Network Config event {}", event);
-                    scheduleEventHandlerIfNotScheduled(event);
+                    mainEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
                     break;
                 default:
                     break;
@@ -1447,7 +1425,7 @@
                     event.type() == LinkEvent.Type.LINK_UPDATED ||
                     event.type() == LinkEvent.Type.LINK_REMOVED) {
                 log.trace("Schedule Link event {}", event);
-                scheduleEventHandlerIfNotScheduled(event);
+                mainEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
             }
         }
     }
@@ -1462,7 +1440,7 @@
                 case DEVICE_UPDATED:
                 case DEVICE_AVAILABILITY_CHANGED:
                     log.trace("Schedule Device event {}", event);
-                    scheduleEventHandlerIfNotScheduled(event);
+                    mainEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
                     break;
                 default:
             }
@@ -1475,7 +1453,7 @@
             switch (event.type()) {
                 case TOPOLOGY_CHANGED:
                     log.trace("Schedule Topology event {}", event);
-                    scheduleEventHandlerIfNotScheduled(event);
+                    mainEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
                     break;
                 default:
             }
@@ -1491,7 +1469,7 @@
                 case HOST_REMOVED:
                 case HOST_UPDATED:
                     log.trace("Schedule Host event {}", event);
-                    scheduleEventHandlerIfNotScheduled(event);
+                    hostEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
                     break;
                 default:
                     log.warn("Unsupported host event type: {}", event.type());
@@ -1510,7 +1488,7 @@
                 case SINKS_REMOVED:
                 case ROUTE_REMOVED:
                     log.trace("Schedule Mcast event {}", event);
-                    scheduleEventHandlerIfNotScheduled(event);
+                    mcastEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
                     break;
                 case ROUTE_ADDED:
                 default:
@@ -1529,7 +1507,7 @@
                 case ROUTE_REMOVED:
                 case ALTERNATIVE_ROUTES_CHANGED:
                     log.trace("Schedule Route event {}", event);
-                    scheduleEventHandlerIfNotScheduled(event);
+                    routeEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
                     break;
                 default:
                     log.warn("Unsupported route event type: {}", event.type());