blob: 8d93a63adb065cdbb57bbddf1636f59ae9eefd6b [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
Carmelo Cascone3370c962019-02-07 18:24:19 -080016
17package org.onosproject.gnmi.ctl;
Yi Tseng890dc3f2018-11-01 13:23:11 -070018
19import gnmi.Gnmi.CapabilityRequest;
20import gnmi.Gnmi.CapabilityResponse;
21import gnmi.Gnmi.GetRequest;
22import gnmi.Gnmi.GetResponse;
Yi Tseng5f7fef52018-11-05 11:30:47 -080023import gnmi.Gnmi.Path;
24import gnmi.Gnmi.PathElem;
Yi Tseng890dc3f2018-11-01 13:23:11 -070025import gnmi.Gnmi.SetRequest;
26import gnmi.Gnmi.SetResponse;
Yi Tsenge616d752018-11-27 10:53:27 -080027import gnmi.Gnmi.SubscribeRequest;
Yi Tseng890dc3f2018-11-01 13:23:11 -070028import gnmi.gNMIGrpc;
29import io.grpc.ManagedChannel;
Yi Tseng5f7fef52018-11-05 11:30:47 -080030import io.grpc.Status;
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080031import io.grpc.stub.StreamObserver;
Yi Tsengd7716482018-10-31 15:34:30 -070032import org.onosproject.gnmi.api.GnmiClient;
Yi Tseng890dc3f2018-11-01 13:23:11 -070033import org.onosproject.gnmi.api.GnmiClientKey;
34import org.onosproject.grpc.ctl.AbstractGrpcClient;
Yi Tseng890dc3f2018-11-01 13:23:11 -070035
36import java.util.concurrent.CompletableFuture;
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080037import java.util.concurrent.TimeUnit;
38import java.util.function.Consumer;
Yi Tseng890dc3f2018-11-01 13:23:11 -070039
40/**
41 * Implementation of gNMI client.
42 */
43public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080044
45 private static final int RPC_TIMEOUT_SECONDS = 10;
46
47 private static final GetRequest PING_REQUEST = GetRequest.newBuilder().addPath(
48 Path.newBuilder().addElem(
49 PathElem.newBuilder().setName("onos-gnmi-ping").build()
50 ).build()).build();
51
52 private GnmiSubscriptionManager subscribeManager;
Yi Tseng890dc3f2018-11-01 13:23:11 -070053
Yi Tsenge616d752018-11-27 10:53:27 -080054 GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
Carmelo Cascone3977ea42019-02-28 13:43:42 -080055 super(clientKey, managedChannel, false, controller);
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080056 this.subscribeManager =
57 new GnmiSubscriptionManager(this, deviceId, controller);
Yi Tseng890dc3f2018-11-01 13:23:11 -070058 }
59
60 @Override
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080061 public CompletableFuture<CapabilityResponse> capabilities() {
62 final CompletableFuture<CapabilityResponse> future = new CompletableFuture<>();
63 execRpc(s -> s.capabilities(
64 CapabilityRequest.getDefaultInstance(),
65 unaryObserver(future, CapabilityResponse.getDefaultInstance(),
66 "capabilities request"))
67 );
68 return future;
Yi Tseng890dc3f2018-11-01 13:23:11 -070069 }
70
71 @Override
72 public CompletableFuture<GetResponse> get(GetRequest request) {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080073 final CompletableFuture<GetResponse> future = new CompletableFuture<>();
74 execRpc(s -> s.get(request, unaryObserver(
75 future, GetResponse.getDefaultInstance(), "GET"))
76 );
77 return future;
Yi Tseng890dc3f2018-11-01 13:23:11 -070078 }
79
80 @Override
81 public CompletableFuture<SetResponse> set(SetRequest request) {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -080082 final CompletableFuture<SetResponse> future = new CompletableFuture<>();
83 execRpc(s -> s.set(request, unaryObserver(
84 future, SetResponse.getDefaultInstance(), "SET"))
85 );
86 return future;
87 }
88
89 private <RES> StreamObserver<RES> unaryObserver(
90 final CompletableFuture<RES> future,
91 final RES defaultResponse,
92 final String opDescription) {
93 return new StreamObserver<RES>() {
94 @Override
95 public void onNext(RES value) {
96 future.complete(value);
97 }
98
99 @Override
100 public void onError(Throwable t) {
101 handleRpcError(t, opDescription);
102 future.complete(defaultResponse);
103 }
104
105 @Override
106 public void onCompleted() {
107 // Ignore. Unary call.
108 }
109 };
Yi Tseng890dc3f2018-11-01 13:23:11 -0700110 }
111
Yi Tseng5f7fef52018-11-05 11:30:47 -0800112 @Override
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800113 public void subscribe(SubscribeRequest request) {
114 subscribeManager.subscribe(request);
Yi Tsenge616d752018-11-27 10:53:27 -0800115 }
116
117 @Override
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800118 public void unsubscribe() {
119 subscribeManager.unsubscribe();
Yi Tsenge616d752018-11-27 10:53:27 -0800120 }
121
122 @Override
Carmelo Cascone3977ea42019-02-28 13:43:42 -0800123 public CompletableFuture<Boolean> probeService() {
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800124 final CompletableFuture<Boolean> future = new CompletableFuture<>();
125 final StreamObserver<GetResponse> responseObserver = new StreamObserver<GetResponse>() {
126 @Override
127 public void onNext(GetResponse value) {
128 future.complete(true);
129 }
130
131 @Override
132 public void onError(Throwable t) {
133 // This gRPC call should throw INVALID_ARGUMENT status exception
134 // since "/onos-gnmi-ping" path does not exists in any config
135 // model For other status code such as UNIMPLEMENT, means the
136 // gNMI service is not available on the device.
137 future.complete(Status.fromThrowable(t).getCode()
138 == Status.Code.INVALID_ARGUMENT);
139 }
140
141 @Override
142 public void onCompleted() {
143 // Ignore. Unary call.
144 }
145 };
146 execRpc(s -> s.get(PING_REQUEST, responseObserver));
147 return future;
Yi Tseng5f7fef52018-11-05 11:30:47 -0800148 }
149
Yi Tsenge616d752018-11-27 10:53:27 -0800150 @Override
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800151 public void shutdown() {
152 subscribeManager.shutdown();
153 super.shutdown();
Yi Tsenge616d752018-11-27 10:53:27 -0800154 }
155
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800156 /**
157 * Forces execution of an RPC in a cancellable context with a timeout.
158 *
159 * @param stubConsumer P4Runtime stub consumer
160 */
161 private void execRpc(Consumer<gNMIGrpc.gNMIStub> stubConsumer) {
162 if (log.isTraceEnabled()) {
163 log.trace("Executing RPC with timeout {} seconds (context deadline {})...",
164 RPC_TIMEOUT_SECONDS, context().getDeadline());
Yi Tseng890dc3f2018-11-01 13:23:11 -0700165 }
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800166 runInCancellableContext(() -> stubConsumer.accept(
167 gNMIGrpc.newStub(channel)
168 .withDeadlineAfter(RPC_TIMEOUT_SECONDS, TimeUnit.SECONDS)));
Yi Tseng890dc3f2018-11-01 13:23:11 -0700169 }
170
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800171 /**
172 * Forces execution of an RPC in a cancellable context with no timeout.
173 *
174 * @param stubConsumer P4Runtime stub consumer
175 */
176 void execRpcNoTimeout(Consumer<gNMIGrpc.gNMIStub> stubConsumer) {
177 if (log.isTraceEnabled()) {
178 log.trace("Executing RPC with no timeout (context deadline {})...",
179 context().getDeadline());
Yi Tseng890dc3f2018-11-01 13:23:11 -0700180 }
Carmelo Casconeab5d41e2019-03-06 18:02:34 -0800181 runInCancellableContext(() -> stubConsumer.accept(
182 gNMIGrpc.newStub(channel)));
Yi Tseng5f7fef52018-11-05 11:30:47 -0800183 }
Yi Tseng890dc3f2018-11-01 13:23:11 -0700184}