blob: 58c26c44c4bf64ff170be690b32f159d2e6916c7 [file] [log] [blame]
/*
 * Copyright 2014-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.event.impl;
tom94bb4a42014-08-27 22:12:02 -070017
Ray Milkeyce48f962016-10-20 15:15:49 -070018import java.util.Map;
19import java.util.Set;
20import java.util.TimerTask;
21import java.util.concurrent.BlockingQueue;
22import java.util.concurrent.ExecutorService;
23import java.util.concurrent.Future;
24import java.util.concurrent.LinkedBlockingQueue;
25import java.util.concurrent.TimeUnit;
26
tom5f38b3a2014-08-27 23:50:54 -070027import org.apache.felix.scr.annotations.Activate;
tom94bb4a42014-08-27 22:12:02 -070028import org.apache.felix.scr.annotations.Component;
tom5f38b3a2014-08-27 23:50:54 -070029import org.apache.felix.scr.annotations.Deactivate;
tom94bb4a42014-08-27 22:12:02 -070030import org.apache.felix.scr.annotations.Service;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070031import org.onlab.util.SharedExecutors;
Brian O'Connorabafb502014-12-02 22:26:20 -080032import org.onosproject.event.AbstractEvent;
33import org.onosproject.event.DefaultEventSinkRegistry;
34import org.onosproject.event.Event;
35import org.onosproject.event.EventDeliveryService;
36import org.onosproject.event.EventSink;
Ray Milkeyce48f962016-10-20 15:15:49 -070037import org.onosproject.net.device.DeviceEvent;
38import org.onosproject.net.flow.FlowRuleEvent;
39import org.onosproject.net.host.HostEvent;
40import org.onosproject.net.intent.IntentEvent;
41import org.onosproject.net.link.LinkEvent;
42import org.onosproject.net.topology.TopologyEvent;
tom5f38b3a2014-08-27 23:50:54 -070043import org.slf4j.Logger;
tom94bb4a42014-08-27 22:12:02 -070044
Madan Jampani6a292312016-06-24 09:19:59 -070045import com.google.common.base.Stopwatch;
Ray Milkeyce48f962016-10-20 15:15:49 -070046import com.google.common.collect.ImmutableMap;
47import com.google.common.collect.ImmutableSet;
tom5f38b3a2014-08-27 23:50:54 -070048
Thomas Vachuska36002e62015-05-19 16:12:29 -070049import static com.google.common.base.Preconditions.checkArgument;
tom5f38b3a2014-08-27 23:50:54 -070050import static java.util.concurrent.Executors.newSingleThreadExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080051import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090052import static org.onosproject.security.AppGuard.checkPermission;
Jonathan Hart943893f2016-04-08 13:38:54 -070053import static org.onosproject.security.AppPermission.Type.EVENT_READ;
54import static org.onosproject.security.AppPermission.Type.EVENT_WRITE;
55import static org.slf4j.LoggerFactory.getLogger;
tom94bb4a42014-08-27 22:12:02 -070056/**
57 * Simple implementation of an event dispatching service.
58 */
59@Component(immediate = true)
60@Service
tom202175a2014-09-19 19:00:11 -070061public class CoreEventDispatcher extends DefaultEventSinkRegistry
tom96dfcab2014-08-28 09:26:03 -070062 implements EventDeliveryService {
tom94bb4a42014-08-27 22:12:02 -070063
tom5f38b3a2014-08-27 23:50:54 -070064 private final Logger log = getLogger(getClass());
65
Ray Milkeyce48f962016-10-20 15:15:49 -070066
67 private DispatchLoop topologyDispatcher = new DispatchLoop("topology");
68 private DispatchLoop programmingDispatcher = new DispatchLoop("programming");
69 private DispatchLoop defaultDispatcher = new DispatchLoop("default");
70
71 private Map<Class, DispatchLoop> dispatcherMap =
72 new ImmutableMap.Builder<Class, DispatchLoop>()
73 .put(TopologyEvent.class, topologyDispatcher)
74 .put(DeviceEvent.class, topologyDispatcher)
75 .put(LinkEvent.class, topologyDispatcher)
76 .put(HostEvent.class, topologyDispatcher)
77 .put(FlowRuleEvent.class, programmingDispatcher)
78 .put(IntentEvent.class, programmingDispatcher)
79 .build();
80
81 private Set<DispatchLoop> dispatchers =
82 new ImmutableSet.Builder<DispatchLoop>()
83 .addAll(dispatcherMap.values())
84 .add(defaultDispatcher)
85 .build();
Jonathan Hart943893f2016-04-08 13:38:54 -070086
Thomas Vachuska36002e62015-05-19 16:12:29 -070087 // Default number of millis a sink can take to process an event.
Thomas Vachuska409b9cb2015-07-31 13:07:12 -070088 private static final long DEFAULT_EXECUTE_MS = 5_000; // ms
Thomas Vachuska36002e62015-05-19 16:12:29 -070089 private static final long WATCHDOG_MS = 250; // ms
90
tom5f38b3a2014-08-27 23:50:54 -070091 @SuppressWarnings("unchecked")
92 private static final Event KILL_PILL = new AbstractEvent(null, 0) {
93 };
94
Thomas Vachuska36002e62015-05-19 16:12:29 -070095 private long maxProcessMillis = DEFAULT_EXECUTE_MS;
tom5f38b3a2014-08-27 23:50:54 -070096
Ray Milkeyce48f962016-10-20 15:15:49 -070097 private DispatchLoop getDispatcher(Event event) {
98 DispatchLoop dispatcher = dispatcherMap.get(event.getClass());
99 if (dispatcher == null) {
100 dispatcher = defaultDispatcher;
101 }
102 return dispatcher;
103 }
tom94bb4a42014-08-27 22:12:02 -0700104
105 @Override
106 public void post(Event event) {
Ray Milkeyce48f962016-10-20 15:15:49 -0700107
108 if (!getDispatcher(event).add(event)) {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700109 log.error("Unable to post event {}", event);
110 }
tom94bb4a42014-08-27 22:12:02 -0700111 }
112
tom5f38b3a2014-08-27 23:50:54 -0700113 @Activate
114 public void activate() {
Jonathan Hart943893f2016-04-08 13:38:54 -0700115
116 if (maxProcessMillis != 0) {
Ray Milkeyce48f962016-10-20 15:15:49 -0700117 dispatchers.forEach(DispatchLoop::startWatchdog);
Jonathan Hart943893f2016-04-08 13:38:54 -0700118 }
119
tom5f38b3a2014-08-27 23:50:54 -0700120 log.info("Started");
tom94bb4a42014-08-27 22:12:02 -0700121 }
122
tom5f38b3a2014-08-27 23:50:54 -0700123 @Deactivate
124 public void deactivate() {
Ray Milkeyce48f962016-10-20 15:15:49 -0700125 dispatchers.forEach(DispatchLoop::stop);
126
tom5f38b3a2014-08-27 23:50:54 -0700127 log.info("Stopped");
tom94bb4a42014-08-27 22:12:02 -0700128 }
129
Thomas Vachuska36002e62015-05-19 16:12:29 -0700130 @Override
131 public void setDispatchTimeLimit(long millis) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900132 checkPermission(EVENT_WRITE);
Jonathan Hart943893f2016-04-08 13:38:54 -0700133 checkArgument(millis == 0 || millis >= WATCHDOG_MS,
Thomas Vachuska36002e62015-05-19 16:12:29 -0700134 "Time limit must be greater than %s", WATCHDOG_MS);
Jonathan Hart943893f2016-04-08 13:38:54 -0700135 long oldMillis = maxProcessMillis;
Thomas Vachuska36002e62015-05-19 16:12:29 -0700136 maxProcessMillis = millis;
Jonathan Hart943893f2016-04-08 13:38:54 -0700137
138 if (millis == 0 && oldMillis != 0) {
Ray Milkeyce48f962016-10-20 15:15:49 -0700139 dispatchers.forEach(DispatchLoop::stopWatchdog);
Jonathan Hart943893f2016-04-08 13:38:54 -0700140 } else if (millis != 0 && oldMillis == 0) {
Ray Milkeyce48f962016-10-20 15:15:49 -0700141 dispatchers.forEach(DispatchLoop::startWatchdog);
Jonathan Hart943893f2016-04-08 13:38:54 -0700142 }
Thomas Vachuska36002e62015-05-19 16:12:29 -0700143 }
144
145 @Override
146 public long getDispatchTimeLimit() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900147 checkPermission(EVENT_READ);
Thomas Vachuska36002e62015-05-19 16:12:29 -0700148 return maxProcessMillis;
149 }
150
tom5f38b3a2014-08-27 23:50:54 -0700151 // Auxiliary event dispatching loop that feeds off the events queue.
152 private class DispatchLoop implements Runnable {
Ray Milkeyce48f962016-10-20 15:15:49 -0700153 private final String name;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700154 private volatile boolean stopped;
Ray Milkeyce48f962016-10-20 15:15:49 -0700155 private volatile EventSink lastSink;
156 // Means to detect long-running sinks
157 private final Stopwatch stopwatch = Stopwatch.createUnstarted();
158 private TimerTask watchdog;
159 private volatile Future<?> dispatchFuture;
160 private final BlockingQueue<Event> eventsQueue;
161 private final ExecutorService executor;
162
163 DispatchLoop(String name) {
164 this.name = name;
165 executor = newSingleThreadExecutor(
166 groupedThreads("onos/event",
167 "dispatch-" + name + "%d", log));
168 eventsQueue = new LinkedBlockingQueue<>();
169 dispatchFuture = executor.submit(this);
170 }
171
172 public boolean add(Event event) {
173 return eventsQueue.add(event);
174 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700175
tom5f38b3a2014-08-27 23:50:54 -0700176 @Override
tom5f38b3a2014-08-27 23:50:54 -0700177 public void run() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700178 stopped = false;
jaegonkima9ff68b2017-08-20 09:08:22 +0900179 log.info("Dispatch loop({}) initiated", name);
tom5f38b3a2014-08-27 23:50:54 -0700180 while (!stopped) {
181 try {
182 // Fetch the next event and if it is the kill-pill, bail
Ray Milkeyce48f962016-10-20 15:15:49 -0700183 Event event = eventsQueue.take();
jaegonkima9ff68b2017-08-20 09:08:22 +0900184 if (event != KILL_PILL) {
185 process(event);
tom5f38b3a2014-08-27 23:50:54 -0700186 }
Thomas Vachuska409b9cb2015-07-31 13:07:12 -0700187 } catch (InterruptedException e) {
188 log.warn("Dispatch loop interrupted");
Jonathan Hartb68919e2016-02-05 15:22:36 -0800189 } catch (Exception | Error e) {
tom5f38b3a2014-08-27 23:50:54 -0700190 log.warn("Error encountered while dispatching event:", e);
191 }
192 }
jaegonkima9ff68b2017-08-20 09:08:22 +0900193 log.info("Dispatch loop({}) terminated", name);
tom5f38b3a2014-08-27 23:50:54 -0700194 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700195
196 // Locate the sink for the event class and use it to process the event
197 @SuppressWarnings("unchecked")
198 private void process(Event event) {
199 EventSink sink = getSink(event.getClass());
200 if (sink != null) {
201 lastSink = sink;
Madan Jampani6a292312016-06-24 09:19:59 -0700202 stopwatch.start();
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700203 sink.process(event);
Madan Jampani6a292312016-06-24 09:19:59 -0700204 stopwatch.reset();
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700205 } else {
206 log.warn("No sink registered for event class {}",
Thomas Vachuska36002e62015-05-19 16:12:29 -0700207 event.getClass().getName());
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700208 }
209 }
210
211 void stop() {
212 stopped = true;
Ray Milkeyce48f962016-10-20 15:15:49 -0700213 add(KILL_PILL);
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700214 }
tom94bb4a42014-08-27 22:12:02 -0700215
Tian Jian37964762017-07-28 16:06:16 +0800216 void restart() {
217 dispatchFuture.cancel(true);
218 dispatchFuture = executor.submit(this);
219 }
220
Ray Milkeyce48f962016-10-20 15:15:49 -0700221 // Monitors event sinks to make sure none take too long to execute.
222 private class Watchdog extends TimerTask {
223 @Override
224 public void run() {
225 long elapsedTimeMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
226 if (elapsedTimeMillis > maxProcessMillis) {
227 stopwatch.reset();
228 log.warn("Event sink {} exceeded execution time limit: {} ms; " +
229 "spawning new dispatch loop",
230 lastSink.getClass().getName(), elapsedTimeMillis);
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700231
Ray Milkeyce48f962016-10-20 15:15:49 -0700232 // Notify the sink that it has exceeded its time limit.
233 lastSink.onProcessLimit();
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700234
Ray Milkeyce48f962016-10-20 15:15:49 -0700235 // Cancel the old dispatch loop and submit a new one.
236
Tian Jian37964762017-07-28 16:06:16 +0800237 stop();
238 restart();
Ray Milkeyce48f962016-10-20 15:15:49 -0700239 }
240 }
241 }
242
243 private void startWatchdog() {
244 log.info("Starting watchdog task for dispatcher {}", name);
245 watchdog = new Watchdog();
246 SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
247 }
248
249 private void stopWatchdog() {
250 log.info("Stopping watchdog task for dispatcher {}", name);
251 if (watchdog != null) {
252 watchdog.cancel();
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700253 }
254 }
255 }
Ray Milkeyce48f962016-10-20 15:15:49 -0700256
257
tom94bb4a42014-08-27 22:12:02 -0700258}