blob: 3d9df6b364ea8d9245e1aebe007548195221ba58 [file] [log] [blame]
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07001/*
Ray Milkey34c95902015-04-15 09:47:53 -07002 * Copyright 2014-2015 Open Networking Laboratory
Thomas Vachuska4f1a60c2014-10-28 13:39:07 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Brian O'Connorabafb502014-12-02 22:26:20 -080016package org.onosproject.event.impl;
tom94bb4a42014-08-27 22:12:02 -070017
tom5f38b3a2014-08-27 23:50:54 -070018import org.apache.felix.scr.annotations.Activate;
tom94bb4a42014-08-27 22:12:02 -070019import org.apache.felix.scr.annotations.Component;
tom5f38b3a2014-08-27 23:50:54 -070020import org.apache.felix.scr.annotations.Deactivate;
tom94bb4a42014-08-27 22:12:02 -070021import org.apache.felix.scr.annotations.Service;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070022import org.onlab.util.SharedExecutors;
Brian O'Connorabafb502014-12-02 22:26:20 -080023import org.onosproject.event.AbstractEvent;
24import org.onosproject.event.DefaultEventSinkRegistry;
25import org.onosproject.event.Event;
26import org.onosproject.event.EventDeliveryService;
27import org.onosproject.event.EventSink;
tom5f38b3a2014-08-27 23:50:54 -070028import org.slf4j.Logger;
tom94bb4a42014-08-27 22:12:02 -070029
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070030import java.util.TimerTask;
tom5f38b3a2014-08-27 23:50:54 -070031import java.util.concurrent.BlockingQueue;
tom94bb4a42014-08-27 22:12:02 -070032import java.util.concurrent.ExecutorService;
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070033import java.util.concurrent.Future;
tom5f38b3a2014-08-27 23:50:54 -070034import java.util.concurrent.LinkedBlockingQueue;
35
Thomas Vachuska36002e62015-05-19 16:12:29 -070036import static com.google.common.base.Preconditions.checkArgument;
tom5f38b3a2014-08-27 23:50:54 -070037import static java.util.concurrent.Executors.newSingleThreadExecutor;
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080038import static org.onlab.util.Tools.groupedThreads;
tom5f38b3a2014-08-27 23:50:54 -070039import static org.slf4j.LoggerFactory.getLogger;
tom94bb4a42014-08-27 22:12:02 -070040
Heedo Kang4a47a302016-02-29 17:40:23 +090041import static org.onosproject.security.AppGuard.checkPermission;
42import static org.onosproject.security.AppPermission.Type.*;
tom94bb4a42014-08-27 22:12:02 -070043/**
44 * Simple implementation of an event dispatching service.
45 */
46@Component(immediate = true)
47@Service
tom202175a2014-09-19 19:00:11 -070048public class CoreEventDispatcher extends DefaultEventSinkRegistry
tom96dfcab2014-08-28 09:26:03 -070049 implements EventDeliveryService {
tom94bb4a42014-08-27 22:12:02 -070050
tom5f38b3a2014-08-27 23:50:54 -070051 private final Logger log = getLogger(getClass());
52
Thomas Vachuska36002e62015-05-19 16:12:29 -070053 // Default number of millis a sink can take to process an event.
Thomas Vachuska409b9cb2015-07-31 13:07:12 -070054 private static final long DEFAULT_EXECUTE_MS = 5_000; // ms
Thomas Vachuska36002e62015-05-19 16:12:29 -070055 private static final long WATCHDOG_MS = 250; // ms
56
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070057 private final BlockingQueue<Event> events = new LinkedBlockingQueue<>();
58
tom5f38b3a2014-08-27 23:50:54 -070059 private final ExecutorService executor =
Thomas Vachuska6f94ded2015-02-21 14:02:38 -080060 newSingleThreadExecutor(groupedThreads("onos/event", "dispatch-%d"));
tom5f38b3a2014-08-27 23:50:54 -070061
62 @SuppressWarnings("unchecked")
63 private static final Event KILL_PILL = new AbstractEvent(null, 0) {
64 };
65
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070066 private DispatchLoop dispatchLoop;
Thomas Vachuska36002e62015-05-19 16:12:29 -070067 private long maxProcessMillis = DEFAULT_EXECUTE_MS;
tom5f38b3a2014-08-27 23:50:54 -070068
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070069 // Means to detect long-running sinks
70 private TimerTask watchdog;
71 private EventSink lastSink;
72 private long lastStart = 0;
73 private Future<?> dispatchFuture;
tom94bb4a42014-08-27 22:12:02 -070074
75 @Override
76 public void post(Event event) {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070077 if (!events.add(event)) {
78 log.error("Unable to post event {}", event);
79 }
tom94bb4a42014-08-27 22:12:02 -070080 }
81
tom5f38b3a2014-08-27 23:50:54 -070082 @Activate
83 public void activate() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070084 dispatchLoop = new DispatchLoop();
85 dispatchFuture = executor.submit(dispatchLoop);
86 watchdog = new Watchdog();
87 SharedExecutors.getTimer().schedule(watchdog, WATCHDOG_MS, WATCHDOG_MS);
tom5f38b3a2014-08-27 23:50:54 -070088 log.info("Started");
tom94bb4a42014-08-27 22:12:02 -070089 }
90
tom5f38b3a2014-08-27 23:50:54 -070091 @Deactivate
92 public void deactivate() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -070093 dispatchLoop.stop();
94 watchdog.cancel();
tom5f38b3a2014-08-27 23:50:54 -070095 post(KILL_PILL);
96 log.info("Stopped");
tom94bb4a42014-08-27 22:12:02 -070097 }
98
Thomas Vachuska36002e62015-05-19 16:12:29 -070099 @Override
100 public void setDispatchTimeLimit(long millis) {
Heedo Kang4a47a302016-02-29 17:40:23 +0900101 checkPermission(EVENT_WRITE);
Thomas Vachuska36002e62015-05-19 16:12:29 -0700102 checkArgument(millis >= WATCHDOG_MS,
103 "Time limit must be greater than %s", WATCHDOG_MS);
104 maxProcessMillis = millis;
105 }
106
107 @Override
108 public long getDispatchTimeLimit() {
Heedo Kang4a47a302016-02-29 17:40:23 +0900109 checkPermission(EVENT_READ);
Thomas Vachuska36002e62015-05-19 16:12:29 -0700110 return maxProcessMillis;
111 }
112
tom5f38b3a2014-08-27 23:50:54 -0700113 // Auxiliary event dispatching loop that feeds off the events queue.
114 private class DispatchLoop implements Runnable {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700115 private volatile boolean stopped;
116
tom5f38b3a2014-08-27 23:50:54 -0700117 @Override
tom5f38b3a2014-08-27 23:50:54 -0700118 public void run() {
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700119 stopped = false;
tom5f38b3a2014-08-27 23:50:54 -0700120 log.info("Dispatch loop initiated");
121 while (!stopped) {
122 try {
123 // Fetch the next event and if it is the kill-pill, bail
124 Event event = events.take();
125 if (event == KILL_PILL) {
126 break;
127 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700128 process(event);
Thomas Vachuska409b9cb2015-07-31 13:07:12 -0700129 } catch (InterruptedException e) {
130 log.warn("Dispatch loop interrupted");
Jonathan Hartb68919e2016-02-05 15:22:36 -0800131 } catch (Exception | Error e) {
tom5f38b3a2014-08-27 23:50:54 -0700132 log.warn("Error encountered while dispatching event:", e);
133 }
134 }
135 log.info("Dispatch loop terminated");
136 }
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700137
138 // Locate the sink for the event class and use it to process the event
139 @SuppressWarnings("unchecked")
140 private void process(Event event) {
141 EventSink sink = getSink(event.getClass());
142 if (sink != null) {
143 lastSink = sink;
144 lastStart = System.currentTimeMillis();
145 sink.process(event);
146 lastStart = 0;
147 } else {
148 log.warn("No sink registered for event class {}",
Thomas Vachuska36002e62015-05-19 16:12:29 -0700149 event.getClass().getName());
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700150 }
151 }
152
153 void stop() {
154 stopped = true;
155 }
tom94bb4a42014-08-27 22:12:02 -0700156 }
157
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700158 // Monitors event sinks to make sure none take too long to execute.
159 private class Watchdog extends TimerTask {
160 @Override
161 public void run() {
162 long delta = System.currentTimeMillis() - lastStart;
Thomas Vachuska36002e62015-05-19 16:12:29 -0700163 if (lastStart > 0 && delta > maxProcessMillis) {
Thomas Vachuska99c92fd2015-06-01 11:44:53 -0700164 lastStart = 0;
Thomas Vachuska8d033672015-07-21 16:15:04 -0700165 log.warn("Event sink {} exceeded execution time limit: {} ms; spawning new dispatch loop",
Thomas Vachuskab17c41f2015-05-19 11:16:05 -0700166 lastSink.getClass().getName(), delta);
167
168 // Notify the sink that it has exceeded its time limit.
169 lastSink.onProcessLimit();
170
171 // Cancel the old dispatch loop and submit a new one.
172 dispatchLoop.stop();
173 dispatchLoop = new DispatchLoop();
174 dispatchFuture.cancel(true);
175 dispatchFuture = executor.submit(dispatchLoop);
176 }
177 }
178 }
tom94bb4a42014-08-27 22:12:02 -0700179}