Add gNMI device state subscriber

Change-Id: I20cb5e130f4e416bf8678aab2e5268faf24ad06b
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index 22b226b..117b27e 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -23,17 +23,26 @@
 import gnmi.Gnmi.PathElem;
 import gnmi.Gnmi.SetRequest;
 import gnmi.Gnmi.SetResponse;
+import gnmi.Gnmi.SubscribeRequest;
+import gnmi.Gnmi.SubscribeResponse;
 import gnmi.gNMIGrpc;
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.StreamObserver;
 import org.onosproject.gnmi.api.GnmiClient;
 import org.onosproject.gnmi.api.GnmiClientKey;
+import org.onosproject.gnmi.api.GnmiEvent;
+import org.onosproject.gnmi.api.GnmiUpdate;
 import org.onosproject.grpc.ctl.AbstractGrpcClient;
 import org.slf4j.Logger;
 
+import java.net.ConnectException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.lang.String.format;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -45,10 +54,14 @@
     private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
     private final Logger log = getLogger(getClass());
     private final gNMIGrpc.gNMIBlockingStub blockingStub;
+    private final StreamChannelManager streamChannelManager; // owns the Subscribe stream of this client
+    private final GnmiControllerImpl controller; // stream responses are posted through it as GnmiEvents
 
-    GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel) {
+    GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
         super(clientKey);
         this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
+        this.streamChannelManager = new StreamChannelManager(managedChannel); // same channel as the stub
+        this.controller = controller; // used by the stream observer to post GnmiEvents
     }
 
     @Override
@@ -67,10 +80,26 @@
     }
 
     @Override
+    public boolean subscribe(SubscribeRequest request) {
+        return streamChannelManager.send(request); // true once handed to the stream; false if sending failed
+    }
+
+    @Override
+    public void terminateSubscriptionChannel() {
+        streamChannelManager.complete(); // closes the current subscribe stream, if one is open
+    }
+
+    @Override // delegates to doServiceAvailable through supplyInContext
     public CompletableFuture<Boolean> isServiceAvailable() {
         return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
     }
 
+    @Override
+    protected Void doShutdown() {
+        streamChannelManager.complete(); // close the subscribe stream before the parent shutdown runs
+        return super.doShutdown();
+    }
+
     private CapabilityResponse doCapability() {
         CapabilityRequest request = CapabilityRequest.newBuilder().build();
         try {
@@ -110,4 +139,121 @@
             return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
         }
     }
+
+    /**
+     * A manager for the gNMI stream channel that opportunistically creates
+     * new stream RPC stubs (e.g. when one fails because of errors) and posts
+     * subscribe events via the gNMI controller.
+     */
+    private final class StreamChannelManager {
+
+        private final ManagedChannel channel;
+        private final AtomicBoolean open; // NOTE(review): written but never read in this class
+        private final StreamObserver<SubscribeResponse> responseObserver;
+        private ClientCallStreamObserver<SubscribeRequest> requestObserver; // null while no stream is open
+
+        private StreamChannelManager(ManagedChannel channel) {
+            this.channel = channel;
+            this.responseObserver = new InternalStreamResponseObserver(this);
+            this.open = new AtomicBoolean(false);
+        }
+
+        private void initIfRequired() { // lazily opens the Subscribe RPC; caller holds this manager's monitor
+            if (requestObserver == null) {
+                log.debug("Creating new stream channel for {}...", deviceId);
+                requestObserver = (ClientCallStreamObserver<SubscribeRequest>)
+                        gNMIGrpc.newStub(channel).subscribe(responseObserver);
+                open.set(false);
+            }
+        }
+
+        /** Sends the request, opening the stream first if needed; returns false if sending failed. */
+        public synchronized boolean send(SubscribeRequest value) {
+            initIfRequired();
+            try {
+                requestObserver.onNext(value);
+                return true;
+            } catch (Throwable ex) {
+                if (ex instanceof StatusRuntimeException) {
+                    log.warn("Unable to send subscribe request to {}: {}", deviceId, ex.getMessage());
+                } else {
+                    log.warn("Exception while sending subscribe request to {}", deviceId, ex);
+                }
+                complete();
+                return false;
+            }
+        }
+
+        /** Best-effort close of the current stream, if any; the next send() opens a fresh one. */
+        public synchronized void complete() {
+            final ClientCallStreamObserver<SubscribeRequest> closing = requestObserver;
+            requestObserver = null; // drop first: a failed close must not leave a dead stub behind
+            if (closing != null) {
+                try {
+                    closing.onCompleted();
+                } catch (RuntimeException ex) {
+                    log.debug("Error while completing stream channel for {}: {}", deviceId, ex.getMessage());
+                } finally {
+                    closing.cancel("Terminated", null); // always release the underlying call
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Handles messages received from the device on the stream channel.
+     * Any termination of the stream (error or completion) closes it on the manager.
+     */
+    private final class InternalStreamResponseObserver
+            implements StreamObserver<SubscribeResponse> {
+
+        private final StreamChannelManager streamChannelManager;
+
+        private InternalStreamResponseObserver(
+                StreamChannelManager streamChannelManager) {
+            this.streamChannelManager = streamChannelManager;
+        }
+
+        @Override
+        public void onNext(SubscribeResponse message) {
+            // Processing is submitted to executorService rather than done inline.
+            executorService.submit(() -> doNext(message));
+        }
+
+        // Wraps the response into a GnmiEvent of type UPDATE and posts it via the controller.
+        private void doNext(SubscribeResponse message) {
+            try {
+                // Pass the message itself so it is only rendered when debug logging is enabled.
+                log.debug("Received message on stream channel from {}: {}", deviceId, message);
+                GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
+                GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
+                controller.postEvent(event);
+            } catch (Throwable ex) {
+                log.error("Exception while processing stream message from {}", deviceId, ex);
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            if (throwable instanceof StatusRuntimeException) {
+                // Log the very cause that was type-checked, not a second lookup on the exception.
+                final Throwable cause = ((StatusRuntimeException) throwable).getStatus().getCause();
+                if (cause instanceof ConnectException) {
+                    log.warn("Device {} is unreachable ({})", deviceId, cause.getMessage());
+                } else {
+                    log.warn("Received error on stream channel for {}: {}", deviceId, throwable.getMessage());
+                }
+            } else {
+                log.warn(format("Received exception on stream channel for %s", deviceId), throwable);
+            }
+            streamChannelManager.complete(); // close the stream on the manager after any error
+        }
+
+        @Override
+        public void onCompleted() {
+            log.warn("Stream channel for {} has completed", deviceId);
+            streamChannelManager.complete();
+        }
+    }
 }
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
index 8392c4a..cf14712 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
@@ -41,6 +41,7 @@
     @Activate
     public void activate() {
         super.activate();
+        eventDispatcher.addSink(GnmiEvent.class, listenerRegistry); // NOTE(review): confirm deactivate() removes it
         log.info("Started");
     }
 
@@ -52,6 +53,15 @@
 
     @Override
     protected GnmiClient createClientInstance(GnmiClientKey clientKey, ManagedChannel channel) {
-        return new GnmiClientImpl(clientKey, channel);
+        return new GnmiClientImpl(clientKey, channel, this); // the client posts its events back through postEvent
+    }
+
+    /**
+     * Package-private entry point that lets a gNMI client hand an event to this controller's {@code post}.
+     *
+     * @param event the gNMI event
+     */
+    void postEvent(GnmiEvent event) {
+        post(event);
     }
 }