blob: 79a0baf06217e9242f645cc228e4382669272834 [file] [log] [blame]
Thomas Vachuskab0317c62015-04-08 23:58:58 -07001/*
Brian O'Connora09fe5b2017-08-03 21:12:30 -07002 * Copyright 2015-present Open Networking Foundation
Thomas Vachuskab0317c62015-04-08 23:58:58 -07003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.util;
17
Jian Li66865682016-03-02 11:43:09 -080018import com.codahale.metrics.Timer;
Jian Li54526b42016-03-02 19:36:53 -080019import com.google.common.base.Throwables;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080020import org.onlab.metrics.MetricsComponent;
21import org.onlab.metrics.MetricsFeature;
22import org.onlab.metrics.MetricsService;
Jian Li54526b42016-03-02 19:36:53 -080023import org.slf4j.Logger;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080024
Thomas Vachuskab0317c62015-04-08 23:58:58 -070025import java.util.Collection;
26import java.util.List;
27import java.util.concurrent.Callable;
28import java.util.concurrent.ExecutionException;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Future;
31import java.util.concurrent.TimeUnit;
32import java.util.concurrent.TimeoutException;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080033
Jian Li66865682016-03-02 11:43:09 -080034import static org.slf4j.LoggerFactory.getLogger;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080035
Thomas Vachuskab0317c62015-04-08 23:58:58 -070036
37/**
38 * Executor service wrapper for shared executors with safeguards on shutdown
39 * to prevent inadvertent shutdown.
40 */
41class SharedExecutorService implements ExecutorService {
42
43 private static final String NOT_ALLOWED = "Shutdown of shared executor is not allowed";
Jian Li54526b42016-03-02 19:36:53 -080044 private final Logger log = getLogger(getClass());
Thomas Vachuskab0317c62015-04-08 23:58:58 -070045
46 private ExecutorService executor;
47
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080048 private MetricsService metricsService = null;
49
50 private MetricsComponent executorMetrics;
51 private Timer queueMetrics = null;
52 private Timer delayMetrics = null;
53
54
Thomas Vachuskab0317c62015-04-08 23:58:58 -070055 /**
56 * Creates a wrapper for the given executor service.
57 *
58 * @param executor executor service to wrap
59 */
60 SharedExecutorService(ExecutorService executor) {
61 this.executor = executor;
62 }
63
64 /**
65 * Returns the backing executor service.
66 *
67 * @return backing executor service
68 */
69 ExecutorService backingExecutor() {
70 return executor;
71 }
72
73 /**
74 * Swaps the backing executor with a new one and shuts down the old one.
75 *
76 * @param executor new executor service
77 */
78 void setBackingExecutor(ExecutorService executor) {
79 ExecutorService oldExecutor = this.executor;
80 this.executor = executor;
81 oldExecutor.shutdown();
82 }
83
84 @Override
85 public void shutdown() {
86 throw new UnsupportedOperationException(NOT_ALLOWED);
87 }
88
89 @Override
90 public List<Runnable> shutdownNow() {
91 throw new UnsupportedOperationException(NOT_ALLOWED);
92 }
93
94 @Override
95 public boolean isShutdown() {
96 return executor.isShutdown();
97 }
98
99 @Override
100 public boolean isTerminated() {
101 return executor.isTerminated();
102 }
103
104 @Override
105 public boolean awaitTermination(long timeout, TimeUnit unit)
106 throws InterruptedException {
107 return executor.awaitTermination(timeout, unit);
108 }
109
110 @Override
111 public <T> Future<T> submit(Callable<T> task) {
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800112 Counter taskCounter = new Counter();
113 taskCounter.reset();
114 return executor.submit(() -> {
Thomas Vachuska0666f152016-08-05 12:03:54 -0700115 T t = null;
116 long queueWaitTime = (long) taskCounter.duration();
117 Class className;
118 if (task instanceof CallableExtended) {
119 className = ((CallableExtended) task).getRunnable().getClass();
120 } else {
121 className = task.getClass();
122 }
123 if (queueMetrics != null) {
124 queueMetrics.update(queueWaitTime, TimeUnit.SECONDS);
125 }
126 taskCounter.reset();
127 try {
128 t = task.call();
129 } catch (Exception e) {
130 getLogger(className).error("Uncaught exception on " + className, e);
131 }
132 long taskwaittime = (long) taskCounter.duration();
133 if (delayMetrics != null) {
134 delayMetrics.update(taskwaittime, TimeUnit.SECONDS);
135 }
136 return t;
137 });
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700138 }
139
140 @Override
141 public <T> Future<T> submit(Runnable task, T result) {
Jian Li54526b42016-03-02 19:36:53 -0800142 return executor.submit(wrap(task), result);
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700143 }
144
145 @Override
146 public Future<?> submit(Runnable task) {
Jian Li54526b42016-03-02 19:36:53 -0800147 return executor.submit(wrap(task));
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700148 }
149
150 @Override
151 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
152 throws InterruptedException {
153 return executor.invokeAll(tasks);
154 }
155
156 @Override
157 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
158 long timeout, TimeUnit unit)
159 throws InterruptedException {
160 return executor.invokeAll(tasks, timeout, unit);
161 }
162
163 @Override
164 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
165 throws InterruptedException, ExecutionException {
166 return executor.invokeAny(tasks);
167 }
168
169 @Override
170 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
171 long timeout, TimeUnit unit)
172 throws InterruptedException, ExecutionException, TimeoutException {
HIGUCHI Yuta1651e982015-09-04 13:45:08 -0700173 return executor.invokeAny(tasks, timeout, unit);
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700174 }
175
176 @Override
177 public void execute(Runnable command) {
178 executor.execute(command);
179 }
180
Thomas Vachuska0666f152016-08-05 12:03:54 -0700181 /**
182 * Enables or disables calculation of the pool performance metrics. If
183 * the metrics service is not null metric collection will be enabled;
184 * otherwise it will be disabled.
185 *
186 * @param metricsService optional metric service
187 */
188 public void setMetricsService(MetricsService metricsService) {
189 if (this.metricsService == null && metricsService != null) {
190 // If metrics service was newly introduced, initialize metrics.
191 executorMetrics = metricsService.registerComponent("SharedExecutor");
192 MetricsFeature mf = executorMetrics.registerFeature("*");
193 queueMetrics = metricsService.createTimer(executorMetrics, mf, "Queue");
194 delayMetrics = metricsService.createTimer(executorMetrics, mf, "Delay");
195 } else if (this.metricsService != null && metricsService == null) {
196 // If the metrics service was newly withdrawn, tear-down metrics.
197 queueMetrics = null;
198 delayMetrics = null;
199 } // Otherwise just record the metrics service
200 this.metricsService = metricsService;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800201 }
202
Jian Li54526b42016-03-02 19:36:53 -0800203 private Runnable wrap(Runnable command) {
204 return new LoggableRunnable(command);
205 }
206
207 /**
208 * A runnable class that allows to capture and log the exceptions.
209 */
210 private class LoggableRunnable implements Runnable {
211
212 private Runnable runnable;
213
214 public LoggableRunnable(Runnable runnable) {
215 super();
216 this.runnable = runnable;
217 }
218
219 @Override
220 public void run() {
221 try {
222 runnable.run();
223 } catch (Exception e) {
224 log.error("Uncaught exception on " + runnable.getClass().getSimpleName(), e);
225 throw Throwables.propagate(e);
226 }
227 }
228 }
229
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800230 /**
Thomas Vachuska0666f152016-08-05 12:03:54 -0700231 * CallableExtended class is used to get Runnable Object
232 * from Callable Object.
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800233 */
234 class CallableExtended implements Callable {
235
236 private Runnable runnable;
237
238 /**
239 * Wrapper for Callable object .
Thomas Vachuska0666f152016-08-05 12:03:54 -0700240 *
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800241 * @param runnable Runnable object
242 */
243 public CallableExtended(Runnable runnable) {
244 this.runnable = runnable;
245 }
Thomas Vachuska0666f152016-08-05 12:03:54 -0700246
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800247 public Runnable getRunnable() {
248 return runnable;
249 }
250
251 @Override
252 public Object call() throws Exception {
253 runnable.run();
254 return null;
255 }
256 }
257
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700258}