blob: 028c5fb04726c8ce5051180f00a106556965f7bf [file] [log] [blame]
Yi Tsenge616d752018-11-27 10:53:27 -08001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
20import com.google.common.collect.Sets;
21import com.google.common.util.concurrent.Striped;
22import gnmi.Gnmi.Notification;
23import gnmi.Gnmi.Path;
24import gnmi.Gnmi.PathElem;
25import gnmi.Gnmi.SubscribeRequest;
26import gnmi.Gnmi.Subscription;
27import gnmi.Gnmi.SubscriptionList;
28import gnmi.Gnmi.SubscriptionMode;
29import gnmi.Gnmi.Update;
30import org.onlab.util.SharedExecutors;
31import org.onosproject.gnmi.api.GnmiClient;
32import org.onosproject.gnmi.api.GnmiController;
33import org.onosproject.gnmi.api.GnmiEvent;
34import org.onosproject.gnmi.api.GnmiEventListener;
35import org.onosproject.gnmi.api.GnmiUpdate;
36import org.onosproject.gnmi.api.GnmiUtils;
37import org.onosproject.mastership.MastershipEvent;
38import org.onosproject.mastership.MastershipListener;
39import org.onosproject.mastership.MastershipService;
Yi Tseng59d5f3e2018-11-27 23:09:41 -080040import org.onosproject.net.DefaultAnnotations;
Yi Tsenge616d752018-11-27 10:53:27 -080041import org.onosproject.net.DeviceId;
42import org.onosproject.net.Port;
Yi Tsenge616d752018-11-27 10:53:27 -080043import org.onosproject.net.device.DefaultPortDescription;
44import org.onosproject.net.device.DeviceEvent;
45import org.onosproject.net.device.DeviceListener;
46import org.onosproject.net.device.DeviceProviderService;
47import org.onosproject.net.device.DeviceService;
48import org.onosproject.net.device.PortDescription;
49import org.slf4j.Logger;
50import org.slf4j.LoggerFactory;
51
52import java.util.Collection;
53import java.util.List;
54import java.util.concurrent.ExecutorService;
55import java.util.concurrent.locks.Lock;
56
57/**
58 * Entity that manages gNMI subscription for devices using OpenConfig models and
59 * that reports relevant events to the core.
60 */
61@Beta
62class GnmiDeviceStateSubscriber {
63
Yi Tseng59d5f3e2018-11-27 23:09:41 -080064 private static final String LAST_CHANGE = "last-change";
65
Yi Tsenge616d752018-11-27 10:53:27 -080066 private static Logger log = LoggerFactory.getLogger(GnmiDeviceStateSubscriber.class);
67
68 private final GnmiController gnmiController;
69 private final DeviceService deviceService;
70 private final DeviceProviderService providerService;
71 private final MastershipService mastershipService;
72
73 private final ExecutorService executorService = SharedExecutors.getPoolThreadExecutor();
74
75 private final InternalGnmiEventListener gnmiEventListener = new InternalGnmiEventListener();
76 private final InternalDeviceListener deviceEventListener = new InternalDeviceListener();
77 private final InternalMastershipListener mastershipListener = new InternalMastershipListener();
78 private final Collection<DeviceId> deviceSubscribed = Sets.newHashSet();
79
80 private final Striped<Lock> deviceLocks = Striped.lock(30);
81
82 GnmiDeviceStateSubscriber(GnmiController gnmiController, DeviceService deviceService,
83 MastershipService mastershipService,
84 DeviceProviderService providerService) {
85 this.gnmiController = gnmiController;
86 this.deviceService = deviceService;
87 this.mastershipService = mastershipService;
88 this.providerService = providerService;
89 }
90
91 public void activate() {
92 deviceService.addListener(deviceEventListener);
93 mastershipService.addListener(mastershipListener);
94 gnmiController.addListener(gnmiEventListener);
95 // Subscribe to existing devices.
96 deviceService.getDevices().forEach(d -> executorService.execute(
97 () -> checkDeviceSubscription(d.id())));
98 }
99
100 public void deactivate() {
101 deviceSubscribed.forEach(this::unsubscribeIfNeeded);
102 deviceService.removeListener(deviceEventListener);
103 mastershipService.removeListener(mastershipListener);
104 gnmiController.removeListener(gnmiEventListener);
105 }
106
107 private void checkDeviceSubscription(DeviceId deviceId) {
108 deviceLocks.get(deviceId).lock();
109 try {
110 if (!deviceService.isAvailable(deviceId)
111 || deviceService.getDevice(deviceId) == null
112 || !mastershipService.isLocalMaster(deviceId)) {
113 // Device not available/removed or this instance is no longer
114 // master.
115 unsubscribeIfNeeded(deviceId);
116 } else {
117 subscribeIfNeeded(deviceId);
118 }
119 } finally {
120 deviceLocks.get(deviceId).unlock();
121 }
122 }
123
124 private Path interfaceOperStatusPath(String interfaceName) {
125 return Path.newBuilder()
126 .addElem(PathElem.newBuilder().setName("interfaces").build())
127 .addElem(PathElem.newBuilder()
128 .setName("interface").putKey("name", interfaceName).build())
129 .addElem(PathElem.newBuilder().setName("state").build())
130 .addElem(PathElem.newBuilder().setName("oper-status").build())
131 .build();
132 }
133
134 private void unsubscribeIfNeeded(DeviceId deviceId) {
135 if (!deviceSubscribed.contains(deviceId)) {
136 // Not subscribed.
137 return;
138 }
139 GnmiClient client = gnmiController.getClient(deviceId);
140 if (client == null) {
141 log.debug("Cannot find gNMI client for device {}", deviceId);
142 } else {
143 client.terminateSubscriptionChannel();
144 }
145 deviceSubscribed.remove(deviceId);
146 }
147
148 private void subscribeIfNeeded(DeviceId deviceId) {
149 if (deviceSubscribed.contains(deviceId)) {
150 // Already subscribed.
151 // FIXME: if a new port is added after the first subscription we are
152 // not subscribing to the new port.
153 return;
154 }
155
156 GnmiClient client = gnmiController.getClient(deviceId);
157 if (client == null) {
158 log.warn("Cannot find gNMI client for device {}", deviceId);
159 return;
160 }
161
162 List<Port> ports = deviceService.getPorts(deviceId);
163 SubscriptionList.Builder subscriptionList = SubscriptionList.newBuilder();
164 subscriptionList.setMode(SubscriptionList.Mode.STREAM);
165 subscriptionList.setUpdatesOnly(true);
166
167 ports.forEach(port -> {
168 String portName = port.number().name();
169 // Subscribe /interface/interface[name=port-name]/state/oper-status
170 Path subscribePath = interfaceOperStatusPath(portName);
171 Subscription interfaceOperStatusSub =
172 Subscription.newBuilder()
173 .setPath(subscribePath)
174 .setMode(SubscriptionMode.ON_CHANGE)
175 .build();
176 // TODO: more state subscription
177 subscriptionList.addSubscription(interfaceOperStatusSub);
178 });
179
180 SubscribeRequest subscribeRequest = SubscribeRequest.newBuilder()
181 .setSubscribe(subscriptionList.build())
182 .build();
183
184 client.subscribe(subscribeRequest);
185
186 deviceSubscribed.add(deviceId);
187 }
188
189 private void handleGnmiUpdate(GnmiUpdate eventSubject) {
190 Notification notification = eventSubject.update();
191 if (notification == null) {
192 log.warn("Cannot handle gNMI event without update data, abort");
193 log.debug("gNMI update:\n{}", eventSubject);
194 return;
195 }
196
197 List<Update> updateList = notification.getUpdateList();
198 updateList.forEach(update -> {
199 Path path = update.getPath();
200 PathElem lastElem = path.getElem(path.getElemCount() - 1);
201
202 // Use last element to identify which state updated
203 if ("oper-status".equals(lastElem.getName())) {
Yi Tseng59d5f3e2018-11-27 23:09:41 -0800204 handleOperStatusUpdate(eventSubject.deviceId(), update,
205 notification.getTimestamp());
Yi Tsenge616d752018-11-27 10:53:27 -0800206 } else {
207 log.debug("Unrecognized update {}", GnmiUtils.pathToString(path));
208 }
209 });
210 }
211
Yi Tseng59d5f3e2018-11-27 23:09:41 -0800212 private void handleOperStatusUpdate(DeviceId deviceId, Update update, long timestamp) {
Yi Tsenge616d752018-11-27 10:53:27 -0800213 Path path = update.getPath();
214 // first element should be "interface"
215 String interfaceName = path.getElem(1).getKeyOrDefault("name", null);
216 if (interfaceName == null) {
217 log.error("No interface present in gNMI update, abort");
218 log.debug("gNMI update:\n{}", update);
219 return;
220 }
221
222 List<Port> portsFromDevice = deviceService.getPorts(deviceId);
223 portsFromDevice.forEach(port -> {
224 if (!port.number().name().equals(interfaceName)) {
225 return;
226 }
227
Yi Tseng59d5f3e2018-11-27 23:09:41 -0800228 DefaultAnnotations portAnnotations = DefaultAnnotations.builder()
229 .putAll(port.annotations())
230 .set(LAST_CHANGE, String.valueOf(timestamp))
231 .build();
232
Yi Tsenge616d752018-11-27 10:53:27 -0800233 // Port/Interface name is identical in OpenConfig model, but not in ONOS
234 // This might cause some problem if we use one name to different port
235 PortDescription portDescription = DefaultPortDescription.builder()
236 .portSpeed(port.portSpeed())
237 .withPortNumber(port.number())
238 .isEnabled(update.getVal().getStringVal().equals("UP"))
239 .type(port.type())
Yi Tseng59d5f3e2018-11-27 23:09:41 -0800240 .annotations(portAnnotations)
Yi Tsenge616d752018-11-27 10:53:27 -0800241 .build();
242 providerService.portStatusChanged(deviceId, portDescription);
243 });
244 }
245
246 class InternalGnmiEventListener implements GnmiEventListener {
247
248 @Override
249 public void event(GnmiEvent event) {
250 if (!deviceSubscribed.contains(event.subject().deviceId())) {
251 log.warn("Received gNMI event from {}, but we are not subscribed to it",
252 event.subject().deviceId());
253 }
254 log.debug("Received gNMI event {}", event.toString());
255 if (event.type() == GnmiEvent.Type.UPDATE) {
256 executorService.execute(
257 () -> handleGnmiUpdate((GnmiUpdate) event.subject()));
258 } else {
259 log.debug("Unsupported gNMI event type: {}", event.type());
260 }
261 }
262 }
263
264 class InternalMastershipListener implements MastershipListener {
265
266 @Override
267 public void event(MastershipEvent event) {
268 executorService.execute(() -> checkDeviceSubscription(event.subject()));
269 }
270 }
271
272 class InternalDeviceListener implements DeviceListener {
273
274 @Override
275 public void event(DeviceEvent event) {
276 switch (event.type()) {
277 case DEVICE_ADDED:
278 case DEVICE_AVAILABILITY_CHANGED:
279 case DEVICE_UPDATED:
280 case DEVICE_REMOVED:
281 executorService.execute(
282 () -> checkDeviceSubscription(event.subject().id()));
283 break;
284 default:
285 break;
286 }
287 }
288 }
289}