[ONOS-7873] Add retry mechanism to gNMI stream channel manager
Change-Id: Ifdd5b1c3fe9d3588913697aace9b77b27fb442f5
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index ce9c5f8..9936ff6 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -24,25 +24,17 @@
import gnmi.Gnmi.SetRequest;
import gnmi.Gnmi.SetResponse;
import gnmi.Gnmi.SubscribeRequest;
-import gnmi.Gnmi.SubscribeResponse;
import gnmi.gNMIGrpc;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
-import io.grpc.stub.ClientCallStreamObserver;
-import io.grpc.stub.StreamObserver;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiClientKey;
-import org.onosproject.gnmi.api.GnmiEvent;
-import org.onosproject.gnmi.api.GnmiUpdate;
import org.onosproject.grpc.ctl.AbstractGrpcClient;
import org.slf4j.Logger;
-import java.net.ConnectException;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
-import static java.lang.String.format;
import static org.slf4j.LoggerFactory.getLogger;
/**
@@ -54,14 +46,13 @@
private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
private final Logger log = getLogger(getClass());
private final gNMIGrpc.gNMIBlockingStub blockingStub;
- private StreamChannelManager streamChannelManager;
- private GnmiControllerImpl controller;
+ private GnmiSubscriptionManager gnmiSubscriptionManager;
GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
super(clientKey);
this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
- this.streamChannelManager = new StreamChannelManager(managedChannel);
- this.controller = controller;
+ this.gnmiSubscriptionManager =
+ new GnmiSubscriptionManager(managedChannel, deviceId, controller);
}
@Override
@@ -81,12 +72,12 @@
@Override
public boolean subscribe(SubscribeRequest request) {
- return streamChannelManager.send(request);
+ return gnmiSubscriptionManager.subscribe(request);
}
@Override
public void terminateSubscriptionChannel() {
- streamChannelManager.complete();
+ gnmiSubscriptionManager.complete();
}
@Override
@@ -96,7 +87,7 @@
@Override
protected Void doShutdown() {
- streamChannelManager.complete();
+ gnmiSubscriptionManager.shutdown();
return super.doShutdown();
}
@@ -139,121 +130,4 @@
return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
}
}
-
-
-
- /**
- * A manager for the gNMI stream channel that opportunistically creates
- * new stream RCP stubs (e.g. when one fails because of errors) and posts
- * subscribe events via the gNMI controller.
- */
- private final class StreamChannelManager {
-
- private final ManagedChannel channel;
- private final AtomicBoolean open;
- private final StreamObserver<SubscribeResponse> responseObserver;
- private ClientCallStreamObserver<SubscribeRequest> requestObserver;
-
- private StreamChannelManager(ManagedChannel channel) {
- this.channel = channel;
- this.responseObserver = new InternalStreamResponseObserver(this);
- this.open = new AtomicBoolean(false);
- }
-
- private void initIfRequired() {
- if (requestObserver == null) {
- log.debug("Creating new stream channel for {}...", deviceId);
- requestObserver = (ClientCallStreamObserver<SubscribeRequest>)
- gNMIGrpc.newStub(channel).subscribe(responseObserver);
- open.set(false);
- }
- }
-
- public boolean send(SubscribeRequest value) {
- synchronized (this) {
- initIfRequired();
- try {
- requestObserver.onNext(value);
- return true;
- } catch (Throwable ex) {
- if (ex instanceof StatusRuntimeException) {
- log.warn("Unable to send subscribe request to {}: {}",
- deviceId, ex.getMessage());
- } else {
- log.warn("Exception while sending subscribe request to {}",
- deviceId, ex);
- }
- complete();
- return false;
- }
- }
- }
-
- public void complete() {
- synchronized (this) {
- if (requestObserver != null) {
- requestObserver.onCompleted();
- requestObserver.cancel("Terminated", null);
- requestObserver = null;
- }
- }
- }
- }
-
-
- /**
- * Handles messages received from the device on the stream channel.
- */
- private final class InternalStreamResponseObserver
- implements StreamObserver<SubscribeResponse> {
-
- private final StreamChannelManager streamChannelManager;
-
- private InternalStreamResponseObserver(
- StreamChannelManager streamChannelManager) {
- this.streamChannelManager = streamChannelManager;
- }
-
- @Override
- public void onNext(SubscribeResponse message) {
- executorService.submit(() -> doNext(message));
- }
-
- private void doNext(SubscribeResponse message) {
- try {
- log.debug("Received message on stream channel from {}: {}",
- deviceId, message.toString());
- GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
- GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
- controller.postEvent(event);
- } catch (Throwable ex) {
- log.error("Exception while processing stream message from {}",
- deviceId, ex);
- }
- }
-
- @Override
- public void onError(Throwable throwable) {
- if (throwable instanceof StatusRuntimeException) {
- StatusRuntimeException sre = (StatusRuntimeException) throwable;
- if (sre.getStatus().getCause() instanceof ConnectException) {
- log.warn("Device {} is unreachable ({})",
- deviceId, sre.getCause().getMessage());
- } else {
- log.warn("Received error on stream channel for {}: {}",
- deviceId, throwable.getMessage());
- }
- } else {
- log.warn(format("Received exception on stream channel for %s",
- deviceId), throwable);
- }
- streamChannelManager.complete();
- }
-
- @Override
- public void onCompleted() {
- log.warn("Stream channel for {} has completed", deviceId);
- streamChannelManager.complete();
- }
- }
}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
new file mode 100644
index 0000000..050925f
--- /dev/null
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
@@ -0,0 +1,211 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package protocols.gnmi.ctl.java.org.onosproject.gnmi.ctl;
+
+
+import gnmi.Gnmi;
+import gnmi.gNMIGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import org.onosproject.gnmi.api.GnmiEvent;
+import org.onosproject.gnmi.api.GnmiUpdate;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+
+import java.net.ConnectException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.String.format;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * A manager for the gNMI stream channel that opportunistically creates
+ * new stream RCP stubs (e.g. when one fails because of errors) and posts
+ * subscribe events via the gNMI controller.
+ */
+final class GnmiSubscriptionManager {
+
+ /**
+ * The state of the subscription manager.
+ */
+ enum State {
+
+ /**
+ * Subscription not exists.
+ */
+ INIT,
+
+ /**
+ * Exists a subscription and channel opened.
+ */
+ SUBSCRIBED,
+
+ /**
+ * Exists a subscription, but the channel does not open.
+ */
+ RETRYING,
+ }
+
+ // FIXME: make this configurable
+ private static final long DEFAULT_RECONNECT_DELAY = 5; // Seconds
+ private static final Logger log = getLogger(GnmiSubscriptionManager.class);
+ private final ManagedChannel channel;
+ private final DeviceId deviceId;
+ private final GnmiControllerImpl controller;
+
+ private final StreamObserver<Gnmi.SubscribeResponse> responseObserver;
+ private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
+
+ private ClientCallStreamObserver<Gnmi.SubscribeRequest> requestObserver;
+ private Gnmi.SubscribeRequest existingSubscription;
+ private final ScheduledExecutorService streamCheckerExecutor =
+ newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-probe", "%d", log));
+
+ GnmiSubscriptionManager(ManagedChannel channel, DeviceId deviceId,
+ GnmiControllerImpl controller) {
+ this.channel = channel;
+ this.deviceId = deviceId;
+ this.controller = controller;
+ this.responseObserver = new InternalStreamResponseObserver();
+ streamCheckerExecutor.scheduleAtFixedRate(this::checkGnmiStream, 0,
+ DEFAULT_RECONNECT_DELAY,
+ TimeUnit.SECONDS);
+ }
+
+ public void shutdown() {
+ log.info("gNMI subscription manager for device {} shutdown", deviceId);
+ streamCheckerExecutor.shutdown();
+ complete();
+ }
+
+ private void initIfRequired() {
+ if (requestObserver == null) {
+ log.debug("Creating new stream channel for {}...", deviceId);
+ requestObserver = (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
+ gNMIGrpc.newStub(channel).subscribe(responseObserver);
+
+ }
+ }
+
+ boolean subscribe(Gnmi.SubscribeRequest request) {
+ synchronized (state) {
+ if (state.get() == State.SUBSCRIBED) {
+ // Cancel subscription when we need to subscribe new thing
+ complete();
+ }
+
+ existingSubscription = request;
+ return send(request);
+ }
+ }
+
+ private boolean send(Gnmi.SubscribeRequest value) {
+ initIfRequired();
+ try {
+ requestObserver.onNext(value);
+ state.set(State.SUBSCRIBED);
+ return true;
+ } catch (Throwable ex) {
+ if (ex instanceof StatusRuntimeException) {
+ log.warn("Unable to send subscribe request to {}: {}",
+ deviceId, ex.getMessage());
+ } else {
+ log.warn("Exception while sending subscribe request to {}",
+ deviceId, ex);
+ }
+ state.set(State.RETRYING);
+ return false;
+ }
+ }
+
+ public void complete() {
+ synchronized (state) {
+ state.set(State.INIT);
+ if (requestObserver != null) {
+ requestObserver.onCompleted();
+ requestObserver.cancel("Terminated", null);
+ requestObserver = null;
+ }
+ }
+ }
+
+ private void checkGnmiStream() {
+ synchronized (state) {
+ if (state.get() != State.RETRYING) {
+ // No need to retry if the state is not RETRYING
+ return;
+ }
+ log.info("Try reconnecting gNMI stream to device {}", deviceId);
+
+ complete();
+ send(existingSubscription);
+ }
+ }
+
+ /**
+ * Handles messages received from the device on the stream channel.
+ */
+ private final class InternalStreamResponseObserver
+ implements StreamObserver<Gnmi.SubscribeResponse> {
+
+ @Override
+ public void onNext(Gnmi.SubscribeResponse message) {
+ try {
+ log.debug("Received message on stream channel from {}: {}",
+ deviceId, message.toString());
+ GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
+ GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
+ controller.postEvent(event);
+ } catch (Throwable ex) {
+ log.error("Exception while processing stream message from {}",
+ deviceId, ex);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ if (throwable instanceof StatusRuntimeException) {
+ StatusRuntimeException sre = (StatusRuntimeException) throwable;
+ if (sre.getStatus().getCause() instanceof ConnectException) {
+ log.warn("Device {} is unreachable ({})",
+ deviceId, sre.getCause().getMessage());
+ } else {
+ log.warn("Received error on stream channel for {}: {}",
+ deviceId, throwable.getMessage());
+ }
+ } else {
+ log.warn(format("Received exception on stream channel for %s",
+ deviceId), throwable);
+ }
+ state.set(State.RETRYING);
+ }
+
+ @Override
+ public void onCompleted() {
+ log.warn("Stream channel for {} has completed", deviceId);
+ state.set(State.RETRYING);
+ }
+ }
+}
+
+