Added a detection mechanism for long-running or deadlocked event sinks and listeners.
Change-Id: I21308b058902a94c31c34c2ec2878cd13213874e
diff --git a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
index 8850104..382846c 100644
--- a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
+++ b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
@@ -19,6 +19,7 @@
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.SharedExecutors;
import org.onosproject.event.AbstractEvent;
import org.onosproject.event.DefaultEventSinkRegistry;
import org.onosproject.event.Event;
@@ -26,8 +27,10 @@
import org.onosproject.event.EventSink;
import org.slf4j.Logger;
+import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
@@ -42,8 +45,14 @@
public class CoreEventDispatcher extends DefaultEventSinkRegistry
implements EventDeliveryService {
+ // Maximum number of millis a sink can take to process an event.
+ private static final long MAX_EXECUTE_MS = 1_000;
+ private static final long WATCHDOG_MS = MAX_EXECUTE_MS / 4;
+
private final Logger log = getLogger(getClass());
+ private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
+
private final ExecutorService executor =
newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d"));
@@ -51,34 +60,45 @@
private static final Event KILL_PILL = new AbstractEvent(null, 0) {
};
- private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
+ // Current dispatch loop; volatile because it is replaced by the watchdog on
+ // the timer thread and read by deactivate().
+ private volatile DispatchLoop dispatchLoop;
- private volatile boolean stopped = false;
+ // Means to detect long-running sinks. lastSink and lastStart are written by
+ // the dispatch thread and read by the watchdog on the timer thread, hence
+ // volatile (this also rules out torn reads of the long).
+ private TimerTask watchdog;
+ private volatile EventSink lastSink;
+ private volatile long lastStart = 0;
+ private volatile Future<?> dispatchFuture;
+ // Adds the event to the queue that the dispatch loop feeds off; logs an
+ // error if the queue reports that the event was not added.
@Override
public void post(Event event) {
- events.add(event);
+ if (!events.add(event)) {
+ log.error("Unable to post event {}", event);
+ }
}
+ // Submits a fresh dispatch loop to the executor and schedules a new watchdog
+ // on the shared timer, first after WATCHDOG_MS and then every WATCHDOG_MS.
@Activate
public void activate() {
- stopped = false;
- executor.execute(new DispatchLoop());
+ dispatchLoop = new DispatchLoop();
+ dispatchFuture = executor.submit(dispatchLoop);
+ watchdog = new Watchdog();
+ SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
log.info("Started");
}
+ // Flags the current dispatch loop as stopped, cancels the watchdog and posts
+ // the kill pill, which makes the dispatch loop break out when it sees it.
@Deactivate
public void deactivate() {
- stopped = true;
+ dispatchLoop.stop();
+ watchdog.cancel();
post(KILL_PILL);
log.info("Stopped");
}
// Auxiliary event dispatching loop that feeds off the events queue.
private class DispatchLoop implements Runnable {
+ private volatile boolean stopped;
+
@Override
- @SuppressWarnings("unchecked")
public void run() {
+ stopped = false;
log.info("Dispatch loop initiated");
while (!stopped) {
try {
@@ -87,22 +107,52 @@
if (event == KILL_PILL) {
break;
}
-
- // Locate the sink for the event class and use it to
- // process the event
- EventSink sink = getSink(event.getClass());
- if (sink != null) {
- sink.process(event);
- } else {
- log.warn("No sink registered for event class {}",
- event.getClass());
- }
+ process(event);
} catch (Exception e) {
log.warn("Error encountered while dispatching event:", e);
}
}
log.info("Dispatch loop terminated");
}
+
+        // Locates the sink for the event class and uses it to process the
+        // event; logs a warning when no sink is registered for that class.
+        // While the sink runs, lastSink and lastStart record which sink is
+        // executing and when it started, which is what the watchdog inspects.
+        @SuppressWarnings("unchecked")
+        private void process(Event event) {
+            EventSink sink = getSink(event.getClass());
+            if (sink == null) {
+                log.warn("No sink registered for event class {}",
+                         event.getClass());
+                return;
+            }
+
+            lastSink = sink;
+            lastStart = System.currentTimeMillis();
+            try {
+                sink.process(event);
+            } finally {
+                // Clear the start mark even when the sink throws; otherwise the
+                // watchdog would keep timing a sink that is no longer running
+                // and eventually report a bogus overrun.
+                lastStart = 0;
+            }
+        }
+
+ // Requests termination of this dispatch loop by setting the stopped flag
+ // that run() checks before each iteration.
+ void stop() {
+ stopped = true;
+ }
}
+    // Monitors event sinks to make sure none take too long to execute.
+    // Scheduled periodically on the shared timer; when the sink currently being
+    // executed has been running for more than MAX_EXECUTE_MS, the sink is
+    // notified and the dispatch loop is replaced with a fresh one.
+    private class Watchdog extends TimerTask {
+        @Override
+        public void run() {
+            // Read the shared state exactly once. The dispatch thread may change
+            // it at any moment, and mixing two separate reads of lastStart could
+            // pair a delta computed against 0 with a freshly started sink.
+            long start = lastStart;
+            EventSink sink = lastSink;
+            if (start <= 0 || sink == null) {
+                return;
+            }
+
+            long delta = System.currentTimeMillis() - start;
+            if (delta <= MAX_EXECUTE_MS) {
+                return;
+            }
+
+            // Clear the mark so the same overrun is handled only once, rather
+            // than on every tick until another event gets dispatched.
+            lastStart = 0;
+            log.error("Event sink {} exceeded execution time limit: {} ms",
+                      sink.getClass().getName(), delta);
+
+            // Notify the sink that it has exceeded its time limit. An exception
+            // escaping a TimerTask terminates the timer thread, so a failing
+            // sink must not be allowed to propagate out of here.
+            try {
+                sink.onProcessLimit();
+            } catch (RuntimeException e) {
+                log.warn("Event sink {} failed while handling process limit:",
+                         sink.getClass().getName(), e);
+            }
+
+            // Cancel the old dispatch loop and submit a new one.
+            dispatchLoop.stop();
+            dispatchLoop = new DispatchLoop();
+            dispatchFuture.cancel(true);
+            dispatchFuture = executor.submit(dispatchLoop);
+        }
+    }
}