blob: 382846c682c08783acf1ba527e5982e129a743de [file] [log] [blame]
/*
 * Copyright 2014-2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.event.impl;
tom94bb4a42014-08-27 22:12:02 -070017
tom5f38b3a2014-08-27 23:50:54 -070018import org.apache.felix.scr.annotations.Activate;
tom94bb4a42014-08-27 22:12:02 -070019import org.apache.felix.scr.annotations.Component;
tom5f38b3a2014-08-27 23:50:54 -070020import org.apache.felix.scr.annotations.Deactivate;
tom94bb4a42014-08-27 22:12:02 -070021import org.apache.felix.scr.annotations.Service;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070022import org.onlab.util.SharedExecutors;
Brian O'Connorabafb502014-12-02 22:26:20 -080023import org.onosproject.event.AbstractEvent;
24import org.onosproject.event.DefaultEventSinkRegistry;
25import org.onosproject.event.Event;
26import org.onosproject.event.EventDeliveryService;
27import org.onosproject.event.EventSink;
tom5f38b3a2014-08-27 23:50:54 -070028import org.slf4j.Logger;
tom94bb4a42014-08-27 22:12:02 -070029
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070030import java.util.TimerTask;
tom5f38b3a2014-08-27 23:50:54 -070031import java.util.concurrent.BlockingQueue;
tom94bb4a42014-08-27 22:12:02 -070032import java.util.concurrent.ExecutorService;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070033import java.util.concurrent.Future;
tom5f38b3a2014-08-27 23:50:54 -070034import java.util.concurrent.LinkedBlockingQueue;
35
36import static java.util.concurrent.Executors.newSingleThreadExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080037import static org.onlab.util.Tools.groupedThreads;
tom5f38b3a2014-08-27 23:50:54 -070038import static org.slf4j.LoggerFactory.getLogger;
tom94bb4a42014-08-27 22:12:02 -070039
40/**
41 * Simple implementation of an event dispatching service.
42 */
43@Component(immediate = true)
44@Service
tom202175a2014-09-19 19:00:11 -070045public class CoreEventDispatcher extends DefaultEventSinkRegistry
tom96dfcab2014-08-28 09:26:03 -070046 implements EventDeliveryService {
tom94bb4a42014-08-27 22:12:02 -070047
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070048 // Maximum number of millis a sink can take to process an event.
49 private static final long MAX_EXECUTE_MS = 1_000;
50 private static final long WATCHDOG_MS = MAX_EXECUTE_MS / 4;
51
tom5f38b3a2014-08-27 23:50:54 -070052 private final Logger log = getLogger(getClass());
53
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070054 private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
55
tom5f38b3a2014-08-27 23:50:54 -070056 private final ExecutorService executor =
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080057 newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d"));
tom5f38b3a2014-08-27 23:50:54 -070058
59 @SuppressWarnings("unchecked")
60 private static final Event KILL_PILL = new AbstractEvent(null, 0) {
61 };
62
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070063 private DispatchLoop dispatchLoop;
tom5f38b3a2014-08-27 23:50:54 -070064
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070065 // Means to detect long-running sinks
66 private TimerTask watchdog;
67 private EventSink lastSink;
68 private long lastStart = 0;
69 private Future<?> dispatchFuture;
tom94bb4a42014-08-27 22:12:02 -070070
71 @Override
72 public void post(Event event) {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070073 if (!events.add(event)) {
74 log.error("Unable to post event {}", event);
75 }
tom94bb4a42014-08-27 22:12:02 -070076 }
77
tom5f38b3a2014-08-27 23:50:54 -070078 @Activate
79 public void activate() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070080 dispatchLoop = new DispatchLoop();
81 dispatchFuture = executor.submit(dispatchLoop);
82 watchdog = new Watchdog();
83 SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
tom5f38b3a2014-08-27 23:50:54 -070084 log.info("Started");
tom94bb4a42014-08-27 22:12:02 -070085 }
86
tom5f38b3a2014-08-27 23:50:54 -070087 @Deactivate
88 public void deactivate() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070089 dispatchLoop.stop();
90 watchdog.cancel();
tom5f38b3a2014-08-27 23:50:54 -070091 post(KILL_PILL);
92 log.info("Stopped");
tom94bb4a42014-08-27 22:12:02 -070093 }
94
tom5f38b3a2014-08-27 23:50:54 -070095 // Auxiliary event dispatching loop that feeds off the events queue.
96 private class DispatchLoop implements Runnable {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070097 private volatile boolean stopped;
98
tom5f38b3a2014-08-27 23:50:54 -070099 @Override
tom5f38b3a2014-08-27 23:50:54 -0700100 public void run() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700101 stopped = false;
tom5f38b3a2014-08-27 23:50:54 -0700102 log.info("Dispatch loop initiated");
103 while (!stopped) {
104 try {
105 // Fetch the next event and if it is the kill-pill, bail
106 Event event = events.take();
107 if (event == KILL_PILL) {
108 break;
109 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700110 process(event);
tom19bf4212014-08-29 13:08:29 -0700111 } catch (Exception e) {
tom5f38b3a2014-08-27 23:50:54 -0700112 log.warn("Error encountered while dispatching event:", e);
113 }
114 }
115 log.info("Dispatch loop terminated");
116 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700117
118 // Locate the sink for the event class and use it to process the event
119 @SuppressWarnings("unchecked")
120 private void process(Event event) {
121 EventSink sink = getSink(event.getClass());
122 if (sink != null) {
123 lastSink = sink;
124 lastStart = System.currentTimeMillis();
125 sink.process(event);
126 lastStart = 0;
127 } else {
128 log.warn("No sink registered for event class {}",
129 event.getClass());
130 }
131 }
132
133 void stop() {
134 stopped = true;
135 }
tom94bb4a42014-08-27 22:12:02 -0700136 }
137
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700138 // Monitors event sinks to make sure none take too long to execute.
139 private class Watchdog extends TimerTask {
140 @Override
141 public void run() {
142 long delta = System.currentTimeMillis() - lastStart;
143 if (lastStart > 0 && delta > MAX_EXECUTE_MS) {
144 log.error("Event sink {} exceeded execution time limit: {} ms",
145 lastSink.getClass().getName(), delta);
146
147 // Notify the sink that it has exceeded its time limit.
148 lastSink.onProcessLimit();
149
150 // Cancel the old dispatch loop and submit a new one.
151 dispatchLoop.stop();
152 dispatchLoop = new DispatchLoop();
153 dispatchFuture.cancel(true);
154 dispatchFuture = executor.submit(dispatchLoop);
155 }
156 }
157 }
tom94bb4a42014-08-27 22:12:02 -0700158}