Cleaned-up templates and added package.html files.
diff --git a/net/core/trivial/src/main/java/org/onlab/onos/event/impl/SimpleEventDispatcher.java b/net/core/trivial/src/main/java/org/onlab/onos/event/impl/SimpleEventDispatcher.java
index 67cbc73..7ef8735 100644
--- a/net/core/trivial/src/main/java/org/onlab/onos/event/impl/SimpleEventDispatcher.java
+++ b/net/core/trivial/src/main/java/org/onlab/onos/event/impl/SimpleEventDispatcher.java
@@ -1,46 +1,93 @@
package org.onlab.onos.event.impl;
+import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.event.AbstractEvent;
+import org.onlab.onos.event.DefaultEventSinkBroker;
import org.onlab.onos.event.Event;
import org.onlab.onos.event.EventDispatchService;
import org.onlab.onos.event.EventSink;
+import org.slf4j.Logger;
-import java.util.Set;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.namedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Simple implementation of an event dispatching service.
*/
@Component(immediate = true)
@Service
-public class SimpleEventDispatcher implements EventDispatchService {
+public class SimpleEventDispatcher extends DefaultEventSinkBroker
+ implements EventDispatchService {
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
+ private final Logger log = getLogger(getClass());
+
+ private final ExecutorService executor =
+ newSingleThreadExecutor(namedThreads("event-dispatch-%d"));
+
+ @SuppressWarnings("unchecked")
+ private static final Event KILL_PILL = new AbstractEvent(null, 0) {
+ };
+
+ private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
+
+ private volatile boolean stopped = false;
@Override
public void post(Event event) {
-
+ events.add(event);
}
- @Override
- public <E extends Event> void addSink(Class<E> eventClass, EventSink<E> sink) {
-
+ @Activate
+ public void activate() {
+ stopped = false;
+ executor.execute(new DispatchLoop());
+ log.info("Started");
}
- @Override
- public <E extends Event> void removeSink(Class<E> eventClass) {
-
+ @Deactivate
+ public void deactivate() {
+ stopped = true;
+ post(KILL_PILL);
+ log.info("Stopped");
}
- @Override
- public <E extends Event> EventSink<E> getSink(Class<E> eventClass) {
- return null;
+ // Auxiliary event dispatching loop that feeds off the events queue.
+ private class DispatchLoop implements Runnable {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void run() {
+ log.info("Dispatch loop initiated");
+ while (!stopped) {
+ try {
+ // Fetch the next event and if it is the kill-pill, bail
+ Event event = events.take();
+ if (event == KILL_PILL) {
+ break;
+ }
+
+ // Locate the sink for the event class and use it to
+ // process the event
+ EventSink sink = getSink(event.getClass());
+ if (sink != null) {
+ sink.process(event);
+ } else {
+ log.warn("No sink registered for event class {}",
+ event.getClass());
+ }
+ } catch (Throwable e) {
+ log.warn("Error encountered while dispatching event:", e);
+ }
+ }
+ log.info("Dispatch loop terminated");
+ }
}
- @Override
- public Set<Class<? extends Event>> getSinks() {
- return null;
- }
}