blob: 72d1ec0ad001ecee1e7239546e90603f23ef23ee [file] [log] [blame]
Jian Li4fe40e52021-01-06 03:29:58 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnode.impl;
17
Jian Li72f3dac2021-01-28 16:14:54 +090018import com.google.common.collect.Lists;
19import org.onlab.packet.IpAddress;
Jian Li4fe40e52021-01-06 03:29:58 +090020import org.onlab.util.Tools;
21import org.onosproject.cfg.ComponentConfigService;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
Jian Li72f3dac2021-01-28 16:14:54 +090027import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
Jian Li4fe40e52021-01-06 03:29:58 +090028import org.onosproject.kubevirtnode.api.KubevirtNode;
29import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
30import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
31import org.onosproject.kubevirtnode.api.KubevirtNodeHandler;
32import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
33import org.onosproject.kubevirtnode.api.KubevirtNodeState;
34import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
35import org.onosproject.net.Device;
36import org.onosproject.net.DeviceId;
37import org.onosproject.net.Port;
38import org.onosproject.net.behaviour.BridgeConfig;
39import org.onosproject.net.behaviour.BridgeDescription;
40import org.onosproject.net.behaviour.BridgeName;
41import org.onosproject.net.behaviour.ControllerInfo;
42import org.onosproject.net.behaviour.DefaultBridgeDescription;
43import org.onosproject.net.behaviour.DefaultPatchDescription;
44import org.onosproject.net.behaviour.DefaultTunnelDescription;
45import org.onosproject.net.behaviour.InterfaceConfig;
46import org.onosproject.net.behaviour.PatchDescription;
47import org.onosproject.net.behaviour.TunnelDescription;
48import org.onosproject.net.behaviour.TunnelEndPoints;
49import org.onosproject.net.behaviour.TunnelKey;
50import org.onosproject.net.device.DeviceAdminService;
51import org.onosproject.net.device.DeviceEvent;
52import org.onosproject.net.device.DeviceListener;
53import org.onosproject.net.device.DeviceService;
54import org.onosproject.ovsdb.controller.OvsdbClientService;
55import org.onosproject.ovsdb.controller.OvsdbController;
56import org.osgi.service.component.ComponentContext;
57import org.osgi.service.component.annotations.Activate;
58import org.osgi.service.component.annotations.Component;
59import org.osgi.service.component.annotations.Deactivate;
60import org.osgi.service.component.annotations.Modified;
61import org.osgi.service.component.annotations.Reference;
62import org.osgi.service.component.annotations.ReferenceCardinality;
63import org.slf4j.Logger;
64
65import java.util.Dictionary;
66import java.util.List;
67import java.util.Objects;
68import java.util.Set;
69import java.util.concurrent.ExecutorService;
70import java.util.stream.Collectors;
71
72import static java.lang.Thread.sleep;
73import static java.util.concurrent.Executors.newSingleThreadExecutor;
74import static org.onlab.packet.TpPort.tpPort;
75import static org.onlab.util.Tools.groupedThreads;
76import static org.onosproject.kubevirtnode.api.Constants.BRIDGE_PREFIX;
77import static org.onosproject.kubevirtnode.api.Constants.FLOW_KEY;
78import static org.onosproject.kubevirtnode.api.Constants.GENEVE;
79import static org.onosproject.kubevirtnode.api.Constants.GRE;
80import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_BRIDGE;
81import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_PHYSICAL_PREFIX;
82import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_TUNNEL;
83import static org.onosproject.kubevirtnode.api.Constants.PHYSICAL_TO_INTEGRATION_SUFFIX;
Jian Li556709c2021-02-03 17:54:28 +090084import static org.onosproject.kubevirtnode.api.Constants.TENANT_BRIDGE_PREFIX;
Jian Li4fe40e52021-01-06 03:29:58 +090085import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_BRIDGE;
86import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_TO_INTEGRATION;
87import static org.onosproject.kubevirtnode.api.Constants.VXLAN;
88import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
89import static org.onosproject.kubevirtnode.api.KubevirtNodeState.COMPLETE;
90import static org.onosproject.kubevirtnode.api.KubevirtNodeState.DEVICE_CREATED;
91import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INCOMPLETE;
92import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
93import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
94import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
95import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT;
96import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
97import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.addOrRemoveSystemInterface;
98import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getBooleanProperty;
99import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getOvsdbClient;
100import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.isOvsdbConnected;
101import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.structurePortName;
102import static org.onosproject.net.AnnotationKeys.PORT_NAME;
103import static org.slf4j.LoggerFactory.getLogger;
104
105/**
106 * Service bootstraps kubernetes node based on its type.
107 */
108@Component(immediate = true,
109 property = {
110 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
111 AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
112 }
113)
114public class DefaultKubevirtNodeHandler implements KubevirtNodeHandler {
115
116 private final Logger log = getLogger(getClass());
117
118 private static final String DEFAULT_OF_PROTO = "tcp";
119 private static final int DEFAULT_OFPORT = 6653;
120 private static final int DPID_BEGIN = 3;
121 private static final int NETWORK_BEGIN = 3;
122 private static final long SLEEP_SHORT_MS = 1000; // we wait 1s
123 private static final long SLEEP_LONG_MS = 2000; // we wait 2s
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
126 protected CoreService coreService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
129 protected LeadershipService leadershipService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
132 protected ClusterService clusterService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
135 protected DeviceService deviceService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
138 protected DeviceAdminService deviceAdminService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
141 protected OvsdbController ovsdbController;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY)
144 protected KubevirtNodeAdminService nodeAdminService;
145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li72f3dac2021-01-28 16:14:54 +0900147 protected KubevirtApiConfigService apiConfigService;
148
149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4fe40e52021-01-06 03:29:58 +0900150 protected ComponentConfigService componentConfigService;
151
152 /** OVSDB server listen port. */
153 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
154
155 /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
156 private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
157
158 private final ExecutorService eventExecutor = newSingleThreadExecutor(
159 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
160
161 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
162 private final DeviceListener bridgeListener = new InternalBridgeListener();
163 private final KubevirtNodeListener kubevirtNodeListener = new InternalKubevirtNodeListener();
164
165 private ApplicationId appId;
166 private NodeId localNode;
167
168 @Activate
169 protected void activate() {
170 appId = coreService.getAppId(APP_ID);
171 localNode = clusterService.getLocalNode().id();
172
173 componentConfigService.registerProperties(getClass());
174 leadershipService.runForLeadership(appId.name());
175 deviceService.addListener(ovsdbListener);
176 deviceService.addListener(bridgeListener);
177 nodeAdminService.addListener(kubevirtNodeListener);
178
179 log.info("Started");
180 }
181
182 @Deactivate
183 protected void deactivate() {
184 nodeAdminService.removeListener(kubevirtNodeListener);
185 deviceService.removeListener(bridgeListener);
186 deviceService.removeListener(ovsdbListener);
187 componentConfigService.unregisterProperties(getClass(), false);
188 leadershipService.withdraw(appId.name());
189 eventExecutor.shutdown();
190
191 log.info("Stopped");
192 }
193
194 @Modified
195 protected void modified(ComponentContext context) {
196 readComponentConfiguration(context);
197
198 log.info("Modified");
199 }
200
201 @Override
202 public void processInitState(KubevirtNode node) {
203 if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
204 ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
205 return;
206 }
207 if (!deviceService.isAvailable(node.intgBridge())) {
208 createBridge(node, INTEGRATION_BRIDGE, node.intgBridge());
209 }
210
211 if (!deviceService.isAvailable(node.tunBridge())) {
212 createBridge(node, TUNNEL_BRIDGE, node.tunBridge());
213 }
214 }
215
216 @Override
217 public void processDeviceCreatedState(KubevirtNode node) {
218 try {
219 if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
220 ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
221 return;
222 }
223
224 // create patch ports between integration to other bridges
Jian Li858ccd72021-02-04 17:25:01 +0900225 // for now, we do not directly connect br-int with br-tun,
226 // as br-int only deals with FLAT and VLAN network
227 // createPatchInterfaces(node);
Jian Li4fe40e52021-01-06 03:29:58 +0900228
229 if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
230 createVxlanTunnelInterface(node);
231 }
232
233 if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
234 createGreTunnelInterface(node);
235 }
236
237 if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
238 createGeneveTunnelInterface(node);
239 }
240
241 // provision new physical interfaces on the given node
242 // this includes creating physical bridge, attaching physical port
243 // to physical bridge, adding patch ports to both physical bridge and br-int
244 provisionPhysicalInterfaces(node);
245
246 } catch (Exception e) {
247 log.error("Exception occurred because of {}", e);
248 }
249 }
250
251 @Override
252 public void processCompleteState(KubevirtNode node) {
253 // do something if needed
254 }
255
256 @Override
257 public void processIncompleteState(KubevirtNode node) {
258 // do something if needed
259 }
260
261 @Override
262 public void processOnBoardedState(KubevirtNode node) {
263 // do something if needed
264 }
265
266 /**
267 * Extracts properties from the component configuration context.
268 *
269 * @param context the component context
270 */
271 private void readComponentConfiguration(ComponentContext context) {
272 Dictionary<?, ?> properties = context.getProperties();
273
274 Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
275 if (ovsdbPortConfigured == null) {
276 ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
277 log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
278 } else {
279 ovsdbPortNum = ovsdbPortConfigured;
280 log.info("Configured. OVSDB port is {}", ovsdbPortNum);
281 }
282
283 Boolean autoRecoveryConfigured =
284 getBooleanProperty(properties, AUTO_RECOVERY);
285 if (autoRecoveryConfigured == null) {
286 autoRecovery = AUTO_RECOVERY_DEFAULT;
287 log.info("Auto recovery flag is NOT " +
288 "configured, default value is {}", autoRecovery);
289 } else {
290 autoRecovery = autoRecoveryConfigured;
291 log.info("Configured. Auto recovery flag is {}", autoRecovery);
292 }
293 }
294
295 /**
296 * Creates a bridge with a given name on a given kubernetes node.
297 *
298 * @param node kubevirt node
299 * @param bridgeName bridge name
300 * @param devId device identifier
301 */
302 private void createBridge(KubevirtNode node, String bridgeName, DeviceId devId) {
303 Device device = deviceService.getDevice(node.ovsdb());
304
Jian Li72f3dac2021-01-28 16:14:54 +0900305 IpAddress serverIp = apiConfigService.apiConfig().ipAddress();
306 ControllerInfo controlInfo = new ControllerInfo(serverIp, DEFAULT_OFPORT, DEFAULT_OF_PROTO);
307 List<ControllerInfo> controllers = Lists.newArrayList(controlInfo);
Jian Li4fe40e52021-01-06 03:29:58 +0900308
309 String dpid = devId.toString().substring(DPID_BEGIN);
310
311 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
312 .name(bridgeName)
313 .failMode(BridgeDescription.FailMode.SECURE)
314 .datapathId(dpid)
315 .disableInBand()
316 .controllers(controllers);
317
318 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
319 bridgeConfig.addBridge(builder.build());
320 }
321
322 /**
323 * Creates a VXLAN tunnel interface in a given kubevirt node.
324 *
325 * @param node kubevirt node
326 */
327 private void createVxlanTunnelInterface(KubevirtNode node) {
328 createTunnelInterface(node, VXLAN, VXLAN);
329 }
330
331 /**
332 * Creates a GRE tunnel interface in a given kubevirt node.
333 *
334 * @param node kubevirt node
335 */
336 private void createGreTunnelInterface(KubevirtNode node) {
337 createTunnelInterface(node, GRE, GRE);
338 }
339
340 /**
341 * Creates a GENEVE tunnel interface in a given kubevirt node.
342 *
343 * @param node kubevirt node
344 */
345 private void createGeneveTunnelInterface(KubevirtNode node) {
346 createTunnelInterface(node, GENEVE, GENEVE);
347 }
348
349 /**
350 * Creates a tunnel interface in a given kubernetes node.
351 *
352 * @param node kubevirt node
353 */
354 private void createTunnelInterface(KubevirtNode node,
355 String type, String intfName) {
356 if (isIntfEnabled(node, intfName)) {
357 return;
358 }
359
360 Device device = deviceService.getDevice(node.ovsdb());
361 if (device == null || !device.is(InterfaceConfig.class)) {
362 log.error("Failed to create tunnel interface on {}", node.ovsdb());
363 return;
364 }
365
366 TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
367
368 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
369 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
370 }
371
372 /**
373 * Builds tunnel description according to the network type.
374 *
375 * @param type network type
376 * @return tunnel description
377 */
378 private TunnelDescription buildTunnelDesc(String type, String intfName) {
379 TunnelKey<String> key = new TunnelKey<>(FLOW_KEY);
380 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
381 TunnelDescription.Builder tdBuilder =
382 DefaultTunnelDescription.builder()
383 .deviceId(TUNNEL_BRIDGE)
384 .ifaceName(intfName)
385 .remote(TunnelEndPoints.flowTunnelEndpoint())
386 .key(key);
387
388 switch (type) {
389 case VXLAN:
390 tdBuilder.type(TunnelDescription.Type.VXLAN);
391 break;
392 case GRE:
393 tdBuilder.type(TunnelDescription.Type.GRE);
394 break;
395 case GENEVE:
396 tdBuilder.type(TunnelDescription.Type.GENEVE);
397 break;
398 default:
399 return null;
400 }
401
402 return tdBuilder.build();
403 }
404 return null;
405 }
406
407 /**
408 * Checks whether a given network interface in a given kubernetes node
409 * is enabled or not.
410 *
411 * @param node kubevirt node
412 * @param intf network interface name
413 * @return true if the given interface is enabled, false otherwise
414 */
415 private boolean isIntfEnabled(KubevirtNode node, String intf) {
416 return deviceService.isAvailable(node.tunBridge()) &&
417 deviceService.getPorts(node.tunBridge()).stream()
418 .anyMatch(port -> Objects.equals(
419 port.annotations().value(PORT_NAME), intf) &&
420 port.isEnabled());
421 }
422
423 private void createPatchInterfaces(KubevirtNode node) {
424 Device device = deviceService.getDevice(node.ovsdb());
425 if (device == null || !device.is(InterfaceConfig.class)) {
426 log.error("Failed to create patch interface on {}", node.ovsdb());
427 return;
428 }
429
430 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
431
432 // integration bridge -> tunnel bridge
433 PatchDescription brIntTunPatchDesc =
434 DefaultPatchDescription.builder()
435 .deviceId(INTEGRATION_BRIDGE)
436 .ifaceName(INTEGRATION_TO_TUNNEL)
437 .peer(TUNNEL_TO_INTEGRATION)
438 .build();
439
440 ifaceConfig.addPatchMode(INTEGRATION_TO_TUNNEL, brIntTunPatchDesc);
441
442 // tunnel bridge -> integration bridge
443 PatchDescription brTunIntPatchDesc =
444 DefaultPatchDescription.builder()
445 .deviceId(TUNNEL_BRIDGE)
446 .ifaceName(TUNNEL_TO_INTEGRATION)
447 .peer(INTEGRATION_TO_TUNNEL)
448 .build();
449
450 ifaceConfig.addPatchMode(TUNNEL_TO_INTEGRATION, brTunIntPatchDesc);
451 }
452
453 /**
454 * Bootstraps a new kubevirt node.
455 *
456 * @param node kubevirt node
457 */
458 private void bootstrapNode(KubevirtNode node) {
459 if (isCurrentStateDone(node)) {
460 setState(node, node.state().nextState());
461 } else {
462 log.trace("Processing {} state for {}", node.state(), node.hostname());
463 node.state().process(this, node);
464 }
465 }
466
467 /**
468 * Removes the existing kubevirt node.
469 *
470 * @param node kubevirt node
471 */
472 private void removeNode(KubevirtNode node) {
473 OvsdbClientService client = getOvsdbClient(node, ovsdbPortNum, ovsdbController);
474 if (client == null) {
475 log.info("Failed to get ovsdb client");
476 return;
477 }
478
479 // unprovision physical interfaces from the node
480 // this procedure includes detaching physical port from physical bridge,
481 // remove patch ports from br-int, removing physical bridge
482 unprovisionPhysicalInterfaces(node);
483
484 // delete tunnel bridge from the node
485 client.dropBridge(TUNNEL_BRIDGE);
486
487 // delete integration bridge from the node
488 client.dropBridge(INTEGRATION_BRIDGE);
489 }
490
491 /**
492 * Checks whether all requirements for this state are fulfilled or not.
493 *
494 * @param node kubevirt node
495 * @return true if all requirements are fulfilled, false otherwise
496 */
497 private boolean isCurrentStateDone(KubevirtNode node) {
498 switch (node.state()) {
499 case INIT:
500 return isInitStateDone(node);
501 case DEVICE_CREATED:
502 return isDeviceCreatedStateDone(node);
503 case COMPLETE:
504 case INCOMPLETE:
505 case ON_BOARDED:
506 // always return false
507 // run init CLI to re-trigger node bootstrap
508 return false;
509 default:
510 return true;
511 }
512 }
513
514 private boolean isInitStateDone(KubevirtNode node) {
515 if (!isOvsdbConnected(node, ovsdbPortNum,
516 ovsdbController, deviceService)) {
517 return false;
518 }
519
520 try {
521 // we need to wait a while, in case interfaces and bridges
522 // creation requires some time
523 sleep(SLEEP_SHORT_MS);
524 } catch (InterruptedException e) {
525 log.error("Exception caused during init state checking...");
526 }
527
528 cleanPhysicalInterfaces(node);
529
530 return node.intgBridge() != null && node.tunBridge() != null &&
531 deviceService.isAvailable(node.intgBridge()) &&
532 deviceService.isAvailable(node.tunBridge());
533 }
534
535 private boolean isDeviceCreatedStateDone(KubevirtNode node) {
536
537 try {
538 // we need to wait a while, in case tunneling ports
539 // creation requires some time
540 sleep(SLEEP_LONG_MS);
541 } catch (InterruptedException e) {
542 log.error("Exception caused during init state checking...");
543 }
544
545 if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
546 return false;
547 }
548 if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
549 return false;
550 }
551 if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
552 return false;
553 }
554
555 for (KubevirtPhyInterface phyIntf : node.phyIntfs()) {
556 if (phyIntf == null) {
557 return false;
558 }
559
560 String bridgeName = BRIDGE_PREFIX + phyIntf.network();
561 String patchPortName = structurePortName(
562 INTEGRATION_TO_PHYSICAL_PREFIX + phyIntf.network());
563
564 if (!(hasPhyBridge(node, bridgeName) &&
565 hasPhyPatchPort(node, patchPortName) &&
566 hasPhyIntf(node, phyIntf.intf()))) {
567 return false;
568 }
569 }
570
571 return true;
572 }
573
574 /**
575 * Configures the kubernetes node with new state.
576 *
577 * @param node kubevirt node
578 * @param newState a new state
579 */
580 private void setState(KubevirtNode node, KubevirtNodeState newState) {
581 if (node.state() == newState) {
582 return;
583 }
584 KubevirtNode updated = node.updateState(newState);
585 nodeAdminService.updateNode(updated);
586 log.info("Changed {} state: {}", node.hostname(), newState);
587 }
588
589 private void provisionPhysicalInterfaces(KubevirtNode node) {
590 node.phyIntfs().forEach(pi -> {
591 String bridgeName = BRIDGE_PREFIX + pi.network();
592 String patchPortName =
593 structurePortName(INTEGRATION_TO_PHYSICAL_PREFIX + pi.network());
594
595 if (!hasPhyBridge(node, bridgeName)) {
596 createPhysicalBridge(node, pi);
597 createPhysicalPatchPorts(node, pi);
598 attachPhysicalPort(node, pi);
599 } else {
600 // in case physical bridge exists, but patch port is missing on br-int,
601 // we will add patch port to connect br-int with physical bridge
602 if (!hasPhyPatchPort(node, patchPortName)) {
603 createPhysicalPatchPorts(node, pi);
604 }
605 }
606 });
607 }
608
609 private void cleanPhysicalInterfaces(KubevirtNode node) {
610 Device device = deviceService.getDevice(node.ovsdb());
611
612 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
613
614 Set<String> bridgeNames = bridgeConfig.getBridges().stream()
615 .map(BridgeDescription::name).collect(Collectors.toSet());
616
617 Set<String> phyNetworkNames = node.phyIntfs().stream()
618 .map(pi -> BRIDGE_PREFIX + pi.network()).collect(Collectors.toSet());
619
620 // we remove existing physical bridges and patch ports, if the physical
621 // bridges are not defined in kubevirt node
622 for (String brName : bridgeNames) {
623 if (!phyNetworkNames.contains(brName)) {
624 // integration bridge and tunnel bridge should NOT be treated as
625 // physical bridges
Jian Li556709c2021-02-03 17:54:28 +0900626 if (brName.equals(INTEGRATION_BRIDGE) ||
627 brName.equals(TUNNEL_BRIDGE) ||
628 brName.startsWith(TENANT_BRIDGE_PREFIX)) {
Jian Li4fe40e52021-01-06 03:29:58 +0900629 continue;
630 }
631 removePhysicalPatchPorts(node, brName.substring(NETWORK_BEGIN));
632 removePhysicalBridge(node, brName.substring(NETWORK_BEGIN));
633 }
634 }
635 }
636
637 private void unprovisionPhysicalInterfaces(KubevirtNode node) {
638 node.phyIntfs().forEach(pi -> {
639 detachPhysicalPort(node, pi.network(), pi.intf());
640 removePhysicalPatchPorts(node, pi.network());
641 removePhysicalBridge(node, pi.network());
642 });
643 }
644
645 private boolean hasPhyBridge(KubevirtNode node, String bridgeName) {
646 BridgeConfig bridgeConfig =
647 deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
648 return bridgeConfig.getBridges().stream()
649 .anyMatch(br -> br.name().equals(bridgeName));
650 }
651
652 private boolean hasPhyPatchPort(KubevirtNode node, String patchPortName) {
653 List<Port> ports = deviceService.getPorts(node.intgBridge());
654 return ports.stream().anyMatch(p ->
655 p.annotations().value(PORT_NAME).equals(patchPortName));
656 }
657
658 private boolean hasPhyIntf(KubevirtNode node, String intfName) {
659 BridgeConfig bridgeConfig =
660 deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
661 return bridgeConfig.getPorts().stream()
662 .anyMatch(p -> p.annotations().value(PORT_NAME).equals(intfName));
663 }
664
665 private void createPhysicalBridge(KubevirtNode osNode,
666 KubevirtPhyInterface phyInterface) {
667 Device device = deviceService.getDevice(osNode.ovsdb());
668
669 String bridgeName = BRIDGE_PREFIX + phyInterface.network();
670
671 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
672 .name(bridgeName)
673 .mcastSnoopingEnable();
674
675 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
676 bridgeConfig.addBridge(builder.build());
677 }
678
679 private void removePhysicalBridge(KubevirtNode node, String network) {
680 Device device = deviceService.getDevice(node.ovsdb());
681
682 BridgeName bridgeName = BridgeName.bridgeName(BRIDGE_PREFIX + network);
683
684 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
685 bridgeConfig.deleteBridge(bridgeName);
686 }
687
688 private void createPhysicalPatchPorts(KubevirtNode node,
689 KubevirtPhyInterface phyInterface) {
690 Device device = deviceService.getDevice(node.ovsdb());
691
692 if (device == null || !device.is(InterfaceConfig.class)) {
693 log.error("Failed to create patch interface on {}", node.ovsdb());
694 return;
695 }
696
697 String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
698
699 String intToPhyPatchPort = structurePortName(
700 INTEGRATION_TO_PHYSICAL_PREFIX + phyInterface.network());
701 String phyToIntPatchPort = structurePortName(
702 phyInterface.network() + PHYSICAL_TO_INTEGRATION_SUFFIX);
703
704 // integration bridge -> physical bridge
705 PatchDescription intToPhyPatchDesc =
706 DefaultPatchDescription.builder()
707 .deviceId(INTEGRATION_BRIDGE)
708 .ifaceName(intToPhyPatchPort)
709 .peer(phyToIntPatchPort)
710 .build();
711
712 // physical bridge -> integration bridge
713 PatchDescription phyToIntPatchDesc =
714 DefaultPatchDescription.builder()
715 .deviceId(physicalDeviceId)
716 .ifaceName(phyToIntPatchPort)
717 .peer(intToPhyPatchPort)
718 .build();
719
720 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
721 ifaceConfig.addPatchMode(INTEGRATION_TO_PHYSICAL_PREFIX +
722 phyInterface.network(), intToPhyPatchDesc);
723 ifaceConfig.addPatchMode(phyInterface.network() +
724 PHYSICAL_TO_INTEGRATION_SUFFIX, phyToIntPatchDesc);
725
726 addOrRemoveSystemInterface(node, physicalDeviceId,
727 phyInterface.intf(), deviceService, true);
728 }
729
730 private void removePhysicalPatchPorts(KubevirtNode node, String network) {
731 Device device = deviceService.getDevice(node.ovsdb());
732
733 if (device == null || !device.is(InterfaceConfig.class)) {
734 log.error("Failed to remove patch interface on {}", node.ovsdb());
735 return;
736 }
737
738 String intToPhyPatchPort = structurePortName(
739 INTEGRATION_TO_PHYSICAL_PREFIX + network);
740
741 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
742 ifaceConfig.removePatchMode(intToPhyPatchPort);
743 }
744
745 private void attachPhysicalPort(KubevirtNode node,
746 KubevirtPhyInterface phyInterface) {
747
748 String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
749
750 addOrRemoveSystemInterface(node, physicalDeviceId,
751 phyInterface.intf(), deviceService, true);
752 }
753
754 private void detachPhysicalPort(KubevirtNode node, String network, String portName) {
755 String physicalDeviceId = BRIDGE_PREFIX + network;
756
757 addOrRemoveSystemInterface(node, physicalDeviceId, portName, deviceService, false);
758 }
759
760 /**
761 * An internal OVSDB listener. This listener is used for listening the
762 * network facing events from OVSDB device. If a new OVSDB device is detected,
763 * ONOS tries to bootstrap the kubernetes node.
764 */
765 private class InternalOvsdbListener implements DeviceListener {
766
767 @Override
768 public boolean isRelevant(DeviceEvent event) {
769 return event.subject().type() == Device.Type.CONTROLLER;
770 }
771
772 private boolean isRelevantHelper() {
773 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
774 }
775
776 @Override
777 public void event(DeviceEvent event) {
778 Device device = event.subject();
779
780 switch (event.type()) {
781 case DEVICE_AVAILABILITY_CHANGED:
782 case DEVICE_ADDED:
783 eventExecutor.execute(() -> {
784
785 if (!isRelevantHelper()) {
786 return;
787 }
788
789 KubevirtNode node = nodeAdminService.node(device.id());
790
791 if (node == null) {
792 return;
793 }
794
795 if (deviceService.isAvailable(device.id())) {
796 log.debug("OVSDB {} detected", device.id());
797 bootstrapNode(node);
798 }
799 });
800 break;
801 case PORT_ADDED:
802 case PORT_REMOVED:
803 case DEVICE_REMOVED:
804 default:
805 // do nothing
806 break;
807 }
808 }
809 }
810
811 /**
812 * An internal integration bridge listener. This listener is used for
813 * listening the events from integration bridge. To listen the events from
814 * other types of bridge such as provider bridge or tunnel bridge, we need
815 * to augment KubevirtNodeService.node() method.
816 */
817 private class InternalBridgeListener implements DeviceListener {
818
819 @Override
820 public boolean isRelevant(DeviceEvent event) {
821 return event.subject().type() == Device.Type.SWITCH;
822 }
823
824 private boolean isRelevantHelper() {
825 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
826 }
827
828 @Override
829 public void event(DeviceEvent event) {
830 Device device = event.subject();
831 Port port = event.port();
832
833 switch (event.type()) {
834 case DEVICE_AVAILABILITY_CHANGED:
835 case DEVICE_ADDED:
836 eventExecutor.execute(() -> processDeviceAddition(device));
837 break;
838 case PORT_UPDATED:
839 case PORT_ADDED:
840 eventExecutor.execute(() -> processPortAddition(device, port));
841 break;
842 case PORT_REMOVED:
843 eventExecutor.execute(() -> processPortRemoval(device, port));
844 break;
845 case DEVICE_REMOVED:
846 default:
847 // do nothing
848 break;
849 }
850 }
851
852 void processDeviceAddition(Device device) {
853 if (!isRelevantHelper()) {
854 return;
855 }
856
857 KubevirtNode node = nodeAdminService.node(device.id());
858
859 if (node == null) {
860 return;
861 }
862
863 if (deviceService.isAvailable(device.id())) {
864 log.debug("Bridge created on {}", node.hostname());
865 bootstrapNode(node);
866 } else if (node.state() == COMPLETE) {
867 log.info("Device {} disconnected", device.id());
868 setState(node, INCOMPLETE);
869 }
870
871 if (autoRecovery) {
872 if (node.state() == INCOMPLETE || node.state() == DEVICE_CREATED) {
873 log.info("Device {} is reconnected", device.id());
874 nodeAdminService.updateNode(node.updateState(INIT));
875 }
876 }
877 }
878
879 void processPortAddition(Device device, Port port) {
880 if (!isRelevantHelper()) {
881 return;
882 }
883
884 KubevirtNode node = nodeAdminService.nodeByTunBridge(device.id());
885
886 if (node == null) {
887 return;
888 }
889
890 String portName = port.annotations().value(PORT_NAME);
891 if (node.state() == DEVICE_CREATED && (
892 Objects.equals(portName, VXLAN) ||
893 Objects.equals(portName, GRE) ||
894 Objects.equals(portName, GENEVE))) {
895 log.info("Interface {} added or updated to {}",
896 portName, device.id());
897 bootstrapNode(node);
898 }
899 }
900
901 void processPortRemoval(Device device, Port port) {
902 if (!isRelevantHelper()) {
903 return;
904 }
905
906 KubevirtNode node = nodeAdminService.node(device.id());
907
908 if (node == null) {
909 return;
910 }
911
912 String portName = port.annotations().value(PORT_NAME);
913 if (node.state() == COMPLETE && (
914 Objects.equals(portName, VXLAN) ||
915 Objects.equals(portName, GRE) ||
916 Objects.equals(portName, GENEVE))) {
917 log.warn("Interface {} removed from {}", portName, device.id());
918 setState(node, INCOMPLETE);
919 }
920 }
921 }
922
923 /**
924 * An internal kubevirt node listener.
925 * The notification is triggered by KubevirtNodeStore.
926 */
927 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
928
929 private boolean isRelevantHelper() {
930 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
931 }
932
933 @Override
934 public void event(KubevirtNodeEvent event) {
935 switch (event.type()) {
936 case KUBEVIRT_NODE_CREATED:
937 case KUBEVIRT_NODE_UPDATED:
938 eventExecutor.execute(() -> {
939 if (!isRelevantHelper()) {
940 return;
941 }
942 bootstrapNode(event.subject());
943 });
944 break;
945 case KUBEVIRT_NODE_REMOVED:
946 eventExecutor.execute(() -> {
947 if (!isRelevantHelper()) {
948 return;
949 }
950 removeNode(event.subject());
951 });
952 break;
953 case KUBEVIRT_NODE_INCOMPLETE:
954 default:
955 break;
956 }
957 }
958 }
959}