Add support for enabling/disabling ports for gNMI devices
This change also includes:
- Refactored the gNMI protocol and driver to take advantage of the recent
changes to the gRPC protocol subsystem (e.g. no more locking, starting RPCs
with timeouts, etc.).
- Fixed Stratum driver to work after GeneralDeviceProvider refactoring
- Updated bmv2.py to generate ChassisConfig for stratum_bmv2
- Fixed portstate command to use the same port name as in the store
Change-Id: I0dad3bc73e4b6d907b5cf6b7b9a2852943226be7
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
index 6e164bf..a527642 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
@@ -17,7 +17,7 @@
package org.onosproject.provider.general.device.impl;
import com.google.common.annotations.Beta;
-import com.google.common.collect.Sets;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Striped;
import gnmi.Gnmi.Notification;
import gnmi.Gnmi.Path;
@@ -27,7 +27,6 @@
import gnmi.Gnmi.SubscriptionList;
import gnmi.Gnmi.SubscriptionMode;
import gnmi.Gnmi.Update;
-import org.onlab.util.SharedExecutors;
import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiController;
import org.onosproject.gnmi.api.GnmiEvent;
@@ -40,6 +39,7 @@
import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DefaultPortDescription;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
@@ -49,10 +49,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Collection;
import java.util.List;
-import java.util.concurrent.ExecutorService;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
+import java.util.stream.Collectors;
/**
* Entity that manages gNMI subscription for devices using OpenConfig models and
@@ -61,7 +63,7 @@
@Beta
class GnmiDeviceStateSubscriber {
- private static final String LAST_CHANGE = "last-change";
+ private static final String LAST_CHANGE = "last-changed";
private static Logger log = LoggerFactory.getLogger(GnmiDeviceStateSubscriber.class);
@@ -70,12 +72,10 @@
private final DeviceProviderService providerService;
private final MastershipService mastershipService;
- private final ExecutorService executorService = SharedExecutors.getPoolThreadExecutor();
-
private final InternalGnmiEventListener gnmiEventListener = new InternalGnmiEventListener();
private final InternalDeviceListener deviceEventListener = new InternalDeviceListener();
private final InternalMastershipListener mastershipListener = new InternalMastershipListener();
- private final Collection<DeviceId> deviceSubscribed = Sets.newHashSet();
+ private final Map<DeviceId, Set<PortNumber>> deviceSubscribed = Maps.newHashMap();
private final Striped<Lock> deviceLocks = Striped.lock(30);
@@ -93,34 +93,40 @@
mastershipService.addListener(mastershipListener);
gnmiController.addListener(gnmiEventListener);
// Subscribe to existing devices.
- deviceService.getDevices().forEach(d -> executorService.execute(
- () -> checkDeviceSubscription(d.id())));
+ deviceService.getDevices().forEach(d -> checkSubscription(d.id()));
}
public void deactivate() {
- deviceSubscribed.forEach(this::unsubscribeIfNeeded);
+ deviceSubscribed.keySet().forEach(this::unsubscribeIfNeeded);
deviceService.removeListener(deviceEventListener);
mastershipService.removeListener(mastershipListener);
gnmiController.removeListener(gnmiEventListener);
}
- private void checkDeviceSubscription(DeviceId deviceId) {
+ private void checkSubscription(DeviceId deviceId) {
+ if (gnmiController.getClient(deviceId) == null) {
+ // Ignore devices for which a gNMI client does not exist.
+ return;
+ }
deviceLocks.get(deviceId).lock();
try {
- if (!deviceService.isAvailable(deviceId)
- || deviceService.getDevice(deviceId) == null
- || !mastershipService.isLocalMaster(deviceId)) {
- // Device not available/removed or this instance is no longer
- // master.
- unsubscribeIfNeeded(deviceId);
- } else {
+ if (shouldHaveSubscription(deviceId)) {
subscribeIfNeeded(deviceId);
+ } else {
+ unsubscribeIfNeeded(deviceId);
}
} finally {
deviceLocks.get(deviceId).unlock();
}
}
+ private boolean shouldHaveSubscription(DeviceId deviceId) {
+ return deviceService.getDevice(deviceId) != null
+ && deviceService.isAvailable(deviceId)
+ && mastershipService.isLocalMaster(deviceId)
+ && !deviceService.getPorts(deviceId).isEmpty();
+ }
+
private Path interfaceOperStatusPath(String interfaceName) {
return Path.newBuilder()
.addElem(PathElem.newBuilder().setName("interfaces").build())
@@ -132,40 +138,31 @@
}
private void unsubscribeIfNeeded(DeviceId deviceId) {
- if (!deviceSubscribed.contains(deviceId)) {
- // Not subscribed.
- return;
+ gnmiController.getClient(deviceId).unsubscribe();
+ if (deviceSubscribed.remove(deviceId) != null) {
+ log.info("Cancelled gNMI subscription for {}", deviceId);
}
- GnmiClient client = gnmiController.getClient(deviceId);
- if (client == null) {
- log.debug("Cannot find gNMI client for device {}", deviceId);
- } else {
- client.terminateSubscriptionChannel();
- }
- deviceSubscribed.remove(deviceId);
}
private void subscribeIfNeeded(DeviceId deviceId) {
- if (deviceSubscribed.contains(deviceId)) {
- // Already subscribed.
- // FIXME: if a new port is added after the first subscription we are
- // not subscribing to the new port.
+
+ Set<PortNumber> ports = deviceService.getPorts(deviceId).stream()
+ .map(Port::number)
+ .collect(Collectors.toSet());
+
+ if (Objects.equals(ports, deviceSubscribed.get(deviceId))) {
+ // Already subscribed for the same ports.
return;
}
GnmiClient client = gnmiController.getClient(deviceId);
- if (client == null) {
- log.debug("Cannot find gNMI client for device {}, ignoring.", deviceId);
- return;
- }
- List<Port> ports = deviceService.getPorts(deviceId);
SubscriptionList.Builder subscriptionList = SubscriptionList.newBuilder();
subscriptionList.setMode(SubscriptionList.Mode.STREAM);
subscriptionList.setUpdatesOnly(true);
ports.forEach(port -> {
- String portName = port.number().name();
+ String portName = port.name();
// Subscribe /interface/interface[name=port-name]/state/oper-status
Path subscribePath = interfaceOperStatusPath(portName);
Subscription interfaceOperStatusSub =
@@ -183,7 +180,9 @@
client.subscribe(subscribeRequest);
- deviceSubscribed.add(deviceId);
+ log.info("Started gNMI subscription for {} ports on {}", ports.size(), deviceId);
+
+ deviceSubscribed.put(deviceId, ports);
}
private void handleGnmiUpdate(GnmiUpdate eventSubject) {
@@ -247,14 +246,16 @@
@Override
public void event(GnmiEvent event) {
- if (!deviceSubscribed.contains(event.subject().deviceId())) {
- log.warn("Received gNMI event from {}, but we are not subscribed to it",
+ if (!deviceSubscribed.containsKey(event.subject().deviceId())) {
+ log.warn("Received gNMI event from {}, but we did'nt expect to " +
+ "be subscribed to it! Discarding event...",
event.subject().deviceId());
+ return;
}
+
log.debug("Received gNMI event {}", event.toString());
if (event.type() == GnmiEvent.Type.UPDATE) {
- executorService.execute(
- () -> handleGnmiUpdate((GnmiUpdate) event.subject()));
+ handleGnmiUpdate((GnmiUpdate) event.subject());
} else {
log.debug("Unsupported gNMI event type: {}", event.type());
}
@@ -265,7 +266,7 @@
@Override
public void event(MastershipEvent event) {
- executorService.execute(() -> checkDeviceSubscription(event.subject()));
+ checkSubscription(event.subject());
}
}
@@ -278,8 +279,9 @@
case DEVICE_AVAILABILITY_CHANGED:
case DEVICE_UPDATED:
case DEVICE_REMOVED:
- executorService.execute(
- () -> checkDeviceSubscription(event.subject().id()));
+ case PORT_ADDED:
+ case PORT_REMOVED:
+ checkSubscription(event.subject().id());
break;
default:
break;