blob: 8806ebceabbb0cab976a154c203aa078e27a31a9 [file] [log] [blame]
/*
 * Copyright 2015 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onlab.util;
17
Jian Li66865682016-03-02 11:43:09 -080018import com.codahale.metrics.Timer;
Jian Li54526b42016-03-02 19:36:53 -080019import com.google.common.base.Throwables;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080020import org.onlab.metrics.MetricsComponent;
21import org.onlab.metrics.MetricsFeature;
22import org.onlab.metrics.MetricsService;
Jian Li54526b42016-03-02 19:36:53 -080023import org.slf4j.Logger;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080024
Thomas Vachuskab0317c62015-04-08 23:58:58 -070025import java.util.Collection;
26import java.util.List;
27import java.util.concurrent.Callable;
28import java.util.concurrent.ExecutionException;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Future;
31import java.util.concurrent.TimeUnit;
32import java.util.concurrent.TimeoutException;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080033
Jian Li66865682016-03-02 11:43:09 -080034import static org.slf4j.LoggerFactory.getLogger;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080035
Thomas Vachuskab0317c62015-04-08 23:58:58 -070036
37/**
38 * Executor service wrapper for shared executors with safeguards on shutdown
39 * to prevent inadvertent shutdown.
40 */
41class SharedExecutorService implements ExecutorService {
42
43 private static final String NOT_ALLOWED = "Shutdown of shared executor is not allowed";
Jian Li54526b42016-03-02 19:36:53 -080044 private final Logger log = getLogger(getClass());
Thomas Vachuskab0317c62015-04-08 23:58:58 -070045
46 private ExecutorService executor;
47
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080048 private MetricsService metricsService = null;
49
50 private MetricsComponent executorMetrics;
51 private Timer queueMetrics = null;
52 private Timer delayMetrics = null;
53
54
Thomas Vachuskab0317c62015-04-08 23:58:58 -070055 /**
56 * Creates a wrapper for the given executor service.
57 *
58 * @param executor executor service to wrap
59 */
60 SharedExecutorService(ExecutorService executor) {
61 this.executor = executor;
62 }
63
64 /**
65 * Returns the backing executor service.
66 *
67 * @return backing executor service
68 */
69 ExecutorService backingExecutor() {
70 return executor;
71 }
72
73 /**
74 * Swaps the backing executor with a new one and shuts down the old one.
75 *
76 * @param executor new executor service
77 */
78 void setBackingExecutor(ExecutorService executor) {
79 ExecutorService oldExecutor = this.executor;
80 this.executor = executor;
81 oldExecutor.shutdown();
82 }
83
84 @Override
85 public void shutdown() {
86 throw new UnsupportedOperationException(NOT_ALLOWED);
87 }
88
89 @Override
90 public List<Runnable> shutdownNow() {
91 throw new UnsupportedOperationException(NOT_ALLOWED);
92 }
93
94 @Override
95 public boolean isShutdown() {
96 return executor.isShutdown();
97 }
98
99 @Override
100 public boolean isTerminated() {
101 return executor.isTerminated();
102 }
103
104 @Override
105 public boolean awaitTermination(long timeout, TimeUnit unit)
106 throws InterruptedException {
107 return executor.awaitTermination(timeout, unit);
108 }
109
110 @Override
111 public <T> Future<T> submit(Callable<T> task) {
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800112 Counter taskCounter = new Counter();
113 taskCounter.reset();
114 return executor.submit(() -> {
115 T t = null;
116 long queueWaitTime = (long) taskCounter.duration();
Jian Li66865682016-03-02 11:43:09 -0800117 Class className;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800118 if (task instanceof CallableExtended) {
Jian Li66865682016-03-02 11:43:09 -0800119 className = ((CallableExtended) task).getRunnable().getClass();
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800120 } else {
Jian Li66865682016-03-02 11:43:09 -0800121 className = task.getClass();
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800122 }
123 if (queueMetrics != null) {
124 queueMetrics.update(queueWaitTime, TimeUnit.SECONDS);
125 }
126 taskCounter.reset();
127 try {
128 t = task.call();
Jian Li66865682016-03-02 11:43:09 -0800129 } catch (Exception e) {
130 getLogger(className).error("Uncaught exception on " + className, e);
131 }
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800132 long taskwaittime = (long) taskCounter.duration();
133 if (delayMetrics != null) {
134 delayMetrics.update(taskwaittime, TimeUnit.SECONDS);
135 }
136 return t;
137 }
138 );
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700139 }
140
141 @Override
142 public <T> Future<T> submit(Runnable task, T result) {
Jian Li54526b42016-03-02 19:36:53 -0800143 return executor.submit(wrap(task), result);
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700144 }
145
146 @Override
147 public Future<?> submit(Runnable task) {
Jian Li54526b42016-03-02 19:36:53 -0800148 return executor.submit(wrap(task));
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700149 }
150
151 @Override
152 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
153 throws InterruptedException {
154 return executor.invokeAll(tasks);
155 }
156
157 @Override
158 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
159 long timeout, TimeUnit unit)
160 throws InterruptedException {
161 return executor.invokeAll(tasks, timeout, unit);
162 }
163
164 @Override
165 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
166 throws InterruptedException, ExecutionException {
167 return executor.invokeAny(tasks);
168 }
169
170 @Override
171 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
172 long timeout, TimeUnit unit)
173 throws InterruptedException, ExecutionException, TimeoutException {
HIGUCHI Yuta1651e982015-09-04 13:45:08 -0700174 return executor.invokeAny(tasks, timeout, unit);
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700175 }
176
177 @Override
178 public void execute(Runnable command) {
179 executor.execute(command);
180 }
181
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800182 public void setCalculatePoolPerformance(boolean calculatePoolPerformance, MetricsService metricsSrv) {
183 this.metricsService = metricsSrv;
184 if (calculatePoolPerformance) {
185 if (metricsService != null) {
186 executorMetrics = metricsService.registerComponent("SharedExecutor");
187 MetricsFeature mf = executorMetrics.registerFeature("*");
188 queueMetrics = metricsService.createTimer(executorMetrics, mf, "Queue");
189 delayMetrics = metricsService.createTimer(executorMetrics, mf, "Delay");
190 }
191 } else {
192 metricsService = null;
193 queueMetrics = null;
194 delayMetrics = null;
195 }
196 }
197
Jian Li54526b42016-03-02 19:36:53 -0800198 private Runnable wrap(Runnable command) {
199 return new LoggableRunnable(command);
200 }
201
202 /**
203 * A runnable class that allows to capture and log the exceptions.
204 */
205 private class LoggableRunnable implements Runnable {
206
207 private Runnable runnable;
208
209 public LoggableRunnable(Runnable runnable) {
210 super();
211 this.runnable = runnable;
212 }
213
214 @Override
215 public void run() {
216 try {
217 runnable.run();
218 } catch (Exception e) {
219 log.error("Uncaught exception on " + runnable.getClass().getSimpleName(), e);
220 throw Throwables.propagate(e);
221 }
222 }
223 }
224
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800225 /**
226 * CallableExtended class is used to get Runnable Object
227 * from Callable Object.
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800228 */
229 class CallableExtended implements Callable {
230
231 private Runnable runnable;
232
233 /**
234 * Wrapper for Callable object .
235 * @param runnable Runnable object
236 */
237 public CallableExtended(Runnable runnable) {
238 this.runnable = runnable;
239 }
240 public Runnable getRunnable() {
241 return runnable;
242 }
243
244 @Override
245 public Object call() throws Exception {
246 runnable.run();
247 return null;
248 }
249 }
250
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700251}