Move all event handler to a different executor

Change-Id: I8e9f84b8f4cab5a5b1746f5279b462a5a2322ac5
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 93665ec..88a81cc 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -138,7 +138,6 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
@@ -253,8 +252,6 @@
     private ScheduledExecutorService executorService = Executors
             .newScheduledThreadPool(1, groupedThreads("SegmentRoutingManager", "event-%d", log));
 
-    @SuppressWarnings("unused")
-    private static ScheduledFuture<?> eventHandlerFuture = null;
     @SuppressWarnings("rawtypes")
     private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<>();
     Map<DeviceId, DefaultGroupHandler> groupHandlerMap =
@@ -1051,51 +1048,6 @@
         }
     }
 
-    private class InternalLinkListener implements LinkListener {
-        @Override
-        public void event(LinkEvent event) {
-            if (event.type() == LinkEvent.Type.LINK_ADDED ||
-                    event.type() == LinkEvent.Type.LINK_UPDATED ||
-                    event.type() == LinkEvent.Type.LINK_REMOVED) {
-                log.debug("Event {} received from Link Service", event.type());
-                scheduleEventHandlerIfNotScheduled(event);
-            }
-        }
-    }
-
-    private class InternalDeviceListener implements DeviceListener {
-        @Override
-        public void event(DeviceEvent event) {
-            switch (event.type()) {
-            case DEVICE_ADDED:
-            case PORT_UPDATED:
-            case PORT_ADDED:
-            case DEVICE_UPDATED:
-            case DEVICE_AVAILABILITY_CHANGED:
-                log.trace("Event {} received from Device Service", event.type());
-                scheduleEventHandlerIfNotScheduled(event);
-                break;
-            default:
-            }
-        }
-    }
-
-    /**
-     * Internal listener for topology events.
-     */
-    private class InternalTopologyListener implements TopologyListener {
-        @Override
-        public void event(TopologyEvent event) {
-            switch (event.type()) {
-            case TOPOLOGY_CHANGED:
-                log.debug("Event {} received from TopologyService", event.type());
-                scheduleEventHandlerIfNotScheduled(event);
-                break;
-            default:
-            }
-        }
-    }
-
     @SuppressWarnings("rawtypes")
     private void scheduleEventHandlerIfNotScheduled(Event event) {
         synchronized (THREAD_SCHED_LOCK) {
@@ -1104,8 +1056,7 @@
 
             if ((numOfHandlerScheduled - numOfHandlerExecution) == 0) {
                 //No pending scheduled event handling threads. So start a new one.
-                eventHandlerFuture = executorService
-                        .schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
+                executorService.schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
                 numOfHandlerScheduled++;
             }
             log.trace("numOfEventsQueued {}, numOfEventHandlerScheduled {}",
@@ -1179,6 +1130,85 @@
                                  event.type(), topologyEvent.subject().time(),
                                  topologyEvent.reasons().size());
                         topologyHandler.processTopologyChange(topologyEvent.reasons());
+                    } else if (event.type() == HostEvent.Type.HOST_ADDED) {
+                        hostHandler.processHostAddedEvent((HostEvent) event);
+                    } else if (event.type() == HostEvent.Type.HOST_MOVED) {
+                        hostHandler.processHostMovedEvent((HostEvent) event);
+                        routeHandler.processHostMovedEvent((HostEvent) event);
+                    } else if (event.type() == HostEvent.Type.HOST_REMOVED) {
+                        hostHandler.processHostRemovedEvent((HostEvent) event);
+                    } else if (event.type() == HostEvent.Type.HOST_UPDATED) {
+                        hostHandler.processHostUpdatedEvent((HostEvent) event);
+                    } else if (event.type() == RouteEvent.Type.ROUTE_ADDED) {
+                        routeHandler.processRouteAdded((RouteEvent) event);
+                    } else if (event.type() == RouteEvent.Type.ROUTE_UPDATED) {
+                        routeHandler.processRouteUpdated((RouteEvent) event);
+                    } else if (event.type() == RouteEvent.Type.ROUTE_REMOVED) {
+                        routeHandler.processRouteRemoved((RouteEvent) event);
+                    } else if (event.type() == RouteEvent.Type.ALTERNATIVE_ROUTES_CHANGED) {
+                        routeHandler.processAlternativeRoutesChanged((RouteEvent) event);
+                    } else if (event.type() == McastEvent.Type.SOURCE_ADDED ||
+                            event.type() == McastEvent.Type.SOURCE_UPDATED ||
+                            event.type() == McastEvent.Type.SINK_ADDED ||
+                            event.type() == McastEvent.Type.SINK_REMOVED ||
+                            event.type() == McastEvent.Type.ROUTE_REMOVED) {
+                        mcastHandler.processMcastEvent((McastEvent) event);
+                    } else if (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED) {
+                        NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
+                        Class configClass = netcfgEvent.configClass();
+                        if (configClass.equals(SegmentRoutingAppConfig.class)) {
+                            appCfgHandler.processAppConfigAdded(netcfgEvent);
+                            log.info("App config event .. configuring network");
+                            cfgListener.configureNetwork();
+                        } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
+                            log.info("Segment Routing Device Config added for {}", event.subject());
+                            cfgListener.configureNetwork();
+                        } else if (configClass.equals(XConnectConfig.class)) {
+                            xConnectHandler.processXConnectConfigAdded(netcfgEvent);
+                        } else if (configClass.equals(InterfaceConfig.class)) {
+                            log.info("Interface Config added for {}", event.subject());
+                            cfgListener.configureNetwork();
+                        } else {
+                            log.error("Unhandled config class: {}", configClass);
+                        }
+                    } else if (event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) {
+                        NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
+                        Class configClass = netcfgEvent.configClass();
+                        if (configClass.equals(SegmentRoutingAppConfig.class)) {
+                            appCfgHandler.processAppConfigUpdated(netcfgEvent);
+                            log.info("App config event .. configuring network");
+                            cfgListener.configureNetwork();
+                        } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
+                            log.info("Segment Routing Device Config updated for {}", event.subject());
+                            createOrUpdateDeviceConfiguration();
+                        } else if (configClass.equals(XConnectConfig.class)) {
+                            xConnectHandler.processXConnectConfigUpdated(netcfgEvent);
+                        } else if (configClass.equals(InterfaceConfig.class)) {
+                            log.info("Interface Config updated for {}", event.subject());
+                            createOrUpdateDeviceConfiguration();
+                            updateInterface((InterfaceConfig) netcfgEvent.config().get(),
+                                    (InterfaceConfig) netcfgEvent.prevConfig().get());
+                        } else {
+                            log.error("Unhandled config class: {}", configClass);
+                        }
+                    } else if (event.type() == NetworkConfigEvent.Type.CONFIG_REMOVED) {
+                        NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
+                        Class configClass = netcfgEvent.configClass();
+                        if (configClass.equals(SegmentRoutingAppConfig.class)) {
+                            appCfgHandler.processAppConfigRemoved(netcfgEvent);
+                            log.info("App config event .. configuring network");
+                            cfgListener.configureNetwork();
+                        } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
+                            // TODO Handle sr device config removal
+                            log.info("SegmentRoutingDeviceConfig removal is not handled in current implementation");
+                        } else if (configClass.equals(XConnectConfig.class)) {
+                            xConnectHandler.processXConnectConfigRemoved(netcfgEvent);
+                        } else if (configClass.equals(InterfaceConfig.class)) {
+                            // TODO Handle interface removal
+                            log.info("InterfaceConfig removal is not handled in current implementation");
+                        } else {
+                            log.error("Unhandled config class: {}", configClass);
+                        }
                     } else {
                         log.warn("Unhandled event type: {}", event.type());
                     }
@@ -1414,72 +1444,17 @@
 
         @Override
         public void event(NetworkConfigEvent event) {
-            // TODO move this part to NetworkConfigEventHandler
-            if (event.configClass().equals(SegmentRoutingDeviceConfig.class)) {
-                switch (event.type()) {
-                    case CONFIG_ADDED:
-                        log.info("Segment Routing Device Config added for {}", event.subject());
-                        configureNetwork();
-                        break;
-                    case CONFIG_UPDATED:
-                        log.info("Segment Routing Config updated for {}", event.subject());
-                        createOrUpdateDeviceConfiguration();
-                        // TODO support dynamic configuration
-                        break;
-                    default:
-                        break;
-                }
-            } else if (event.configClass().equals(InterfaceConfig.class)) {
-                switch (event.type()) {
-                    case CONFIG_ADDED:
-                        log.info("Interface Config added for {}", event.subject());
-                        configureNetwork();
-                        break;
-                    case CONFIG_UPDATED:
-                        log.info("Interface Config updated for {}", event.subject());
-                        createOrUpdateDeviceConfiguration();
-
-                        // Following code will be uncommented when [CORD-634] is fully implemented.
-                        // [CORD-634] Add dynamic config support for interfaces
-                        updateInterface((InterfaceConfig) event.config().get(),
-                                        (InterfaceConfig) event.prevConfig().get());
-                        // TODO support dynamic configuration
-                        break;
-                    default:
-                        break;
-                }
-            } else if (event.configClass().equals(SegmentRoutingAppConfig.class)) {
-                checkState(appCfgHandler != null, "NetworkConfigEventHandler is not initialized");
-                switch (event.type()) {
-                    case CONFIG_ADDED:
-                        appCfgHandler.processAppConfigAdded(event);
-                        break;
-                    case CONFIG_UPDATED:
-                        appCfgHandler.processAppConfigUpdated(event);
-                        break;
-                    case CONFIG_REMOVED:
-                        appCfgHandler.processAppConfigRemoved(event);
-                        break;
-                    default:
-                        break;
-                }
-                log.info("App config event .. configuring network");
-                configureNetwork();
-            } else if (event.configClass().equals(XConnectConfig.class)) {
-                checkState(xConnectHandler != null, "XConnectHandler is not initialized");
-                switch (event.type()) {
-                    case CONFIG_ADDED:
-                        xConnectHandler.processXConnectConfigAdded(event);
-                        break;
-                    case CONFIG_UPDATED:
-                        xConnectHandler.processXConnectConfigUpdated(event);
-                        break;
-                    case CONFIG_REMOVED:
-                        xConnectHandler.processXConnectConfigRemoved(event);
-                        break;
-                    default:
-                        break;
-                }
+            checkState(appCfgHandler != null, "NetworkConfigEventHandler is not initialized");
+            checkState(xConnectHandler != null, "XConnectHandler is not initialized");
+            switch (event.type()) {
+                case CONFIG_ADDED:
+                case CONFIG_UPDATED:
+                case CONFIG_REMOVED:
+                    log.trace("Schedule Network Config event {}", event);
+                    scheduleEventHandlerIfNotScheduled(event);
+                    break;
+                default:
+                    break;
             }
         }
 
@@ -1515,22 +1490,58 @@
         }
     }
 
+    private class InternalLinkListener implements LinkListener {
+        @Override
+        public void event(LinkEvent event) {
+            if (event.type() == LinkEvent.Type.LINK_ADDED ||
+                    event.type() == LinkEvent.Type.LINK_UPDATED ||
+                    event.type() == LinkEvent.Type.LINK_REMOVED) {
+                log.trace("Schedule Link event {}", event);
+                scheduleEventHandlerIfNotScheduled(event);
+            }
+        }
+    }
+
+    private class InternalDeviceListener implements DeviceListener {
+        @Override
+        public void event(DeviceEvent event) {
+            switch (event.type()) {
+                case DEVICE_ADDED:
+                case PORT_UPDATED:
+                case PORT_ADDED:
+                case DEVICE_UPDATED:
+                case DEVICE_AVAILABILITY_CHANGED:
+                    log.trace("Schedule Device event {}", event);
+                    scheduleEventHandlerIfNotScheduled(event);
+                    break;
+                default:
+            }
+        }
+    }
+
+    private class InternalTopologyListener implements TopologyListener {
+        @Override
+        public void event(TopologyEvent event) {
+            switch (event.type()) {
+                case TOPOLOGY_CHANGED:
+                    log.trace("Schedule Topology event {}", event);
+                    scheduleEventHandlerIfNotScheduled(event);
+                    break;
+                default:
+            }
+        }
+    }
+
     private class InternalHostListener implements HostListener {
         @Override
         public void event(HostEvent event) {
             switch (event.type()) {
                 case HOST_ADDED:
-                    hostHandler.processHostAddedEvent(event);
-                    break;
                 case HOST_MOVED:
-                    hostHandler.processHostMovedEvent(event);
-                    routeHandler.processHostMovedEvent(event);
-                    break;
                 case HOST_REMOVED:
-                    hostHandler.processHostRemovedEvent(event);
-                    break;
                 case HOST_UPDATED:
-                    hostHandler.processHostUpdatedEvent(event);
+                    log.trace("Schedule Host event {}", event);
+                    scheduleEventHandlerIfNotScheduled(event);
                     break;
                 default:
                     log.warn("Unsupported host event type: {}", event.type());
@@ -1548,10 +1559,12 @@
                 case SINK_ADDED:
                 case SINK_REMOVED:
                 case ROUTE_REMOVED:
-                    mcastHandler.processMcastEvent(event);
+                    log.trace("Schedule Mcast event {}", event);
+                    scheduleEventHandlerIfNotScheduled(event);
                     break;
                 case ROUTE_ADDED:
                 default:
+                    log.warn("Unsupported mcast event type: {}", event.type());
                     break;
             }
         }
@@ -1562,18 +1575,14 @@
         public void event(RouteEvent event) {
             switch (event.type()) {
                 case ROUTE_ADDED:
-                    routeHandler.processRouteAdded(event);
-                    break;
                 case ROUTE_UPDATED:
-                    routeHandler.processRouteUpdated(event);
-                    break;
                 case ROUTE_REMOVED:
-                    routeHandler.processRouteRemoved(event);
-                    break;
                 case ALTERNATIVE_ROUTES_CHANGED:
-                    routeHandler.processAlternativeRoutesChanged(event);
+                    log.trace("Schedule Route event {}", event);
+                    scheduleEventHandlerIfNotScheduled(event);
                     break;
                 default:
+                    log.warn("Unsupported route event type: {}", event.type());
                     break;
             }
         }