blob: 933c7cece0c6fad719a7bb924608dd20e34357e4 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onosproject.grpc.ctl;
18
Carmelo Cascone3977ea42019-02-28 13:43:42 -080019import io.grpc.ConnectivityState;
Yi Tseng2a340f72018-11-02 16:52:47 -070020import io.grpc.Context;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080021import io.grpc.ManagedChannel;
Yi Tseng2a340f72018-11-02 16:52:47 -070022import io.grpc.StatusRuntimeException;
Yi Tseng2a340f72018-11-02 16:52:47 -070023import org.onosproject.grpc.api.GrpcClient;
Yi Tseng2a340f72018-11-02 16:52:47 -070024import org.onosproject.net.DeviceId;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080025import org.onosproject.net.device.DeviceAgentEvent;
Yi Tseng2a340f72018-11-02 16:52:47 -070026import org.slf4j.Logger;
27
Carmelo Cascone3977ea42019-02-28 13:43:42 -080028import java.util.concurrent.atomic.AtomicBoolean;
Yi Tseng2a340f72018-11-02 16:52:47 -070029
Carmelo Cascone4c289b72019-01-22 15:30:45 -080030import static com.google.common.base.Preconditions.checkNotNull;
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080031import static java.lang.String.format;
Yi Tseng2a340f72018-11-02 16:52:47 -070032import static org.slf4j.LoggerFactory.getLogger;
33
34/**
35 * Abstract client for gRPC service.
Yi Tseng2a340f72018-11-02 16:52:47 -070036 */
37public abstract class AbstractGrpcClient implements GrpcClient {
38
Yi Tseng2a340f72018-11-02 16:52:47 -070039 protected final Logger log = getLogger(getClass());
40
Yi Tsengd7716482018-10-31 15:34:30 -070041 private final Context.CancellableContext cancellableContext =
Yi Tseng2a340f72018-11-02 16:52:47 -070042 Context.current().withCancellation();
Yi Tsengd7716482018-10-31 15:34:30 -070043
Yi Tsengd7716482018-10-31 15:34:30 -070044 protected final DeviceId deviceId;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080045 protected final ManagedChannel channel;
46 private final boolean persistent;
47 private final AbstractGrpcClientController controller;
48 private final AtomicBoolean channelOpen = new AtomicBoolean(false);
Yi Tseng2a340f72018-11-02 16:52:47 -070049
Carmelo Cascone3977ea42019-02-28 13:43:42 -080050 /**
Carmelo Casconec2be50a2019-04-10 00:15:39 -070051 * Creates an new client for the given device and channel. Setting
52 * persistent to true avoids the gRPC channel to go {@link
53 * ConnectivityState#IDLE}. The controller instance is needed to propagate
54 * channel events.
Carmelo Cascone3977ea42019-02-28 13:43:42 -080055 *
Carmelo Casconec2be50a2019-04-10 00:15:39 -070056 * @param deviceId device ID
Carmelo Cascone3977ea42019-02-28 13:43:42 -080057 * @param channel channel
58 * @param persistent true if the gRPC should never stay IDLE
59 * @param controller controller
60 */
Carmelo Casconec2be50a2019-04-10 00:15:39 -070061 protected AbstractGrpcClient(DeviceId deviceId, ManagedChannel channel,
Carmelo Cascone3977ea42019-02-28 13:43:42 -080062 boolean persistent, AbstractGrpcClientController controller) {
Carmelo Casconec2be50a2019-04-10 00:15:39 -070063 checkNotNull(deviceId);
Carmelo Cascone3977ea42019-02-28 13:43:42 -080064 checkNotNull(channel);
Carmelo Casconec2be50a2019-04-10 00:15:39 -070065 this.deviceId = deviceId;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080066 this.channel = channel;
67 this.persistent = persistent;
68 this.controller = controller;
Carmelo Cascone3977ea42019-02-28 13:43:42 -080069
Carmelo Casconec2be50a2019-04-10 00:15:39 -070070 setChannelCallback(ConnectivityState.CONNECTING);
Carmelo Cascone3977ea42019-02-28 13:43:42 -080071 }
72
73 @Override
74 public boolean isServerReachable() {
75 final ConnectivityState state = channel.getState(false);
76 switch (state) {
77 case READY:
78 case IDLE:
79 return true;
80 case CONNECTING:
81 case TRANSIENT_FAILURE:
82 case SHUTDOWN:
83 return false;
84 default:
85 log.error("Unrecognized channel connectivity state {}", state);
86 return false;
87 }
Yi Tseng2a340f72018-11-02 16:52:47 -070088 }
89
90 @Override
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080091 public void shutdown() {
Carmelo Cascone4c289b72019-01-22 15:30:45 -080092 if (cancellableContext.isCancelled()) {
93 log.warn("Context is already cancelled, " +
94 "ignoring request to shutdown for {}...", deviceId);
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080095 return;
Carmelo Cascone4c289b72019-01-22 15:30:45 -080096 }
Carmelo Casconec2be50a2019-04-10 00:15:39 -070097 log.debug("Shutting down client for {}...", deviceId);
Yi Tseng2a340f72018-11-02 16:52:47 -070098 cancellableContext.cancel(new InterruptedException(
99 "Requested client shutdown"));
Yi Tseng2a340f72018-11-02 16:52:47 -0700100 }
101
102 /**
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800103 * Executes the given task in the cancellable context of this client.
104 *
105 * @param task task
106 * @throws IllegalStateException if context has been cancelled
107 */
108 protected void runInCancellableContext(Runnable task) {
109 if (this.cancellableContext.isCancelled()) {
110 throw new IllegalStateException(
111 "Context is cancelled (client has been shut down)");
112 }
113 this.cancellableContext.run(task);
114 }
115
116 /**
117 * Returns the context associated with this client.
118 *
119 * @return context
120 */
121 protected Context.CancellableContext context() {
122 return cancellableContext;
123 }
124
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800125 protected void handleRpcError(Throwable throwable, String opDescription) {
126 if (throwable instanceof StatusRuntimeException) {
127 final StatusRuntimeException sre = (StatusRuntimeException) throwable;
128 final String logMsg;
129 if (sre.getCause() == null) {
130 logMsg = sre.getMessage();
131 } else {
132 logMsg = format("%s (%s)", sre.getMessage(), sre.getCause().toString());
133 }
134 log.warn("Error while performing {} on {}: {}",
135 opDescription, deviceId, logMsg);
136 log.debug("", throwable);
137 return;
Carmelo Cascone4c289b72019-01-22 15:30:45 -0800138 }
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800139 log.error(format("Exception while performing %s on %s",
140 opDescription, deviceId), throwable);
Yi Tseng2a340f72018-11-02 16:52:47 -0700141 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800142
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700143 private void setChannelCallback(ConnectivityState sourceState) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800144 if (log.isTraceEnabled()) {
145 log.trace("Setting channel callback for {} with source state {}...",
146 deviceId, sourceState);
147 }
148 channel.notifyWhenStateChanged(
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700149 sourceState, this::channelStateCallback);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800150 }
151
152 /**
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700153 * Invoked at each change of the channel connectivity state. New callbacks
154 * are created as long as the channel is not shut down.
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800155 */
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700156 private void channelStateCallback() {
157 final ConnectivityState newState = channel.getState(false);
158 final DeviceAgentEvent.Type eventType;
159 switch (newState) {
160 // On gRPC connectivity states:
161 // https://github.com/grpc/grpc/blob/master/doc/connectivity-semantics-and-api.md
162 case READY:
163 eventType = DeviceAgentEvent.Type.CHANNEL_OPEN;
164 break;
165 case TRANSIENT_FAILURE:
166 eventType = DeviceAgentEvent.Type.CHANNEL_ERROR;
167 break;
168 case SHUTDOWN:
169 eventType = DeviceAgentEvent.Type.CHANNEL_CLOSED;
170 break;
171 case IDLE:
172 // IDLE and CONNECTING are transient states that will
173 // eventually move to READY or TRANSIENT_FAILURE. Do not
174 // generate an event for now.
175 if (persistent) {
176 log.debug("Forcing channel for {} to exist state IDLE...", deviceId);
177 channel.getState(true);
178 }
179 eventType = null;
180 break;
181 case CONNECTING:
182 eventType = null;
183 break;
184 default:
185 log.error("Unrecognized connectivity state {}", newState);
186 eventType = null;
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800187 }
188
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700189 if (log.isTraceEnabled()) {
190 log.trace("Detected channel connectivity change for {}, new state is {}",
191 deviceId, newState);
192 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800193
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700194 if (eventType != null) {
195 // Avoid sending consecutive duplicate events.
196 final boolean present = eventType == DeviceAgentEvent.Type.CHANNEL_OPEN;
197 final boolean past = channelOpen.getAndSet(present);
198 if (present != past) {
199 log.debug("Notifying event {} for {}", eventType, deviceId);
200 controller.postEvent(new DeviceAgentEvent(eventType, deviceId));
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800201 }
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700202 }
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800203
Carmelo Casconec2be50a2019-04-10 00:15:39 -0700204 if (newState != ConnectivityState.SHUTDOWN) {
205 // Channels never leave SHUTDOWN state, no need for a new callback.
206 setChannelCallback(newState);
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800207 }
208 }
Yi Tseng2a340f72018-11-02 16:52:47 -0700209}