Yuta HIGUCHI | 8810aa4 | 2017-08-02 15:05:37 -0700 | [diff] [blame] | 1 | /* |
| 2 | * Copyright 2017-present Open Networking Foundation |
| 3 | * |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | * |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | * |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package org.onosproject.d.config.sync.impl; |
| 17 | |
| 18 | import static java.util.concurrent.CompletableFuture.completedFuture; |
| 19 | import static org.onosproject.d.config.DeviceResourceIds.isUnderDeviceRootNode; |
| 20 | import static org.onosproject.d.config.DeviceResourceIds.toDeviceId; |
| 21 | import static org.onosproject.d.config.DeviceResourceIds.toResourceId; |
| 22 | import static org.onosproject.d.config.sync.operation.SetResponse.response; |
| 23 | import static org.slf4j.LoggerFactory.getLogger; |
| 24 | |
Yuta HIGUCHI | 110ba89 | 2018-01-30 13:47:55 -0800 | [diff] [blame] | 25 | import java.time.Duration; |
Yuta HIGUCHI | 8810aa4 | 2017-08-02 15:05:37 -0700 | [diff] [blame] | 26 | import java.util.Collection; |
| 27 | import java.util.HashMap; |
| 28 | import java.util.List; |
| 29 | import java.util.Map; |
| 30 | import java.util.concurrent.CompletableFuture; |
| 31 | import java.util.stream.Collectors; |
| 32 | |
| 33 | import org.apache.felix.scr.annotations.Activate; |
| 34 | import org.apache.felix.scr.annotations.Component; |
| 35 | import org.apache.felix.scr.annotations.Deactivate; |
| 36 | import org.apache.felix.scr.annotations.Reference; |
| 37 | import org.apache.felix.scr.annotations.ReferenceCardinality; |
| 38 | import org.apache.felix.scr.annotations.Service; |
| 39 | import org.onlab.util.Tools; |
| 40 | import org.onosproject.config.DynamicConfigEvent; |
| 41 | import org.onosproject.config.DynamicConfigEvent.Type; |
| 42 | import org.onosproject.config.DynamicConfigListener; |
| 43 | import org.onosproject.config.DynamicConfigService; |
| 44 | import org.onosproject.config.Filter; |
| 45 | import org.onosproject.d.config.DataNodes; |
| 46 | import org.onosproject.d.config.DeviceResourceIds; |
| 47 | import org.onosproject.d.config.ResourceIds; |
| 48 | import org.onosproject.d.config.sync.DeviceConfigSynchronizationProvider; |
| 49 | import org.onosproject.d.config.sync.DeviceConfigSynchronizationProviderRegistry; |
| 50 | import org.onosproject.d.config.sync.DeviceConfigSynchronizationProviderService; |
| 51 | import org.onosproject.d.config.sync.operation.SetRequest; |
| 52 | import org.onosproject.d.config.sync.operation.SetResponse; |
| 53 | import org.onosproject.d.config.sync.operation.SetResponse.Code; |
| 54 | import org.onosproject.net.DeviceId; |
| 55 | import org.onosproject.net.config.NetworkConfigService; |
| 56 | import org.onosproject.net.provider.AbstractProviderRegistry; |
| 57 | import org.onosproject.net.provider.AbstractProviderService; |
| 58 | import org.onosproject.store.primitives.TransactionId; |
| 59 | import org.onosproject.yang.model.DataNode; |
| 60 | import org.onosproject.yang.model.ResourceId; |
| 61 | import org.slf4j.Logger; |
| 62 | |
| 63 | import com.google.common.annotations.Beta; |
| 64 | import com.google.common.collect.ImmutableList; |
| 65 | import com.google.common.collect.ImmutableMap; |
| 66 | |
| 67 | /** |
| 68 | * Component to bridge Dynamic Config store and the Device configuration state. |
| 69 | * <p> |
| 70 | * <ul> |
| 71 | * <li> Propagate DynamicConfig service change downward to Device side via provider. |
| 72 | * <li> Propagate Device triggered change event upward to DyamicConfig service. |
| 73 | * </ul> |
| 74 | */ |
| 75 | @Beta |
| 76 | @Component(immediate = true) |
| 77 | @Service |
| 78 | public class DynamicDeviceConfigSynchronizer |
| 79 | extends AbstractProviderRegistry<DeviceConfigSynchronizationProvider, |
| 80 | DeviceConfigSynchronizationProviderService> |
| 81 | implements DeviceConfigSynchronizationProviderRegistry { |
| 82 | |
| 83 | private static final Logger log = getLogger(DynamicDeviceConfigSynchronizer.class); |
| 84 | |
| 85 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 86 | protected DynamicConfigService dynConfigService; |
| 87 | |
| 88 | @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY) |
| 89 | protected NetworkConfigService netcfgService; |
| 90 | |
| 91 | private DynamicConfigListener listener = new InnerDyConListener(); |
| 92 | |
Yuta HIGUCHI | 110ba89 | 2018-01-30 13:47:55 -0800 | [diff] [blame] | 93 | // FIXME hack for unconsolidated event bug |
| 94 | private Duration quietPeriod = Duration.ofSeconds(2); |
| 95 | private long quietUntil = 0; |
| 96 | |
Yuta HIGUCHI | 8810aa4 | 2017-08-02 15:05:37 -0700 | [diff] [blame] | 97 | @Activate |
| 98 | public void activate() { |
| 99 | // TODO start background task to sync Controller and Device? |
| 100 | dynConfigService.addListener(listener); |
| 101 | log.info("Started"); |
| 102 | } |
| 103 | |
| 104 | @Deactivate |
| 105 | public void deactivate() { |
| 106 | dynConfigService.removeListener(listener); |
| 107 | log.info("Stopped"); |
| 108 | } |
| 109 | |
| 110 | |
| 111 | @Override |
| 112 | protected DeviceConfigSynchronizationProviderService createProviderService( |
| 113 | DeviceConfigSynchronizationProvider provider) { |
| 114 | return new InternalConfigSynchronizationServiceProvider(provider); |
| 115 | } |
| 116 | |
| 117 | @Override |
| 118 | protected DeviceConfigSynchronizationProvider defaultProvider() { |
| 119 | // TODO return provider instance which can deal with "general" provider? |
| 120 | return super.defaultProvider(); |
| 121 | } |
| 122 | |
| 123 | /** |
| 124 | * Proxy to relay Device change event for propagating running "state" |
| 125 | * information up to dynamic configuration service. |
| 126 | */ |
| 127 | class InternalConfigSynchronizationServiceProvider |
| 128 | extends AbstractProviderService<DeviceConfigSynchronizationProvider> |
| 129 | implements DeviceConfigSynchronizationProviderService { |
| 130 | |
| 131 | protected InternalConfigSynchronizationServiceProvider(DeviceConfigSynchronizationProvider provider) { |
| 132 | super(provider); |
| 133 | } |
| 134 | |
| 135 | // TODO API for passive information propagation to be added later on |
| 136 | } |
| 137 | |
| 138 | /** |
| 139 | * DynamicConfigListener to trigger active synchronization toward the device. |
| 140 | */ |
| 141 | class InnerDyConListener implements DynamicConfigListener { |
| 142 | |
| 143 | @Override |
| 144 | public boolean isRelevant(DynamicConfigEvent event) { |
| 145 | // TODO NetconfActiveComponent.isRelevant(DynamicConfigEvent) |
| 146 | // seems to be doing some filtering |
| 147 | // Logic filtering for L3VPN is probably a demo hack, |
| 148 | // but is there any portion of it which is really needed? |
| 149 | // e.g., listen only for device tree events? |
| 150 | |
| 151 | ResourceId path = event.subject(); |
| 152 | // TODO only device tree related event is relevant. |
| 153 | // 1) path is under device tree |
| 154 | // 2) path is root, and DataNode contains element under node |
| 155 | // ... |
| 156 | return true; |
| 157 | } |
| 158 | |
| 159 | @Override |
| 160 | public void event(DynamicConfigEvent event) { |
| 161 | // Note: removed accumulator in the old code assuming, |
| 162 | // event accumulation will happen on Device Config Event level. |
| 163 | |
| 164 | // TODO execute off event dispatch thread |
| 165 | processEventNonBatch(event); |
| 166 | } |
| 167 | |
| 168 | } |
| 169 | |
| 170 | void processEventNonBatch(DynamicConfigEvent event) { |
Yuta HIGUCHI | 110ba89 | 2018-01-30 13:47:55 -0800 | [diff] [blame] | 171 | if (System.currentTimeMillis() < quietUntil) { |
| 172 | log.trace("Ignoring {}. Quiet period until {}", |
| 173 | event, Tools.defaultOffsetDataTime(quietUntil)); |
| 174 | return; |
| 175 | } |
| 176 | |
Yuta HIGUCHI | 8810aa4 | 2017-08-02 15:05:37 -0700 | [diff] [blame] | 177 | ResourceId path = event.subject(); |
| 178 | if (isUnderDeviceRootNode(path)) { |
Yuta HIGUCHI | 9285d97 | 2017-10-16 16:15:00 -0700 | [diff] [blame] | 179 | log.trace("processing event:{}", event); |
Yuta HIGUCHI | 8810aa4 | 2017-08-02 15:05:37 -0700 | [diff] [blame] | 180 | |
| 181 | DeviceId deviceId = DeviceResourceIds.toDeviceId(path); |
| 182 | ResourceId deviceRootPath = DeviceResourceIds.toResourceId(deviceId); |
| 183 | |
Yuta HIGUCHI | e057dee | 2017-09-15 13:56:10 -0700 | [diff] [blame] | 184 | ResourceId absPath = ResourceIds.concat(ResourceIds.ROOT_ID, path); |
| 185 | ResourceId relPath = ResourceIds.relativize(deviceRootPath, absPath); |
| 186 | // give me everything Filter |
Yuta HIGUCHI | 8810aa4 | 2017-08-02 15:05:37 -0700 | [diff] [blame] | 187 | Filter giveMeEverything = Filter.builder().build(); |
| 188 | |
| 189 | DataNode node = dynConfigService.readNode(path, giveMeEverything); |
| 190 | SetRequest request; |
| 191 | switch (event.type()) { |
| 192 | |
| 193 | case NODE_ADDED: |
| 194 | case NODE_REPLACED: |
| 195 | request = SetRequest.builder().replace(relPath, node).build(); |
Yuta HIGUCHI | 6800ced | 2017-11-20 11:15:37 -0800 | [diff] [blame] | 196 | break; |
Yuta HIGUCHI | 8810aa4 | 2017-08-02 15:05:37 -0700 | [diff] [blame] | 197 | case NODE_UPDATED: |
| 198 | // Event has no pay load, only thing we can do is replace. |
| 199 | request = SetRequest.builder().replace(relPath, node).build(); |
| 200 | break; |
| 201 | case NODE_DELETED: |
| 202 | request = SetRequest.builder().delete(relPath).build(); |
| 203 | break; |
| 204 | case UNKNOWN_OPRN: |
| 205 | default: |
| 206 | log.error("Unexpected event {}, aborting", event); |
| 207 | return; |
| 208 | } |
| 209 | |
| 210 | log.info("Dispatching {} request {}", deviceId, request); |
| 211 | CompletableFuture<SetResponse> response = dispatchRequest(deviceId, request); |
| 212 | response.whenComplete((resp, e) -> { |
| 213 | if (e == null) { |
| 214 | if (resp.code() == Code.OK) { |
| 215 | log.info("{} for {} complete", resp, deviceId); |
| 216 | } else { |
| 217 | log.warn("{} for {} had problem", resp, deviceId); |
| 218 | } |
| 219 | } else { |
| 220 | log.error("Request to {} failed {}", deviceId, response, e); |
| 221 | } |
| 222 | }); |
Yuta HIGUCHI | 110ba89 | 2018-01-30 13:47:55 -0800 | [diff] [blame] | 223 | |
| 224 | // FIXME hack for unconsolidated event bug |
| 225 | quietUntil = System.currentTimeMillis() + quietPeriod.toMillis(); |
Yuta HIGUCHI | 9285d97 | 2017-10-16 16:15:00 -0700 | [diff] [blame] | 226 | } else { |
Yuta HIGUCHI | e848fa9 | 2018-03-01 21:14:09 -0800 | [diff] [blame] | 227 | log.info("Ignored event's ResourceId: {}", event.subject()); |
Yuta HIGUCHI | 8810aa4 | 2017-08-02 15:05:37 -0700 | [diff] [blame] | 228 | } |
| 229 | } |
| 230 | |
| 231 | |
| 232 | // was sketch to handle case, where event could contain batch of things... |
| 233 | private void processEvent(DynamicConfigEvent event) { |
| 234 | // TODO assuming event object will contain batch of (atomic) change event |
| 235 | |
| 236 | // What the new event will contain: |
| 237 | Type evtType = event.type(); |
| 238 | |
| 239 | // Transaction ID, can be null |
| 240 | TransactionId txId = null; |
| 241 | |
| 242 | // TODO this might change into collection of (path, val_before, val_after) |
| 243 | |
| 244 | ResourceId path = event.subject(); |
| 245 | // data node (can be tree) representing change, it could be incremental update |
| 246 | DataNode val = null; |
| 247 | |
| 248 | // build per-Device SetRequest |
| 249 | // val could be a tree, containing multiple Device tree, |
| 250 | // break them down into per-Device sub-tree |
| 251 | Map<DeviceId, SetRequest.Builder> requests = new HashMap<>(); |
| 252 | |
| 253 | if (isUnderDeviceRootNode(path)) { |
| 254 | // about single device |
| 255 | buildDeviceRequest(requests, evtType, path, toDeviceId(path), val); |
| 256 | |
| 257 | } else if (DeviceResourceIds.isRootOrDevicesNode(path)) { |
| 258 | // => potentially contain changes spanning multiple Devices |
| 259 | Map<DeviceId, DataNode> perDevices = perDevices(path, val); |
| 260 | |
| 261 | perDevices.forEach((did, dataNode) -> { |
| 262 | buildDeviceRequest(requests, evtType, toResourceId(did), did, dataNode); |
| 263 | }); |
| 264 | |
| 265 | // TODO special care is probably required for delete cases |
| 266 | // especially delete all under devices |
| 267 | |
| 268 | } else { |
| 269 | log.warn("Event not related to a Device?"); |
| 270 | } |
| 271 | |
| 272 | |
| 273 | // TODO assuming event is a batch, |
| 274 | // potentially containing changes for multiple devices, |
| 275 | // who will process/coordinate the batch event? |
| 276 | |
| 277 | |
| 278 | // TODO loop through per-Device change set |
| 279 | List<CompletableFuture<SetResponse>> responses = |
| 280 | requests.entrySet().stream() |
| 281 | .map(entry -> dispatchRequest(entry.getKey(), entry.getValue().build())) |
| 282 | .collect(Collectors.toList()); |
| 283 | |
| 284 | // wait for all responses |
| 285 | List<SetResponse> allResults = Tools.allOf(responses).join(); |
| 286 | // TODO deal with partial failure case (multi-device coordination) |
| 287 | log.info("DEBUG: results: {}", allResults); |
| 288 | } |
| 289 | |
| 290 | // might make sense to make this public |
| 291 | CompletableFuture<SetResponse> dispatchRequest(DeviceId devId, SetRequest req) { |
| 292 | |
| 293 | // determine appropriate provider for this Device |
| 294 | DeviceConfigSynchronizationProvider provider = this.getProvider(devId); |
| 295 | |
| 296 | if (provider == null) { |
| 297 | // no appropriate provider found |
| 298 | // return completed future with failed SetResponse |
| 299 | return completedFuture(response(req, |
| 300 | SetResponse.Code.FAILED_PRECONDITION, |
| 301 | "no provider found for " + devId)); |
| 302 | } |
| 303 | |
| 304 | // dispatch request |
| 305 | return provider.setConfiguration(devId, req) |
| 306 | .handle((resp, err) -> { |
| 307 | if (err == null) { |
| 308 | // set complete |
| 309 | log.info("DEBUG: Req:{}, Resp:{}", req, resp); |
| 310 | return resp; |
| 311 | } else { |
| 312 | // fatal error |
| 313 | log.error("Fatal error on {}", req, err); |
| 314 | return response(req, |
| 315 | SetResponse.Code.UNKNOWN, |
| 316 | "Unknown error " + err); |
| 317 | } |
| 318 | }); |
| 319 | } |
| 320 | |
| 321 | |
| 322 | // may eventually reuse with batch event |
| 323 | /** |
| 324 | * Build device request about a Device. |
| 325 | * |
| 326 | * @param requests map containing request builder to populate |
| 327 | * @param evtType change request type |
| 328 | * @param path to {@code val} |
| 329 | * @param did DeviceId which {@code path} is about |
| 330 | * @param val changed node to write |
| 331 | */ |
| 332 | private void buildDeviceRequest(Map<DeviceId, SetRequest.Builder> requests, |
| 333 | Type evtType, |
| 334 | ResourceId path, |
| 335 | DeviceId did, |
| 336 | DataNode val) { |
| 337 | |
| 338 | SetRequest.Builder request = |
| 339 | requests.computeIfAbsent(did, d -> SetRequest.builder()); |
| 340 | |
| 341 | switch (evtType) { |
| 342 | case NODE_ADDED: |
| 343 | case NODE_REPLACED: |
| 344 | request.replace(path, val); |
| 345 | break; |
| 346 | |
| 347 | case NODE_UPDATED: |
| 348 | // TODO Auto-generated method stub |
| 349 | request.update(path, val); |
| 350 | break; |
| 351 | |
| 352 | case NODE_DELETED: |
| 353 | // TODO Auto-generated method stub |
| 354 | request.delete(path); |
| 355 | break; |
| 356 | |
| 357 | case UNKNOWN_OPRN: |
| 358 | default: |
| 359 | log.warn("Ignoring unexpected {}", evtType); |
| 360 | break; |
| 361 | } |
| 362 | } |
| 363 | |
| 364 | /** |
| 365 | * Breaks down tree {@code val} into per Device subtree. |
| 366 | * |
| 367 | * @param path pointing to {@code val} |
| 368 | * @param val tree which contains only 1 Device. |
| 369 | * @return Device node relative DataNode for each DeviceId |
| 370 | * @throws IllegalArgumentException |
| 371 | */ |
| 372 | private static Map<DeviceId, DataNode> perDevices(ResourceId path, DataNode val) { |
| 373 | if (DeviceResourceIds.isUnderDeviceRootNode(path)) { |
| 374 | // - if path is device root or it's subtree, path alone is sufficient |
| 375 | return ImmutableMap.of(DeviceResourceIds.toDeviceId(path), val); |
| 376 | |
| 377 | } else if (DeviceResourceIds.isRootOrDevicesNode(path)) { |
| 378 | // - if path is "/" or devices, it might be constructible from val tree |
| 379 | final Collection<DataNode> devicesChildren; |
| 380 | if (DeviceResourceIds.isRootNode(path)) { |
| 381 | // root |
| 382 | devicesChildren = DataNodes.childOnlyByName(val, DeviceResourceIds.DEVICES_NAME) |
| 383 | .map(dn -> DataNodes.children(dn)) |
| 384 | .orElse(ImmutableList.of()); |
| 385 | } else { |
| 386 | // devices |
| 387 | devicesChildren = DataNodes.children(val); |
| 388 | } |
| 389 | |
| 390 | return devicesChildren.stream() |
| 391 | // TODO use full schemaId for filtering when ready |
| 392 | .filter(dn -> dn.key().schemaId().name().equals(DeviceResourceIds.DEVICE_NAME)) |
| 393 | .collect(Collectors.toMap(dn -> DeviceResourceIds.toDeviceId(dn.key()), |
| 394 | dn -> dn)); |
| 395 | |
| 396 | } |
| 397 | throw new IllegalArgumentException(path + " not related to Device"); |
| 398 | } |
| 399 | |
| 400 | } |