tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 1 | package org.onlab.onos.event.impl; |
| 2 | |
tom | 5f38b3a | 2014-08-27 23:50:54 -0700 | [diff] [blame] | 3 | import org.apache.felix.scr.annotations.Activate; |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 4 | import org.apache.felix.scr.annotations.Component; |
tom | 5f38b3a | 2014-08-27 23:50:54 -0700 | [diff] [blame] | 5 | import org.apache.felix.scr.annotations.Deactivate; |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 6 | import org.apache.felix.scr.annotations.Service; |
tom | 5f38b3a | 2014-08-27 23:50:54 -0700 | [diff] [blame] | 7 | import org.onlab.onos.event.AbstractEvent; |
tom | 96dfcab | 2014-08-28 09:26:03 -0700 | [diff] [blame] | 8 | import org.onlab.onos.event.DefaultEventSinkRegistry; |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 9 | import org.onlab.onos.event.Event; |
tom | 96dfcab | 2014-08-28 09:26:03 -0700 | [diff] [blame] | 10 | import org.onlab.onos.event.EventDeliveryService; |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 11 | import org.onlab.onos.event.EventSink; |
tom | 5f38b3a | 2014-08-27 23:50:54 -0700 | [diff] [blame] | 12 | import org.slf4j.Logger; |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 13 | |
tom | 5f38b3a | 2014-08-27 23:50:54 -0700 | [diff] [blame] | 14 | import java.util.concurrent.BlockingQueue; |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 15 | import java.util.concurrent.ExecutorService; |
tom | 5f38b3a | 2014-08-27 23:50:54 -0700 | [diff] [blame] | 16 | import java.util.concurrent.LinkedBlockingQueue; |
| 17 | |
| 18 | import static java.util.concurrent.Executors.newSingleThreadExecutor; |
| 19 | import static org.onlab.util.Tools.namedThreads; |
| 20 | import static org.slf4j.LoggerFactory.getLogger; |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 21 | |
| 22 | /** |
| 23 | * Simple implementation of an event dispatching service. |
| 24 | */ |
| 25 | @Component(immediate = true) |
| 26 | @Service |
tom | 96dfcab | 2014-08-28 09:26:03 -0700 | [diff] [blame] | 27 | public class SimpleEventDispatcher extends DefaultEventSinkRegistry |
| 28 | implements EventDeliveryService { |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 29 | |
tom | 5f38b3a | 2014-08-27 23:50:54 -0700 | [diff] [blame] | 30 | private final Logger log = getLogger(getClass()); |
| 31 | |
| 32 | private final ExecutorService executor = |
| 33 | newSingleThreadExecutor(namedThreads("event-dispatch-%d")); |
| 34 | |
| 35 | @SuppressWarnings("unchecked") |
| 36 | private static final Event KILL_PILL = new AbstractEvent(null, 0) { |
| 37 | }; |
| 38 | |
| 39 | private final BlockingQueue<Event> events = new LinkedBlockingQueue<>(); |
| 40 | |
| 41 | private volatile boolean stopped = false; |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 42 | |
| 43 | @Override |
| 44 | public void post(Event event) { |
tom | 5f38b3a | 2014-08-27 23:50:54 -0700 | [diff] [blame] | 45 | events.add(event); |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 46 | } |
| 47 | |
tom | 5f38b3a | 2014-08-27 23:50:54 -0700 | [diff] [blame] | 48 | @Activate |
| 49 | public void activate() { |
| 50 | stopped = false; |
| 51 | executor.execute(new DispatchLoop()); |
| 52 | log.info("Started"); |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 53 | } |
| 54 | |
tom | 5f38b3a | 2014-08-27 23:50:54 -0700 | [diff] [blame] | 55 | @Deactivate |
| 56 | public void deactivate() { |
| 57 | stopped = true; |
| 58 | post(KILL_PILL); |
| 59 | log.info("Stopped"); |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 60 | } |
| 61 | |
tom | 5f38b3a | 2014-08-27 23:50:54 -0700 | [diff] [blame] | 62 | // Auxiliary event dispatching loop that feeds off the events queue. |
| 63 | private class DispatchLoop implements Runnable { |
| 64 | @Override |
| 65 | @SuppressWarnings("unchecked") |
| 66 | public void run() { |
| 67 | log.info("Dispatch loop initiated"); |
| 68 | while (!stopped) { |
| 69 | try { |
| 70 | // Fetch the next event and if it is the kill-pill, bail |
| 71 | Event event = events.take(); |
| 72 | if (event == KILL_PILL) { |
| 73 | break; |
| 74 | } |
| 75 | |
| 76 | // Locate the sink for the event class and use it to |
| 77 | // process the event |
| 78 | EventSink sink = getSink(event.getClass()); |
| 79 | if (sink != null) { |
| 80 | sink.process(event); |
| 81 | } else { |
| 82 | log.warn("No sink registered for event class {}", |
| 83 | event.getClass()); |
| 84 | } |
tom | 19bf421 | 2014-08-29 13:08:29 -0700 | [diff] [blame] | 85 | } catch (Exception e) { |
tom | 5f38b3a | 2014-08-27 23:50:54 -0700 | [diff] [blame] | 86 | log.warn("Error encountered while dispatching event:", e); |
| 87 | } |
| 88 | } |
| 89 | log.info("Dispatch loop terminated"); |
| 90 | } |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 91 | } |
| 92 | |
tom | 94bb4a4 | 2014-08-27 22:12:02 -0700 | [diff] [blame] | 93 | } |