blob: 83e537c9775b5a8e997e80a0118066296bc1fdd1 [file] [log] [blame]
Jian Lif16e8852019-01-22 22:55:31 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snode.impl;
17
18import org.onlab.util.Tools;
19import org.onosproject.cfg.ComponentConfigService;
20import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.LeadershipService;
22import org.onosproject.cluster.NodeId;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.k8snode.api.K8sNode;
26import org.onosproject.k8snode.api.K8sNodeAdminService;
27import org.onosproject.k8snode.api.K8sNodeEvent;
28import org.onosproject.k8snode.api.K8sNodeHandler;
29import org.onosproject.k8snode.api.K8sNodeListener;
30import org.onosproject.k8snode.api.K8sNodeService;
31import org.onosproject.k8snode.api.K8sNodeState;
32import org.onosproject.net.Device;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.Port;
35import org.onosproject.net.behaviour.BridgeConfig;
36import org.onosproject.net.behaviour.BridgeDescription;
37import org.onosproject.net.behaviour.ControllerInfo;
38import org.onosproject.net.behaviour.DefaultBridgeDescription;
Jian Libf562c22019-04-15 18:07:14 +090039import org.onosproject.net.behaviour.DefaultPatchDescription;
Jian Lif16e8852019-01-22 22:55:31 +090040import org.onosproject.net.behaviour.DefaultTunnelDescription;
41import org.onosproject.net.behaviour.InterfaceConfig;
Jian Libf562c22019-04-15 18:07:14 +090042import org.onosproject.net.behaviour.PatchDescription;
Jian Lif16e8852019-01-22 22:55:31 +090043import org.onosproject.net.behaviour.TunnelDescription;
44import org.onosproject.net.behaviour.TunnelEndPoints;
45import org.onosproject.net.behaviour.TunnelKeys;
46import org.onosproject.net.device.DeviceAdminService;
47import org.onosproject.net.device.DeviceEvent;
48import org.onosproject.net.device.DeviceListener;
49import org.onosproject.net.device.DeviceService;
50import org.onosproject.ovsdb.controller.OvsdbClientService;
51import org.onosproject.ovsdb.controller.OvsdbController;
52import org.osgi.service.component.ComponentContext;
53import org.osgi.service.component.annotations.Activate;
54import org.osgi.service.component.annotations.Component;
55import org.osgi.service.component.annotations.Deactivate;
56import org.osgi.service.component.annotations.Modified;
57import org.osgi.service.component.annotations.Reference;
58import org.osgi.service.component.annotations.ReferenceCardinality;
59import org.slf4j.Logger;
60
61import java.util.Dictionary;
62import java.util.List;
63import java.util.Objects;
64import java.util.concurrent.ExecutorService;
65import java.util.stream.Collectors;
66
67import static java.util.concurrent.Executors.newSingleThreadExecutor;
68import static org.onlab.packet.TpPort.tpPort;
69import static org.onlab.util.Tools.groupedThreads;
Jian Libf562c22019-04-15 18:07:14 +090070import static org.onosproject.k8snode.api.Constants.EXTERNAL_BRIDGE;
Jian Lif16e8852019-01-22 22:55:31 +090071import static org.onosproject.k8snode.api.Constants.GENEVE;
72import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL;
73import static org.onosproject.k8snode.api.Constants.GRE;
74import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL;
75import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE;
Jian Libf562c22019-04-15 18:07:14 +090076import static org.onosproject.k8snode.api.Constants.INTEGRATION_TO_EXTERNAL_BRIDGE;
77import static org.onosproject.k8snode.api.Constants.PHYSICAL_EXTERNAL_BRIDGE;
Jian Lif16e8852019-01-22 22:55:31 +090078import static org.onosproject.k8snode.api.Constants.VXLAN;
79import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL;
80import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
81import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
82import static org.onosproject.k8snode.api.K8sNodeState.DEVICE_CREATED;
83import static org.onosproject.k8snode.api.K8sNodeState.INCOMPLETE;
84import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
85import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
86import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
87import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
88import static org.onosproject.k8snode.util.K8sNodeUtil.getBooleanProperty;
89import static org.onosproject.k8snode.util.K8sNodeUtil.getOvsdbClient;
90import static org.onosproject.k8snode.util.K8sNodeUtil.isOvsdbConnected;
91import static org.onosproject.net.AnnotationKeys.PORT_NAME;
92import static org.slf4j.LoggerFactory.getLogger;
93
94/**
95 * Service bootstraps kubernetes node based on its type.
96 */
97@Component(immediate = true,
98 property = {
99 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
100 AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
101 }
102)
103public class DefaultK8sNodeHandler implements K8sNodeHandler {
104
105 private final Logger log = getLogger(getClass());
106
107 private static final String DEFAULT_OF_PROTO = "tcp";
108 private static final int DEFAULT_OFPORT = 6653;
109 private static final int DPID_BEGIN = 3;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected CoreService coreService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected LeadershipService leadershipService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected ClusterService clusterService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
121 protected DeviceService deviceService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
124 protected DeviceAdminService deviceAdminService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
127 protected OvsdbController ovsdbController;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
130 protected K8sNodeService k8sNodeService;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
133 protected K8sNodeAdminService k8sNodeAdminService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
136 protected ComponentConfigService componentConfigService;
137
138 /** OVSDB server listen port. */
139 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
140
141 /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
142 private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
143
144 private final ExecutorService eventExecutor = newSingleThreadExecutor(
145 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
146
147 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
148 private final DeviceListener bridgeListener = new InternalBridgeListener();
149 private final K8sNodeListener k8sNodeListener = new InternalK8sNodeListener();
150
151 private ApplicationId appId;
152 private NodeId localNode;
153
154 @Activate
155 protected void activate() {
156 appId = coreService.getAppId(APP_ID);
157 localNode = clusterService.getLocalNode().id();
158
159 componentConfigService.registerProperties(getClass());
160 leadershipService.runForLeadership(appId.name());
161 deviceService.addListener(ovsdbListener);
162 deviceService.addListener(bridgeListener);
163 k8sNodeService.addListener(k8sNodeListener);
164
165 log.info("Started");
166 }
167
168 @Deactivate
169 protected void deactivate() {
170 k8sNodeService.removeListener(k8sNodeListener);
171 deviceService.removeListener(bridgeListener);
172 deviceService.removeListener(ovsdbListener);
173 componentConfigService.unregisterProperties(getClass(), false);
174 leadershipService.withdraw(appId.name());
175 eventExecutor.shutdown();
176
177 log.info("Stopped");
178 }
179
180 @Modified
181 protected void modified(ComponentContext context) {
182 readComponentConfiguration(context);
183
184 log.info("Modified");
185 }
186
187 @Override
188 public void processInitState(K8sNode k8sNode) {
189 if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
190 ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
191 return;
192 }
193 if (!deviceService.isAvailable(k8sNode.intgBridge())) {
194 createBridge(k8sNode, INTEGRATION_BRIDGE, k8sNode.intgBridge());
195 }
Jian Libf562c22019-04-15 18:07:14 +0900196 if (!deviceService.isAvailable(k8sNode.extBridge())) {
197 createBridge(k8sNode, EXTERNAL_BRIDGE, k8sNode.extBridge());
198 }
Jian Lif16e8852019-01-22 22:55:31 +0900199 }
200
201 @Override
202 public void processDeviceCreatedState(K8sNode k8sNode) {
203 try {
204 if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
205 ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
206 return;
207 }
208
Jian Libf562c22019-04-15 18:07:14 +0900209 // create patch ports between integration and external bridges
210 createPatchInterfaces(k8sNode);
211
Jian Lif16e8852019-01-22 22:55:31 +0900212 if (k8sNode.dataIp() != null &&
213 !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
214 createVxlanTunnelInterface(k8sNode);
215 }
216
217 if (k8sNode.dataIp() != null &&
218 !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
219 createGreTunnelInterface(k8sNode);
220 }
221
222 if (k8sNode.dataIp() != null &&
223 !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
224 createGeneveTunnelInterface(k8sNode);
225 }
226 } catch (Exception e) {
227 log.error("Exception occurred because of {}", e);
228 }
229 }
230
231 @Override
232 public void processCompleteState(K8sNode k8sNode) {
233 // do something if needed
234 }
235
236 @Override
237 public void processIncompleteState(K8sNode k8sNode) {
238 // do something if needed
239 }
240
241 /**
242 * Extracts properties from the component configuration context.
243 *
244 * @param context the component context
245 */
246 private void readComponentConfiguration(ComponentContext context) {
247 Dictionary<?, ?> properties = context.getProperties();
248
249 Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
250 if (ovsdbPortConfigured == null) {
251 ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
252 log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
253 } else {
254 ovsdbPortNum = ovsdbPortConfigured;
255 log.info("Configured. OVSDB port is {}", ovsdbPortNum);
256 }
257
258 Boolean autoRecoveryConfigured =
259 getBooleanProperty(properties, AUTO_RECOVERY);
260 if (autoRecoveryConfigured == null) {
261 autoRecovery = AUTO_RECOVERY_DEFAULT;
262 log.info("Auto recovery flag is NOT " +
263 "configured, default value is {}", autoRecovery);
264 } else {
265 autoRecovery = autoRecoveryConfigured;
266 log.info("Configured. Auto recovery flag is {}", autoRecovery);
267 }
268 }
269
270 /**
271 * Creates a bridge with a given name on a given kubernetes node.
272 *
273 * @param k8sNode kubernetes node
274 * @param bridgeName bridge name
275 * @param devId device identifier
276 */
277 private void createBridge(K8sNode k8sNode, String bridgeName, DeviceId devId) {
278 Device device = deviceService.getDevice(k8sNode.ovsdb());
279
280 List<ControllerInfo> controllers = clusterService.getNodes().stream()
281 .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
282 .collect(Collectors.toList());
283
284 String dpid = devId.toString().substring(DPID_BEGIN);
285
286 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
287 .name(bridgeName)
288 .failMode(BridgeDescription.FailMode.SECURE)
289 .datapathId(dpid)
290 .disableInBand()
291 .controllers(controllers);
292
293 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
294 bridgeConfig.addBridge(builder.build());
295 }
296
297 /**
298 * Creates a VXLAN tunnel interface in a given kubernetes node.
299 *
300 * @param k8sNode kubernetes node
301 */
302 private void createVxlanTunnelInterface(K8sNode k8sNode) {
303 createTunnelInterface(k8sNode, VXLAN, VXLAN_TUNNEL);
304 }
305
306 /**
307 * Creates a GRE tunnel interface in a given kubernetes node.
308 *
309 * @param k8sNode kubernetes node
310 */
311 private void createGreTunnelInterface(K8sNode k8sNode) {
312 createTunnelInterface(k8sNode, GRE, GRE_TUNNEL);
313 }
314
315 /**
316 * Creates a GENEVE tunnel interface in a given kubernetes node.
317 *
318 * @param k8sNode kubernetes node
319 */
320 private void createGeneveTunnelInterface(K8sNode k8sNode) {
321 createTunnelInterface(k8sNode, GENEVE, GENEVE_TUNNEL);
322 }
323
Jian Libf562c22019-04-15 18:07:14 +0900324 private void createPatchInterfaces(K8sNode k8sNode) {
325 Device device = deviceService.getDevice(k8sNode.ovsdb());
326 if (device == null || !device.is(InterfaceConfig.class)) {
327 log.error("Failed to create patch interface on {}", k8sNode.ovsdb());
328 return;
329 }
330
331 PatchDescription brIntPatchDesc =
332 DefaultPatchDescription.builder()
333 .deviceId(INTEGRATION_BRIDGE)
334 .ifaceName(INTEGRATION_TO_EXTERNAL_BRIDGE)
335 .peer(PHYSICAL_EXTERNAL_BRIDGE)
336 .build();
337
338 PatchDescription brExtPatchDesc =
339 DefaultPatchDescription.builder()
340 .deviceId(EXTERNAL_BRIDGE)
341 .ifaceName(PHYSICAL_EXTERNAL_BRIDGE)
342 .peer(INTEGRATION_TO_EXTERNAL_BRIDGE)
343 .build();
344
345 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
346 ifaceConfig.addPatchMode(INTEGRATION_TO_EXTERNAL_BRIDGE, brIntPatchDesc);
347 ifaceConfig.addPatchMode(PHYSICAL_EXTERNAL_BRIDGE, brExtPatchDesc);
348 }
349
Jian Lif16e8852019-01-22 22:55:31 +0900350 /**
351 * Creates a tunnel interface in a given kubernetes node.
352 *
353 * @param k8sNode kubernetes node
354 */
355 private void createTunnelInterface(K8sNode k8sNode,
356 String type, String intfName) {
357 if (isIntfEnabled(k8sNode, intfName)) {
358 return;
359 }
360
361 Device device = deviceService.getDevice(k8sNode.ovsdb());
362 if (device == null || !device.is(InterfaceConfig.class)) {
363 log.error("Failed to create tunnel interface on {}", k8sNode.ovsdb());
364 return;
365 }
366
367 TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
368
369 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
370 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
371 }
372
373 /**
374 * Builds tunnel description according to the network type.
375 *
376 * @param type network type
377 * @return tunnel description
378 */
379 private TunnelDescription buildTunnelDesc(String type, String intfName) {
380 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
381 TunnelDescription.Builder tdBuilder =
382 DefaultTunnelDescription.builder()
383 .deviceId(INTEGRATION_BRIDGE)
384 .ifaceName(intfName)
385 .remote(TunnelEndPoints.flowTunnelEndpoint())
386 .key(TunnelKeys.flowTunnelKey());
387
388 switch (type) {
389 case VXLAN:
390 tdBuilder.type(TunnelDescription.Type.VXLAN);
391 break;
392 case GRE:
393 tdBuilder.type(TunnelDescription.Type.GRE);
394 break;
395 case GENEVE:
396 tdBuilder.type(TunnelDescription.Type.GENEVE);
397 break;
398 default:
399 return null;
400 }
401
402 return tdBuilder.build();
403 }
404 return null;
405 }
406
407 /**
408 * Checks whether a given network interface in a given kubernetes node
409 * is enabled or not.
410 *
411 * @param k8sNode kubernetes node
412 * @param intf network interface name
413 * @return true if the given interface is enabled, false otherwise
414 */
415 private boolean isIntfEnabled(K8sNode k8sNode, String intf) {
416 return deviceService.isAvailable(k8sNode.intgBridge()) &&
417 deviceService.getPorts(k8sNode.intgBridge()).stream()
418 .anyMatch(port -> Objects.equals(
419 port.annotations().value(PORT_NAME), intf) &&
420 port.isEnabled());
421 }
422
423 /**
424 * Checks whether all requirements for this state are fulfilled or not.
425 *
426 * @param k8sNode kubernetes node
427 * @return true if all requirements are fulfilled, false otherwise
428 */
429 private boolean isCurrentStateDone(K8sNode k8sNode) {
430 switch (k8sNode.state()) {
431 case INIT:
432 if (!isOvsdbConnected(k8sNode, ovsdbPortNum,
433 ovsdbController, deviceService)) {
434 return false;
435 }
436
Jian Libf562c22019-04-15 18:07:14 +0900437 return k8sNode.intgBridge() != null && k8sNode.extBridge() != null &&
438 deviceService.isAvailable(k8sNode.intgBridge()) &&
439 deviceService.isAvailable(k8sNode.extBridge());
Jian Lif16e8852019-01-22 22:55:31 +0900440 case DEVICE_CREATED:
441 if (k8sNode.dataIp() != null &&
442 !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
443 return false;
444 }
445 if (k8sNode.dataIp() != null &&
446 !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
447 return false;
448 }
449 if (k8sNode.dataIp() != null &&
450 !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
451 return false;
452 }
453
454 return true;
455 case COMPLETE:
456 case INCOMPLETE:
457 // always return false
458 // run init CLI to re-trigger node bootstrap
459 return false;
460 default:
461 return true;
462 }
463 }
464
465 /**
466 * Configures the kubernetes node with new state.
467 *
468 * @param k8sNode kubernetes node
469 * @param newState a new state
470 */
471 private void setState(K8sNode k8sNode, K8sNodeState newState) {
472 if (k8sNode.state() == newState) {
473 return;
474 }
475 K8sNode updated = k8sNode.updateState(newState);
476 k8sNodeAdminService.updateNode(updated);
477 log.info("Changed {} state: {}", k8sNode.hostname(), newState);
478 }
479
480 /**
481 * Bootstraps a new kubernetes node.
482 *
483 * @param k8sNode kubernetes node
484 */
485 private void bootstrapNode(K8sNode k8sNode) {
486 if (isCurrentStateDone(k8sNode)) {
487 setState(k8sNode, k8sNode.state().nextState());
488 } else {
489 log.trace("Processing {} state for {}", k8sNode.state(),
490 k8sNode.hostname());
491 k8sNode.state().process(this, k8sNode);
492 }
493 }
494
495 private void processK8sNodeRemoved(K8sNode k8sNode) {
496 OvsdbClientService client = getOvsdbClient(k8sNode, ovsdbPortNum, ovsdbController);
497 if (client == null) {
498 log.info("Failed to get ovsdb client");
499 return;
500 }
501
502 // delete integration bridge from the node
503 client.dropBridge(INTEGRATION_BRIDGE);
504
Jian Libf562c22019-04-15 18:07:14 +0900505 // delete external bridge from the node
506 client.dropBridge(EXTERNAL_BRIDGE);
507
Jian Lif16e8852019-01-22 22:55:31 +0900508 // disconnect ovsdb
509 client.disconnect();
510 }
511
512 /**
513 * An internal OVSDB listener. This listener is used for listening the
514 * network facing events from OVSDB device. If a new OVSDB device is detected,
515 * ONOS tries to bootstrap the kubernetes node.
516 */
517 private class InternalOvsdbListener implements DeviceListener {
518
519 @Override
520 public boolean isRelevant(DeviceEvent event) {
521 return event.subject().type() == Device.Type.CONTROLLER;
522 }
523
524 private boolean isRelevantHelper() {
525 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
526 }
527
528 @Override
529 public void event(DeviceEvent event) {
530 Device device = event.subject();
531
532 switch (event.type()) {
533 case DEVICE_AVAILABILITY_CHANGED:
534 case DEVICE_ADDED:
535 eventExecutor.execute(() -> {
536
537 if (!isRelevantHelper()) {
538 return;
539 }
540
541 K8sNode k8sNode = k8sNodeService.node(device.id());
542
543 if (k8sNode == null) {
544 return;
545 }
546
547 if (deviceService.isAvailable(device.id())) {
548 log.debug("OVSDB {} detected", device.id());
549 bootstrapNode(k8sNode);
550 }
551 });
552 break;
553 case PORT_ADDED:
554 case PORT_REMOVED:
555 case DEVICE_REMOVED:
556 default:
557 // do nothing
558 break;
559 }
560 }
561 }
562
563 /**
564 * An internal integration bridge listener. This listener is used for
565 * listening the events from integration bridge. To listen the events from
566 * other types of bridge such as provider bridge or tunnel bridge, we need
567 * to augment K8sNodeService.node() method.
568 */
569 private class InternalBridgeListener implements DeviceListener {
570
571 @Override
572 public boolean isRelevant(DeviceEvent event) {
573 return event.subject().type() == Device.Type.SWITCH;
574 }
575
576 private boolean isRelevantHelper() {
577 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
578 }
579
580 @Override
581 public void event(DeviceEvent event) {
582 Device device = event.subject();
583
584 switch (event.type()) {
585 case DEVICE_AVAILABILITY_CHANGED:
586 case DEVICE_ADDED:
587 eventExecutor.execute(() -> {
588
589 if (!isRelevantHelper()) {
590 return;
591 }
592
593 K8sNode k8sNode = k8sNodeService.node(device.id());
594
595 if (k8sNode == null) {
596 return;
597 }
598
Jian Libf562c22019-04-15 18:07:14 +0900599 // TODO: also need to check the external bridge's availability
Jian Lif16e8852019-01-22 22:55:31 +0900600 if (deviceService.isAvailable(device.id())) {
Jian Libf562c22019-04-15 18:07:14 +0900601 log.debug("Integration bridge created on {}",
602 k8sNode.hostname());
Jian Lif16e8852019-01-22 22:55:31 +0900603 bootstrapNode(k8sNode);
604 } else if (k8sNode.state() == COMPLETE) {
605 log.info("Device {} disconnected", device.id());
606 setState(k8sNode, INCOMPLETE);
607 }
608
609 if (autoRecovery) {
610 if (k8sNode.state() == INCOMPLETE ||
611 k8sNode.state() == DEVICE_CREATED) {
612 log.info("Device {} is reconnected", device.id());
613 k8sNodeAdminService.updateNode(
614 k8sNode.updateState(K8sNodeState.INIT));
615 }
616 }
617 });
618 break;
619 case PORT_UPDATED:
620 case PORT_ADDED:
621 eventExecutor.execute(() -> {
622
623 if (!isRelevantHelper()) {
624 return;
625 }
626
627 K8sNode k8sNode = k8sNodeService.node(device.id());
628
629 if (k8sNode == null) {
630 return;
631 }
632
633 Port port = event.port();
634 String portName = port.annotations().value(PORT_NAME);
635 if (k8sNode.state() == DEVICE_CREATED && (
636 Objects.equals(portName, VXLAN_TUNNEL) ||
637 Objects.equals(portName, GRE_TUNNEL) ||
638 Objects.equals(portName, GENEVE_TUNNEL))) {
639 log.info("Interface {} added or updated to {}",
640 portName, device.id());
641 bootstrapNode(k8sNode);
642 }
643 });
644 break;
645 case PORT_REMOVED:
646 eventExecutor.execute(() -> {
647
648 if (!isRelevantHelper()) {
649 return;
650 }
651
652 K8sNode k8sNode = k8sNodeService.node(device.id());
653
654 if (k8sNode == null) {
655 return;
656 }
657
658 Port port = event.port();
659 String portName = port.annotations().value(PORT_NAME);
660 if (k8sNode.state() == COMPLETE && (
661 Objects.equals(portName, VXLAN_TUNNEL) ||
662 Objects.equals(portName, GRE_TUNNEL) ||
663 Objects.equals(portName, GENEVE_TUNNEL))) {
664 log.warn("Interface {} removed from {}",
665 portName, event.subject().id());
666 setState(k8sNode, INCOMPLETE);
667 }
668 });
669 break;
670 case DEVICE_REMOVED:
671 default:
672 // do nothing
673 break;
674 }
675 }
676 }
677
678 /**
679 * An internal kubernetes node listener.
680 * The notification is triggered by KubernetesNodeStore.
681 */
682 private class InternalK8sNodeListener implements K8sNodeListener {
683
684 private boolean isRelevantHelper() {
685 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
686 }
687
688 @Override
689 public void event(K8sNodeEvent event) {
690 switch (event.type()) {
691 case K8S_NODE_CREATED:
692 case K8S_NODE_UPDATED:
693 eventExecutor.execute(() -> {
694
695 if (!isRelevantHelper()) {
696 return;
697 }
698
699 bootstrapNode(event.subject());
700 });
701 break;
702 case K8S_NODE_REMOVED:
703 eventExecutor.execute(() -> {
704
705 if (!isRelevantHelper()) {
706 return;
707 }
708
709 processK8sNodeRemoved(event.subject());
710 });
711 break;
712 case K8S_NODE_INCOMPLETE:
713 default:
714 break;
715 }
716 }
717 }
718}