blob: 050925f624243163d44bb02a057d2c97564a1614 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package protocols.gnmi.ctl.java.org.onosproject.gnmi.ctl;
import gnmi.Gnmi;
import gnmi.gNMIGrpc;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiEvent;
import org.onosproject.gnmi.api.GnmiUpdate;
import org.onosproject.net.DeviceId;
import org.slf4j.Logger;
import java.net.ConnectException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* A manager for the gNMI stream channel that opportunistically creates
* new stream RPC stubs (e.g. when one fails because of errors) and posts
* subscribe events via the gNMI controller.
*/
final class GnmiSubscriptionManager {

    /**
     * The state of the subscription manager.
     */
    enum State {
        /**
         * No subscription exists.
         */
        INIT,

        /**
         * A subscription exists and the stream channel is open.
         */
        SUBSCRIBED,

        /**
         * A subscription exists, but the stream channel is not open.
         */
        RETRYING,
    }

    // FIXME: make this configurable
    private static final long DEFAULT_RECONNECT_DELAY = 5; // Seconds

    private static final Logger log = getLogger(GnmiSubscriptionManager.class);

    private final ManagedChannel channel;
    private final DeviceId deviceId;
    private final GnmiControllerImpl controller;

    // Also used as the monitor guarding the three mutable fields below.
    private final AtomicReference<State> state = new AtomicReference<>(State.INIT);

    // Response observer of the stream currently in use, or null if there is
    // none. A new instance is created for every stream so that callbacks
    // coming from a stream that was already torn down can be told apart.
    private StreamObserver<Gnmi.SubscribeResponse> responseObserver;
    private ClientCallStreamObserver<Gnmi.SubscribeRequest> requestObserver;
    private Gnmi.SubscribeRequest existingSubscription;

    private final ScheduledExecutorService streamCheckerExecutor =
            newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-probe", "%d", log));

    /**
     * Creates a subscription manager and starts the periodic stream check,
     * which runs every {@link #DEFAULT_RECONNECT_DELAY} seconds.
     *
     * @param channel    channel used to create the subscribe stream
     * @param deviceId   identifier of the device, used in logs and events
     * @param controller controller to which received updates are posted
     */
    GnmiSubscriptionManager(ManagedChannel channel, DeviceId deviceId,
                            GnmiControllerImpl controller) {
        this.channel = channel;
        this.deviceId = deviceId;
        this.controller = controller;
        streamCheckerExecutor.scheduleAtFixedRate(this::checkGnmiStream, 0,
                                                  DEFAULT_RECONNECT_DELAY,
                                                  TimeUnit.SECONDS);
    }

    /**
     * Stops the periodic stream check and closes the current stream, if any.
     */
    public void shutdown() {
        log.info("gNMI subscription manager for device {} shutdown", deviceId);
        streamCheckerExecutor.shutdown();
        complete();
    }

    /**
     * Opens a new subscribe stream if none is currently held. Must be called
     * while holding the {@code state} monitor.
     */
    private void initIfRequired() {
        if (requestObserver == null) {
            log.debug("Creating new stream channel for {}...", deviceId);
            // Register the observer before opening the stream so that any
            // callback delivered for this stream is recognized as current.
            responseObserver = new InternalStreamResponseObserver();
            requestObserver = (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
                    gNMIGrpc.newStub(channel).subscribe(responseObserver);
        }
    }

    /**
     * Replaces any existing subscription with the given request. The request
     * is remembered so that it can be re-sent when reconnecting.
     *
     * @param request subscribe request to send
     * @return true if the request was handed to the stream, false otherwise
     */
    boolean subscribe(Gnmi.SubscribeRequest request) {
        synchronized (state) {
            // Always start from a fresh stream: besides cancelling a live
            // subscription, this avoids writing to a stream that has already
            // failed (RETRYING) and wrongly reporting it as subscribed.
            complete();
            existingSubscription = request;
            return send(request);
        }
    }

    /**
     * Sends the request on the stream, opening the stream first if needed,
     * and updates the state to SUBSCRIBED on success or RETRYING on failure.
     * Must be called while holding the {@code state} monitor.
     *
     * @param value subscribe request to send
     * @return true if the request was handed to the stream, false otherwise
     */
    private boolean send(Gnmi.SubscribeRequest value) {
        try {
            // Inside the try block so that a failure to open the stream is
            // handled as a retryable error as well.
            initIfRequired();
            requestObserver.onNext(value);
            state.set(State.SUBSCRIBED);
            return true;
        } catch (Throwable ex) {
            if (ex instanceof StatusRuntimeException) {
                log.warn("Unable to send subscribe request to {}: {}",
                         deviceId, ex.getMessage());
            } else {
                log.warn("Exception while sending subscribe request to {}",
                         deviceId, ex);
            }
            state.set(State.RETRYING);
            return false;
        }
    }

    /**
     * Closes the current stream, if any, and resets the state to INIT.
     * Terminal callbacks later received for the closed stream are ignored
     * and do not trigger a reconnection.
     */
    public void complete() {
        synchronized (state) {
            state.set(State.INIT);
            // Detach the response observer first, so that the termination
            // caused by the close below is not mistaken for a stream failure.
            responseObserver = null;
            final ClientCallStreamObserver<Gnmi.SubscribeRequest> closing = requestObserver;
            requestObserver = null;
            if (closing == null) {
                return;
            }
            try {
                closing.onCompleted();
            } catch (RuntimeException ex) {
                // Best effort: the stream is cancelled below in any case.
                log.debug("Unable to half-close stream channel for {}: {}",
                          deviceId, ex.getMessage());
            } finally {
                closing.cancel("Terminated", null);
            }
        }
    }

    /**
     * Periodic task: when the state is RETRYING, closes the current stream
     * and re-sends the last subscription on a new one.
     */
    private void checkGnmiStream() {
        synchronized (state) {
            if (state.get() != State.RETRYING) {
                // No need to retry if the state is not RETRYING
                return;
            }
            log.info("Try reconnecting gNMI stream to device {}", deviceId);
            try {
                complete();
                send(existingSubscription);
            } catch (RuntimeException ex) {
                // An exception escaping a task scheduled at fixed rate
                // suppresses all its future executions; log it and try
                // again at the next run instead.
                log.warn("Exception while reconnecting gNMI stream to device {}",
                         deviceId, ex);
                state.set(State.RETRYING);
            }
        }
    }

    /**
     * Handles messages received from the device on the stream channel.
     */
    private final class InternalStreamResponseObserver
            implements StreamObserver<Gnmi.SubscribeResponse> {

        @Override
        public void onNext(Gnmi.SubscribeResponse message) {
            try {
                // Pass the message itself so it is converted to a string
                // only when debug logging is enabled.
                log.debug("Received message on stream channel from {}: {}",
                          deviceId, message);
                GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
                GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
                controller.postEvent(event);
            } catch (Throwable ex) {
                log.error("Exception while processing stream message from {}",
                          deviceId, ex);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            if (!markForRetry()) {
                log.debug("Ignoring error on closed stream channel for {}: {}",
                          deviceId, throwable.getMessage());
                return;
            }
            if (throwable instanceof StatusRuntimeException) {
                StatusRuntimeException sre = (StatusRuntimeException) throwable;
                // Log the same cause object that is being type-checked.
                Throwable cause = sre.getStatus().getCause();
                if (cause instanceof ConnectException) {
                    log.warn("Device {} is unreachable ({})",
                             deviceId, cause.getMessage());
                } else {
                    log.warn("Received error on stream channel for {}: {}",
                             deviceId, throwable.getMessage());
                }
            } else {
                log.warn(format("Received exception on stream channel for %s",
                                deviceId), throwable);
            }
        }

        @Override
        public void onCompleted() {
            if (!markForRetry()) {
                log.debug("Ignoring completion of closed stream channel for {}", deviceId);
                return;
            }
            log.warn("Stream channel for {} has completed", deviceId);
        }

        /**
         * Sets the state to RETRYING if this observer belongs to the stream
         * currently in use.
         *
         * @return true if the state was updated, false if this observer was
         * already detached and the callback should be ignored
         */
        private boolean markForRetry() {
            synchronized (state) {
                if (responseObserver != this) {
                    return false;
                }
                state.set(State.RETRYING);
                return true;
            }
        }
    }
}