blob: 55fd13c37b34b14eeaa9c5cd3cd85ad8cc5d4bdf [file] [log] [blame]
Jian Li138f51f2021-01-06 03:29:58 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnode.impl;
17
Jian Lie7a702f2021-01-28 16:14:54 +090018import com.google.common.collect.Lists;
19import org.onlab.packet.IpAddress;
Jian Li138f51f2021-01-06 03:29:58 +090020import org.onlab.util.Tools;
21import org.onosproject.cfg.ComponentConfigService;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
Jian Lie7a702f2021-01-28 16:14:54 +090027import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
Jian Li138f51f2021-01-06 03:29:58 +090028import org.onosproject.kubevirtnode.api.KubevirtNode;
29import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
30import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
31import org.onosproject.kubevirtnode.api.KubevirtNodeHandler;
32import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
33import org.onosproject.kubevirtnode.api.KubevirtNodeState;
34import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
35import org.onosproject.net.Device;
36import org.onosproject.net.DeviceId;
37import org.onosproject.net.Port;
38import org.onosproject.net.behaviour.BridgeConfig;
39import org.onosproject.net.behaviour.BridgeDescription;
40import org.onosproject.net.behaviour.BridgeName;
41import org.onosproject.net.behaviour.ControllerInfo;
42import org.onosproject.net.behaviour.DefaultBridgeDescription;
43import org.onosproject.net.behaviour.DefaultPatchDescription;
44import org.onosproject.net.behaviour.DefaultTunnelDescription;
45import org.onosproject.net.behaviour.InterfaceConfig;
46import org.onosproject.net.behaviour.PatchDescription;
47import org.onosproject.net.behaviour.TunnelDescription;
48import org.onosproject.net.behaviour.TunnelEndPoints;
49import org.onosproject.net.behaviour.TunnelKey;
50import org.onosproject.net.device.DeviceAdminService;
51import org.onosproject.net.device.DeviceEvent;
52import org.onosproject.net.device.DeviceListener;
53import org.onosproject.net.device.DeviceService;
54import org.onosproject.ovsdb.controller.OvsdbClientService;
55import org.onosproject.ovsdb.controller.OvsdbController;
56import org.osgi.service.component.ComponentContext;
57import org.osgi.service.component.annotations.Activate;
58import org.osgi.service.component.annotations.Component;
59import org.osgi.service.component.annotations.Deactivate;
60import org.osgi.service.component.annotations.Modified;
61import org.osgi.service.component.annotations.Reference;
62import org.osgi.service.component.annotations.ReferenceCardinality;
63import org.slf4j.Logger;
64
65import java.util.Dictionary;
66import java.util.List;
67import java.util.Objects;
68import java.util.Set;
69import java.util.concurrent.ExecutorService;
70import java.util.stream.Collectors;
71
72import static java.lang.Thread.sleep;
73import static java.util.concurrent.Executors.newSingleThreadExecutor;
74import static org.onlab.packet.TpPort.tpPort;
75import static org.onlab.util.Tools.groupedThreads;
76import static org.onosproject.kubevirtnode.api.Constants.BRIDGE_PREFIX;
77import static org.onosproject.kubevirtnode.api.Constants.FLOW_KEY;
78import static org.onosproject.kubevirtnode.api.Constants.GENEVE;
79import static org.onosproject.kubevirtnode.api.Constants.GRE;
80import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_BRIDGE;
81import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_PHYSICAL_PREFIX;
82import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_TUNNEL;
83import static org.onosproject.kubevirtnode.api.Constants.PHYSICAL_TO_INTEGRATION_SUFFIX;
84import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_BRIDGE;
85import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_TO_INTEGRATION;
86import static org.onosproject.kubevirtnode.api.Constants.VXLAN;
87import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
88import static org.onosproject.kubevirtnode.api.KubevirtNodeState.COMPLETE;
89import static org.onosproject.kubevirtnode.api.KubevirtNodeState.DEVICE_CREATED;
90import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INCOMPLETE;
91import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
92import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
93import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
94import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT;
95import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
96import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.addOrRemoveSystemInterface;
97import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getBooleanProperty;
98import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getOvsdbClient;
99import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.isOvsdbConnected;
100import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.structurePortName;
101import static org.onosproject.net.AnnotationKeys.PORT_NAME;
102import static org.slf4j.LoggerFactory.getLogger;
103
104/**
105 * Service bootstraps kubernetes node based on its type.
106 */
107@Component(immediate = true,
108 property = {
109 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
110 AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
111 }
112)
113public class DefaultKubevirtNodeHandler implements KubevirtNodeHandler {
114
115 private final Logger log = getLogger(getClass());
116
117 private static final String DEFAULT_OF_PROTO = "tcp";
118 private static final int DEFAULT_OFPORT = 6653;
119 private static final int DPID_BEGIN = 3;
120 private static final int NETWORK_BEGIN = 3;
121 private static final long SLEEP_SHORT_MS = 1000; // we wait 1s
122 private static final long SLEEP_LONG_MS = 2000; // we wait 2s
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY)
125 protected CoreService coreService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
128 protected LeadershipService leadershipService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
131 protected ClusterService clusterService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY)
134 protected DeviceService deviceService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY)
137 protected DeviceAdminService deviceAdminService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
140 protected OvsdbController ovsdbController;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY)
143 protected KubevirtNodeAdminService nodeAdminService;
144
145 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lie7a702f2021-01-28 16:14:54 +0900146 protected KubevirtApiConfigService apiConfigService;
147
148 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li138f51f2021-01-06 03:29:58 +0900149 protected ComponentConfigService componentConfigService;
150
151 /** OVSDB server listen port. */
152 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
153
154 /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
155 private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
156
157 private final ExecutorService eventExecutor = newSingleThreadExecutor(
158 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
159
160 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
161 private final DeviceListener bridgeListener = new InternalBridgeListener();
162 private final KubevirtNodeListener kubevirtNodeListener = new InternalKubevirtNodeListener();
163
164 private ApplicationId appId;
165 private NodeId localNode;
166
167 @Activate
168 protected void activate() {
169 appId = coreService.getAppId(APP_ID);
170 localNode = clusterService.getLocalNode().id();
171
172 componentConfigService.registerProperties(getClass());
173 leadershipService.runForLeadership(appId.name());
174 deviceService.addListener(ovsdbListener);
175 deviceService.addListener(bridgeListener);
176 nodeAdminService.addListener(kubevirtNodeListener);
177
178 log.info("Started");
179 }
180
181 @Deactivate
182 protected void deactivate() {
183 nodeAdminService.removeListener(kubevirtNodeListener);
184 deviceService.removeListener(bridgeListener);
185 deviceService.removeListener(ovsdbListener);
186 componentConfigService.unregisterProperties(getClass(), false);
187 leadershipService.withdraw(appId.name());
188 eventExecutor.shutdown();
189
190 log.info("Stopped");
191 }
192
193 @Modified
194 protected void modified(ComponentContext context) {
195 readComponentConfiguration(context);
196
197 log.info("Modified");
198 }
199
200 @Override
201 public void processInitState(KubevirtNode node) {
202 if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
203 ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
204 return;
205 }
206 if (!deviceService.isAvailable(node.intgBridge())) {
207 createBridge(node, INTEGRATION_BRIDGE, node.intgBridge());
208 }
209
210 if (!deviceService.isAvailable(node.tunBridge())) {
211 createBridge(node, TUNNEL_BRIDGE, node.tunBridge());
212 }
213 }
214
215 @Override
216 public void processDeviceCreatedState(KubevirtNode node) {
217 try {
218 if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
219 ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
220 return;
221 }
222
223 // create patch ports between integration to other bridges
224 createPatchInterfaces(node);
225
226 if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
227 createVxlanTunnelInterface(node);
228 }
229
230 if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
231 createGreTunnelInterface(node);
232 }
233
234 if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
235 createGeneveTunnelInterface(node);
236 }
237
238 // provision new physical interfaces on the given node
239 // this includes creating physical bridge, attaching physical port
240 // to physical bridge, adding patch ports to both physical bridge and br-int
241 provisionPhysicalInterfaces(node);
242
243 } catch (Exception e) {
244 log.error("Exception occurred because of {}", e);
245 }
246 }
247
248 @Override
249 public void processCompleteState(KubevirtNode node) {
250 // do something if needed
251 }
252
253 @Override
254 public void processIncompleteState(KubevirtNode node) {
255 // do something if needed
256 }
257
258 @Override
259 public void processOnBoardedState(KubevirtNode node) {
260 // do something if needed
261 }
262
263 /**
264 * Extracts properties from the component configuration context.
265 *
266 * @param context the component context
267 */
268 private void readComponentConfiguration(ComponentContext context) {
269 Dictionary<?, ?> properties = context.getProperties();
270
271 Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
272 if (ovsdbPortConfigured == null) {
273 ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
274 log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
275 } else {
276 ovsdbPortNum = ovsdbPortConfigured;
277 log.info("Configured. OVSDB port is {}", ovsdbPortNum);
278 }
279
280 Boolean autoRecoveryConfigured =
281 getBooleanProperty(properties, AUTO_RECOVERY);
282 if (autoRecoveryConfigured == null) {
283 autoRecovery = AUTO_RECOVERY_DEFAULT;
284 log.info("Auto recovery flag is NOT " +
285 "configured, default value is {}", autoRecovery);
286 } else {
287 autoRecovery = autoRecoveryConfigured;
288 log.info("Configured. Auto recovery flag is {}", autoRecovery);
289 }
290 }
291
292 /**
293 * Creates a bridge with a given name on a given kubernetes node.
294 *
295 * @param node kubevirt node
296 * @param bridgeName bridge name
297 * @param devId device identifier
298 */
299 private void createBridge(KubevirtNode node, String bridgeName, DeviceId devId) {
300 Device device = deviceService.getDevice(node.ovsdb());
301
Jian Lie7a702f2021-01-28 16:14:54 +0900302 IpAddress serverIp = apiConfigService.apiConfig().ipAddress();
303 ControllerInfo controlInfo = new ControllerInfo(serverIp, DEFAULT_OFPORT, DEFAULT_OF_PROTO);
304 List<ControllerInfo> controllers = Lists.newArrayList(controlInfo);
Jian Li138f51f2021-01-06 03:29:58 +0900305
306 String dpid = devId.toString().substring(DPID_BEGIN);
307
308 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
309 .name(bridgeName)
310 .failMode(BridgeDescription.FailMode.SECURE)
311 .datapathId(dpid)
312 .disableInBand()
313 .controllers(controllers);
314
315 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
316 bridgeConfig.addBridge(builder.build());
317 }
318
319 /**
320 * Creates a VXLAN tunnel interface in a given kubevirt node.
321 *
322 * @param node kubevirt node
323 */
324 private void createVxlanTunnelInterface(KubevirtNode node) {
325 createTunnelInterface(node, VXLAN, VXLAN);
326 }
327
328 /**
329 * Creates a GRE tunnel interface in a given kubevirt node.
330 *
331 * @param node kubevirt node
332 */
333 private void createGreTunnelInterface(KubevirtNode node) {
334 createTunnelInterface(node, GRE, GRE);
335 }
336
337 /**
338 * Creates a GENEVE tunnel interface in a given kubevirt node.
339 *
340 * @param node kubevirt node
341 */
342 private void createGeneveTunnelInterface(KubevirtNode node) {
343 createTunnelInterface(node, GENEVE, GENEVE);
344 }
345
346 /**
347 * Creates a tunnel interface in a given kubernetes node.
348 *
349 * @param node kubevirt node
350 */
351 private void createTunnelInterface(KubevirtNode node,
352 String type, String intfName) {
353 if (isIntfEnabled(node, intfName)) {
354 return;
355 }
356
357 Device device = deviceService.getDevice(node.ovsdb());
358 if (device == null || !device.is(InterfaceConfig.class)) {
359 log.error("Failed to create tunnel interface on {}", node.ovsdb());
360 return;
361 }
362
363 TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
364
365 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
366 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
367 }
368
369 /**
370 * Builds tunnel description according to the network type.
371 *
372 * @param type network type
373 * @return tunnel description
374 */
375 private TunnelDescription buildTunnelDesc(String type, String intfName) {
376 TunnelKey<String> key = new TunnelKey<>(FLOW_KEY);
377 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
378 TunnelDescription.Builder tdBuilder =
379 DefaultTunnelDescription.builder()
380 .deviceId(TUNNEL_BRIDGE)
381 .ifaceName(intfName)
382 .remote(TunnelEndPoints.flowTunnelEndpoint())
383 .key(key);
384
385 switch (type) {
386 case VXLAN:
387 tdBuilder.type(TunnelDescription.Type.VXLAN);
388 break;
389 case GRE:
390 tdBuilder.type(TunnelDescription.Type.GRE);
391 break;
392 case GENEVE:
393 tdBuilder.type(TunnelDescription.Type.GENEVE);
394 break;
395 default:
396 return null;
397 }
398
399 return tdBuilder.build();
400 }
401 return null;
402 }
403
404 /**
405 * Checks whether a given network interface in a given kubernetes node
406 * is enabled or not.
407 *
408 * @param node kubevirt node
409 * @param intf network interface name
410 * @return true if the given interface is enabled, false otherwise
411 */
412 private boolean isIntfEnabled(KubevirtNode node, String intf) {
413 return deviceService.isAvailable(node.tunBridge()) &&
414 deviceService.getPorts(node.tunBridge()).stream()
415 .anyMatch(port -> Objects.equals(
416 port.annotations().value(PORT_NAME), intf) &&
417 port.isEnabled());
418 }
419
420 private void createPatchInterfaces(KubevirtNode node) {
421 Device device = deviceService.getDevice(node.ovsdb());
422 if (device == null || !device.is(InterfaceConfig.class)) {
423 log.error("Failed to create patch interface on {}", node.ovsdb());
424 return;
425 }
426
427 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
428
429 // integration bridge -> tunnel bridge
430 PatchDescription brIntTunPatchDesc =
431 DefaultPatchDescription.builder()
432 .deviceId(INTEGRATION_BRIDGE)
433 .ifaceName(INTEGRATION_TO_TUNNEL)
434 .peer(TUNNEL_TO_INTEGRATION)
435 .build();
436
437 ifaceConfig.addPatchMode(INTEGRATION_TO_TUNNEL, brIntTunPatchDesc);
438
439 // tunnel bridge -> integration bridge
440 PatchDescription brTunIntPatchDesc =
441 DefaultPatchDescription.builder()
442 .deviceId(TUNNEL_BRIDGE)
443 .ifaceName(TUNNEL_TO_INTEGRATION)
444 .peer(INTEGRATION_TO_TUNNEL)
445 .build();
446
447 ifaceConfig.addPatchMode(TUNNEL_TO_INTEGRATION, brTunIntPatchDesc);
448 }
449
450 /**
451 * Bootstraps a new kubevirt node.
452 *
453 * @param node kubevirt node
454 */
455 private void bootstrapNode(KubevirtNode node) {
456 if (isCurrentStateDone(node)) {
457 setState(node, node.state().nextState());
458 } else {
459 log.trace("Processing {} state for {}", node.state(), node.hostname());
460 node.state().process(this, node);
461 }
462 }
463
464 /**
465 * Removes the existing kubevirt node.
466 *
467 * @param node kubevirt node
468 */
469 private void removeNode(KubevirtNode node) {
470 OvsdbClientService client = getOvsdbClient(node, ovsdbPortNum, ovsdbController);
471 if (client == null) {
472 log.info("Failed to get ovsdb client");
473 return;
474 }
475
476 // unprovision physical interfaces from the node
477 // this procedure includes detaching physical port from physical bridge,
478 // remove patch ports from br-int, removing physical bridge
479 unprovisionPhysicalInterfaces(node);
480
481 // delete tunnel bridge from the node
482 client.dropBridge(TUNNEL_BRIDGE);
483
484 // delete integration bridge from the node
485 client.dropBridge(INTEGRATION_BRIDGE);
486 }
487
488 /**
489 * Checks whether all requirements for this state are fulfilled or not.
490 *
491 * @param node kubevirt node
492 * @return true if all requirements are fulfilled, false otherwise
493 */
494 private boolean isCurrentStateDone(KubevirtNode node) {
495 switch (node.state()) {
496 case INIT:
497 return isInitStateDone(node);
498 case DEVICE_CREATED:
499 return isDeviceCreatedStateDone(node);
500 case COMPLETE:
501 case INCOMPLETE:
502 case ON_BOARDED:
503 // always return false
504 // run init CLI to re-trigger node bootstrap
505 return false;
506 default:
507 return true;
508 }
509 }
510
511 private boolean isInitStateDone(KubevirtNode node) {
512 if (!isOvsdbConnected(node, ovsdbPortNum,
513 ovsdbController, deviceService)) {
514 return false;
515 }
516
517 try {
518 // we need to wait a while, in case interfaces and bridges
519 // creation requires some time
520 sleep(SLEEP_SHORT_MS);
521 } catch (InterruptedException e) {
522 log.error("Exception caused during init state checking...");
523 }
524
525 cleanPhysicalInterfaces(node);
526
527 return node.intgBridge() != null && node.tunBridge() != null &&
528 deviceService.isAvailable(node.intgBridge()) &&
529 deviceService.isAvailable(node.tunBridge());
530 }
531
532 private boolean isDeviceCreatedStateDone(KubevirtNode node) {
533
534 try {
535 // we need to wait a while, in case tunneling ports
536 // creation requires some time
537 sleep(SLEEP_LONG_MS);
538 } catch (InterruptedException e) {
539 log.error("Exception caused during init state checking...");
540 }
541
542 if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
543 return false;
544 }
545 if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
546 return false;
547 }
548 if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
549 return false;
550 }
551
552 for (KubevirtPhyInterface phyIntf : node.phyIntfs()) {
553 if (phyIntf == null) {
554 return false;
555 }
556
557 String bridgeName = BRIDGE_PREFIX + phyIntf.network();
558 String patchPortName = structurePortName(
559 INTEGRATION_TO_PHYSICAL_PREFIX + phyIntf.network());
560
561 if (!(hasPhyBridge(node, bridgeName) &&
562 hasPhyPatchPort(node, patchPortName) &&
563 hasPhyIntf(node, phyIntf.intf()))) {
564 return false;
565 }
566 }
567
568 return true;
569 }
570
571 /**
572 * Configures the kubernetes node with new state.
573 *
574 * @param node kubevirt node
575 * @param newState a new state
576 */
577 private void setState(KubevirtNode node, KubevirtNodeState newState) {
578 if (node.state() == newState) {
579 return;
580 }
581 KubevirtNode updated = node.updateState(newState);
582 nodeAdminService.updateNode(updated);
583 log.info("Changed {} state: {}", node.hostname(), newState);
584 }
585
586 private void provisionPhysicalInterfaces(KubevirtNode node) {
587 node.phyIntfs().forEach(pi -> {
588 String bridgeName = BRIDGE_PREFIX + pi.network();
589 String patchPortName =
590 structurePortName(INTEGRATION_TO_PHYSICAL_PREFIX + pi.network());
591
592 if (!hasPhyBridge(node, bridgeName)) {
593 createPhysicalBridge(node, pi);
594 createPhysicalPatchPorts(node, pi);
595 attachPhysicalPort(node, pi);
596 } else {
597 // in case physical bridge exists, but patch port is missing on br-int,
598 // we will add patch port to connect br-int with physical bridge
599 if (!hasPhyPatchPort(node, patchPortName)) {
600 createPhysicalPatchPorts(node, pi);
601 }
602 }
603 });
604 }
605
606 private void cleanPhysicalInterfaces(KubevirtNode node) {
607 Device device = deviceService.getDevice(node.ovsdb());
608
609 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
610
611 Set<String> bridgeNames = bridgeConfig.getBridges().stream()
612 .map(BridgeDescription::name).collect(Collectors.toSet());
613
614 Set<String> phyNetworkNames = node.phyIntfs().stream()
615 .map(pi -> BRIDGE_PREFIX + pi.network()).collect(Collectors.toSet());
616
617 // we remove existing physical bridges and patch ports, if the physical
618 // bridges are not defined in kubevirt node
619 for (String brName : bridgeNames) {
620 if (!phyNetworkNames.contains(brName)) {
621 // integration bridge and tunnel bridge should NOT be treated as
622 // physical bridges
623 if (brName.equals(INTEGRATION_BRIDGE) || brName.equals(TUNNEL_BRIDGE)) {
624 continue;
625 }
626 removePhysicalPatchPorts(node, brName.substring(NETWORK_BEGIN));
627 removePhysicalBridge(node, brName.substring(NETWORK_BEGIN));
628 }
629 }
630 }
631
632 private void unprovisionPhysicalInterfaces(KubevirtNode node) {
633 node.phyIntfs().forEach(pi -> {
634 detachPhysicalPort(node, pi.network(), pi.intf());
635 removePhysicalPatchPorts(node, pi.network());
636 removePhysicalBridge(node, pi.network());
637 });
638 }
639
640 private boolean hasPhyBridge(KubevirtNode node, String bridgeName) {
641 BridgeConfig bridgeConfig =
642 deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
643 return bridgeConfig.getBridges().stream()
644 .anyMatch(br -> br.name().equals(bridgeName));
645 }
646
647 private boolean hasPhyPatchPort(KubevirtNode node, String patchPortName) {
648 List<Port> ports = deviceService.getPorts(node.intgBridge());
649 return ports.stream().anyMatch(p ->
650 p.annotations().value(PORT_NAME).equals(patchPortName));
651 }
652
653 private boolean hasPhyIntf(KubevirtNode node, String intfName) {
654 BridgeConfig bridgeConfig =
655 deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
656 return bridgeConfig.getPorts().stream()
657 .anyMatch(p -> p.annotations().value(PORT_NAME).equals(intfName));
658 }
659
660 private void createPhysicalBridge(KubevirtNode osNode,
661 KubevirtPhyInterface phyInterface) {
662 Device device = deviceService.getDevice(osNode.ovsdb());
663
664 String bridgeName = BRIDGE_PREFIX + phyInterface.network();
665
666 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
667 .name(bridgeName)
668 .mcastSnoopingEnable();
669
670 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
671 bridgeConfig.addBridge(builder.build());
672 }
673
674 private void removePhysicalBridge(KubevirtNode node, String network) {
675 Device device = deviceService.getDevice(node.ovsdb());
676
677 BridgeName bridgeName = BridgeName.bridgeName(BRIDGE_PREFIX + network);
678
679 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
680 bridgeConfig.deleteBridge(bridgeName);
681 }
682
683 private void createPhysicalPatchPorts(KubevirtNode node,
684 KubevirtPhyInterface phyInterface) {
685 Device device = deviceService.getDevice(node.ovsdb());
686
687 if (device == null || !device.is(InterfaceConfig.class)) {
688 log.error("Failed to create patch interface on {}", node.ovsdb());
689 return;
690 }
691
692 String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
693
694 String intToPhyPatchPort = structurePortName(
695 INTEGRATION_TO_PHYSICAL_PREFIX + phyInterface.network());
696 String phyToIntPatchPort = structurePortName(
697 phyInterface.network() + PHYSICAL_TO_INTEGRATION_SUFFIX);
698
699 // integration bridge -> physical bridge
700 PatchDescription intToPhyPatchDesc =
701 DefaultPatchDescription.builder()
702 .deviceId(INTEGRATION_BRIDGE)
703 .ifaceName(intToPhyPatchPort)
704 .peer(phyToIntPatchPort)
705 .build();
706
707 // physical bridge -> integration bridge
708 PatchDescription phyToIntPatchDesc =
709 DefaultPatchDescription.builder()
710 .deviceId(physicalDeviceId)
711 .ifaceName(phyToIntPatchPort)
712 .peer(intToPhyPatchPort)
713 .build();
714
715 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
716 ifaceConfig.addPatchMode(INTEGRATION_TO_PHYSICAL_PREFIX +
717 phyInterface.network(), intToPhyPatchDesc);
718 ifaceConfig.addPatchMode(phyInterface.network() +
719 PHYSICAL_TO_INTEGRATION_SUFFIX, phyToIntPatchDesc);
720
721 addOrRemoveSystemInterface(node, physicalDeviceId,
722 phyInterface.intf(), deviceService, true);
723 }
724
725 private void removePhysicalPatchPorts(KubevirtNode node, String network) {
726 Device device = deviceService.getDevice(node.ovsdb());
727
728 if (device == null || !device.is(InterfaceConfig.class)) {
729 log.error("Failed to remove patch interface on {}", node.ovsdb());
730 return;
731 }
732
733 String intToPhyPatchPort = structurePortName(
734 INTEGRATION_TO_PHYSICAL_PREFIX + network);
735
736 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
737 ifaceConfig.removePatchMode(intToPhyPatchPort);
738 }
739
740 private void attachPhysicalPort(KubevirtNode node,
741 KubevirtPhyInterface phyInterface) {
742
743 String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
744
745 addOrRemoveSystemInterface(node, physicalDeviceId,
746 phyInterface.intf(), deviceService, true);
747 }
748
749 private void detachPhysicalPort(KubevirtNode node, String network, String portName) {
750 String physicalDeviceId = BRIDGE_PREFIX + network;
751
752 addOrRemoveSystemInterface(node, physicalDeviceId, portName, deviceService, false);
753 }
754
755 /**
756 * An internal OVSDB listener. This listener is used for listening the
757 * network facing events from OVSDB device. If a new OVSDB device is detected,
758 * ONOS tries to bootstrap the kubernetes node.
759 */
760 private class InternalOvsdbListener implements DeviceListener {
761
762 @Override
763 public boolean isRelevant(DeviceEvent event) {
764 return event.subject().type() == Device.Type.CONTROLLER;
765 }
766
767 private boolean isRelevantHelper() {
768 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
769 }
770
771 @Override
772 public void event(DeviceEvent event) {
773 Device device = event.subject();
774
775 switch (event.type()) {
776 case DEVICE_AVAILABILITY_CHANGED:
777 case DEVICE_ADDED:
778 eventExecutor.execute(() -> {
779
780 if (!isRelevantHelper()) {
781 return;
782 }
783
784 KubevirtNode node = nodeAdminService.node(device.id());
785
786 if (node == null) {
787 return;
788 }
789
790 if (deviceService.isAvailable(device.id())) {
791 log.debug("OVSDB {} detected", device.id());
792 bootstrapNode(node);
793 }
794 });
795 break;
796 case PORT_ADDED:
797 case PORT_REMOVED:
798 case DEVICE_REMOVED:
799 default:
800 // do nothing
801 break;
802 }
803 }
804 }
805
806 /**
807 * An internal integration bridge listener. This listener is used for
808 * listening the events from integration bridge. To listen the events from
809 * other types of bridge such as provider bridge or tunnel bridge, we need
810 * to augment KubevirtNodeService.node() method.
811 */
812 private class InternalBridgeListener implements DeviceListener {
813
814 @Override
815 public boolean isRelevant(DeviceEvent event) {
816 return event.subject().type() == Device.Type.SWITCH;
817 }
818
819 private boolean isRelevantHelper() {
820 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
821 }
822
823 @Override
824 public void event(DeviceEvent event) {
825 Device device = event.subject();
826 Port port = event.port();
827
828 switch (event.type()) {
829 case DEVICE_AVAILABILITY_CHANGED:
830 case DEVICE_ADDED:
831 eventExecutor.execute(() -> processDeviceAddition(device));
832 break;
833 case PORT_UPDATED:
834 case PORT_ADDED:
835 eventExecutor.execute(() -> processPortAddition(device, port));
836 break;
837 case PORT_REMOVED:
838 eventExecutor.execute(() -> processPortRemoval(device, port));
839 break;
840 case DEVICE_REMOVED:
841 default:
842 // do nothing
843 break;
844 }
845 }
846
847 void processDeviceAddition(Device device) {
848 if (!isRelevantHelper()) {
849 return;
850 }
851
852 KubevirtNode node = nodeAdminService.node(device.id());
853
854 if (node == null) {
855 return;
856 }
857
858 if (deviceService.isAvailable(device.id())) {
859 log.debug("Bridge created on {}", node.hostname());
860 bootstrapNode(node);
861 } else if (node.state() == COMPLETE) {
862 log.info("Device {} disconnected", device.id());
863 setState(node, INCOMPLETE);
864 }
865
866 if (autoRecovery) {
867 if (node.state() == INCOMPLETE || node.state() == DEVICE_CREATED) {
868 log.info("Device {} is reconnected", device.id());
869 nodeAdminService.updateNode(node.updateState(INIT));
870 }
871 }
872 }
873
874 void processPortAddition(Device device, Port port) {
875 if (!isRelevantHelper()) {
876 return;
877 }
878
879 KubevirtNode node = nodeAdminService.nodeByTunBridge(device.id());
880
881 if (node == null) {
882 return;
883 }
884
885 String portName = port.annotations().value(PORT_NAME);
886 if (node.state() == DEVICE_CREATED && (
887 Objects.equals(portName, VXLAN) ||
888 Objects.equals(portName, GRE) ||
889 Objects.equals(portName, GENEVE))) {
890 log.info("Interface {} added or updated to {}",
891 portName, device.id());
892 bootstrapNode(node);
893 }
894 }
895
896 void processPortRemoval(Device device, Port port) {
897 if (!isRelevantHelper()) {
898 return;
899 }
900
901 KubevirtNode node = nodeAdminService.node(device.id());
902
903 if (node == null) {
904 return;
905 }
906
907 String portName = port.annotations().value(PORT_NAME);
908 if (node.state() == COMPLETE && (
909 Objects.equals(portName, VXLAN) ||
910 Objects.equals(portName, GRE) ||
911 Objects.equals(portName, GENEVE))) {
912 log.warn("Interface {} removed from {}", portName, device.id());
913 setState(node, INCOMPLETE);
914 }
915 }
916 }
917
918 /**
919 * An internal kubevirt node listener.
920 * The notification is triggered by KubevirtNodeStore.
921 */
922 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
923
924 private boolean isRelevantHelper() {
925 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
926 }
927
928 @Override
929 public void event(KubevirtNodeEvent event) {
930 switch (event.type()) {
931 case KUBEVIRT_NODE_CREATED:
932 case KUBEVIRT_NODE_UPDATED:
933 eventExecutor.execute(() -> {
934 if (!isRelevantHelper()) {
935 return;
936 }
937 bootstrapNode(event.subject());
938 });
939 break;
940 case KUBEVIRT_NODE_REMOVED:
941 eventExecutor.execute(() -> {
942 if (!isRelevantHelper()) {
943 return;
944 }
945 removeNode(event.subject());
946 });
947 break;
948 case KUBEVIRT_NODE_INCOMPLETE:
949 default:
950 break;
951 }
952 }
953 }
954}