Fix race condition when processing first Update of gNMI Subscribe RPC
We simply update the subscription map before starting the RPC
Change-Id: I0cedea3ef8415e29d021fc7d396d84cb9a2a33ed
diff --git a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
index a527642..aabd92e 100644
--- a/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
+++ b/providers/general/device/src/main/java/org/onosproject/provider/general/device/impl/GnmiDeviceStateSubscriber.java
@@ -27,7 +27,6 @@
import gnmi.Gnmi.SubscriptionList;
import gnmi.Gnmi.SubscriptionMode;
import gnmi.Gnmi.Update;
-import org.onosproject.gnmi.api.GnmiClient;
import org.onosproject.gnmi.api.GnmiController;
import org.onosproject.gnmi.api.GnmiEvent;
import org.onosproject.gnmi.api.GnmiEventListener;
@@ -155,34 +154,25 @@
return;
}
- GnmiClient client = gnmiController.getClient(deviceId);
+ // Subscribe for the new set of ports.
+ deviceSubscribed.put(deviceId, ports);
- SubscriptionList.Builder subscriptionList = SubscriptionList.newBuilder();
- subscriptionList.setMode(SubscriptionList.Mode.STREAM);
- subscriptionList.setUpdatesOnly(true);
-
- ports.forEach(port -> {
- String portName = port.name();
- // Subscribe /interface/interface[name=port-name]/state/oper-status
- Path subscribePath = interfaceOperStatusPath(portName);
- Subscription interfaceOperStatusSub =
- Subscription.newBuilder()
- .setPath(subscribePath)
- .setMode(SubscriptionMode.ON_CHANGE)
- .build();
- // TODO: more state subscription
- subscriptionList.addSubscription(interfaceOperStatusSub);
- });
-
- SubscribeRequest subscribeRequest = SubscribeRequest.newBuilder()
- .setSubscribe(subscriptionList.build())
+ // Send subscription request.
+ final SubscriptionList subscriptionList = SubscriptionList.newBuilder()
+ .setMode(SubscriptionList.Mode.STREAM)
+ .setUpdatesOnly(true)
+ .addAllSubscription(ports.stream().map(
+ port -> Subscription.newBuilder()
+ .setPath(interfaceOperStatusPath(port.name()))
+ .setMode(SubscriptionMode.ON_CHANGE)
+ .build()).collect(Collectors.toList()))
.build();
-
- client.subscribe(subscribeRequest);
+ gnmiController.getClient(deviceId).subscribe(
+ SubscribeRequest.newBuilder()
+ .setSubscribe(subscriptionList)
+ .build());
log.info("Started gNMI subscription for {} ports on {}", ports.size(), deviceId);
-
- deviceSubscribed.put(deviceId, ports);
}
private void handleGnmiUpdate(GnmiUpdate eventSubject) {