/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.grpc.ctl;
18
Carmelo Cascone3977ea42019-02-28 13:43:42 -080019import io.grpc.ConnectivityState;
Yi Tseng2a340f72018-11-02 16:52:47 -070020import io.grpc.Context;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080021import io.grpc.ManagedChannel;
Yi Tseng2a340f72018-11-02 16:52:47 -070022import io.grpc.StatusRuntimeException;
Yi Tseng2a340f72018-11-02 16:52:47 -070023import org.onosproject.grpc.api.GrpcClient;
24import org.onosproject.grpc.api.GrpcClientKey;
25import org.onosproject.net.DeviceId;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080026import org.onosproject.net.device.DeviceAgentEvent;
Yi Tseng2a340f72018-11-02 16:52:47 -070027import org.slf4j.Logger;
28
Carmelo Cascone3977ea42019-02-28 13:43:42 -080029import java.util.concurrent.atomic.AtomicBoolean;
Yi Tseng2a340f72018-11-02 16:52:47 -070030
Carmelo Cascone4c289b72019-01-22 15:30:45 -080031import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080032import static java.lang.String.format;
Yi Tseng2a340f72018-11-02 16:52:47 -070033import static org.slf4j.LoggerFactory.getLogger;
34
35/**
36 * Abstract client for gRPC service.
Yi Tseng2a340f72018-11-02 16:52:47 -070037 */
38public abstract class AbstractGrpcClient implements GrpcClient {
39
Yi Tseng2a340f72018-11-02 16:52:47 -070040 protected final Logger log = getLogger(getClass());
41
Yi Tsengd7716482018-10-31 15:34:30 -070042 private final Context.CancellableContext cancellableContext =
Yi Tseng2a340f72018-11-02 16:52:47 -070043 Context.current().withCancellation();
Yi Tsengd7716482018-10-31 15:34:30 -070044
Yi Tsengd7716482018-10-31 15:34:30 -070045 protected final DeviceId deviceId;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080046 protected final ManagedChannel channel;
47 private final boolean persistent;
48 private final AbstractGrpcClientController controller;
49 private final AtomicBoolean channelOpen = new AtomicBoolean(false);
Yi Tseng2a340f72018-11-02 16:52:47 -070050
Carmelo Cascone3977ea42019-02-28 13:43:42 -080051 /**
52 * Creates an new client for the given key and channel. Setting persistent
53 * to true avoids the gRPC channel to stay IDLE. The controller instance is
54 * needed to propagate channel events.
55 *
56 * @param clientKey client key
57 * @param channel channel
58 * @param persistent true if the gRPC should never stay IDLE
59 * @param controller controller
60 */
61 protected AbstractGrpcClient(GrpcClientKey clientKey, ManagedChannel channel,
62 boolean persistent, AbstractGrpcClientController controller) {
Carmelo Cascone4c289b72019-01-22 15:30:45 -080063 checkNotNull(clientKey);
Carmelo Cascone3977ea42019-02-28 13:43:42 -080064 checkNotNull(channel);
Yi Tseng2a340f72018-11-02 16:52:47 -070065 this.deviceId = clientKey.deviceId();
Carmelo Cascone3977ea42019-02-28 13:43:42 -080066 this.channel = channel;
67 this.persistent = persistent;
68 this.controller = controller;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080069
70 setChannelCallback(clientKey.deviceId(), channel, ConnectivityState.CONNECTING);
71 }
72
73 @Override
74 public boolean isServerReachable() {
75 final ConnectivityState state = channel.getState(false);
76 switch (state) {
77 case READY:
78 case IDLE:
79 return true;
80 case CONNECTING:
81 case TRANSIENT_FAILURE:
82 case SHUTDOWN:
83 return false;
84 default:
85 log.error("Unrecognized channel connectivity state {}", state);
86 return false;
87 }
Yi Tseng2a340f72018-11-02 16:52:47 -070088 }
89
90 @Override
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080091 public void shutdown() {
Carmelo Cascone4c289b72019-01-22 15:30:45 -080092 if (cancellableContext.isCancelled()) {
93 log.warn("Context is already cancelled, " +
94 "ignoring request to shutdown for {}...", deviceId);
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080095 return;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080096 }
Carmelo Cascone4c289b72019-01-22 15:30:45 -080097 log.warn("Shutting down client for {}...", deviceId);
Yi Tseng2a340f72018-11-02 16:52:47 -070098 cancellableContext.cancel(new InterruptedException(
99 "Requested client shutdown"));
Yi Tseng2a340f72018-11-02 16:52:47 -0700100 }
101
102 /**
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800103 * Executes the given task in the cancellable context of this client.
104 *
105 * @param task task
106 * @throws IllegalStateException if context has been cancelled
107 */
108 protected void runInCancellableContext(Runnable task) {
109 if (this.cancellableContext.isCancelled()) {
110 throw new IllegalStateException(
111 "Context is cancelled (client has been shut down)");
112 }
113 this.cancellableContext.run(task);
114 }
115
116 /**
117 * Returns the context associated with this client.
118 *
119 * @return context
120 */
121 protected Context.CancellableContext context() {
122 return cancellableContext;
123 }
124
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800125 protected void handleRpcError(Throwable throwable, String opDescription) {
126 if (throwable instanceof StatusRuntimeException) {
127 final StatusRuntimeException sre = (StatusRuntimeException) throwable;
128 final String logMsg;
129 if (sre.getCause() == null) {
130 logMsg = sre.getMessage();
131 } else {
132 logMsg = format("%s (%s)", sre.getMessage(), sre.getCause().toString());
133 }
134 log.warn("Error while performing {} on {}: {}",
135 opDescription, deviceId, logMsg);
136 log.debug("", throwable);
137 return;
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800138 }
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800139 log.error(format("Exception while performing %s on %s",
140 opDescription, deviceId), throwable);
Yi Tseng2a340f72018-11-02 16:52:47 -0700141 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800142
143 private void setChannelCallback(DeviceId deviceId, ManagedChannel channel,
144 ConnectivityState sourceState) {
145 if (log.isTraceEnabled()) {
146 log.trace("Setting channel callback for {} with source state {}...",
147 deviceId, sourceState);
148 }
149 channel.notifyWhenStateChanged(
150 sourceState, new ChannelConnectivityCallback(deviceId, channel));
151 }
152
153 /**
154 * Runnable task invoked at each change of the channel connectivity state.
155 * New callbacks are created as long as the channel is not shut down.
156 */
157 private final class ChannelConnectivityCallback implements Runnable {
158
159 private final DeviceId deviceId;
160 private final ManagedChannel channel;
161
162 private ChannelConnectivityCallback(
163 DeviceId deviceId, ManagedChannel channel) {
164 this.deviceId = deviceId;
165 this.channel = channel;
166 }
167
168 @Override
169 public void run() {
170 final ConnectivityState newState = channel.getState(false);
171 final DeviceAgentEvent.Type eventType;
172 switch (newState) {
173 // On gRPC connectivity states:
174 // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
175 case READY:
176 eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
177 break;
178 case TRANSIENT_FAILURE:
179 eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
180 break;
181 case SHUTDOWN:
182 eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
183 break;
184 case IDLE:
185 // IDLE and CONNECTING are transient states that will
186 // eventually move to READY or TRANSIENT_FAILURE. Do not
187 // generate an event for now.
188 if (persistent) {
189 log.debug("Forcing channel for {} to exist state IDLE...", deviceId);
190 channel.getState(true);
191 }
192 eventType = null;
193 break;
194 case CONNECTING:
195 eventType = null;
196 break;
197 default:
198 log.error("Unrecognized connectivity state {}", newState);
199 eventType = null;
200 }
201
202 if (log.isTraceEnabled()) {
203 log.trace("Detected channel connectivity change for {}, new state is {}",
204 deviceId, newState);
205 }
206
207 if (eventType != null) {
208 // Avoid sending consecutive duplicate events.
209 final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
210 final boolean past = channelOpen.getAndSet(present);
211 if (present != past) {
212 log.debug("Notifying event {} for {}", eventType, deviceId);
213 controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
214 }
215 }
216
217 if (newState != ConnectivityState.SHUTDOWN) {
218 // Channels never leave SHUTDOWN state, no need for a new callback.
219 setChannelCallback(deviceId, channel, newState);
220 }
221 }
222 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700223}