Made time limit for event processing configurable; cleaned-up duplicate code.
Change-Id: I08e7f1c9f4cdbd6404f1eb5e3544989e7a728d92
diff --git a/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java b/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java
index 8e0de97..5c78bb4 100644
--- a/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java
+++ b/core/net/src/main/java/org/onosproject/core/impl/CoreManager.java
@@ -17,12 +17,12 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.apache.felix.scr.annotations.Property;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Modified;
import org.onlab.util.SharedExecutors;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
@@ -32,9 +32,11 @@
import org.onosproject.core.IdBlockStore;
import org.onosproject.core.IdGenerator;
import org.onosproject.core.Version;
+import org.onosproject.event.EventDeliveryService;
import org.osgi.service.component.ComponentContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.io.File;
import java.util.Dictionary;
import java.util.List;
@@ -64,9 +66,18 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService cfgService;
- @Property(name = "sharedThreadPoolSize", intValue = 30,
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected EventDeliveryService eventDeliveryService;
+
+ private static final int DEFAULT_POOL_SIZE = 30;
+ @Property(name = "sharedThreadPoolSize", intValue = DEFAULT_POOL_SIZE,
label = "Configure shared pool maximum size ")
- private int sharedThreadPoolSize = 30;
+ private int sharedThreadPoolSize = DEFAULT_POOL_SIZE;
+
+ private static final int DEFAULT_EVENT_TIME = 2000;
+ @Property(name = "maxEventTimeLimit", intValue = DEFAULT_EVENT_TIME,
+ label = "Maximum number of millis an event sink has to process an event")
+ private int maxEventTimeLimit = DEFAULT_EVENT_TIME;
@Activate
public void activate() {
@@ -121,30 +132,33 @@
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context.getProperties();
- Integer sharedThreadPoolSizeConfig =
- getIntegerProperty(properties, "sharedThreadPoolSize");
- if (sharedThreadPoolSizeConfig == null) {
- log.info("Shared Pool Size is not configured, default value is {}",
- sharedThreadPoolSize);
- } else {
- if (sharedThreadPoolSizeConfig > 0) {
- sharedThreadPoolSize = sharedThreadPoolSizeConfig;
- SharedExecutors.setPoolSize(sharedThreadPoolSize);
- log.info("Configured. Shared Pool Size is configured to {}",
- sharedThreadPoolSize);
- } else {
- log.warn("Shared Pool Size size must be greater than 0");
- }
- }
- }
+ Integer poolSize = getIntegerProperty(properties, "sharedThreadPoolSize");
+ if (poolSize != null && poolSize > 1) {
+ sharedThreadPoolSize = poolSize;
+ SharedExecutors.setPoolSize(sharedThreadPoolSize);
+ } else if (poolSize != null) {
+ log.warn("sharedThreadPoolSize must be greater than 1");
+ }
+
+ Integer timeLimit = getIntegerProperty(properties, "maxEventTimeLimit");
+ if (timeLimit != null && timeLimit > 1) {
+ maxEventTimeLimit = timeLimit;
+ eventDeliveryService.setDispatchTimeLimit(maxEventTimeLimit);
+ } else if (timeLimit != null) {
+ log.warn("maxEventTimeLimit must be greater than 1");
+ }
+
+ log.info("Settings: sharedThreadPoolSize={}, maxEventTimeLimit={}",
+ sharedThreadPoolSize, maxEventTimeLimit);
+ }
/**
* Get Integer property from the propertyName
* Return null if propertyName is not found.
*
- * @param properties properties to be looked up
+ * @param properties properties to be looked up
* @param propertyName the name of the property to look up
* @return value when the propertyName is defined or return null
*/
diff --git a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
index 382846c..6d97460 100644
--- a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
+++ b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
@@ -33,6 +33,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
+import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -45,12 +46,12 @@
public class CoreEventDispatcher extends DefaultEventSinkRegistry
implements EventDeliveryService {
- // Maximum number of millis a sink can take to process an event.
- private static final long MAX_EXECUTE_MS = 1_000;
- private static final long WATCHDOG_MS = MAX_EXECUTE_MS / 4;
-
private final Logger log = getLogger(getClass());
+ // Default number of millis a sink can take to process an event.
+ private static final long DEFAULT_EXECUTE_MS = 2_000; // ms
+ private static final long WATCHDOG_MS = 250; // ms
+
private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
private final ExecutorService executor =
@@ -61,6 +62,7 @@
};
private DispatchLoop dispatchLoop;
+ private long maxProcessMillis = DEFAULT_EXECUTE_MS;
// Means to detect long-running sinks
private TimerTask watchdog;
@@ -92,6 +94,18 @@
log.info("Stopped");
}
+ @Override
+ public void setDispatchTimeLimit(long millis) {
+ checkArgument(millis >= WATCHDOG_MS,
+ "Time limit must be greater than %s", WATCHDOG_MS);
+ maxProcessMillis = millis;
+ }
+
+ @Override
+ public long getDispatchTimeLimit() {
+ return maxProcessMillis;
+ }
+
// Auxiliary event dispatching loop that feeds off the events queue.
private class DispatchLoop implements Runnable {
private volatile boolean stopped;
@@ -126,7 +140,7 @@
lastStart = 0;
} else {
log.warn("No sink registered for event class {}",
- event.getClass());
+ event.getClass().getName());
}
}
@@ -140,7 +154,7 @@
@Override
public void run() {
long delta = System.currentTimeMillis() - lastStart;
- if (lastStart > 0 && delta > MAX_EXECUTE_MS) {
+ if (lastStart > 0 && delta > maxProcessMillis) {
log.error("Event sink {} exceeded execution time limit: {} ms",
lastSink.getClass().getName(), delta);