blob: 12e922a9bdf97847959d54bc7480d91fa1348848 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Brian O'Connor5ab426f2016-04-09 01:19:45 -07002 * Copyright 2014-present Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.event.impl;
tom94bb4a42014-08-27 22:12:02 -070017
tom5f38b3a2014-08-27 23:50:54 -070018import org.apache.felix.scr.annotations.Activate;
tom94bb4a42014-08-27 22:12:02 -070019import org.apache.felix.scr.annotations.Component;
tom5f38b3a2014-08-27 23:50:54 -070020import org.apache.felix.scr.annotations.Deactivate;
tom94bb4a42014-08-27 22:12:02 -070021import org.apache.felix.scr.annotations.Service;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070022import org.onlab.util.SharedExecutors;
Brian O'Connorabafb502014-12-02 22:26:20 -080023import org.onosproject.event.AbstractEvent;
24import org.onosproject.event.DefaultEventSinkRegistry;
25import org.onosproject.event.Event;
26import org.onosproject.event.EventDeliveryService;
27import org.onosproject.event.EventSink;
tom5f38b3a2014-08-27 23:50:54 -070028import org.slf4j.Logger;
tom94bb4a42014-08-27 22:12:02 -070029
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070030import java.util.TimerTask;
tom5f38b3a2014-08-27 23:50:54 -070031import java.util.concurrent.BlockingQueue;
tom94bb4a42014-08-27 22:12:02 -070032import java.util.concurrent.ExecutorService;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070033import java.util.concurrent.Future;
tom5f38b3a2014-08-27 23:50:54 -070034import java.util.concurrent.LinkedBlockingQueue;
35
Thomas Vachuska36002e62015-05-19 16:12:29 -070036import static com.google.common.base.Preconditions.checkArgument;
tom5f38b3a2014-08-27 23:50:54 -070037import static java.util.concurrent.Executors.newSingleThreadExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080038import static org.onlab.util.Tools.groupedThreads;
Heedo Kang4a47a302016-02-29 17:40:23 +090039import static org.onosproject.security.AppGuard.checkPermission;
Jonathan Hart943893f2016-04-08 13:38:54 -070040import static org.onosproject.security.AppPermission.Type.EVENT_READ;
41import static org.onosproject.security.AppPermission.Type.EVENT_WRITE;
42import static org.slf4j.LoggerFactory.getLogger;
tom94bb4a42014-08-27 22:12:02 -070043/**
44 * Simple implementation of an event dispatching service.
45 */
46@Component(immediate = true)
47@Service
tom202175a2014-09-19 19:00:11 -070048public class CoreEventDispatcher extends DefaultEventSinkRegistry
tom96dfcab2014-08-28 09:26:03 -070049 implements EventDeliveryService {
tom94bb4a42014-08-27 22:12:02 -070050
tom5f38b3a2014-08-27 23:50:54 -070051 private final Logger log = getLogger(getClass());
52
Jonathan Hart943893f2016-04-08 13:38:54 -070053 private boolean executionTimeLimit = false;
54
Thomas Vachuska36002e62015-05-19 16:12:29 -070055 // Default number of millis a sink can take to process an event.
Thomas Vachuska409b9cb2015-07-31 13:07:12 -070056 private static final long DEFAULT_EXECUTE_MS = 5_000; // ms
Thomas Vachuska36002e62015-05-19 16:12:29 -070057 private static final long WATCHDOG_MS = 250; // ms
58
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070059 private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
60
tom5f38b3a2014-08-27 23:50:54 -070061 private final ExecutorService executor =
HIGUCHI Yutad9e01052016-04-14 09:31:42 -070062 newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d", log));
tom5f38b3a2014-08-27 23:50:54 -070063
64 @SuppressWarnings("unchecked")
65 private static final Event KILL_PILL = new AbstractEvent(null, 0) {
66 };
67
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070068 private DispatchLoop dispatchLoop;
Thomas Vachuska36002e62015-05-19 16:12:29 -070069 private long maxProcessMillis = DEFAULT_EXECUTE_MS;
tom5f38b3a2014-08-27 23:50:54 -070070
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070071 // Means to detect long-running sinks
72 private TimerTask watchdog;
73 private EventSink lastSink;
74 private long lastStart = 0;
75 private Future<?> dispatchFuture;
tom94bb4a42014-08-27 22:12:02 -070076
77 @Override
78 public void post(Event event) {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070079 if (!events.add(event)) {
80 log.error("Unable to post event {}", event);
81 }
tom94bb4a42014-08-27 22:12:02 -070082 }
83
tom5f38b3a2014-08-27 23:50:54 -070084 @Activate
85 public void activate() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070086 dispatchLoop = new DispatchLoop();
87 dispatchFuture = executor.submit(dispatchLoop);
Jonathan Hart943893f2016-04-08 13:38:54 -070088
89 if (maxProcessMillis != 0) {
90 startWatchdog();
91 }
92
tom5f38b3a2014-08-27 23:50:54 -070093 log.info("Started");
tom94bb4a42014-08-27 22:12:02 -070094 }
95
tom5f38b3a2014-08-27 23:50:54 -070096 @Deactivate
97 public void deactivate() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070098 dispatchLoop.stop();
Jonathan Hart943893f2016-04-08 13:38:54 -070099 stopWatchdog();
tom5f38b3a2014-08-27 23:50:54 -0700100 post(KILL_PILL);
101 log.info("Stopped");
tom94bb4a42014-08-27 22:12:02 -0700102 }
103
Jonathan Hart943893f2016-04-08 13:38:54 -0700104 private void startWatchdog() {
105 log.info("Starting watchdog task");
106 watchdog = new Watchdog();
107 SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
108 }
109
110 private void stopWatchdog() {
111 log.info("Stopping watchdog task");
112 if (watchdog != null) {
113 watchdog.cancel();
114 }
115 }
116
Thomas Vachuska36002e62015-05-19 16:12:29 -0700117 @Override
118 public void setDispatchTimeLimit(long millis) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900119 checkPermission(EVENT_WRITE);
Jonathan Hart943893f2016-04-08 13:38:54 -0700120 checkArgument(millis == 0 || millis >= WATCHDOG_MS,
Thomas Vachuska36002e62015-05-19 16:12:29 -0700121 "Time limit must be greater than %s", WATCHDOG_MS);
Jonathan Hart943893f2016-04-08 13:38:54 -0700122 long oldMillis = maxProcessMillis;
Thomas Vachuska36002e62015-05-19 16:12:29 -0700123 maxProcessMillis = millis;
Jonathan Hart943893f2016-04-08 13:38:54 -0700124
125 if (millis == 0 && oldMillis != 0) {
126 stopWatchdog();
127 } else if (millis != 0 && oldMillis == 0) {
128 startWatchdog();
129 }
Thomas Vachuska36002e62015-05-19 16:12:29 -0700130 }
131
132 @Override
133 public long getDispatchTimeLimit() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900134 checkPermission(EVENT_READ);
Thomas Vachuska36002e62015-05-19 16:12:29 -0700135 return maxProcessMillis;
136 }
137
tom5f38b3a2014-08-27 23:50:54 -0700138 // Auxiliary event dispatching loop that feeds off the events queue.
139 private class DispatchLoop implements Runnable {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700140 private volatile boolean stopped;
141
tom5f38b3a2014-08-27 23:50:54 -0700142 @Override
tom5f38b3a2014-08-27 23:50:54 -0700143 public void run() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700144 stopped = false;
tom5f38b3a2014-08-27 23:50:54 -0700145 log.info("Dispatch loop initiated");
146 while (!stopped) {
147 try {
148 // Fetch the next event and if it is the kill-pill, bail
149 Event event = events.take();
150 if (event == KILL_PILL) {
151 break;
152 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700153 process(event);
Thomas Vachuska409b9cb2015-07-31 13:07:12 -0700154 } catch (InterruptedException e) {
155 log.warn("Dispatch loop interrupted");
Jonathan Hartb68919e2016-02-05 15:22:36 -0800156 } catch (Exception | Error e) {
tom5f38b3a2014-08-27 23:50:54 -0700157 log.warn("Error encountered while dispatching event:", e);
158 }
159 }
160 log.info("Dispatch loop terminated");
161 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700162
163 // Locate the sink for the event class and use it to process the event
164 @SuppressWarnings("unchecked")
165 private void process(Event event) {
166 EventSink sink = getSink(event.getClass());
167 if (sink != null) {
168 lastSink = sink;
169 lastStart = System.currentTimeMillis();
170 sink.process(event);
171 lastStart = 0;
172 } else {
173 log.warn("No sink registered for event class {}",
Thomas Vachuska36002e62015-05-19 16:12:29 -0700174 event.getClass().getName());
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700175 }
176 }
177
178 void stop() {
179 stopped = true;
180 }
tom94bb4a42014-08-27 22:12:02 -0700181 }
182
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700183 // Monitors event sinks to make sure none take too long to execute.
184 private class Watchdog extends TimerTask {
185 @Override
186 public void run() {
187 long delta = System.currentTimeMillis() - lastStart;
Thomas Vachuska36002e62015-05-19 16:12:29 -0700188 if (lastStart > 0 && delta > maxProcessMillis) {
Thomas Vachuska99c92fd2015-06-01 11:44:53 -0700189 lastStart = 0;
Thomas Vachuska8d033672015-07-21 16:15:04 -0700190 log.warn("Event sink {} exceeded execution time limit: {} ms; spawning new dispatch loop",
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700191 lastSink.getClass().getName(), delta);
192
193 // Notify the sink that it has exceeded its time limit.
194 lastSink.onProcessLimit();
195
196 // Cancel the old dispatch loop and submit a new one.
197 dispatchLoop.stop();
198 dispatchLoop = new DispatchLoop();
199 dispatchFuture.cancel(true);
200 dispatchFuture = executor.submit(dispatchLoop);
201 }
202 }
203 }
tom94bb4a42014-08-27 22:12:02 -0700204}