blob: 05e2978d0fee1934097b3a996483b1c5046999bd [file] [log] [blame]
Yi Tseng2a340f72018-11-02 16:52:47 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.grpc.ctl;
18
19import io.grpc.Context;
Yi Tseng2a340f72018-11-02 16:52:47 -070020import io.grpc.StatusRuntimeException;
21import org.onlab.util.SharedExecutors;
22import org.onosproject.grpc.api.GrpcClient;
23import org.onosproject.grpc.api.GrpcClientKey;
24import org.onosproject.net.DeviceId;
25import org.slf4j.Logger;
26
Yi Tseng2a340f72018-11-02 16:52:47 -070027import java.util.concurrent.CompletableFuture;
28import java.util.concurrent.Executor;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
31import java.util.concurrent.TimeUnit;
32import java.util.concurrent.locks.Lock;
33import java.util.concurrent.locks.ReentrantLock;
34import java.util.function.Supplier;
35
36import static org.onlab.util.Tools.groupedThreads;
37import static org.slf4j.LoggerFactory.getLogger;
38
39/**
40 * Abstract client for gRPC service.
41 *
42 */
43public abstract class AbstractGrpcClient implements GrpcClient {
44
45 // Timeout in seconds to obtain the request lock.
Yi Tsengd7716482018-10-31 15:34:30 -070046 private static final int LOCK_TIMEOUT = 60;
Yi Tseng2a340f72018-11-02 16:52:47 -070047 private static final int DEFAULT_THREAD_POOL_SIZE = 10;
48
49 protected final Logger log = getLogger(getClass());
50
Yi Tsengd7716482018-10-31 15:34:30 -070051 private final Lock requestLock = new ReentrantLock();
52 private final Context.CancellableContext cancellableContext =
Yi Tseng2a340f72018-11-02 16:52:47 -070053 Context.current().withCancellation();
Yi Tsengd7716482018-10-31 15:34:30 -070054 private final Executor contextExecutor;
55
Yi Tseng2a340f72018-11-02 16:52:47 -070056 protected final ExecutorService executorService;
Yi Tsengd7716482018-10-31 15:34:30 -070057 protected final DeviceId deviceId;
Yi Tseng2a340f72018-11-02 16:52:47 -070058
Yi Tsengd7716482018-10-31 15:34:30 -070059 protected AbstractGrpcClient(GrpcClientKey clientKey) {
Yi Tseng2a340f72018-11-02 16:52:47 -070060 this.deviceId = clientKey.deviceId();
61 this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
62 "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
63 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Yi Tseng2a340f72018-11-02 16:52:47 -070064 }
65
66 @Override
67 public CompletableFuture<Void> shutdown() {
68 return supplyWithExecutor(this::doShutdown, "shutdown",
69 SharedExecutors.getPoolThreadExecutor());
70 }
71
72 protected Void doShutdown() {
73 log.debug("Shutting down client for {}...", deviceId);
74 cancellableContext.cancel(new InterruptedException(
75 "Requested client shutdown"));
76 this.executorService.shutdownNow();
77 try {
78 executorService.awaitTermination(5, TimeUnit.SECONDS);
79 } catch (InterruptedException e) {
80 log.warn("Executor service didn't shutdown in time.");
81 Thread.currentThread().interrupt();
82 }
83 return null;
84 }
85
86 /**
87 * Equivalent of supplyWithExecutor using the gRPC context executor of this
88 * client, such that if the context is cancelled (e.g. client shutdown) the
89 * RPC is automatically cancelled.
90 *
91 * @param <U> return type of supplier
92 * @param supplier the supplier to be executed
93 * @param opDescription the description of this supplier
94 * @return CompletableFuture includes the result of supplier
95 */
96 protected <U> CompletableFuture<U> supplyInContext(
97 Supplier<U> supplier, String opDescription) {
98 return supplyWithExecutor(supplier, opDescription, contextExecutor);
99 }
100
101 /**
102 * Submits a task for async execution via the given executor. All tasks
103 * submitted with this method will be executed sequentially.
104 *
105 * @param <U> return type of supplier
106 * @param supplier the supplier to be executed
107 * @param opDescription the description of this supplier
108 * @param executor the executor to execute this supplier
109 * @return CompletableFuture includes the result of supplier
110 */
Yi Tsengd7716482018-10-31 15:34:30 -0700111 private <U> CompletableFuture<U> supplyWithExecutor(
Yi Tseng2a340f72018-11-02 16:52:47 -0700112 Supplier<U> supplier, String opDescription, Executor executor) {
113 return CompletableFuture.supplyAsync(() -> {
114 // TODO: explore a more relaxed locking strategy.
115 try {
116 if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
117 log.error("LOCK TIMEOUT! This is likely a deadlock, "
118 + "please debug (executing {})",
119 opDescription);
120 throw new IllegalThreadStateException("Lock timeout");
121 }
122 } catch (InterruptedException e) {
123 log.warn("Thread interrupted while waiting for lock (executing {})",
124 opDescription);
125 throw new IllegalStateException(e);
126 }
127 try {
128 return supplier.get();
129 } catch (StatusRuntimeException ex) {
130 log.warn("Unable to execute {} on {}: {}",
131 opDescription, deviceId, ex.toString());
132 throw ex;
133 } catch (Throwable ex) {
134 log.error("Exception in client of {}, executing {}",
135 deviceId, opDescription, ex);
136 throw ex;
137 } finally {
138 requestLock.unlock();
139 }
140 }, executor);
141 }
142}