blob: e44709c9141ad80f8be501c9166ed819477111a0 [file] [log] [blame]
/*
 * Copyright 2014-present Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.event.impl;
tom94bb4a42014-08-27 22:12:02 -070017
tom5f38b3a2014-08-27 23:50:54 -070018import org.apache.felix.scr.annotations.Activate;
tom94bb4a42014-08-27 22:12:02 -070019import org.apache.felix.scr.annotations.Component;
tom5f38b3a2014-08-27 23:50:54 -070020import org.apache.felix.scr.annotations.Deactivate;
tom94bb4a42014-08-27 22:12:02 -070021import org.apache.felix.scr.annotations.Service;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070022import org.onlab.util.SharedExecutors;
Brian O'Connorabafb502014-12-02 22:26:20 -080023import org.onosproject.event.AbstractEvent;
24import org.onosproject.event.DefaultEventSinkRegistry;
25import org.onosproject.event.Event;
26import org.onosproject.event.EventDeliveryService;
27import org.onosproject.event.EventSink;
tom5f38b3a2014-08-27 23:50:54 -070028import org.slf4j.Logger;
tom94bb4a42014-08-27 22:12:02 -070029
Madan Jampani6a292312016-06-24 09:19:59 -070030import com.google.common.base.Stopwatch;
31
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070032import java.util.TimerTask;
tom5f38b3a2014-08-27 23:50:54 -070033import java.util.concurrent.BlockingQueue;
tom94bb4a42014-08-27 22:12:02 -070034import java.util.concurrent.ExecutorService;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070035import java.util.concurrent.Future;
tom5f38b3a2014-08-27 23:50:54 -070036import java.util.concurrent.LinkedBlockingQueue;
Madan Jampani6a292312016-06-24 09:19:59 -070037import java.util.concurrent.TimeUnit;
tom5f38b3a2014-08-27 23:50:54 -070038
Thomas Vachuska36002e62015-05-19 16:12:29 -070039import static com.google.common.base.Preconditions.checkArgument;
tom5f38b3a2014-08-27 23:50:54 -070040import static java.util.concurrent.Executors.newSingleThreadExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080041import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090042import static org.onosproject.security.AppGuard.checkPermission;
Jonathan Hart943893f2016-04-08 13:38:54 -070043import static org.onosproject.security.AppPermission.Type.EVENT_READ;
44import static org.onosproject.security.AppPermission.Type.EVENT_WRITE;
45import static org.slf4j.LoggerFactory.getLogger;
tom94bb4a42014-08-27 22:12:02 -070046/**
47 * Simple implementation of an event dispatching service.
48 */
49@Component(immediate = true)
50@Service
tom202175a2014-09-19 19:00:11 -070051public class CoreEventDispatcher extends DefaultEventSinkRegistry
tom96dfcab2014-08-28 09:26:03 -070052 implements EventDeliveryService {
tom94bb4a42014-08-27 22:12:02 -070053
tom5f38b3a2014-08-27 23:50:54 -070054 private final Logger log = getLogger(getClass());
55
Jonathan Hart943893f2016-04-08 13:38:54 -070056 private boolean executionTimeLimit = false;
57
Thomas Vachuska36002e62015-05-19 16:12:29 -070058 // Default number of millis a sink can take to process an event.
Thomas Vachuska409b9cb2015-07-31 13:07:12 -070059 private static final long DEFAULT_EXECUTE_MS = 5_000; // ms
Thomas Vachuska36002e62015-05-19 16:12:29 -070060 private static final long WATCHDOG_MS = 250; // ms
61
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070062 private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
63
tom5f38b3a2014-08-27 23:50:54 -070064 private final ExecutorService executor =
HIGUCHI Yutad9e01052016-04-14 09:31:42 -070065 newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d", log));
tom5f38b3a2014-08-27 23:50:54 -070066
67 @SuppressWarnings("unchecked")
68 private static final Event KILL_PILL = new AbstractEvent(null, 0) {
69 };
70
Sho SHIMIZU72390782016-08-18 11:15:46 -070071 private volatile DispatchLoop dispatchLoop;
Thomas Vachuska36002e62015-05-19 16:12:29 -070072 private long maxProcessMillis = DEFAULT_EXECUTE_MS;
tom5f38b3a2014-08-27 23:50:54 -070073
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070074 // Means to detect long-running sinks
75 private TimerTask watchdog;
Sho SHIMIZUecfbeb62016-08-18 11:37:17 -070076 private volatile EventSink lastSink;
Madan Jampani6a292312016-06-24 09:19:59 -070077 private final Stopwatch stopwatch = Stopwatch.createUnstarted();
Sho SHIMIZUd21c4dd2016-08-18 11:26:16 -070078 private volatile Future<?> dispatchFuture;
tom94bb4a42014-08-27 22:12:02 -070079
80 @Override
81 public void post(Event event) {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070082 if (!events.add(event)) {
83 log.error("Unable to post event {}", event);
84 }
tom94bb4a42014-08-27 22:12:02 -070085 }
86
tom5f38b3a2014-08-27 23:50:54 -070087 @Activate
88 public void activate() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070089 dispatchLoop = new DispatchLoop();
90 dispatchFuture = executor.submit(dispatchLoop);
Jonathan Hart943893f2016-04-08 13:38:54 -070091
92 if (maxProcessMillis != 0) {
93 startWatchdog();
94 }
95
tom5f38b3a2014-08-27 23:50:54 -070096 log.info("Started");
tom94bb4a42014-08-27 22:12:02 -070097 }
98
tom5f38b3a2014-08-27 23:50:54 -070099 @Deactivate
100 public void deactivate() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700101 dispatchLoop.stop();
Jonathan Hart943893f2016-04-08 13:38:54 -0700102 stopWatchdog();
tom5f38b3a2014-08-27 23:50:54 -0700103 post(KILL_PILL);
104 log.info("Stopped");
tom94bb4a42014-08-27 22:12:02 -0700105 }
106
Jonathan Hart943893f2016-04-08 13:38:54 -0700107 private void startWatchdog() {
108 log.info("Starting watchdog task");
109 watchdog = new Watchdog();
110 SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
111 }
112
113 private void stopWatchdog() {
114 log.info("Stopping watchdog task");
115 if (watchdog != null) {
116 watchdog.cancel();
117 }
118 }
119
Thomas Vachuska36002e62015-05-19 16:12:29 -0700120 @Override
121 public void setDispatchTimeLimit(long millis) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900122 checkPermission(EVENT_WRITE);
Jonathan Hart943893f2016-04-08 13:38:54 -0700123 checkArgument(millis == 0 || millis >= WATCHDOG_MS,
Thomas Vachuska36002e62015-05-19 16:12:29 -0700124 "Time limit must be greater than %s", WATCHDOG_MS);
Jonathan Hart943893f2016-04-08 13:38:54 -0700125 long oldMillis = maxProcessMillis;
Thomas Vachuska36002e62015-05-19 16:12:29 -0700126 maxProcessMillis = millis;
Jonathan Hart943893f2016-04-08 13:38:54 -0700127
128 if (millis == 0 && oldMillis != 0) {
129 stopWatchdog();
130 } else if (millis != 0 && oldMillis == 0) {
131 startWatchdog();
132 }
Thomas Vachuska36002e62015-05-19 16:12:29 -0700133 }
134
135 @Override
136 public long getDispatchTimeLimit() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900137 checkPermission(EVENT_READ);
Thomas Vachuska36002e62015-05-19 16:12:29 -0700138 return maxProcessMillis;
139 }
140
tom5f38b3a2014-08-27 23:50:54 -0700141 // Auxiliary event dispatching loop that feeds off the events queue.
142 private class DispatchLoop implements Runnable {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700143 private volatile boolean stopped;
144
tom5f38b3a2014-08-27 23:50:54 -0700145 @Override
tom5f38b3a2014-08-27 23:50:54 -0700146 public void run() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700147 stopped = false;
tom5f38b3a2014-08-27 23:50:54 -0700148 log.info("Dispatch loop initiated");
149 while (!stopped) {
150 try {
151 // Fetch the next event and if it is the kill-pill, bail
152 Event event = events.take();
153 if (event == KILL_PILL) {
154 break;
155 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700156 process(event);
Thomas Vachuska409b9cb2015-07-31 13:07:12 -0700157 } catch (InterruptedException e) {
158 log.warn("Dispatch loop interrupted");
Jonathan Hartb68919e2016-02-05 15:22:36 -0800159 } catch (Exception | Error e) {
tom5f38b3a2014-08-27 23:50:54 -0700160 log.warn("Error encountered while dispatching event:", e);
161 }
162 }
163 log.info("Dispatch loop terminated");
164 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700165
166 // Locate the sink for the event class and use it to process the event
167 @SuppressWarnings("unchecked")
168 private void process(Event event) {
169 EventSink sink = getSink(event.getClass());
170 if (sink != null) {
171 lastSink = sink;
Madan Jampani6a292312016-06-24 09:19:59 -0700172 stopwatch.start();
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700173 sink.process(event);
Madan Jampani6a292312016-06-24 09:19:59 -0700174 stopwatch.reset();
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700175 } else {
176 log.warn("No sink registered for event class {}",
Thomas Vachuska36002e62015-05-19 16:12:29 -0700177 event.getClass().getName());
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700178 }
179 }
180
181 void stop() {
182 stopped = true;
183 }
tom94bb4a42014-08-27 22:12:02 -0700184 }
185
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700186 // Monitors event sinks to make sure none take too long to execute.
187 private class Watchdog extends TimerTask {
188 @Override
189 public void run() {
Madan Jampani6a292312016-06-24 09:19:59 -0700190 long elapsedTimeMillis = stopwatch.elapsed(TimeUnit.MILLISECONDS);
191 if (elapsedTimeMillis > maxProcessMillis) {
192 stopwatch.reset();
Thomas Vachuska8d033672015-07-21 16:15:04 -0700193 log.warn("Event sink {} exceeded execution time limit: {} ms; spawning new dispatch loop",
Madan Jampani6a292312016-06-24 09:19:59 -0700194 lastSink.getClass().getName(), elapsedTimeMillis);
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700195
196 // Notify the sink that it has exceeded its time limit.
197 lastSink.onProcessLimit();
198
199 // Cancel the old dispatch loop and submit a new one.
200 dispatchLoop.stop();
201 dispatchLoop = new DispatchLoop();
202 dispatchFuture.cancel(true);
203 dispatchFuture = executor.submit(dispatchLoop);
204 }
205 }
206 }
tom94bb4a42014-08-27 22:12:02 -0700207}