blob: d3c4534292df54f41484298f5e5cb96aa061fa0b [file] [log] [blame]
Thomas Vachuskab0317c62015-04-08 23:58:58 -07001/*
2 * Copyright 2015 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onlab.util;
17
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080018import org.onlab.metrics.MetricsComponent;
19import org.onlab.metrics.MetricsFeature;
20import org.onlab.metrics.MetricsService;
21
Thomas Vachuskab0317c62015-04-08 23:58:58 -070022import java.util.Collection;
23import java.util.List;
24import java.util.concurrent.Callable;
25import java.util.concurrent.ExecutionException;
26import java.util.concurrent.ExecutorService;
27import java.util.concurrent.Future;
28import java.util.concurrent.TimeUnit;
29import java.util.concurrent.TimeoutException;
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080030import com.codahale.metrics.Timer;
31
32
Thomas Vachuskab0317c62015-04-08 23:58:58 -070033
34/**
35 * Executor service wrapper for shared executors with safeguards on shutdown
36 * to prevent inadvertent shutdown.
37 */
38class SharedExecutorService implements ExecutorService {
39
40 private static final String NOT_ALLOWED = "Shutdown of shared executor is not allowed";
41
42 private ExecutorService executor;
43
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080044 private MetricsService metricsService = null;
45
46 private MetricsComponent executorMetrics;
47 private Timer queueMetrics = null;
48 private Timer delayMetrics = null;
49
50
Thomas Vachuskab0317c62015-04-08 23:58:58 -070051 /**
52 * Creates a wrapper for the given executor service.
53 *
54 * @param executor executor service to wrap
55 */
56 SharedExecutorService(ExecutorService executor) {
57 this.executor = executor;
58 }
59
60 /**
61 * Returns the backing executor service.
62 *
63 * @return backing executor service
64 */
65 ExecutorService backingExecutor() {
66 return executor;
67 }
68
69 /**
70 * Swaps the backing executor with a new one and shuts down the old one.
71 *
72 * @param executor new executor service
73 */
74 void setBackingExecutor(ExecutorService executor) {
75 ExecutorService oldExecutor = this.executor;
76 this.executor = executor;
77 oldExecutor.shutdown();
78 }
79
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -080080
Thomas Vachuskab0317c62015-04-08 23:58:58 -070081 @Override
82 public void shutdown() {
83 throw new UnsupportedOperationException(NOT_ALLOWED);
84 }
85
86 @Override
87 public List<Runnable> shutdownNow() {
88 throw new UnsupportedOperationException(NOT_ALLOWED);
89 }
90
91 @Override
92 public boolean isShutdown() {
93 return executor.isShutdown();
94 }
95
96 @Override
97 public boolean isTerminated() {
98 return executor.isTerminated();
99 }
100
101 @Override
102 public boolean awaitTermination(long timeout, TimeUnit unit)
103 throws InterruptedException {
104 return executor.awaitTermination(timeout, unit);
105 }
106
107 @Override
108 public <T> Future<T> submit(Callable<T> task) {
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800109 Counter taskCounter = new Counter();
110 taskCounter.reset();
111 return executor.submit(() -> {
112 T t = null;
113 long queueWaitTime = (long) taskCounter.duration();
114 String className;
115 if (task instanceof CallableExtended) {
116 className = ((CallableExtended) task).getRunnable().getClass().toString();
117 } else {
118 className = task.getClass().toString();
119 }
120 if (queueMetrics != null) {
121 queueMetrics.update(queueWaitTime, TimeUnit.SECONDS);
122 }
123 taskCounter.reset();
124 try {
125 t = task.call();
126 } catch (Exception e) { }
127 long taskwaittime = (long) taskCounter.duration();
128 if (delayMetrics != null) {
129 delayMetrics.update(taskwaittime, TimeUnit.SECONDS);
130 }
131 return t;
132 }
133 );
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700134 }
135
136 @Override
137 public <T> Future<T> submit(Runnable task, T result) {
138 return executor.submit(task, result);
139 }
140
141 @Override
142 public Future<?> submit(Runnable task) {
143 return executor.submit(task);
144 }
145
146 @Override
147 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
148 throws InterruptedException {
149 return executor.invokeAll(tasks);
150 }
151
152 @Override
153 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
154 long timeout, TimeUnit unit)
155 throws InterruptedException {
156 return executor.invokeAll(tasks, timeout, unit);
157 }
158
159 @Override
160 public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
161 throws InterruptedException, ExecutionException {
162 return executor.invokeAny(tasks);
163 }
164
165 @Override
166 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
167 long timeout, TimeUnit unit)
168 throws InterruptedException, ExecutionException, TimeoutException {
HIGUCHI Yuta1651e982015-09-04 13:45:08 -0700169 return executor.invokeAny(tasks, timeout, unit);
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700170 }
171
172 @Override
173 public void execute(Runnable command) {
174 executor.execute(command);
175 }
176
Murat Parlakisikdc17f7b2016-01-26 12:08:35 -0800177 public void setCalculatePoolPerformance(boolean calculatePoolPerformance, MetricsService metricsSrv) {
178 this.metricsService = metricsSrv;
179 if (calculatePoolPerformance) {
180 if (metricsService != null) {
181 executorMetrics = metricsService.registerComponent("SharedExecutor");
182 MetricsFeature mf = executorMetrics.registerFeature("*");
183 queueMetrics = metricsService.createTimer(executorMetrics, mf, "Queue");
184 delayMetrics = metricsService.createTimer(executorMetrics, mf, "Delay");
185 }
186 } else {
187 metricsService = null;
188 queueMetrics = null;
189 delayMetrics = null;
190 }
191 }
192
193 /**
194 * CallableExtended class is used to get Runnable Object
195 * from Callable Object.
196 *
197 */
198 class CallableExtended implements Callable {
199
200 private Runnable runnable;
201
202 /**
203 * Wrapper for Callable object .
204 * @param runnable Runnable object
205 */
206 public CallableExtended(Runnable runnable) {
207 this.runnable = runnable;
208 }
209 public Runnable getRunnable() {
210 return runnable;
211 }
212
213 @Override
214 public Object call() throws Exception {
215 runnable.run();
216 return null;
217 }
218 }
219
Thomas Vachuskab0317c62015-04-08 23:58:58 -0700220}