blob: 6fb20b78dd80d91e0d8d78ae2535f391336f0146 [file] [log] [blame]
Jian Lif16e8852019-01-22 22:55:31 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snode.impl;
17
18import org.onlab.util.Tools;
19import org.onosproject.cfg.ComponentConfigService;
20import org.onosproject.cluster.ClusterService;
21import org.onosproject.cluster.LeadershipService;
22import org.onosproject.cluster.NodeId;
23import org.onosproject.core.ApplicationId;
24import org.onosproject.core.CoreService;
25import org.onosproject.k8snode.api.K8sNode;
26import org.onosproject.k8snode.api.K8sNodeAdminService;
27import org.onosproject.k8snode.api.K8sNodeEvent;
28import org.onosproject.k8snode.api.K8sNodeHandler;
29import org.onosproject.k8snode.api.K8sNodeListener;
30import org.onosproject.k8snode.api.K8sNodeService;
31import org.onosproject.k8snode.api.K8sNodeState;
32import org.onosproject.net.Device;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.Port;
35import org.onosproject.net.behaviour.BridgeConfig;
36import org.onosproject.net.behaviour.BridgeDescription;
37import org.onosproject.net.behaviour.ControllerInfo;
38import org.onosproject.net.behaviour.DefaultBridgeDescription;
39import org.onosproject.net.behaviour.DefaultTunnelDescription;
40import org.onosproject.net.behaviour.InterfaceConfig;
41import org.onosproject.net.behaviour.TunnelDescription;
42import org.onosproject.net.behaviour.TunnelEndPoints;
43import org.onosproject.net.behaviour.TunnelKeys;
44import org.onosproject.net.device.DeviceAdminService;
45import org.onosproject.net.device.DeviceEvent;
46import org.onosproject.net.device.DeviceListener;
47import org.onosproject.net.device.DeviceService;
48import org.onosproject.ovsdb.controller.OvsdbClientService;
49import org.onosproject.ovsdb.controller.OvsdbController;
50import org.osgi.service.component.ComponentContext;
51import org.osgi.service.component.annotations.Activate;
52import org.osgi.service.component.annotations.Component;
53import org.osgi.service.component.annotations.Deactivate;
54import org.osgi.service.component.annotations.Modified;
55import org.osgi.service.component.annotations.Reference;
56import org.osgi.service.component.annotations.ReferenceCardinality;
57import org.slf4j.Logger;
58
59import java.util.Dictionary;
60import java.util.List;
61import java.util.Objects;
62import java.util.concurrent.ExecutorService;
63import java.util.stream.Collectors;
64
65import static java.util.concurrent.Executors.newSingleThreadExecutor;
66import static org.onlab.packet.TpPort.tpPort;
67import static org.onlab.util.Tools.groupedThreads;
68import static org.onosproject.k8snode.api.Constants.GENEVE;
69import static org.onosproject.k8snode.api.Constants.GENEVE_TUNNEL;
70import static org.onosproject.k8snode.api.Constants.GRE;
71import static org.onosproject.k8snode.api.Constants.GRE_TUNNEL;
72import static org.onosproject.k8snode.api.Constants.INTEGRATION_BRIDGE;
73import static org.onosproject.k8snode.api.Constants.VXLAN;
74import static org.onosproject.k8snode.api.Constants.VXLAN_TUNNEL;
75import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
76import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
77import static org.onosproject.k8snode.api.K8sNodeState.DEVICE_CREATED;
78import static org.onosproject.k8snode.api.K8sNodeState.INCOMPLETE;
79import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY;
80import static org.onosproject.k8snode.impl.OsgiPropertyConstants.AUTO_RECOVERY_DEFAULT;
81import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT;
82import static org.onosproject.k8snode.impl.OsgiPropertyConstants.OVSDB_PORT_NUM_DEFAULT;
83import static org.onosproject.k8snode.util.K8sNodeUtil.getBooleanProperty;
84import static org.onosproject.k8snode.util.K8sNodeUtil.getOvsdbClient;
85import static org.onosproject.k8snode.util.K8sNodeUtil.isOvsdbConnected;
86import static org.onosproject.net.AnnotationKeys.PORT_NAME;
87import static org.slf4j.LoggerFactory.getLogger;
88
89/**
90 * Service bootstraps kubernetes node based on its type.
91 */
92@Component(immediate = true,
93 property = {
94 OVSDB_PORT + ":Integer=" + OVSDB_PORT_NUM_DEFAULT,
95 AUTO_RECOVERY + ":Boolean=" + AUTO_RECOVERY_DEFAULT
96 }
97)
98public class DefaultK8sNodeHandler implements K8sNodeHandler {
99
100 private final Logger log = getLogger(getClass());
101
102 private static final String DEFAULT_OF_PROTO = "tcp";
103 private static final int DEFAULT_OFPORT = 6653;
104 private static final int DPID_BEGIN = 3;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
107 protected CoreService coreService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected LeadershipService leadershipService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
113 protected ClusterService clusterService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY)
116 protected DeviceService deviceService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY)
119 protected DeviceAdminService deviceAdminService;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY)
122 protected OvsdbController ovsdbController;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY)
125 protected K8sNodeService k8sNodeService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
128 protected K8sNodeAdminService k8sNodeAdminService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
131 protected ComponentConfigService componentConfigService;
132
133 /** OVSDB server listen port. */
134 private int ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
135
136 /** Indicates whether auto-recover kubernetes node status on switch re-conn event. */
137 private boolean autoRecovery = AUTO_RECOVERY_DEFAULT;
138
139 private final ExecutorService eventExecutor = newSingleThreadExecutor(
140 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
141
142 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
143 private final DeviceListener bridgeListener = new InternalBridgeListener();
144 private final K8sNodeListener k8sNodeListener = new InternalK8sNodeListener();
145
146 private ApplicationId appId;
147 private NodeId localNode;
148
149 @Activate
150 protected void activate() {
151 appId = coreService.getAppId(APP_ID);
152 localNode = clusterService.getLocalNode().id();
153
154 componentConfigService.registerProperties(getClass());
155 leadershipService.runForLeadership(appId.name());
156 deviceService.addListener(ovsdbListener);
157 deviceService.addListener(bridgeListener);
158 k8sNodeService.addListener(k8sNodeListener);
159
160 log.info("Started");
161 }
162
163 @Deactivate
164 protected void deactivate() {
165 k8sNodeService.removeListener(k8sNodeListener);
166 deviceService.removeListener(bridgeListener);
167 deviceService.removeListener(ovsdbListener);
168 componentConfigService.unregisterProperties(getClass(), false);
169 leadershipService.withdraw(appId.name());
170 eventExecutor.shutdown();
171
172 log.info("Stopped");
173 }
174
175 @Modified
176 protected void modified(ComponentContext context) {
177 readComponentConfiguration(context);
178
179 log.info("Modified");
180 }
181
182 @Override
183 public void processInitState(K8sNode k8sNode) {
184 if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
185 ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
186 return;
187 }
188 if (!deviceService.isAvailable(k8sNode.intgBridge())) {
189 createBridge(k8sNode, INTEGRATION_BRIDGE, k8sNode.intgBridge());
190 }
191 }
192
193 @Override
194 public void processDeviceCreatedState(K8sNode k8sNode) {
195 try {
196 if (!isOvsdbConnected(k8sNode, ovsdbPortNum, ovsdbController, deviceService)) {
197 ovsdbController.connect(k8sNode.managementIp(), tpPort(ovsdbPortNum));
198 return;
199 }
200
201 if (k8sNode.dataIp() != null &&
202 !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
203 createVxlanTunnelInterface(k8sNode);
204 }
205
206 if (k8sNode.dataIp() != null &&
207 !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
208 createGreTunnelInterface(k8sNode);
209 }
210
211 if (k8sNode.dataIp() != null &&
212 !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
213 createGeneveTunnelInterface(k8sNode);
214 }
215 } catch (Exception e) {
216 log.error("Exception occurred because of {}", e);
217 }
218 }
219
220 @Override
221 public void processCompleteState(K8sNode k8sNode) {
222 // do something if needed
223 }
224
225 @Override
226 public void processIncompleteState(K8sNode k8sNode) {
227 // do something if needed
228 }
229
230 /**
231 * Extracts properties from the component configuration context.
232 *
233 * @param context the component context
234 */
235 private void readComponentConfiguration(ComponentContext context) {
236 Dictionary<?, ?> properties = context.getProperties();
237
238 Integer ovsdbPortConfigured = Tools.getIntegerProperty(properties, OVSDB_PORT);
239 if (ovsdbPortConfigured == null) {
240 ovsdbPortNum = OVSDB_PORT_NUM_DEFAULT;
241 log.info("OVSDB port is NOT configured, default value is {}", ovsdbPortNum);
242 } else {
243 ovsdbPortNum = ovsdbPortConfigured;
244 log.info("Configured. OVSDB port is {}", ovsdbPortNum);
245 }
246
247 Boolean autoRecoveryConfigured =
248 getBooleanProperty(properties, AUTO_RECOVERY);
249 if (autoRecoveryConfigured == null) {
250 autoRecovery = AUTO_RECOVERY_DEFAULT;
251 log.info("Auto recovery flag is NOT " +
252 "configured, default value is {}", autoRecovery);
253 } else {
254 autoRecovery = autoRecoveryConfigured;
255 log.info("Configured. Auto recovery flag is {}", autoRecovery);
256 }
257 }
258
259 /**
260 * Creates a bridge with a given name on a given kubernetes node.
261 *
262 * @param k8sNode kubernetes node
263 * @param bridgeName bridge name
264 * @param devId device identifier
265 */
266 private void createBridge(K8sNode k8sNode, String bridgeName, DeviceId devId) {
267 Device device = deviceService.getDevice(k8sNode.ovsdb());
268
269 List<ControllerInfo> controllers = clusterService.getNodes().stream()
270 .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
271 .collect(Collectors.toList());
272
273 String dpid = devId.toString().substring(DPID_BEGIN);
274
275 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
276 .name(bridgeName)
277 .failMode(BridgeDescription.FailMode.SECURE)
278 .datapathId(dpid)
279 .disableInBand()
280 .controllers(controllers);
281
282 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
283 bridgeConfig.addBridge(builder.build());
284 }
285
286 /**
287 * Creates a VXLAN tunnel interface in a given kubernetes node.
288 *
289 * @param k8sNode kubernetes node
290 */
291 private void createVxlanTunnelInterface(K8sNode k8sNode) {
292 createTunnelInterface(k8sNode, VXLAN, VXLAN_TUNNEL);
293 }
294
295 /**
296 * Creates a GRE tunnel interface in a given kubernetes node.
297 *
298 * @param k8sNode kubernetes node
299 */
300 private void createGreTunnelInterface(K8sNode k8sNode) {
301 createTunnelInterface(k8sNode, GRE, GRE_TUNNEL);
302 }
303
304 /**
305 * Creates a GENEVE tunnel interface in a given kubernetes node.
306 *
307 * @param k8sNode kubernetes node
308 */
309 private void createGeneveTunnelInterface(K8sNode k8sNode) {
310 createTunnelInterface(k8sNode, GENEVE, GENEVE_TUNNEL);
311 }
312
313 /**
314 * Creates a tunnel interface in a given kubernetes node.
315 *
316 * @param k8sNode kubernetes node
317 */
318 private void createTunnelInterface(K8sNode k8sNode,
319 String type, String intfName) {
320 if (isIntfEnabled(k8sNode, intfName)) {
321 return;
322 }
323
324 Device device = deviceService.getDevice(k8sNode.ovsdb());
325 if (device == null || !device.is(InterfaceConfig.class)) {
326 log.error("Failed to create tunnel interface on {}", k8sNode.ovsdb());
327 return;
328 }
329
330 TunnelDescription tunnelDesc = buildTunnelDesc(type, intfName);
331
332 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
333 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
334 }
335
336 /**
337 * Builds tunnel description according to the network type.
338 *
339 * @param type network type
340 * @return tunnel description
341 */
342 private TunnelDescription buildTunnelDesc(String type, String intfName) {
343 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
344 TunnelDescription.Builder tdBuilder =
345 DefaultTunnelDescription.builder()
346 .deviceId(INTEGRATION_BRIDGE)
347 .ifaceName(intfName)
348 .remote(TunnelEndPoints.flowTunnelEndpoint())
349 .key(TunnelKeys.flowTunnelKey());
350
351 switch (type) {
352 case VXLAN:
353 tdBuilder.type(TunnelDescription.Type.VXLAN);
354 break;
355 case GRE:
356 tdBuilder.type(TunnelDescription.Type.GRE);
357 break;
358 case GENEVE:
359 tdBuilder.type(TunnelDescription.Type.GENEVE);
360 break;
361 default:
362 return null;
363 }
364
365 return tdBuilder.build();
366 }
367 return null;
368 }
369
370 /**
371 * Checks whether a given network interface in a given kubernetes node
372 * is enabled or not.
373 *
374 * @param k8sNode kubernetes node
375 * @param intf network interface name
376 * @return true if the given interface is enabled, false otherwise
377 */
378 private boolean isIntfEnabled(K8sNode k8sNode, String intf) {
379 return deviceService.isAvailable(k8sNode.intgBridge()) &&
380 deviceService.getPorts(k8sNode.intgBridge()).stream()
381 .anyMatch(port -> Objects.equals(
382 port.annotations().value(PORT_NAME), intf) &&
383 port.isEnabled());
384 }
385
386 /**
387 * Checks whether all requirements for this state are fulfilled or not.
388 *
389 * @param k8sNode kubernetes node
390 * @return true if all requirements are fulfilled, false otherwise
391 */
392 private boolean isCurrentStateDone(K8sNode k8sNode) {
393 switch (k8sNode.state()) {
394 case INIT:
395 if (!isOvsdbConnected(k8sNode, ovsdbPortNum,
396 ovsdbController, deviceService)) {
397 return false;
398 }
399
400 return deviceService.isAvailable(k8sNode.intgBridge());
401 case DEVICE_CREATED:
402 if (k8sNode.dataIp() != null &&
403 !isIntfEnabled(k8sNode, VXLAN_TUNNEL)) {
404 return false;
405 }
406 if (k8sNode.dataIp() != null &&
407 !isIntfEnabled(k8sNode, GRE_TUNNEL)) {
408 return false;
409 }
410 if (k8sNode.dataIp() != null &&
411 !isIntfEnabled(k8sNode, GENEVE_TUNNEL)) {
412 return false;
413 }
414
415 return true;
416 case COMPLETE:
417 case INCOMPLETE:
418 // always return false
419 // run init CLI to re-trigger node bootstrap
420 return false;
421 default:
422 return true;
423 }
424 }
425
426 /**
427 * Configures the kubernetes node with new state.
428 *
429 * @param k8sNode kubernetes node
430 * @param newState a new state
431 */
432 private void setState(K8sNode k8sNode, K8sNodeState newState) {
433 if (k8sNode.state() == newState) {
434 return;
435 }
436 K8sNode updated = k8sNode.updateState(newState);
437 k8sNodeAdminService.updateNode(updated);
438 log.info("Changed {} state: {}", k8sNode.hostname(), newState);
439 }
440
441 /**
442 * Bootstraps a new kubernetes node.
443 *
444 * @param k8sNode kubernetes node
445 */
446 private void bootstrapNode(K8sNode k8sNode) {
447 if (isCurrentStateDone(k8sNode)) {
448 setState(k8sNode, k8sNode.state().nextState());
449 } else {
450 log.trace("Processing {} state for {}", k8sNode.state(),
451 k8sNode.hostname());
452 k8sNode.state().process(this, k8sNode);
453 }
454 }
455
456 private void processK8sNodeRemoved(K8sNode k8sNode) {
457 OvsdbClientService client = getOvsdbClient(k8sNode, ovsdbPortNum, ovsdbController);
458 if (client == null) {
459 log.info("Failed to get ovsdb client");
460 return;
461 }
462
463 // delete integration bridge from the node
464 client.dropBridge(INTEGRATION_BRIDGE);
465
466 // disconnect ovsdb
467 client.disconnect();
468 }
469
470 /**
471 * An internal OVSDB listener. This listener is used for listening the
472 * network facing events from OVSDB device. If a new OVSDB device is detected,
473 * ONOS tries to bootstrap the kubernetes node.
474 */
475 private class InternalOvsdbListener implements DeviceListener {
476
477 @Override
478 public boolean isRelevant(DeviceEvent event) {
479 return event.subject().type() == Device.Type.CONTROLLER;
480 }
481
482 private boolean isRelevantHelper() {
483 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
484 }
485
486 @Override
487 public void event(DeviceEvent event) {
488 Device device = event.subject();
489
490 switch (event.type()) {
491 case DEVICE_AVAILABILITY_CHANGED:
492 case DEVICE_ADDED:
493 eventExecutor.execute(() -> {
494
495 if (!isRelevantHelper()) {
496 return;
497 }
498
499 K8sNode k8sNode = k8sNodeService.node(device.id());
500
501 if (k8sNode == null) {
502 return;
503 }
504
505 if (deviceService.isAvailable(device.id())) {
506 log.debug("OVSDB {} detected", device.id());
507 bootstrapNode(k8sNode);
508 }
509 });
510 break;
511 case PORT_ADDED:
512 case PORT_REMOVED:
513 case DEVICE_REMOVED:
514 default:
515 // do nothing
516 break;
517 }
518 }
519 }
520
521 /**
522 * An internal integration bridge listener. This listener is used for
523 * listening the events from integration bridge. To listen the events from
524 * other types of bridge such as provider bridge or tunnel bridge, we need
525 * to augment K8sNodeService.node() method.
526 */
527 private class InternalBridgeListener implements DeviceListener {
528
529 @Override
530 public boolean isRelevant(DeviceEvent event) {
531 return event.subject().type() == Device.Type.SWITCH;
532 }
533
534 private boolean isRelevantHelper() {
535 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
536 }
537
538 @Override
539 public void event(DeviceEvent event) {
540 Device device = event.subject();
541
542 switch (event.type()) {
543 case DEVICE_AVAILABILITY_CHANGED:
544 case DEVICE_ADDED:
545 eventExecutor.execute(() -> {
546
547 if (!isRelevantHelper()) {
548 return;
549 }
550
551 K8sNode k8sNode = k8sNodeService.node(device.id());
552
553 if (k8sNode == null) {
554 return;
555 }
556
557 if (deviceService.isAvailable(device.id())) {
558 log.debug("Integration bridge created on {}", k8sNode.hostname());
559 bootstrapNode(k8sNode);
560 } else if (k8sNode.state() == COMPLETE) {
561 log.info("Device {} disconnected", device.id());
562 setState(k8sNode, INCOMPLETE);
563 }
564
565 if (autoRecovery) {
566 if (k8sNode.state() == INCOMPLETE ||
567 k8sNode.state() == DEVICE_CREATED) {
568 log.info("Device {} is reconnected", device.id());
569 k8sNodeAdminService.updateNode(
570 k8sNode.updateState(K8sNodeState.INIT));
571 }
572 }
573 });
574 break;
575 case PORT_UPDATED:
576 case PORT_ADDED:
577 eventExecutor.execute(() -> {
578
579 if (!isRelevantHelper()) {
580 return;
581 }
582
583 K8sNode k8sNode = k8sNodeService.node(device.id());
584
585 if (k8sNode == null) {
586 return;
587 }
588
589 Port port = event.port();
590 String portName = port.annotations().value(PORT_NAME);
591 if (k8sNode.state() == DEVICE_CREATED && (
592 Objects.equals(portName, VXLAN_TUNNEL) ||
593 Objects.equals(portName, GRE_TUNNEL) ||
594 Objects.equals(portName, GENEVE_TUNNEL))) {
595 log.info("Interface {} added or updated to {}",
596 portName, device.id());
597 bootstrapNode(k8sNode);
598 }
599 });
600 break;
601 case PORT_REMOVED:
602 eventExecutor.execute(() -> {
603
604 if (!isRelevantHelper()) {
605 return;
606 }
607
608 K8sNode k8sNode = k8sNodeService.node(device.id());
609
610 if (k8sNode == null) {
611 return;
612 }
613
614 Port port = event.port();
615 String portName = port.annotations().value(PORT_NAME);
616 if (k8sNode.state() == COMPLETE && (
617 Objects.equals(portName, VXLAN_TUNNEL) ||
618 Objects.equals(portName, GRE_TUNNEL) ||
619 Objects.equals(portName, GENEVE_TUNNEL))) {
620 log.warn("Interface {} removed from {}",
621 portName, event.subject().id());
622 setState(k8sNode, INCOMPLETE);
623 }
624 });
625 break;
626 case DEVICE_REMOVED:
627 default:
628 // do nothing
629 break;
630 }
631 }
632 }
633
634 /**
635 * An internal kubernetes node listener.
636 * The notification is triggered by KubernetesNodeStore.
637 */
638 private class InternalK8sNodeListener implements K8sNodeListener {
639
640 private boolean isRelevantHelper() {
641 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
642 }
643
644 @Override
645 public void event(K8sNodeEvent event) {
646 switch (event.type()) {
647 case K8S_NODE_CREATED:
648 case K8S_NODE_UPDATED:
649 eventExecutor.execute(() -> {
650
651 if (!isRelevantHelper()) {
652 return;
653 }
654
655 bootstrapNode(event.subject());
656 });
657 break;
658 case K8S_NODE_REMOVED:
659 eventExecutor.execute(() -> {
660
661 if (!isRelevantHelper()) {
662 return;
663 }
664
665 processK8sNodeRemoved(event.subject());
666 });
667 break;
668 case K8S_NODE_INCOMPLETE:
669 default:
670 break;
671 }
672 }
673 }
674}