blob: 73bb807609f36e8fc0337c66ed04a7253062071c [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.gnmi.ctl;
import com.google.common.util.concurrent.Futures;
import gnmi.Gnmi;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.ClientCallStreamObserver;
import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiEvent;
import org.onosproject.gnmi.api.GnmiUpdate;
import org.onosproject.net.DeviceId;
import org.slf4j.Logger;
import java.net.ConnectException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static java.lang.String.format;
import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.slf4j.LoggerFactory.getLogger;
/**
* A manager for the gNMI Subscribe RPC that opportunistically starts new RPC
* (e.g. when one fails because of errors) and posts subscribe events via the
* gNMI controller.
*/
final class GnmiSubscriptionManager {
// FIXME: make this configurable
private static final long DEFAULT_RECONNECT_DELAY = 5; // Seconds
private static final Logger log = getLogger(GnmiSubscriptionManager.class);
private final GnmiClientImpl client;
private final DeviceId deviceId;
private final GnmiControllerImpl controller;
private final StreamObserver<Gnmi.SubscribeResponse> responseObserver;
private final ScheduledExecutorService streamCheckerExecutor =
newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-subscribe-check", "%d", log));
private Future<?> checkTask;
private ClientCallStreamObserver<Gnmi.SubscribeRequest> requestObserver;
private Gnmi.SubscribeRequest existingSubscription;
private AtomicBoolean active = new AtomicBoolean(false);
GnmiSubscriptionManager(GnmiClientImpl client, DeviceId deviceId,
GnmiControllerImpl controller) {
this.client = client;
this.deviceId = deviceId;
this.controller = controller;
this.responseObserver = new InternalStreamResponseObserver();
}
void subscribe(Gnmi.SubscribeRequest request) {
synchronized (this) {
if (existingSubscription != null) {
if (existingSubscription.equals(request)) {
// Nothing to do. We are already subscribed for the same
// request.
log.debug("Ignoring re-subscription to same request",
deviceId);
return;
}
log.debug("Cancelling existing subscription for {} before " +
"starting a new one", deviceId);
complete();
}
existingSubscription = request;
sendSubscribeRequest();
if (checkTask != null) {
checkTask = streamCheckerExecutor.scheduleAtFixedRate(
this::checkSubscription, 0,
DEFAULT_RECONNECT_DELAY,
TimeUnit.SECONDS);
}
}
}
void unsubscribe() {
synchronized (this) {
if (checkTask != null) {
checkTask.cancel(false);
checkTask = null;
}
existingSubscription = null;
complete();
}
}
public void shutdown() {
log.debug("Shutting down gNMI subscription manager for {}", deviceId);
unsubscribe();
streamCheckerExecutor.shutdownNow();
}
private void checkSubscription() {
synchronized (this) {
if (existingSubscription != null && !active.get()) {
if (client.isServerReachable() || Futures.getUnchecked(client.probeService())) {
log.info("Re-starting Subscribe RPC for {}...", deviceId);
sendSubscribeRequest();
} else {
log.debug("Not restarting Subscribe RPC for {}, server is NOT reachable",
deviceId);
}
}
}
}
private void sendSubscribeRequest() {
if (requestObserver == null) {
log.debug("Starting new Subscribe RPC for {}...", deviceId);
client.execRpcNoTimeout(
s -> requestObserver =
(ClientCallStreamObserver<Gnmi.SubscribeRequest>)
s.subscribe(responseObserver)
);
}
requestObserver.onNext(existingSubscription);
active.set(true);
}
public void complete() {
synchronized (this) {
active.set(false);
if (requestObserver != null) {
requestObserver.onCompleted();
requestObserver.cancel("Terminated", null);
requestObserver = null;
}
}
}
/**
* Handles messages received from the device on the Subscribe RPC.
*/
private final class InternalStreamResponseObserver
implements StreamObserver<Gnmi.SubscribeResponse> {
@Override
public void onNext(Gnmi.SubscribeResponse message) {
try {
if (log.isTraceEnabled()) {
log.trace("Received SubscribeResponse from {}: {}",
deviceId, message.toString());
}
controller.postEvent(new GnmiEvent(GnmiEvent.Type.UPDATE, new GnmiUpdate(
deviceId, message.getUpdate(), message.getSyncResponse())));
} catch (Throwable ex) {
log.error("Exception processing SubscribeResponse from " + deviceId,
ex);
}
}
@Override
public void onError(Throwable throwable) {
complete();
if (throwable instanceof StatusRuntimeException) {
StatusRuntimeException sre = (StatusRuntimeException) throwable;
if (sre.getStatus().getCause() instanceof ConnectException) {
log.warn("{} is unreachable ({})",
deviceId, sre.getCause().getMessage());
} else {
log.warn("Error on Subscribe RPC for {}: {}",
deviceId, throwable.getMessage());
}
} else {
log.error(format("Exception on Subscribe RPC for %s",
deviceId), throwable);
}
}
@Override
public void onCompleted() {
complete();
log.warn("Subscribe RPC for {} has completed", deviceId);
}
}
@Override
protected void finalize() throws Throwable {
if (!streamCheckerExecutor.isShutdown()) {
log.error("Finalizing object but executor is still active! BUG? Shutting down...");
shutdown();
}
super.finalize();
}
}