blob: ce9c5f840eda5fd110f675dd08e6506a7342c2a4 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package protocols.gnmi.ctl.java.org.onosproject.gnmi.ctl;
import gnmi.Gnmi.CapabilityRequest;
import gnmi.Gnmi.CapabilityResponse;
import gnmi.Gnmi.GetRequest;
import gnmi.Gnmi.GetResponse;
import gnmi.Gnmi.Path;
import gnmi.Gnmi.PathElem;
import gnmi.Gnmi.SetRequest;
import gnmi.Gnmi.SetResponse;
import gnmi.Gnmi.SubscribeRequest;
import gnmi.Gnmi.SubscribeResponse;
import gnmi.gNMIGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiClientKey;
import org.onosproject.gnmi.api.GnmiEvent;
import org.onosproject.gnmi.api.GnmiUpdate;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
import org.slf4j.Logger;
import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;
/**
* Implementation of gNMI client.
*/
public class GnmiClientImpl extends AbstractGrpcClient implements GnmiClient {
private static final PathElem DUMMY_PATH_ELEM = PathElem.newBuilder().setName("onos-gnmi-test").build();
private static final Path DUMMY_PATH = Path.newBuilder().addElem(DUMMY_PATH_ELEM).build();
private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
private final Logger log = getLogger(getClass());
private final gNMIGrpc.gNMIBlockingStub blockingStub;
private StreamChannelManager streamChannelManager;
private GnmiControllerImpl controller;
GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
super(clientKey);
this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
this.streamChannelManager = new StreamChannelManager(managedChannel);
this.controller = controller;
}
@Override
public CompletableFuture<CapabilityResponse> capability() {
return supplyInContext(this::doCapability, "capability");
}
@Override
public CompletableFuture<GetResponse> get(GetRequest request) {
return supplyInContext(() -> doGet(request), "get");
}
@Override
public CompletableFuture<SetResponse> set(SetRequest request) {
return supplyInContext(() -> doSet(request), "set");
}
@Override
public boolean subscribe(SubscribeRequest request) {
return streamChannelManager.send(request);
}
@Override
public void terminateSubscriptionChannel() {
streamChannelManager.complete();
}
@Override
public CompletableFuture<Boolean> isServiceAvailable() {
return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
}
@Override
protected Void doShutdown() {
streamChannelManager.complete();
return super.doShutdown();
}
private CapabilityResponse doCapability() {
CapabilityRequest request = CapabilityRequest.newBuilder().build();
try {
return blockingStub.capabilities(request);
} catch (StatusRuntimeException e) {
log.warn("Unable to get capability from {}: {}", deviceId, e.getMessage());
return CapabilityResponse.getDefaultInstance();
}
}
private GetResponse doGet(GetRequest request) {
try {
return blockingStub.get(request);
} catch (StatusRuntimeException e) {
log.warn("Unable to get data from {}: {}", deviceId, e.getMessage());
return GetResponse.getDefaultInstance();
}
}
private SetResponse doSet(SetRequest request) {
try {
return blockingStub.set(request);
} catch (StatusRuntimeException e) {
log.warn("Unable to set data to {}: {}", deviceId, e.getMessage());
return SetResponse.getDefaultInstance();
}
}
private boolean doServiceAvailable() {
try {
return blockingStub.get(DUMMY_REQUEST) != null;
} catch (StatusRuntimeException e) {
// This gRPC call should throw INVALID_ARGUMENT status exception
// since "/onos-gnmi-test" path does not exists in any config model
// For other status code such as UNIMPLEMENT, means the gNMI
// service is not available on the device.
return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
}
}
/**
* A manager for the gNMI stream channel that opportunistically creates
* new stream RCP stubs (e.g. when one fails because of errors) and posts
* subscribe events via the gNMI controller.
*/
private final class StreamChannelManager {
private final ManagedChannel channel;
private final AtomicBoolean open;
private final StreamObserver<SubscribeResponse> responseObserver;
private ClientCallStreamObserver<SubscribeRequest> requestObserver;
private StreamChannelManager(ManagedChannel channel) {
this.channel = channel;
this.responseObserver = new InternalStreamResponseObserver(this);
this.open = new AtomicBoolean(false);
}
private void initIfRequired() {
if (requestObserver == null) {
log.debug("Creating new stream channel for {}...", deviceId);
requestObserver = (ClientCallStreamObserver<SubscribeRequest>)
gNMIGrpc.newStub(channel).subscribe(responseObserver);
open.set(false);
}
}
public boolean send(SubscribeRequest value) {
synchronized (this) {
initIfRequired();
try {
requestObserver.onNext(value);
return true;
} catch (Throwable ex) {
if (ex instanceof StatusRuntimeException) {
log.warn("Unable to send subscribe request to {}: {}",
deviceId, ex.getMessage());
} else {
log.warn("Exception while sending subscribe request to {}",
deviceId, ex);
}
complete();
return false;
}
}
}
public void complete() {
synchronized (this) {
if (requestObserver != null) {
requestObserver.onCompleted();
requestObserver.cancel("Terminated", null);
requestObserver = null;
}
}
}
}
/**
* Handles messages received from the device on the stream channel.
*/
private final class InternalStreamResponseObserver
implements StreamObserver<SubscribeResponse> {
private final StreamChannelManager streamChannelManager;
private InternalStreamResponseObserver(
StreamChannelManager streamChannelManager) {
this.streamChannelManager = streamChannelManager;
}
@Override
public void onNext(SubscribeResponse message) {
executorService.submit(() -> doNext(message));
}
private void doNext(SubscribeResponse message) {
try {
log.debug("Received message on stream channel from {}: {}",
deviceId, message.toString());
GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
controller.postEvent(event);
} catch (Throwable ex) {
log.error("Exception while processing stream message from {}",
deviceId, ex);
}
}
@Override
public void onError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) throwable;
if (sre.getStatus().getCause() instanceof ConnectException) {
log.warn("Device {} is unreachable ({})",
deviceId, sre.getCause().getMessage());
} else {
log.warn("Received error on stream channel for {}: {}",
deviceId, throwable.getMessage());
}
} else {
log.warn(format("Received exception on stream channel for %s",
deviceId), throwable);
}
streamChannelManager.complete();
}
@Override
public void onCompleted() {
log.warn("Stream channel for {} has completed", deviceId);
streamChannelManager.complete();
}
}
}