blob: 3834676e90454183ed7edb6946d5d7e13c824c91 [file] [log] [blame]
tom94bb4a42014-08-27 22:12:02 -07001package org.onlab.onos.event.impl;
2
tom5f38b3a2014-08-27 23:50:54 -07003import org.apache.felix.scr.annotations.Activate;
tom94bb4a42014-08-27 22:12:02 -07004import org.apache.felix.scr.annotations.Component;
tom5f38b3a2014-08-27 23:50:54 -07005import org.apache.felix.scr.annotations.Deactivate;
tom94bb4a42014-08-27 22:12:02 -07006import org.apache.felix.scr.annotations.Service;
tom5f38b3a2014-08-27 23:50:54 -07007import org.onlab.onos.event.AbstractEvent;
tom96dfcab2014-08-28 09:26:03 -07008import org.onlab.onos.event.DefaultEventSinkRegistry;
tom94bb4a42014-08-27 22:12:02 -07009import org.onlab.onos.event.Event;
tom96dfcab2014-08-28 09:26:03 -070010import org.onlab.onos.event.EventDeliveryService;
tom94bb4a42014-08-27 22:12:02 -070011import org.onlab.onos.event.EventSink;
tom5f38b3a2014-08-27 23:50:54 -070012import org.slf4j.Logger;
tom94bb4a42014-08-27 22:12:02 -070013
tom5f38b3a2014-08-27 23:50:54 -070014import java.util.concurrent.BlockingQueue;
tom94bb4a42014-08-27 22:12:02 -070015import java.util.concurrent.ExecutorService;
tom5f38b3a2014-08-27 23:50:54 -070016import java.util.concurrent.LinkedBlockingQueue;
17
18import static java.util.concurrent.Executors.newSingleThreadExecutor;
19import static org.onlab.util.Tools.namedThreads;
20import static org.slf4j.LoggerFactory.getLogger;
tom94bb4a42014-08-27 22:12:02 -070021
22/**
23 * Simple implementation of an event dispatching service.
24 */
25@Component(immediate = true)
26@Service
tom96dfcab2014-08-28 09:26:03 -070027public class SimpleEventDispatcher extends DefaultEventSinkRegistry
28 implements EventDeliveryService {
tom94bb4a42014-08-27 22:12:02 -070029
tom5f38b3a2014-08-27 23:50:54 -070030 private final Logger log = getLogger(getClass());
31
32 private final ExecutorService executor =
33 newSingleThreadExecutor(namedThreads("event-dispatch-%d"));
34
35 @SuppressWarnings("unchecked")
36 private static final Event KILL_PILL = new AbstractEvent(null, 0) {
37 };
38
39 private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
40
41 private volatile boolean stopped = false;
tom94bb4a42014-08-27 22:12:02 -070042
43 @Override
44 public void post(Event event) {
tom5f38b3a2014-08-27 23:50:54 -070045 events.add(event);
tom94bb4a42014-08-27 22:12:02 -070046 }
47
tom5f38b3a2014-08-27 23:50:54 -070048 @Activate
49 public void activate() {
50 stopped = false;
51 executor.execute(new DispatchLoop());
52 log.info("Started");
tom94bb4a42014-08-27 22:12:02 -070053 }
54
tom5f38b3a2014-08-27 23:50:54 -070055 @Deactivate
56 public void deactivate() {
57 stopped = true;
58 post(KILL_PILL);
59 log.info("Stopped");
tom94bb4a42014-08-27 22:12:02 -070060 }
61
tom5f38b3a2014-08-27 23:50:54 -070062 // Auxiliary event dispatching loop that feeds off the events queue.
63 private class DispatchLoop implements Runnable {
64 @Override
65 @SuppressWarnings("unchecked")
66 public void run() {
67 log.info("Dispatch loop initiated");
68 while (!stopped) {
69 try {
70 // Fetch the next event and if it is the kill-pill, bail
71 Event event = events.take();
72 if (event == KILL_PILL) {
73 break;
74 }
75
76 // Locate the sink for the event class and use it to
77 // process the event
78 EventSink sink = getSink(event.getClass());
79 if (sink != null) {
80 sink.process(event);
81 } else {
82 log.warn("No sink registered for event class {}",
83 event.getClass());
84 }
tom19bf4212014-08-29 13:08:29 -070085 } catch (Exception e) {
tom5f38b3a2014-08-27 23:50:54 -070086 log.warn("Error encountered while dispatching event:", e);
87 }
88 }
89 log.info("Dispatch loop terminated");
90 }
tom94bb4a42014-08-27 22:12:02 -070091 }
92
tom94bb4a42014-08-27 22:12:02 -070093}