blob: 0850ba2dae7def1962264148ac3105421441537f [file] [log] [blame]
Jian Li4fe40e52021-01-06 03:29:58 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnode.impl;
17
Jian Li72f3dac2021-01-28 16:14:54 +090018import com.google.common.collect.Lists;
19import org.onlab.packet.IpAddress;
Jian Li4fe40e52021-01-06 03:29:58 +090020import org.onlab.util.Tools;
21import org.onosproject.cfg.ComponentConfigService;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
Jian Li72f3dac2021-01-28 16:14:54 +090027import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
Jian Li4fe40e52021-01-06 03:29:58 +090028import org.onosproject.kubevirtnode.api.KubevirtNode;
29import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
30import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
31import org.onosproject.kubevirtnode.api.KubevirtNodeHandler;
32import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
33import org.onosproject.kubevirtnode.api.KubevirtNodeState;
34import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
35import org.onosproject.net.Device;
36import org.onosproject.net.DeviceId;
37import org.onosproject.net.Port;
38import org.onosproject.net.behaviour.BridgeConfig;
39import org.onosproject.net.behaviour.BridgeDescription;
40import org.onosproject.net.behaviour.BridgeName;
41import org.onosproject.net.behaviour.ControllerInfo;
42import org.onosproject.net.behaviour.DefaultBridgeDescription;
43import org.onosproject.net.behaviour.DefaultPatchDescription;
44import org.onosproject.net.behaviour.DefaultTunnelDescription;
45import org.onosproject.net.behaviour.InterfaceConfig;
46import org.onosproject.net.behaviour.PatchDescription;
47import org.onosproject.net.behaviour.TunnelDescription;
48import org.onosproject.net.behaviour.TunnelEndPoints;
49import org.onosproject.net.behaviour.TunnelKey;
50import org.onosproject.net.device.DeviceAdminService;
51import org.onosproject.net.device.DeviceEvent;
52import org.onosproject.net.device.DeviceListener;
53import org.onosproject.net.device.DeviceService;
Daniel Parka5ba88d2021-05-28 15:46:46 +090054import org.onosproject.net.flow.FlowRuleService;
Jian Li4fe40e52021-01-06 03:29:58 +090055import org.onosproject.ovsdb.controller.OvsdbClientService;
56import org.onosproject.ovsdb.controller.OvsdbController;
57import org.osgi.service.component.ComponentContext;
58import org.osgi.service.component.annotations.Activate;
59import org.osgi.service.component.annotations.Component;
60import org.osgi.service.component.annotations.Deactivate;
61import org.osgi.service.component.annotations.Modified;
62import org.osgi.service.component.annotations.Reference;
63import org.osgi.service.component.annotations.ReferenceCardinality;
64import org.slf4j.Logger;
65
66import java.util.Dictionary;
67import java.util.List;
68import java.util.Objects;
69import java.util.Set;
70import java.util.concurrent.ExecutorService;
71import java.util.stream.Collectors;
72
73import static java.lang.Thread.sleep;
74import static java.util.concurrent.Executors.newSingleThreadExecutor;
75import static org.onlab.packet.TpPort.tpPort;
76import static org.onlab.util.Tools.groupedThreads;
77import static org.onosproject.kubevirtnode.api.Constants.BRIDGE_PREFIX;
78import static org.onosproject.kubevirtnode.api.Constants.FLOW_KEY;
79import static org.onosproject.kubevirtnode.api.Constants.GENEVE;
80import static org.onosproject.kubevirtnode.api.Constants.GRE;
81import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_BRIDGE;
82import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_PHYSICAL_PREFIX;
Daniel Parkf3136042021-03-10 07:49:11 +090083import static org.onosproject.kubevirtnode.api.Constants.INTEGRATION_TO_TUNNEL;
Jian Li4fe40e52021-01-06 03:29:58 +090084import static org.onosproject.kubevirtnode.api.Constants.PHYSICAL_TO_INTEGRATION_SUFFIX;
Jian Li4b3436a2022-03-23 13:07:19 +090085import static org.onosproject.kubevirtnode.api.Constants.STT;
Jian Li556709c2021-02-03 17:54:28 +090086import static org.onosproject.kubevirtnode.api.Constants.TENANT_BRIDGE_PREFIX;
Jian Li4fe40e52021-01-06 03:29:58 +090087import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_BRIDGE;
Daniel Parkf3136042021-03-10 07:49:11 +090088import static org.onosproject.kubevirtnode.api.Constants.TUNNEL_TO_INTEGRATION;
Jian Li4fe40e52021-01-06 03:29:58 +090089import static org.onosproject.kubevirtnode.api.Constants.VXLAN;
Daniel Parkf3136042021-03-10 07:49:11 +090090import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.GATEWAY;
Daniel Park17fe7982022-04-04 17:48:01 +090091import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
Jian Li4fe40e52021-01-06 03:29:58 +090092import static org.onosproject.kubevirtnode.api.KubevirtNodeService.APP_ID;
93import static org.onosproject.kubevirtnode.api.KubevirtNodeState.COMPLETE;
94import static org.onosproject.kubevirtnode.api.KubevirtNodeState.DEVICE_CREATED;
95import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INCOMPLETE;
96import static org.onosproject.kubevirtnode.api.KubevirtNodeState.INIT;
97import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
98import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
99import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT;
100import static org.onosproject.kubevirtnode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
101import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.addOrRemoveSystemInterface;
102import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getBooleanProperty;
103import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.getOvsdbClient;
104import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.isOvsdbConnected;
Jian Li94b6d162021-04-15 17:09:11 +0900105import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.resolveHostname;
Jian Li4fe40e52021-01-06 03:29:58 +0900106import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.structurePortName;
107import static org.onosproject.net.AnnotationKeys.PORT_NAME;
108import static org.slf4j.LoggerFactory.getLogger;
109
110/**
111 * Service bootstraps kubernetes node based on its type.
112 */
113@Component(immediate = true,
114 property = {
115 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
116 AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
117 }
118)
119public class DefaultKubevirtNodeHandler implements KubevirtNodeHandler {
120
121 private final Logger log = getLogger(getClass());
122
123 private static final String DEFAULT_OF_PROTO = "tcp";
124 private static final int DEFAULT_OFPORT = 6653;
125 private static final int DPID_BEGIN = 3;
126 private static final int NETWORK_BEGIN = 3;
127 private static final long SLEEP_SHORT_MS = 1000; // we wait 1s
Jian Li4b249702021-02-19 18:13:10 +0900128 private static final long SLEEP_MID_MS = 2000; // we wait 2s
129 private static final long SLEEP_LONG_MS = 5000; // we wait 5s
Jian Li4fe40e52021-01-06 03:29:58 +0900130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
132 protected CoreService coreService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
135 protected LeadershipService leadershipService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
138 protected ClusterService clusterService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
141 protected DeviceService deviceService;
142
143 @Reference(cardinality = ReferenceCardinality.MANDATORY)
144 protected DeviceAdminService deviceAdminService;
145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY)
147 protected OvsdbController ovsdbController;
148
149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
150 protected KubevirtNodeAdminService nodeAdminService;
151
152 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li72f3dac2021-01-28 16:14:54 +0900153 protected KubevirtApiConfigService apiConfigService;
154
155 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4fe40e52021-01-06 03:29:58 +0900156 protected ComponentConfigService componentConfigService;
157
Daniel Parka5ba88d2021-05-28 15:46:46 +0900158 @Reference(cardinality = ReferenceCardinality.MANDATORY)
159 protected FlowRuleService flowRuleService;
160
Jian Li4fe40e52021-01-06 03:29:58 +0900161 /** OVSDB server listen port. */
162 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
163
164 /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
165 private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
166
167 private final ExecutorService eventExecutor = newSingleThreadExecutor(
168 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
169
170 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
171 private final DeviceListener bridgeListener = new InternalBridgeListener();
172 private final KubevirtNodeListener kubevirtNodeListener = new InternalKubevirtNodeListener();
173
174 private ApplicationId appId;
175 private NodeId localNode;
176
177 @Activate
178 protected void activate() {
179 appId = coreService.getAppId(APP_ID);
180 localNode = clusterService.getLocalNode().id();
181
182 componentConfigService.registerProperties(getClass());
183 leadershipService.runForLeadership(appId.name());
184 deviceService.addListener(ovsdbListener);
185 deviceService.addListener(bridgeListener);
186 nodeAdminService.addListener(kubevirtNodeListener);
187
188 log.info("Started");
189 }
190
191 @Deactivate
192 protected void deactivate() {
193 nodeAdminService.removeListener(kubevirtNodeListener);
194 deviceService.removeListener(bridgeListener);
195 deviceService.removeListener(ovsdbListener);
196 componentConfigService.unregisterProperties(getClass(), false);
197 leadershipService.withdraw(appId.name());
198 eventExecutor.shutdown();
199
200 log.info("Stopped");
201 }
202
203 @Modified
204 protected void modified(ComponentContext context) {
205 readComponentConfiguration(context);
206
207 log.info("Modified");
208 }
209
210 @Override
211 public void processInitState(KubevirtNode node) {
212 if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
213 ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
214 return;
215 }
216 if (!deviceService.isAvailable(node.intgBridge())) {
217 createBridge(node, INTEGRATION_BRIDGE, node.intgBridge());
218 }
219
220 if (!deviceService.isAvailable(node.tunBridge())) {
221 createBridge(node, TUNNEL_BRIDGE, node.tunBridge());
222 }
223 }
224
225 @Override
226 public void processDeviceCreatedState(KubevirtNode node) {
227 try {
228 if (!isOvsdbConnected(node, ovsdbPortNum, ovsdbController, deviceService)) {
229 ovsdbController.connect(node.managementIp(), tpPort(ovsdbPortNum));
230 return;
231 }
232
233 // create patch ports between integration to other bridges
Jian Li858ccd72021-02-04 17:25:01 +0900234 // for now, we do not directly connect br-int with br-tun,
235 // as br-int only deals with FLAT and VLAN network
236 // createPatchInterfaces(node);
Jian Li4fe40e52021-01-06 03:29:58 +0900237
238 if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
239 createVxlanTunnelInterface(node);
240 }
241
242 if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
243 createGreTunnelInterface(node);
244 }
245
246 if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
247 createGeneveTunnelInterface(node);
248 }
Jian Li4b3436a2022-03-23 13:07:19 +0900249
250 if (node.dataIp() != null && !isIntfEnabled(node, STT)) {
251 createSttTunnelInterface(node);
252 }
Jian Li4fe40e52021-01-06 03:29:58 +0900253 } catch (Exception e) {
254 log.error("Exception occurred because of {}", e);
255 }
256 }
257
258 @Override
259 public void processCompleteState(KubevirtNode node) {
260 // do something if needed
261 }
262
263 @Override
264 public void processIncompleteState(KubevirtNode node) {
265 // do something if needed
266 }
267
268 @Override
269 public void processOnBoardedState(KubevirtNode node) {
270 // do something if needed
271 }
272
273 /**
274 * Extracts properties from the component configuration context.
275 *
276 * @param context the component context
277 */
278 private void readComponentConfiguration(ComponentContext context) {
279 Dictionary<?, ?> properties = context.getProperties();
280
281 Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
282 if (ovsdbPortConfigured == null) {
283 ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
284 log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
285 } else {
286 ovsdbPortNum = ovsdbPortConfigured;
287 log.info("Configured. OVSDB port is {}", ovsdbPortNum);
288 }
289
290 Boolean autoRecoveryConfigured =
291 getBooleanProperty(properties, AUTO_RECOVERY);
292 if (autoRecoveryConfigured == null) {
293 autoRecovery = AUTO_RECOVERY_DEFAULT;
294 log.info("Auto recovery flag is NOT " +
295 "configured, default value is {}", autoRecovery);
296 } else {
297 autoRecovery = autoRecoveryConfigured;
298 log.info("Configured. Auto recovery flag is {}", autoRecovery);
299 }
300 }
301
302 /**
303 * Creates a bridge with a given name on a given kubernetes node.
304 *
305 * @param node kubevirt node
306 * @param bridgeName bridge name
307 * @param devId device identifier
308 */
309 private void createBridge(KubevirtNode node, String bridgeName, DeviceId devId) {
310 Device device = deviceService.getDevice(node.ovsdb());
311
Jian Lie0eaf5c2021-09-06 10:02:13 +0900312 IpAddress controllerIp = apiConfigService.apiConfig().controllerIp();
Jian Li94b6d162021-04-15 17:09:11 +0900313 String serviceFqdn = apiConfigService.apiConfig().serviceFqdn();
314 IpAddress serviceIp = null;
315
Jian Lie0eaf5c2021-09-06 10:02:13 +0900316 if (controllerIp == null) {
317 if (serviceFqdn != null) {
318 serviceIp = resolveHostname(serviceFqdn);
319 }
320
321 if (serviceIp != null) {
322 controllerIp = serviceIp;
323 } else {
324 controllerIp = apiConfigService.apiConfig().ipAddress();
325 }
Jian Li94b6d162021-04-15 17:09:11 +0900326 }
327
Jian Lie0eaf5c2021-09-06 10:02:13 +0900328 ControllerInfo controlInfo = new ControllerInfo(controllerIp, DEFAULT_OFPORT, DEFAULT_OF_PROTO);
Jian Li72f3dac2021-01-28 16:14:54 +0900329 List<ControllerInfo> controllers = Lists.newArrayList(controlInfo);
Jian Li4fe40e52021-01-06 03:29:58 +0900330
331 String dpid = devId.toString().substring(DPID_BEGIN);
332
333 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
334 .name(bridgeName)
335 .failMode(BridgeDescription.FailMode.SECURE)
336 .datapathId(dpid)
337 .disableInBand()
338 .controllers(controllers);
339
340 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
341 bridgeConfig.addBridge(builder.build());
342 }
343
344 /**
345 * Creates a VXLAN tunnel interface in a given kubevirt node.
346 *
347 * @param node kubevirt node
348 */
349 private void createVxlanTunnelInterface(KubevirtNode node) {
350 createTunnelInterface(node, VXLAN, VXLAN);
351 }
352
353 /**
354 * Creates a GRE tunnel interface in a given kubevirt node.
355 *
356 * @param node kubevirt node
357 */
358 private void createGreTunnelInterface(KubevirtNode node) {
359 createTunnelInterface(node, GRE, GRE);
360 }
361
362 /**
363 * Creates a GENEVE tunnel interface in a given kubevirt node.
364 *
365 * @param node kubevirt node
366 */
367 private void createGeneveTunnelInterface(KubevirtNode node) {
368 createTunnelInterface(node, GENEVE, GENEVE);
369 }
370
Jian Li4b3436a2022-03-23 13:07:19 +0900371 private void createSttTunnelInterface(KubevirtNode node) {
372 createTunnelInterface(node, STT, STT);
373 }
374
Jian Li4fe40e52021-01-06 03:29:58 +0900375 /**
376 * Creates a tunnel interface in a given kubernetes node.
377 *
378 * @param node kubevirt node
Jian Li4b249702021-02-19 18:13:10 +0900379 * @param type kubevirt type
380 * @param intfName tunnel interface name
Jian Li4fe40e52021-01-06 03:29:58 +0900381 */
382 private void createTunnelInterface(KubevirtNode node,
383 String type, String intfName) {
384 if (isIntfEnabled(node, intfName)) {
385 return;
386 }
387
388 Device device = deviceService.getDevice(node.ovsdb());
389 if (device == null || !device.is(InterfaceConfig.class)) {
390 log.error("Failed to create tunnel interface on {}", node.ovsdb());
391 return;
392 }
393
394 TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
395
396 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
397 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
398 }
399
400 /**
401 * Builds tunnel description according to the network type.
402 *
403 * @param type network type
Jian Li4b249702021-02-19 18:13:10 +0900404 * @param intfName tunnel interface
Jian Li4fe40e52021-01-06 03:29:58 +0900405 * @return tunnel description
406 */
407 private TunnelDescription buildTunnelDesc(String type, String intfName) {
408 TunnelKey<String> key = new TunnelKey<>(FLOW_KEY);
Jian Li4b3436a2022-03-23 13:07:19 +0900409 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type) || STT.equals(type)) {
Jian Li4fe40e52021-01-06 03:29:58 +0900410 TunnelDescription.Builder tdBuilder =
411 DefaultTunnelDescription.builder()
412 .deviceId(TUNNEL_BRIDGE)
413 .ifaceName(intfName)
414 .remote(TunnelEndPoints.flowTunnelEndpoint())
415 .key(key);
416
417 switch (type) {
418 case VXLAN:
419 tdBuilder.type(TunnelDescription.Type.VXLAN);
420 break;
421 case GRE:
422 tdBuilder.type(TunnelDescription.Type.GRE);
423 break;
424 case GENEVE:
425 tdBuilder.type(TunnelDescription.Type.GENEVE);
426 break;
Jian Li4b3436a2022-03-23 13:07:19 +0900427 case STT:
428 tdBuilder.type(TunnelDescription.Type.STT);
429 break;
Jian Li4fe40e52021-01-06 03:29:58 +0900430 default:
431 return null;
432 }
433
434 return tdBuilder.build();
435 }
436 return null;
437 }
438
439 /**
440 * Checks whether a given network interface in a given kubernetes node
441 * is enabled or not.
442 *
443 * @param node kubevirt node
444 * @param intf network interface name
445 * @return true if the given interface is enabled, false otherwise
446 */
447 private boolean isIntfEnabled(KubevirtNode node, String intf) {
448 return deviceService.isAvailable(node.tunBridge()) &&
449 deviceService.getPorts(node.tunBridge()).stream()
450 .anyMatch(port -> Objects.equals(
451 port.annotations().value(PORT_NAME), intf) &&
452 port.isEnabled());
453 }
454
Jian Li4fe40e52021-01-06 03:29:58 +0900455 /**
456 * Bootstraps a new kubevirt node.
457 *
458 * @param node kubevirt node
459 */
460 private void bootstrapNode(KubevirtNode node) {
461 if (isCurrentStateDone(node)) {
462 setState(node, node.state().nextState());
463 } else {
464 log.trace("Processing {} state for {}", node.state(), node.hostname());
465 node.state().process(this, node);
466 }
467 }
468
469 /**
470 * Removes the existing kubevirt node.
471 *
472 * @param node kubevirt node
473 */
474 private void removeNode(KubevirtNode node) {
475 OvsdbClientService client = getOvsdbClient(node, ovsdbPortNum, ovsdbController);
476 if (client == null) {
477 log.info("Failed to get ovsdb client");
478 return;
479 }
480
Daniel Parka5ba88d2021-05-28 15:46:46 +0900481 // purges all the flow rules installed on the node
482 flowRuleService.purgeFlowRules(node.intgBridge());
483 flowRuleService.purgeFlowRules(node.tunBridge());
484
Jian Li4fe40e52021-01-06 03:29:58 +0900485 // unprovision physical interfaces from the node
486 // this procedure includes detaching physical port from physical bridge,
487 // remove patch ports from br-int, removing physical bridge
488 unprovisionPhysicalInterfaces(node);
489
490 // delete tunnel bridge from the node
491 client.dropBridge(TUNNEL_BRIDGE);
492
493 // delete integration bridge from the node
494 client.dropBridge(INTEGRATION_BRIDGE);
495 }
496
497 /**
498 * Checks whether all requirements for this state are fulfilled or not.
499 *
500 * @param node kubevirt node
501 * @return true if all requirements are fulfilled, false otherwise
502 */
503 private boolean isCurrentStateDone(KubevirtNode node) {
504 switch (node.state()) {
505 case INIT:
506 return isInitStateDone(node);
507 case DEVICE_CREATED:
508 return isDeviceCreatedStateDone(node);
509 case COMPLETE:
510 case INCOMPLETE:
511 case ON_BOARDED:
512 // always return false
513 // run init CLI to re-trigger node bootstrap
514 return false;
515 default:
516 return true;
517 }
518 }
519
520 private boolean isInitStateDone(KubevirtNode node) {
521 if (!isOvsdbConnected(node, ovsdbPortNum,
522 ovsdbController, deviceService)) {
523 return false;
524 }
525
526 try {
527 // we need to wait a while, in case interfaces and bridges
528 // creation requires some time
529 sleep(SLEEP_SHORT_MS);
530 } catch (InterruptedException e) {
531 log.error("Exception caused during init state checking...");
532 }
533
534 cleanPhysicalInterfaces(node);
535
Jian Li4b249702021-02-19 18:13:10 +0900536 // provision new physical interfaces on the given node
537 // this includes creating physical bridge, attaching physical port
538 // to physical bridge, adding patch ports to both physical bridge and br-int
539 provisionPhysicalInterfaces(node);
540
Daniel Parkf3136042021-03-10 07:49:11 +0900541 if (node.type() == GATEWAY) {
542 createPatchInterfaceBetweenBrIntBrTun(node);
543 }
544
Jian Li4fe40e52021-01-06 03:29:58 +0900545 return node.intgBridge() != null && node.tunBridge() != null &&
546 deviceService.isAvailable(node.intgBridge()) &&
547 deviceService.isAvailable(node.tunBridge());
548 }
549
550 private boolean isDeviceCreatedStateDone(KubevirtNode node) {
551
552 try {
553 // we need to wait a while, in case tunneling ports
554 // creation requires some time
Jian Li4b249702021-02-19 18:13:10 +0900555 sleep(SLEEP_MID_MS);
Jian Li4fe40e52021-01-06 03:29:58 +0900556 } catch (InterruptedException e) {
557 log.error("Exception caused during init state checking...");
558 }
559
560 if (node.dataIp() != null && !isIntfEnabled(node, VXLAN)) {
Jian Li4b249702021-02-19 18:13:10 +0900561 log.warn("VXLAN interface is not enabled!");
Jian Li4fe40e52021-01-06 03:29:58 +0900562 return false;
563 }
564 if (node.dataIp() != null && !isIntfEnabled(node, GRE)) {
Jian Li4b249702021-02-19 18:13:10 +0900565 log.warn("GRE interface is not enabled!");
Jian Li4fe40e52021-01-06 03:29:58 +0900566 return false;
567 }
568 if (node.dataIp() != null && !isIntfEnabled(node, GENEVE)) {
Jian Li4b249702021-02-19 18:13:10 +0900569 log.warn("GENEVE interface is not enabled!");
Jian Li4fe40e52021-01-06 03:29:58 +0900570 return false;
571 }
572
Jian Li4b3436a2022-03-23 13:07:19 +0900573 // we make the STT tunnel port check optional
574
Jian Li4fe40e52021-01-06 03:29:58 +0900575 for (KubevirtPhyInterface phyIntf : node.phyIntfs()) {
576 if (phyIntf == null) {
Jian Li4b249702021-02-19 18:13:10 +0900577 log.warn("Physnet interface is invalid");
Jian Li4fe40e52021-01-06 03:29:58 +0900578 return false;
579 }
580
Jian Li4b249702021-02-19 18:13:10 +0900581 try {
582 // we need to wait a while, in case tunneling ports
583 // creation requires some time
584 sleep(SLEEP_LONG_MS);
585 } catch (InterruptedException e) {
586 log.error("Exception caused during init state checking...");
587 }
588
Jian Li4fe40e52021-01-06 03:29:58 +0900589 String patchPortName = structurePortName(
590 INTEGRATION_TO_PHYSICAL_PREFIX + phyIntf.network());
591
Daniel Park17fe7982022-04-04 17:48:01 +0900592 if (node.type() == WORKER) {
593 String bridgeName = BRIDGE_PREFIX + phyIntf.network();
594 if (!(hasPhyBridge(node, bridgeName) &&
595 hasPhyPatchPort(node, patchPortName) &&
596 hasPhyIntf(node, phyIntf.intf()))) {
597 log.warn("PhyBridge {}", hasPhyBridge(node, bridgeName));
598 log.warn("hasPhyPatchPort {}", hasPhyPatchPort(node, patchPortName));
599 log.warn("hasPhyIntf {}", hasPhyIntf(node, phyIntf.intf()));
600 return false;
601 }
602 } else {
603 //In case node type is GATEWAY, we create physical bridges connected to the contoller.
604 //By doing so, ONOS immediately recognizes the status of physical interface and performs RM procedures.
605 Port phyIntfPort = deviceService.getPorts(phyIntf.physBridge()).stream()
606 .filter(port -> port.annotations().value(PORT_NAME).equals(phyIntf.intf()))
607 .findAny().orElse(null);
608 if (phyIntfPort == null) {
609 log.warn("There's no connected physical port {} on physical device {}",
610 phyIntf.intf(), phyIntf.physBridge());
611 }
612
613 if (!(deviceService.isAvailable(phyIntf.physBridge()) &&
614 hasPhyPatchPort(node, patchPortName) &&
615 hasPhyIntf(node, phyIntf.intf()) &&
616 phyIntfPort.isEnabled())) {
617 log.warn("PhysBridge {}", deviceService.isAvailable(phyIntf.physBridge()));
618 log.warn("hasPhyPatchPort {}", hasPhyPatchPort(node, patchPortName));
619 log.warn("hasPhyIntf {}", hasPhyIntf(node, phyIntf.intf()));
620 log.warn("physical interface port {}", phyIntfPort.isEnabled());
621 return false;
622 }
Jian Li4fe40e52021-01-06 03:29:58 +0900623 }
Daniel Park17fe7982022-04-04 17:48:01 +0900624
Jian Li4fe40e52021-01-06 03:29:58 +0900625 }
626
Daniel Parkf3136042021-03-10 07:49:11 +0900627 if (node.type() == GATEWAY) {
628 if (!(hasPhyIntf(node, INTEGRATION_TO_TUNNEL) &&
629 hasPhyIntf(node, TUNNEL_TO_INTEGRATION))) {
630 log.warn("IntToTunPort {}", hasPhyIntf(node, INTEGRATION_TO_TUNNEL));
631 log.warn("TunToIntPort {}", hasPhyIntf(node, TUNNEL_TO_INTEGRATION));
632 return false;
633 }
634 }
Jian Li4fe40e52021-01-06 03:29:58 +0900635 return true;
636 }
637
Daniel Park17fe7982022-04-04 17:48:01 +0900638 private boolean hasPhyBridge(KubevirtNode node, String bridgeName) {
639 BridgeConfig bridgeConfig =
640 deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
641 return bridgeConfig.getBridges().stream()
642 .anyMatch(br -> br.name().equals(bridgeName));
643 }
Jian Li4fe40e52021-01-06 03:29:58 +0900644 /**
645 * Configures the kubernetes node with new state.
646 *
647 * @param node kubevirt node
648 * @param newState a new state
649 */
650 private void setState(KubevirtNode node, KubevirtNodeState newState) {
651 if (node.state() == newState) {
652 return;
653 }
654 KubevirtNode updated = node.updateState(newState);
655 nodeAdminService.updateNode(updated);
656 log.info("Changed {} state: {}", node.hostname(), newState);
657 }
658
659 private void provisionPhysicalInterfaces(KubevirtNode node) {
660 node.phyIntfs().forEach(pi -> {
661 String bridgeName = BRIDGE_PREFIX + pi.network();
662 String patchPortName =
663 structurePortName(INTEGRATION_TO_PHYSICAL_PREFIX + pi.network());
Daniel Park17fe7982022-04-04 17:48:01 +0900664 if (node.type() == WORKER && !hasPhyBridge(node, bridgeName)) {
Jian Li4fe40e52021-01-06 03:29:58 +0900665 createPhysicalBridge(node, pi);
666 createPhysicalPatchPorts(node, pi);
667 attachPhysicalPort(node, pi);
Jian Li4b249702021-02-19 18:13:10 +0900668
Daniel Park17fe7982022-04-04 17:48:01 +0900669 log.info("Creating physnet bridge {} for worker node {}", bridgeName, node.hostname());
670 log.info("Creating patch ports for physnet {} for worker node {}", bridgeName, node.hostname());
671
672 } else if (node.type() == GATEWAY && (!deviceService.isAvailable(pi.physBridge()))) {
673 createPhysicalBridgeWithConnectedMode(node, pi);
674 createPhysicalPatchPorts(node, pi);
675 attachPhysicalPort(node, pi);
676
677 log.info("Creating physnet bridge {} for gateway node {}", bridgeName, node.hostname());
678 log.info("Creating patch ports for physnet {} for gateway node {}", bridgeName, node.hostname());
Jian Li4fe40e52021-01-06 03:29:58 +0900679 } else {
Jian Li517597a2021-03-22 11:04:52 +0900680 // in case physical bridge exists, but patch port is missing,
681 // we will add patch port to connect br-physnet with physical bridge
Jian Li4fe40e52021-01-06 03:29:58 +0900682 if (!hasPhyPatchPort(node, patchPortName)) {
683 createPhysicalPatchPorts(node, pi);
Jian Li4b249702021-02-19 18:13:10 +0900684
685 log.info("Creating patch ports for physnet {}", bridgeName);
Jian Li4fe40e52021-01-06 03:29:58 +0900686 }
Jian Li517597a2021-03-22 11:04:52 +0900687
688 // in case physical bridge exists, but physnet interface is missing,
689 // we will add the physnet interface to connect br-physnet to the external
690 if (!hasPhyIntf(node, pi.intf())) {
691 attachPhysicalPort(node, pi);
692
693 log.info("Attaching external ports for physnet {}", bridgeName);
694 }
Jian Li4fe40e52021-01-06 03:29:58 +0900695 }
696 });
697 }
698
699 private void cleanPhysicalInterfaces(KubevirtNode node) {
700 Device device = deviceService.getDevice(node.ovsdb());
701
702 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
703
704 Set<String> bridgeNames = bridgeConfig.getBridges().stream()
705 .map(BridgeDescription::name).collect(Collectors.toSet());
706
707 Set<String> phyNetworkNames = node.phyIntfs().stream()
708 .map(pi -> BRIDGE_PREFIX + pi.network()).collect(Collectors.toSet());
709
710 // we remove existing physical bridges and patch ports, if the physical
711 // bridges are not defined in kubevirt node
712 for (String brName : bridgeNames) {
Jian Li4b249702021-02-19 18:13:10 +0900713 // integration bridge and tunnel bridge should NOT be treated as
714 // physical bridges
715 if (brName.equals(INTEGRATION_BRIDGE) ||
716 brName.equals(TUNNEL_BRIDGE) ||
717 brName.startsWith(TENANT_BRIDGE_PREFIX)) {
718 continue;
719 }
720
Jian Li4fe40e52021-01-06 03:29:58 +0900721 if (!phyNetworkNames.contains(brName)) {
Jian Li4fe40e52021-01-06 03:29:58 +0900722 removePhysicalPatchPorts(node, brName.substring(NETWORK_BEGIN));
723 removePhysicalBridge(node, brName.substring(NETWORK_BEGIN));
Jian Li4b249702021-02-19 18:13:10 +0900724 log.info("Removing physical bridge {}...", brName);
Jian Li4fe40e52021-01-06 03:29:58 +0900725 }
726 }
727 }
728
Daniel Parkf3136042021-03-10 07:49:11 +0900729
730 private void createPatchInterfaceBetweenBrIntBrTun(KubevirtNode node) {
731 Device device = deviceService.getDevice(node.ovsdb());
732
733 if (device == null || !device.is(InterfaceConfig.class)) {
734 log.error("Failed to create patch interface on {}", node.ovsdb());
735 return;
736 }
737
738 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
739
740 // int bridge -> tunnel bridge
741 PatchDescription brIntTunPatchDesc =
742 DefaultPatchDescription.builder()
743 .deviceId(INTEGRATION_BRIDGE)
744 .ifaceName(INTEGRATION_TO_TUNNEL)
745 .peer(TUNNEL_TO_INTEGRATION)
746 .build();
747
748 ifaceConfig.addPatchMode(INTEGRATION_TO_TUNNEL, brIntTunPatchDesc);
749
750 // tunnel bridge -> int bridge
751 PatchDescription brTunIntPatchDesc =
752 DefaultPatchDescription.builder()
753 .deviceId(TUNNEL_BRIDGE)
754 .ifaceName(TUNNEL_TO_INTEGRATION)
755 .peer(INTEGRATION_TO_TUNNEL)
756 .build();
757
758 ifaceConfig.addPatchMode(TUNNEL_TO_INTEGRATION, brTunIntPatchDesc);
759 }
760
Jian Li4fe40e52021-01-06 03:29:58 +0900761 private void unprovisionPhysicalInterfaces(KubevirtNode node) {
762 node.phyIntfs().forEach(pi -> {
763 detachPhysicalPort(node, pi.network(), pi.intf());
764 removePhysicalPatchPorts(node, pi.network());
765 removePhysicalBridge(node, pi.network());
766 });
767 }
768
Jian Li4fe40e52021-01-06 03:29:58 +0900769 private boolean hasPhyPatchPort(KubevirtNode node, String patchPortName) {
770 List<Port> ports = deviceService.getPorts(node.intgBridge());
771 return ports.stream().anyMatch(p ->
772 p.annotations().value(PORT_NAME).equals(patchPortName));
773 }
774
775 private boolean hasPhyIntf(KubevirtNode node, String intfName) {
776 BridgeConfig bridgeConfig =
777 deviceService.getDevice(node.ovsdb()).as(BridgeConfig.class);
778 return bridgeConfig.getPorts().stream()
779 .anyMatch(p -> p.annotations().value(PORT_NAME).equals(intfName));
780 }
781
782 private void createPhysicalBridge(KubevirtNode osNode,
783 KubevirtPhyInterface phyInterface) {
784 Device device = deviceService.getDevice(osNode.ovsdb());
785
786 String bridgeName = BRIDGE_PREFIX + phyInterface.network();
787
788 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
789 .name(bridgeName)
790 .mcastSnoopingEnable();
791
792 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
793 bridgeConfig.addBridge(builder.build());
794 }
795
Daniel Park17fe7982022-04-04 17:48:01 +0900796 private void createPhysicalBridgeWithConnectedMode(KubevirtNode osNode,
797 KubevirtPhyInterface phyInterface) {
798 Device device = deviceService.getDevice(osNode.ovsdb());
799 IpAddress controllerIp = apiConfigService.apiConfig().controllerIp();
800 String serviceFqdn = apiConfigService.apiConfig().serviceFqdn();
801 IpAddress serviceIp = null;
802
803 if (controllerIp == null) {
804 if (serviceFqdn != null) {
805 serviceIp = resolveHostname(serviceFqdn);
806 }
807
808 if (serviceIp != null) {
809 controllerIp = serviceIp;
810 } else {
811 controllerIp = apiConfigService.apiConfig().ipAddress();
812 }
813 }
814
815 ControllerInfo controlInfo = new ControllerInfo(controllerIp, DEFAULT_OFPORT, DEFAULT_OF_PROTO);
816 List<ControllerInfo> controllers = Lists.newArrayList(controlInfo);
817
818 String dpid = phyInterface.physBridge().toString().substring(DPID_BEGIN);
819
820 String bridgeName = BRIDGE_PREFIX + phyInterface.network();
821
822 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
823 .name(bridgeName)
824 .failMode(BridgeDescription.FailMode.SECURE)
825 .datapathId(dpid)
826 .mcastSnoopingEnable()
827 .disableInBand()
828 .controllers(controllers);
829
830 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
831 bridgeConfig.addBridge(builder.build());
832 }
833
Jian Li4fe40e52021-01-06 03:29:58 +0900834 private void removePhysicalBridge(KubevirtNode node, String network) {
835 Device device = deviceService.getDevice(node.ovsdb());
836
837 BridgeName bridgeName = BridgeName.bridgeName(BRIDGE_PREFIX + network);
838
839 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
840 bridgeConfig.deleteBridge(bridgeName);
841 }
842
843 private void createPhysicalPatchPorts(KubevirtNode node,
844 KubevirtPhyInterface phyInterface) {
845 Device device = deviceService.getDevice(node.ovsdb());
846
847 if (device == null || !device.is(InterfaceConfig.class)) {
848 log.error("Failed to create patch interface on {}", node.ovsdb());
849 return;
850 }
851
852 String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
853
854 String intToPhyPatchPort = structurePortName(
855 INTEGRATION_TO_PHYSICAL_PREFIX + phyInterface.network());
856 String phyToIntPatchPort = structurePortName(
857 phyInterface.network() + PHYSICAL_TO_INTEGRATION_SUFFIX);
858
859 // integration bridge -> physical bridge
860 PatchDescription intToPhyPatchDesc =
861 DefaultPatchDescription.builder()
862 .deviceId(INTEGRATION_BRIDGE)
863 .ifaceName(intToPhyPatchPort)
864 .peer(phyToIntPatchPort)
865 .build();
866
867 // physical bridge -> integration bridge
868 PatchDescription phyToIntPatchDesc =
869 DefaultPatchDescription.builder()
870 .deviceId(physicalDeviceId)
871 .ifaceName(phyToIntPatchPort)
872 .peer(intToPhyPatchPort)
873 .build();
874
875 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
876 ifaceConfig.addPatchMode(INTEGRATION_TO_PHYSICAL_PREFIX +
877 phyInterface.network(), intToPhyPatchDesc);
878 ifaceConfig.addPatchMode(phyInterface.network() +
879 PHYSICAL_TO_INTEGRATION_SUFFIX, phyToIntPatchDesc);
880
881 addOrRemoveSystemInterface(node, physicalDeviceId,
882 phyInterface.intf(), deviceService, true);
883 }
884
885 private void removePhysicalPatchPorts(KubevirtNode node, String network) {
886 Device device = deviceService.getDevice(node.ovsdb());
887
888 if (device == null || !device.is(InterfaceConfig.class)) {
889 log.error("Failed to remove patch interface on {}", node.ovsdb());
890 return;
891 }
892
893 String intToPhyPatchPort = structurePortName(
894 INTEGRATION_TO_PHYSICAL_PREFIX + network);
895
896 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
897 ifaceConfig.removePatchMode(intToPhyPatchPort);
898 }
899
900 private void attachPhysicalPort(KubevirtNode node,
901 KubevirtPhyInterface phyInterface) {
902
903 String physicalDeviceId = BRIDGE_PREFIX + phyInterface.network();
904
905 addOrRemoveSystemInterface(node, physicalDeviceId,
906 phyInterface.intf(), deviceService, true);
907 }
908
909 private void detachPhysicalPort(KubevirtNode node, String network, String portName) {
910 String physicalDeviceId = BRIDGE_PREFIX + network;
911
912 addOrRemoveSystemInterface(node, physicalDeviceId, portName, deviceService, false);
913 }
914
Daniel Park17fe7982022-04-04 17:48:01 +0900915 private KubevirtNode nodeByTunOrPhyBridge(DeviceId deviceId) {
916 KubevirtNode node = nodeAdminService.nodeByTunBridge(deviceId);
917 if (node == null) {
918 node = nodeAdminService.nodeByPhyBridge(deviceId);
919 if (node == null) {
920 return null;
921 }
922 }
923 return node;
924 }
925
Jian Li4fe40e52021-01-06 03:29:58 +0900926 /**
927 * An internal OVSDB listener. This listener is used for listening the
928 * network facing events from OVSDB device. If a new OVSDB device is detected,
929 * ONOS tries to bootstrap the kubernetes node.
930 */
931 private class InternalOvsdbListener implements DeviceListener {
932
933 @Override
934 public boolean isRelevant(DeviceEvent event) {
935 return event.subject().type() == Device.Type.CONTROLLER;
936 }
937
938 private boolean isRelevantHelper() {
939 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
940 }
941
942 @Override
943 public void event(DeviceEvent event) {
944 Device device = event.subject();
945
946 switch (event.type()) {
947 case DEVICE_AVAILABILITY_CHANGED:
948 case DEVICE_ADDED:
949 eventExecutor.execute(() -> {
950
951 if (!isRelevantHelper()) {
952 return;
953 }
954
955 KubevirtNode node = nodeAdminService.node(device.id());
956
957 if (node == null) {
958 return;
959 }
960
961 if (deviceService.isAvailable(device.id())) {
962 log.debug("OVSDB {} detected", device.id());
963 bootstrapNode(node);
964 }
965 });
966 break;
967 case PORT_ADDED:
968 case PORT_REMOVED:
969 case DEVICE_REMOVED:
970 default:
971 // do nothing
972 break;
973 }
974 }
975 }
976
977 /**
978 * An internal integration bridge listener. This listener is used for
979 * listening the events from integration bridge. To listen the events from
980 * other types of bridge such as provider bridge or tunnel bridge, we need
981 * to augment KubevirtNodeService.node() method.
982 */
983 private class InternalBridgeListener implements DeviceListener {
984
985 @Override
986 public boolean isRelevant(DeviceEvent event) {
987 return event.subject().type() == Device.Type.SWITCH;
988 }
989
990 private boolean isRelevantHelper() {
991 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
992 }
993
994 @Override
995 public void event(DeviceEvent event) {
996 Device device = event.subject();
997 Port port = event.port();
998
999 switch (event.type()) {
1000 case DEVICE_AVAILABILITY_CHANGED:
1001 case DEVICE_ADDED:
1002 eventExecutor.execute(() -> processDeviceAddition(device));
1003 break;
1004 case PORT_UPDATED:
1005 case PORT_ADDED:
Daniel Park17fe7982022-04-04 17:48:01 +09001006 eventExecutor.execute(() -> processPortAdditionOrUpdate(device, port));
Jian Li4fe40e52021-01-06 03:29:58 +09001007 break;
1008 case PORT_REMOVED:
1009 eventExecutor.execute(() -> processPortRemoval(device, port));
1010 break;
1011 case DEVICE_REMOVED:
1012 default:
1013 // do nothing
1014 break;
1015 }
1016 }
1017
1018 void processDeviceAddition(Device device) {
1019 if (!isRelevantHelper()) {
1020 return;
1021 }
1022
1023 KubevirtNode node = nodeAdminService.node(device.id());
1024
1025 if (node == null) {
1026 return;
1027 }
1028
1029 if (deviceService.isAvailable(device.id())) {
1030 log.debug("Bridge created on {}", node.hostname());
1031 bootstrapNode(node);
1032 } else if (node.state() == COMPLETE) {
1033 log.info("Device {} disconnected", device.id());
1034 setState(node, INCOMPLETE);
1035 }
1036
1037 if (autoRecovery) {
1038 if (node.state() == INCOMPLETE || node.state() == DEVICE_CREATED) {
1039 log.info("Device {} is reconnected", device.id());
1040 nodeAdminService.updateNode(node.updateState(INIT));
1041 }
1042 }
1043 }
1044
Daniel Park17fe7982022-04-04 17:48:01 +09001045 void processPortAdditionOrUpdate(Device device, Port port) {
Jian Li4fe40e52021-01-06 03:29:58 +09001046 if (!isRelevantHelper()) {
1047 return;
1048 }
1049
Daniel Park17fe7982022-04-04 17:48:01 +09001050 KubevirtNode node = nodeByTunOrPhyBridge(device.id());
Jian Li4fe40e52021-01-06 03:29:58 +09001051
1052 if (node == null) {
1053 return;
1054 }
1055
1056 String portName = port.annotations().value(PORT_NAME);
1057 if (node.state() == DEVICE_CREATED && (
1058 Objects.equals(portName, VXLAN) ||
1059 Objects.equals(portName, GRE) ||
Jian Li4b3436a2022-03-23 13:07:19 +09001060 Objects.equals(portName, GENEVE) ||
1061 Objects.equals(portName, STT))) {
Jian Li4fe40e52021-01-06 03:29:58 +09001062 log.info("Interface {} added or updated to {}",
1063 portName, device.id());
1064 bootstrapNode(node);
1065 }
Daniel Park17fe7982022-04-04 17:48:01 +09001066
1067 //When the physical port is down, in the middle of normal operation, we set the node state to INCOMPLTE
1068 //so that respective handlers do their related jobs.
1069 if (node.state() == COMPLETE && node.type().equals(GATEWAY) && !port.isEnabled()) {
1070 node.phyIntfs().stream()
1071 .filter(pi -> pi.intf().equals(portName))
1072 .findAny()
1073 .ifPresent(pi -> setState(node, INCOMPLETE));
1074 }
1075
1076 //When the physical port up again, we set the node state to INIT
1077 //so that respective handlers do their related jobs.
1078 if (node.state() == INCOMPLETE && node.type().equals(GATEWAY) && port.isEnabled()) {
1079 node.phyIntfs().stream()
1080 .filter(pi -> pi.intf().equals(portName))
1081 .findAny()
1082 .ifPresent(pi -> setState(node, INIT));
1083 }
Jian Li4fe40e52021-01-06 03:29:58 +09001084 }
1085
1086 void processPortRemoval(Device device, Port port) {
1087 if (!isRelevantHelper()) {
1088 return;
1089 }
1090
1091 KubevirtNode node = nodeAdminService.node(device.id());
1092
1093 if (node == null) {
1094 return;
1095 }
1096
1097 String portName = port.annotations().value(PORT_NAME);
1098 if (node.state() == COMPLETE && (
1099 Objects.equals(portName, VXLAN) ||
1100 Objects.equals(portName, GRE) ||
Jian Li4b3436a2022-03-23 13:07:19 +09001101 Objects.equals(portName, GENEVE) ||
1102 Objects.equals(portName, STT))) {
Jian Li4fe40e52021-01-06 03:29:58 +09001103 log.warn("Interface {} removed from {}", portName, device.id());
1104 setState(node, INCOMPLETE);
1105 }
1106 }
1107 }
1108
1109 /**
1110 * An internal kubevirt node listener.
1111 * The notification is triggered by KubevirtNodeStore.
1112 */
1113 private class InternalKubevirtNodeListener implements KubevirtNodeListener {
1114
1115 private boolean isRelevantHelper() {
1116 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
1117 }
1118
1119 @Override
1120 public void event(KubevirtNodeEvent event) {
1121 switch (event.type()) {
1122 case KUBEVIRT_NODE_CREATED:
1123 case KUBEVIRT_NODE_UPDATED:
1124 eventExecutor.execute(() -> {
1125 if (!isRelevantHelper()) {
1126 return;
1127 }
Jian Li517597a2021-03-22 11:04:52 +09001128 if (event.subject() == null) {
1129 return;
1130 }
Jian Li4fe40e52021-01-06 03:29:58 +09001131 bootstrapNode(event.subject());
1132 });
1133 break;
1134 case KUBEVIRT_NODE_REMOVED:
1135 eventExecutor.execute(() -> {
1136 if (!isRelevantHelper()) {
1137 return;
1138 }
1139 removeNode(event.subject());
1140 });
1141 break;
1142 case KUBEVIRT_NODE_INCOMPLETE:
1143 default:
1144 break;
1145 }
1146 }
1147 }
1148}