blob: a64b7722b5820b3ca36c88c10a35cc846cc7093d [file] [log] [blame]
Jian Lif16e8852019-01-22 22:55:31 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snode.impl;
17
18import org.onlab.util.Tools;
19import org.onosproject.cfg.ComponentConfigService;
20import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.LeadershipService;
22import org.onosproject.cluster.NodeId;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.k8snode.api.K8sNode;
26import org.onosproject.k8snode.api.K8sNodeAdminService;
27import org.onosproject.k8snode.api.K8sNodeEvent;
28import org.onosproject.k8snode.api.K8sNodeHandler;
29import org.onosproject.k8snode.api.K8sNodeListener;
30import org.onosproject.k8snode.api.K8sNodeService;
31import org.onosproject.k8snode.api.K8sNodeState;
32import org.onosproject.net.Device;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.Port;
35import org.onosproject.net.behaviour.BridgeConfig;
36import org.onosproject.net.behaviour.BridgeDescription;
37import org.onosproject.net.behaviour.ControllerInfo;
38import org.onosproject.net.behaviour.DefaultBridgeDescription;
Jian Libf562c22019-04-15 18:07:14 +090039import org.onosproject.net.behaviour.DefaultPatchDescription;
Jian Lif16e8852019-01-22 22:55:31 +090040import org.onosproject.net.behaviour.DefaultTunnelDescription;
41import org.onosproject.net.behaviour.InterfaceConfig;
Jian Libf562c22019-04-15 18:07:14 +090042import org.onosproject.net.behaviour.PatchDescription;
Jian Lif16e8852019-01-22 22:55:31 +090043import org.onosproject.net.behaviour.TunnelDescription;
44import org.onosproject.net.behaviour.TunnelEndPoints;
45import org.onosproject.net.behaviour.TunnelKeys;
46import org.onosproject.net.device.DeviceAdminService;
47import org.onosproject.net.device.DeviceEvent;
48import org.onosproject.net.device.DeviceListener;
49import org.onosproject.net.device.DeviceService;
50import org.onosproject.ovsdb.controller.OvsdbClientService;
51import org.onosproject.ovsdb.controller.OvsdbController;
52import org.osgi.service.component.ComponentContext;
53import org.osgi.service.component.annotations.Activate;
54import org.osgi.service.component.annotations.Component;
55import org.osgi.service.component.annotations.Deactivate;
56import org.osgi.service.component.annotations.Modified;
57import org.osgi.service.component.annotations.Reference;
58import org.osgi.service.component.annotations.ReferenceCardinality;
59import org.slf4j.Logger;
60
61import java.util.Dictionary;
62import java.util.List;
63import java.util.Objects;
64import java.util.concurrent.ExecutorService;
65import java.util.stream.Collectors;
66
67import static java.util.concurrent.Executors.newSingleThreadExecutor;
68import static org.onlab.packet.TpPort.tpPort;
69import static org.onlab.util.Tools.groupedThreads;
Jian Libf562c22019-04-15 18:07:14 +090070import static org.onosproject.k8snode.api.Constants.EXTERNAL_BRIDGE;
Jian Lif16e8852019-01-22 22:55:31 +090071import static org.onosproject.k8snode.api.Constants.GENEVE;
72import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL;
73import static org.onosproject.k8snode.api.Constants.GRE;
74import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL;
75import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE;
Jian Libf562c22019-04-15 18:07:14 +090076import static org.onosproject.k8snode.api.Constants.INTEGRATION_TO_EXTERNAL_BRIDGE;
Jian Li1a2eb5d2019-08-27 02:07:05 +090077import static org.onosproject.k8snode.api.Constants.INTEGRATION_TO_LOCAL_BRIDGE;
78import static org.onosproject.k8snode.api.Constants.LOCAL_BRIDGE;
79import static org.onosproject.k8snode.api.Constants.LOCAL_TO_INTEGRATION_BRIDGE;
Jian Libf562c22019-04-15 18:07:14 +090080import static org.onosproject.k8snode.api.Constants.PHYSICAL_EXTERNAL_BRIDGE;
Jian Lif16e8852019-01-22 22:55:31 +090081import static org.onosproject.k8snode.api.Constants.VXLAN;
82import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL;
83import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
84import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
85import static org.onosproject.k8snode.api.K8sNodeState.DEVICE_CREATED;
86import static org.onosproject.k8snode.api.K8sNodeState.INCOMPLETE;
87import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
88import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
89import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
90import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
91import static org.onosproject.k8snode.util.K8sNodeUtil.getBooleanProperty;
92import static org.onosproject.k8snode.util.K8sNodeUtil.getOvsdbClient;
93import static org.onosproject.k8snode.util.K8sNodeUtil.isOvsdbConnected;
94import static org.onosproject.net.AnnotationKeys.PORT_NAME;
95import static org.slf4j.LoggerFactory.getLogger;
96
97/**
98 * Service bootstraps kubernetes node based on its type.
99 */
100@Component(immediate = true,
101 property = {
102 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
103 AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
104 }
105)
106public class DefaultK8sNodeHandler implements K8sNodeHandler {
107
108 private final Logger log = getLogger(getClass());
109
110 private static final String DEFAULT_OF_PROTO = "tcp";
111 private static final int DEFAULT_OFPORT = 6653;
112 private static final int DPID_BEGIN = 3;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected CoreService coreService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected LeadershipService leadershipService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
121 protected ClusterService clusterService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
124 protected DeviceService deviceService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
127 protected DeviceAdminService deviceAdminService;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
130 protected OvsdbController ovsdbController;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
133 protected K8sNodeService k8sNodeService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
136 protected K8sNodeAdminService k8sNodeAdminService;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
139 protected ComponentConfigService componentConfigService;
140
141 /** OVSDB server listen port. */
142 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
143
144 /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
145 private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
146
147 private final ExecutorService eventExecutor = newSingleThreadExecutor(
148 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
149
150 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
151 private final DeviceListener bridgeListener = new InternalBridgeListener();
152 private final K8sNodeListener k8sNodeListener = new InternalK8sNodeListener();
153
154 private ApplicationId appId;
155 private NodeId localNode;
156
157 @Activate
158 protected void activate() {
159 appId = coreService.getAppId(APP_ID);
160 localNode = clusterService.getLocalNode().id();
161
162 componentConfigService.registerProperties(getClass());
163 leadershipService.runForLeadership(appId.name());
164 deviceService.addListener(ovsdbListener);
165 deviceService.addListener(bridgeListener);
166 k8sNodeService.addListener(k8sNodeListener);
167
168 log.info("Started");
169 }
170
171 @Deactivate
172 protected void deactivate() {
173 k8sNodeService.removeListener(k8sNodeListener);
174 deviceService.removeListener(bridgeListener);
175 deviceService.removeListener(ovsdbListener);
176 componentConfigService.unregisterProperties(getClass(), false);
177 leadershipService.withdraw(appId.name());
178 eventExecutor.shutdown();
179
180 log.info("Stopped");
181 }
182
183 @Modified
184 protected void modified(ComponentContext context) {
185 readComponentConfiguration(context);
186
187 log.info("Modified");
188 }
189
190 @Override
191 public void processInitState(K8sNode k8sNode) {
192 if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
193 ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
194 return;
195 }
196 if (!deviceService.isAvailable(k8sNode.intgBridge())) {
197 createBridge(k8sNode, INTEGRATION_BRIDGE, k8sNode.intgBridge());
198 }
Jian Libf562c22019-04-15 18:07:14 +0900199 if (!deviceService.isAvailable(k8sNode.extBridge())) {
200 createBridge(k8sNode, EXTERNAL_BRIDGE, k8sNode.extBridge());
201 }
Jian Li1a2eb5d2019-08-27 02:07:05 +0900202 if (!deviceService.isAvailable(k8sNode.localBridge())) {
203 createBridge(k8sNode, LOCAL_BRIDGE, k8sNode.localBridge());
204 }
Jian Lif16e8852019-01-22 22:55:31 +0900205 }
206
207 @Override
208 public void processDeviceCreatedState(K8sNode k8sNode) {
209 try {
210 if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
211 ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
212 return;
213 }
214
Jian Libf562c22019-04-15 18:07:14 +0900215 // create patch ports between integration and external bridges
216 createPatchInterfaces(k8sNode);
217
Jian Lif16e8852019-01-22 22:55:31 +0900218 if (k8sNode.dataIp() != null &&
219 !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
220 createVxlanTunnelInterface(k8sNode);
221 }
222
223 if (k8sNode.dataIp() != null &&
224 !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
225 createGreTunnelInterface(k8sNode);
226 }
227
228 if (k8sNode.dataIp() != null &&
229 !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
230 createGeneveTunnelInterface(k8sNode);
231 }
232 } catch (Exception e) {
233 log.error("Exception occurred because of {}", e);
234 }
235 }
236
237 @Override
238 public void processCompleteState(K8sNode k8sNode) {
239 // do something if needed
240 }
241
242 @Override
243 public void processIncompleteState(K8sNode k8sNode) {
244 // do something if needed
245 }
246
Jian Li0ee8d0e2019-12-18 11:35:05 +0900247 @Override
248 public void processPreOnBoardState(K8sNode k8sNode) {
249 processInitState(k8sNode);
250 processDeviceCreatedState(k8sNode);
251 }
252
253 @Override
254 public void processOnBoardedState(K8sNode k8sNode) {
255 // do something if needed
256 }
257
Jian Li3b640af2020-01-02 23:57:13 +0900258 @Override
259 public void processPostOnBoardState(K8sNode k8sNode) {
260 // do something if needed
261 }
262
Jian Lif16e8852019-01-22 22:55:31 +0900263 /**
264 * Extracts properties from the component configuration context.
265 *
266 * @param context the component context
267 */
268 private void readComponentConfiguration(ComponentContext context) {
269 Dictionary<?, ?> properties = context.getProperties();
270
271 Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
272 if (ovsdbPortConfigured == null) {
273 ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
274 log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
275 } else {
276 ovsdbPortNum = ovsdbPortConfigured;
277 log.info("Configured. OVSDB port is {}", ovsdbPortNum);
278 }
279
280 Boolean autoRecoveryConfigured =
281 getBooleanProperty(properties, AUTO_RECOVERY);
282 if (autoRecoveryConfigured == null) {
283 autoRecovery = AUTO_RECOVERY_DEFAULT;
284 log.info("Auto recovery flag is NOT " +
285 "configured, default value is {}", autoRecovery);
286 } else {
287 autoRecovery = autoRecoveryConfigured;
288 log.info("Configured. Auto recovery flag is {}", autoRecovery);
289 }
290 }
291
292 /**
293 * Creates a bridge with a given name on a given kubernetes node.
294 *
295 * @param k8sNode kubernetes node
296 * @param bridgeName bridge name
297 * @param devId device identifier
298 */
299 private void createBridge(K8sNode k8sNode, String bridgeName, DeviceId devId) {
300 Device device = deviceService.getDevice(k8sNode.ovsdb());
301
302 List<ControllerInfo> controllers = clusterService.getNodes().stream()
303 .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
304 .collect(Collectors.toList());
305
306 String dpid = devId.toString().substring(DPID_BEGIN);
307
308 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
309 .name(bridgeName)
310 .failMode(BridgeDescription.FailMode.SECURE)
311 .datapathId(dpid)
312 .disableInBand()
313 .controllers(controllers);
314
315 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
316 bridgeConfig.addBridge(builder.build());
317 }
318
319 /**
320 * Creates a VXLAN tunnel interface in a given kubernetes node.
321 *
322 * @param k8sNode kubernetes node
323 */
324 private void createVxlanTunnelInterface(K8sNode k8sNode) {
325 createTunnelInterface(k8sNode, VXLAN, VXLAN_TUNNEL);
326 }
327
328 /**
329 * Creates a GRE tunnel interface in a given kubernetes node.
330 *
331 * @param k8sNode kubernetes node
332 */
333 private void createGreTunnelInterface(K8sNode k8sNode) {
334 createTunnelInterface(k8sNode, GRE, GRE_TUNNEL);
335 }
336
337 /**
338 * Creates a GENEVE tunnel interface in a given kubernetes node.
339 *
340 * @param k8sNode kubernetes node
341 */
342 private void createGeneveTunnelInterface(K8sNode k8sNode) {
343 createTunnelInterface(k8sNode, GENEVE, GENEVE_TUNNEL);
344 }
345
Jian Libf562c22019-04-15 18:07:14 +0900346 private void createPatchInterfaces(K8sNode k8sNode) {
347 Device device = deviceService.getDevice(k8sNode.ovsdb());
348 if (device == null || !device.is(InterfaceConfig.class)) {
349 log.error("Failed to create patch interface on {}", k8sNode.ovsdb());
350 return;
351 }
352
Jian Li1a2eb5d2019-08-27 02:07:05 +0900353 // integration bridge -> external bridge
354 PatchDescription brIntExtPatchDesc =
Jian Libf562c22019-04-15 18:07:14 +0900355 DefaultPatchDescription.builder()
356 .deviceId(INTEGRATION_BRIDGE)
357 .ifaceName(INTEGRATION_TO_EXTERNAL_BRIDGE)
358 .peer(PHYSICAL_EXTERNAL_BRIDGE)
359 .build();
360
Jian Li1a2eb5d2019-08-27 02:07:05 +0900361 // external bridge -> integration bridge
362 PatchDescription brExtIntPatchDesc =
Jian Libf562c22019-04-15 18:07:14 +0900363 DefaultPatchDescription.builder()
364 .deviceId(EXTERNAL_BRIDGE)
365 .ifaceName(PHYSICAL_EXTERNAL_BRIDGE)
366 .peer(INTEGRATION_TO_EXTERNAL_BRIDGE)
367 .build();
368
Jian Li1a2eb5d2019-08-27 02:07:05 +0900369 // integration bridge -> local bridge
370 PatchDescription brIntLocalPatchDesc =
371 DefaultPatchDescription.builder()
372 .deviceId(INTEGRATION_BRIDGE)
373 .ifaceName(INTEGRATION_TO_LOCAL_BRIDGE)
374 .peer(LOCAL_TO_INTEGRATION_BRIDGE)
375 .build();
376
377 // local bridge -> integration bridge
378 PatchDescription brLocalIntPatchDesc =
379 DefaultPatchDescription.builder()
380 .deviceId(LOCAL_BRIDGE)
381 .ifaceName(LOCAL_TO_INTEGRATION_BRIDGE)
382 .peer(INTEGRATION_TO_LOCAL_BRIDGE)
383 .build();
384
Jian Libf562c22019-04-15 18:07:14 +0900385 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
Jian Li1a2eb5d2019-08-27 02:07:05 +0900386 ifaceConfig.addPatchMode(INTEGRATION_TO_EXTERNAL_BRIDGE, brIntExtPatchDesc);
387 ifaceConfig.addPatchMode(PHYSICAL_EXTERNAL_BRIDGE, brExtIntPatchDesc);
388 ifaceConfig.addPatchMode(INTEGRATION_TO_LOCAL_BRIDGE, brIntLocalPatchDesc);
389 ifaceConfig.addPatchMode(LOCAL_TO_INTEGRATION_BRIDGE, brLocalIntPatchDesc);
Jian Libf562c22019-04-15 18:07:14 +0900390 }
391
Jian Lif16e8852019-01-22 22:55:31 +0900392 /**
393 * Creates a tunnel interface in a given kubernetes node.
394 *
395 * @param k8sNode kubernetes node
396 */
397 private void createTunnelInterface(K8sNode k8sNode,
398 String type, String intfName) {
399 if (isIntfEnabled(k8sNode, intfName)) {
400 return;
401 }
402
403 Device device = deviceService.getDevice(k8sNode.ovsdb());
404 if (device == null || !device.is(InterfaceConfig.class)) {
405 log.error("Failed to create tunnel interface on {}", k8sNode.ovsdb());
406 return;
407 }
408
409 TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
410
411 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
412 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
413 }
414
415 /**
416 * Builds tunnel description according to the network type.
417 *
418 * @param type network type
419 * @return tunnel description
420 */
421 private TunnelDescription buildTunnelDesc(String type, String intfName) {
422 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
423 TunnelDescription.Builder tdBuilder =
424 DefaultTunnelDescription.builder()
425 .deviceId(INTEGRATION_BRIDGE)
426 .ifaceName(intfName)
427 .remote(TunnelEndPoints.flowTunnelEndpoint())
428 .key(TunnelKeys.flowTunnelKey());
429
430 switch (type) {
431 case VXLAN:
432 tdBuilder.type(TunnelDescription.Type.VXLAN);
433 break;
434 case GRE:
435 tdBuilder.type(TunnelDescription.Type.GRE);
436 break;
437 case GENEVE:
438 tdBuilder.type(TunnelDescription.Type.GENEVE);
439 break;
440 default:
441 return null;
442 }
443
444 return tdBuilder.build();
445 }
446 return null;
447 }
448
449 /**
450 * Checks whether a given network interface in a given kubernetes node
451 * is enabled or not.
452 *
453 * @param k8sNode kubernetes node
454 * @param intf network interface name
455 * @return true if the given interface is enabled, false otherwise
456 */
457 private boolean isIntfEnabled(K8sNode k8sNode, String intf) {
458 return deviceService.isAvailable(k8sNode.intgBridge()) &&
459 deviceService.getPorts(k8sNode.intgBridge()).stream()
460 .anyMatch(port -> Objects.equals(
461 port.annotations().value(PORT_NAME), intf) &&
462 port.isEnabled());
463 }
464
465 /**
466 * Checks whether all requirements for this state are fulfilled or not.
467 *
468 * @param k8sNode kubernetes node
469 * @return true if all requirements are fulfilled, false otherwise
470 */
471 private boolean isCurrentStateDone(K8sNode k8sNode) {
472 switch (k8sNode.state()) {
473 case INIT:
Jian Li0ee8d0e2019-12-18 11:35:05 +0900474 return isInitStateDone(k8sNode);
Jian Lif16e8852019-01-22 22:55:31 +0900475 case DEVICE_CREATED:
Jian Li0ee8d0e2019-12-18 11:35:05 +0900476 return isDeviceCreatedStateDone(k8sNode);
477 case PRE_ON_BOARD:
478 return isInitStateDone(k8sNode) && isDeviceCreatedStateDone(k8sNode);
Jian Lif16e8852019-01-22 22:55:31 +0900479 case COMPLETE:
480 case INCOMPLETE:
Jian Li0ee8d0e2019-12-18 11:35:05 +0900481 case ON_BOARDED:
Jian Lif16e8852019-01-22 22:55:31 +0900482 // always return false
483 // run init CLI to re-trigger node bootstrap
484 return false;
485 default:
486 return true;
487 }
488 }
489
Jian Li0ee8d0e2019-12-18 11:35:05 +0900490 private boolean isInitStateDone(K8sNode k8sNode) {
491 if (!isOvsdbConnected(k8sNode, ovsdbPortNum,
492 ovsdbController, deviceService)) {
493 return false;
494 }
495
496 return k8sNode.intgBridge() != null && k8sNode.extBridge() != null &&
497 deviceService.isAvailable(k8sNode.intgBridge()) &&
498 deviceService.isAvailable(k8sNode.extBridge()) &&
499 deviceService.isAvailable(k8sNode.localBridge());
500 }
501
502 private boolean isDeviceCreatedStateDone(K8sNode k8sNode) {
503 if (k8sNode.dataIp() != null &&
504 !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
505 return false;
506 }
507 if (k8sNode.dataIp() != null &&
508 !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
509 return false;
510 }
511 if (k8sNode.dataIp() != null &&
512 !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
513 return false;
514 }
515
516 return true;
517 }
518
Jian Lif16e8852019-01-22 22:55:31 +0900519 /**
520 * Configures the kubernetes node with new state.
521 *
522 * @param k8sNode kubernetes node
523 * @param newState a new state
524 */
525 private void setState(K8sNode k8sNode, K8sNodeState newState) {
526 if (k8sNode.state() == newState) {
527 return;
528 }
529 K8sNode updated = k8sNode.updateState(newState);
530 k8sNodeAdminService.updateNode(updated);
531 log.info("Changed {} state: {}", k8sNode.hostname(), newState);
532 }
533
534 /**
535 * Bootstraps a new kubernetes node.
536 *
537 * @param k8sNode kubernetes node
538 */
539 private void bootstrapNode(K8sNode k8sNode) {
540 if (isCurrentStateDone(k8sNode)) {
541 setState(k8sNode, k8sNode.state().nextState());
542 } else {
543 log.trace("Processing {} state for {}", k8sNode.state(),
544 k8sNode.hostname());
545 k8sNode.state().process(this, k8sNode);
546 }
547 }
548
549 private void processK8sNodeRemoved(K8sNode k8sNode) {
550 OvsdbClientService client = getOvsdbClient(k8sNode, ovsdbPortNum, ovsdbController);
551 if (client == null) {
552 log.info("Failed to get ovsdb client");
553 return;
554 }
555
556 // delete integration bridge from the node
557 client.dropBridge(INTEGRATION_BRIDGE);
558
Jian Libf562c22019-04-15 18:07:14 +0900559 // delete external bridge from the node
560 client.dropBridge(EXTERNAL_BRIDGE);
561
Jian Li1a2eb5d2019-08-27 02:07:05 +0900562 // delete local bridge from the node
563 client.dropBridge(LOCAL_BRIDGE);
564
Jian Lif16e8852019-01-22 22:55:31 +0900565 // disconnect ovsdb
566 client.disconnect();
567 }
568
569 /**
570 * An internal OVSDB listener. This listener is used for listening the
571 * network facing events from OVSDB device. If a new OVSDB device is detected,
572 * ONOS tries to bootstrap the kubernetes node.
573 */
574 private class InternalOvsdbListener implements DeviceListener {
575
576 @Override
577 public boolean isRelevant(DeviceEvent event) {
578 return event.subject().type() == Device.Type.CONTROLLER;
579 }
580
581 private boolean isRelevantHelper() {
582 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
583 }
584
585 @Override
586 public void event(DeviceEvent event) {
587 Device device = event.subject();
588
589 switch (event.type()) {
590 case DEVICE_AVAILABILITY_CHANGED:
591 case DEVICE_ADDED:
592 eventExecutor.execute(() -> {
593
594 if (!isRelevantHelper()) {
595 return;
596 }
597
598 K8sNode k8sNode = k8sNodeService.node(device.id());
599
600 if (k8sNode == null) {
601 return;
602 }
603
604 if (deviceService.isAvailable(device.id())) {
605 log.debug("OVSDB {} detected", device.id());
606 bootstrapNode(k8sNode);
607 }
608 });
609 break;
610 case PORT_ADDED:
611 case PORT_REMOVED:
612 case DEVICE_REMOVED:
613 default:
614 // do nothing
615 break;
616 }
617 }
618 }
619
620 /**
621 * An internal integration bridge listener. This listener is used for
622 * listening the events from integration bridge. To listen the events from
623 * other types of bridge such as provider bridge or tunnel bridge, we need
624 * to augment K8sNodeService.node() method.
625 */
626 private class InternalBridgeListener implements DeviceListener {
627
628 @Override
629 public boolean isRelevant(DeviceEvent event) {
630 return event.subject().type() == Device.Type.SWITCH;
631 }
632
633 private boolean isRelevantHelper() {
634 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
635 }
636
637 @Override
638 public void event(DeviceEvent event) {
639 Device device = event.subject();
640
641 switch (event.type()) {
642 case DEVICE_AVAILABILITY_CHANGED:
643 case DEVICE_ADDED:
644 eventExecutor.execute(() -> {
645
646 if (!isRelevantHelper()) {
647 return;
648 }
649
650 K8sNode k8sNode = k8sNodeService.node(device.id());
651
652 if (k8sNode == null) {
653 return;
654 }
655
Jian Libf562c22019-04-15 18:07:14 +0900656 // TODO: also need to check the external bridge's availability
Jian Li1a2eb5d2019-08-27 02:07:05 +0900657 // TODO: also need to check the local bridge's availability
Jian Lif16e8852019-01-22 22:55:31 +0900658 if (deviceService.isAvailable(device.id())) {
Jian Libf562c22019-04-15 18:07:14 +0900659 log.debug("Integration bridge created on {}",
660 k8sNode.hostname());
Jian Lif16e8852019-01-22 22:55:31 +0900661 bootstrapNode(k8sNode);
662 } else if (k8sNode.state() == COMPLETE) {
663 log.info("Device {} disconnected", device.id());
664 setState(k8sNode, INCOMPLETE);
665 }
666
667 if (autoRecovery) {
668 if (k8sNode.state() == INCOMPLETE ||
669 k8sNode.state() == DEVICE_CREATED) {
670 log.info("Device {} is reconnected", device.id());
671 k8sNodeAdminService.updateNode(
672 k8sNode.updateState(K8sNodeState.INIT));
673 }
674 }
675 });
676 break;
677 case PORT_UPDATED:
678 case PORT_ADDED:
679 eventExecutor.execute(() -> {
680
681 if (!isRelevantHelper()) {
682 return;
683 }
684
685 K8sNode k8sNode = k8sNodeService.node(device.id());
686
687 if (k8sNode == null) {
688 return;
689 }
690
691 Port port = event.port();
692 String portName = port.annotations().value(PORT_NAME);
693 if (k8sNode.state() == DEVICE_CREATED && (
694 Objects.equals(portName, VXLAN_TUNNEL) ||
695 Objects.equals(portName, GRE_TUNNEL) ||
696 Objects.equals(portName, GENEVE_TUNNEL))) {
697 log.info("Interface {} added or updated to {}",
698 portName, device.id());
699 bootstrapNode(k8sNode);
700 }
701 });
702 break;
703 case PORT_REMOVED:
704 eventExecutor.execute(() -> {
705
706 if (!isRelevantHelper()) {
707 return;
708 }
709
710 K8sNode k8sNode = k8sNodeService.node(device.id());
711
712 if (k8sNode == null) {
713 return;
714 }
715
716 Port port = event.port();
717 String portName = port.annotations().value(PORT_NAME);
718 if (k8sNode.state() == COMPLETE && (
719 Objects.equals(portName, VXLAN_TUNNEL) ||
720 Objects.equals(portName, GRE_TUNNEL) ||
721 Objects.equals(portName, GENEVE_TUNNEL))) {
722 log.warn("Interface {} removed from {}",
723 portName, event.subject().id());
724 setState(k8sNode, INCOMPLETE);
725 }
726 });
727 break;
728 case DEVICE_REMOVED:
729 default:
730 // do nothing
731 break;
732 }
733 }
734 }
735
736 /**
737 * An internal kubernetes node listener.
738 * The notification is triggered by KubernetesNodeStore.
739 */
740 private class InternalK8sNodeListener implements K8sNodeListener {
741
742 private boolean isRelevantHelper() {
743 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
744 }
745
746 @Override
747 public void event(K8sNodeEvent event) {
748 switch (event.type()) {
749 case K8S_NODE_CREATED:
750 case K8S_NODE_UPDATED:
751 eventExecutor.execute(() -> {
752
753 if (!isRelevantHelper()) {
754 return;
755 }
756
757 bootstrapNode(event.subject());
758 });
759 break;
760 case K8S_NODE_REMOVED:
761 eventExecutor.execute(() -> {
762
763 if (!isRelevantHelper()) {
764 return;
765 }
766
767 processK8sNodeRemoved(event.subject());
768 });
769 break;
770 case K8S_NODE_INCOMPLETE:
771 default:
772 break;
773 }
774 }
775 }
776}