Move all event handler to a different executor
Change-Id: I8e9f84b8f4cab5a5b1746f5279b462a5a2322ac5
diff --git a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index 93665ec..88a81cc 100644
--- a/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/apps/segmentrouting/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -138,7 +138,6 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -253,8 +252,6 @@
private ScheduledExecutorService executorService = Executors
.newScheduledThreadPool(1, groupedThreads("SegmentRoutingManager", "event-%d", log));
- @SuppressWarnings("unused")
- private static ScheduledFuture<?> eventHandlerFuture = null;
@SuppressWarnings("rawtypes")
private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<>();
Map<DeviceId, DefaultGroupHandler> groupHandlerMap =
@@ -1051,51 +1048,6 @@
}
}
- private class InternalLinkListener implements LinkListener {
- @Override
- public void event(LinkEvent event) {
- if (event.type() == LinkEvent.Type.LINK_ADDED ||
- event.type() == LinkEvent.Type.LINK_UPDATED ||
- event.type() == LinkEvent.Type.LINK_REMOVED) {
- log.debug("Event {} received from Link Service", event.type());
- scheduleEventHandlerIfNotScheduled(event);
- }
- }
- }
-
- private class InternalDeviceListener implements DeviceListener {
- @Override
- public void event(DeviceEvent event) {
- switch (event.type()) {
- case DEVICE_ADDED:
- case PORT_UPDATED:
- case PORT_ADDED:
- case DEVICE_UPDATED:
- case DEVICE_AVAILABILITY_CHANGED:
- log.trace("Event {} received from Device Service", event.type());
- scheduleEventHandlerIfNotScheduled(event);
- break;
- default:
- }
- }
- }
-
- /**
- * Internal listener for topology events.
- */
- private class InternalTopologyListener implements TopologyListener {
- @Override
- public void event(TopologyEvent event) {
- switch (event.type()) {
- case TOPOLOGY_CHANGED:
- log.debug("Event {} received from TopologyService", event.type());
- scheduleEventHandlerIfNotScheduled(event);
- break;
- default:
- }
- }
- }
-
@SuppressWarnings("rawtypes")
private void scheduleEventHandlerIfNotScheduled(Event event) {
synchronized (THREAD_SCHED_LOCK) {
@@ -1104,8 +1056,7 @@
if ((numOfHandlerScheduled - numOfHandlerExecution) == 0) {
//No pending scheduled event handling threads. So start a new one.
- eventHandlerFuture = executorService
- .schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
+ executorService.schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
numOfHandlerScheduled++;
}
log.trace("numOfEventsQueued {}, numOfEventHandlerScheduled {}",
@@ -1179,6 +1130,85 @@
event.type(), topologyEvent.subject().time(),
topologyEvent.reasons().size());
topologyHandler.processTopologyChange(topologyEvent.reasons());
+ } else if (event.type() == HostEvent.Type.HOST_ADDED) {
+ hostHandler.processHostAddedEvent((HostEvent) event);
+ } else if (event.type() == HostEvent.Type.HOST_MOVED) {
+ hostHandler.processHostMovedEvent((HostEvent) event);
+ routeHandler.processHostMovedEvent((HostEvent) event);
+ } else if (event.type() == HostEvent.Type.HOST_REMOVED) {
+ hostHandler.processHostRemovedEvent((HostEvent) event);
+ } else if (event.type() == HostEvent.Type.HOST_UPDATED) {
+ hostHandler.processHostUpdatedEvent((HostEvent) event);
+ } else if (event.type() == RouteEvent.Type.ROUTE_ADDED) {
+ routeHandler.processRouteAdded((RouteEvent) event);
+ } else if (event.type() == RouteEvent.Type.ROUTE_UPDATED) {
+ routeHandler.processRouteUpdated((RouteEvent) event);
+ } else if (event.type() == RouteEvent.Type.ROUTE_REMOVED) {
+ routeHandler.processRouteRemoved((RouteEvent) event);
+ } else if (event.type() == RouteEvent.Type.ALTERNATIVE_ROUTES_CHANGED) {
+ routeHandler.processAlternativeRoutesChanged((RouteEvent) event);
+ } else if (event.type() == McastEvent.Type.SOURCE_ADDED ||
+ event.type() == McastEvent.Type.SOURCE_UPDATED ||
+ event.type() == McastEvent.Type.SINK_ADDED ||
+ event.type() == McastEvent.Type.SINK_REMOVED ||
+ event.type() == McastEvent.Type.ROUTE_REMOVED) {
+ mcastHandler.processMcastEvent((McastEvent) event);
+ } else if (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED) {
+ NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
+ Class configClass = netcfgEvent.configClass();
+ if (configClass.equals(SegmentRoutingAppConfig.class)) {
+ appCfgHandler.processAppConfigAdded(netcfgEvent);
+ log.info("App config event .. configuring network");
+ cfgListener.configureNetwork();
+ } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
+ log.info("Segment Routing Device Config added for {}", event.subject());
+ cfgListener.configureNetwork();
+ } else if (configClass.equals(XConnectConfig.class)) {
+ xConnectHandler.processXConnectConfigAdded(netcfgEvent);
+ } else if (configClass.equals(InterfaceConfig.class)) {
+ log.info("Interface Config added for {}", event.subject());
+ cfgListener.configureNetwork();
+ } else {
+ log.error("Unhandled config class: {}", configClass);
+ }
+ } else if (event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) {
+ NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
+ Class configClass = netcfgEvent.configClass();
+ if (configClass.equals(SegmentRoutingAppConfig.class)) {
+ appCfgHandler.processAppConfigUpdated(netcfgEvent);
+ log.info("App config event .. configuring network");
+ cfgListener.configureNetwork();
+ } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
+ log.info("Segment Routing Device Config updated for {}", event.subject());
+ createOrUpdateDeviceConfiguration();
+ } else if (configClass.equals(XConnectConfig.class)) {
+ xConnectHandler.processXConnectConfigUpdated(netcfgEvent);
+ } else if (configClass.equals(InterfaceConfig.class)) {
+ log.info("Interface Config updated for {}", event.subject());
+ createOrUpdateDeviceConfiguration();
+ updateInterface((InterfaceConfig) netcfgEvent.config().get(),
+ (InterfaceConfig) netcfgEvent.prevConfig().get());
+ } else {
+ log.error("Unhandled config class: {}", configClass);
+ }
+ } else if (event.type() == NetworkConfigEvent.Type.CONFIG_REMOVED) {
+ NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
+ Class configClass = netcfgEvent.configClass();
+ if (configClass.equals(SegmentRoutingAppConfig.class)) {
+ appCfgHandler.processAppConfigRemoved(netcfgEvent);
+ log.info("App config event .. configuring network");
+ cfgListener.configureNetwork();
+ } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
+ // TODO Handle sr device config removal
+ log.info("SegmentRoutingDeviceConfig removal is not handled in current implementation");
+ } else if (configClass.equals(XConnectConfig.class)) {
+ xConnectHandler.processXConnectConfigRemoved(netcfgEvent);
+ } else if (configClass.equals(InterfaceConfig.class)) {
+ // TODO Handle interface removal
+ log.info("InterfaceConfig removal is not handled in current implementation");
+ } else {
+ log.error("Unhandled config class: {}", configClass);
+ }
} else {
log.warn("Unhandled event type: {}", event.type());
}
@@ -1414,72 +1444,17 @@
@Override
public void event(NetworkConfigEvent event) {
- // TODO move this part to NetworkConfigEventHandler
- if (event.configClass().equals(SegmentRoutingDeviceConfig.class)) {
- switch (event.type()) {
- case CONFIG_ADDED:
- log.info("Segment Routing Device Config added for {}", event.subject());
- configureNetwork();
- break;
- case CONFIG_UPDATED:
- log.info("Segment Routing Config updated for {}", event.subject());
- createOrUpdateDeviceConfiguration();
- // TODO support dynamic configuration
- break;
- default:
- break;
- }
- } else if (event.configClass().equals(InterfaceConfig.class)) {
- switch (event.type()) {
- case CONFIG_ADDED:
- log.info("Interface Config added for {}", event.subject());
- configureNetwork();
- break;
- case CONFIG_UPDATED:
- log.info("Interface Config updated for {}", event.subject());
- createOrUpdateDeviceConfiguration();
-
- // Following code will be uncommented when [CORD-634] is fully implemented.
- // [CORD-634] Add dynamic config support for interfaces
- updateInterface((InterfaceConfig) event.config().get(),
- (InterfaceConfig) event.prevConfig().get());
- // TODO support dynamic configuration
- break;
- default:
- break;
- }
- } else if (event.configClass().equals(SegmentRoutingAppConfig.class)) {
- checkState(appCfgHandler != null, "NetworkConfigEventHandler is not initialized");
- switch (event.type()) {
- case CONFIG_ADDED:
- appCfgHandler.processAppConfigAdded(event);
- break;
- case CONFIG_UPDATED:
- appCfgHandler.processAppConfigUpdated(event);
- break;
- case CONFIG_REMOVED:
- appCfgHandler.processAppConfigRemoved(event);
- break;
- default:
- break;
- }
- log.info("App config event .. configuring network");
- configureNetwork();
- } else if (event.configClass().equals(XConnectConfig.class)) {
- checkState(xConnectHandler != null, "XConnectHandler is not initialized");
- switch (event.type()) {
- case CONFIG_ADDED:
- xConnectHandler.processXConnectConfigAdded(event);
- break;
- case CONFIG_UPDATED:
- xConnectHandler.processXConnectConfigUpdated(event);
- break;
- case CONFIG_REMOVED:
- xConnectHandler.processXConnectConfigRemoved(event);
- break;
- default:
- break;
- }
+ checkState(appCfgHandler != null, "NetworkConfigEventHandler is not initialized");
+ checkState(xConnectHandler != null, "XConnectHandler is not initialized");
+ switch (event.type()) {
+ case CONFIG_ADDED:
+ case CONFIG_UPDATED:
+ case CONFIG_REMOVED:
+ log.trace("Schedule Network Config event {}", event);
+ scheduleEventHandlerIfNotScheduled(event);
+ break;
+ default:
+ break;
}
}
@@ -1515,22 +1490,58 @@
}
}
+ private class InternalLinkListener implements LinkListener {
+ @Override
+ public void event(LinkEvent event) {
+ if (event.type() == LinkEvent.Type.LINK_ADDED ||
+ event.type() == LinkEvent.Type.LINK_UPDATED ||
+ event.type() == LinkEvent.Type.LINK_REMOVED) {
+ log.trace("Schedule Link event {}", event);
+ scheduleEventHandlerIfNotScheduled(event);
+ }
+ }
+ }
+
+ private class InternalDeviceListener implements DeviceListener {
+ @Override
+ public void event(DeviceEvent event) {
+ switch (event.type()) {
+ case DEVICE_ADDED:
+ case PORT_UPDATED:
+ case PORT_ADDED:
+ case DEVICE_UPDATED:
+ case DEVICE_AVAILABILITY_CHANGED:
+ log.trace("Schedule Device event {}", event);
+ scheduleEventHandlerIfNotScheduled(event);
+ break;
+ default:
+ }
+ }
+ }
+
+ private class InternalTopologyListener implements TopologyListener {
+ @Override
+ public void event(TopologyEvent event) {
+ switch (event.type()) {
+ case TOPOLOGY_CHANGED:
+ log.trace("Schedule Topology event {}", event);
+ scheduleEventHandlerIfNotScheduled(event);
+ break;
+ default:
+ }
+ }
+ }
+
private class InternalHostListener implements HostListener {
@Override
public void event(HostEvent event) {
switch (event.type()) {
case HOST_ADDED:
- hostHandler.processHostAddedEvent(event);
- break;
case HOST_MOVED:
- hostHandler.processHostMovedEvent(event);
- routeHandler.processHostMovedEvent(event);
- break;
case HOST_REMOVED:
- hostHandler.processHostRemovedEvent(event);
- break;
case HOST_UPDATED:
- hostHandler.processHostUpdatedEvent(event);
+ log.trace("Schedule Host event {}", event);
+ scheduleEventHandlerIfNotScheduled(event);
break;
default:
log.warn("Unsupported host event type: {}", event.type());
@@ -1548,10 +1559,12 @@
case SINK_ADDED:
case SINK_REMOVED:
case ROUTE_REMOVED:
- mcastHandler.processMcastEvent(event);
+ log.trace("Schedule Mcast event {}", event);
+ scheduleEventHandlerIfNotScheduled(event);
break;
case ROUTE_ADDED:
default:
+ log.warn("Unsupported mcast event type: {}", event.type());
break;
}
}
@@ -1562,18 +1575,14 @@
public void event(RouteEvent event) {
switch (event.type()) {
case ROUTE_ADDED:
- routeHandler.processRouteAdded(event);
- break;
case ROUTE_UPDATED:
- routeHandler.processRouteUpdated(event);
- break;
case ROUTE_REMOVED:
- routeHandler.processRouteRemoved(event);
- break;
case ALTERNATIVE_ROUTES_CHANGED:
- routeHandler.processAlternativeRoutesChanged(event);
+ log.trace("Schedule Route event {}", event);
+ scheduleEventHandlerIfNotScheduled(event);
break;
default:
+ log.warn("Unsupported route event type: {}", event.type());
break;
}
}