blob: 2fc26bedcd6a62d06e7e11c09229e69ef754e4c6 [file] [log] [blame]
Jian Li077b07e2020-09-01 16:55:25 +09001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snode.impl;
17
18import org.onosproject.cluster.ClusterService;
19import org.onosproject.cluster.LeadershipService;
20import org.onosproject.cluster.NodeId;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
Jian Li019ce6a2020-09-09 10:23:21 +090023import org.onosproject.k8snode.api.K8sBridge;
Jian Li077b07e2020-09-01 16:55:25 +090024import org.onosproject.k8snode.api.K8sHost;
25import org.onosproject.k8snode.api.K8sHostAdminService;
26import org.onosproject.k8snode.api.K8sHostEvent;
27import org.onosproject.k8snode.api.K8sHostHandler;
28import org.onosproject.k8snode.api.K8sHostListener;
29import org.onosproject.k8snode.api.K8sHostState;
30import org.onosproject.k8snode.api.K8sNode;
31import org.onosproject.k8snode.api.K8sNodeAdminService;
Jian Li019ce6a2020-09-09 10:23:21 +090032import org.onosproject.k8snode.api.K8sRouterBridge;
Jian Li077b07e2020-09-01 16:55:25 +090033import org.onosproject.k8snode.api.K8sTunnelBridge;
34import org.onosproject.net.Device;
35import org.onosproject.net.DeviceId;
36import org.onosproject.net.Port;
37import org.onosproject.net.behaviour.BridgeConfig;
38import org.onosproject.net.behaviour.BridgeDescription;
39import org.onosproject.net.behaviour.ControllerInfo;
40import org.onosproject.net.behaviour.DefaultBridgeDescription;
41import org.onosproject.net.behaviour.DefaultPatchDescription;
42import org.onosproject.net.behaviour.DefaultTunnelDescription;
43import org.onosproject.net.behaviour.InterfaceConfig;
44import org.onosproject.net.behaviour.PatchDescription;
45import org.onosproject.net.behaviour.TunnelDescription;
46import org.onosproject.net.behaviour.TunnelEndPoints;
47import org.onosproject.net.behaviour.TunnelKey;
48import org.onosproject.net.device.DeviceAdminService;
49import org.onosproject.net.device.DeviceEvent;
50import org.onosproject.net.device.DeviceListener;
51import org.onosproject.net.device.DeviceService;
52import org.onosproject.ovsdb.controller.OvsdbClientService;
53import org.onosproject.ovsdb.controller.OvsdbController;
54import org.onosproject.ovsdb.controller.OvsdbNodeId;
55import org.osgi.service.component.annotations.Activate;
56import org.osgi.service.component.annotations.Component;
57import org.osgi.service.component.annotations.Deactivate;
58import org.osgi.service.component.annotations.Reference;
59import org.osgi.service.component.annotations.ReferenceCardinality;
60import org.slf4j.Logger;
61
62import java.util.List;
63import java.util.Objects;
64import java.util.concurrent.ExecutorService;
65import java.util.stream.Collectors;
66
67import static java.lang.Thread.sleep;
68import static java.util.concurrent.Executors.newSingleThreadExecutor;
69import static org.onlab.packet.TpPort.tpPort;
70import static org.onlab.util.Tools.groupedThreads;
71import static org.onosproject.k8snode.api.Constants.GENEVE;
72import static org.onosproject.k8snode.api.Constants.GRE;
Jian Li019ce6a2020-09-09 10:23:21 +090073import static org.onosproject.k8snode.api.Constants.OS_INTEGRATION_BRIDGE;
Jian Li077b07e2020-09-01 16:55:25 +090074import static org.onosproject.k8snode.api.Constants.VXLAN;
75import static org.onosproject.k8snode.api.K8sHostState.COMPLETE;
76import static org.onosproject.k8snode.api.K8sHostState.DEVICE_CREATED;
77import static org.onosproject.k8snode.api.K8sHostState.INCOMPLETE;
78import static org.onosproject.k8snode.api.K8sHostState.INIT;
79import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
80import static org.onosproject.net.AnnotationKeys.PORT_NAME;
81import static org.slf4j.LoggerFactory.getLogger;
82
83/**
84 * Service bootstraps kubernetes host.
85 */
86@Component(immediate = true)
87public class DefaultK8sHostHandler implements K8sHostHandler {
88
89 private final Logger log = getLogger(getClass());
90
91 private static final String DEFAULT_OF_PROTO = "tcp";
92 private static final int DEFAULT_OFPORT = 6653;
93 private static final int DPID_BEGIN = 3;
94 private static final long SLEEP_MS = 3000; // we wait 3s
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected CoreService coreService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected LeadershipService leadershipService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected ClusterService clusterService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
106 protected DeviceService deviceService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected DeviceAdminService deviceAdminService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected OvsdbController ovsdbController;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected K8sHostAdminService k8sHostAdminService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected K8sNodeAdminService k8sNodeAdminService;
119
120
121 private int ovsdbPortNum = 6640;
122
123 private final ExecutorService eventExecutor = newSingleThreadExecutor(
124 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
125
126 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
127 private final DeviceListener bridgeListener = new InternalBridgeListener();
128 private final K8sHostListener k8sHostListener = new InternalK8sHostListener();
129
130 private ApplicationId appId;
131 private NodeId localNode;
132
133 @Activate
134 protected void activate() {
135 appId = coreService.getAppId(APP_ID);
136 localNode = clusterService.getLocalNode().id();
137
138 leadershipService.runForLeadership(appId.name());
139 deviceService.addListener(ovsdbListener);
140 deviceService.addListener(bridgeListener);
141 k8sHostAdminService.addListener(k8sHostListener);
142
143 log.info("Started");
144 }
145
146 @Deactivate
147 protected void deactivate() {
148 k8sHostAdminService.removeListener(k8sHostListener);
149 deviceService.removeListener(bridgeListener);
150 deviceService.removeListener(ovsdbListener);
151 leadershipService.withdraw(appId.name());
152 eventExecutor.shutdown();
153
154 log.info("Stopped");
155 }
156
157 @Override
158 public void processInitState(K8sHost k8sHost) {
159 if (!isOvsdbConnected(k8sHost, ovsdbPortNum, ovsdbController, deviceService)) {
160 ovsdbController.connect(k8sHost.hostIp(), tpPort(ovsdbPortNum));
161 return;
162 }
163
164 for (K8sTunnelBridge tunBridge : k8sHost.tunBridges()) {
165 if (!deviceService.isAvailable(tunBridge.deviceId())) {
166 createBridge(k8sHost.ovsdb(), tunBridge);
167 }
168 }
Jian Li019ce6a2020-09-09 10:23:21 +0900169
170 for (K8sRouterBridge routerBridge : k8sHost.routerBridges()) {
171 if (!deviceService.isAvailable(routerBridge.deviceId())) {
172 createBridge(k8sHost.ovsdb(), routerBridge);
173 }
174 }
Jian Li077b07e2020-09-01 16:55:25 +0900175 }
176
177 @Override
178 public void processDeviceCreatedState(K8sHost k8sHost) {
179 try {
180 if (!isOvsdbConnected(k8sHost, ovsdbPortNum, ovsdbController, deviceService)) {
181 ovsdbController.connect(k8sHost.hostIp(), tpPort(ovsdbPortNum));
182 return;
183 }
184
185 // create patch ports into tunnel bridge face to integration bridge
186 for (K8sTunnelBridge bridge : k8sHost.tunBridges()) {
187 for (String node : k8sHost.nodeNames()) {
188 K8sNode k8sNode = k8sNodeAdminService.node(node);
189 if (k8sNode.segmentId() == bridge.tunnelId()) {
Jian Li019ce6a2020-09-09 10:23:21 +0900190 createTunnelPatchInterfaces(k8sHost.ovsdb(), bridge, k8sNode);
191 createInterPatchInterfaces(k8sHost.ovsdb(), k8sNode);
Jian Li077b07e2020-09-01 16:55:25 +0900192 }
193 }
194 }
195
196 // create tunnel ports
197 for (K8sTunnelBridge bridge : k8sHost.tunBridges()) {
198 if (!isTunPortEnabled(bridge, bridge.vxlanPortName())) {
199 createVxlanTunnelInterface(k8sHost.ovsdb(), bridge);
200 }
201
202 if (!isTunPortEnabled(bridge, bridge.grePortName())) {
203 createGreTunnelInterface(k8sHost.ovsdb(), bridge);
204 }
205
206 if (!isTunPortEnabled(bridge, bridge.genevePortName())) {
207 createGeneveTunnelInterface(k8sHost.ovsdb(), bridge);
208 }
209 }
Jian Li019ce6a2020-09-09 10:23:21 +0900210
211 // create patch ports into router bridge face to external bridge
212 for (K8sRouterBridge bridge : k8sHost.routerBridges()) {
213 for (String node : k8sHost.nodeNames()) {
214 K8sNode k8sNode = k8sNodeAdminService.node(node);
215 if (k8sNode.segmentId() == bridge.segmentId()) {
216 createRouterPatchInterfaces(k8sHost.ovsdb(), bridge, k8sNode);
217 }
218 }
219 }
Jian Li077b07e2020-09-01 16:55:25 +0900220 } catch (Exception e) {
221 log.error("Exception occurred because of {}", e);
222 }
223 }
224
225 @Override
226 public void processCompleteState(K8sHost k8sHost) {
227 // do something if needed
228 }
229
230 @Override
231 public void processIncompleteState(K8sHost k8sHost) {
232 // do something if needed
233 }
234
Jian Li019ce6a2020-09-09 10:23:21 +0900235 private void createBridge(DeviceId ovsdb, K8sBridge bridge) {
Jian Li077b07e2020-09-01 16:55:25 +0900236 Device device = deviceService.getDevice(ovsdb);
237
238 List<ControllerInfo> controllers = clusterService.getNodes().stream()
239 .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
240 .collect(Collectors.toList());
241
242 String dpid = bridge.dpid().substring(DPID_BEGIN);
243
244 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
245 .name(bridge.name())
246 .failMode(BridgeDescription.FailMode.SECURE)
247 .datapathId(dpid)
248 .disableInBand()
249 .controllers(controllers);
250
251 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
252 bridgeConfig.addBridge(builder.build());
253 }
254
Jian Li019ce6a2020-09-09 10:23:21 +0900255 private void createTunnelPatchInterfaces(DeviceId ovsdb, K8sBridge bridge, K8sNode k8sNode) {
Jian Li077b07e2020-09-01 16:55:25 +0900256 Device device = deviceService.getDevice(ovsdb);
257 if (device == null || !device.is(InterfaceConfig.class)) {
258 log.error("Failed to create patch interface on {}", ovsdb);
259 return;
260 }
261
262 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
263
Jian Li019ce6a2020-09-09 10:23:21 +0900264 // tunnel bridge -> k8s integration bridge
Jian Li077b07e2020-09-01 16:55:25 +0900265 PatchDescription brTunIntPatchDesc =
266 DefaultPatchDescription.builder()
267 .deviceId(bridge.name())
268 .ifaceName(k8sNode.tunToIntgPatchPortName())
269 .peer(k8sNode.intgToTunPatchPortName())
270 .build();
271
272 ifaceConfig.addPatchMode(k8sNode.tunToIntgPatchPortName(), brTunIntPatchDesc);
273 }
274
Jian Li019ce6a2020-09-09 10:23:21 +0900275 private void createInterPatchInterfaces(DeviceId ovsdb, K8sNode k8sNode) {
276 Device device = deviceService.getDevice(ovsdb);
277 if (device == null || !device.is(InterfaceConfig.class)) {
278 log.error("Failed to create patch interface on {}", ovsdb);
279 return;
280 }
281
282 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
283
284 // openstack integration bridge -> k8s integration bridge
285 PatchDescription osIntK8sIntPatchDesc =
286 DefaultPatchDescription.builder()
287 .deviceId(OS_INTEGRATION_BRIDGE)
288 .ifaceName(k8sNode.osToK8sIntgPatchPortName())
Jian Lia4d8fba2020-09-10 23:16:50 +0900289 .peer(k8sNode.k8sIntgToOsPatchPortName())
Jian Li019ce6a2020-09-09 10:23:21 +0900290 .build();
Jian Lia4d8fba2020-09-10 23:16:50 +0900291 ifaceConfig.addPatchMode(k8sNode.osToK8sIntgPatchPortName(), osIntK8sIntPatchDesc);
Jian Li019ce6a2020-09-09 10:23:21 +0900292
Jian Lia4d8fba2020-09-10 23:16:50 +0900293 // openstack integration bridge -> k8s external bridge
294 PatchDescription osIntK8sExPatchDesc =
295 DefaultPatchDescription.builder()
296 .deviceId(OS_INTEGRATION_BRIDGE)
297 .ifaceName(k8sNode.osToK8sExtPatchPortName())
298 .peer(k8sNode.k8sExtToOsPatchPortName())
299 .build();
300 ifaceConfig.addPatchMode(k8sNode.osToK8sExtPatchPortName(), osIntK8sExPatchDesc);
Jian Li019ce6a2020-09-09 10:23:21 +0900301 }
302
303 private void createRouterPatchInterfaces(DeviceId ovsdb, K8sBridge bridge, K8sNode k8sNode) {
304 Device device = deviceService.getDevice(ovsdb);
305 if (device == null || !device.is(InterfaceConfig.class)) {
306 log.error("Failed to create patch interface on {}", ovsdb);
307 return;
308 }
309
310 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
311
312 // router bridge -> external bridge
313 PatchDescription brRouterExtPatchDesc =
314 DefaultPatchDescription.builder()
315 .deviceId(bridge.name())
316 .ifaceName(k8sNode.routerToExtPatchPortName())
317 .peer(k8sNode.extToRouterPatchPortName())
318 .build();
319
320 ifaceConfig.addPatchMode(k8sNode.tunToIntgPatchPortName(), brRouterExtPatchDesc);
321 }
322
Jian Li077b07e2020-09-01 16:55:25 +0900323 private void createVxlanTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
324 createTunnelInterface(ovsdb, bridge, VXLAN, bridge.vxlanPortName());
325 }
326
327 private void createGreTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
328 createTunnelInterface(ovsdb, bridge, GRE, bridge.grePortName());
329 }
330
331 private void createGeneveTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
332 createTunnelInterface(ovsdb, bridge, GENEVE, bridge.genevePortName());
333 }
334
335 private void createTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge,
336 String type, String intfName) {
337 if (isTunPortEnabled(bridge, intfName)) {
338 return;
339 }
340
341 Device device = deviceService.getDevice(ovsdb);
342 if (device == null || !device.is(InterfaceConfig.class)) {
343 log.error("Failed to create tunnel interface on {}", ovsdb);
344 return;
345 }
346
347 TunnelDescription tunnelDesc = buildTunnelDesc(bridge, type, intfName);
348
349 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
350 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
351 }
352
353 private TunnelDescription buildTunnelDesc(K8sTunnelBridge bridge, String type, String intfName) {
354 TunnelKey<String> key = new TunnelKey<>(String.valueOf(bridge.tunnelId()));
355
356 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
357 TunnelDescription.Builder tdBuilder =
358 DefaultTunnelDescription.builder()
359 .deviceId(bridge.name())
360 .ifaceName(intfName)
361 .remote(TunnelEndPoints.flowTunnelEndpoint())
362 .key(key);
363
364 switch (type) {
365 case VXLAN:
366 tdBuilder.type(TunnelDescription.Type.VXLAN);
367 break;
368 case GRE:
369 tdBuilder.type(TunnelDescription.Type.GRE);
370 break;
371 case GENEVE:
372 tdBuilder.type(TunnelDescription.Type.GENEVE);
373 break;
374 default:
375 return null;
376 }
377
378 return tdBuilder.build();
379 }
380
381 return null;
382 }
383
384 private boolean isOvsdbConnected(K8sHost host, int ovsdbPort,
385 OvsdbController ovsdbController,
386 DeviceService deviceService) {
387 OvsdbClientService client = getOvsdbClient(host, ovsdbPort, ovsdbController);
388 return deviceService.isAvailable(host.ovsdb()) &&
389 client != null &&
390 client.isConnected();
391 }
392
393 private OvsdbClientService getOvsdbClient(K8sHost host, int ovsdbPort,
394 OvsdbController ovsdbController) {
395 OvsdbNodeId ovsdb = new OvsdbNodeId(host.hostIp(), ovsdbPort);
396 return ovsdbController.getOvsdbClient(ovsdb);
397 }
398
399 private boolean isCurrentStateDone(K8sHost k8sHost) {
400 switch (k8sHost.state()) {
401 case INIT:
402 return isInitStateDone(k8sHost);
403 case DEVICE_CREATED:
404 return isDeviceCreatedStateDone(k8sHost);
405 case COMPLETE:
406 case INCOMPLETE:
407 return false;
408 default:
409 return true;
410 }
411 }
412
413 private boolean isInitStateDone(K8sHost k8sHost) {
414 if (!isOvsdbConnected(k8sHost, ovsdbPortNum,
415 ovsdbController, deviceService)) {
416 return false;
417 }
418
419 try {
420 // we need to wait a while, in case interface and bridge
421 // creation requires some time
422 sleep(SLEEP_MS);
423 } catch (InterruptedException e) {
424 log.error("Exception caused during init state checking...");
425 }
426
Jian Li019ce6a2020-09-09 10:23:21 +0900427 for (K8sBridge tunBridge : k8sHost.tunBridges()) {
Jian Li077b07e2020-09-01 16:55:25 +0900428 if (!deviceService.isAvailable(tunBridge.deviceId())) {
429 return false;
430 }
431 }
432
Jian Li019ce6a2020-09-09 10:23:21 +0900433 for (K8sBridge routerBridge: k8sHost.routerBridges()) {
434 if (!deviceService.isAvailable(routerBridge.deviceId())) {
435 return false;
436 }
437 }
438
Jian Li077b07e2020-09-01 16:55:25 +0900439 return true;
440 }
441
442 private boolean isDeviceCreatedStateDone(K8sHost k8sHost) {
443 try {
444 // we need to wait a while, in case interface and bridge
445 // creation requires some time
446 sleep(SLEEP_MS);
447 } catch (InterruptedException e) {
448 log.error("Exception caused during init state checking...");
449 }
450
451 for (K8sTunnelBridge bridge: k8sHost.tunBridges()) {
452 if (!isTunPortEnabled(bridge, bridge.vxlanPortName())) {
453 return false;
454 }
455 if (!isTunPortEnabled(bridge, bridge.grePortName())) {
456 return false;
457 }
458 if (!isTunPortEnabled(bridge, bridge.genevePortName())) {
459 return false;
460 }
461 }
462
463 return true;
464 }
465
466 private boolean isTunPortEnabled(K8sTunnelBridge tunBridge, String intf) {
467 return deviceService.isAvailable(tunBridge.deviceId()) &&
468 deviceService.getPorts(tunBridge.deviceId()).stream()
469 .anyMatch(port -> Objects.equals(
470 port.annotations().value(PORT_NAME), intf) &&
471 port.isEnabled());
472 }
473
474 /**
475 * Configures the kubernetes host with new state.
476 *
477 * @param k8sHost kubernetes host
478 * @param newState a new state
479 */
480 private void setState(K8sHost k8sHost, K8sHostState newState) {
481 if (k8sHost.state() == newState) {
482 return;
483 }
484 K8sHost updated = k8sHost.updateState(newState);
485 k8sHostAdminService.updateHost(updated);
486 log.info("Changed {} state: {}", k8sHost.hostIp(), newState);
487 }
488
489 /**
490 * Bootstraps a new kubernetes host.
491 *
492 * @param k8sHost kubernetes host
493 */
494 private void bootstrapHost(K8sHost k8sHost) {
495 if (isCurrentStateDone(k8sHost)) {
496 setState(k8sHost, k8sHost.state().nextState());
497 } else {
498 log.trace("Processing {} state for {}", k8sHost.state(),
499 k8sHost.hostIp());
500 k8sHost.state().process(this, k8sHost);
501 }
502 }
503
504 private class InternalOvsdbListener implements DeviceListener {
505
506 @Override
507 public boolean isRelevant(DeviceEvent event) {
508 return event.subject().type() == Device.Type.CONTROLLER;
509 }
510
511 private boolean isRelevantHelper() {
512 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
513 }
514
515 @Override
516 public void event(DeviceEvent event) {
517 Device device = event.subject();
518
519 switch (event.type()) {
520 case DEVICE_AVAILABILITY_CHANGED:
521 case DEVICE_ADDED:
522 eventExecutor.execute(() -> {
523 if (!isRelevantHelper()) {
524 return;
525 }
526
527 K8sHost k8sHost = k8sHostAdminService.host(device.id());
528
529 if (k8sHost == null) {
530 return;
531 }
532
533 if (deviceService.isAvailable(device.id())) {
534 log.debug("OVSDB {} detected", device.id());
535 bootstrapHost(k8sHost);
536 }
537 });
538 break;
539 case PORT_ADDED:
540 case PORT_REMOVED:
541 case DEVICE_REMOVED:
542 default:
543 // do nothing
544 break;
545 }
546 }
547 }
548
549 private class InternalBridgeListener implements DeviceListener {
550
551 @Override
552 public boolean isRelevant(DeviceEvent event) {
553 return event.subject().type() == Device.Type.SWITCH;
554 }
555
556 private boolean isRelevantHelper() {
557 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
558 }
559
560 @Override
561 public void event(DeviceEvent event) {
562 Device device = event.subject();
563
564 switch (event.type()) {
565 case DEVICE_AVAILABILITY_CHANGED:
566 case DEVICE_ADDED:
567 eventExecutor.execute(() -> {
568 if (!isRelevantHelper()) {
569 return;
570 }
571
572 K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
573
574 if (k8sHost == null) {
575 return;
576 }
577
578 if (deviceService.isAvailable(device.id())) {
579 log.debug("Tunnel bridge created on {}",
580 k8sHost.hostIp());
581 log.debug("OVSDB {} detected", device.id());
582 bootstrapHost(k8sHost);
583 } else if (k8sHost.state() == COMPLETE) {
584 log.info("Device {} disconnected", device.id());
585 setState(k8sHost, INCOMPLETE);
586 }
587
588 if (k8sHost.state() == INCOMPLETE ||
589 k8sHost.state() == DEVICE_CREATED) {
590 log.info("Device {} is reconnected", device.id());
591 k8sHostAdminService.updateHost(
592 k8sHost.updateState(INIT));
593 }
594 });
595 break;
596 case PORT_UPDATED:
597 case PORT_ADDED:
598 eventExecutor.execute(() -> {
599 if (!isRelevantHelper()) {
600 return;
601 }
602
603 K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
604
605 if (k8sHost == null) {
606 return;
607 }
608
609 Port port = event.port();
610 String portName = port.annotations().value(PORT_NAME);
611 if (k8sHost.state() == DEVICE_CREATED) {
612
613 K8sTunnelBridge tunBridge = k8sHost.tunBridges().stream().filter(
614 br -> br.deviceId().equals(device.id())
615 ).findAny().orElse(null);
616
617 if (tunBridge != null) {
618 if (Objects.equals(portName, tunBridge.vxlanPortName()) ||
619 Objects.equals(portName, tunBridge.grePortName()) ||
620 Objects.equals(portName, tunBridge.genevePortName())) {
621 log.info("Interface {} added or updated to {}",
622 portName, device.id());
623 bootstrapHost(k8sHost);
624 }
625 }
626 }
627 });
628 break;
629 case PORT_REMOVED:
630 eventExecutor.execute(() -> {
631 if (!isRelevantHelper()) {
632 return;
633 }
634
635 K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
636
637 if (k8sHost == null) {
638 return;
639 }
640
641 Port port = event.port();
642 String portName = port.annotations().value(PORT_NAME);
643 if (k8sHost.state() == COMPLETE) {
644 K8sTunnelBridge tunBridge = k8sHost.tunBridges().stream().filter(
645 br -> br.deviceId().equals(device.id())
646 ).findAny().orElse(null);
647
648 if (tunBridge != null) {
649 if (Objects.equals(portName, tunBridge.vxlanPortName()) ||
650 Objects.equals(portName, tunBridge.grePortName()) ||
651 Objects.equals(portName, tunBridge.genevePortName())) {
652 log.warn("Interface {} removed from {}",
653 portName, event.subject().id());
654 setState(k8sHost, INCOMPLETE);
655 }
656 }
657 }
658 });
659 break;
660 case DEVICE_REMOVED:
661 default:
662 // do nothing
663 break;
664 }
665 }
666 }
667
668 private class InternalK8sHostListener implements K8sHostListener {
669
670 private boolean isRelevantHelper() {
671 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
672 }
673
674 @Override
675 public void event(K8sHostEvent event) {
676 switch (event.type()) {
677 case K8S_HOST_CREATED:
678 case K8S_HOST_UPDATED:
679 eventExecutor.execute(() -> {
680 if (!isRelevantHelper()) {
681 return;
682 }
683
684 bootstrapHost(event.subject());
685 });
686 break;
687 case K8S_HOST_REMOVED:
688 case K8S_HOST_INCOMPLETE:
689 default:
690 break;
691 }
692 }
693 }
694}