Add support for enabling/disabling ports for gNMI devices

This change also includes:
- Refactored the gNMI protocol and driver to take advantage of the recent
  changes to the gRPC protocol subsystem (e.g. no more locking, RPCs
  started with timeouts).
- Fixed Stratum driver to work after GeneralDeviceProvider refactoring
- Updated bmv2.py to generate ChassisConfig for stratum_bmv2
- Fixed portstate command to use the same port name as in the store

Change-Id: I0dad3bc73e4b6d907b5cf6b7b9a2852943226be7
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
index 6e164bf..a527642 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
@@ -17,7 +17,7 @@
 package org.onosproject.provider.general.device.impl;
 
 import com.google.common.annotations.Beta;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Striped;
 import gnmi.Gnmi.Notification;
 import gnmi.Gnmi.Path;
@@ -27,7 +27,6 @@
 import gnmi.Gnmi.SubscriptionList;
 import gnmi.Gnmi.SubscriptionMode;
 import gnmi.Gnmi.Update;
-import org.onlab.util.SharedExecutors;
 import org.onosproject.gnmi.api.GnmiClient;
 import org.onosproject.gnmi.api.GnmiController;
 import org.onosproject.gnmi.api.GnmiEvent;
@@ -40,6 +39,7 @@
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DefaultPortDescription;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
@@ -49,10 +49,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
 
 /**
  * Entity that manages gNMI subscription for devices using OpenConfig models and
@@ -61,7 +63,7 @@
 @Beta
 class GnmiDeviceStateSubscriber {
 
-    private static final String LAST_CHANGE = "last-change";
+    private static final String LAST_CHANGE = "last-changed";
 
     private static Logger log = LoggerFactory.getLogger(GnmiDeviceStateSubscriber.class);
 
@@ -70,12 +72,10 @@
     private final DeviceProviderService providerService;
     private final MastershipService mastershipService;
 
-    private final ExecutorService executorService = SharedExecutors.getPoolThreadExecutor();
-
     private final InternalGnmiEventListener gnmiEventListener = new InternalGnmiEventListener();
     private final InternalDeviceListener deviceEventListener = new InternalDeviceListener();
     private final InternalMastershipListener mastershipListener = new InternalMastershipListener();
-    private final Collection<DeviceId> deviceSubscribed = Sets.newHashSet();
+    private final Map<DeviceId, Set<PortNumber>> deviceSubscribed = Maps.newHashMap();
 
     private final Striped<Lock> deviceLocks = Striped.lock(30);
 
@@ -93,34 +93,40 @@
         mastershipService.addListener(mastershipListener);
         gnmiController.addListener(gnmiEventListener);
         // Subscribe to existing devices.
-        deviceService.getDevices().forEach(d -> executorService.execute(
-                () -> checkDeviceSubscription(d.id())));
+        deviceService.getDevices().forEach(d -> checkSubscription(d.id()));
     }
 
     public void deactivate() {
-        deviceSubscribed.forEach(this::unsubscribeIfNeeded);
+        deviceSubscribed.keySet().forEach(this::unsubscribeIfNeeded);
         deviceService.removeListener(deviceEventListener);
         mastershipService.removeListener(mastershipListener);
         gnmiController.removeListener(gnmiEventListener);
     }
 
-    private void checkDeviceSubscription(DeviceId deviceId) {
+    private void checkSubscription(DeviceId deviceId) {
+        if (gnmiController.getClient(deviceId) == null) {
+            // Ignore devices for which a gNMI client does not exist.
+            return;
+        }
         deviceLocks.get(deviceId).lock();
         try {
-            if (!deviceService.isAvailable(deviceId)
-                    || deviceService.getDevice(deviceId) == null
-                    || !mastershipService.isLocalMaster(deviceId)) {
-                // Device not available/removed or this instance is no longer
-                // master.
-                unsubscribeIfNeeded(deviceId);
-            } else {
+            if (shouldHaveSubscription(deviceId)) {
                 subscribeIfNeeded(deviceId);
+            } else {
+                unsubscribeIfNeeded(deviceId);
             }
         } finally {
             deviceLocks.get(deviceId).unlock();
         }
     }
 
+    private boolean shouldHaveSubscription(DeviceId deviceId) {
+        return deviceService.getDevice(deviceId) != null
+                && deviceService.isAvailable(deviceId)
+                && mastershipService.isLocalMaster(deviceId)
+                && !deviceService.getPorts(deviceId).isEmpty();
+    }
+
     private Path interfaceOperStatusPath(String interfaceName) {
         return Path.newBuilder()
                 .addElem(PathElem.newBuilder().setName("interfaces").build())
@@ -132,40 +138,31 @@
     }
 
     private void unsubscribeIfNeeded(DeviceId deviceId) {
-        if (!deviceSubscribed.contains(deviceId)) {
-            // Not subscribed.
-            return;
+        gnmiController.getClient(deviceId).unsubscribe();
+        if (deviceSubscribed.remove(deviceId) != null) {
+            log.info("Cancelled gNMI subscription for {}", deviceId);
         }
-        GnmiClient client = gnmiController.getClient(deviceId);
-        if (client == null) {
-            log.debug("Cannot find gNMI client for device {}", deviceId);
-        } else {
-            client.terminateSubscriptionChannel();
-        }
-        deviceSubscribed.remove(deviceId);
     }
 
     private void subscribeIfNeeded(DeviceId deviceId) {
-        if (deviceSubscribed.contains(deviceId)) {
-            // Already subscribed.
-            // FIXME: if a new port is added after the first subscription we are
-            // not subscribing to the new port.
+
+        Set<PortNumber> ports = deviceService.getPorts(deviceId).stream()
+                .map(Port::number)
+                .collect(Collectors.toSet());
+
+        if (Objects.equals(ports, deviceSubscribed.get(deviceId))) {
+            // Already subscribed for the same ports.
             return;
         }
 
         GnmiClient client = gnmiController.getClient(deviceId);
-        if (client == null) {
-            log.debug("Cannot find gNMI client for device {}, ignoring.", deviceId);
-            return;
-        }
 
-        List<Port> ports = deviceService.getPorts(deviceId);
         SubscriptionList.Builder subscriptionList = SubscriptionList.newBuilder();
         subscriptionList.setMode(SubscriptionList.Mode.STREAM);
         subscriptionList.setUpdatesOnly(true);
 
         ports.forEach(port -> {
-            String portName = port.number().name();
+            String portName = port.name();
             // Subscribe /interface/interface[name=port-name]/state/oper-status
             Path subscribePath = interfaceOperStatusPath(portName);
             Subscription interfaceOperStatusSub =
@@ -183,7 +180,9 @@
 
         client.subscribe(subscribeRequest);
 
-        deviceSubscribed.add(deviceId);
+        log.info("Started gNMI subscription for {} ports on {}", ports.size(), deviceId);
+
+        deviceSubscribed.put(deviceId, ports);
     }
 
     private void handleGnmiUpdate(GnmiUpdate eventSubject) {
@@ -247,14 +246,16 @@
 
         @Override
         public void event(GnmiEvent event) {
-            if (!deviceSubscribed.contains(event.subject().deviceId())) {
-                log.warn("Received gNMI event from {}, but we are not subscribed to it",
+            if (!deviceSubscribed.containsKey(event.subject().deviceId())) {
+                log.warn("Received gNMI event from {}, but we didn't expect to " +
+                                 "be subscribed to it! Discarding event...",
                          event.subject().deviceId());
+                return;
             }
+
             log.debug("Received gNMI event {}", event.toString());
             if (event.type() == GnmiEvent.Type.UPDATE) {
-                executorService.execute(
-                        () -> handleGnmiUpdate((GnmiUpdate) event.subject()));
+                handleGnmiUpdate((GnmiUpdate) event.subject());
             } else {
                 log.debug("Unsupported gNMI event type: {}", event.type());
             }
@@ -265,7 +266,7 @@
 
         @Override
         public void event(MastershipEvent event) {
-            executorService.execute(() -> checkDeviceSubscription(event.subject()));
+            checkSubscription(event.subject());
         }
     }
 
@@ -278,8 +279,9 @@
                 case DEVICE_AVAILABILITY_CHANGED:
                 case DEVICE_UPDATED:
                 case DEVICE_REMOVED:
-                    executorService.execute(
-                            () -> checkDeviceSubscription(event.subject().id()));
+                case PORT_ADDED:
+                case PORT_REMOVED:
+                    checkSubscription(event.subject().id());
                     break;
                 default:
                     break;