blob: 903166686a2220bc9cc6dd68820233b7b9c3eee3 [file] [log] [blame]
Jian Li4fe40e52021-01-06 03:29:58 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnode.impl;
17
Jian Li72f3dac2021-01-28 16:14:54 +090018import com.google.common.collect.Lists;
19import org.onlab.packet.IpAddress;
Jian Li4fe40e52021-01-06 03:29:58 +090020import org.onlab.util.Tools;
21import org.onosproject.cfg.ComponentConfigService;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
Jian Li72f3dac2021-01-28 16:14:54 +090027import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
Jian Li4fe40e52021-01-06 03:29:58 +090028import org.onosproject.kubevirtnode.api.KubevirtNode;
29import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
30import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
31import org.onosproject.kubevirtnode.api.KubevirtNodeHandler;
32import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
33import org.onosproject.kubevirtnode.api.KubevirtNodeState;
34import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
35import org.onosproject.net.Device;
36import org.onosproject.net.DeviceId;
37import org.onosproject.net.Port;
38import org.onosproject.net.behaviour.BridgeConfig;
39import org.onosproject.net.behaviour.BridgeDescription;
40import org.onosproject.net.behaviour.BridgeName;
41import org.onosproject.net.behaviour.ControllerInfo;
42import org.onosproject.net.behaviour.DefaultBridgeDescription;
43import org.onosproject.net.behaviour.DefaultPatchDescription;
44import org.onosproject.net.behaviour.DefaultTunnelDescription;
45import org.onosproject.net.behaviour.InterfaceConfig;
46import org.onosproject.net.behaviour.PatchDescription;
47import org.onosproject.net.behaviour.TunnelDescription;
48import org.onosproject.net.behaviour.TunnelEndPoints;
49import org.onosproject.net.behaviour.TunnelKey;
50import org.onosproject.net.device.DeviceAdminService;
51import org.onosproject.net.device.DeviceEvent;
52import org.onosproject.net.device.DeviceListener;
53import org.onosproject.net.device.DeviceService;
54import org.onosproject.ovsdb.controller.OvsdbClientService;
55import org.onosproject.ovsdb.controller.OvsdbController;
56import org.osgi.service.component.ComponentContext;
57import org.osgi.service.component.annotations.Activate;
58import org.osgi.service.component.annotations.Component;
59import org.osgi.service.component.annotations.Deactivate;
60import org.osgi.service.component.annotations.Modified;
61import org.osgi.service.component.annotations.Reference;
62import org.osgi.service.component.annotations.ReferenceCardinality;
63import org.slf4j.Logger;
64
65import java.util.Dictionary;
66import java.util.List;
67import java.util.Objects;
68import java.util.Set;
69import java.util.concurrent.ExecutorService;
70import java.util.stream.Collectors;
71
72import static java.lang.Thread.sleep;
73import static java.util.concurrent.Executors.newSingleThreadExecutor;
74import static org.onlab.packet.TpPort.tpPort;
75import static org.onlab.util.Tools.groupedThreads;
76import static org.onosproject.kubevirtnode.api.Constants.BRIDGE_PREFIX;
77import static org.onosproject.kubevirtnode.api.Constants.FLOW_KEY;
78import static org.onosproject.kubevirtnode.api.Constants.GENEVE;
79import static org.onosproject.kubevirtnode.api.Constants.GRE;
80import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_BRIDGE;
81import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_PHYSICAL_PREFIX;
82import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_TUNNEL;
83import static org.onosproject.kubevirtnode.api.Constants.PHYSICAL_TO_INTEGRATION_SUFFIX;
Jian Li556709c2021-02-03 17:54:28 +090084import static org.onosproject.kubevirtnode.api.Constants.TENANT_BRIDGE_PREFIX;
Jian Li4fe40e52021-01-06 03:29:58 +090085import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_BRIDGE;
86import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_TO_INTEGRATION;
87import static org.onosproject.kubevirtnode.api.Constants.VXLAN;
88import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
89import static org.onosproject.kubevirtnode.api.KubevirtNodeState.COMPLETE;
90import static org.onosproject.kubevirtnode.api.KubevirtNodeState.DEVICE_CREATED;
91import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INCOMPLETE;
92import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
93import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
94import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
95import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT;
96import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
97import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.addOrRemoveSystemInterface;
98import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getBooleanProperty;
99import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getOvsdbClient;
100import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.isOvsdbConnected;
101import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.structurePortName;
102import static org.onosproject.net.AnnotationKeys.PORT_NAME;
103import static org.slf4j.LoggerFactory.getLogger;
104
105/**
106 * Service bootstraps kubernetes node based on its type.
107 */
108@Component(immediate = true,
109 property = {
110 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
111 AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
112 }
113)
114public class DefaultKubevirtNodeHandler implements KubevirtNodeHandler {
115
116 private final Logger log = getLogger(getClass());
117
118 private static final String DEFAULT_OF_PROTO = "tcp";
119 private static final int DEFAULT_OFPORT = 6653;
120 private static final int DPID_BEGIN = 3;
121 private static final int NETWORK_BEGIN = 3;
122 private static final long SLEEP_SHORT_MS = 1000; // we wait 1s
123 private static final long SLEEP_LONG_MS = 2000; // we wait 2s
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
126 protected CoreService coreService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
129 protected LeadershipService leadershipService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
132 protected ClusterService clusterService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
135 protected DeviceService deviceService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
138 protected DeviceAdminService deviceAdminService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
141 protected OvsdbController ovsdbController;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY)
144 protected KubevirtNodeAdminService nodeAdminService;
145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li72f3dac2021-01-28 16:14:54 +0900147 protected KubevirtApiConfigService apiConfigService;
148
149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4fe40e52021-01-06 03:29:58 +0900150 protected ComponentConfigService componentConfigService;
151
152 /** OVSDB server listen port. */
153 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
154
155 /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
156 private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
157
158 private final ExecutorService eventExecutor = newSingleThreadExecutor(
159 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
160
161 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
162 private final DeviceListener bridgeListener = new InternalBridgeListener();
163 private final KubevirtNodeListener kubevirtNodeListener = new InternalKubevirtNodeListener();
164
165 private ApplicationId appId;
166 private NodeId localNode;
167
168 @Activate
169 protected void activate() {
170 appId = coreService.getAppId(APP_ID);
171 localNode = clusterService.getLocalNode().id();
172
173 componentConfigService.registerProperties(getClass());
174 leadershipService.runForLeadership(appId.name());
175 deviceService.addListener(ovsdbListener);
176 deviceService.addListener(bridgeListener);
177 nodeAdminService.addListener(kubevirtNodeListener);
178
179 log.info("Started");
180 }
181
182 @Deactivate
183 protected void deactivate() {
184 nodeAdminService.removeListener(kubevirtNodeListener);
185 deviceService.removeListener(bridgeListener);
186 deviceService.removeListener(ovsdbListener);
187 componentConfigService.unregisterProperties(getClass(), false);
188 leadershipService.withdraw(appId.name());
189 eventExecutor.shutdown();
190
191 log.info("Stopped");
192 }
193
194 @Modified
195 protected void modified(ComponentContext context) {
196 readComponentConfiguration(context);
197
198 log.info("Modified");
199 }
200
201 @Override
202 public void processInitState(KubevirtNode node) {
203 if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
204 ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
205 return;
206 }
207 if (!deviceService.isAvailable(node.intgBridge())) {
208 createBridge(node, INTEGRATION_BRIDGE, node.intgBridge());
209 }
210
211 if (!deviceService.isAvailable(node.tunBridge())) {
212 createBridge(node, TUNNEL_BRIDGE, node.tunBridge());
213 }
214 }
215
216 @Override
217 public void processDeviceCreatedState(KubevirtNode node) {
218 try {
219 if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
220 ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
221 return;
222 }
223
224 // create patch ports between integration to other bridges
225 createPatchInterfaces(node);
226
227 if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
228 createVxlanTunnelInterface(node);
229 }
230
231 if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
232 createGreTunnelInterface(node);
233 }
234
235 if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
236 createGeneveTunnelInterface(node);
237 }
238
239 // provision new physical interfaces on the given node
240 // this includes creating physical bridge, attaching physical port
241 // to physical bridge, adding patch ports to both physical bridge and br-int
242 provisionPhysicalInterfaces(node);
243
244 } catch (Exception e) {
245 log.error("Exception occurred because of {}", e);
246 }
247 }
248
249 @Override
250 public void processCompleteState(KubevirtNode node) {
251 // do something if needed
252 }
253
254 @Override
255 public void processIncompleteState(KubevirtNode node) {
256 // do something if needed
257 }
258
259 @Override
260 public void processOnBoardedState(KubevirtNode node) {
261 // do something if needed
262 }
263
264 /**
265 * Extracts properties from the component configuration context.
266 *
267 * @param context the component context
268 */
269 private void readComponentConfiguration(ComponentContext context) {
270 Dictionary<?, ?> properties = context.getProperties();
271
272 Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
273 if (ovsdbPortConfigured == null) {
274 ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
275 log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
276 } else {
277 ovsdbPortNum = ovsdbPortConfigured;
278 log.info("Configured. OVSDB port is {}", ovsdbPortNum);
279 }
280
281 Boolean autoRecoveryConfigured =
282 getBooleanProperty(properties, AUTO_RECOVERY);
283 if (autoRecoveryConfigured == null) {
284 autoRecovery = AUTO_RECOVERY_DEFAULT;
285 log.info("Auto recovery flag is NOT " +
286 "configured, default value is {}", autoRecovery);
287 } else {
288 autoRecovery = autoRecoveryConfigured;
289 log.info("Configured. Auto recovery flag is {}", autoRecovery);
290 }
291 }
292
293 /**
294 * Creates a bridge with a given name on a given kubernetes node.
295 *
296 * @param node kubevirt node
297 * @param bridgeName bridge name
298 * @param devId device identifier
299 */
300 private void createBridge(KubevirtNode node, String bridgeName, DeviceId devId) {
301 Device device = deviceService.getDevice(node.ovsdb());
302
Jian Li72f3dac2021-01-28 16:14:54 +0900303 IpAddress serverIp = apiConfigService.apiConfig().ipAddress();
304 ControllerInfo controlInfo = new ControllerInfo(serverIp, DEFAULT_OFPORT, DEFAULT_OF_PROTO);
305 List<ControllerInfo> controllers = Lists.newArrayList(controlInfo);
Jian Li4fe40e52021-01-06 03:29:58 +0900306
307 String dpid = devId.toString().substring(DPID_BEGIN);
308
309 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
310 .name(bridgeName)
311 .failMode(BridgeDescription.FailMode.SECURE)
312 .datapathId(dpid)
313 .disableInBand()
314 .controllers(controllers);
315
316 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
317 bridgeConfig.addBridge(builder.build());
318 }
319
320 /**
321 * Creates a VXLAN tunnel interface in a given kubevirt node.
322 *
323 * @param node kubevirt node
324 */
325 private void createVxlanTunnelInterface(KubevirtNode node) {
326 createTunnelInterface(node, VXLAN, VXLAN);
327 }
328
329 /**
330 * Creates a GRE tunnel interface in a given kubevirt node.
331 *
332 * @param node kubevirt node
333 */
334 private void createGreTunnelInterface(KubevirtNode node) {
335 createTunnelInterface(node, GRE, GRE);
336 }
337
338 /**
339 * Creates a GENEVE tunnel interface in a given kubevirt node.
340 *
341 * @param node kubevirt node
342 */
343 private void createGeneveTunnelInterface(KubevirtNode node) {
344 createTunnelInterface(node, GENEVE, GENEVE);
345 }
346
347 /**
348 * Creates a tunnel interface in a given kubernetes node.
349 *
350 * @param node kubevirt node
351 */
352 private void createTunnelInterface(KubevirtNode node,
353 String type, String intfName) {
354 if (isIntfEnabled(node, intfName)) {
355 return;
356 }
357
358 Device device = deviceService.getDevice(node.ovsdb());
359 if (device == null || !device.is(InterfaceConfig.class)) {
360 log.error("Failed to create tunnel interface on {}", node.ovsdb());
361 return;
362 }
363
364 TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
365
366 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
367 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
368 }
369
370 /**
371 * Builds tunnel description according to the network type.
372 *
373 * @param type network type
374 * @return tunnel description
375 */
376 private TunnelDescription buildTunnelDesc(String type, String intfName) {
377 TunnelKey<String> key = new TunnelKey<>(FLOW_KEY);
378 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
379 TunnelDescription.Builder tdBuilder =
380 DefaultTunnelDescription.builder()
381 .deviceId(TUNNEL_BRIDGE)
382 .ifaceName(intfName)
383 .remote(TunnelEndPoints.flowTunnelEndpoint())
384 .key(key);
385
386 switch (type) {
387 case VXLAN:
388 tdBuilder.type(TunnelDescription.Type.VXLAN);
389 break;
390 case GRE:
391 tdBuilder.type(TunnelDescription.Type.GRE);
392 break;
393 case GENEVE:
394 tdBuilder.type(TunnelDescription.Type.GENEVE);
395 break;
396 default:
397 return null;
398 }
399
400 return tdBuilder.build();
401 }
402 return null;
403 }
404
405 /**
406 * Checks whether a given network interface in a given kubernetes node
407 * is enabled or not.
408 *
409 * @param node kubevirt node
410 * @param intf network interface name
411 * @return true if the given interface is enabled, false otherwise
412 */
413 private boolean isIntfEnabled(KubevirtNode node, String intf) {
414 return deviceService.isAvailable(node.tunBridge()) &&
415 deviceService.getPorts(node.tunBridge()).stream()
416 .anyMatch(port -> Objects.equals(
417 port.annotations().value(PORT_NAME), intf) &&
418 port.isEnabled());
419 }
420
421 private void createPatchInterfaces(KubevirtNode node) {
422 Device device = deviceService.getDevice(node.ovsdb());
423 if (device == null || !device.is(InterfaceConfig.class)) {
424 log.error("Failed to create patch interface on {}", node.ovsdb());
425 return;
426 }
427
428 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
429
430 // integration bridge -> tunnel bridge
431 PatchDescription brIntTunPatchDesc =
432 DefaultPatchDescription.builder()
433 .deviceId(INTEGRATION_BRIDGE)
434 .ifaceName(INTEGRATION_TO_TUNNEL)
435 .peer(TUNNEL_TO_INTEGRATION)
436 .build();
437
438 ifaceConfig.addPatchMode(INTEGRATION_TO_TUNNEL, brIntTunPatchDesc);
439
440 // tunnel bridge -> integration bridge
441 PatchDescription brTunIntPatchDesc =
442 DefaultPatchDescription.builder()
443 .deviceId(TUNNEL_BRIDGE)
444 .ifaceName(TUNNEL_TO_INTEGRATION)
445 .peer(INTEGRATION_TO_TUNNEL)
446 .build();
447
448 ifaceConfig.addPatchMode(TUNNEL_TO_INTEGRATION, brTunIntPatchDesc);
449 }
450
451 /**
452 * Bootstraps a new kubevirt node.
453 *
454 * @param node kubevirt node
455 */
456 private void bootstrapNode(KubevirtNode node) {
457 if (isCurrentStateDone(node)) {
458 setState(node, node.state().nextState());
459 } else {
460 log.trace("Processing {} state for {}", node.state(), node.hostname());
461 node.state().process(this, node);
462 }
463 }
464
465 /**
466 * Removes the existing kubevirt node.
467 *
468 * @param node kubevirt node
469 */
470 private void removeNode(KubevirtNode node) {
471 OvsdbClientService client = getOvsdbClient(node, ovsdbPortNum, ovsdbController);
472 if (client == null) {
473 log.info("Failed to get ovsdb client");
474 return;
475 }
476
477 // unprovision physical interfaces from the node
478 // this procedure includes detaching physical port from physical bridge,
479 // remove patch ports from br-int, removing physical bridge
480 unprovisionPhysicalInterfaces(node);
481
482 // delete tunnel bridge from the node
483 client.dropBridge(TUNNEL_BRIDGE);
484
485 // delete integration bridge from the node
486 client.dropBridge(INTEGRATION_BRIDGE);
487 }
488
489 /**
490 * Checks whether all requirements for this state are fulfilled or not.
491 *
492 * @param node kubevirt node
493 * @return true if all requirements are fulfilled, false otherwise
494 */
495 private boolean isCurrentStateDone(KubevirtNode node) {
496 switch (node.state()) {
497 case INIT:
498 return isInitStateDone(node);
499 case DEVICE_CREATED:
500 return isDeviceCreatedStateDone(node);
501 case COMPLETE:
502 case INCOMPLETE:
503 case ON_BOARDED:
504 // always return false
505 // run init CLI to re-trigger node bootstrap
506 return false;
507 default:
508 return true;
509 }
510 }
511
512 private boolean isInitStateDone(KubevirtNode node) {
513 if (!isOvsdbConnected(node, ovsdbPortNum,
514 ovsdbController, deviceService)) {
515 return false;
516 }
517
518 try {
519 // we need to wait a while, in case interfaces and bridges
520 // creation requires some time
521 sleep(SLEEP_SHORT_MS);
522 } catch (InterruptedException e) {
523 log.error("Exception caused during init state checking...");
524 }
525
526 cleanPhysicalInterfaces(node);
527
528 return node.intgBridge() != null && node.tunBridge() != null &&
529 deviceService.isAvailable(node.intgBridge()) &&
530 deviceService.isAvailable(node.tunBridge());
531 }
532
533 private boolean isDeviceCreatedStateDone(KubevirtNode node) {
534
535 try {
536 // we need to wait a while, in case tunneling ports
537 // creation requires some time
538 sleep(SLEEP_LONG_MS);
539 } catch (InterruptedException e) {
540 log.error("Exception caused during init state checking...");
541 }
542
543 if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
544 return false;
545 }
546 if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
547 return false;
548 }
549 if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
550 return false;
551 }
552
553 for (KubevirtPhyInterface phyIntf : node.phyIntfs()) {
554 if (phyIntf == null) {
555 return false;
556 }
557
558 String bridgeName = BRIDGE_PREFIX + phyIntf.network();
559 String patchPortName = structurePortName(
560 INTEGRATION_TO_PHYSICAL_PREFIX + phyIntf.network());
561
562 if (!(hasPhyBridge(node, bridgeName) &&
563 hasPhyPatchPort(node, patchPortName) &&
564 hasPhyIntf(node, phyIntf.intf()))) {
565 return false;
566 }
567 }
568
569 return true;
570 }
571
572 /**
573 * Configures the kubernetes node with new state.
574 *
575 * @param node kubevirt node
576 * @param newState a new state
577 */
578 private void setState(KubevirtNode node, KubevirtNodeState newState) {
579 if (node.state() == newState) {
580 return;
581 }
582 KubevirtNode updated = node.updateState(newState);
583 nodeAdminService.updateNode(updated);
584 log.info("Changed {} state: {}", node.hostname(), newState);
585 }
586
587 private void provisionPhysicalInterfaces(KubevirtNode node) {
588 node.phyIntfs().forEach(pi -> {
589 String bridgeName = BRIDGE_PREFIX + pi.network();
590 String patchPortName =
591 structurePortName(INTEGRATION_TO_PHYSICAL_PREFIX + pi.network());
592
593 if (!hasPhyBridge(node, bridgeName)) {
594 createPhysicalBridge(node, pi);
595 createPhysicalPatchPorts(node, pi);
596 attachPhysicalPort(node, pi);
597 } else {
598 // in case physical bridge exists, but patch port is missing on br-int,
599 // we will add patch port to connect br-int with physical bridge
600 if (!hasPhyPatchPort(node, patchPortName)) {
601 createPhysicalPatchPorts(node, pi);
602 }
603 }
604 });
605 }
606
607 private void cleanPhysicalInterfaces(KubevirtNode node) {
608 Device device = deviceService.getDevice(node.ovsdb());
609
610 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
611
612 Set<String> bridgeNames = bridgeConfig.getBridges().stream()
613 .map(BridgeDescription::name).collect(Collectors.toSet());
614
615 Set<String> phyNetworkNames = node.phyIntfs().stream()
616 .map(pi -> BRIDGE_PREFIX + pi.network()).collect(Collectors.toSet());
617
618 // we remove existing physical bridges and patch ports, if the physical
619 // bridges are not defined in kubevirt node
620 for (String brName : bridgeNames) {
621 if (!phyNetworkNames.contains(brName)) {
622 // integration bridge and tunnel bridge should NOT be treated as
623 // physical bridges
Jian Li556709c2021-02-03 17:54:28 +0900624 if (brName.equals(INTEGRATION_BRIDGE) ||
625 brName.equals(TUNNEL_BRIDGE) ||
626 brName.startsWith(TENANT_BRIDGE_PREFIX)) {
Jian Li4fe40e52021-01-06 03:29:58 +0900627 continue;
628 }
629 removePhysicalPatchPorts(node, brName.substring(NETWORK_BEGIN));
630 removePhysicalBridge(node, brName.substring(NETWORK_BEGIN));
631 }
632 }
633 }
634
635 private void unprovisionPhysicalInterfaces(KubevirtNode node) {
636 node.phyIntfs().forEach(pi -> {
637 detachPhysicalPort(node, pi.network(), pi.intf());
638 removePhysicalPatchPorts(node, pi.network());
639 removePhysicalBridge(node, pi.network());
640 });
641 }
642
643 private boolean hasPhyBridge(KubevirtNode node, String bridgeName) {
644 BridgeConfig bridgeConfig =
645 deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
646 return bridgeConfig.getBridges().stream()
647 .anyMatch(br -> br.name().equals(bridgeName));
648 }
649
650 private boolean hasPhyPatchPort(KubevirtNode node, String patchPortName) {
651 List<Port> ports = deviceService.getPorts(node.intgBridge());
652 return ports.stream().anyMatch(p ->
653 p.annotations().value(PORT_NAME).equals(patchPortName));
654 }
655
656 private boolean hasPhyIntf(KubevirtNode node, String intfName) {
657 BridgeConfig bridgeConfig =
658 deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
659 return bridgeConfig.getPorts().stream()
660 .anyMatch(p -> p.annotations().value(PORT_NAME).equals(intfName));
661 }
662
663 private void createPhysicalBridge(KubevirtNode osNode,
664 KubevirtPhyInterface phyInterface) {
665 Device device = deviceService.getDevice(osNode.ovsdb());
666
667 String bridgeName = BRIDGE_PREFIX + phyInterface.network();
668
669 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
670 .name(bridgeName)
671 .mcastSnoopingEnable();
672
673 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
674 bridgeConfig.addBridge(builder.build());
675 }
676
677 private void removePhysicalBridge(KubevirtNode node, String network) {
678 Device device = deviceService.getDevice(node.ovsdb());
679
680 BridgeName bridgeName = BridgeName.bridgeName(BRIDGE_PREFIX + network);
681
682 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
683 bridgeConfig.deleteBridge(bridgeName);
684 }
685
686 private void createPhysicalPatchPorts(KubevirtNode node,
687 KubevirtPhyInterface phyInterface) {
688 Device device = deviceService.getDevice(node.ovsdb());
689
690 if (device == null || !device.is(InterfaceConfig.class)) {
691 log.error("Failed to create patch interface on {}", node.ovsdb());
692 return;
693 }
694
695 String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
696
697 String intToPhyPatchPort = structurePortName(
698 INTEGRATION_TO_PHYSICAL_PREFIX + phyInterface.network());
699 String phyToIntPatchPort = structurePortName(
700 phyInterface.network() + PHYSICAL_TO_INTEGRATION_SUFFIX);
701
702 // integration bridge -> physical bridge
703 PatchDescription intToPhyPatchDesc =
704 DefaultPatchDescription.builder()
705 .deviceId(INTEGRATION_BRIDGE)
706 .ifaceName(intToPhyPatchPort)
707 .peer(phyToIntPatchPort)
708 .build();
709
710 // physical bridge -> integration bridge
711 PatchDescription phyToIntPatchDesc =
712 DefaultPatchDescription.builder()
713 .deviceId(physicalDeviceId)
714 .ifaceName(phyToIntPatchPort)
715 .peer(intToPhyPatchPort)
716 .build();
717
718 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
719 ifaceConfig.addPatchMode(INTEGRATION_TO_PHYSICAL_PREFIX +
720 phyInterface.network(), intToPhyPatchDesc);
721 ifaceConfig.addPatchMode(phyInterface.network() +
722 PHYSICAL_TO_INTEGRATION_SUFFIX, phyToIntPatchDesc);
723
724 addOrRemoveSystemInterface(node, physicalDeviceId,
725 phyInterface.intf(), deviceService, true);
726 }
727
728 private void removePhysicalPatchPorts(KubevirtNode node, String network) {
729 Device device = deviceService.getDevice(node.ovsdb());
730
731 if (device == null || !device.is(InterfaceConfig.class)) {
732 log.error("Failed to remove patch interface on {}", node.ovsdb());
733 return;
734 }
735
736 String intToPhyPatchPort = structurePortName(
737 INTEGRATION_TO_PHYSICAL_PREFIX + network);
738
739 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
740 ifaceConfig.removePatchMode(intToPhyPatchPort);
741 }
742
743 private void attachPhysicalPort(KubevirtNode node,
744 KubevirtPhyInterface phyInterface) {
745
746 String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
747
748 addOrRemoveSystemInterface(node, physicalDeviceId,
749 phyInterface.intf(), deviceService, true);
750 }
751
752 private void detachPhysicalPort(KubevirtNode node, String network, String portName) {
753 String physicalDeviceId = BRIDGE_PREFIX + network;
754
755 addOrRemoveSystemInterface(node, physicalDeviceId, portName, deviceService, false);
756 }
757
758 /**
759 * An internal OVSDB listener. This listener is used for listening the
760 * network facing events from OVSDB device. If a new OVSDB device is detected,
761 * ONOS tries to bootstrap the kubernetes node.
762 */
763 private class InternalOvsdbListener implements DeviceListener {
764
765 @Override
766 public boolean isRelevant(DeviceEvent event) {
767 return event.subject().type() == Device.Type.CONTROLLER;
768 }
769
770 private boolean isRelevantHelper() {
771 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
772 }
773
774 @Override
775 public void event(DeviceEvent event) {
776 Device device = event.subject();
777
778 switch (event.type()) {
779 case DEVICE_AVAILABILITY_CHANGED:
780 case DEVICE_ADDED:
781 eventExecutor.execute(() -> {
782
783 if (!isRelevantHelper()) {
784 return;
785 }
786
787 KubevirtNode node = nodeAdminService.node(device.id());
788
789 if (node == null) {
790 return;
791 }
792
793 if (deviceService.isAvailable(device.id())) {
794 log.debug("OVSDB {} detected", device.id());
795 bootstrapNode(node);
796 }
797 });
798 break;
799 case PORT_ADDED:
800 case PORT_REMOVED:
801 case DEVICE_REMOVED:
802 default:
803 // do nothing
804 break;
805 }
806 }
807 }
808
809 /**
810 * An internal integration bridge listener. This listener is used for
811 * listening the events from integration bridge. To listen the events from
812 * other types of bridge such as provider bridge or tunnel bridge, we need
813 * to augment KubevirtNodeService.node() method.
814 */
815 private class InternalBridgeListener implements DeviceListener {
816
817 @Override
818 public boolean isRelevant(DeviceEvent event) {
819 return event.subject().type() == Device.Type.SWITCH;
820 }
821
822 private boolean isRelevantHelper() {
823 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
824 }
825
826 @Override
827 public void event(DeviceEvent event) {
828 Device device = event.subject();
829 Port port = event.port();
830
831 switch (event.type()) {
832 case DEVICE_AVAILABILITY_CHANGED:
833 case DEVICE_ADDED:
834 eventExecutor.execute(() -> processDeviceAddition(device));
835 break;
836 case PORT_UPDATED:
837 case PORT_ADDED:
838 eventExecutor.execute(() -> processPortAddition(device, port));
839 break;
840 case PORT_REMOVED:
841 eventExecutor.execute(() -> processPortRemoval(device, port));
842 break;
843 case DEVICE_REMOVED:
844 default:
845 // do nothing
846 break;
847 }
848 }
849
850 void processDeviceAddition(Device device) {
851 if (!isRelevantHelper()) {
852 return;
853 }
854
855 KubevirtNode node = nodeAdminService.node(device.id());
856
857 if (node == null) {
858 return;
859 }
860
861 if (deviceService.isAvailable(device.id())) {
862 log.debug("Bridge created on {}", node.hostname());
863 bootstrapNode(node);
864 } else if (node.state() == COMPLETE) {
865 log.info("Device {} disconnected", device.id());
866 setState(node, INCOMPLETE);
867 }
868
869 if (autoRecovery) {
870 if (node.state() == INCOMPLETE || node.state() == DEVICE_CREATED) {
871 log.info("Device {} is reconnected", device.id());
872 nodeAdminService.updateNode(node.updateState(INIT));
873 }
874 }
875 }
876
877 void processPortAddition(Device device, Port port) {
878 if (!isRelevantHelper()) {
879 return;
880 }
881
882 KubevirtNode node = nodeAdminService.nodeByTunBridge(device.id());
883
884 if (node == null) {
885 return;
886 }
887
888 String portName = port.annotations().value(PORT_NAME);
889 if (node.state() == DEVICE_CREATED && (
890 Objects.equals(portName, VXLAN) ||
891 Objects.equals(portName, GRE) ||
892 Objects.equals(portName, GENEVE))) {
893 log.info("Interface {} added or updated to {}",
894 portName, device.id());
895 bootstrapNode(node);
896 }
897 }
898
899 void processPortRemoval(Device device, Port port) {
900 if (!isRelevantHelper()) {
901 return;
902 }
903
904 KubevirtNode node = nodeAdminService.node(device.id());
905
906 if (node == null) {
907 return;
908 }
909
910 String portName = port.annotations().value(PORT_NAME);
911 if (node.state() == COMPLETE && (
912 Objects.equals(portName, VXLAN) ||
913 Objects.equals(portName, GRE) ||
914 Objects.equals(portName, GENEVE))) {
915 log.warn("Interface {} removed from {}", portName, device.id());
916 setState(node, INCOMPLETE);
917 }
918 }
919 }
920
921 /**
922 * An internal kubevirt node listener.
923 * The notification is triggered by KubevirtNodeStore.
924 */
925 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
926
927 private boolean isRelevantHelper() {
928 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
929 }
930
931 @Override
932 public void event(KubevirtNodeEvent event) {
933 switch (event.type()) {
934 case KUBEVIRT_NODE_CREATED:
935 case KUBEVIRT_NODE_UPDATED:
936 eventExecutor.execute(() -> {
937 if (!isRelevantHelper()) {
938 return;
939 }
940 bootstrapNode(event.subject());
941 });
942 break;
943 case KUBEVIRT_NODE_REMOVED:
944 eventExecutor.execute(() -> {
945 if (!isRelevantHelper()) {
946 return;
947 }
948 removeNode(event.subject());
949 });
950 break;
951 case KUBEVIRT_NODE_INCOMPLETE:
952 default:
953 break;
954 }
955 }
956 }
957}