Add gNMI device state subscriber

Change-Id: I20cb5e130f4e416bf8678aab2e5268faf24ad06b
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
index 5beb238..6e65dd3 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiClient.java
@@ -22,6 +22,7 @@
 import gnmi.Gnmi.GetResponse;
 import gnmi.Gnmi.SetRequest;
 import gnmi.Gnmi.SetResponse;
+import gnmi.Gnmi.SubscribeRequest;
 import org.onosproject.grpc.api.GrpcClient;
 
 import java.util.concurrent.CompletableFuture;
@@ -56,12 +57,23 @@
     CompletableFuture<SetResponse> set(SetRequest request);
 
     /**
-     * Check weather the gNMI service is available or not by sending a
-     * dummy get request message.
+     * Subscribes to a given specific gNMI path.
+     *
+     * @param request the subscribe request
+     * @return true if subscribe successfully; false otherwise
+     */
+    boolean subscribe(SubscribeRequest request);
+
+    /**
+     * Terminates the subscription channel of this device.
+     */
+    void terminateSubscriptionChannel();
+
+    /**
+     * Check weather the gNMI service is available or not by sending a dummy get
+     * request message.
      *
      * @return true if gNMI service available; false otherwise
      */
     CompletableFuture<Boolean> isServiceAvailable();
-
-    // TODO: Support gNMI subscription
 }
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
index f4964ed..b0e0071 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiController.java
@@ -17,6 +17,7 @@
 package org.onosproject.gnmi.api;
 
 import com.google.common.annotations.Beta;
+import org.onosproject.event.ListenerService;
 import org.onosproject.grpc.api.GrpcClientController;
 
 /**
@@ -24,5 +25,6 @@
  */
 @Beta
 public interface GnmiController
-        extends GrpcClientController<GnmiClientKey, GnmiClient> {
+        extends GrpcClientController<GnmiClientKey, GnmiClient>,
+        ListenerService<GnmiEvent, GnmiEventListener> {
 }
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java
index 5129926..84031d0 100644
--- a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiEvent.java
@@ -32,15 +32,10 @@
         /**
          * Update.
          */
-        UPDATE,
-
-        /**
-         * Sync response.
-         */
-        SYNC_RESPONSE
+        UPDATE
     }
 
-    protected GnmiEvent(Type type, GnmiEventSubject subject) {
+    public GnmiEvent(Type type, GnmiEventSubject subject) {
         super(type, subject);
     }
 }
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUpdate.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUpdate.java
new file mode 100644
index 0000000..eeb7649
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUpdate.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.api;
+
+import com.google.common.base.MoreObjects;
+import gnmi.Gnmi.Notification;
+import org.onosproject.net.DeviceId;
+
+/**
+ * Event class for gNMI update.
+ */
+public class GnmiUpdate implements GnmiEventSubject {
+    private DeviceId deviceId;
+    private Notification update;
+    private boolean syncResponse;
+
+    /**
+     * Default constructor.
+     *
+     * @param deviceId the device id for this event
+     * @param update the update for this event
+     * @param syncResponse indicate target has sent all values associated with
+     *                     the subscription at least once.
+     */
+    public GnmiUpdate(DeviceId deviceId, Notification update, boolean syncResponse) {
+        this.deviceId = deviceId;
+        this.update = update;
+        this.syncResponse = syncResponse;
+    }
+
+    /**
+     * Gets the update data.
+     *
+     * @return the update data
+     */
+    public Notification update() {
+        return update;
+    }
+
+    /**
+     * indicate target has sent all values associated with the subscription at
+     * least once.
+     *
+     * @return true if all value from target has sent
+     */
+    public boolean syncResponse() {
+        return syncResponse;
+    }
+
+    @Override
+    public DeviceId deviceId() {
+        return deviceId;
+    }
+
+    @Override
+    public String toString() {
+        return MoreObjects.toStringHelper(getClass())
+                .add("deviceId", deviceId)
+                .add("syncResponse", syncResponse)
+                .add("update", update)
+                .toString();
+    }
+}
diff --git a/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUtils.java b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUtils.java
new file mode 100644
index 0000000..6a71a19
--- /dev/null
+++ b/protocols/gnmi/api/src/main/java/org/onosproject/gnmi/api/GnmiUtils.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.gnmi.api;
+
+import gnmi.Gnmi.Path;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Utilities for gNMI protocol.
+ */
+public final class GnmiUtils {
+
+    private GnmiUtils() {
+        // Hide default constructor
+    }
+
+    /**
+     * Convert gNMI path to human readable string.
+     *
+     * @param path the gNMI path
+     * @return readable string of the path
+     */
+    public static String pathToString(Path path) {
+        StringBuilder pathStringBuilder = new StringBuilder();
+
+        path.getElemList().forEach(elem -> {
+            pathStringBuilder.append("/").append(elem.getName());
+            if (elem.getKeyCount() > 0) {
+                pathStringBuilder.append("[");
+                List<String> keys = elem.getKeyMap().entrySet().stream()
+                        .map(entry -> entry.getKey() + "=" + entry.getValue())
+                        .collect(Collectors.toList());
+                pathStringBuilder.append(String.join(", ", keys));
+                pathStringBuilder.append("]");
+            }
+        });
+        return pathStringBuilder.toString();
+    }
+}
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
index 22b226b..117b27e 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiClientImpl.java
@@ -23,17 +23,26 @@
 import gnmi.Gnmi.PathElem;
 import gnmi.Gnmi.SetRequest;
 import gnmi.Gnmi.SetResponse;
+import gnmi.Gnmi.SubscribeRequest;
+import gnmi.Gnmi.SubscribeResponse;
 import gnmi.gNMIGrpc;
 import io.grpc.ManagedChannel;
 import io.grpc.Status;
 import io.grpc.StatusRuntimeException;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.StreamObserver;
 import org.onosproject.gnmi.api.GnmiClient;
 import org.onosproject.gnmi.api.GnmiClientKey;
+import org.onosproject.gnmi.api.GnmiEvent;
+import org.onosproject.gnmi.api.GnmiUpdate;
 import org.onosproject.grpc.ctl.AbstractGrpcClient;
 import org.slf4j.Logger;
 
+import java.net.ConnectException;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static java.lang.String.format;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -45,10 +54,14 @@
     private static final GetRequest DUMMY_REQUEST = GetRequest.newBuilder().addPath(DUMMY_PATH).build();
     private final Logger log = getLogger(getClass());
     private final gNMIGrpc.gNMIBlockingStub blockingStub;
+    private StreamChannelManager streamChannelManager;
+    private GnmiControllerImpl controller;
 
-    GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel) {
+    GnmiClientImpl(GnmiClientKey clientKey, ManagedChannel managedChannel, GnmiControllerImpl controller) {
         super(clientKey);
         this.blockingStub = gNMIGrpc.newBlockingStub(managedChannel);
+        this.streamChannelManager = new StreamChannelManager(managedChannel);
+        this.controller = controller;
     }
 
     @Override
@@ -67,10 +80,26 @@
     }
 
     @Override
+    public boolean subscribe(SubscribeRequest request) {
+        return streamChannelManager.send(request);
+    }
+
+    @Override
+    public void terminateSubscriptionChannel() {
+        streamChannelManager.complete();
+    }
+
+    @Override
     public CompletableFuture<Boolean> isServiceAvailable() {
         return supplyInContext(this::doServiceAvailable, "isServiceAvailable");
     }
 
+    @Override
+    protected Void doShutdown() {
+        streamChannelManager.complete();
+        return super.doShutdown();
+    }
+
     private CapabilityResponse doCapability() {
         CapabilityRequest request = CapabilityRequest.newBuilder().build();
         try {
@@ -110,4 +139,121 @@
             return e.getStatus().getCode().equals(Status.Code.INVALID_ARGUMENT);
         }
     }
+
+
+
+    /**
+     * A manager for the gNMI stream channel that opportunistically creates
+     * new stream RCP stubs (e.g. when one fails because of errors) and posts
+     * subscribe events via the gNMI controller.
+     */
+    private final class StreamChannelManager {
+
+        private final ManagedChannel channel;
+        private final AtomicBoolean open;
+        private final StreamObserver<SubscribeResponse> responseObserver;
+        private ClientCallStreamObserver<SubscribeRequest> requestObserver;
+
+        private StreamChannelManager(ManagedChannel channel) {
+            this.channel = channel;
+            this.responseObserver = new InternalStreamResponseObserver(this);
+            this.open = new AtomicBoolean(false);
+        }
+
+        private void initIfRequired() {
+            if (requestObserver == null) {
+                log.debug("Creating new stream channel for {}...", deviceId);
+                requestObserver = (ClientCallStreamObserver<SubscribeRequest>)
+                        gNMIGrpc.newStub(channel).subscribe(responseObserver);
+                open.set(false);
+            }
+        }
+
+        public boolean send(SubscribeRequest value) {
+            synchronized (this) {
+                initIfRequired();
+                try {
+                    requestObserver.onNext(value);
+                    return true;
+                } catch (Throwable ex) {
+                    if (ex instanceof StatusRuntimeException) {
+                        log.warn("Unable to send subscribe request to {}: {}",
+                                deviceId, ex.getMessage());
+                    } else {
+                        log.warn("Exception while sending subscribe request to {}",
+                                deviceId, ex);
+                    }
+                    complete();
+                    return false;
+                }
+            }
+        }
+
+        public void complete() {
+            synchronized (this) {
+                if (requestObserver != null) {
+                    requestObserver.onCompleted();
+                    requestObserver.cancel("Terminated", null);
+                    requestObserver = null;
+                }
+            }
+        }
+    }
+
+
+    /**
+     * Handles messages received from the device on the stream channel.
+     */
+    private final class InternalStreamResponseObserver
+            implements StreamObserver<SubscribeResponse> {
+
+        private final StreamChannelManager streamChannelManager;
+
+        private InternalStreamResponseObserver(
+                StreamChannelManager streamChannelManager) {
+            this.streamChannelManager = streamChannelManager;
+        }
+
+        @Override
+        public void onNext(SubscribeResponse message) {
+            executorService.submit(() -> doNext(message));
+        }
+
+        private void doNext(SubscribeResponse message) {
+            try {
+                log.debug("Received message on stream channel from {}: {}",
+                        deviceId, message.toString());
+                GnmiUpdate update = new GnmiUpdate(deviceId, message.getUpdate(), message.getSyncResponse());
+                GnmiEvent event = new GnmiEvent(GnmiEvent.Type.UPDATE, update);
+                controller.postEvent(event);
+            } catch (Throwable ex) {
+                log.error("Exception while processing stream message from {}",
+                        deviceId, ex);
+            }
+        }
+
+        @Override
+        public void onError(Throwable throwable) {
+            if (throwable instanceof StatusRuntimeException) {
+                StatusRuntimeException sre = (StatusRuntimeException) throwable;
+                if (sre.getStatus().getCause() instanceof ConnectException) {
+                    log.warn("Device {} is unreachable ({})",
+                            deviceId, sre.getCause().getMessage());
+                } else {
+                    log.warn("Received error on stream channel for {}: {}",
+                            deviceId, throwable.getMessage());
+                }
+            } else {
+                log.warn(format("Received exception on stream channel for %s",
+                        deviceId), throwable);
+            }
+            streamChannelManager.complete();
+        }
+
+        @Override
+        public void onCompleted() {
+            log.warn("Stream channel for {} has completed", deviceId);
+            streamChannelManager.complete();
+        }
+    }
 }
diff --git a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
index 8392c4a..cf14712 100644
--- a/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
+++ b/protocols/gnmi/ctl/src/main/java/org/onosproject/gnmi/ctl/GnmiControllerImpl.java
@@ -41,6 +41,7 @@
     @Activate
     public void activate() {
         super.activate();
+        eventDispatcher.addSink(GnmiEvent.class, listenerRegistry);
         log.info("Started");
     }
 
@@ -52,6 +53,15 @@
 
     @Override
     protected GnmiClient createClientInstance(GnmiClientKey clientKey, ManagedChannel channel) {
-        return new GnmiClientImpl(clientKey, channel);
+        return new GnmiClientImpl(clientKey, channel, this);
+    }
+
+    /**
+     * Handles event from gNMI client.
+     *
+     * @param event the gNMI event
+     */
+    void postEvent(GnmiEvent event) {
+        post(event);
     }
 }
diff --git a/providers/general/BUILD b/providers/general/BUILD
index 0f4654f..2ac73a0 100644
--- a/providers/general/BUILD
+++ b/providers/general/BUILD
@@ -7,7 +7,7 @@
     category = "Provider",
     description = "General device southbound providers.",
     included_bundles = BUNDLES,
-    required_apps = [],
+    required_apps = ["org.onosproject.protocols.gnmi"],
     title = "General Device Provider",
     url = "http://onosproject.org",
 )
diff --git a/providers/general/device/BUILD b/providers/general/device/BUILD
index 2807457..40957b1 100644
--- a/providers/general/device/BUILD
+++ b/providers/general/device/BUILD
@@ -1,4 +1,9 @@
-COMPILE_DEPS = CORE_DEPS + JACKSON
+COMPILE_DEPS = CORE_DEPS + JACKSON + [
+    "//protocols/gnmi/stub:onos-protocols-gnmi-stub",
+    "//protocols/gnmi/api:onos-protocols-gnmi-api",
+    "@com_google_protobuf//:protobuf_java",
+    "//protocols/grpc/api:onos-protocols-grpc-api",
+]
 
 osgi_jar_with_tests(
     test_deps = TEST_ADAPTERS,
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
index 3ca56ee..bbf71f2 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GeneralDeviceProvider.java
@@ -25,6 +25,7 @@
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.CoreService;
+import org.onosproject.gnmi.api.GnmiController;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.DefaultAnnotations;
@@ -169,6 +170,16 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     private PiPipeconfWatchdogService pipeconfWatchdogService;
 
+    // FIXME: no longer general if we add a dependency to a protocol-specific
+    // service. Possible solutions are: rename this provider to
+    // StratumDeviceProvider, find a way to allow this provider to register for
+    // protocol specific events (e.g. port events) via drivers (similar to
+    // DeviceAgentListener).
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    private GnmiController gnmiController;
+
+    private GnmiDeviceStateSubscriber gnmiDeviceStateSubscriber;
+
     /**
      * Configure poll frequency for port status and statistics; default is 10 sec.
      */
@@ -228,6 +239,9 @@
         pipeconfWatchdogService.addListener(pipeconfWatchdogListener);
         rescheduleProbeTask(false);
         modified(context);
+        gnmiDeviceStateSubscriber = new GnmiDeviceStateSubscriber(gnmiController,
+                deviceService, mastershipService, providerService);
+        gnmiDeviceStateSubscriber.activate();
         log.info("Started");
     }
 
@@ -316,6 +330,8 @@
         providerRegistry.unregister(this);
         providerService = null;
         cfgService.unregisterConfigFactory(factory);
+        gnmiDeviceStateSubscriber.deactivate();
+        gnmiDeviceStateSubscriber = null;
         log.info("Stopped");
     }
 
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
new file mode 100644
index 0000000..b253f3e
--- /dev/null
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
@@ -0,0 +1,281 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.onosproject.provider.general.device.impl;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Striped;
+import gnmi.Gnmi.Notification;
+import gnmi.Gnmi.Path;
+import gnmi.Gnmi.PathElem;
+import gnmi.Gnmi.SubscribeRequest;
+import gnmi.Gnmi.Subscription;
+import gnmi.Gnmi.SubscriptionList;
+import gnmi.Gnmi.SubscriptionMode;
+import gnmi.Gnmi.Update;
+import org.onlab.util.SharedExecutors;
+import org.onosproject.gnmi.api.GnmiClient;
+import org.onosproject.gnmi.api.GnmiController;
+import org.onosproject.gnmi.api.GnmiEvent;
+import org.onosproject.gnmi.api.GnmiEventListener;
+import org.onosproject.gnmi.api.GnmiUpdate;
+import org.onosproject.gnmi.api.GnmiUtils;
+import org.onosproject.mastership.MastershipEvent;
+import org.onosproject.mastership.MastershipListener;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.SparseAnnotations;
+import org.onosproject.net.device.DefaultPortDescription;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceProviderService;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.device.PortDescription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.locks.Lock;
+
+/**
+ * Entity that manages gNMI subscription for devices using OpenConfig models and
+ * that reports relevant events to the core.
+ */
+@Beta
+class GnmiDeviceStateSubscriber {
+
+    private static Logger log = LoggerFactory.getLogger(GnmiDeviceStateSubscriber.class);
+
+    private final GnmiController gnmiController;
+    private final DeviceService deviceService;
+    private final DeviceProviderService providerService;
+    private final MastershipService mastershipService;
+
+    private final ExecutorService executorService = SharedExecutors.getPoolThreadExecutor();
+
+    private final InternalGnmiEventListener gnmiEventListener = new InternalGnmiEventListener();
+    private final InternalDeviceListener deviceEventListener = new InternalDeviceListener();
+    private final InternalMastershipListener mastershipListener = new InternalMastershipListener();
+    private final Collection<DeviceId> deviceSubscribed = Sets.newHashSet();
+
+    private final Striped<Lock> deviceLocks = Striped.lock(30);
+
+    GnmiDeviceStateSubscriber(GnmiController gnmiController, DeviceService deviceService,
+                              MastershipService mastershipService,
+                              DeviceProviderService providerService) {
+        this.gnmiController = gnmiController;
+        this.deviceService = deviceService;
+        this.mastershipService = mastershipService;
+        this.providerService = providerService;
+    }
+
+    public void activate() {
+        deviceService.addListener(deviceEventListener);
+        mastershipService.addListener(mastershipListener);
+        gnmiController.addListener(gnmiEventListener);
+        // Subscribe to existing devices.
+        deviceService.getDevices().forEach(d -> executorService.execute(
+                () -> checkDeviceSubscription(d.id())));
+    }
+
+    public void deactivate() {
+        deviceSubscribed.forEach(this::unsubscribeIfNeeded);
+        deviceService.removeListener(deviceEventListener);
+        mastershipService.removeListener(mastershipListener);
+        gnmiController.removeListener(gnmiEventListener);
+    }
+
+    private void checkDeviceSubscription(DeviceId deviceId) {
+        deviceLocks.get(deviceId).lock();
+        try {
+            if (!deviceService.isAvailable(deviceId)
+                    || deviceService.getDevice(deviceId) == null
+                    || !mastershipService.isLocalMaster(deviceId)) {
+                // Device not available/removed or this instance is no longer
+                // master.
+                unsubscribeIfNeeded(deviceId);
+            } else {
+                subscribeIfNeeded(deviceId);
+            }
+        } finally {
+            deviceLocks.get(deviceId).unlock();
+        }
+    }
+
+    private Path interfaceOperStatusPath(String interfaceName) {
+        return Path.newBuilder()
+                .addElem(PathElem.newBuilder().setName("interfaces").build())
+                .addElem(PathElem.newBuilder()
+                                 .setName("interface").putKey("name", interfaceName).build())
+                .addElem(PathElem.newBuilder().setName("state").build())
+                .addElem(PathElem.newBuilder().setName("oper-status").build())
+                .build();
+    }
+
+    private void unsubscribeIfNeeded(DeviceId deviceId) {
+        if (!deviceSubscribed.contains(deviceId)) {
+            // Not subscribed.
+            return;
+        }
+        GnmiClient client = gnmiController.getClient(deviceId);
+        if (client == null) {
+            log.debug("Cannot find gNMI client for device {}", deviceId);
+        } else {
+            client.terminateSubscriptionChannel();
+        }
+        deviceSubscribed.remove(deviceId);
+    }
+
+    private void subscribeIfNeeded(DeviceId deviceId) {
+        if (deviceSubscribed.contains(deviceId)) {
+            // Already subscribed.
+            // FIXME: if a new port is added after the first subscription we are
+            // not subscribing to the new port.
+            return;
+        }
+
+        GnmiClient client = gnmiController.getClient(deviceId);
+        if (client == null) {
+            log.warn("Cannot find gNMI client for device {}", deviceId);
+            return;
+        }
+
+        List<Port> ports = deviceService.getPorts(deviceId);
+        SubscriptionList.Builder subscriptionList = SubscriptionList.newBuilder();
+        subscriptionList.setMode(SubscriptionList.Mode.STREAM);
+        subscriptionList.setUpdatesOnly(true);
+
+        ports.forEach(port -> {
+            String portName = port.number().name();
+            // Subscribe /interface/interface[name=port-name]/state/oper-status
+            Path subscribePath = interfaceOperStatusPath(portName);
+            Subscription interfaceOperStatusSub =
+                    Subscription.newBuilder()
+                            .setPath(subscribePath)
+                            .setMode(SubscriptionMode.ON_CHANGE)
+                            .build();
+            // TODO: more state subscription
+            subscriptionList.addSubscription(interfaceOperStatusSub);
+        });
+
+        SubscribeRequest subscribeRequest = SubscribeRequest.newBuilder()
+                .setSubscribe(subscriptionList.build())
+                .build();
+
+        client.subscribe(subscribeRequest);
+
+        deviceSubscribed.add(deviceId);
+    }
+
+    private void handleGnmiUpdate(GnmiUpdate eventSubject) {
+        Notification notification = eventSubject.update();
+        if (notification == null) {
+            log.warn("Cannot handle gNMI event without update data, abort");
+            log.debug("gNMI update:\n{}", eventSubject);
+            return;
+        }
+
+        List<Update> updateList = notification.getUpdateList();
+        updateList.forEach(update -> {
+            Path path = update.getPath();
+            PathElem lastElem = path.getElem(path.getElemCount() - 1);
+
+            // Use last element to identify which state updated
+            if ("oper-status".equals(lastElem.getName())) {
+                handleOperStatusUpdate(eventSubject.deviceId(), update);
+            } else {
+                log.debug("Unrecognized update {}", GnmiUtils.pathToString(path));
+            }
+        });
+    }
+
+    private void handleOperStatusUpdate(DeviceId deviceId, Update update) {
+        Path path = update.getPath();
+        // first element should be "interface"
+        String interfaceName = path.getElem(1).getKeyOrDefault("name", null);
+        if (interfaceName == null) {
+            log.error("No interface present in gNMI update, abort");
+            log.debug("gNMI update:\n{}", update);
+            return;
+        }
+
+        List<Port> portsFromDevice = deviceService.getPorts(deviceId);
+        portsFromDevice.forEach(port -> {
+            if (!port.number().name().equals(interfaceName)) {
+                return;
+            }
+
+            // Port/Interface name is identical in OpenConfig model, but not in ONOS
+            // This might cause some problem if we use one name to different port
+            PortDescription portDescription = DefaultPortDescription.builder()
+                    .portSpeed(port.portSpeed())
+                    .withPortNumber(port.number())
+                    .isEnabled(update.getVal().getStringVal().equals("UP"))
+                    .type(port.type())
+                    .annotations((SparseAnnotations) port.annotations())
+                    .build();
+            providerService.portStatusChanged(deviceId, portDescription);
+        });
+    }
+
+    class InternalGnmiEventListener implements GnmiEventListener {
+
+        @Override
+        public void event(GnmiEvent event) {
+            if (!deviceSubscribed.contains(event.subject().deviceId())) {
+                log.warn("Received gNMI event from {}, but we are not subscribed to it",
+                         event.subject().deviceId());
+            }
+            log.debug("Received gNMI event {}", event.toString());
+            if (event.type() == GnmiEvent.Type.UPDATE) {
+                executorService.execute(
+                        () -> handleGnmiUpdate((GnmiUpdate) event.subject()));
+            } else {
+                log.debug("Unsupported gNMI event type: {}", event.type());
+            }
+        }
+    }
+
+    class InternalMastershipListener implements MastershipListener {
+
+        @Override
+        public void event(MastershipEvent event) {
+            executorService.execute(() -> checkDeviceSubscription(event.subject()));
+        }
+    }
+
+    class InternalDeviceListener implements DeviceListener {
+
+        @Override
+        public void event(DeviceEvent event) {
+            switch (event.type()) {
+                case DEVICE_ADDED:
+                case DEVICE_AVAILABILITY_CHANGED:
+                case DEVICE_UPDATED:
+                case DEVICE_REMOVED:
+                    executorService.execute(
+                            () -> checkDeviceSubscription(event.subject().id()));
+                    break;
+                default:
+                    break;
+            }
+        }
+    }
+}