ONOS-3600 - use separate dispatch queues for different classes of events
Change-Id: I139a3f4eb58db233ac009d03664281eefe357157
diff --git a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
index e44709c..8f0114d 100644
--- a/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
+++ b/core/net/src/main/java/org/onosproject/event/impl/CoreEventDispatcher.java
@@ -15,6 +15,15 @@
*/
package org.onosproject.event.impl;
+import java.util.Map;
+import java.util.Set;
+import java.util.TimerTask;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -25,16 +34,17 @@
import org.onosproject.event.Event;
import org.onosproject.event.EventDeliveryService;
import org.onosproject.event.EventSink;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.host.HostEvent;
+import org.onosproject.net.intent.IntentEvent;
+import org.onosproject.net.link.LinkEvent;
+import org.onosproject.net.topology.TopologyEvent;
import org.slf4j.Logger;
import com.google.common.base.Stopwatch;
-
-import java.util.TimerTask;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
@@ -53,44 +63,58 @@
private final Logger log = getLogger(getClass());
- private boolean executionTimeLimit = false;
+
+ private DispatchLoop topologyDispatcher = new DispatchLoop("topology");
+ private DispatchLoop programmingDispatcher = new DispatchLoop("programming");
+ private DispatchLoop defaultDispatcher = new DispatchLoop("default");
+
+ private Map<Class, DispatchLoop> dispatcherMap =
+ new ImmutableMap.Builder<Class, DispatchLoop>()
+ .put(TopologyEvent.class, topologyDispatcher)
+ .put(DeviceEvent.class, topologyDispatcher)
+ .put(LinkEvent.class, topologyDispatcher)
+ .put(HostEvent.class, topologyDispatcher)
+ .put(FlowRuleEvent.class, programmingDispatcher)
+ .put(IntentEvent.class, programmingDispatcher)
+ .build();
+
+ private Set<DispatchLoop> dispatchers =
+ new ImmutableSet.Builder<DispatchLoop>()
+ .addAll(dispatcherMap.values())
+ .add(defaultDispatcher)
+ .build();
// Default number of millis a sink can take to process an event.
private static final long DEFAULT_EXECUTE_MS = 5_000; // ms
private static final long WATCHDOG_MS = 250; // ms
- private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
-
- private final ExecutorService executor =
- newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d", log));
-
@SuppressWarnings("unchecked")
private static final Event KILL_PILL = new AbstractEvent(null, 0) {
};
- private volatile DispatchLoop dispatchLoop;
private long maxProcessMillis = DEFAULT_EXECUTE_MS;
- // Means to detect long-running sinks
- private TimerTask watchdog;
- private volatile EventSink lastSink;
- private final Stopwatch stopwatch = Stopwatch.createUnstarted();
- private volatile Future<?> dispatchFuture;
+ private DispatchLoop getDispatcher(Event event) {
+ DispatchLoop dispatcher = dispatcherMap.get(event.getClass());
+ if (dispatcher == null) {
+ dispatcher = defaultDispatcher;
+ }
+ return dispatcher;
+ }
@Override
public void post(Event event) {
- if (!events.add(event)) {
+
+ if (!getDispatcher(event).add(event)) {
log.error("Unable to post event {}", event);
}
}
@Activate
public void activate() {
- dispatchLoop = new DispatchLoop();
- dispatchFuture = executor.submit(dispatchLoop);
if (maxProcessMillis != 0) {
- startWatchdog();
+ dispatchers.forEach(DispatchLoop::startWatchdog);
}
log.info("Started");
@@ -98,25 +122,11 @@
@Deactivate
public void deactivate() {
- dispatchLoop.stop();
- stopWatchdog();
- post(KILL_PILL);
+ dispatchers.forEach(DispatchLoop::stop);
+
log.info("Stopped");
}
- private void startWatchdog() {
- log.info("Starting watchdog task");
- watchdog = new Watchdog();
- SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
- }
-
- private void stopWatchdog() {
- log.info("Stopping watchdog task");
- if (watchdog != null) {
- watchdog.cancel();
- }
- }
-
@Override
public void setDispatchTimeLimit(long millis) {
checkPermission(EVENT_WRITE);
@@ -126,9 +136,9 @@
maxProcessMillis = millis;
if (millis == 0 && oldMillis != 0) {
- stopWatchdog();
+ dispatchers.forEach(DispatchLoop::stopWatchdog);
} else if (millis != 0 && oldMillis == 0) {
- startWatchdog();
+ dispatchers.forEach(DispatchLoop::startWatchdog);
}
}
@@ -140,7 +150,28 @@
// Auxiliary event dispatching loop that feeds off the events queue.
private class DispatchLoop implements Runnable {
+ private final String name;
private volatile boolean stopped;
+ private volatile EventSink lastSink;
+ // Means to detect long-running sinks
+ private final Stopwatch stopwatch = Stopwatch.createUnstarted();
+ private TimerTask watchdog;
+ private volatile Future<?> dispatchFuture;
+ private final BlockingQueue<Event> eventsQueue;
+ private final ExecutorService executor;
+
+ DispatchLoop(String name) {
+ this.name = name;
+ executor = newSingleThreadExecutor(
+ groupedThreads("onos/event",
+ "dispatch-" + name + "%d", log));
+ eventsQueue = new LinkedBlockingQueue<>();
+ dispatchFuture = executor.submit(this);
+ }
+
+ public boolean add(Event event) {
+ return eventsQueue.add(event);
+ }
@Override
public void run() {
@@ -149,7 +180,7 @@
while (!stopped) {
try {
// Fetch the next event and if it is the kill-pill, bail
- Event event = events.take();
+ Event event = eventsQueue.take();
if (event == KILL_PILL) {
break;
}
@@ -180,28 +211,46 @@
void stop() {
stopped = true;
+ stopWatchdog();
+ add(KILL_PILL);
}
- }
- // Monitors event sinks to make sure none take too long to execute.
- private class Watchdog extends TimerTask {
- @Override
- public void run() {
- long elapsedTimeMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
- if (elapsedTimeMillis > maxProcessMillis) {
- stopwatch.reset();
- log.warn("Event sink {} exceeded execution time limit: {} ms; spawning new dispatch loop",
- lastSink.getClass().getName(), elapsedTimeMillis);
+ // Monitors event sinks to make sure none take too long to execute.
+ private class Watchdog extends TimerTask {
+ @Override
+ public void run() {
+ long elapsedTimeMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ if (elapsedTimeMillis > maxProcessMillis) {
+ stopwatch.reset();
+ log.warn("Event sink {} exceeded execution time limit: {} ms; " +
+ "spawning new dispatch loop",
+ lastSink.getClass().getName(), elapsedTimeMillis);
- // Notify the sink that it has exceeded its time limit.
- lastSink.onProcessLimit();
+ // Notify the sink that it has exceeded its time limit.
+ lastSink.onProcessLimit();
- // Cancel the old dispatch loop and submit a new one.
- dispatchLoop.stop();
- dispatchLoop = new DispatchLoop();
+ // Cancel the old dispatch loop and submit a new one.
+
+ stop();
dispatchFuture.cancel(true);
- dispatchFuture = executor.submit(dispatchLoop);
+ dispatchFuture = executor.submit(this);
+ }
+ }
+ }
+
+ private void startWatchdog() {
+ log.info("Starting watchdog task for dispatcher {}", name);
+ watchdog = new Watchdog();
+ SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
+ }
+
+ private void stopWatchdog() {
+ log.info("Stopping watchdog task for dispatcher {}", name);
+ if (watchdog != null) {
+ watchdog.cancel();
}
}
}
+
+
}
diff --git a/core/net/src/test/java/org/onosproject/event/impl/CoreEventDispatcherTest.java b/core/net/src/test/java/org/onosproject/event/impl/CoreEventDispatcherTest.java
index 77b2d68..1538e16 100644
--- a/core/net/src/test/java/org/onosproject/event/impl/CoreEventDispatcherTest.java
+++ b/core/net/src/test/java/org/onosproject/event/impl/CoreEventDispatcherTest.java
@@ -27,6 +27,7 @@
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* Test of the event dispatcher mechanism.
@@ -76,6 +77,17 @@
validate(prickleSink);
}
+ @Test
+ public void postEventSinkTakesTooLong() throws Exception {
+ SinkProcessTakesTooLong takesTooLong = new SinkProcessTakesTooLong();
+ dispatcher.setDispatchTimeLimit(250);
+ dispatcher.addSink(TooLongEvent.class, takesTooLong);
+ takesTooLong.latch = new CountDownLatch(1);
+ dispatcher.post(new TooLongEvent("XYZZY"));
+ takesTooLong.latch.await(1000, TimeUnit.MILLISECONDS);
+ assertTrue(takesTooLong.interrupted);
+ }
+
private void validate(Sink sink, String... strings) {
int i = 0;
assertEquals("incorrect event count", strings.length, sink.subjects.size());
@@ -129,4 +141,25 @@
}
}
+ private static class TooLongEvent extends AbstractEvent<Type, String> {
+ protected TooLongEvent(String subject) {
+ super(Type.FOO, subject);
+ }
+ }
+
+ private static class SinkProcessTakesTooLong
+ implements EventSink<TooLongEvent> {
+ boolean interrupted = false;
+ CountDownLatch latch;
+
+ public void process(TooLongEvent event) {
+ try {
+ Thread.sleep(5 * 1000);
+ } catch (InterruptedException ie) {
+ interrupted = true;
+ }
+ latch.countDown();
+ }
+ }
+
}