blob: 8d93a63adb065cdbb57bbddf1636f59ae9eefd6b [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.gnmi.ctl;
import gnmi.Gnmi.CapabilityRequest;
import gnmi.Gnmi.CapabilityResponse;
import gnmi.Gnmi.GetRequest;
import gnmi.Gnmi.GetResponse;
import gnmi.Gnmi.Path;
import gnmi.Gnmi.PathElem;
import gnmi.Gnmi.SetRequest;
import gnmi.Gnmi.SetResponse;
import gnmi.Gnmi.SubscribeRequest;
import gnmi.gNMIGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiClientKey;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
 * Implementation of gNMI client.
 * <p>
 * Unary RPCs (capabilities, get, set and the service probe) are issued
 * asynchronously through {@link #execRpc(Consumer)} and their outcome is
 * exposed as a {@link CompletableFuture}. Subscriptions are delegated to a
 * {@link GnmiSubscriptionManager}.
 */
public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {

    /** Deadline, in seconds, set on every stub used by {@link #execRpc(Consumer)}. */
    private static final int RPC_TIMEOUT_SECONDS = 10;

    /**
     * Get request for the single-element path "onos-gnmi-ping", used by
     * {@link #probeService()} to check whether the gNMI service answers.
     */
    private static final GetRequest PING_REQUEST = GetRequest.newBuilder().addPath(
            Path.newBuilder().addElem(
                    PathElem.newBuilder().setName("onos-gnmi-ping").build()
            ).build()).build();

    // Assigned once in the constructor and never replaced.
    private final GnmiSubscriptionManager subscribeManager;

    /**
     * Creates a new gNMI client bound to the given channel.
     *
     * @param clientKey      key identifying this client
     * @param managedChannel gRPC channel used to reach the device
     * @param controller     controller that owns this client
     */
    GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
        // NOTE(review): the meaning of the boolean flag is defined by
        // AbstractGrpcClient, which is not visible here -- confirm.
        super(clientKey, managedChannel, false, controller);
        this.subscribeManager =
                new GnmiSubscriptionManager(this, deviceId, controller);
    }

    /**
     * Sends a default capability request to the device.
     *
     * @return a future completed with the device response, or with the default
     * {@link CapabilityResponse} instance if the RPC fails
     */
    @Override
    public CompletableFuture<CapabilityResponse> capabilities() {
        final CompletableFuture<CapabilityResponse> future = new CompletableFuture<>();
        execRpc(s -> s.capabilities(
                CapabilityRequest.getDefaultInstance(),
                unaryObserver(future, CapabilityResponse.getDefaultInstance(),
                              "capabilities request"))
        );
        return future;
    }

    /**
     * Sends the given get request to the device.
     *
     * @param request the get request
     * @return a future completed with the device response, or with the default
     * {@link GetResponse} instance if the RPC fails
     */
    @Override
    public CompletableFuture<GetResponse> get(GetRequest request) {
        final CompletableFuture<GetResponse> future = new CompletableFuture<>();
        execRpc(s -> s.get(request, unaryObserver(
                future, GetResponse.getDefaultInstance(), "GET"))
        );
        return future;
    }

    /**
     * Sends the given set request to the device.
     *
     * @param request the set request
     * @return a future completed with the device response, or with the default
     * {@link SetResponse} instance if the RPC fails
     */
    @Override
    public CompletableFuture<SetResponse> set(SetRequest request) {
        final CompletableFuture<SetResponse> future = new CompletableFuture<>();
        execRpc(s -> s.set(request, unaryObserver(
                future, SetResponse.getDefaultInstance(), "SET"))
        );
        return future;
    }

    /**
     * Returns a response observer for a unary RPC that always completes the
     * given future: with the received message on success, or with the given
     * default response otherwise.
     *
     * @param future          future to complete with the RPC outcome
     * @param defaultResponse value used when no response is available
     * @param opDescription   short description of the RPC, used when reporting
     *                        errors
     * @param <RES>           response message type
     * @return the response observer
     */
    private <RES> StreamObserver<RES> unaryObserver(
            final CompletableFuture<RES> future,
            final RES defaultResponse,
            final String opDescription) {
        return new StreamObserver<RES>() {
            @Override
            public void onNext(RES value) {
                future.complete(value);
            }

            @Override
            public void onError(Throwable t) {
                handleRpcError(t, opDescription);
                future.complete(defaultResponse);
            }

            @Override
            public void onCompleted() {
                // Normally a no-op: onNext() has already completed the future.
                // If the call was closed without any response, complete with
                // the default value so that callers are never left waiting
                // forever on the returned future.
                if (future.complete(defaultResponse)) {
                    log.warn("RPC completed without a response for {}",
                             opDescription);
                }
            }
        };
    }

    /**
     * Delegates the given subscribe request to the subscription manager.
     *
     * @param request the subscribe request
     */
    @Override
    public void subscribe(SubscribeRequest request) {
        subscribeManager.subscribe(request);
    }

    /**
     * Asks the subscription manager to unsubscribe.
     */
    @Override
    public void unsubscribe() {
        subscribeManager.unsubscribe();
    }

    /**
     * Probes the gNMI service by issuing a get request for a dummy path.
     *
     * @return a future completed with true if the device returns a response or
     * an INVALID_ARGUMENT error, false otherwise
     */
    @Override
    public CompletableFuture<Boolean> probeService() {
        final CompletableFuture<Boolean> future = new CompletableFuture<>();
        final StreamObserver<GetResponse> responseObserver = new StreamObserver<GetResponse>() {
            @Override
            public void onNext(GetResponse value) {
                future.complete(true);
            }

            @Override
            public void onError(Throwable t) {
                // This gRPC call should fail with an INVALID_ARGUMENT status
                // exception, since the "/onos-gnmi-ping" path does not exist
                // in any config model. Any other status code, such as
                // UNIMPLEMENTED, means that the gNMI service is not available
                // on the device.
                future.complete(Status.fromThrowable(t).getCode()
                                        == Status.Code.INVALID_ARGUMENT);
            }

            @Override
            public void onCompleted() {
                // Normally a no-op: the future is already completed by
                // onNext(). A unary call closed without a response is treated
                // as a failed probe so that the future always completes.
                future.complete(false);
            }
        };
        execRpc(s -> s.get(PING_REQUEST, responseObserver));
        return future;
    }

    /**
     * Shuts down the subscription manager first, then this client.
     */
    @Override
    public void shutdown() {
        subscribeManager.shutdown();
        super.shutdown();
    }

    /**
     * Forces execution of an RPC in a cancellable context with a timeout.
     *
     * @param stubConsumer gNMI stub consumer
     */
    private void execRpc(Consumer<gNMIGrpc.gNMIStub> stubConsumer) {
        if (log.isTraceEnabled()) {
            log.trace("Executing RPC with timeout {} seconds (context deadline {})...",
                      RPC_TIMEOUT_SECONDS, context().getDeadline());
        }
        runInCancellableContext(() -> stubConsumer.accept(
                gNMIGrpc.newStub(channel)
                        .withDeadlineAfter(RPC_TIMEOUT_SECONDS, TimeUnit.SECONDS)));
    }

    /**
     * Forces execution of an RPC in a cancellable context with no timeout.
     *
     * @param stubConsumer gNMI stub consumer
     */
    void execRpcNoTimeout(Consumer<gNMIGrpc.gNMIStub> stubConsumer) {
        if (log.isTraceEnabled()) {
            log.trace("Executing RPC with no timeout (context deadline {})...",
                      context().getDeadline());
        }
        runInCancellableContext(() -> stubConsumer.accept(
                gNMIGrpc.newStub(channel)));
    }
}