blob: ce9c5f840eda5fd110f675dd08e6506a7342c2a4 [file] [log] [blame]
Yi Tseng890dc3f2018-11-01 13:23:11 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package protocols.gnmi.ctl.java.org.onosproject.gnmi.ctl;
17
18import gnmi.Gnmi.CapabilityRequest;
19import gnmi.Gnmi.CapabilityResponse;
20import gnmi.Gnmi.GetRequest;
21import gnmi.Gnmi.GetResponse;
Yi Tseng5f7fef52018-11-05 11:30:47 -080022import gnmi.Gnmi.Path;
23import gnmi.Gnmi.PathElem;
Yi Tseng890dc3f2018-11-01 13:23:11 -070024import gnmi.Gnmi.SetRequest;
25import gnmi.Gnmi.SetResponse;
Yi Tsenge616d752018-11-27 10:53:27 -080026import gnmi.Gnmi.SubscribeRequest;
27import gnmi.Gnmi.SubscribeResponse;
Yi Tseng890dc3f2018-11-01 13:23:11 -070028import gnmi.gNMIGrpc;
29import io.grpc.ManagedChannel;
Yi Tseng5f7fef52018-11-05 11:30:47 -080030import io.grpc.Status;
Yi Tseng890dc3f2018-11-01 13:23:11 -070031import io.grpc.StatusRuntimeException;
Yi Tsenge616d752018-11-27 10:53:27 -080032import io.grpc.stub.ClientCallStreamObserver;
33import io.grpc.stub.StreamObserver;
Yi Tsengd7716482018-10-31 15:34:30 -070034import org.onosproject.gnmi.api.GnmiClient;
Yi Tseng890dc3f2018-11-01 13:23:11 -070035import org.onosproject.gnmi.api.GnmiClientKey;
Yi Tsenge616d752018-11-27 10:53:27 -080036import org.onosproject.gnmi.api.GnmiEvent;
37import org.onosproject.gnmi.api.GnmiUpdate;
Yi Tseng890dc3f2018-11-01 13:23:11 -070038import org.onosproject.grpc.ctl.AbstractGrpcClient;
39import org.slf4j.Logger;
Yi Tseng890dc3f2018-11-01 13:23:11 -070040
Yi Tsenge616d752018-11-27 10:53:27 -080041import java.net.ConnectException;
Yi Tseng890dc3f2018-11-01 13:23:11 -070042import java.util.concurrent.CompletableFuture;
Yi Tsenge616d752018-11-27 10:53:27 -080043import java.util.concurrent.atomic.AtomicBoolean;
Yi Tseng890dc3f2018-11-01 13:23:11 -070044
Yi Tsenge616d752018-11-27 10:53:27 -080045import static java.lang.String.format;
Yi Tseng890dc3f2018-11-01 13:23:11 -070046import static org.slf4j.LoggerFactory.getLogger;
47
48/**
49 * Implementation of gNMI client.
50 */
51public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
Yi Tseng5f7fef52018-11-05 11:30:47 -080052 private static final PathElem DUMMY_PATH_ELEM = PathElem.newBuilder().setName("onos-gnmi-test").build();
53 private static final Path DUMMY_PATH = Path.newBuilder().addElem(DUMMY_PATH_ELEM).build();
54 private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
Yi Tseng890dc3f2018-11-01 13:23:11 -070055 private final Logger log = getLogger(getClass());
56 private final gNMIGrpc.gNMIBlockingStub blockingStub;
Yi Tsenge616d752018-11-27 10:53:27 -080057 private StreamChannelManager streamChannelManager;
58 private GnmiControllerImpl controller;
Yi Tseng890dc3f2018-11-01 13:23:11 -070059
Yi Tsenge616d752018-11-27 10:53:27 -080060 GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
Yi Tsengd7716482018-10-31 15:34:30 -070061 super(clientKey);
Yi Tseng890dc3f2018-11-01 13:23:11 -070062 this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
Yi Tsenge616d752018-11-27 10:53:27 -080063 this.streamChannelManager = new StreamChannelManager(managedChannel);
64 this.controller = controller;
Yi Tseng890dc3f2018-11-01 13:23:11 -070065 }
66
67 @Override
68 public CompletableFuture<CapabilityResponse> capability() {
69 return supplyInContext(this::doCapability, "capability");
70 }
71
72 @Override
73 public CompletableFuture<GetResponse> get(GetRequest request) {
74 return supplyInContext(() -> doGet(request), "get");
75 }
76
77 @Override
78 public CompletableFuture<SetResponse> set(SetRequest request) {
79 return supplyInContext(() -> doSet(request), "set");
80 }
81
Yi Tseng5f7fef52018-11-05 11:30:47 -080082 @Override
Yi Tsenge616d752018-11-27 10:53:27 -080083 public boolean subscribe(SubscribeRequest request) {
84 return streamChannelManager.send(request);
85 }
86
87 @Override
88 public void terminateSubscriptionChannel() {
89 streamChannelManager.complete();
90 }
91
92 @Override
Yi Tseng5f7fef52018-11-05 11:30:47 -080093 public CompletableFuture<Boolean> isServiceAvailable() {
94 return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
95 }
96
Yi Tsenge616d752018-11-27 10:53:27 -080097 @Override
98 protected Void doShutdown() {
99 streamChannelManager.complete();
100 return super.doShutdown();
101 }
102
Yi Tseng890dc3f2018-11-01 13:23:11 -0700103 private CapabilityResponse doCapability() {
104 CapabilityRequest request = CapabilityRequest.newBuilder().build();
105 try {
106 return blockingStub.capabilities(request);
107 } catch (StatusRuntimeException e) {
108 log.warn("Unable to get capability from {}: {}", deviceId, e.getMessage());
Yi Tseng59d5f3e2018-11-27 23:09:41 -0800109 return CapabilityResponse.getDefaultInstance();
Yi Tseng890dc3f2018-11-01 13:23:11 -0700110 }
111 }
112
113 private GetResponse doGet(GetRequest request) {
114 try {
115 return blockingStub.get(request);
116 } catch (StatusRuntimeException e) {
117 log.warn("Unable to get data from {}: {}", deviceId, e.getMessage());
Yi Tseng59d5f3e2018-11-27 23:09:41 -0800118 return GetResponse.getDefaultInstance();
Yi Tseng890dc3f2018-11-01 13:23:11 -0700119 }
120 }
121
122 private SetResponse doSet(SetRequest request) {
123 try {
124 return blockingStub.set(request);
125 } catch (StatusRuntimeException e) {
126 log.warn("Unable to set data to {}: {}", deviceId, e.getMessage());
Yi Tseng59d5f3e2018-11-27 23:09:41 -0800127 return SetResponse.getDefaultInstance();
Yi Tseng890dc3f2018-11-01 13:23:11 -0700128 }
129 }
Yi Tseng5f7fef52018-11-05 11:30:47 -0800130
131 private boolean doServiceAvailable() {
132 try {
Yi Tsengd7716482018-10-31 15:34:30 -0700133 return blockingStub.get(DUMMY_REQUEST) != null;
Yi Tseng5f7fef52018-11-05 11:30:47 -0800134 } catch (StatusRuntimeException e) {
135 // This gRPC call should throw INVALID_ARGUMENT status exception
136 // since "/onos-gnmi-test" path does not exists in any config model
137 // For other status code such as UNIMPLEMENT, means the gNMI
138 // service is not available on the device.
139 return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
140 }
141 }
Yi Tsenge616d752018-11-27 10:53:27 -0800142
143
144
145 /**
146 * A manager for the gNMI stream channel that opportunistically creates
147 * new stream RCP stubs (e.g. when one fails because of errors) and posts
148 * subscribe events via the gNMI controller.
149 */
150 private final class StreamChannelManager {
151
152 private final ManagedChannel channel;
153 private final AtomicBoolean open;
154 private final StreamObserver<SubscribeResponse> responseObserver;
155 private ClientCallStreamObserver<SubscribeRequest> requestObserver;
156
157 private StreamChannelManager(ManagedChannel channel) {
158 this.channel = channel;
159 this.responseObserver = new InternalStreamResponseObserver(this);
160 this.open = new AtomicBoolean(false);
161 }
162
163 private void initIfRequired() {
164 if (requestObserver == null) {
165 log.debug("Creating new stream channel for {}...", deviceId);
166 requestObserver = (ClientCallStreamObserver<SubscribeRequest>)
167 gNMIGrpc.newStub(channel).subscribe(responseObserver);
168 open.set(false);
169 }
170 }
171
172 public boolean send(SubscribeRequest value) {
173 synchronized (this) {
174 initIfRequired();
175 try {
176 requestObserver.onNext(value);
177 return true;
178 } catch (Throwable ex) {
179 if (ex instanceof StatusRuntimeException) {
180 log.warn("Unable to send subscribe request to {}: {}",
181 deviceId, ex.getMessage());
182 } else {
183 log.warn("Exception while sending subscribe request to {}",
184 deviceId, ex);
185 }
186 complete();
187 return false;
188 }
189 }
190 }
191
192 public void complete() {
193 synchronized (this) {
194 if (requestObserver != null) {
195 requestObserver.onCompleted();
196 requestObserver.cancel("Terminated", null);
197 requestObserver = null;
198 }
199 }
200 }
201 }
202
203
204 /**
205 * Handles messages received from the device on the stream channel.
206 */
207 private final class InternalStreamResponseObserver
208 implements StreamObserver<SubscribeResponse> {
209
210 private final StreamChannelManager streamChannelManager;
211
212 private InternalStreamResponseObserver(
213 StreamChannelManager streamChannelManager) {
214 this.streamChannelManager = streamChannelManager;
215 }
216
217 @Override
218 public void onNext(SubscribeResponse message) {
219 executorService.submit(() -> doNext(message));
220 }
221
222 private void doNext(SubscribeResponse message) {
223 try {
224 log.debug("Received message on stream channel from {}: {}",
225 deviceId, message.toString());
226 GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
227 GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
228 controller.postEvent(event);
229 } catch (Throwable ex) {
230 log.error("Exception while processing stream message from {}",
231 deviceId, ex);
232 }
233 }
234
235 @Override
236 public void onError(Throwable throwable) {
237 if (throwable instanceof StatusRuntimeException) {
238 StatusRuntimeException sre = (StatusRuntimeException) throwable;
239 if (sre.getStatus().getCause() instanceof ConnectException) {
240 log.warn("Device {} is unreachable ({})",
241 deviceId, sre.getCause().getMessage());
242 } else {
243 log.warn("Received error on stream channel for {}: {}",
244 deviceId, throwable.getMessage());
245 }
246 } else {
247 log.warn(format("Received exception on stream channel for %s",
248 deviceId), throwable);
249 }
250 streamChannelManager.complete();
251 }
252
253 @Override
254 public void onCompleted() {
255 log.warn("Stream channel for {} has completed", deviceId);
256 streamChannelManager.complete();
257 }
258 }
Yi Tseng890dc3f2018-11-01 13:23:11 -0700259}