blob: 3827e2158d87960d10e2507d8994624d18b123d7 [file] [log] [blame]
/*
 * Copyright 2020-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snode.impl;
17
18import org.onosproject.cluster.ClusterService;
19import org.onosproject.cluster.LeadershipService;
20import org.onosproject.cluster.NodeId;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
Jian Li019ce6a2020-09-09 10:23:21 +090023import org.onosproject.k8snode.api.K8sBridge;
Jian Li077b07e2020-09-01 16:55:25 +090024import org.onosproject.k8snode.api.K8sHost;
25import org.onosproject.k8snode.api.K8sHostAdminService;
26import org.onosproject.k8snode.api.K8sHostEvent;
27import org.onosproject.k8snode.api.K8sHostHandler;
28import org.onosproject.k8snode.api.K8sHostListener;
29import org.onosproject.k8snode.api.K8sHostState;
30import org.onosproject.k8snode.api.K8sNode;
31import org.onosproject.k8snode.api.K8sNodeAdminService;
Jian Li019ce6a2020-09-09 10:23:21 +090032import org.onosproject.k8snode.api.K8sRouterBridge;
Jian Li077b07e2020-09-01 16:55:25 +090033import org.onosproject.k8snode.api.K8sTunnelBridge;
34import org.onosproject.net.Device;
35import org.onosproject.net.DeviceId;
36import org.onosproject.net.Port;
37import org.onosproject.net.behaviour.BridgeConfig;
38import org.onosproject.net.behaviour.BridgeDescription;
39import org.onosproject.net.behaviour.ControllerInfo;
40import org.onosproject.net.behaviour.DefaultBridgeDescription;
41import org.onosproject.net.behaviour.DefaultPatchDescription;
42import org.onosproject.net.behaviour.DefaultTunnelDescription;
43import org.onosproject.net.behaviour.InterfaceConfig;
44import org.onosproject.net.behaviour.PatchDescription;
45import org.onosproject.net.behaviour.TunnelDescription;
46import org.onosproject.net.behaviour.TunnelEndPoints;
47import org.onosproject.net.behaviour.TunnelKey;
48import org.onosproject.net.device.DeviceAdminService;
49import org.onosproject.net.device.DeviceEvent;
50import org.onosproject.net.device.DeviceListener;
51import org.onosproject.net.device.DeviceService;
52import org.onosproject.ovsdb.controller.OvsdbClientService;
53import org.onosproject.ovsdb.controller.OvsdbController;
54import org.onosproject.ovsdb.controller.OvsdbNodeId;
55import org.osgi.service.component.annotations.Activate;
56import org.osgi.service.component.annotations.Component;
57import org.osgi.service.component.annotations.Deactivate;
58import org.osgi.service.component.annotations.Reference;
59import org.osgi.service.component.annotations.ReferenceCardinality;
60import org.slf4j.Logger;
61
62import java.util.List;
63import java.util.Objects;
64import java.util.concurrent.ExecutorService;
65import java.util.stream.Collectors;
66
67import static java.lang.Thread.sleep;
68import static java.util.concurrent.Executors.newSingleThreadExecutor;
69import static org.onlab.packet.TpPort.tpPort;
70import static org.onlab.util.Tools.groupedThreads;
71import static org.onosproject.k8snode.api.Constants.GENEVE;
72import static org.onosproject.k8snode.api.Constants.GRE;
Jian Li019ce6a2020-09-09 10:23:21 +090073import static org.onosproject.k8snode.api.Constants.OS_INTEGRATION_BRIDGE;
Jian Li077b07e2020-09-01 16:55:25 +090074import static org.onosproject.k8snode.api.Constants.VXLAN;
75import static org.onosproject.k8snode.api.K8sHostState.COMPLETE;
76import static org.onosproject.k8snode.api.K8sHostState.DEVICE_CREATED;
77import static org.onosproject.k8snode.api.K8sHostState.INCOMPLETE;
78import static org.onosproject.k8snode.api.K8sHostState.INIT;
79import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
80import static org.onosproject.net.AnnotationKeys.PORT_NAME;
81import static org.slf4j.LoggerFactory.getLogger;
82
/**
 * Service that bootstraps kubernetes hosts.
 */
86@Component(immediate = true)
87public class DefaultK8sHostHandler implements K8sHostHandler {
88
89 private final Logger log = getLogger(getClass());
90
91 private static final String DEFAULT_OF_PROTO = "tcp";
92 private static final int DEFAULT_OFPORT = 6653;
93 private static final int DPID_BEGIN = 3;
94 private static final long SLEEP_MS = 3000; // we wait 3s
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected CoreService coreService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected LeadershipService leadershipService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected ClusterService clusterService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
106 protected DeviceService deviceService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected DeviceAdminService deviceAdminService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected OvsdbController ovsdbController;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected K8sHostAdminService k8sHostAdminService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected K8sNodeAdminService k8sNodeAdminService;
119
120
121 private int ovsdbPortNum = 6640;
122
123 private final ExecutorService eventExecutor = newSingleThreadExecutor(
124 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
125
126 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
127 private final DeviceListener bridgeListener = new InternalBridgeListener();
128 private final K8sHostListener k8sHostListener = new InternalK8sHostListener();
129
130 private ApplicationId appId;
131 private NodeId localNode;
132
133 @Activate
134 protected void activate() {
135 appId = coreService.getAppId(APP_ID);
136 localNode = clusterService.getLocalNode().id();
137
138 leadershipService.runForLeadership(appId.name());
139 deviceService.addListener(ovsdbListener);
140 deviceService.addListener(bridgeListener);
141 k8sHostAdminService.addListener(k8sHostListener);
142
143 log.info("Started");
144 }
145
146 @Deactivate
147 protected void deactivate() {
148 k8sHostAdminService.removeListener(k8sHostListener);
149 deviceService.removeListener(bridgeListener);
150 deviceService.removeListener(ovsdbListener);
151 leadershipService.withdraw(appId.name());
152 eventExecutor.shutdown();
153
154 log.info("Stopped");
155 }
156
157 @Override
158 public void processInitState(K8sHost k8sHost) {
159 if (!isOvsdbConnected(k8sHost, ovsdbPortNum, ovsdbController, deviceService)) {
160 ovsdbController.connect(k8sHost.hostIp(), tpPort(ovsdbPortNum));
161 return;
162 }
163
164 for (K8sTunnelBridge tunBridge : k8sHost.tunBridges()) {
165 if (!deviceService.isAvailable(tunBridge.deviceId())) {
166 createBridge(k8sHost.ovsdb(), tunBridge);
167 }
168 }
Jian Li019ce6a2020-09-09 10:23:21 +0900169
170 for (K8sRouterBridge routerBridge : k8sHost.routerBridges()) {
171 if (!deviceService.isAvailable(routerBridge.deviceId())) {
172 createBridge(k8sHost.ovsdb(), routerBridge);
173 }
174 }
Jian Li077b07e2020-09-01 16:55:25 +0900175 }
176
177 @Override
178 public void processDeviceCreatedState(K8sHost k8sHost) {
179 try {
180 if (!isOvsdbConnected(k8sHost, ovsdbPortNum, ovsdbController, deviceService)) {
181 ovsdbController.connect(k8sHost.hostIp(), tpPort(ovsdbPortNum));
182 return;
183 }
184
185 // create patch ports into tunnel bridge face to integration bridge
186 for (K8sTunnelBridge bridge : k8sHost.tunBridges()) {
187 for (String node : k8sHost.nodeNames()) {
188 K8sNode k8sNode = k8sNodeAdminService.node(node);
189 if (k8sNode.segmentId() == bridge.tunnelId()) {
Jian Li019ce6a2020-09-09 10:23:21 +0900190 createTunnelPatchInterfaces(k8sHost.ovsdb(), bridge, k8sNode);
191 createInterPatchInterfaces(k8sHost.ovsdb(), k8sNode);
Jian Li077b07e2020-09-01 16:55:25 +0900192 }
193 }
194 }
195
196 // create tunnel ports
197 for (K8sTunnelBridge bridge : k8sHost.tunBridges()) {
198 if (!isTunPortEnabled(bridge, bridge.vxlanPortName())) {
199 createVxlanTunnelInterface(k8sHost.ovsdb(), bridge);
200 }
201
202 if (!isTunPortEnabled(bridge, bridge.grePortName())) {
203 createGreTunnelInterface(k8sHost.ovsdb(), bridge);
204 }
205
206 if (!isTunPortEnabled(bridge, bridge.genevePortName())) {
207 createGeneveTunnelInterface(k8sHost.ovsdb(), bridge);
208 }
209 }
Jian Li019ce6a2020-09-09 10:23:21 +0900210
211 // create patch ports into router bridge face to external bridge
212 for (K8sRouterBridge bridge : k8sHost.routerBridges()) {
213 for (String node : k8sHost.nodeNames()) {
214 K8sNode k8sNode = k8sNodeAdminService.node(node);
215 if (k8sNode.segmentId() == bridge.segmentId()) {
216 createRouterPatchInterfaces(k8sHost.ovsdb(), bridge, k8sNode);
217 }
218 }
219 }
Jian Li077b07e2020-09-01 16:55:25 +0900220 } catch (Exception e) {
221 log.error("Exception occurred because of {}", e);
222 }
223 }
224
225 @Override
226 public void processCompleteState(K8sHost k8sHost) {
227 // do something if needed
228 }
229
230 @Override
231 public void processIncompleteState(K8sHost k8sHost) {
232 // do something if needed
233 }
234
Jian Li019ce6a2020-09-09 10:23:21 +0900235 private void createBridge(DeviceId ovsdb, K8sBridge bridge) {
Jian Li077b07e2020-09-01 16:55:25 +0900236 Device device = deviceService.getDevice(ovsdb);
237
238 List<ControllerInfo> controllers = clusterService.getNodes().stream()
239 .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
240 .collect(Collectors.toList());
241
242 String dpid = bridge.dpid().substring(DPID_BEGIN);
243
244 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
245 .name(bridge.name())
246 .failMode(BridgeDescription.FailMode.SECURE)
247 .datapathId(dpid)
248 .disableInBand()
249 .controllers(controllers);
250
251 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
252 bridgeConfig.addBridge(builder.build());
253 }
254
Jian Li019ce6a2020-09-09 10:23:21 +0900255 private void createTunnelPatchInterfaces(DeviceId ovsdb, K8sBridge bridge, K8sNode k8sNode) {
Jian Li077b07e2020-09-01 16:55:25 +0900256 Device device = deviceService.getDevice(ovsdb);
257 if (device == null || !device.is(InterfaceConfig.class)) {
258 log.error("Failed to create patch interface on {}", ovsdb);
259 return;
260 }
261
262 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
263
Jian Li019ce6a2020-09-09 10:23:21 +0900264 // tunnel bridge -> k8s integration bridge
Jian Li077b07e2020-09-01 16:55:25 +0900265 PatchDescription brTunIntPatchDesc =
266 DefaultPatchDescription.builder()
267 .deviceId(bridge.name())
268 .ifaceName(k8sNode.tunToIntgPatchPortName())
269 .peer(k8sNode.intgToTunPatchPortName())
270 .build();
271
272 ifaceConfig.addPatchMode(k8sNode.tunToIntgPatchPortName(), brTunIntPatchDesc);
273 }
274
Jian Li019ce6a2020-09-09 10:23:21 +0900275 private void createInterPatchInterfaces(DeviceId ovsdb, K8sNode k8sNode) {
276 Device device = deviceService.getDevice(ovsdb);
277 if (device == null || !device.is(InterfaceConfig.class)) {
278 log.error("Failed to create patch interface on {}", ovsdb);
279 return;
280 }
281
282 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
283
284 // openstack integration bridge -> k8s integration bridge
285 PatchDescription osIntK8sIntPatchDesc =
286 DefaultPatchDescription.builder()
287 .deviceId(OS_INTEGRATION_BRIDGE)
288 .ifaceName(k8sNode.osToK8sIntgPatchPortName())
289 .peer(k8sNode.k8sToOsIntgPatchPortName())
290 .build();
291
292 ifaceConfig.addPatchMode(k8sNode.tunToIntgPatchPortName(), osIntK8sIntPatchDesc);
293 }
294
295 private void createRouterPatchInterfaces(DeviceId ovsdb, K8sBridge bridge, K8sNode k8sNode) {
296 Device device = deviceService.getDevice(ovsdb);
297 if (device == null || !device.is(InterfaceConfig.class)) {
298 log.error("Failed to create patch interface on {}", ovsdb);
299 return;
300 }
301
302 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
303
304 // router bridge -> external bridge
305 PatchDescription brRouterExtPatchDesc =
306 DefaultPatchDescription.builder()
307 .deviceId(bridge.name())
308 .ifaceName(k8sNode.routerToExtPatchPortName())
309 .peer(k8sNode.extToRouterPatchPortName())
310 .build();
311
312 ifaceConfig.addPatchMode(k8sNode.tunToIntgPatchPortName(), brRouterExtPatchDesc);
313 }
314
Jian Li077b07e2020-09-01 16:55:25 +0900315 private void createVxlanTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
316 createTunnelInterface(ovsdb, bridge, VXLAN, bridge.vxlanPortName());
317 }
318
319 private void createGreTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
320 createTunnelInterface(ovsdb, bridge, GRE, bridge.grePortName());
321 }
322
323 private void createGeneveTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
324 createTunnelInterface(ovsdb, bridge, GENEVE, bridge.genevePortName());
325 }
326
327 private void createTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge,
328 String type, String intfName) {
329 if (isTunPortEnabled(bridge, intfName)) {
330 return;
331 }
332
333 Device device = deviceService.getDevice(ovsdb);
334 if (device == null || !device.is(InterfaceConfig.class)) {
335 log.error("Failed to create tunnel interface on {}", ovsdb);
336 return;
337 }
338
339 TunnelDescription tunnelDesc = buildTunnelDesc(bridge, type, intfName);
340
341 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
342 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
343 }
344
345 private TunnelDescription buildTunnelDesc(K8sTunnelBridge bridge, String type, String intfName) {
346 TunnelKey<String> key = new TunnelKey<>(String.valueOf(bridge.tunnelId()));
347
348 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
349 TunnelDescription.Builder tdBuilder =
350 DefaultTunnelDescription.builder()
351 .deviceId(bridge.name())
352 .ifaceName(intfName)
353 .remote(TunnelEndPoints.flowTunnelEndpoint())
354 .key(key);
355
356 switch (type) {
357 case VXLAN:
358 tdBuilder.type(TunnelDescription.Type.VXLAN);
359 break;
360 case GRE:
361 tdBuilder.type(TunnelDescription.Type.GRE);
362 break;
363 case GENEVE:
364 tdBuilder.type(TunnelDescription.Type.GENEVE);
365 break;
366 default:
367 return null;
368 }
369
370 return tdBuilder.build();
371 }
372
373 return null;
374 }
375
376 private boolean isOvsdbConnected(K8sHost host, int ovsdbPort,
377 OvsdbController ovsdbController,
378 DeviceService deviceService) {
379 OvsdbClientService client = getOvsdbClient(host, ovsdbPort, ovsdbController);
380 return deviceService.isAvailable(host.ovsdb()) &&
381 client != null &&
382 client.isConnected();
383 }
384
385 private OvsdbClientService getOvsdbClient(K8sHost host, int ovsdbPort,
386 OvsdbController ovsdbController) {
387 OvsdbNodeId ovsdb = new OvsdbNodeId(host.hostIp(), ovsdbPort);
388 return ovsdbController.getOvsdbClient(ovsdb);
389 }
390
391 private boolean isCurrentStateDone(K8sHost k8sHost) {
392 switch (k8sHost.state()) {
393 case INIT:
394 return isInitStateDone(k8sHost);
395 case DEVICE_CREATED:
396 return isDeviceCreatedStateDone(k8sHost);
397 case COMPLETE:
398 case INCOMPLETE:
399 return false;
400 default:
401 return true;
402 }
403 }
404
405 private boolean isInitStateDone(K8sHost k8sHost) {
406 if (!isOvsdbConnected(k8sHost, ovsdbPortNum,
407 ovsdbController, deviceService)) {
408 return false;
409 }
410
411 try {
412 // we need to wait a while, in case interface and bridge
413 // creation requires some time
414 sleep(SLEEP_MS);
415 } catch (InterruptedException e) {
416 log.error("Exception caused during init state checking...");
417 }
418
Jian Li019ce6a2020-09-09 10:23:21 +0900419 for (K8sBridge tunBridge : k8sHost.tunBridges()) {
Jian Li077b07e2020-09-01 16:55:25 +0900420 if (!deviceService.isAvailable(tunBridge.deviceId())) {
421 return false;
422 }
423 }
424
Jian Li019ce6a2020-09-09 10:23:21 +0900425 for (K8sBridge routerBridge: k8sHost.routerBridges()) {
426 if (!deviceService.isAvailable(routerBridge.deviceId())) {
427 return false;
428 }
429 }
430
Jian Li077b07e2020-09-01 16:55:25 +0900431 return true;
432 }
433
434 private boolean isDeviceCreatedStateDone(K8sHost k8sHost) {
435 try {
436 // we need to wait a while, in case interface and bridge
437 // creation requires some time
438 sleep(SLEEP_MS);
439 } catch (InterruptedException e) {
440 log.error("Exception caused during init state checking...");
441 }
442
443 for (K8sTunnelBridge bridge: k8sHost.tunBridges()) {
444 if (!isTunPortEnabled(bridge, bridge.vxlanPortName())) {
445 return false;
446 }
447 if (!isTunPortEnabled(bridge, bridge.grePortName())) {
448 return false;
449 }
450 if (!isTunPortEnabled(bridge, bridge.genevePortName())) {
451 return false;
452 }
453 }
454
455 return true;
456 }
457
458 private boolean isTunPortEnabled(K8sTunnelBridge tunBridge, String intf) {
459 return deviceService.isAvailable(tunBridge.deviceId()) &&
460 deviceService.getPorts(tunBridge.deviceId()).stream()
461 .anyMatch(port -> Objects.equals(
462 port.annotations().value(PORT_NAME), intf) &&
463 port.isEnabled());
464 }
465
466 /**
467 * Configures the kubernetes host with new state.
468 *
469 * @param k8sHost kubernetes host
470 * @param newState a new state
471 */
472 private void setState(K8sHost k8sHost, K8sHostState newState) {
473 if (k8sHost.state() == newState) {
474 return;
475 }
476 K8sHost updated = k8sHost.updateState(newState);
477 k8sHostAdminService.updateHost(updated);
478 log.info("Changed {} state: {}", k8sHost.hostIp(), newState);
479 }
480
481 /**
482 * Bootstraps a new kubernetes host.
483 *
484 * @param k8sHost kubernetes host
485 */
486 private void bootstrapHost(K8sHost k8sHost) {
487 if (isCurrentStateDone(k8sHost)) {
488 setState(k8sHost, k8sHost.state().nextState());
489 } else {
490 log.trace("Processing {} state for {}", k8sHost.state(),
491 k8sHost.hostIp());
492 k8sHost.state().process(this, k8sHost);
493 }
494 }
495
496 private class InternalOvsdbListener implements DeviceListener {
497
498 @Override
499 public boolean isRelevant(DeviceEvent event) {
500 return event.subject().type() == Device.Type.CONTROLLER;
501 }
502
503 private boolean isRelevantHelper() {
504 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
505 }
506
507 @Override
508 public void event(DeviceEvent event) {
509 Device device = event.subject();
510
511 switch (event.type()) {
512 case DEVICE_AVAILABILITY_CHANGED:
513 case DEVICE_ADDED:
514 eventExecutor.execute(() -> {
515 if (!isRelevantHelper()) {
516 return;
517 }
518
519 K8sHost k8sHost = k8sHostAdminService.host(device.id());
520
521 if (k8sHost == null) {
522 return;
523 }
524
525 if (deviceService.isAvailable(device.id())) {
526 log.debug("OVSDB {} detected", device.id());
527 bootstrapHost(k8sHost);
528 }
529 });
530 break;
531 case PORT_ADDED:
532 case PORT_REMOVED:
533 case DEVICE_REMOVED:
534 default:
535 // do nothing
536 break;
537 }
538 }
539 }
540
541 private class InternalBridgeListener implements DeviceListener {
542
543 @Override
544 public boolean isRelevant(DeviceEvent event) {
545 return event.subject().type() == Device.Type.SWITCH;
546 }
547
548 private boolean isRelevantHelper() {
549 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
550 }
551
552 @Override
553 public void event(DeviceEvent event) {
554 Device device = event.subject();
555
556 switch (event.type()) {
557 case DEVICE_AVAILABILITY_CHANGED:
558 case DEVICE_ADDED:
559 eventExecutor.execute(() -> {
560 if (!isRelevantHelper()) {
561 return;
562 }
563
564 K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
565
566 if (k8sHost == null) {
567 return;
568 }
569
570 if (deviceService.isAvailable(device.id())) {
571 log.debug("Tunnel bridge created on {}",
572 k8sHost.hostIp());
573 log.debug("OVSDB {} detected", device.id());
574 bootstrapHost(k8sHost);
575 } else if (k8sHost.state() == COMPLETE) {
576 log.info("Device {} disconnected", device.id());
577 setState(k8sHost, INCOMPLETE);
578 }
579
580 if (k8sHost.state() == INCOMPLETE ||
581 k8sHost.state() == DEVICE_CREATED) {
582 log.info("Device {} is reconnected", device.id());
583 k8sHostAdminService.updateHost(
584 k8sHost.updateState(INIT));
585 }
586 });
587 break;
588 case PORT_UPDATED:
589 case PORT_ADDED:
590 eventExecutor.execute(() -> {
591 if (!isRelevantHelper()) {
592 return;
593 }
594
595 K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
596
597 if (k8sHost == null) {
598 return;
599 }
600
601 Port port = event.port();
602 String portName = port.annotations().value(PORT_NAME);
603 if (k8sHost.state() == DEVICE_CREATED) {
604
605 K8sTunnelBridge tunBridge = k8sHost.tunBridges().stream().filter(
606 br -> br.deviceId().equals(device.id())
607 ).findAny().orElse(null);
608
609 if (tunBridge != null) {
610 if (Objects.equals(portName, tunBridge.vxlanPortName()) ||
611 Objects.equals(portName, tunBridge.grePortName()) ||
612 Objects.equals(portName, tunBridge.genevePortName())) {
613 log.info("Interface {} added or updated to {}",
614 portName, device.id());
615 bootstrapHost(k8sHost);
616 }
617 }
618 }
619 });
620 break;
621 case PORT_REMOVED:
622 eventExecutor.execute(() -> {
623 if (!isRelevantHelper()) {
624 return;
625 }
626
627 K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
628
629 if (k8sHost == null) {
630 return;
631 }
632
633 Port port = event.port();
634 String portName = port.annotations().value(PORT_NAME);
635 if (k8sHost.state() == COMPLETE) {
636 K8sTunnelBridge tunBridge = k8sHost.tunBridges().stream().filter(
637 br -> br.deviceId().equals(device.id())
638 ).findAny().orElse(null);
639
640 if (tunBridge != null) {
641 if (Objects.equals(portName, tunBridge.vxlanPortName()) ||
642 Objects.equals(portName, tunBridge.grePortName()) ||
643 Objects.equals(portName, tunBridge.genevePortName())) {
644 log.warn("Interface {} removed from {}",
645 portName, event.subject().id());
646 setState(k8sHost, INCOMPLETE);
647 }
648 }
649 }
650 });
651 break;
652 case DEVICE_REMOVED:
653 default:
654 // do nothing
655 break;
656 }
657 }
658 }
659
660 private class InternalK8sHostListener implements K8sHostListener {
661
662 private boolean isRelevantHelper() {
663 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
664 }
665
666 @Override
667 public void event(K8sHostEvent event) {
668 switch (event.type()) {
669 case K8S_HOST_CREATED:
670 case K8S_HOST_UPDATED:
671 eventExecutor.execute(() -> {
672 if (!isRelevantHelper()) {
673 return;
674 }
675
676 bootstrapHost(event.subject());
677 });
678 break;
679 case K8S_HOST_REMOVED:
680 case K8S_HOST_INCOMPLETE:
681 default:
682 break;
683 }
684 }
685 }
686}