blob: d12ba17ca8900dfad813d639266e6b2c380dc993 [file] [log] [blame]
Thomas Vachuskab0317c62015-04-08 23:58:58 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Thomas Vachuskab0317c62015-04-08 23:58:58 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.util;
17
Jian Li66865682016-03-02 11:43:09 -080018import com.codahale.metrics.Timer;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080019import org.onlab.metrics.MetricsComponent;
20import org.onlab.metrics.MetricsFeature;
21import org.onlab.metrics.MetricsService;
Jian Li54526b42016-03-02 19:36:53 -080022import org.slf4j.Logger;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080023
Thomas Vachuskab0317c62015-04-08 23:58:58 -070024import java.util.Collection;
25import java.util.List;
26import java.util.concurrent.Callable;
27import java.util.concurrent.ExecutionException;
28import java.util.concurrent.ExecutorService;
29import java.util.concurrent.Future;
30import java.util.concurrent.TimeUnit;
31import java.util.concurrent.TimeoutException;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080032
Jian Li66865682016-03-02 11:43:09 -080033import static org.slf4j.LoggerFactory.getLogger;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080034
Thomas Vachuskab0317c62015-04-08 23:58:58 -070035
36/**
37 * Executor service wrapper for shared executors with safeguards on shutdown
38 * to prevent inadvertent shutdown.
39 */
40class SharedExecutorService implements ExecutorService {
41
42 private static final String NOT_ALLOWED = "Shutdown of shared executor is not allowed";
Jian Li54526b42016-03-02 19:36:53 -080043 private final Logger log = getLogger(getClass());
Thomas Vachuskab0317c62015-04-08 23:58:58 -070044
45 private ExecutorService executor;
46
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080047 private MetricsService metricsService = null;
48
49 private MetricsComponent executorMetrics;
50 private Timer queueMetrics = null;
51 private Timer delayMetrics = null;
52
53
Thomas Vachuskab0317c62015-04-08 23:58:58 -070054 /**
55 * Creates a wrapper for the given executor service.
56 *
57 * @param executor executor service to wrap
58 */
59 SharedExecutorService(ExecutorService executor) {
60 this.executor = executor;
61 }
62
63 /**
64 * Returns the backing executor service.
65 *
66 * @return backing executor service
67 */
68 ExecutorService backingExecutor() {
69 return executor;
70 }
71
72 /**
73 * Swaps the backing executor with a new one and shuts down the old one.
74 *
75 * @param executor new executor service
76 */
77 void setBackingExecutor(ExecutorService executor) {
78 ExecutorService oldExecutor = this.executor;
79 this.executor = executor;
80 oldExecutor.shutdown();
81 }
82
83 @Override
84 public void shutdown() {
85 throw new UnsupportedOperationException(NOT_ALLOWED);
86 }
87
88 @Override
89 public List<Runnable> shutdownNow() {
90 throw new UnsupportedOperationException(NOT_ALLOWED);
91 }
92
93 @Override
94 public boolean isShutdown() {
95 return executor.isShutdown();
96 }
97
98 @Override
99 public boolean isTerminated() {
100 return executor.isTerminated();
101 }
102
103 @Override
104 public boolean awaitTermination(long timeout, TimeUnit unit)
105 throws InterruptedException {
106 return executor.awaitTermination(timeout, unit);
107 }
108
109 @Override
110 public <T> Future<T> submit(Callable<T> task) {
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800111 Counter taskCounter = new Counter();
112 taskCounter.reset();
113 return executor.submit(() -> {
Thomas Vachuska0666f152016-08-05 12:03:54 -0700114 T t = null;
115 long queueWaitTime = (long) taskCounter.duration();
116 Class className;
117 if (task instanceof CallableExtended) {
118 className = ((CallableExtended) task).getRunnable().getClass();
119 } else {
120 className = task.getClass();
121 }
122 if (queueMetrics != null) {
123 queueMetrics.update(queueWaitTime, TimeUnit.SECONDS);
124 }
125 taskCounter.reset();
126 try {
127 t = task.call();
128 } catch (Exception e) {
129 getLogger(className).error("Uncaught exception on " + className, e);
130 }
131 long taskwaittime = (long) taskCounter.duration();
132 if (delayMetrics != null) {
133 delayMetrics.update(taskwaittime, TimeUnit.SECONDS);
134 }
135 return t;
136 });
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700137 }
138
139 @Override
140 public <T> Future<T> submit(Runnable task, T result) {
Jian Li54526b42016-03-02 19:36:53 -0800141 return executor.submit(wrap(task), result);
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700142 }
143
144 @Override
145 public Future<?> submit(Runnable task) {
Jian Li54526b42016-03-02 19:36:53 -0800146 return executor.submit(wrap(task));
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700147 }
148
149 @Override
150 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
151 throws InterruptedException {
152 return executor.invokeAll(tasks);
153 }
154
155 @Override
156 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
157 long timeout, TimeUnit unit)
158 throws InterruptedException {
159 return executor.invokeAll(tasks, timeout, unit);
160 }
161
162 @Override
163 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
164 throws InterruptedException, ExecutionException {
165 return executor.invokeAny(tasks);
166 }
167
168 @Override
169 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
170 long timeout, TimeUnit unit)
171 throws InterruptedException, ExecutionException, TimeoutException {
HIGUCHI Yuta1651e982015-09-04 13:45:08 -0700172 return executor.invokeAny(tasks, timeout, unit);
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700173 }
174
175 @Override
176 public void execute(Runnable command) {
177 executor.execute(command);
178 }
179
Thomas Vachuska0666f152016-08-05 12:03:54 -0700180 /**
181 * Enables or disables calculation of the pool performance metrics. If
182 * the metrics service is not null metric collection will be enabled;
183 * otherwise it will be disabled.
184 *
185 * @param metricsService optional metric service
186 */
187 public void setMetricsService(MetricsService metricsService) {
188 if (this.metricsService == null && metricsService != null) {
189 // If metrics service was newly introduced, initialize metrics.
190 executorMetrics = metricsService.registerComponent("SharedExecutor");
191 MetricsFeature mf = executorMetrics.registerFeature("*");
192 queueMetrics = metricsService.createTimer(executorMetrics, mf, "Queue");
193 delayMetrics = metricsService.createTimer(executorMetrics, mf, "Delay");
194 } else if (this.metricsService != null && metricsService == null) {
195 // If the metrics service was newly withdrawn, tear-down metrics.
196 queueMetrics = null;
197 delayMetrics = null;
198 } // Otherwise just record the metrics service
199 this.metricsService = metricsService;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800200 }
201
Jian Li54526b42016-03-02 19:36:53 -0800202 private Runnable wrap(Runnable command) {
203 return new LoggableRunnable(command);
204 }
205
206 /**
207 * A runnable class that allows to capture and log the exceptions.
208 */
209 private class LoggableRunnable implements Runnable {
210
211 private Runnable runnable;
212
213 public LoggableRunnable(Runnable runnable) {
214 super();
215 this.runnable = runnable;
216 }
217
218 @Override
219 public void run() {
220 try {
221 runnable.run();
222 } catch (Exception e) {
223 log.error("Uncaught exception on " + runnable.getClass().getSimpleName(), e);
Ray Milkey6a51cb92018-03-06 09:03:03 -0800224 throw new IllegalStateException(e);
Jian Li54526b42016-03-02 19:36:53 -0800225 }
226 }
227 }
228
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800229 /**
Thomas Vachuska0666f152016-08-05 12:03:54 -0700230 * CallableExtended class is used to get Runnable Object
231 * from Callable Object.
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800232 */
233 class CallableExtended implements Callable {
234
235 private Runnable runnable;
236
237 /**
238 * Wrapper for Callable object .
Thomas Vachuska0666f152016-08-05 12:03:54 -0700239 *
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800240 * @param runnable Runnable object
241 */
242 public CallableExtended(Runnable runnable) {
243 this.runnable = runnable;
244 }
Thomas Vachuska0666f152016-08-05 12:03:54 -0700245
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800246 public Runnable getRunnable() {
247 return runnable;
248 }
249
250 @Override
251 public Object call() throws Exception {
252 runnable.run();
253 return null;
254 }
255 }
256
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700257}