Handle host, route and mcast events in separate executors
Change-Id: Ic9c4964533354e965691e6db5fa323df91cb4124
diff --git a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
index c17df68..7a857ac 100644
--- a/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
+++ b/app/src/main/java/org/onosproject/segmentrouting/SegmentRoutingManager.java
@@ -135,7 +135,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -241,20 +240,26 @@
private SegmentRoutingNeighbourDispatcher neighbourHandler = null;
private DefaultL2TunnelHandler l2TunnelHandler = null;
private TopologyHandler topologyHandler = null;
- private InternalEventHandler eventHandler = new InternalEventHandler();
private final InternalHostListener hostListener = new InternalHostListener();
private final InternalConfigListener cfgListener = new InternalConfigListener(this);
private final InternalMcastListener mcastListener = new InternalMcastListener();
private final InternalRouteEventListener routeListener = new InternalRouteEventListener();
private final InternalTopologyListener topologyListener = new InternalTopologyListener();
- private ScheduledExecutorService executorService = Executors
- .newScheduledThreadPool(1, groupedThreads("SegmentRoutingManager", "event-%d", log));
+ // Handles device, link, topology and network config events, plus the delayed ConfigChange task
+ private ScheduledExecutorService mainEventExecutor = Executors
+ .newScheduledThreadPool(1, groupedThreads("sr-event-main", "%d", log));
- @SuppressWarnings("rawtypes")
- private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<>();
- Map<DeviceId, DefaultGroupHandler> groupHandlerMap =
- new ConcurrentHashMap<>();
+ // Handles host, route and mcast events, each on its own single-threaded executor
+ private ScheduledExecutorService hostEventExecutor = Executors
+ .newScheduledThreadPool(1, groupedThreads("sr-event-host", "%d", log));
+ private ScheduledExecutorService routeEventExecutor = Executors
+ .newScheduledThreadPool(1, groupedThreads("sr-event-route", "%d", log));
+ private ScheduledExecutorService mcastEventExecutor = Executors
+ .newScheduledThreadPool(1, groupedThreads("sr-event-mcast", "%d", log));
+
+
+ Map<DeviceId, DefaultGroupHandler> groupHandlerMap = new ConcurrentHashMap<>();
/**
* Per device next objective ID store with (device id + destination set) as key.
* Used to keep track on MPLS group information.
@@ -997,175 +1002,149 @@
}
}
- @SuppressWarnings("rawtypes")
- private void scheduleEventHandlerIfNotScheduled(Event event) {
- synchronized (THREAD_SCHED_LOCK) {
- eventQueue.add(event);
- numOfEventsQueued++;
-
- if ((numOfHandlerScheduled - numOfHandlerExecution) == 0) {
- //No pending scheduled event handling threads. So start a new one.
- executorService.schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
- numOfHandlerScheduled++;
- }
- log.trace("numOfEventsQueued {}, numOfEventHandlerScheduled {}",
- numOfEventsQueued,
- numOfHandlerScheduled);
- }
- }
-
private class InternalEventHandler implements Runnable {
+ private Event event;
+
+ InternalEventHandler(Event event) {
+ this.event = event;
+ }
+
@Override
public void run() {
- while (true) {
- try {
- @SuppressWarnings("rawtypes")
- Event event;
- synchronized (THREAD_SCHED_LOCK) {
- if (!eventQueue.isEmpty()) {
- event = eventQueue.poll();
- numOfEventsExecuted++;
- } else {
- numOfHandlerExecution++;
- log.debug("numOfHandlerExecution {} numOfEventsExecuted {}",
- numOfHandlerExecution, numOfEventsExecuted);
- break;
- }
- }
- // TODO We should also change SR routing and PW to listen to TopologyEvents
- if (event.type() == LinkEvent.Type.LINK_ADDED ||
- event.type() == LinkEvent.Type.LINK_UPDATED) {
- linkHandler.processLinkAdded((Link) event.subject());
- } else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
- linkHandler.processLinkRemoved((Link) event.subject());
- } else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
- event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
- event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
- DeviceId deviceId = ((Device) event.subject()).id();
- if (deviceService.isAvailable(deviceId)) {
- log.info("** DEVICE UP Processing device event {} "
- + "for available device {}",
- event.type(), ((Device) event.subject()).id());
- processDeviceAdded((Device) event.subject());
- } else {
- log.info(" ** DEVICE DOWN Processing device event {}"
- + " for unavailable device {}",
- event.type(), ((Device) event.subject()).id());
- processDeviceRemoved((Device) event.subject());
- }
- } else if (event.type() == DeviceEvent.Type.PORT_ADDED) {
- // typically these calls come when device is added first time
- // so port filtering rules are handled at the device_added event.
- // port added calls represent all ports on the device,
- // enabled or not.
- log.trace("** PORT ADDED {}/{} -> {}",
- ((DeviceEvent) event).subject().id(),
- ((DeviceEvent) event).port().number(),
- event.type());
- } else if (event.type() == DeviceEvent.Type.PORT_UPDATED) {
- // these calls happen for every subsequent event
- // ports enabled, disabled, switch goes away, comes back
- log.info("** PORT UPDATED {}/{} -> {}",
- event.subject(),
- ((DeviceEvent) event).port(),
- event.type());
- processPortUpdated(((Device) event.subject()),
- ((DeviceEvent) event).port());
- } else if (event.type() == TopologyEvent.Type.TOPOLOGY_CHANGED) {
- // Process topology event, needed for all modules relying on
- // topology service for path computation
- TopologyEvent topologyEvent = (TopologyEvent) event;
- log.info("Processing topology event {}, topology age {}, reasons {}",
- event.type(), topologyEvent.subject().time(),
- topologyEvent.reasons().size());
- topologyHandler.processTopologyChange(topologyEvent.reasons());
- } else if (event.type() == HostEvent.Type.HOST_ADDED) {
- hostHandler.processHostAddedEvent((HostEvent) event);
- } else if (event.type() == HostEvent.Type.HOST_MOVED) {
- hostHandler.processHostMovedEvent((HostEvent) event);
- routeHandler.processHostMovedEvent((HostEvent) event);
- } else if (event.type() == HostEvent.Type.HOST_REMOVED) {
- hostHandler.processHostRemovedEvent((HostEvent) event);
- } else if (event.type() == HostEvent.Type.HOST_UPDATED) {
- hostHandler.processHostUpdatedEvent((HostEvent) event);
- } else if (event.type() == RouteEvent.Type.ROUTE_ADDED) {
- routeHandler.processRouteAdded((RouteEvent) event);
- } else if (event.type() == RouteEvent.Type.ROUTE_UPDATED) {
- routeHandler.processRouteUpdated((RouteEvent) event);
- } else if (event.type() == RouteEvent.Type.ROUTE_REMOVED) {
- routeHandler.processRouteRemoved((RouteEvent) event);
- } else if (event.type() == RouteEvent.Type.ALTERNATIVE_ROUTES_CHANGED) {
- routeHandler.processAlternativeRoutesChanged((RouteEvent) event);
- } else if (event.type() == McastEvent.Type.SOURCES_ADDED ||
- event.type() == McastEvent.Type.SOURCES_REMOVED ||
- event.type() == McastEvent.Type.SINKS_ADDED ||
- event.type() == McastEvent.Type.SINKS_REMOVED ||
- event.type() == McastEvent.Type.ROUTE_ADDED ||
- event.type() == McastEvent.Type.ROUTE_REMOVED) {
- mcastHandler.processMcastEvent((McastEvent) event);
- } else if (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED) {
- NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
- Class configClass = netcfgEvent.configClass();
- if (configClass.equals(SegmentRoutingAppConfig.class)) {
- appCfgHandler.processAppConfigAdded(netcfgEvent);
- log.info("App config event .. configuring network");
- cfgListener.configureNetwork();
- } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
- log.info("Segment Routing Device Config added for {}", event.subject());
- cfgListener.configureNetwork();
- } else if (configClass.equals(XConnectConfig.class)) {
- xConnectHandler.processXConnectConfigAdded(netcfgEvent);
- } else if (configClass.equals(InterfaceConfig.class)) {
- log.info("Interface Config added for {}", event.subject());
- cfgListener.configureNetwork();
- } else {
- log.error("Unhandled config class: {}", configClass);
- }
- } else if (event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) {
- NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
- Class configClass = netcfgEvent.configClass();
- if (configClass.equals(SegmentRoutingAppConfig.class)) {
- appCfgHandler.processAppConfigUpdated(netcfgEvent);
- log.info("App config event .. configuring network");
- cfgListener.configureNetwork();
- } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
- log.info("Segment Routing Device Config updated for {}", event.subject());
- createOrUpdateDeviceConfiguration();
- } else if (configClass.equals(XConnectConfig.class)) {
- xConnectHandler.processXConnectConfigUpdated(netcfgEvent);
- } else if (configClass.equals(InterfaceConfig.class)) {
- log.info("Interface Config updated for {}", event.subject());
- createOrUpdateDeviceConfiguration();
- updateInterface((InterfaceConfig) netcfgEvent.config().get(),
- (InterfaceConfig) netcfgEvent.prevConfig().get());
- } else {
- log.error("Unhandled config class: {}", configClass);
- }
- } else if (event.type() == NetworkConfigEvent.Type.CONFIG_REMOVED) {
- NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
- Class configClass = netcfgEvent.configClass();
- if (configClass.equals(SegmentRoutingAppConfig.class)) {
- appCfgHandler.processAppConfigRemoved(netcfgEvent);
- log.info("App config event .. configuring network");
- cfgListener.configureNetwork();
- } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
- // TODO Handle sr device config removal
- log.info("SegmentRoutingDeviceConfig removal is not handled in current implementation");
- } else if (configClass.equals(XConnectConfig.class)) {
- xConnectHandler.processXConnectConfigRemoved(netcfgEvent);
- } else if (configClass.equals(InterfaceConfig.class)) {
- // TODO Handle interface removal
- log.info("InterfaceConfig removal is not handled in current implementation");
- } else {
- log.error("Unhandled config class: {}", configClass);
- }
+ try {
+ // TODO We should also change SR routing and PW to listen to TopologyEvents
+ if (event.type() == LinkEvent.Type.LINK_ADDED ||
+ event.type() == LinkEvent.Type.LINK_UPDATED) {
+ linkHandler.processLinkAdded((Link) event.subject());
+ } else if (event.type() == LinkEvent.Type.LINK_REMOVED) {
+ linkHandler.processLinkRemoved((Link) event.subject());
+ } else if (event.type() == DeviceEvent.Type.DEVICE_ADDED ||
+ event.type() == DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED ||
+ event.type() == DeviceEvent.Type.DEVICE_UPDATED) {
+ DeviceId deviceId = ((Device) event.subject()).id();
+ if (deviceService.isAvailable(deviceId)) {
+ log.info("** DEVICE UP Processing device event {} "
+ + "for available device {}",
+ event.type(), ((Device) event.subject()).id());
+ processDeviceAdded((Device) event.subject());
} else {
- log.warn("Unhandled event type: {}", event.type());
+ log.info(" ** DEVICE DOWN Processing device event {}"
+ + " for unavailable device {}",
+ event.type(), ((Device) event.subject()).id());
+ processDeviceRemoved((Device) event.subject());
}
- } catch (Exception e) {
- log.error("SegmentRouting event handler thread thrown an exception: {}",
- e.getMessage(), e);
+ } else if (event.type() == DeviceEvent.Type.PORT_ADDED) {
+ // typically these calls come when device is added first time
+ // so port filtering rules are handled at the device_added event.
+ // port added calls represent all ports on the device,
+ // enabled or not.
+ log.trace("** PORT ADDED {}/{} -> {}",
+ ((DeviceEvent) event).subject().id(),
+ ((DeviceEvent) event).port().number(),
+ event.type());
+ } else if (event.type() == DeviceEvent.Type.PORT_UPDATED) {
+ // these calls happen for every subsequent event
+ // ports enabled, disabled, switch goes away, comes back
+ log.info("** PORT UPDATED {}/{} -> {}",
+ event.subject(),
+ ((DeviceEvent) event).port(),
+ event.type());
+ processPortUpdated(((Device) event.subject()),
+ ((DeviceEvent) event).port());
+ } else if (event.type() == TopologyEvent.Type.TOPOLOGY_CHANGED) {
+ // Process topology event, needed for all modules relying on
+ // topology service for path computation
+ TopologyEvent topologyEvent = (TopologyEvent) event;
+ log.info("Processing topology event {}, topology age {}, reasons {}",
+ event.type(), topologyEvent.subject().time(),
+ topologyEvent.reasons().size());
+ topologyHandler.processTopologyChange(topologyEvent.reasons());
+ } else if (event.type() == HostEvent.Type.HOST_ADDED) {
+ hostHandler.processHostAddedEvent((HostEvent) event);
+ } else if (event.type() == HostEvent.Type.HOST_MOVED) {
+ hostHandler.processHostMovedEvent((HostEvent) event);
+ routeHandler.processHostMovedEvent((HostEvent) event);
+ } else if (event.type() == HostEvent.Type.HOST_REMOVED) {
+ hostHandler.processHostRemovedEvent((HostEvent) event);
+ } else if (event.type() == HostEvent.Type.HOST_UPDATED) {
+ hostHandler.processHostUpdatedEvent((HostEvent) event);
+ } else if (event.type() == RouteEvent.Type.ROUTE_ADDED) {
+ routeHandler.processRouteAdded((RouteEvent) event);
+ } else if (event.type() == RouteEvent.Type.ROUTE_UPDATED) {
+ routeHandler.processRouteUpdated((RouteEvent) event);
+ } else if (event.type() == RouteEvent.Type.ROUTE_REMOVED) {
+ routeHandler.processRouteRemoved((RouteEvent) event);
+ } else if (event.type() == RouteEvent.Type.ALTERNATIVE_ROUTES_CHANGED) {
+ routeHandler.processAlternativeRoutesChanged((RouteEvent) event);
+ } else if (event.type() == McastEvent.Type.SOURCES_ADDED ||
+ event.type() == McastEvent.Type.SOURCES_REMOVED ||
+ event.type() == McastEvent.Type.SINKS_ADDED ||
+ event.type() == McastEvent.Type.SINKS_REMOVED ||
+ event.type() == McastEvent.Type.ROUTE_ADDED ||
+ event.type() == McastEvent.Type.ROUTE_REMOVED) {
+ mcastHandler.processMcastEvent((McastEvent) event);
+ } else if (event.type() == NetworkConfigEvent.Type.CONFIG_ADDED) {
+ NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
+ Class configClass = netcfgEvent.configClass();
+ if (configClass.equals(SegmentRoutingAppConfig.class)) {
+ appCfgHandler.processAppConfigAdded(netcfgEvent);
+ log.info("App config event .. configuring network");
+ cfgListener.configureNetwork();
+ } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
+ log.info("Segment Routing Device Config added for {}", event.subject());
+ cfgListener.configureNetwork();
+ } else if (configClass.equals(XConnectConfig.class)) {
+ xConnectHandler.processXConnectConfigAdded(netcfgEvent);
+ } else if (configClass.equals(InterfaceConfig.class)) {
+ log.info("Interface Config added for {}", event.subject());
+ cfgListener.configureNetwork();
+ } else {
+ log.error("Unhandled config class: {}", configClass);
+ }
+ } else if (event.type() == NetworkConfigEvent.Type.CONFIG_UPDATED) {
+ NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
+ Class configClass = netcfgEvent.configClass();
+ if (configClass.equals(SegmentRoutingAppConfig.class)) {
+ appCfgHandler.processAppConfigUpdated(netcfgEvent);
+ log.info("App config event .. configuring network");
+ cfgListener.configureNetwork();
+ } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
+ log.info("Segment Routing Device Config updated for {}", event.subject());
+ createOrUpdateDeviceConfiguration();
+ } else if (configClass.equals(XConnectConfig.class)) {
+ xConnectHandler.processXConnectConfigUpdated(netcfgEvent);
+ } else if (configClass.equals(InterfaceConfig.class)) {
+ log.info("Interface Config updated for {}", event.subject());
+ createOrUpdateDeviceConfiguration();
+ updateInterface((InterfaceConfig) netcfgEvent.config().get(),
+ (InterfaceConfig) netcfgEvent.prevConfig().get());
+ } else {
+ log.error("Unhandled config class: {}", configClass);
+ }
+ } else if (event.type() == NetworkConfigEvent.Type.CONFIG_REMOVED) {
+ NetworkConfigEvent netcfgEvent = (NetworkConfigEvent) event;
+ Class configClass = netcfgEvent.configClass();
+ if (configClass.equals(SegmentRoutingAppConfig.class)) {
+ appCfgHandler.processAppConfigRemoved(netcfgEvent);
+ log.info("App config event .. configuring network");
+ cfgListener.configureNetwork();
+ } else if (configClass.equals(SegmentRoutingDeviceConfig.class)) {
+ // TODO Handle sr device config removal
+ log.info("SegmentRoutingDeviceConfig removal is not handled in current implementation");
+ } else if (configClass.equals(XConnectConfig.class)) {
+ xConnectHandler.processXConnectConfigRemoved(netcfgEvent);
+ } else if (configClass.equals(InterfaceConfig.class)) {
+ // TODO Handle interface removal
+ log.info("InterfaceConfig removal is not handled in current implementation");
+ } else {
+ log.error("Unhandled config class: {}", configClass);
+ }
+ } else {
+ log.warn("Unhandled event type: {}", event.type());
}
+ } catch (Exception e) {
+ log.error("SegmentRouting event handler thread thrown an exception: {}",
+ e.getMessage(), e);
}
}
}
@@ -1386,8 +1365,7 @@
if (!programmingScheduled.get()) {
log.info("Buffering config calls for {} secs", PROGRAM_DELAY);
programmingScheduled.set(true);
- executorService.schedule(new ConfigChange(), PROGRAM_DELAY,
- TimeUnit.SECONDS);
+ mainEventExecutor.schedule(new ConfigChange(), PROGRAM_DELAY, TimeUnit.SECONDS);
}
mcastHandler.init();
}
@@ -1401,7 +1379,7 @@
case CONFIG_UPDATED:
case CONFIG_REMOVED:
log.trace("Schedule Network Config event {}", event);
- scheduleEventHandlerIfNotScheduled(event);
+ mainEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
break;
default:
break;
@@ -1447,7 +1425,7 @@
event.type() == LinkEvent.Type.LINK_UPDATED ||
event.type() == LinkEvent.Type.LINK_REMOVED) {
log.trace("Schedule Link event {}", event);
- scheduleEventHandlerIfNotScheduled(event);
+ mainEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
}
}
}
@@ -1462,7 +1440,7 @@
case DEVICE_UPDATED:
case DEVICE_AVAILABILITY_CHANGED:
log.trace("Schedule Device event {}", event);
- scheduleEventHandlerIfNotScheduled(event);
+ mainEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
break;
default:
}
@@ -1475,7 +1453,7 @@
switch (event.type()) {
case TOPOLOGY_CHANGED:
log.trace("Schedule Topology event {}", event);
- scheduleEventHandlerIfNotScheduled(event);
+ mainEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
break;
default:
}
@@ -1491,7 +1469,7 @@
case HOST_REMOVED:
case HOST_UPDATED:
log.trace("Schedule Host event {}", event);
- scheduleEventHandlerIfNotScheduled(event);
+ hostEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
break;
default:
log.warn("Unsupported host event type: {}", event.type());
@@ -1510,7 +1488,7 @@
case SINKS_REMOVED:
case ROUTE_REMOVED:
log.trace("Schedule Mcast event {}", event);
- scheduleEventHandlerIfNotScheduled(event);
+ mcastEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
break;
case ROUTE_ADDED:
default:
@@ -1529,7 +1507,7 @@
case ROUTE_REMOVED:
case ALTERNATIVE_ROUTES_CHANGED:
log.trace("Schedule Route event {}", event);
- scheduleEventHandlerIfNotScheduled(event);
+ routeEventExecutor.schedule(new InternalEventHandler(event), 100, TimeUnit.MILLISECONDS);
break;
default:
log.warn("Unsupported route event type: {}", event.type());