Add gNMI device state subscriber
Change-Id: I20cb5e130f4e416bf8678aab2e5268faf24ad06b
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index 22b226b..117b27e 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -23,17 +23,26 @@
import gnmi.Gnmi.PathElem;
import gnmi.Gnmi.SetRequest;
import gnmi.Gnmi.SetResponse;
+import gnmi.Gnmi.SubscribeRequest;
+import gnmi.Gnmi.SubscribeResponse;
import gnmi.gNMIGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiClientKey;
+import org.onosproject.gnmi.api.GnmiEvent;
+import org.onosproject.gnmi.api.GnmiUpdate;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
import org.slf4j.Logger;
+import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -45,10 +54,14 @@
private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
private final Logger log = getLogger(getClass());
private final gNMIGrpc.gNMIBlockingStub blockingStub;
+ private StreamChannelManager streamChannelManager;
+ private GnmiControllerImpl controller;
- GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel) {
+ GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
super(clientKey);
this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
+ this.streamChannelManager = new StreamChannelManager(managedChannel); // stream stubs reuse this channel
+ this.controller = controller; // stream responses are posted as events through it
}
@Override
@@ -67,10 +80,26 @@
}
@Override
+    public boolean subscribe(SubscribeRequest request) {
+        // Hand the request to the stream manager and report whether it was sent.
+        final boolean sent = this.streamChannelManager.send(request);
+        return sent;
+    }
+
+    @Override
+    public void terminateSubscriptionChannel() {
+        // Closing is delegated entirely to the stream manager.
+        this.streamChannelManager.complete();
+    }
+
+ @Override // returns the future produced by supplyInContext for doServiceAvailable()
public CompletableFuture<Boolean> isServiceAvailable() {
return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
}
+ @Override
+ protected Void doShutdown() {
+ streamChannelManager.complete();
+ return super.doShutdown();
+ }
+
private CapabilityResponse doCapability() {
CapabilityRequest request = CapabilityRequest.newBuilder().build();
try {
@@ -110,4 +139,121 @@
return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
}
}
+
+
+
+    /**
+     * A manager for the gNMI stream channel that opportunistically creates
+     * new stream RPC stubs (e.g. when one fails because of errors) and posts
+     * subscribe events via the gNMI controller.
+     * <p>
+     * All access to the request observer happens while holding this object's
+     * monitor.
+     */
+    private final class StreamChannelManager {
+
+        private final ManagedChannel channel;
+        // NOTE(review): this flag is only ever set to false and never read in
+        // this class; confirm whether an "open" signal is still planned.
+        private final AtomicBoolean open;
+        private final StreamObserver<SubscribeResponse> responseObserver;
+        // Null whenever there is no active stream; a new one is created lazily
+        // by the next send().
+        private ClientCallStreamObserver<SubscribeRequest> requestObserver;
+
+        /**
+         * Creates a manager bound to the given channel. No stream is opened
+         * until the first request is sent.
+         *
+         * @param channel the channel on which stream stubs are created
+         */
+        private StreamChannelManager(ManagedChannel channel) {
+            this.channel = channel;
+            this.responseObserver = new InternalStreamResponseObserver(this);
+            this.open = new AtomicBoolean(false);
+        }
+
+        /**
+         * Opens a new Subscribe stream if none is currently active. Must be
+         * called while holding this object's monitor.
+         */
+        private void initIfRequired() {
+            if (requestObserver == null) {
+                log.debug("Creating new stream channel for {}...", deviceId);
+                requestObserver = (ClientCallStreamObserver<SubscribeRequest>)
+                        gNMIGrpc.newStub(channel).subscribe(responseObserver);
+                open.set(false);
+            }
+        }
+
+        /**
+         * Sends the given request on the stream, creating the stream first if
+         * needed. On any failure the stream is closed so that a later call
+         * starts from a fresh one.
+         *
+         * @param value the subscribe request to send
+         * @return true if the request was handed to the stream, false if
+         * sending failed
+         */
+        public boolean send(SubscribeRequest value) {
+            synchronized (this) {
+                initIfRequired();
+                try {
+                    requestObserver.onNext(value);
+                    return true;
+                } catch (Throwable ex) {
+                    if (ex instanceof StatusRuntimeException) {
+                        log.warn("Unable to send subscribe request to {}: {}",
+                                 deviceId, ex.getMessage());
+                    } else {
+                        log.warn("Exception while sending subscribe request to {}",
+                                 deviceId, ex);
+                    }
+                    complete();
+                    return false;
+                }
+            }
+        }
+
+        /**
+         * Closes the current stream, if any, so that the next call to
+         * {@link #send(SubscribeRequest)} creates a new one. Does nothing when
+         * no stream is active.
+         */
+        public void complete() {
+            synchronized (this) {
+                if (requestObserver == null) {
+                    return;
+                }
+                final ClientCallStreamObserver<SubscribeRequest> observer = requestObserver;
+                // Drop the reference first: whatever happens below, a stub we
+                // tried to close must never be picked up again by send().
+                requestObserver = null;
+                try {
+                    observer.onCompleted();
+                } catch (RuntimeException ex) {
+                    // The stream could not be completed gracefully (e.g. the
+                    // underlying call rejected it); it is cancelled below anyway.
+                    log.debug("Unable to complete stream channel for {}: {}",
+                              deviceId, ex.getMessage());
+                } finally {
+                    observer.cancel("Terminated", null);
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Handles messages received from the device on the stream channel.
+     * <p>
+     * Each response is turned into a {@link GnmiEvent} and posted through the
+     * controller; errors and stream completion close the stream via the
+     * owning {@link StreamChannelManager}.
+     */
+    private final class InternalStreamResponseObserver
+            implements StreamObserver<SubscribeResponse> {
+
+        private final StreamChannelManager streamChannelManager;
+
+        /**
+         * Creates an observer tied to the given stream manager.
+         *
+         * @param streamChannelManager manager to notify when the stream ends
+         */
+        private InternalStreamResponseObserver(
+                StreamChannelManager streamChannelManager) {
+            this.streamChannelManager = streamChannelManager;
+        }
+
+        @Override
+        public void onNext(SubscribeResponse message) {
+            // Processing is handed to executorService rather than done inline.
+            executorService.submit(() -> doNext(message));
+        }
+
+        /**
+         * Wraps the response in an UPDATE event and posts it via the
+         * controller. Any failure is logged and not propagated.
+         *
+         * @param message the response received from the device
+         */
+        private void doNext(SubscribeResponse message) {
+            try {
+                // Pass the message itself so it is only rendered to a string
+                // when debug logging is enabled.
+                log.debug("Received message on stream channel from {}: {}",
+                          deviceId, message);
+                GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
+                GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
+                controller.postEvent(event);
+            } catch (Throwable ex) {
+                log.error("Exception while processing stream message from {}",
+                          deviceId, ex);
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            if (throwable instanceof StatusRuntimeException) {
+                final StatusRuntimeException sre = (StatusRuntimeException) throwable;
+                // Inspect and report the very same cause object, so the
+                // message lookup cannot hit a null that the check did not see.
+                final Throwable cause = sre.getStatus().getCause();
+                if (cause instanceof ConnectException) {
+                    log.warn("Device {} is unreachable ({})",
+                             deviceId, cause.getMessage());
+                } else {
+                    log.warn("Received error on stream channel for {}: {}",
+                             deviceId, throwable.getMessage());
+                }
+            } else {
+                log.warn(format("Received exception on stream channel for %s",
+                                deviceId), throwable);
+            }
+            streamChannelManager.complete();
+        }
+
+        @Override
+        public void onCompleted() {
+            log.warn("Stream channel for {} has completed", deviceId);
+            streamChannelManager.complete();
+        }
+    }
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
index 8392c4a..cf14712 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
@@ -41,6 +41,7 @@
@Activate
public void activate() {
super.activate();
+ eventDispatcher.addSink(GnmiEvent.class, listenerRegistry); // registers listenerRegistry as the GnmiEvent sink
log.info("Started");
}
@@ -52,6 +53,15 @@
@Override
protected GnmiClient createClientInstance(GnmiClientKey clientKey, ManagedChannel channel) {
- return new GnmiClientImpl(clientKey, channel);
+ return new GnmiClientImpl(clientKey, channel, this); // the client posts its events through this controller
+ }
+
+    /**
+     * Posts a gNMI event on behalf of a gNMI client.
+     *
+     * @param event the gNMI event to post
+     */
+    void postEvent(GnmiEvent event) {
+        this.post(event);
}
}