blob: b253f3e7f13f00e423b448504b7c1026949c8133 [file] [log] [blame]
Yi Tsenge616d752018-11-27 10:53:27 -08001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onosproject.provider.general.device.impl;
18
19import com.google.common.annotations.Beta;
20import com.google.common.collect.Sets;
21import com.google.common.util.concurrent.Striped;
22import gnmi.Gnmi.Notification;
23import gnmi.Gnmi.Path;
24import gnmi.Gnmi.PathElem;
25import gnmi.Gnmi.SubscribeRequest;
26import gnmi.Gnmi.Subscription;
27import gnmi.Gnmi.SubscriptionList;
28import gnmi.Gnmi.SubscriptionMode;
29import gnmi.Gnmi.Update;
30import org.onlab.util.SharedExecutors;
31import org.onosproject.gnmi.api.GnmiClient;
32import org.onosproject.gnmi.api.GnmiController;
33import org.onosproject.gnmi.api.GnmiEvent;
34import org.onosproject.gnmi.api.GnmiEventListener;
35import org.onosproject.gnmi.api.GnmiUpdate;
36import org.onosproject.gnmi.api.GnmiUtils;
37import org.onosproject.mastership.MastershipEvent;
38import org.onosproject.mastership.MastershipListener;
39import org.onosproject.mastership.MastershipService;
40import org.onosproject.net.DeviceId;
41import org.onosproject.net.Port;
42import org.onosproject.net.SparseAnnotations;
43import org.onosproject.net.device.DefaultPortDescription;
44import org.onosproject.net.device.DeviceEvent;
45import org.onosproject.net.device.DeviceListener;
46import org.onosproject.net.device.DeviceProviderService;
47import org.onosproject.net.device.DeviceService;
48import org.onosproject.net.device.PortDescription;
49import org.slf4j.Logger;
50import org.slf4j.LoggerFactory;
51
52import java.util.Collection;
53import java.util.List;
54import java.util.concurrent.ExecutorService;
55import java.util.concurrent.locks.Lock;
56
57/**
58 * Entity that manages gNMI subscription for devices using OpenConfig models and
59 * that reports relevant events to the core.
60 */
61@Beta
62class GnmiDeviceStateSubscriber {
63
64 private static Logger log = LoggerFactory.getLogger(GnmiDeviceStateSubscriber.class);
65
66 private final GnmiController gnmiController;
67 private final DeviceService deviceService;
68 private final DeviceProviderService providerService;
69 private final MastershipService mastershipService;
70
71 private final ExecutorService executorService = SharedExecutors.getPoolThreadExecutor();
72
73 private final InternalGnmiEventListener gnmiEventListener = new InternalGnmiEventListener();
74 private final InternalDeviceListener deviceEventListener = new InternalDeviceListener();
75 private final InternalMastershipListener mastershipListener = new InternalMastershipListener();
76 private final Collection<DeviceId> deviceSubscribed = Sets.newHashSet();
77
78 private final Striped<Lock> deviceLocks = Striped.lock(30);
79
80 GnmiDeviceStateSubscriber(GnmiController gnmiController, DeviceService deviceService,
81 MastershipService mastershipService,
82 DeviceProviderService providerService) {
83 this.gnmiController = gnmiController;
84 this.deviceService = deviceService;
85 this.mastershipService = mastershipService;
86 this.providerService = providerService;
87 }
88
89 public void activate() {
90 deviceService.addListener(deviceEventListener);
91 mastershipService.addListener(mastershipListener);
92 gnmiController.addListener(gnmiEventListener);
93 // Subscribe to existing devices.
94 deviceService.getDevices().forEach(d -> executorService.execute(
95 () -> checkDeviceSubscription(d.id())));
96 }
97
98 public void deactivate() {
99 deviceSubscribed.forEach(this::unsubscribeIfNeeded);
100 deviceService.removeListener(deviceEventListener);
101 mastershipService.removeListener(mastershipListener);
102 gnmiController.removeListener(gnmiEventListener);
103 }
104
105 private void checkDeviceSubscription(DeviceId deviceId) {
106 deviceLocks.get(deviceId).lock();
107 try {
108 if (!deviceService.isAvailable(deviceId)
109 || deviceService.getDevice(deviceId) == null
110 || !mastershipService.isLocalMaster(deviceId)) {
111 // Device not available/removed or this instance is no longer
112 // master.
113 unsubscribeIfNeeded(deviceId);
114 } else {
115 subscribeIfNeeded(deviceId);
116 }
117 } finally {
118 deviceLocks.get(deviceId).unlock();
119 }
120 }
121
122 private Path interfaceOperStatusPath(String interfaceName) {
123 return Path.newBuilder()
124 .addElem(PathElem.newBuilder().setName("interfaces").build())
125 .addElem(PathElem.newBuilder()
126 .setName("interface").putKey("name", interfaceName).build())
127 .addElem(PathElem.newBuilder().setName("state").build())
128 .addElem(PathElem.newBuilder().setName("oper-status").build())
129 .build();
130 }
131
132 private void unsubscribeIfNeeded(DeviceId deviceId) {
133 if (!deviceSubscribed.contains(deviceId)) {
134 // Not subscribed.
135 return;
136 }
137 GnmiClient client = gnmiController.getClient(deviceId);
138 if (client == null) {
139 log.debug("Cannot find gNMI client for device {}", deviceId);
140 } else {
141 client.terminateSubscriptionChannel();
142 }
143 deviceSubscribed.remove(deviceId);
144 }
145
146 private void subscribeIfNeeded(DeviceId deviceId) {
147 if (deviceSubscribed.contains(deviceId)) {
148 // Already subscribed.
149 // FIXME: if a new port is added after the first subscription we are
150 // not subscribing to the new port.
151 return;
152 }
153
154 GnmiClient client = gnmiController.getClient(deviceId);
155 if (client == null) {
156 log.warn("Cannot find gNMI client for device {}", deviceId);
157 return;
158 }
159
160 List<Port> ports = deviceService.getPorts(deviceId);
161 SubscriptionList.Builder subscriptionList = SubscriptionList.newBuilder();
162 subscriptionList.setMode(SubscriptionList.Mode.STREAM);
163 subscriptionList.setUpdatesOnly(true);
164
165 ports.forEach(port -> {
166 String portName = port.number().name();
167 // Subscribe /interface/interface[name=port-name]/state/oper-status
168 Path subscribePath = interfaceOperStatusPath(portName);
169 Subscription interfaceOperStatusSub =
170 Subscription.newBuilder()
171 .setPath(subscribePath)
172 .setMode(SubscriptionMode.ON_CHANGE)
173 .build();
174 // TODO: more state subscription
175 subscriptionList.addSubscription(interfaceOperStatusSub);
176 });
177
178 SubscribeRequest subscribeRequest = SubscribeRequest.newBuilder()
179 .setSubscribe(subscriptionList.build())
180 .build();
181
182 client.subscribe(subscribeRequest);
183
184 deviceSubscribed.add(deviceId);
185 }
186
187 private void handleGnmiUpdate(GnmiUpdate eventSubject) {
188 Notification notification = eventSubject.update();
189 if (notification == null) {
190 log.warn("Cannot handle gNMI event without update data, abort");
191 log.debug("gNMI update:\n{}", eventSubject);
192 return;
193 }
194
195 List<Update> updateList = notification.getUpdateList();
196 updateList.forEach(update -> {
197 Path path = update.getPath();
198 PathElem lastElem = path.getElem(path.getElemCount() - 1);
199
200 // Use last element to identify which state updated
201 if ("oper-status".equals(lastElem.getName())) {
202 handleOperStatusUpdate(eventSubject.deviceId(), update);
203 } else {
204 log.debug("Unrecognized update {}", GnmiUtils.pathToString(path));
205 }
206 });
207 }
208
209 private void handleOperStatusUpdate(DeviceId deviceId, Update update) {
210 Path path = update.getPath();
211 // first element should be "interface"
212 String interfaceName = path.getElem(1).getKeyOrDefault("name", null);
213 if (interfaceName == null) {
214 log.error("No interface present in gNMI update, abort");
215 log.debug("gNMI update:\n{}", update);
216 return;
217 }
218
219 List<Port> portsFromDevice = deviceService.getPorts(deviceId);
220 portsFromDevice.forEach(port -> {
221 if (!port.number().name().equals(interfaceName)) {
222 return;
223 }
224
225 // Port/Interface name is identical in OpenConfig model, but not in ONOS
226 // This might cause some problem if we use one name to different port
227 PortDescription portDescription = DefaultPortDescription.builder()
228 .portSpeed(port.portSpeed())
229 .withPortNumber(port.number())
230 .isEnabled(update.getVal().getStringVal().equals("UP"))
231 .type(port.type())
232 .annotations((SparseAnnotations) port.annotations())
233 .build();
234 providerService.portStatusChanged(deviceId, portDescription);
235 });
236 }
237
238 class InternalGnmiEventListener implements GnmiEventListener {
239
240 @Override
241 public void event(GnmiEvent event) {
242 if (!deviceSubscribed.contains(event.subject().deviceId())) {
243 log.warn("Received gNMI event from {}, but we are not subscribed to it",
244 event.subject().deviceId());
245 }
246 log.debug("Received gNMI event {}", event.toString());
247 if (event.type() == GnmiEvent.Type.UPDATE) {
248 executorService.execute(
249 () -> handleGnmiUpdate((GnmiUpdate) event.subject()));
250 } else {
251 log.debug("Unsupported gNMI event type: {}", event.type());
252 }
253 }
254 }
255
256 class InternalMastershipListener implements MastershipListener {
257
258 @Override
259 public void event(MastershipEvent event) {
260 executorService.execute(() -> checkDeviceSubscription(event.subject()));
261 }
262 }
263
264 class InternalDeviceListener implements DeviceListener {
265
266 @Override
267 public void event(DeviceEvent event) {
268 switch (event.type()) {
269 case DEVICE_ADDED:
270 case DEVICE_AVAILABILITY_CHANGED:
271 case DEVICE_UPDATED:
272 case DEVICE_REMOVED:
273 executorService.execute(
274 () -> checkDeviceSubscription(event.subject().id()));
275 break;
276 default:
277 break;
278 }
279 }
280 }
281}