blob: 4756cde6e261dcced97d29b079f1536288830969 [file] [log] [blame]
Jian Li138f51f2021-01-06 03:29:58 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnode.impl;
17
Jian Lie7a702f2021-01-28 16:14:54 +090018import com.google.common.collect.Lists;
19import org.onlab.packet.IpAddress;
Jian Li138f51f2021-01-06 03:29:58 +090020import org.onlab.util.Tools;
21import org.onosproject.cfg.ComponentConfigService;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
Jian Lie7a702f2021-01-28 16:14:54 +090027import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
Jian Li138f51f2021-01-06 03:29:58 +090028import org.onosproject.kubevirtnode.api.KubevirtNode;
29import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
30import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
31import org.onosproject.kubevirtnode.api.KubevirtNodeHandler;
32import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
33import org.onosproject.kubevirtnode.api.KubevirtNodeState;
34import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
35import org.onosproject.net.Device;
36import org.onosproject.net.DeviceId;
37import org.onosproject.net.Port;
38import org.onosproject.net.behaviour.BridgeConfig;
39import org.onosproject.net.behaviour.BridgeDescription;
40import org.onosproject.net.behaviour.BridgeName;
41import org.onosproject.net.behaviour.ControllerInfo;
42import org.onosproject.net.behaviour.DefaultBridgeDescription;
43import org.onosproject.net.behaviour.DefaultPatchDescription;
44import org.onosproject.net.behaviour.DefaultTunnelDescription;
45import org.onosproject.net.behaviour.InterfaceConfig;
46import org.onosproject.net.behaviour.PatchDescription;
47import org.onosproject.net.behaviour.TunnelDescription;
48import org.onosproject.net.behaviour.TunnelEndPoints;
49import org.onosproject.net.behaviour.TunnelKey;
50import org.onosproject.net.device.DeviceAdminService;
51import org.onosproject.net.device.DeviceEvent;
52import org.onosproject.net.device.DeviceListener;
53import org.onosproject.net.device.DeviceService;
54import org.onosproject.ovsdb.controller.OvsdbClientService;
55import org.onosproject.ovsdb.controller.OvsdbController;
56import org.osgi.service.component.ComponentContext;
57import org.osgi.service.component.annotations.Activate;
58import org.osgi.service.component.annotations.Component;
59import org.osgi.service.component.annotations.Deactivate;
60import org.osgi.service.component.annotations.Modified;
61import org.osgi.service.component.annotations.Reference;
62import org.osgi.service.component.annotations.ReferenceCardinality;
63import org.slf4j.Logger;
64
65import java.util.Dictionary;
66import java.util.List;
67import java.util.Objects;
68import java.util.Set;
69import java.util.concurrent.ExecutorService;
70import java.util.stream.Collectors;
71
72import static java.lang.Thread.sleep;
73import static java.util.concurrent.Executors.newSingleThreadExecutor;
74import static org.onlab.packet.TpPort.tpPort;
75import static org.onlab.util.Tools.groupedThreads;
76import static org.onosproject.kubevirtnode.api.Constants.BRIDGE_PREFIX;
77import static org.onosproject.kubevirtnode.api.Constants.FLOW_KEY;
78import static org.onosproject.kubevirtnode.api.Constants.GENEVE;
79import static org.onosproject.kubevirtnode.api.Constants.GRE;
80import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_BRIDGE;
81import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_PHYSICAL_PREFIX;
Jian Li138f51f2021-01-06 03:29:58 +090082import static org.onosproject.kubevirtnode.api.Constants.PHYSICAL_TO_INTEGRATION_SUFFIX;
Jian Lib5ab63c2021-02-03 17:54:28 +090083import static org.onosproject.kubevirtnode.api.Constants.TENANT_BRIDGE_PREFIX;
Jian Li138f51f2021-01-06 03:29:58 +090084import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_BRIDGE;
Jian Li138f51f2021-01-06 03:29:58 +090085import static org.onosproject.kubevirtnode.api.Constants.VXLAN;
86import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
87import static org.onosproject.kubevirtnode.api.KubevirtNodeState.COMPLETE;
88import static org.onosproject.kubevirtnode.api.KubevirtNodeState.DEVICE_CREATED;
89import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INCOMPLETE;
90import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
91import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
92import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
93import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT;
94import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
95import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.addOrRemoveSystemInterface;
96import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getBooleanProperty;
97import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getOvsdbClient;
98import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.isOvsdbConnected;
99import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.structurePortName;
100import static org.onosproject.net.AnnotationKeys.PORT_NAME;
101import static org.slf4j.LoggerFactory.getLogger;
102
103/**
104 * Service bootstraps kubernetes node based on its type.
105 */
106@Component(immediate = true,
107 property = {
108 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
109 AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
110 }
111)
112public class DefaultKubevirtNodeHandler implements KubevirtNodeHandler {
113
114 private final Logger log = getLogger(getClass());
115
116 private static final String DEFAULT_OF_PROTO = "tcp";
117 private static final int DEFAULT_OFPORT = 6653;
118 private static final int DPID_BEGIN = 3;
119 private static final int NETWORK_BEGIN = 3;
120 private static final long SLEEP_SHORT_MS = 1000; // we wait 1s
Jian Lib9eb11d2021-02-19 18:13:10 +0900121 private static final long SLEEP_MID_MS = 2000; // we wait 2s
122 private static final long SLEEP_LONG_MS = 5000; // we wait 5s
Jian Li138f51f2021-01-06 03:29:58 +0900123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY)
125 protected CoreService coreService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
128 protected LeadershipService leadershipService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
131 protected ClusterService clusterService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY)
134 protected DeviceService deviceService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY)
137 protected DeviceAdminService deviceAdminService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
140 protected OvsdbController ovsdbController;
141
142 @Reference(cardinality = ReferenceCardinality.MANDATORY)
143 protected KubevirtNodeAdminService nodeAdminService;
144
145 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lie7a702f2021-01-28 16:14:54 +0900146 protected KubevirtApiConfigService apiConfigService;
147
148 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li138f51f2021-01-06 03:29:58 +0900149 protected ComponentConfigService componentConfigService;
150
151 /** OVSDB server listen port. */
152 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
153
154 /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
155 private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
156
157 private final ExecutorService eventExecutor = newSingleThreadExecutor(
158 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
159
160 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
161 private final DeviceListener bridgeListener = new InternalBridgeListener();
162 private final KubevirtNodeListener kubevirtNodeListener = new InternalKubevirtNodeListener();
163
164 private ApplicationId appId;
165 private NodeId localNode;
166
167 @Activate
168 protected void activate() {
169 appId = coreService.getAppId(APP_ID);
170 localNode = clusterService.getLocalNode().id();
171
172 componentConfigService.registerProperties(getClass());
173 leadershipService.runForLeadership(appId.name());
174 deviceService.addListener(ovsdbListener);
175 deviceService.addListener(bridgeListener);
176 nodeAdminService.addListener(kubevirtNodeListener);
177
178 log.info("Started");
179 }
180
181 @Deactivate
182 protected void deactivate() {
183 nodeAdminService.removeListener(kubevirtNodeListener);
184 deviceService.removeListener(bridgeListener);
185 deviceService.removeListener(ovsdbListener);
186 componentConfigService.unregisterProperties(getClass(), false);
187 leadershipService.withdraw(appId.name());
188 eventExecutor.shutdown();
189
190 log.info("Stopped");
191 }
192
193 @Modified
194 protected void modified(ComponentContext context) {
195 readComponentConfiguration(context);
196
197 log.info("Modified");
198 }
199
200 @Override
201 public void processInitState(KubevirtNode node) {
202 if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
203 ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
204 return;
205 }
206 if (!deviceService.isAvailable(node.intgBridge())) {
207 createBridge(node, INTEGRATION_BRIDGE, node.intgBridge());
208 }
209
210 if (!deviceService.isAvailable(node.tunBridge())) {
211 createBridge(node, TUNNEL_BRIDGE, node.tunBridge());
212 }
213 }
214
215 @Override
216 public void processDeviceCreatedState(KubevirtNode node) {
217 try {
218 if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
219 ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
220 return;
221 }
222
223 // create patch ports between integration to other bridges
Jian Li543fe852021-02-04 17:25:01 +0900224 // for now, we do not directly connect br-int with br-tun,
225 // as br-int only deals with FLAT and VLAN network
226 // createPatchInterfaces(node);
Jian Li138f51f2021-01-06 03:29:58 +0900227
228 if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
229 createVxlanTunnelInterface(node);
230 }
231
232 if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
233 createGreTunnelInterface(node);
234 }
235
236 if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
237 createGeneveTunnelInterface(node);
238 }
Jian Li138f51f2021-01-06 03:29:58 +0900239 } catch (Exception e) {
240 log.error("Exception occurred because of {}", e);
241 }
242 }
243
244 @Override
245 public void processCompleteState(KubevirtNode node) {
246 // do something if needed
247 }
248
249 @Override
250 public void processIncompleteState(KubevirtNode node) {
251 // do something if needed
252 }
253
254 @Override
255 public void processOnBoardedState(KubevirtNode node) {
256 // do something if needed
257 }
258
259 /**
260 * Extracts properties from the component configuration context.
261 *
262 * @param context the component context
263 */
264 private void readComponentConfiguration(ComponentContext context) {
265 Dictionary<?, ?> properties = context.getProperties();
266
267 Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
268 if (ovsdbPortConfigured == null) {
269 ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
270 log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
271 } else {
272 ovsdbPortNum = ovsdbPortConfigured;
273 log.info("Configured. OVSDB port is {}", ovsdbPortNum);
274 }
275
276 Boolean autoRecoveryConfigured =
277 getBooleanProperty(properties, AUTO_RECOVERY);
278 if (autoRecoveryConfigured == null) {
279 autoRecovery = AUTO_RECOVERY_DEFAULT;
280 log.info("Auto recovery flag is NOT " +
281 "configured, default value is {}", autoRecovery);
282 } else {
283 autoRecovery = autoRecoveryConfigured;
284 log.info("Configured. Auto recovery flag is {}", autoRecovery);
285 }
286 }
287
288 /**
289 * Creates a bridge with a given name on a given kubernetes node.
290 *
291 * @param node kubevirt node
292 * @param bridgeName bridge name
293 * @param devId device identifier
294 */
295 private void createBridge(KubevirtNode node, String bridgeName, DeviceId devId) {
296 Device device = deviceService.getDevice(node.ovsdb());
297
Jian Lie7a702f2021-01-28 16:14:54 +0900298 IpAddress serverIp = apiConfigService.apiConfig().ipAddress();
299 ControllerInfo controlInfo = new ControllerInfo(serverIp, DEFAULT_OFPORT, DEFAULT_OF_PROTO);
300 List<ControllerInfo> controllers = Lists.newArrayList(controlInfo);
Jian Li138f51f2021-01-06 03:29:58 +0900301
302 String dpid = devId.toString().substring(DPID_BEGIN);
303
304 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
305 .name(bridgeName)
306 .failMode(BridgeDescription.FailMode.SECURE)
307 .datapathId(dpid)
308 .disableInBand()
309 .controllers(controllers);
310
311 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
312 bridgeConfig.addBridge(builder.build());
313 }
314
315 /**
316 * Creates a VXLAN tunnel interface in a given kubevirt node.
317 *
318 * @param node kubevirt node
319 */
320 private void createVxlanTunnelInterface(KubevirtNode node) {
321 createTunnelInterface(node, VXLAN, VXLAN);
322 }
323
324 /**
325 * Creates a GRE tunnel interface in a given kubevirt node.
326 *
327 * @param node kubevirt node
328 */
329 private void createGreTunnelInterface(KubevirtNode node) {
330 createTunnelInterface(node, GRE, GRE);
331 }
332
333 /**
334 * Creates a GENEVE tunnel interface in a given kubevirt node.
335 *
336 * @param node kubevirt node
337 */
338 private void createGeneveTunnelInterface(KubevirtNode node) {
339 createTunnelInterface(node, GENEVE, GENEVE);
340 }
341
342 /**
343 * Creates a tunnel interface in a given kubernetes node.
344 *
345 * @param node kubevirt node
Jian Lib9eb11d2021-02-19 18:13:10 +0900346 * @param type kubevirt type
347 * @param intfName tunnel interface name
Jian Li138f51f2021-01-06 03:29:58 +0900348 */
349 private void createTunnelInterface(KubevirtNode node,
350 String type, String intfName) {
351 if (isIntfEnabled(node, intfName)) {
352 return;
353 }
354
355 Device device = deviceService.getDevice(node.ovsdb());
356 if (device == null || !device.is(InterfaceConfig.class)) {
357 log.error("Failed to create tunnel interface on {}", node.ovsdb());
358 return;
359 }
360
361 TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
362
363 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
364 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
365 }
366
367 /**
368 * Builds tunnel description according to the network type.
369 *
370 * @param type network type
Jian Lib9eb11d2021-02-19 18:13:10 +0900371 * @param intfName tunnel interface
Jian Li138f51f2021-01-06 03:29:58 +0900372 * @return tunnel description
373 */
374 private TunnelDescription buildTunnelDesc(String type, String intfName) {
375 TunnelKey<String> key = new TunnelKey<>(FLOW_KEY);
376 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
377 TunnelDescription.Builder tdBuilder =
378 DefaultTunnelDescription.builder()
379 .deviceId(TUNNEL_BRIDGE)
380 .ifaceName(intfName)
381 .remote(TunnelEndPoints.flowTunnelEndpoint())
382 .key(key);
383
384 switch (type) {
385 case VXLAN:
386 tdBuilder.type(TunnelDescription.Type.VXLAN);
387 break;
388 case GRE:
389 tdBuilder.type(TunnelDescription.Type.GRE);
390 break;
391 case GENEVE:
392 tdBuilder.type(TunnelDescription.Type.GENEVE);
393 break;
394 default:
395 return null;
396 }
397
398 return tdBuilder.build();
399 }
400 return null;
401 }
402
403 /**
404 * Checks whether a given network interface in a given kubernetes node
405 * is enabled or not.
406 *
407 * @param node kubevirt node
408 * @param intf network interface name
409 * @return true if the given interface is enabled, false otherwise
410 */
411 private boolean isIntfEnabled(KubevirtNode node, String intf) {
412 return deviceService.isAvailable(node.tunBridge()) &&
413 deviceService.getPorts(node.tunBridge()).stream()
414 .anyMatch(port -> Objects.equals(
415 port.annotations().value(PORT_NAME), intf) &&
416 port.isEnabled());
417 }
418
Jian Li138f51f2021-01-06 03:29:58 +0900419 /**
420 * Bootstraps a new kubevirt node.
421 *
422 * @param node kubevirt node
423 */
424 private void bootstrapNode(KubevirtNode node) {
425 if (isCurrentStateDone(node)) {
426 setState(node, node.state().nextState());
427 } else {
428 log.trace("Processing {} state for {}", node.state(), node.hostname());
429 node.state().process(this, node);
430 }
431 }
432
433 /**
434 * Removes the existing kubevirt node.
435 *
436 * @param node kubevirt node
437 */
438 private void removeNode(KubevirtNode node) {
439 OvsdbClientService client = getOvsdbClient(node, ovsdbPortNum, ovsdbController);
440 if (client == null) {
441 log.info("Failed to get ovsdb client");
442 return;
443 }
444
445 // unprovision physical interfaces from the node
446 // this procedure includes detaching physical port from physical bridge,
447 // remove patch ports from br-int, removing physical bridge
448 unprovisionPhysicalInterfaces(node);
449
450 // delete tunnel bridge from the node
451 client.dropBridge(TUNNEL_BRIDGE);
452
453 // delete integration bridge from the node
454 client.dropBridge(INTEGRATION_BRIDGE);
455 }
456
457 /**
458 * Checks whether all requirements for this state are fulfilled or not.
459 *
460 * @param node kubevirt node
461 * @return true if all requirements are fulfilled, false otherwise
462 */
463 private boolean isCurrentStateDone(KubevirtNode node) {
464 switch (node.state()) {
465 case INIT:
466 return isInitStateDone(node);
467 case DEVICE_CREATED:
468 return isDeviceCreatedStateDone(node);
469 case COMPLETE:
470 case INCOMPLETE:
471 case ON_BOARDED:
472 // always return false
473 // run init CLI to re-trigger node bootstrap
474 return false;
475 default:
476 return true;
477 }
478 }
479
480 private boolean isInitStateDone(KubevirtNode node) {
481 if (!isOvsdbConnected(node, ovsdbPortNum,
482 ovsdbController, deviceService)) {
483 return false;
484 }
485
486 try {
487 // we need to wait a while, in case interfaces and bridges
488 // creation requires some time
489 sleep(SLEEP_SHORT_MS);
490 } catch (InterruptedException e) {
491 log.error("Exception caused during init state checking...");
492 }
493
494 cleanPhysicalInterfaces(node);
495
Jian Lib9eb11d2021-02-19 18:13:10 +0900496 // provision new physical interfaces on the given node
497 // this includes creating physical bridge, attaching physical port
498 // to physical bridge, adding patch ports to both physical bridge and br-int
499 provisionPhysicalInterfaces(node);
500
Jian Li138f51f2021-01-06 03:29:58 +0900501 return node.intgBridge() != null && node.tunBridge() != null &&
502 deviceService.isAvailable(node.intgBridge()) &&
503 deviceService.isAvailable(node.tunBridge());
504 }
505
506 private boolean isDeviceCreatedStateDone(KubevirtNode node) {
507
508 try {
509 // we need to wait a while, in case tunneling ports
510 // creation requires some time
Jian Lib9eb11d2021-02-19 18:13:10 +0900511 sleep(SLEEP_MID_MS);
Jian Li138f51f2021-01-06 03:29:58 +0900512 } catch (InterruptedException e) {
513 log.error("Exception caused during init state checking...");
514 }
515
516 if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
Jian Lib9eb11d2021-02-19 18:13:10 +0900517 log.warn("VXLAN interface is not enabled!");
Jian Li138f51f2021-01-06 03:29:58 +0900518 return false;
519 }
520 if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
Jian Lib9eb11d2021-02-19 18:13:10 +0900521 log.warn("GRE interface is not enabled!");
Jian Li138f51f2021-01-06 03:29:58 +0900522 return false;
523 }
524 if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
Jian Lib9eb11d2021-02-19 18:13:10 +0900525 log.warn("GENEVE interface is not enabled!");
Jian Li138f51f2021-01-06 03:29:58 +0900526 return false;
527 }
528
529 for (KubevirtPhyInterface phyIntf : node.phyIntfs()) {
530 if (phyIntf == null) {
Jian Lib9eb11d2021-02-19 18:13:10 +0900531 log.warn("Physnet interface is invalid");
Jian Li138f51f2021-01-06 03:29:58 +0900532 return false;
533 }
534
Jian Lib9eb11d2021-02-19 18:13:10 +0900535 try {
536 // we need to wait a while, in case tunneling ports
537 // creation requires some time
538 sleep(SLEEP_LONG_MS);
539 } catch (InterruptedException e) {
540 log.error("Exception caused during init state checking...");
541 }
542
Jian Li138f51f2021-01-06 03:29:58 +0900543 String bridgeName = BRIDGE_PREFIX + phyIntf.network();
544 String patchPortName = structurePortName(
545 INTEGRATION_TO_PHYSICAL_PREFIX + phyIntf.network());
546
547 if (!(hasPhyBridge(node, bridgeName) &&
548 hasPhyPatchPort(node, patchPortName) &&
549 hasPhyIntf(node, phyIntf.intf()))) {
Jian Lib9eb11d2021-02-19 18:13:10 +0900550 log.warn("PhyBridge {}", hasPhyBridge(node, bridgeName));
551 log.warn("hasPhyPatchPort {}", hasPhyPatchPort(node, patchPortName));
552 log.warn("hasPhyIntf {}", hasPhyIntf(node, phyIntf.intf()));
Jian Li138f51f2021-01-06 03:29:58 +0900553 return false;
554 }
555 }
556
557 return true;
558 }
559
560 /**
561 * Configures the kubernetes node with new state.
562 *
563 * @param node kubevirt node
564 * @param newState a new state
565 */
566 private void setState(KubevirtNode node, KubevirtNodeState newState) {
567 if (node.state() == newState) {
568 return;
569 }
570 KubevirtNode updated = node.updateState(newState);
571 nodeAdminService.updateNode(updated);
572 log.info("Changed {} state: {}", node.hostname(), newState);
573 }
574
575 private void provisionPhysicalInterfaces(KubevirtNode node) {
576 node.phyIntfs().forEach(pi -> {
577 String bridgeName = BRIDGE_PREFIX + pi.network();
578 String patchPortName =
579 structurePortName(INTEGRATION_TO_PHYSICAL_PREFIX + pi.network());
580
581 if (!hasPhyBridge(node, bridgeName)) {
582 createPhysicalBridge(node, pi);
583 createPhysicalPatchPorts(node, pi);
584 attachPhysicalPort(node, pi);
Jian Lib9eb11d2021-02-19 18:13:10 +0900585
586 log.info("Creating physnet bridge {}", bridgeName);
587 log.info("Creating patch ports for physnet {}", bridgeName);
Jian Li138f51f2021-01-06 03:29:58 +0900588 } else {
589 // in case physical bridge exists, but patch port is missing on br-int,
590 // we will add patch port to connect br-int with physical bridge
591 if (!hasPhyPatchPort(node, patchPortName)) {
592 createPhysicalPatchPorts(node, pi);
Jian Lib9eb11d2021-02-19 18:13:10 +0900593
594 log.info("Creating patch ports for physnet {}", bridgeName);
Jian Li138f51f2021-01-06 03:29:58 +0900595 }
596 }
597 });
598 }
599
600 private void cleanPhysicalInterfaces(KubevirtNode node) {
601 Device device = deviceService.getDevice(node.ovsdb());
602
603 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
604
605 Set<String> bridgeNames = bridgeConfig.getBridges().stream()
606 .map(BridgeDescription::name).collect(Collectors.toSet());
607
608 Set<String> phyNetworkNames = node.phyIntfs().stream()
609 .map(pi -> BRIDGE_PREFIX + pi.network()).collect(Collectors.toSet());
610
611 // we remove existing physical bridges and patch ports, if the physical
612 // bridges are not defined in kubevirt node
613 for (String brName : bridgeNames) {
Jian Lib9eb11d2021-02-19 18:13:10 +0900614 // integration bridge and tunnel bridge should NOT be treated as
615 // physical bridges
616 if (brName.equals(INTEGRATION_BRIDGE) ||
617 brName.equals(TUNNEL_BRIDGE) ||
618 brName.startsWith(TENANT_BRIDGE_PREFIX)) {
619 continue;
620 }
621
Jian Li138f51f2021-01-06 03:29:58 +0900622 if (!phyNetworkNames.contains(brName)) {
Jian Li138f51f2021-01-06 03:29:58 +0900623 removePhysicalPatchPorts(node, brName.substring(NETWORK_BEGIN));
624 removePhysicalBridge(node, brName.substring(NETWORK_BEGIN));
Jian Lib9eb11d2021-02-19 18:13:10 +0900625 log.info("Removing physical bridge {}...", brName);
Jian Li138f51f2021-01-06 03:29:58 +0900626 }
627 }
628 }
629
630 private void unprovisionPhysicalInterfaces(KubevirtNode node) {
631 node.phyIntfs().forEach(pi -> {
632 detachPhysicalPort(node, pi.network(), pi.intf());
633 removePhysicalPatchPorts(node, pi.network());
634 removePhysicalBridge(node, pi.network());
635 });
636 }
637
638 private boolean hasPhyBridge(KubevirtNode node, String bridgeName) {
639 BridgeConfig bridgeConfig =
640 deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
641 return bridgeConfig.getBridges().stream()
642 .anyMatch(br -> br.name().equals(bridgeName));
643 }
644
645 private boolean hasPhyPatchPort(KubevirtNode node, String patchPortName) {
646 List<Port> ports = deviceService.getPorts(node.intgBridge());
647 return ports.stream().anyMatch(p ->
648 p.annotations().value(PORT_NAME).equals(patchPortName));
649 }
650
651 private boolean hasPhyIntf(KubevirtNode node, String intfName) {
652 BridgeConfig bridgeConfig =
653 deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
654 return bridgeConfig.getPorts().stream()
655 .anyMatch(p -> p.annotations().value(PORT_NAME).equals(intfName));
656 }
657
658 private void createPhysicalBridge(KubevirtNode osNode,
659 KubevirtPhyInterface phyInterface) {
660 Device device = deviceService.getDevice(osNode.ovsdb());
661
662 String bridgeName = BRIDGE_PREFIX + phyInterface.network();
663
664 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
665 .name(bridgeName)
666 .mcastSnoopingEnable();
667
668 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
669 bridgeConfig.addBridge(builder.build());
670 }
671
672 private void removePhysicalBridge(KubevirtNode node, String network) {
673 Device device = deviceService.getDevice(node.ovsdb());
674
675 BridgeName bridgeName = BridgeName.bridgeName(BRIDGE_PREFIX + network);
676
677 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
678 bridgeConfig.deleteBridge(bridgeName);
679 }
680
681 private void createPhysicalPatchPorts(KubevirtNode node,
682 KubevirtPhyInterface phyInterface) {
683 Device device = deviceService.getDevice(node.ovsdb());
684
685 if (device == null || !device.is(InterfaceConfig.class)) {
686 log.error("Failed to create patch interface on {}", node.ovsdb());
687 return;
688 }
689
690 String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
691
692 String intToPhyPatchPort = structurePortName(
693 INTEGRATION_TO_PHYSICAL_PREFIX + phyInterface.network());
694 String phyToIntPatchPort = structurePortName(
695 phyInterface.network() + PHYSICAL_TO_INTEGRATION_SUFFIX);
696
697 // integration bridge -> physical bridge
698 PatchDescription intToPhyPatchDesc =
699 DefaultPatchDescription.builder()
700 .deviceId(INTEGRATION_BRIDGE)
701 .ifaceName(intToPhyPatchPort)
702 .peer(phyToIntPatchPort)
703 .build();
704
705 // physical bridge -> integration bridge
706 PatchDescription phyToIntPatchDesc =
707 DefaultPatchDescription.builder()
708 .deviceId(physicalDeviceId)
709 .ifaceName(phyToIntPatchPort)
710 .peer(intToPhyPatchPort)
711 .build();
712
713 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
714 ifaceConfig.addPatchMode(INTEGRATION_TO_PHYSICAL_PREFIX +
715 phyInterface.network(), intToPhyPatchDesc);
716 ifaceConfig.addPatchMode(phyInterface.network() +
717 PHYSICAL_TO_INTEGRATION_SUFFIX, phyToIntPatchDesc);
718
719 addOrRemoveSystemInterface(node, physicalDeviceId,
720 phyInterface.intf(), deviceService, true);
721 }
722
723 private void removePhysicalPatchPorts(KubevirtNode node, String network) {
724 Device device = deviceService.getDevice(node.ovsdb());
725
726 if (device == null || !device.is(InterfaceConfig.class)) {
727 log.error("Failed to remove patch interface on {}", node.ovsdb());
728 return;
729 }
730
731 String intToPhyPatchPort = structurePortName(
732 INTEGRATION_TO_PHYSICAL_PREFIX + network);
733
734 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
735 ifaceConfig.removePatchMode(intToPhyPatchPort);
736 }
737
738 private void attachPhysicalPort(KubevirtNode node,
739 KubevirtPhyInterface phyInterface) {
740
741 String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
742
743 addOrRemoveSystemInterface(node, physicalDeviceId,
744 phyInterface.intf(), deviceService, true);
745 }
746
747 private void detachPhysicalPort(KubevirtNode node, String network, String portName) {
748 String physicalDeviceId = BRIDGE_PREFIX + network;
749
750 addOrRemoveSystemInterface(node, physicalDeviceId, portName, deviceService, false);
751 }
752
753 /**
754 * An internal OVSDB listener. This listener is used for listening the
755 * network facing events from OVSDB device. If a new OVSDB device is detected,
756 * ONOS tries to bootstrap the kubernetes node.
757 */
758 private class InternalOvsdbListener implements DeviceListener {
759
760 @Override
761 public boolean isRelevant(DeviceEvent event) {
762 return event.subject().type() == Device.Type.CONTROLLER;
763 }
764
765 private boolean isRelevantHelper() {
766 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
767 }
768
769 @Override
770 public void event(DeviceEvent event) {
771 Device device = event.subject();
772
773 switch (event.type()) {
774 case DEVICE_AVAILABILITY_CHANGED:
775 case DEVICE_ADDED:
776 eventExecutor.execute(() -> {
777
778 if (!isRelevantHelper()) {
779 return;
780 }
781
782 KubevirtNode node = nodeAdminService.node(device.id());
783
784 if (node == null) {
785 return;
786 }
787
788 if (deviceService.isAvailable(device.id())) {
789 log.debug("OVSDB {} detected", device.id());
790 bootstrapNode(node);
791 }
792 });
793 break;
794 case PORT_ADDED:
795 case PORT_REMOVED:
796 case DEVICE_REMOVED:
797 default:
798 // do nothing
799 break;
800 }
801 }
802 }
803
804 /**
805 * An internal integration bridge listener. This listener is used for
806 * listening the events from integration bridge. To listen the events from
807 * other types of bridge such as provider bridge or tunnel bridge, we need
808 * to augment KubevirtNodeService.node() method.
809 */
810 private class InternalBridgeListener implements DeviceListener {
811
812 @Override
813 public boolean isRelevant(DeviceEvent event) {
814 return event.subject().type() == Device.Type.SWITCH;
815 }
816
817 private boolean isRelevantHelper() {
818 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
819 }
820
821 @Override
822 public void event(DeviceEvent event) {
823 Device device = event.subject();
824 Port port = event.port();
825
826 switch (event.type()) {
827 case DEVICE_AVAILABILITY_CHANGED:
828 case DEVICE_ADDED:
829 eventExecutor.execute(() -> processDeviceAddition(device));
830 break;
831 case PORT_UPDATED:
832 case PORT_ADDED:
833 eventExecutor.execute(() -> processPortAddition(device, port));
834 break;
835 case PORT_REMOVED:
836 eventExecutor.execute(() -> processPortRemoval(device, port));
837 break;
838 case DEVICE_REMOVED:
839 default:
840 // do nothing
841 break;
842 }
843 }
844
845 void processDeviceAddition(Device device) {
846 if (!isRelevantHelper()) {
847 return;
848 }
849
850 KubevirtNode node = nodeAdminService.node(device.id());
851
852 if (node == null) {
853 return;
854 }
855
856 if (deviceService.isAvailable(device.id())) {
857 log.debug("Bridge created on {}", node.hostname());
858 bootstrapNode(node);
859 } else if (node.state() == COMPLETE) {
860 log.info("Device {} disconnected", device.id());
861 setState(node, INCOMPLETE);
862 }
863
864 if (autoRecovery) {
865 if (node.state() == INCOMPLETE || node.state() == DEVICE_CREATED) {
866 log.info("Device {} is reconnected", device.id());
867 nodeAdminService.updateNode(node.updateState(INIT));
868 }
869 }
870 }
871
872 void processPortAddition(Device device, Port port) {
873 if (!isRelevantHelper()) {
874 return;
875 }
876
877 KubevirtNode node = nodeAdminService.nodeByTunBridge(device.id());
878
879 if (node == null) {
880 return;
881 }
882
883 String portName = port.annotations().value(PORT_NAME);
884 if (node.state() == DEVICE_CREATED && (
885 Objects.equals(portName, VXLAN) ||
886 Objects.equals(portName, GRE) ||
887 Objects.equals(portName, GENEVE))) {
888 log.info("Interface {} added or updated to {}",
889 portName, device.id());
890 bootstrapNode(node);
891 }
892 }
893
894 void processPortRemoval(Device device, Port port) {
895 if (!isRelevantHelper()) {
896 return;
897 }
898
899 KubevirtNode node = nodeAdminService.node(device.id());
900
901 if (node == null) {
902 return;
903 }
904
905 String portName = port.annotations().value(PORT_NAME);
906 if (node.state() == COMPLETE && (
907 Objects.equals(portName, VXLAN) ||
908 Objects.equals(portName, GRE) ||
909 Objects.equals(portName, GENEVE))) {
910 log.warn("Interface {} removed from {}", portName, device.id());
911 setState(node, INCOMPLETE);
912 }
913 }
914 }
915
916 /**
917 * An internal kubevirt node listener.
918 * The notification is triggered by KubevirtNodeStore.
919 */
920 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
921
922 private boolean isRelevantHelper() {
923 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
924 }
925
926 @Override
927 public void event(KubevirtNodeEvent event) {
928 switch (event.type()) {
929 case KUBEVIRT_NODE_CREATED:
930 case KUBEVIRT_NODE_UPDATED:
931 eventExecutor.execute(() -> {
932 if (!isRelevantHelper()) {
933 return;
934 }
935 bootstrapNode(event.subject());
936 });
937 break;
938 case KUBEVIRT_NODE_REMOVED:
939 eventExecutor.execute(() -> {
940 if (!isRelevantHelper()) {
941 return;
942 }
943 removeNode(event.subject());
944 });
945 break;
946 case KUBEVIRT_NODE_INCOMPLETE:
947 default:
948 break;
949 }
950 }
951 }
952}