[ONOS-7873] Add retry mechanism to gNMI stream channel manager

Change-Id: Ifdd5b1c3fe9d3588913697aace9b77b27fb442f5
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index ce9c5f8..9936ff6 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -24,25 +24,17 @@
 import gnmi.Gnmi.SetRequest;
 import gnmi.Gnmi.SetResponse;
 import gnmi.Gnmi.SubscribeRequest;
-import gnmi.Gnmi.SubscribeResponse;
 import gnmi.gNMIGrpc;
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
-import io.grpc.stub.ClientCallStreamObserver;
-import io.grpc.stub.StreamObserver;
 import org.onosproject.gnmi.api.GnmiClient;
 import org.onosproject.gnmi.api.GnmiClientKey;
-import org.onosproject.gnmi.api.GnmiEvent;
-import org.onosproject.gnmi.api.GnmiUpdate;
 import org.onosproject.grpc.ctl.AbstractGrpcClient;
 import org.slf4j.Logger;
 
-import java.net.ConnectException;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import static java.lang.String.format;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -54,14 +46,13 @@
     private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
     private final Logger log = getLogger(getClass());
     private final gNMIGrpc.gNMIBlockingStub blockingStub;
-    private StreamChannelManager streamChannelManager;
-    private GnmiControllerImpl controller;
+    private GnmiSubscriptionManager gnmiSubscriptionManager;
 
     GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
         super(clientKey);
         this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
-        this.streamChannelManager = new StreamChannelManager(managedChannel);
-        this.controller = controller;
+        this.gnmiSubscriptionManager =
+                new GnmiSubscriptionManager(managedChannel, deviceId, controller);
     }
 
     @Override
@@ -81,12 +72,12 @@
 
     @Override
     public boolean subscribe(SubscribeRequest request) {
-        return streamChannelManager.send(request);
+        return gnmiSubscriptionManager.subscribe(request);
     }
 
     @Override
     public void terminateSubscriptionChannel() {
-        streamChannelManager.complete();
+        gnmiSubscriptionManager.complete();
     }
 
     @Override
@@ -96,7 +87,7 @@
 
     @Override
     protected Void doShutdown() {
-        streamChannelManager.complete();
+        gnmiSubscriptionManager.shutdown();
         return super.doShutdown();
     }
 
@@ -139,121 +130,4 @@
             return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
         }
     }
-
-
-
-    /**
-     * A manager for the gNMI stream channel that opportunistically creates
-     * new stream RCP stubs (e.g. when one fails because of errors) and posts
-     * subscribe events via the gNMI controller.
-     */
-    private final class StreamChannelManager {
-
-        private final ManagedChannel channel;
-        private final AtomicBoolean open;
-        private final StreamObserver<SubscribeResponse> responseObserver;
-        private ClientCallStreamObserver<SubscribeRequest> requestObserver;
-
-        private StreamChannelManager(ManagedChannel channel) {
-            this.channel = channel;
-            this.responseObserver = new InternalStreamResponseObserver(this);
-            this.open = new AtomicBoolean(false);
-        }
-
-        private void initIfRequired() {
-            if (requestObserver == null) {
-                log.debug("Creating new stream channel for {}...", deviceId);
-                requestObserver = (ClientCallStreamObserver<SubscribeRequest>)
-                        gNMIGrpc.newStub(channel).subscribe(responseObserver);
-                open.set(false);
-            }
-        }
-
-        public boolean send(SubscribeRequest value) {
-            synchronized (this) {
-                initIfRequired();
-                try {
-                    requestObserver.onNext(value);
-                    return true;
-                } catch (Throwable ex) {
-                    if (ex instanceof StatusRuntimeException) {
-                        log.warn("Unable to send subscribe request to {}: {}",
-                                deviceId, ex.getMessage());
-                    } else {
-                        log.warn("Exception while sending subscribe request to {}",
-                                deviceId, ex);
-                    }
-                    complete();
-                    return false;
-                }
-            }
-        }
-
-        public void complete() {
-            synchronized (this) {
-                if (requestObserver != null) {
-                    requestObserver.onCompleted();
-                    requestObserver.cancel("Terminated", null);
-                    requestObserver = null;
-                }
-            }
-        }
-    }
-
-
-    /**
-     * Handles messages received from the device on the stream channel.
-     */
-    private final class InternalStreamResponseObserver
-            implements StreamObserver<SubscribeResponse> {
-
-        private final StreamChannelManager streamChannelManager;
-
-        private InternalStreamResponseObserver(
-                StreamChannelManager streamChannelManager) {
-            this.streamChannelManager = streamChannelManager;
-        }
-
-        @Override
-        public void onNext(SubscribeResponse message) {
-            executorService.submit(() -> doNext(message));
-        }
-
-        private void doNext(SubscribeResponse message) {
-            try {
-                log.debug("Received message on stream channel from {}: {}",
-                        deviceId, message.toString());
-                GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
-                GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
-                controller.postEvent(event);
-            } catch (Throwable ex) {
-                log.error("Exception while processing stream message from {}",
-                        deviceId, ex);
-            }
-        }
-
-        @Override
-        public void onError(Throwable throwable) {
-            if (throwable instanceof StatusRuntimeException) {
-                StatusRuntimeException sre = (StatusRuntimeException) throwable;
-                if (sre.getStatus().getCause() instanceof ConnectException) {
-                    log.warn("Device {} is unreachable ({})",
-                            deviceId, sre.getCause().getMessage());
-                } else {
-                    log.warn("Received error on stream channel for {}: {}",
-                            deviceId, throwable.getMessage());
-                }
-            } else {
-                log.warn(format("Received exception on stream channel for %s",
-                        deviceId), throwable);
-            }
-            streamChannelManager.complete();
-        }
-
-        @Override
-        public void onCompleted() {
-            log.warn("Stream channel for {} has completed", deviceId);
-            streamChannelManager.complete();
-        }
-    }
 }
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
new file mode 100644
index 0000000..050925f
--- /dev/null
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiSubscriptionManager.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.ctl;
+
+import gnmi.Gnmi;
+import gnmi.gNMIGrpc;
+import io.grpc.ManagedChannel;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.StreamObserver;
+import org.onosproject.gnmi.api.GnmiEvent;
+import org.onosproject.gnmi.api.GnmiUpdate;
+import org.onosproject.net.DeviceId;
+import org.slf4j.Logger;
+
+import java.net.ConnectException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.String.format;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * A manager for the gNMI stream channel that opportunistically creates
+ * new stream RPC stubs (e.g. when one fails because of errors) and posts
+ * subscribe events via the gNMI controller.
+ * <p>
+ * A periodic task re-sends the last subscribe request on a new stream when
+ * the stream in use reports an error or completes. The state, the stream
+ * observers and the last subscribe request are guarded by {@code lock}.
+ */
+final class GnmiSubscriptionManager {
+
+    /**
+     * The state of the subscription manager.
+     */
+    enum State {
+
+        /**
+         * No subscription exists.
+         */
+        INIT,
+
+        /**
+         * A subscription exists and its stream channel is open.
+         */
+        SUBSCRIBED,
+
+        /**
+         * A subscription exists, but its stream channel is not open.
+         */
+        RETRYING,
+    }
+
+    // FIXME: make this configurable
+    private static final long DEFAULT_RECONNECT_DELAY = 5; // Seconds
+    private static final Logger log = getLogger(GnmiSubscriptionManager.class);
+
+    private final ManagedChannel channel;
+    private final DeviceId deviceId;
+    private final GnmiControllerImpl controller;
+
+    // Dedicated monitor guarding the state and the three fields that follow it.
+    private final Object lock = new Object();
+    private final AtomicReference<State> state = new AtomicReference<>(State.INIT);
+    private ClientCallStreamObserver<Gnmi.SubscribeRequest> requestObserver;
+    // Observer of the stream in use; null when there is no such stream.
+    private StreamObserver<Gnmi.SubscribeResponse> responseObserver;
+    private Gnmi.SubscribeRequest existingSubscription;
+
+    private final ScheduledExecutorService streamCheckerExecutor =
+            newSingleThreadScheduledExecutor(groupedThreads("onos/gnmi-probe", "%d", log));
+
+    /**
+     * Creates a subscription manager and starts the periodic stream check.
+     *
+     * @param channel    gRPC channel used to open subscribe streams
+     * @param deviceId   identifier of the device, used in logs and updates
+     * @param controller gNMI controller to which received updates are posted
+     */
+    GnmiSubscriptionManager(ManagedChannel channel, DeviceId deviceId,
+                            GnmiControllerImpl controller) {
+        this.channel = channel;
+        this.deviceId = deviceId;
+        this.controller = controller;
+        streamCheckerExecutor.scheduleAtFixedRate(this::checkGnmiStream, 0,
+                                                  DEFAULT_RECONNECT_DELAY,
+                                                  TimeUnit.SECONDS);
+    }
+
+    /**
+     * Stops the periodic stream check and terminates the current stream.
+     */
+    public void shutdown() {
+        log.info("gNMI subscription manager for device {} shutdown", deviceId);
+        streamCheckerExecutor.shutdown();
+        complete();
+    }
+
+    /**
+     * Replaces the current subscription, if any, with the given one, which is
+     * sent on a new stream.
+     *
+     * @param request the subscribe request
+     * @return true if the request was passed to the stream; false if that
+     * failed, in which case the periodic check retries it
+     */
+    boolean subscribe(Gnmi.SubscribeRequest request) {
+        synchronized (lock) {
+            // Always start from a new stream: the previous one either carries
+            // the old subscription or has failed and cannot be reused.
+            closeStream();
+            existingSubscription = request;
+            return send(request);
+        }
+    }
+
+    /**
+     * Terminates the current subscription, if any. The state goes back to
+     * INIT, so the periodic check does not reconnect.
+     */
+    public void complete() {
+        synchronized (lock) {
+            state.set(State.INIT);
+            closeStream();
+        }
+    }
+
+    // Opens a new stream if none is in use; the caller must hold the lock.
+    private void initIfRequired() {
+        if (requestObserver == null) {
+            log.debug("Creating new stream channel for {}...", deviceId);
+            // One response observer per stream, so that callbacks from a
+            // stream that was already closed can be told apart.
+            responseObserver = new InternalStreamResponseObserver();
+            requestObserver = (ClientCallStreamObserver<Gnmi.SubscribeRequest>)
+                    gNMIGrpc.newStub(channel).subscribe(responseObserver);
+        }
+    }
+
+    // Sends the request, opening a stream if needed, and sets the state to
+    // SUBSCRIBED or RETRYING; the caller must hold the lock.
+    private boolean send(Gnmi.SubscribeRequest value) {
+        try {
+            initIfRequired();
+            requestObserver.onNext(value);
+            state.set(State.SUBSCRIBED);
+            return true;
+        } catch (Throwable ex) {
+            if (ex instanceof StatusRuntimeException) {
+                log.warn("Unable to send subscribe request to {}: {}",
+                        deviceId, ex.getMessage());
+            } else {
+                log.warn("Exception while sending subscribe request to {}",
+                        deviceId, ex);
+            }
+            state.set(State.RETRYING);
+            return false;
+        }
+    }
+
+    // Closes the stream in use, if any; the caller must hold the lock.
+    private void closeStream() {
+        ClientCallStreamObserver<Gnmi.SubscribeRequest> closing = requestObserver;
+        // Detach the stream before cancelling it, so that the callback
+        // triggered by the cancellation is recognized as stale.
+        requestObserver = null;
+        responseObserver = null;
+        if (closing == null) {
+            return;
+        }
+        try {
+            closing.onCompleted();
+        } finally {
+            closing.cancel("Terminated", null);
+        }
+    }
+
+    // Periodic task: when the state is RETRYING, re-sends the last subscribe
+    // request on a new stream.
+    private void checkGnmiStream() {
+        try {
+            synchronized (lock) {
+                if (state.get() != State.RETRYING || existingSubscription == null) {
+                    // No need to retry if the state is not RETRYING
+                    return;
+                }
+                log.info("Try reconnecting gNMI stream to device {}", deviceId);
+                closeStream();
+                send(existingSubscription);
+            }
+        } catch (Throwable ex) {
+            // A task scheduled at fixed rate is not run again once one of
+            // its executions throws, which would silently stop the retries.
+            log.error("Exception while checking gNMI stream for {}",
+                    deviceId, ex);
+        }
+    }
+
+    // Sets the state to RETRYING only if the given observer belongs to the
+    // stream in use; returns false for streams already closed or replaced.
+    private boolean retryIfCurrent(StreamObserver<Gnmi.SubscribeResponse> observer) {
+        synchronized (lock) {
+            if (observer != responseObserver) {
+                return false;
+            }
+            state.set(State.RETRYING);
+            return true;
+        }
+    }
+
+    /**
+     * Handles messages received from the device on the stream channel.
+     */
+    private final class InternalStreamResponseObserver
+            implements StreamObserver<Gnmi.SubscribeResponse> {
+
+        @Override
+        public void onNext(Gnmi.SubscribeResponse message) {
+            try {
+                log.debug("Received message on stream channel from {}: {}",
+                        deviceId, message);
+                GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
+                GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
+                controller.postEvent(event);
+            } catch (Throwable ex) {
+                log.error("Exception while processing stream message from {}",
+                        deviceId, ex);
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            if (!retryIfCurrent(this)) {
+                // E.g. the cancellation of a stream closed by this manager.
+                log.debug("Ignoring error on closed stream channel for {}: {}",
+                        deviceId, throwable.getMessage());
+                return;
+            }
+            if (throwable instanceof StatusRuntimeException) {
+                StatusRuntimeException sre = (StatusRuntimeException) throwable;
+                if (sre.getStatus().getCause() instanceof ConnectException) {
+                    log.warn("Device {} is unreachable ({})",
+                            deviceId, sre.getCause().getMessage());
+                } else {
+                    log.warn("Received error on stream channel for {}: {}",
+                            deviceId, throwable.getMessage());
+                }
+            } else {
+                log.warn(format("Received exception on stream channel for %s",
+                        deviceId), throwable);
+            }
+        }
+
+        @Override
+        public void onCompleted() {
+            if (!retryIfCurrent(this)) {
+                log.debug("Ignoring completion of closed stream channel for {}", deviceId);
+                return;
+            }
+            log.warn("Stream channel for {} has completed", deviceId);
+        }
+    }
+}