Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2018-present Open Networking Foundation |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package org.onosproject.grpc.ctl; |
| 18 | |
Carmelo Cascone | 3977ea4 | 2019-02-28 13:43:42 -0800 | [diff] [blame^] | 19 | import io.grpc.ConnectivityState; |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 20 | import io.grpc.Context; |
Carmelo Cascone | 3977ea4 | 2019-02-28 13:43:42 -0800 | [diff] [blame^] | 21 | import io.grpc.ManagedChannel; |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 22 | import io.grpc.StatusRuntimeException; |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 23 | import org.onosproject.grpc.api.GrpcClient; |
| 24 | import org.onosproject.grpc.api.GrpcClientKey; |
| 25 | import org.onosproject.net.DeviceId; |
Carmelo Cascone | 3977ea4 | 2019-02-28 13:43:42 -0800 | [diff] [blame^] | 26 | import org.onosproject.net.device.DeviceAgentEvent; |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 27 | import org.slf4j.Logger; |
| 28 | |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 29 | import java.util.concurrent.CompletableFuture; |
| 30 | import java.util.concurrent.Executor; |
| 31 | import java.util.concurrent.ExecutorService; |
| 32 | import java.util.concurrent.Executors; |
| 33 | import java.util.concurrent.TimeUnit; |
Carmelo Cascone | 3977ea4 | 2019-02-28 13:43:42 -0800 | [diff] [blame^] | 34 | import java.util.concurrent.atomic.AtomicBoolean; |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 35 | import java.util.concurrent.locks.Lock; |
| 36 | import java.util.concurrent.locks.ReentrantLock; |
| 37 | import java.util.function.Supplier; |
| 38 | |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 39 | import static com.google.common.base.Preconditions.checkNotNull; |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 40 | import static org.onlab.util.Tools.groupedThreads; |
| 41 | import static org.slf4j.LoggerFactory.getLogger; |
| 42 | |
| 43 | /** |
| 44 | * Abstract client for gRPC service. |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 45 | */ |
| 46 | public abstract class AbstractGrpcClient implements GrpcClient { |
| 47 | |
| 48 | // Timeout in seconds to obtain the request lock. |
Yi Tseng | d771648 | 2018-10-31 15:34:30 -0700 | [diff] [blame] | 49 | private static final int LOCK_TIMEOUT = 60; |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 50 | private static final int DEFAULT_THREAD_POOL_SIZE = 10; |
| 51 | |
| 52 | protected final Logger log = getLogger(getClass()); |
| 53 | |
Yi Tseng | d771648 | 2018-10-31 15:34:30 -0700 | [diff] [blame] | 54 | private final Lock requestLock = new ReentrantLock(); |
| 55 | private final Context.CancellableContext cancellableContext = |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 56 | Context.current().withCancellation(); |
Yi Tseng | d771648 | 2018-10-31 15:34:30 -0700 | [diff] [blame] | 57 | private final Executor contextExecutor; |
| 58 | |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 59 | protected final ExecutorService executorService; |
Yi Tseng | d771648 | 2018-10-31 15:34:30 -0700 | [diff] [blame] | 60 | protected final DeviceId deviceId; |
Carmelo Cascone | 3977ea4 | 2019-02-28 13:43:42 -0800 | [diff] [blame^] | 61 | protected final ManagedChannel channel; |
| 62 | private final boolean persistent; |
| 63 | private final AbstractGrpcClientController controller; |
| 64 | private final AtomicBoolean channelOpen = new AtomicBoolean(false); |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 65 | |
Carmelo Cascone | 3977ea4 | 2019-02-28 13:43:42 -0800 | [diff] [blame^] | 66 | /** |
| 67 | * Creates an new client for the given key and channel. Setting persistent |
| 68 | * to true avoids the gRPC channel to stay IDLE. The controller instance is |
| 69 | * needed to propagate channel events. |
| 70 | * |
| 71 | * @param clientKey client key |
| 72 | * @param channel channel |
| 73 | * @param persistent true if the gRPC should never stay IDLE |
| 74 | * @param controller controller |
| 75 | */ |
| 76 | protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel, |
| 77 | boolean persistent, AbstractGrpcClientController controller) { |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 78 | checkNotNull(clientKey); |
Carmelo Cascone | 3977ea4 | 2019-02-28 13:43:42 -0800 | [diff] [blame^] | 79 | checkNotNull(channel); |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 80 | this.deviceId = clientKey.deviceId(); |
Carmelo Cascone | 3977ea4 | 2019-02-28 13:43:42 -0800 | [diff] [blame^] | 81 | this.channel = channel; |
| 82 | this.persistent = persistent; |
| 83 | this.controller = controller; |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 84 | this.executorService = Executors.newFixedThreadPool(DEFAULT_THREAD_POOL_SIZE, groupedThreads( |
| 85 | "onos-grpc-" + clientKey.serviceName() + "-client-" + deviceId.toString(), "%d")); |
| 86 | this.contextExecutor = this.cancellableContext.fixedContextExecutor(executorService); |
Carmelo Cascone | 3977ea4 | 2019-02-28 13:43:42 -0800 | [diff] [blame^] | 87 | |
| 88 | setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING); |
| 89 | } |
| 90 | |
| 91 | @Override |
| 92 | public boolean isServerReachable() { |
| 93 | final ConnectivityState state = channel.getState(false); |
| 94 | switch (state) { |
| 95 | case READY: |
| 96 | case IDLE: |
| 97 | return true; |
| 98 | case CONNECTING: |
| 99 | case TRANSIENT_FAILURE: |
| 100 | case SHUTDOWN: |
| 101 | return false; |
| 102 | default: |
| 103 | log.error("Unrecognized channel connectivity state {}", state); |
| 104 | return false; |
| 105 | } |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 106 | } |
| 107 | |
| 108 | @Override |
| 109 | public CompletableFuture<Void> shutdown() { |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 110 | if (cancellableContext.isCancelled()) { |
| 111 | log.warn("Context is already cancelled, " + |
| 112 | "ignoring request to shutdown for {}...", deviceId); |
| 113 | return CompletableFuture.completedFuture(null); |
| 114 | } |
| 115 | return CompletableFuture.supplyAsync(this::doShutdown); |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 116 | } |
| 117 | |
| 118 | protected Void doShutdown() { |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 119 | log.warn("Shutting down client for {}...", deviceId); |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 120 | cancellableContext.cancel(new InterruptedException( |
| 121 | "Requested client shutdown")); |
| 122 | this.executorService.shutdownNow(); |
| 123 | try { |
| 124 | executorService.awaitTermination(5, TimeUnit.SECONDS); |
| 125 | } catch (InterruptedException e) { |
| 126 | log.warn("Executor service didn't shutdown in time."); |
| 127 | Thread.currentThread().interrupt(); |
| 128 | } |
| 129 | return null; |
| 130 | } |
| 131 | |
| 132 | /** |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 133 | * Executes the given task in the cancellable context of this client. |
| 134 | * |
| 135 | * @param task task |
| 136 | * @throws IllegalStateException if context has been cancelled |
| 137 | */ |
| 138 | protected void runInCancellableContext(Runnable task) { |
| 139 | if (this.cancellableContext.isCancelled()) { |
| 140 | throw new IllegalStateException( |
| 141 | "Context is cancelled (client has been shut down)"); |
| 142 | } |
| 143 | this.cancellableContext.run(task); |
| 144 | } |
| 145 | |
| 146 | /** |
| 147 | * Returns the context associated with this client. |
| 148 | * |
| 149 | * @return context |
| 150 | */ |
| 151 | protected Context.CancellableContext context() { |
| 152 | return cancellableContext; |
| 153 | } |
| 154 | |
| 155 | /** |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 156 | * Equivalent of supplyWithExecutor using the gRPC context executor of this |
| 157 | * client, such that if the context is cancelled (e.g. client shutdown) the |
| 158 | * RPC is automatically cancelled. |
| 159 | * |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 160 | * @param <U> return type of supplier |
| 161 | * @param supplier the supplier to be executed |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 162 | * @param opDescription the description of this supplier |
| 163 | * @return CompletableFuture includes the result of supplier |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 164 | * @throws IllegalStateException if client has been shut down |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 165 | */ |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 166 | protected <U> CompletableFuture<U> supplyInContext( |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 167 | Supplier<U> supplier, String opDescription) { |
| 168 | return supplyWithExecutor(supplier, opDescription, contextExecutor); |
| 169 | } |
| 170 | |
| 171 | /** |
| 172 | * Submits a task for async execution via the given executor. All tasks |
| 173 | * submitted with this method will be executed sequentially. |
| 174 | * |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 175 | * @param <U> return type of supplier |
| 176 | * @param supplier the supplier to be executed |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 177 | * @param opDescription the description of this supplier |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 178 | * @param executor the executor to execute this supplier |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 179 | * @return CompletableFuture includes the result of supplier |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 180 | * @throws IllegalStateException if client has been shut down |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 181 | */ |
Yi Tseng | d771648 | 2018-10-31 15:34:30 -0700 | [diff] [blame] | 182 | private <U> CompletableFuture<U> supplyWithExecutor( |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 183 | Supplier<U> supplier, String opDescription, Executor executor) { |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 184 | if (this.cancellableContext.isCancelled()) { |
| 185 | throw new IllegalStateException("Client has been shut down"); |
| 186 | } |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 187 | return CompletableFuture.supplyAsync(() -> { |
| 188 | // TODO: explore a more relaxed locking strategy. |
| 189 | try { |
| 190 | if (!requestLock.tryLock(LOCK_TIMEOUT, TimeUnit.SECONDS)) { |
| 191 | log.error("LOCK TIMEOUT! This is likely a deadlock, " |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 192 | + "please debug (executing {})", |
| 193 | opDescription); |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 194 | throw new IllegalThreadStateException("Lock timeout"); |
| 195 | } |
| 196 | } catch (InterruptedException e) { |
| 197 | log.warn("Thread interrupted while waiting for lock (executing {})", |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 198 | opDescription); |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 199 | throw new IllegalStateException(e); |
| 200 | } |
| 201 | try { |
| 202 | return supplier.get(); |
| 203 | } catch (StatusRuntimeException ex) { |
| 204 | log.warn("Unable to execute {} on {}: {}", |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 205 | opDescription, deviceId, ex.toString()); |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 206 | throw ex; |
| 207 | } catch (Throwable ex) { |
| 208 | log.error("Exception in client of {}, executing {}", |
Carmelo Cascone | 4c289b7 | 2019-01-22 15:30:45 -0800 | [diff] [blame] | 209 | deviceId, opDescription, ex); |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 210 | throw ex; |
| 211 | } finally { |
| 212 | requestLock.unlock(); |
| 213 | } |
| 214 | }, executor); |
| 215 | } |
Carmelo Cascone | 3977ea4 | 2019-02-28 13:43:42 -0800 | [diff] [blame^] | 216 | |
| 217 | private void setChannelCallback(DeviceId deviceId, ManagedChannel channel, |
| 218 | ConnectivityState sourceState) { |
| 219 | if (log.isTraceEnabled()) { |
| 220 | log.trace("Setting channel callback for {} with source state {}...", |
| 221 | deviceId, sourceState); |
| 222 | } |
| 223 | channel.notifyWhenStateChanged( |
| 224 | sourceState, new ChannelConnectivityCallback(deviceId, channel)); |
| 225 | } |
| 226 | |
| 227 | /** |
| 228 | * Runnable task invoked at each change of the channel connectivity state. |
| 229 | * New callbacks are created as long as the channel is not shut down. |
| 230 | */ |
| 231 | private final class ChannelConnectivityCallback implements Runnable { |
| 232 | |
| 233 | private final DeviceId deviceId; |
| 234 | private final ManagedChannel channel; |
| 235 | |
| 236 | private ChannelConnectivityCallback( |
| 237 | DeviceId deviceId, ManagedChannel channel) { |
| 238 | this.deviceId = deviceId; |
| 239 | this.channel = channel; |
| 240 | } |
| 241 | |
| 242 | @Override |
| 243 | public void run() { |
| 244 | final ConnectivityState newState = channel.getState(false); |
| 245 | final DeviceAgentEvent.Type eventType; |
| 246 | switch (newState) { |
| 247 | // On gRPC connectivity states: |
| 248 | // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md |
| 249 | case READY: |
| 250 | eventType = DeviceAgentEvent.Type.CHANNEL_OPEN; |
| 251 | break; |
| 252 | case TRANSIENT_FAILURE: |
| 253 | eventType = DeviceAgentEvent.Type.CHANNEL_ERROR; |
| 254 | break; |
| 255 | case SHUTDOWN: |
| 256 | eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED; |
| 257 | break; |
| 258 | case IDLE: |
| 259 | // IDLE and CONNECTING are transient states that will |
| 260 | // eventually move to READY or TRANSIENT_FAILURE. Do not |
| 261 | // generate an event for now. |
| 262 | if (persistent) { |
| 263 | log.debug("Forcing channel for {} to exist state IDLE...", deviceId); |
| 264 | channel.getState(true); |
| 265 | } |
| 266 | eventType = null; |
| 267 | break; |
| 268 | case CONNECTING: |
| 269 | eventType = null; |
| 270 | break; |
| 271 | default: |
| 272 | log.error("Unrecognized connectivity state {}", newState); |
| 273 | eventType = null; |
| 274 | } |
| 275 | |
| 276 | if (log.isTraceEnabled()) { |
| 277 | log.trace("Detected channel connectivity change for {}, new state is {}", |
| 278 | deviceId, newState); |
| 279 | } |
| 280 | |
| 281 | if (eventType != null) { |
| 282 | // Avoid sending consecutive duplicate events. |
| 283 | final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN; |
| 284 | final boolean past = channelOpen.getAndSet(present); |
| 285 | if (present != past) { |
| 286 | log.debug("Notifying event {} for {}", eventType, deviceId); |
| 287 | controller.postEvent(new DeviceAgentEvent(eventType, deviceId)); |
| 288 | } |
| 289 | } |
| 290 | |
| 291 | if (newState != ConnectivityState.SHUTDOWN) { |
| 292 | // Channels never leave SHUTDOWN state, no need for a new callback. |
| 293 | setChannelCallback(deviceId, channel, newState); |
| 294 | } |
| 295 | } |
| 296 | } |
Yi Tseng | 2a340f7 | 2018-11-02 16:52:47 -0700 | [diff] [blame] | 297 | } |