blob: 4764a56aa08f1a23330078ef540d8bc1ba460a3f [file] [log] [blame]
Yi Tseng2a340f72018-11-02 16:52:47 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.grpc.ctl;
18
19import io.grpc.Context;
20import io.grpc.ManagedChannel;
21import io.grpc.StatusRuntimeException;
22import org.onlab.util.SharedExecutors;
23import org.onosproject.grpc.api.GrpcClient;
24import org.onosproject.grpc.api.GrpcClientKey;
25import org.onosproject.net.DeviceId;
26import org.slf4j.Logger;
27
28
29import java.util.concurrent.CompletableFuture;
30import java.util.concurrent.Executor;
31import java.util.concurrent.ExecutorService;
32import java.util.concurrent.Executors;
33import java.util.concurrent.TimeUnit;
34import java.util.concurrent.locks.Lock;
35import java.util.concurrent.locks.ReentrantLock;
36import java.util.function.Supplier;
37
38import static org.onlab.util.Tools.groupedThreads;
39import static org.slf4j.LoggerFactory.getLogger;
40
41/**
42 * Abstract client for gRPC service.
43 *
44 */
45public abstract class AbstractGrpcClient implements GrpcClient {
46
47 // Timeout in seconds to obtain the request lock.
48 protected static final int LOCK_TIMEOUT = 60;
49 private static final int DEFAULT_THREAD_POOL_SIZE = 10;
50
51 protected final Logger log = getLogger(getClass());
52
53 protected final Lock requestLock = new ReentrantLock();
54 protected final Context.CancellableContext cancellableContext =
55 Context.current().withCancellation();
56 protected final ExecutorService executorService;
57 protected final Executor contextExecutor;
58
59 protected ManagedChannel channel;
60 protected DeviceId deviceId;
61
62 protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel) {
63 this.deviceId = clientKey.deviceId();
64 this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
65 "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
66 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
67 this.channel = channel;
68 }
69
70 @Override
71 public CompletableFuture<Void> shutdown() {
72 return supplyWithExecutor(this::doShutdown, "shutdown",
73 SharedExecutors.getPoolThreadExecutor());
74 }
75
76 protected Void doShutdown() {
77 log.debug("Shutting down client for {}...", deviceId);
78 cancellableContext.cancel(new InterruptedException(
79 "Requested client shutdown"));
80 this.executorService.shutdownNow();
81 try {
82 executorService.awaitTermination(5, TimeUnit.SECONDS);
83 } catch (InterruptedException e) {
84 log.warn("Executor service didn't shutdown in time.");
85 Thread.currentThread().interrupt();
86 }
87 return null;
88 }
89
90 /**
91 * Equivalent of supplyWithExecutor using the gRPC context executor of this
92 * client, such that if the context is cancelled (e.g. client shutdown) the
93 * RPC is automatically cancelled.
94 *
95 * @param <U> return type of supplier
96 * @param supplier the supplier to be executed
97 * @param opDescription the description of this supplier
98 * @return CompletableFuture includes the result of supplier
99 */
100 protected <U> CompletableFuture<U> supplyInContext(
101 Supplier<U> supplier, String opDescription) {
102 return supplyWithExecutor(supplier, opDescription, contextExecutor);
103 }
104
105 /**
106 * Submits a task for async execution via the given executor. All tasks
107 * submitted with this method will be executed sequentially.
108 *
109 * @param <U> return type of supplier
110 * @param supplier the supplier to be executed
111 * @param opDescription the description of this supplier
112 * @param executor the executor to execute this supplier
113 * @return CompletableFuture includes the result of supplier
114 */
115 protected <U> CompletableFuture<U> supplyWithExecutor(
116 Supplier<U> supplier, String opDescription, Executor executor) {
117 return CompletableFuture.supplyAsync(() -> {
118 // TODO: explore a more relaxed locking strategy.
119 try {
120 if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
121 log.error("LOCK TIMEOUT! This is likely a deadlock, "
122 + "please debug (executing {})",
123 opDescription);
124 throw new IllegalThreadStateException("Lock timeout");
125 }
126 } catch (InterruptedException e) {
127 log.warn("Thread interrupted while waiting for lock (executing {})",
128 opDescription);
129 throw new IllegalStateException(e);
130 }
131 try {
132 return supplier.get();
133 } catch (StatusRuntimeException ex) {
134 log.warn("Unable to execute {} on {}: {}",
135 opDescription, deviceId, ex.toString());
136 throw ex;
137 } catch (Throwable ex) {
138 log.error("Exception in client of {}, executing {}",
139 deviceId, opDescription, ex);
140 throw ex;
141 } finally {
142 requestLock.unlock();
143 }
144 }, executor);
145 }
146}