blob: 117c6e3df809478b7db77728488d710eed08451a [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.grpc.ctl;
18
Carmelo Cascone3977ea42019-02-28 13:43:42 -080019import io.grpc.ConnectivityState;
Yi Tseng2a340f72018-11-02 16:52:47 -070020import io.grpc.Context;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080021import io.grpc.ManagedChannel;
Yi Tseng2a340f72018-11-02 16:52:47 -070022import io.grpc.StatusRuntimeException;
Yi Tseng2a340f72018-11-02 16:52:47 -070023import org.onosproject.grpc.api.GrpcClient;
24import org.onosproject.grpc.api.GrpcClientKey;
25import org.onosproject.net.DeviceId;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080026import org.onosproject.net.device.DeviceAgentEvent;
Yi Tseng2a340f72018-11-02 16:52:47 -070027import org.slf4j.Logger;
28
Yi Tseng2a340f72018-11-02 16:52:47 -070029import java.util.concurrent.CompletableFuture;
30import java.util.concurrent.Executor;
31import java.util.concurrent.ExecutorService;
32import java.util.concurrent.Executors;
33import java.util.concurrent.TimeUnit;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080034import java.util.concurrent.atomic.AtomicBoolean;
Yi Tseng2a340f72018-11-02 16:52:47 -070035import java.util.concurrent.locks.Lock;
36import java.util.concurrent.locks.ReentrantLock;
37import java.util.function.Supplier;
38
Carmelo Cascone4c289b72019-01-22 15:30:45 -080039import static com.google.common.base.Preconditions.checkNotNull;
Yi Tseng2a340f72018-11-02 16:52:47 -070040import static org.onlab.util.Tools.groupedThreads;
41import static org.slf4j.LoggerFactory.getLogger;
42
43/**
44 * Abstract client for gRPC service.
Yi Tseng2a340f72018-11-02 16:52:47 -070045 */
46public abstract class AbstractGrpcClient implements GrpcClient {
47
48 // Timeout in seconds to obtain the request lock.
Yi Tsengd7716482018-10-31 15:34:30 -070049 private static final int LOCK_TIMEOUT = 60;
Yi Tseng2a340f72018-11-02 16:52:47 -070050 private static final int DEFAULT_THREAD_POOL_SIZE = 10;
51
52 protected final Logger log = getLogger(getClass());
53
Yi Tsengd7716482018-10-31 15:34:30 -070054 private final Lock requestLock = new ReentrantLock();
55 private final Context.CancellableContext cancellableContext =
Yi Tseng2a340f72018-11-02 16:52:47 -070056 Context.current().withCancellation();
Yi Tsengd7716482018-10-31 15:34:30 -070057 private final Executor contextExecutor;
58
Yi Tseng2a340f72018-11-02 16:52:47 -070059 protected final ExecutorService executorService;
Yi Tsengd7716482018-10-31 15:34:30 -070060 protected final DeviceId deviceId;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080061 protected final ManagedChannel channel;
62 private final boolean persistent;
63 private final AbstractGrpcClientController controller;
64 private final AtomicBoolean channelOpen = new AtomicBoolean(false);
Yi Tseng2a340f72018-11-02 16:52:47 -070065
Carmelo Cascone3977ea42019-02-28 13:43:42 -080066 /**
67 * Creates an new client for the given key and channel. Setting persistent
68 * to true avoids the gRPC channel to stay IDLE. The controller instance is
69 * needed to propagate channel events.
70 *
71 * @param clientKey client key
72 * @param channel channel
73 * @param persistent true if the gRPC should never stay IDLE
74 * @param controller controller
75 */
76 protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel,
77 boolean persistent, AbstractGrpcClientController controller) {
Carmelo Cascone4c289b72019-01-22 15:30:45 -080078 checkNotNull(clientKey);
Carmelo Cascone3977ea42019-02-28 13:43:42 -080079 checkNotNull(channel);
Yi Tseng2a340f72018-11-02 16:52:47 -070080 this.deviceId = clientKey.deviceId();
Carmelo Cascone3977ea42019-02-28 13:43:42 -080081 this.channel = channel;
82 this.persistent = persistent;
83 this.controller = controller;
Yi Tseng2a340f72018-11-02 16:52:47 -070084 this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads(
85 "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d"));
86 this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService);
Carmelo Cascone3977ea42019-02-28 13:43:42 -080087
88 setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
89 }
90
91 @Override
92 public boolean isServerReachable() {
93 final ConnectivityState state = channel.getState(false);
94 switch (state) {
95 case READY:
96 case IDLE:
97 return true;
98 case CONNECTING:
99 case TRANSIENT_FAILURE:
100 case SHUTDOWN:
101 return false;
102 default:
103 log.error("Unrecognized channel connectivity state {}", state);
104 return false;
105 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700106 }
107
108 @Override
109 public CompletableFuture<Void> shutdown() {
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800110 if (cancellableContext.isCancelled()) {
111 log.warn("Context is already cancelled, " +
112 "ignoring request to shutdown for {}...", deviceId);
113 return CompletableFuture.completedFuture(null);
114 }
115 return CompletableFuture.supplyAsync(this::doShutdown);
Yi Tseng2a340f72018-11-02 16:52:47 -0700116 }
117
118 protected Void doShutdown() {
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800119 log.warn("Shutting down client for {}...", deviceId);
Yi Tseng2a340f72018-11-02 16:52:47 -0700120 cancellableContext.cancel(new InterruptedException(
121 "Requested client shutdown"));
122 this.executorService.shutdownNow();
123 try {
124 executorService.awaitTermination(5, TimeUnit.SECONDS);
125 } catch (InterruptedException e) {
126 log.warn("Executor service didn't shutdown in time.");
127 Thread.currentThread().interrupt();
128 }
129 return null;
130 }
131
132 /**
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800133 * Executes the given task in the cancellable context of this client.
134 *
135 * @param task task
136 * @throws IllegalStateException if context has been cancelled
137 */
138 protected void runInCancellableContext(Runnable task) {
139 if (this.cancellableContext.isCancelled()) {
140 throw new IllegalStateException(
141 "Context is cancelled (client has been shut down)");
142 }
143 this.cancellableContext.run(task);
144 }
145
146 /**
147 * Returns the context associated with this client.
148 *
149 * @return context
150 */
151 protected Context.CancellableContext context() {
152 return cancellableContext;
153 }
154
155 /**
Yi Tseng2a340f72018-11-02 16:52:47 -0700156 * Equivalent of supplyWithExecutor using the gRPC context executor of this
157 * client, such that if the context is cancelled (e.g. client shutdown) the
158 * RPC is automatically cancelled.
159 *
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800160 * @param <U> return type of supplier
161 * @param supplier the supplier to be executed
Yi Tseng2a340f72018-11-02 16:52:47 -0700162 * @param opDescription the description of this supplier
163 * @return CompletableFuture includes the result of supplier
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800164 * @throws IllegalStateException if client has been shut down
Yi Tseng2a340f72018-11-02 16:52:47 -0700165 */
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800166 protected <U> CompletableFuture<U> supplyInContext(
Yi Tseng2a340f72018-11-02 16:52:47 -0700167 Supplier<U> supplier, String opDescription) {
168 return supplyWithExecutor(supplier, opDescription, contextExecutor);
169 }
170
171 /**
172 * Submits a task for async execution via the given executor. All tasks
173 * submitted with this method will be executed sequentially.
174 *
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800175 * @param <U> return type of supplier
176 * @param supplier the supplier to be executed
Yi Tseng2a340f72018-11-02 16:52:47 -0700177 * @param opDescription the description of this supplier
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800178 * @param executor the executor to execute this supplier
Yi Tseng2a340f72018-11-02 16:52:47 -0700179 * @return CompletableFuture includes the result of supplier
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800180 * @throws IllegalStateException if client has been shut down
Yi Tseng2a340f72018-11-02 16:52:47 -0700181 */
Yi Tsengd7716482018-10-31 15:34:30 -0700182 private <U> CompletableFuture<U> supplyWithExecutor(
Yi Tseng2a340f72018-11-02 16:52:47 -0700183 Supplier<U> supplier, String opDescription, Executor executor) {
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800184 if (this.cancellableContext.isCancelled()) {
185 throw new IllegalStateException("Client has been shut down");
186 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700187 return CompletableFuture.supplyAsync(() -> {
188 // TODO: explore a more relaxed locking strategy.
189 try {
190 if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) {
191 log.error("LOCK TIMEOUT! This is likely a deadlock, "
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800192 + "please debug (executing {})",
193 opDescription);
Yi Tseng2a340f72018-11-02 16:52:47 -0700194 throw new IllegalThreadStateException("Lock timeout");
195 }
196 } catch (InterruptedException e) {
197 log.warn("Thread interrupted while waiting for lock (executing {})",
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800198 opDescription);
Yi Tseng2a340f72018-11-02 16:52:47 -0700199 throw new IllegalStateException(e);
200 }
201 try {
202 return supplier.get();
203 } catch (StatusRuntimeException ex) {
204 log.warn("Unable to execute {} on {}: {}",
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800205 opDescription, deviceId, ex.toString());
Yi Tseng2a340f72018-11-02 16:52:47 -0700206 throw ex;
207 } catch (Throwable ex) {
208 log.error("Exception in client of {}, executing {}",
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800209 deviceId, opDescription, ex);
Yi Tseng2a340f72018-11-02 16:52:47 -0700210 throw ex;
211 } finally {
212 requestLock.unlock();
213 }
214 }, executor);
215 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800216
217 private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
218 ConnectivityState sourceState) {
219 if (log.isTraceEnabled()) {
220 log.trace("Setting channel callback for {} with source state {}...",
221 deviceId, sourceState);
222 }
223 channel.notifyWhenStateChanged(
224 sourceState, new ChannelConnectivityCallback(deviceId, channel));
225 }
226
227 /**
228 * Runnable task invoked at each change of the channel connectivity state.
229 * New callbacks are created as long as the channel is not shut down.
230 */
231 private final class ChannelConnectivityCallback implements Runnable {
232
233 private final DeviceId deviceId;
234 private final ManagedChannel channel;
235
236 private ChannelConnectivityCallback(
237 DeviceId deviceId, ManagedChannel channel) {
238 this.deviceId = deviceId;
239 this.channel = channel;
240 }
241
242 @Override
243 public void run() {
244 final ConnectivityState newState = channel.getState(false);
245 final DeviceAgentEvent.Type eventType;
246 switch (newState) {
247 // On gRPC connectivity states:
248 // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
249 case READY:
250 eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
251 break;
252 case TRANSIENT_FAILURE:
253 eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
254 break;
255 case SHUTDOWN:
256 eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
257 break;
258 case IDLE:
259 // IDLE and CONNECTING are transient states that will
260 // eventually move to READY or TRANSIENT_FAILURE. Do not
261 // generate an event for now.
262 if (persistent) {
263 log.debug("Forcing channel for {} to exist state IDLE...", deviceId);
264 channel.getState(true);
265 }
266 eventType = null;
267 break;
268 case CONNECTING:
269 eventType = null;
270 break;
271 default:
272 log.error("Unrecognized connectivity state {}", newState);
273 eventType = null;
274 }
275
276 if (log.isTraceEnabled()) {
277 log.trace("Detected channel connectivity change for {}, new state is {}",
278 deviceId, newState);
279 }
280
281 if (eventType != null) {
282 // Avoid sending consecutive duplicate events.
283 final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
284 final boolean past = channelOpen.getAndSet(present);
285 if (present != past) {
286 log.debug("Notifying event {} for {}", eventType, deviceId);
287 controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
288 }
289 }
290
291 if (newState != ConnectivityState.SHUTDOWN) {
292 // Channels never leave SHUTDOWN state, no need for a new callback.
293 setChannelCallback(deviceId, channel, newState);
294 }
295 }
296 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700297}