blob: e63ecdf15a6540b81b27d11e594e4c261f473c6c [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.event.impl;
tom94bb4a42014-08-27 22:12:02 -070017
tom5f38b3a2014-08-27 23:50:54 -070018import org.apache.felix.scr.annotations.Activate;
tom94bb4a42014-08-27 22:12:02 -070019import org.apache.felix.scr.annotations.Component;
tom5f38b3a2014-08-27 23:50:54 -070020import org.apache.felix.scr.annotations.Deactivate;
tom94bb4a42014-08-27 22:12:02 -070021import org.apache.felix.scr.annotations.Service;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070022import org.onlab.util.SharedExecutors;
Brian O'Connorabafb502014-12-02 22:26:20 -080023import org.onosproject.event.AbstractEvent;
24import org.onosproject.event.DefaultEventSinkRegistry;
25import org.onosproject.event.Event;
26import org.onosproject.event.EventDeliveryService;
27import org.onosproject.event.EventSink;
tom5f38b3a2014-08-27 23:50:54 -070028import org.slf4j.Logger;
tom94bb4a42014-08-27 22:12:02 -070029
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070030import java.util.TimerTask;
tom5f38b3a2014-08-27 23:50:54 -070031import java.util.concurrent.BlockingQueue;
tom94bb4a42014-08-27 22:12:02 -070032import java.util.concurrent.ExecutorService;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070033import java.util.concurrent.Future;
tom5f38b3a2014-08-27 23:50:54 -070034import java.util.concurrent.LinkedBlockingQueue;
35
Thomas Vachuska36002e62015-05-19 16:12:29 -070036import static com.google.common.base.Preconditions.checkArgument;
tom5f38b3a2014-08-27 23:50:54 -070037import static java.util.concurrent.Executors.newSingleThreadExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080038import static org.onlab.util.Tools.groupedThreads;
tom5f38b3a2014-08-27 23:50:54 -070039import static org.slf4j.LoggerFactory.getLogger;
tom94bb4a42014-08-27 22:12:02 -070040
41/**
42 * Simple implementation of an event dispatching service.
43 */
44@Component(immediate = true)
45@Service
tom202175a2014-09-19 19:00:11 -070046public class CoreEventDispatcher extends DefaultEventSinkRegistry
tom96dfcab2014-08-28 09:26:03 -070047 implements EventDeliveryService {
tom94bb4a42014-08-27 22:12:02 -070048
tom5f38b3a2014-08-27 23:50:54 -070049 private final Logger log = getLogger(getClass());
50
Thomas Vachuska36002e62015-05-19 16:12:29 -070051 // Default number of millis a sink can take to process an event.
Thomas Vachuska409b9cb2015-07-31 13:07:12 -070052 private static final long DEFAULT_EXECUTE_MS = 5_000; // ms
Thomas Vachuska36002e62015-05-19 16:12:29 -070053 private static final long WATCHDOG_MS = 250; // ms
54
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070055 private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
56
tom5f38b3a2014-08-27 23:50:54 -070057 private final ExecutorService executor =
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080058 newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d"));
tom5f38b3a2014-08-27 23:50:54 -070059
60 @SuppressWarnings("unchecked")
61 private static final Event KILL_PILL = new AbstractEvent(null, 0) {
62 };
63
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070064 private DispatchLoop dispatchLoop;
Thomas Vachuska36002e62015-05-19 16:12:29 -070065 private long maxProcessMillis = DEFAULT_EXECUTE_MS;
tom5f38b3a2014-08-27 23:50:54 -070066
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070067 // Means to detect long-running sinks
68 private TimerTask watchdog;
69 private EventSink lastSink;
70 private long lastStart = 0;
71 private Future<?> dispatchFuture;
tom94bb4a42014-08-27 22:12:02 -070072
73 @Override
74 public void post(Event event) {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070075 if (!events.add(event)) {
76 log.error("Unable to post event {}", event);
77 }
tom94bb4a42014-08-27 22:12:02 -070078 }
79
tom5f38b3a2014-08-27 23:50:54 -070080 @Activate
81 public void activate() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070082 dispatchLoop = new DispatchLoop();
83 dispatchFuture = executor.submit(dispatchLoop);
84 watchdog = new Watchdog();
85 SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
tom5f38b3a2014-08-27 23:50:54 -070086 log.info("Started");
tom94bb4a42014-08-27 22:12:02 -070087 }
88
tom5f38b3a2014-08-27 23:50:54 -070089 @Deactivate
90 public void deactivate() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070091 dispatchLoop.stop();
92 watchdog.cancel();
tom5f38b3a2014-08-27 23:50:54 -070093 post(KILL_PILL);
94 log.info("Stopped");
tom94bb4a42014-08-27 22:12:02 -070095 }
96
Thomas Vachuska36002e62015-05-19 16:12:29 -070097 @Override
98 public void setDispatchTimeLimit(long millis) {
99 checkArgument(millis >= WATCHDOG_MS,
100 "Time limit must be greater than %s", WATCHDOG_MS);
101 maxProcessMillis = millis;
102 }
103
104 @Override
105 public long getDispatchTimeLimit() {
106 return maxProcessMillis;
107 }
108
tom5f38b3a2014-08-27 23:50:54 -0700109 // Auxiliary event dispatching loop that feeds off the events queue.
110 private class DispatchLoop implements Runnable {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700111 private volatile boolean stopped;
112
tom5f38b3a2014-08-27 23:50:54 -0700113 @Override
tom5f38b3a2014-08-27 23:50:54 -0700114 public void run() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700115 stopped = false;
tom5f38b3a2014-08-27 23:50:54 -0700116 log.info("Dispatch loop initiated");
117 while (!stopped) {
118 try {
119 // Fetch the next event and if it is the kill-pill, bail
120 Event event = events.take();
121 if (event == KILL_PILL) {
122 break;
123 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700124 process(event);
Thomas Vachuska409b9cb2015-07-31 13:07:12 -0700125 } catch (InterruptedException e) {
126 log.warn("Dispatch loop interrupted");
Jonathan Hartb68919e2016-02-05 15:22:36 -0800127 } catch (Exception | Error e) {
tom5f38b3a2014-08-27 23:50:54 -0700128 log.warn("Error encountered while dispatching event:", e);
129 }
130 }
131 log.info("Dispatch loop terminated");
132 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700133
134 // Locate the sink for the event class and use it to process the event
135 @SuppressWarnings("unchecked")
136 private void process(Event event) {
137 EventSink sink = getSink(event.getClass());
138 if (sink != null) {
139 lastSink = sink;
140 lastStart = System.currentTimeMillis();
141 sink.process(event);
142 lastStart = 0;
143 } else {
144 log.warn("No sink registered for event class {}",
Thomas Vachuska36002e62015-05-19 16:12:29 -0700145 event.getClass().getName());
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700146 }
147 }
148
149 void stop() {
150 stopped = true;
151 }
tom94bb4a42014-08-27 22:12:02 -0700152 }
153
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700154 // Monitors event sinks to make sure none take too long to execute.
155 private class Watchdog extends TimerTask {
156 @Override
157 public void run() {
158 long delta = System.currentTimeMillis() - lastStart;
Thomas Vachuska36002e62015-05-19 16:12:29 -0700159 if (lastStart > 0 && delta > maxProcessMillis) {
Thomas Vachuska99c92fd2015-06-01 11:44:53 -0700160 lastStart = 0;
Thomas Vachuska8d033672015-07-21 16:15:04 -0700161 log.warn("Event sink {} exceeded execution time limit: {} ms; spawning new dispatch loop",
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700162 lastSink.getClass().getName(), delta);
163
164 // Notify the sink that it has exceeded its time limit.
165 lastSink.onProcessLimit();
166
167 // Cancel the old dispatch loop and submit a new one.
168 dispatchLoop.stop();
169 dispatchLoop = new DispatchLoop();
170 dispatchFuture.cancel(true);
171 dispatchFuture = executor.submit(dispatchLoop);
172 }
173 }
174 }
tom94bb4a42014-08-27 22:12:02 -0700175}