blob: c6f67a36dbebed8e2f65dcad4d85dc36b0082e8d [file] [log] [blame]
Jian Li4fe40e52021-01-06 03:29:58 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnode.impl;
17
18import org.onlab.util.Tools;
19import org.onosproject.cfg.ComponentConfigService;
20import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.LeadershipService;
22import org.onosproject.cluster.NodeId;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.kubevirtnode.api.KubevirtNode;
26import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
27import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
28import org.onosproject.kubevirtnode.api.KubevirtNodeHandler;
29import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
30import org.onosproject.kubevirtnode.api.KubevirtNodeState;
31import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
32import org.onosproject.net.Device;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.Port;
35import org.onosproject.net.behaviour.BridgeConfig;
36import org.onosproject.net.behaviour.BridgeDescription;
37import org.onosproject.net.behaviour.BridgeName;
38import org.onosproject.net.behaviour.ControllerInfo;
39import org.onosproject.net.behaviour.DefaultBridgeDescription;
40import org.onosproject.net.behaviour.DefaultPatchDescription;
41import org.onosproject.net.behaviour.DefaultTunnelDescription;
42import org.onosproject.net.behaviour.InterfaceConfig;
43import org.onosproject.net.behaviour.PatchDescription;
44import org.onosproject.net.behaviour.TunnelDescription;
45import org.onosproject.net.behaviour.TunnelEndPoints;
46import org.onosproject.net.behaviour.TunnelKey;
47import org.onosproject.net.device.DeviceAdminService;
48import org.onosproject.net.device.DeviceEvent;
49import org.onosproject.net.device.DeviceListener;
50import org.onosproject.net.device.DeviceService;
51import org.onosproject.ovsdb.controller.OvsdbClientService;
52import org.onosproject.ovsdb.controller.OvsdbController;
53import org.osgi.service.component.ComponentContext;
54import org.osgi.service.component.annotations.Activate;
55import org.osgi.service.component.annotations.Component;
56import org.osgi.service.component.annotations.Deactivate;
57import org.osgi.service.component.annotations.Modified;
58import org.osgi.service.component.annotations.Reference;
59import org.osgi.service.component.annotations.ReferenceCardinality;
60import org.slf4j.Logger;
61
62import java.util.Dictionary;
63import java.util.List;
64import java.util.Objects;
65import java.util.Set;
66import java.util.concurrent.ExecutorService;
67import java.util.stream.Collectors;
68
69import static java.lang.Thread.sleep;
70import static java.util.concurrent.Executors.newSingleThreadExecutor;
71import static org.onlab.packet.TpPort.tpPort;
72import static org.onlab.util.Tools.groupedThreads;
73import static org.onosproject.kubevirtnode.api.Constants.BRIDGE_PREFIX;
74import static org.onosproject.kubevirtnode.api.Constants.FLOW_KEY;
75import static org.onosproject.kubevirtnode.api.Constants.GENEVE;
76import static org.onosproject.kubevirtnode.api.Constants.GRE;
77import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_BRIDGE;
78import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_PHYSICAL_PREFIX;
79import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_TUNNEL;
80import static org.onosproject.kubevirtnode.api.Constants.PHYSICAL_TO_INTEGRATION_SUFFIX;
81import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_BRIDGE;
82import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_TO_INTEGRATION;
83import static org.onosproject.kubevirtnode.api.Constants.VXLAN;
84import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
85import static org.onosproject.kubevirtnode.api.KubevirtNodeState.COMPLETE;
86import static org.onosproject.kubevirtnode.api.KubevirtNodeState.DEVICE_CREATED;
87import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INCOMPLETE;
88import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
89import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
90import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
91import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT;
92import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
93import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.addOrRemoveSystemInterface;
94import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getBooleanProperty;
95import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getOvsdbClient;
96import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.isOvsdbConnected;
97import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.structurePortName;
98import static org.onosproject.net.AnnotationKeys.PORT_NAME;
99import static org.slf4j.LoggerFactory.getLogger;
100
101/**
102 * Service bootstraps kubernetes node based on its type.
103 */
104@Component(immediate = true,
105 property = {
106 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
107 AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
108 }
109)
110public class DefaultKubevirtNodeHandler implements KubevirtNodeHandler {
111
112 private final Logger log = getLogger(getClass());
113
114 private static final String DEFAULT_OF_PROTO = "tcp";
115 private static final int DEFAULT_OFPORT = 6653;
116 private static final int DPID_BEGIN = 3;
117 private static final int NETWORK_BEGIN = 3;
118 private static final long SLEEP_SHORT_MS = 1000; // we wait 1s
119 private static final long SLEEP_LONG_MS = 2000; // we wait 2s
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY)
122 protected CoreService coreService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY)
125 protected LeadershipService leadershipService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
128 protected ClusterService clusterService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
131 protected DeviceService deviceService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY)
134 protected DeviceAdminService deviceAdminService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY)
137 protected OvsdbController ovsdbController;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
140 protected KubevirtNodeAdminService nodeAdminService;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY)
143 protected ComponentConfigService componentConfigService;
144
145 /** OVSDB server listen port. */
146 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
147
148 /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
149 private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
150
151 private final ExecutorService eventExecutor = newSingleThreadExecutor(
152 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
153
154 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
155 private final DeviceListener bridgeListener = new InternalBridgeListener();
156 private final KubevirtNodeListener kubevirtNodeListener = new InternalKubevirtNodeListener();
157
158 private ApplicationId appId;
159 private NodeId localNode;
160
161 @Activate
162 protected void activate() {
163 appId = coreService.getAppId(APP_ID);
164 localNode = clusterService.getLocalNode().id();
165
166 componentConfigService.registerProperties(getClass());
167 leadershipService.runForLeadership(appId.name());
168 deviceService.addListener(ovsdbListener);
169 deviceService.addListener(bridgeListener);
170 nodeAdminService.addListener(kubevirtNodeListener);
171
172 log.info("Started");
173 }
174
175 @Deactivate
176 protected void deactivate() {
177 nodeAdminService.removeListener(kubevirtNodeListener);
178 deviceService.removeListener(bridgeListener);
179 deviceService.removeListener(ovsdbListener);
180 componentConfigService.unregisterProperties(getClass(), false);
181 leadershipService.withdraw(appId.name());
182 eventExecutor.shutdown();
183
184 log.info("Stopped");
185 }
186
187 @Modified
188 protected void modified(ComponentContext context) {
189 readComponentConfiguration(context);
190
191 log.info("Modified");
192 }
193
194 @Override
195 public void processInitState(KubevirtNode node) {
196 if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
197 ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
198 return;
199 }
200 if (!deviceService.isAvailable(node.intgBridge())) {
201 createBridge(node, INTEGRATION_BRIDGE, node.intgBridge());
202 }
203
204 if (!deviceService.isAvailable(node.tunBridge())) {
205 createBridge(node, TUNNEL_BRIDGE, node.tunBridge());
206 }
207 }
208
209 @Override
210 public void processDeviceCreatedState(KubevirtNode node) {
211 try {
212 if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
213 ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
214 return;
215 }
216
217 // create patch ports between integration to other bridges
218 createPatchInterfaces(node);
219
220 if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
221 createVxlanTunnelInterface(node);
222 }
223
224 if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
225 createGreTunnelInterface(node);
226 }
227
228 if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
229 createGeneveTunnelInterface(node);
230 }
231
232 // provision new physical interfaces on the given node
233 // this includes creating physical bridge, attaching physical port
234 // to physical bridge, adding patch ports to both physical bridge and br-int
235 provisionPhysicalInterfaces(node);
236
237 } catch (Exception e) {
238 log.error("Exception occurred because of {}", e);
239 }
240 }
241
242 @Override
243 public void processCompleteState(KubevirtNode node) {
244 // do something if needed
245 }
246
247 @Override
248 public void processIncompleteState(KubevirtNode node) {
249 // do something if needed
250 }
251
252 @Override
253 public void processOnBoardedState(KubevirtNode node) {
254 // do something if needed
255 }
256
257 /**
258 * Extracts properties from the component configuration context.
259 *
260 * @param context the component context
261 */
262 private void readComponentConfiguration(ComponentContext context) {
263 Dictionary<?, ?> properties = context.getProperties();
264
265 Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
266 if (ovsdbPortConfigured == null) {
267 ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
268 log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
269 } else {
270 ovsdbPortNum = ovsdbPortConfigured;
271 log.info("Configured. OVSDB port is {}", ovsdbPortNum);
272 }
273
274 Boolean autoRecoveryConfigured =
275 getBooleanProperty(properties, AUTO_RECOVERY);
276 if (autoRecoveryConfigured == null) {
277 autoRecovery = AUTO_RECOVERY_DEFAULT;
278 log.info("Auto recovery flag is NOT " +
279 "configured, default value is {}", autoRecovery);
280 } else {
281 autoRecovery = autoRecoveryConfigured;
282 log.info("Configured. Auto recovery flag is {}", autoRecovery);
283 }
284 }
285
286 /**
287 * Creates a bridge with a given name on a given kubernetes node.
288 *
289 * @param node kubevirt node
290 * @param bridgeName bridge name
291 * @param devId device identifier
292 */
293 private void createBridge(KubevirtNode node, String bridgeName, DeviceId devId) {
294 Device device = deviceService.getDevice(node.ovsdb());
295
296 List<ControllerInfo> controllers = clusterService.getNodes().stream()
297 .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
298 .collect(Collectors.toList());
299
300 String dpid = devId.toString().substring(DPID_BEGIN);
301
302 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
303 .name(bridgeName)
304 .failMode(BridgeDescription.FailMode.SECURE)
305 .datapathId(dpid)
306 .disableInBand()
307 .controllers(controllers);
308
309 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
310 bridgeConfig.addBridge(builder.build());
311 }
312
313 /**
314 * Creates a VXLAN tunnel interface in a given kubevirt node.
315 *
316 * @param node kubevirt node
317 */
318 private void createVxlanTunnelInterface(KubevirtNode node) {
319 createTunnelInterface(node, VXLAN, VXLAN);
320 }
321
322 /**
323 * Creates a GRE tunnel interface in a given kubevirt node.
324 *
325 * @param node kubevirt node
326 */
327 private void createGreTunnelInterface(KubevirtNode node) {
328 createTunnelInterface(node, GRE, GRE);
329 }
330
331 /**
332 * Creates a GENEVE tunnel interface in a given kubevirt node.
333 *
334 * @param node kubevirt node
335 */
336 private void createGeneveTunnelInterface(KubevirtNode node) {
337 createTunnelInterface(node, GENEVE, GENEVE);
338 }
339
340 /**
341 * Creates a tunnel interface in a given kubernetes node.
342 *
343 * @param node kubevirt node
344 */
345 private void createTunnelInterface(KubevirtNode node,
346 String type, String intfName) {
347 if (isIntfEnabled(node, intfName)) {
348 return;
349 }
350
351 Device device = deviceService.getDevice(node.ovsdb());
352 if (device == null || !device.is(InterfaceConfig.class)) {
353 log.error("Failed to create tunnel interface on {}", node.ovsdb());
354 return;
355 }
356
357 TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
358
359 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
360 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
361 }
362
363 /**
364 * Builds tunnel description according to the network type.
365 *
366 * @param type network type
367 * @return tunnel description
368 */
369 private TunnelDescription buildTunnelDesc(String type, String intfName) {
370 TunnelKey<String> key = new TunnelKey<>(FLOW_KEY);
371 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
372 TunnelDescription.Builder tdBuilder =
373 DefaultTunnelDescription.builder()
374 .deviceId(TUNNEL_BRIDGE)
375 .ifaceName(intfName)
376 .remote(TunnelEndPoints.flowTunnelEndpoint())
377 .key(key);
378
379 switch (type) {
380 case VXLAN:
381 tdBuilder.type(TunnelDescription.Type.VXLAN);
382 break;
383 case GRE:
384 tdBuilder.type(TunnelDescription.Type.GRE);
385 break;
386 case GENEVE:
387 tdBuilder.type(TunnelDescription.Type.GENEVE);
388 break;
389 default:
390 return null;
391 }
392
393 return tdBuilder.build();
394 }
395 return null;
396 }
397
398 /**
399 * Checks whether a given network interface in a given kubernetes node
400 * is enabled or not.
401 *
402 * @param node kubevirt node
403 * @param intf network interface name
404 * @return true if the given interface is enabled, false otherwise
405 */
406 private boolean isIntfEnabled(KubevirtNode node, String intf) {
407 return deviceService.isAvailable(node.tunBridge()) &&
408 deviceService.getPorts(node.tunBridge()).stream()
409 .anyMatch(port -> Objects.equals(
410 port.annotations().value(PORT_NAME), intf) &&
411 port.isEnabled());
412 }
413
414 private void createPatchInterfaces(KubevirtNode node) {
415 Device device = deviceService.getDevice(node.ovsdb());
416 if (device == null || !device.is(InterfaceConfig.class)) {
417 log.error("Failed to create patch interface on {}", node.ovsdb());
418 return;
419 }
420
421 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
422
423 // integration bridge -> tunnel bridge
424 PatchDescription brIntTunPatchDesc =
425 DefaultPatchDescription.builder()
426 .deviceId(INTEGRATION_BRIDGE)
427 .ifaceName(INTEGRATION_TO_TUNNEL)
428 .peer(TUNNEL_TO_INTEGRATION)
429 .build();
430
431 ifaceConfig.addPatchMode(INTEGRATION_TO_TUNNEL, brIntTunPatchDesc);
432
433 // tunnel bridge -> integration bridge
434 PatchDescription brTunIntPatchDesc =
435 DefaultPatchDescription.builder()
436 .deviceId(TUNNEL_BRIDGE)
437 .ifaceName(TUNNEL_TO_INTEGRATION)
438 .peer(INTEGRATION_TO_TUNNEL)
439 .build();
440
441 ifaceConfig.addPatchMode(TUNNEL_TO_INTEGRATION, brTunIntPatchDesc);
442 }
443
444 /**
445 * Bootstraps a new kubevirt node.
446 *
447 * @param node kubevirt node
448 */
449 private void bootstrapNode(KubevirtNode node) {
450 if (isCurrentStateDone(node)) {
451 setState(node, node.state().nextState());
452 } else {
453 log.trace("Processing {} state for {}", node.state(), node.hostname());
454 node.state().process(this, node);
455 }
456 }
457
458 /**
459 * Removes the existing kubevirt node.
460 *
461 * @param node kubevirt node
462 */
463 private void removeNode(KubevirtNode node) {
464 OvsdbClientService client = getOvsdbClient(node, ovsdbPortNum, ovsdbController);
465 if (client == null) {
466 log.info("Failed to get ovsdb client");
467 return;
468 }
469
470 // unprovision physical interfaces from the node
471 // this procedure includes detaching physical port from physical bridge,
472 // remove patch ports from br-int, removing physical bridge
473 unprovisionPhysicalInterfaces(node);
474
475 // delete tunnel bridge from the node
476 client.dropBridge(TUNNEL_BRIDGE);
477
478 // delete integration bridge from the node
479 client.dropBridge(INTEGRATION_BRIDGE);
480 }
481
482 /**
483 * Checks whether all requirements for this state are fulfilled or not.
484 *
485 * @param node kubevirt node
486 * @return true if all requirements are fulfilled, false otherwise
487 */
488 private boolean isCurrentStateDone(KubevirtNode node) {
489 switch (node.state()) {
490 case INIT:
491 return isInitStateDone(node);
492 case DEVICE_CREATED:
493 return isDeviceCreatedStateDone(node);
494 case COMPLETE:
495 case INCOMPLETE:
496 case ON_BOARDED:
497 // always return false
498 // run init CLI to re-trigger node bootstrap
499 return false;
500 default:
501 return true;
502 }
503 }
504
505 private boolean isInitStateDone(KubevirtNode node) {
506 if (!isOvsdbConnected(node, ovsdbPortNum,
507 ovsdbController, deviceService)) {
508 return false;
509 }
510
511 try {
512 // we need to wait a while, in case interfaces and bridges
513 // creation requires some time
514 sleep(SLEEP_SHORT_MS);
515 } catch (InterruptedException e) {
516 log.error("Exception caused during init state checking...");
517 }
518
519 cleanPhysicalInterfaces(node);
520
521 return node.intgBridge() != null && node.tunBridge() != null &&
522 deviceService.isAvailable(node.intgBridge()) &&
523 deviceService.isAvailable(node.tunBridge());
524 }
525
526 private boolean isDeviceCreatedStateDone(KubevirtNode node) {
527
528 try {
529 // we need to wait a while, in case tunneling ports
530 // creation requires some time
531 sleep(SLEEP_LONG_MS);
532 } catch (InterruptedException e) {
533 log.error("Exception caused during init state checking...");
534 }
535
536 if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
537 return false;
538 }
539 if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
540 return false;
541 }
542 if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
543 return false;
544 }
545
546 for (KubevirtPhyInterface phyIntf : node.phyIntfs()) {
547 if (phyIntf == null) {
548 return false;
549 }
550
551 String bridgeName = BRIDGE_PREFIX + phyIntf.network();
552 String patchPortName = structurePortName(
553 INTEGRATION_TO_PHYSICAL_PREFIX + phyIntf.network());
554
555 if (!(hasPhyBridge(node, bridgeName) &&
556 hasPhyPatchPort(node, patchPortName) &&
557 hasPhyIntf(node, phyIntf.intf()))) {
558 return false;
559 }
560 }
561
562 return true;
563 }
564
565 /**
566 * Configures the kubernetes node with new state.
567 *
568 * @param node kubevirt node
569 * @param newState a new state
570 */
571 private void setState(KubevirtNode node, KubevirtNodeState newState) {
572 if (node.state() == newState) {
573 return;
574 }
575 KubevirtNode updated = node.updateState(newState);
576 nodeAdminService.updateNode(updated);
577 log.info("Changed {} state: {}", node.hostname(), newState);
578 }
579
580 private void provisionPhysicalInterfaces(KubevirtNode node) {
581 node.phyIntfs().forEach(pi -> {
582 String bridgeName = BRIDGE_PREFIX + pi.network();
583 String patchPortName =
584 structurePortName(INTEGRATION_TO_PHYSICAL_PREFIX + pi.network());
585
586 if (!hasPhyBridge(node, bridgeName)) {
587 createPhysicalBridge(node, pi);
588 createPhysicalPatchPorts(node, pi);
589 attachPhysicalPort(node, pi);
590 } else {
591 // in case physical bridge exists, but patch port is missing on br-int,
592 // we will add patch port to connect br-int with physical bridge
593 if (!hasPhyPatchPort(node, patchPortName)) {
594 createPhysicalPatchPorts(node, pi);
595 }
596 }
597 });
598 }
599
600 private void cleanPhysicalInterfaces(KubevirtNode node) {
601 Device device = deviceService.getDevice(node.ovsdb());
602
603 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
604
605 Set<String> bridgeNames = bridgeConfig.getBridges().stream()
606 .map(BridgeDescription::name).collect(Collectors.toSet());
607
608 Set<String> phyNetworkNames = node.phyIntfs().stream()
609 .map(pi -> BRIDGE_PREFIX + pi.network()).collect(Collectors.toSet());
610
611 // we remove existing physical bridges and patch ports, if the physical
612 // bridges are not defined in kubevirt node
613 for (String brName : bridgeNames) {
614 if (!phyNetworkNames.contains(brName)) {
615 // integration bridge and tunnel bridge should NOT be treated as
616 // physical bridges
617 if (brName.equals(INTEGRATION_BRIDGE) || brName.equals(TUNNEL_BRIDGE)) {
618 continue;
619 }
620 removePhysicalPatchPorts(node, brName.substring(NETWORK_BEGIN));
621 removePhysicalBridge(node, brName.substring(NETWORK_BEGIN));
622 }
623 }
624 }
625
626 private void unprovisionPhysicalInterfaces(KubevirtNode node) {
627 node.phyIntfs().forEach(pi -> {
628 detachPhysicalPort(node, pi.network(), pi.intf());
629 removePhysicalPatchPorts(node, pi.network());
630 removePhysicalBridge(node, pi.network());
631 });
632 }
633
634 private boolean hasPhyBridge(KubevirtNode node, String bridgeName) {
635 BridgeConfig bridgeConfig =
636 deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
637 return bridgeConfig.getBridges().stream()
638 .anyMatch(br -> br.name().equals(bridgeName));
639 }
640
641 private boolean hasPhyPatchPort(KubevirtNode node, String patchPortName) {
642 List<Port> ports = deviceService.getPorts(node.intgBridge());
643 return ports.stream().anyMatch(p ->
644 p.annotations().value(PORT_NAME).equals(patchPortName));
645 }
646
647 private boolean hasPhyIntf(KubevirtNode node, String intfName) {
648 BridgeConfig bridgeConfig =
649 deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
650 return bridgeConfig.getPorts().stream()
651 .anyMatch(p -> p.annotations().value(PORT_NAME).equals(intfName));
652 }
653
654 private void createPhysicalBridge(KubevirtNode osNode,
655 KubevirtPhyInterface phyInterface) {
656 Device device = deviceService.getDevice(osNode.ovsdb());
657
658 String bridgeName = BRIDGE_PREFIX + phyInterface.network();
659
660 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
661 .name(bridgeName)
662 .mcastSnoopingEnable();
663
664 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
665 bridgeConfig.addBridge(builder.build());
666 }
667
668 private void removePhysicalBridge(KubevirtNode node, String network) {
669 Device device = deviceService.getDevice(node.ovsdb());
670
671 BridgeName bridgeName = BridgeName.bridgeName(BRIDGE_PREFIX + network);
672
673 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
674 bridgeConfig.deleteBridge(bridgeName);
675 }
676
677 private void createPhysicalPatchPorts(KubevirtNode node,
678 KubevirtPhyInterface phyInterface) {
679 Device device = deviceService.getDevice(node.ovsdb());
680
681 if (device == null || !device.is(InterfaceConfig.class)) {
682 log.error("Failed to create patch interface on {}", node.ovsdb());
683 return;
684 }
685
686 String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
687
688 String intToPhyPatchPort = structurePortName(
689 INTEGRATION_TO_PHYSICAL_PREFIX + phyInterface.network());
690 String phyToIntPatchPort = structurePortName(
691 phyInterface.network() + PHYSICAL_TO_INTEGRATION_SUFFIX);
692
693 // integration bridge -> physical bridge
694 PatchDescription intToPhyPatchDesc =
695 DefaultPatchDescription.builder()
696 .deviceId(INTEGRATION_BRIDGE)
697 .ifaceName(intToPhyPatchPort)
698 .peer(phyToIntPatchPort)
699 .build();
700
701 // physical bridge -> integration bridge
702 PatchDescription phyToIntPatchDesc =
703 DefaultPatchDescription.builder()
704 .deviceId(physicalDeviceId)
705 .ifaceName(phyToIntPatchPort)
706 .peer(intToPhyPatchPort)
707 .build();
708
709 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
710 ifaceConfig.addPatchMode(INTEGRATION_TO_PHYSICAL_PREFIX +
711 phyInterface.network(), intToPhyPatchDesc);
712 ifaceConfig.addPatchMode(phyInterface.network() +
713 PHYSICAL_TO_INTEGRATION_SUFFIX, phyToIntPatchDesc);
714
715 addOrRemoveSystemInterface(node, physicalDeviceId,
716 phyInterface.intf(), deviceService, true);
717 }
718
719 private void removePhysicalPatchPorts(KubevirtNode node, String network) {
720 Device device = deviceService.getDevice(node.ovsdb());
721
722 if (device == null || !device.is(InterfaceConfig.class)) {
723 log.error("Failed to remove patch interface on {}", node.ovsdb());
724 return;
725 }
726
727 String intToPhyPatchPort = structurePortName(
728 INTEGRATION_TO_PHYSICAL_PREFIX + network);
729
730 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
731 ifaceConfig.removePatchMode(intToPhyPatchPort);
732 }
733
734 private void attachPhysicalPort(KubevirtNode node,
735 KubevirtPhyInterface phyInterface) {
736
737 String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
738
739 addOrRemoveSystemInterface(node, physicalDeviceId,
740 phyInterface.intf(), deviceService, true);
741 }
742
743 private void detachPhysicalPort(KubevirtNode node, String network, String portName) {
744 String physicalDeviceId = BRIDGE_PREFIX + network;
745
746 addOrRemoveSystemInterface(node, physicalDeviceId, portName, deviceService, false);
747 }
748
749 /**
750 * An internal OVSDB listener. This listener is used for listening the
751 * network facing events from OVSDB device. If a new OVSDB device is detected,
752 * ONOS tries to bootstrap the kubernetes node.
753 */
754 private class InternalOvsdbListener implements DeviceListener {
755
756 @Override
757 public boolean isRelevant(DeviceEvent event) {
758 return event.subject().type() == Device.Type.CONTROLLER;
759 }
760
761 private boolean isRelevantHelper() {
762 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
763 }
764
765 @Override
766 public void event(DeviceEvent event) {
767 Device device = event.subject();
768
769 switch (event.type()) {
770 case DEVICE_AVAILABILITY_CHANGED:
771 case DEVICE_ADDED:
772 eventExecutor.execute(() -> {
773
774 if (!isRelevantHelper()) {
775 return;
776 }
777
778 KubevirtNode node = nodeAdminService.node(device.id());
779
780 if (node == null) {
781 return;
782 }
783
784 if (deviceService.isAvailable(device.id())) {
785 log.debug("OVSDB {} detected", device.id());
786 bootstrapNode(node);
787 }
788 });
789 break;
790 case PORT_ADDED:
791 case PORT_REMOVED:
792 case DEVICE_REMOVED:
793 default:
794 // do nothing
795 break;
796 }
797 }
798 }
799
800 /**
801 * An internal integration bridge listener. This listener is used for
802 * listening the events from integration bridge. To listen the events from
803 * other types of bridge such as provider bridge or tunnel bridge, we need
804 * to augment KubevirtNodeService.node() method.
805 */
806 private class InternalBridgeListener implements DeviceListener {
807
808 @Override
809 public boolean isRelevant(DeviceEvent event) {
810 return event.subject().type() == Device.Type.SWITCH;
811 }
812
813 private boolean isRelevantHelper() {
814 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
815 }
816
817 @Override
818 public void event(DeviceEvent event) {
819 Device device = event.subject();
820 Port port = event.port();
821
822 switch (event.type()) {
823 case DEVICE_AVAILABILITY_CHANGED:
824 case DEVICE_ADDED:
825 eventExecutor.execute(() -> processDeviceAddition(device));
826 break;
827 case PORT_UPDATED:
828 case PORT_ADDED:
829 eventExecutor.execute(() -> processPortAddition(device, port));
830 break;
831 case PORT_REMOVED:
832 eventExecutor.execute(() -> processPortRemoval(device, port));
833 break;
834 case DEVICE_REMOVED:
835 default:
836 // do nothing
837 break;
838 }
839 }
840
841 void processDeviceAddition(Device device) {
842 if (!isRelevantHelper()) {
843 return;
844 }
845
846 KubevirtNode node = nodeAdminService.node(device.id());
847
848 if (node == null) {
849 return;
850 }
851
852 if (deviceService.isAvailable(device.id())) {
853 log.debug("Bridge created on {}", node.hostname());
854 bootstrapNode(node);
855 } else if (node.state() == COMPLETE) {
856 log.info("Device {} disconnected", device.id());
857 setState(node, INCOMPLETE);
858 }
859
860 if (autoRecovery) {
861 if (node.state() == INCOMPLETE || node.state() == DEVICE_CREATED) {
862 log.info("Device {} is reconnected", device.id());
863 nodeAdminService.updateNode(node.updateState(INIT));
864 }
865 }
866 }
867
868 void processPortAddition(Device device, Port port) {
869 if (!isRelevantHelper()) {
870 return;
871 }
872
873 KubevirtNode node = nodeAdminService.nodeByTunBridge(device.id());
874
875 if (node == null) {
876 return;
877 }
878
879 String portName = port.annotations().value(PORT_NAME);
880 if (node.state() == DEVICE_CREATED && (
881 Objects.equals(portName, VXLAN) ||
882 Objects.equals(portName, GRE) ||
883 Objects.equals(portName, GENEVE))) {
884 log.info("Interface {} added or updated to {}",
885 portName, device.id());
886 bootstrapNode(node);
887 }
888 }
889
890 void processPortRemoval(Device device, Port port) {
891 if (!isRelevantHelper()) {
892 return;
893 }
894
895 KubevirtNode node = nodeAdminService.node(device.id());
896
897 if (node == null) {
898 return;
899 }
900
901 String portName = port.annotations().value(PORT_NAME);
902 if (node.state() == COMPLETE && (
903 Objects.equals(portName, VXLAN) ||
904 Objects.equals(portName, GRE) ||
905 Objects.equals(portName, GENEVE))) {
906 log.warn("Interface {} removed from {}", portName, device.id());
907 setState(node, INCOMPLETE);
908 }
909 }
910 }
911
912 /**
913 * An internal kubevirt node listener.
914 * The notification is triggered by KubevirtNodeStore.
915 */
916 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
917
918 private boolean isRelevantHelper() {
919 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
920 }
921
922 @Override
923 public void event(KubevirtNodeEvent event) {
924 switch (event.type()) {
925 case KUBEVIRT_NODE_CREATED:
926 case KUBEVIRT_NODE_UPDATED:
927 eventExecutor.execute(() -> {
928 if (!isRelevantHelper()) {
929 return;
930 }
931 bootstrapNode(event.subject());
932 });
933 break;
934 case KUBEVIRT_NODE_REMOVED:
935 eventExecutor.execute(() -> {
936 if (!isRelevantHelper()) {
937 return;
938 }
939 removeNode(event.subject());
940 });
941 break;
942 case KUBEVIRT_NODE_INCOMPLETE:
943 default:
944 break;
945 }
946 }
947 }
948}