blob: ec50bc5890644ad8c2e9da066872d35fa02ad271 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2014-present Open Networking Foundation
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.event.impl;
tom94bb4a42014-08-27 22:12:02 -070017
Ray Milkeyd84f89b2018-08-17 14:54:17 -070018import com.google.common.base.Stopwatch;
19import com.google.common.collect.ImmutableMap;
20import com.google.common.collect.ImmutableSet;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070021import org.onlab.util.SharedExecutors;
Brian O'Connorabafb502014-12-02 22:26:20 -080022import org.onosproject.event.AbstractEvent;
23import org.onosproject.event.DefaultEventSinkRegistry;
24import org.onosproject.event.Event;
25import org.onosproject.event.EventDeliveryService;
26import org.onosproject.event.EventSink;
Ray Milkeyce48f962016-10-20 15:15:49 -070027import org.onosproject.net.device.DeviceEvent;
28import org.onosproject.net.flow.FlowRuleEvent;
29import org.onosproject.net.host.HostEvent;
30import org.onosproject.net.intent.IntentEvent;
31import org.onosproject.net.link.LinkEvent;
32import org.onosproject.net.topology.TopologyEvent;
Ray Milkeyd84f89b2018-08-17 14:54:17 -070033import org.osgi.service.component.annotations.Activate;
34import org.osgi.service.component.annotations.Component;
35import org.osgi.service.component.annotations.Deactivate;
tom5f38b3a2014-08-27 23:50:54 -070036import org.slf4j.Logger;
tom94bb4a42014-08-27 22:12:02 -070037
Ray Milkeyd84f89b2018-08-17 14:54:17 -070038import java.util.Map;
39import java.util.Set;
40import java.util.TimerTask;
41import java.util.concurrent.BlockingQueue;
42import java.util.concurrent.ExecutorService;
43import java.util.concurrent.Future;
44import java.util.concurrent.LinkedBlockingQueue;
45import java.util.concurrent.TimeUnit;
tom5f38b3a2014-08-27 23:50:54 -070046
Thomas Vachuska36002e62015-05-19 16:12:29 -070047import static com.google.common.base.Preconditions.checkArgument;
tom5f38b3a2014-08-27 23:50:54 -070048import static java.util.concurrent.Executors.newSingleThreadExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080049import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090050import static org.onosproject.security.AppGuard.checkPermission;
Jonathan Hart943893f2016-04-08 13:38:54 -070051import static org.onosproject.security.AppPermission.Type.EVENT_READ;
52import static org.onosproject.security.AppPermission.Type.EVENT_WRITE;
53import static org.slf4j.LoggerFactory.getLogger;
tom94bb4a42014-08-27 22:12:02 -070054/**
55 * Simple implementation of an event dispatching service.
56 */
Ray Milkeyd84f89b2018-08-17 14:54:17 -070057@Component(immediate = true, service = EventDeliveryService.class)
tom202175a2014-09-19 19:00:11 -070058public class CoreEventDispatcher extends DefaultEventSinkRegistry
tom96dfcab2014-08-28 09:26:03 -070059 implements EventDeliveryService {
tom94bb4a42014-08-27 22:12:02 -070060
tom5f38b3a2014-08-27 23:50:54 -070061 private final Logger log = getLogger(getClass());
62
Ray Milkeyce48f962016-10-20 15:15:49 -070063
64 private DispatchLoop topologyDispatcher = new DispatchLoop("topology");
65 private DispatchLoop programmingDispatcher = new DispatchLoop("programming");
66 private DispatchLoop defaultDispatcher = new DispatchLoop("default");
67
68 private Map<Class, DispatchLoop> dispatcherMap =
69 new ImmutableMap.Builder<Class, DispatchLoop>()
70 .put(TopologyEvent.class, topologyDispatcher)
71 .put(DeviceEvent.class, topologyDispatcher)
72 .put(LinkEvent.class, topologyDispatcher)
73 .put(HostEvent.class, topologyDispatcher)
74 .put(FlowRuleEvent.class, programmingDispatcher)
75 .put(IntentEvent.class, programmingDispatcher)
76 .build();
77
78 private Set<DispatchLoop> dispatchers =
79 new ImmutableSet.Builder<DispatchLoop>()
80 .addAll(dispatcherMap.values())
81 .add(defaultDispatcher)
82 .build();
Jonathan Hart943893f2016-04-08 13:38:54 -070083
Thomas Vachuska36002e62015-05-19 16:12:29 -070084 // Default number of millis a sink can take to process an event.
Thomas Vachuska409b9cb2015-07-31 13:07:12 -070085 private static final long DEFAULT_EXECUTE_MS = 5_000; // ms
Thomas Vachuska36002e62015-05-19 16:12:29 -070086 private static final long WATCHDOG_MS = 250; // ms
87
tom5f38b3a2014-08-27 23:50:54 -070088 @SuppressWarnings("unchecked")
89 private static final Event KILL_PILL = new AbstractEvent(null, 0) {
90 };
91
Thomas Vachuska36002e62015-05-19 16:12:29 -070092 private long maxProcessMillis = DEFAULT_EXECUTE_MS;
tom5f38b3a2014-08-27 23:50:54 -070093
Ray Milkeyce48f962016-10-20 15:15:49 -070094 private DispatchLoop getDispatcher(Event event) {
95 DispatchLoop dispatcher = dispatcherMap.get(event.getClass());
96 if (dispatcher == null) {
97 dispatcher = defaultDispatcher;
98 }
99 return dispatcher;
100 }
tom94bb4a42014-08-27 22:12:02 -0700101
102 @Override
103 public void post(Event event) {
Ray Milkeyce48f962016-10-20 15:15:49 -0700104
105 if (!getDispatcher(event).add(event)) {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700106 log.error("Unable to post event {}", event);
107 }
tom94bb4a42014-08-27 22:12:02 -0700108 }
109
tom5f38b3a2014-08-27 23:50:54 -0700110 @Activate
111 public void activate() {
Jonathan Hart943893f2016-04-08 13:38:54 -0700112
113 if (maxProcessMillis != 0) {
Ray Milkeyce48f962016-10-20 15:15:49 -0700114 dispatchers.forEach(DispatchLoop::startWatchdog);
Jonathan Hart943893f2016-04-08 13:38:54 -0700115 }
116
tom5f38b3a2014-08-27 23:50:54 -0700117 log.info("Started");
tom94bb4a42014-08-27 22:12:02 -0700118 }
119
tom5f38b3a2014-08-27 23:50:54 -0700120 @Deactivate
121 public void deactivate() {
Ray Milkeyce48f962016-10-20 15:15:49 -0700122 dispatchers.forEach(DispatchLoop::stop);
123
tom5f38b3a2014-08-27 23:50:54 -0700124 log.info("Stopped");
tom94bb4a42014-08-27 22:12:02 -0700125 }
126
Thomas Vachuska36002e62015-05-19 16:12:29 -0700127 @Override
128 public void setDispatchTimeLimit(long millis) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900129 checkPermission(EVENT_WRITE);
Jonathan Hart943893f2016-04-08 13:38:54 -0700130 checkArgument(millis == 0 || millis >= WATCHDOG_MS,
Thomas Vachuska36002e62015-05-19 16:12:29 -0700131 "Time limit must be greater than %s", WATCHDOG_MS);
Jonathan Hart943893f2016-04-08 13:38:54 -0700132 long oldMillis = maxProcessMillis;
Thomas Vachuska36002e62015-05-19 16:12:29 -0700133 maxProcessMillis = millis;
Jonathan Hart943893f2016-04-08 13:38:54 -0700134
135 if (millis == 0 && oldMillis != 0) {
Ray Milkeyce48f962016-10-20 15:15:49 -0700136 dispatchers.forEach(DispatchLoop::stopWatchdog);
Jonathan Hart943893f2016-04-08 13:38:54 -0700137 } else if (millis != 0 && oldMillis == 0) {
Ray Milkeyce48f962016-10-20 15:15:49 -0700138 dispatchers.forEach(DispatchLoop::startWatchdog);
Jonathan Hart943893f2016-04-08 13:38:54 -0700139 }
Thomas Vachuska36002e62015-05-19 16:12:29 -0700140 }
141
142 @Override
143 public long getDispatchTimeLimit() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900144 checkPermission(EVENT_READ);
Thomas Vachuska36002e62015-05-19 16:12:29 -0700145 return maxProcessMillis;
146 }
147
tom5f38b3a2014-08-27 23:50:54 -0700148 // Auxiliary event dispatching loop that feeds off the events queue.
149 private class DispatchLoop implements Runnable {
Ray Milkeyce48f962016-10-20 15:15:49 -0700150 private final String name;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700151 private volatile boolean stopped;
Ray Milkeyce48f962016-10-20 15:15:49 -0700152 private volatile EventSink lastSink;
153 // Means to detect long-running sinks
154 private final Stopwatch stopwatch = Stopwatch.createUnstarted();
155 private TimerTask watchdog;
156 private volatile Future<?> dispatchFuture;
157 private final BlockingQueue<Event> eventsQueue;
158 private final ExecutorService executor;
159
160 DispatchLoop(String name) {
161 this.name = name;
162 executor = newSingleThreadExecutor(
163 groupedThreads("onos/event",
164 "dispatch-" + name + "%d", log));
165 eventsQueue = new LinkedBlockingQueue<>();
166 dispatchFuture = executor.submit(this);
167 }
168
169 public boolean add(Event event) {
170 return eventsQueue.add(event);
171 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700172
tom5f38b3a2014-08-27 23:50:54 -0700173 @Override
tom5f38b3a2014-08-27 23:50:54 -0700174 public void run() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700175 stopped = false;
jaegonkima9ff68b2017-08-20 09:08:22 +0900176 log.info("Dispatch loop({}) initiated", name);
tom5f38b3a2014-08-27 23:50:54 -0700177 while (!stopped) {
178 try {
179 // Fetch the next event and if it is the kill-pill, bail
Ray Milkeyce48f962016-10-20 15:15:49 -0700180 Event event = eventsQueue.take();
jaegonkima9ff68b2017-08-20 09:08:22 +0900181 if (event != KILL_PILL) {
182 process(event);
tom5f38b3a2014-08-27 23:50:54 -0700183 }
Thomas Vachuska409b9cb2015-07-31 13:07:12 -0700184 } catch (InterruptedException e) {
185 log.warn("Dispatch loop interrupted");
Jonathan Hartb68919e2016-02-05 15:22:36 -0800186 } catch (Exception | Error e) {
tom5f38b3a2014-08-27 23:50:54 -0700187 log.warn("Error encountered while dispatching event:", e);
188 }
189 }
jaegonkima9ff68b2017-08-20 09:08:22 +0900190 log.info("Dispatch loop({}) terminated", name);
tom5f38b3a2014-08-27 23:50:54 -0700191 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700192
193 // Locate the sink for the event class and use it to process the event
194 @SuppressWarnings("unchecked")
195 private void process(Event event) {
196 EventSink sink = getSink(event.getClass());
197 if (sink != null) {
198 lastSink = sink;
Madan Jampani6a292312016-06-24 09:19:59 -0700199 stopwatch.start();
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700200 sink.process(event);
Madan Jampani6a292312016-06-24 09:19:59 -0700201 stopwatch.reset();
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700202 } else {
203 log.warn("No sink registered for event class {}",
Thomas Vachuska36002e62015-05-19 16:12:29 -0700204 event.getClass().getName());
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700205 }
206 }
207
208 void stop() {
209 stopped = true;
Ray Milkeyce48f962016-10-20 15:15:49 -0700210 add(KILL_PILL);
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700211 }
tom94bb4a42014-08-27 22:12:02 -0700212
Tian Jian37964762017-07-28 16:06:16 +0800213 void restart() {
214 dispatchFuture.cancel(true);
215 dispatchFuture = executor.submit(this);
216 }
217
Ray Milkeyce48f962016-10-20 15:15:49 -0700218 // Monitors event sinks to make sure none take too long to execute.
219 private class Watchdog extends TimerTask {
220 @Override
221 public void run() {
222 long elapsedTimeMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
223 if (elapsedTimeMillis > maxProcessMillis) {
224 stopwatch.reset();
225 log.warn("Event sink {} exceeded execution time limit: {} ms; " +
226 "spawning new dispatch loop",
227 lastSink.getClass().getName(), elapsedTimeMillis);
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700228
Ray Milkeyce48f962016-10-20 15:15:49 -0700229 // Notify the sink that it has exceeded its time limit.
230 lastSink.onProcessLimit();
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700231
Ray Milkeyce48f962016-10-20 15:15:49 -0700232 // Cancel the old dispatch loop and submit a new one.
233
Tian Jian37964762017-07-28 16:06:16 +0800234 stop();
235 restart();
Ray Milkeyce48f962016-10-20 15:15:49 -0700236 }
237 }
238 }
239
240 private void startWatchdog() {
241 log.info("Starting watchdog task for dispatcher {}", name);
242 watchdog = new Watchdog();
243 SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
244 }
245
246 private void stopWatchdog() {
247 log.info("Stopping watchdog task for dispatcher {}", name);
248 if (watchdog != null) {
249 watchdog.cancel();
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700250 }
251 }
252 }
Ray Milkeyce48f962016-10-20 15:15:49 -0700253
254
tom94bb4a42014-08-27 22:12:02 -0700255}