/*
 * Copyright 2020-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snode.impl;
17
18import org.onosproject.cluster.ClusterService;
19import org.onosproject.cluster.LeadershipService;
20import org.onosproject.cluster.NodeId;
21import org.onosproject.core.ApplicationId;
22import org.onosproject.core.CoreService;
23import org.onosproject.k8snode.api.K8sHost;
24import org.onosproject.k8snode.api.K8sHostAdminService;
25import org.onosproject.k8snode.api.K8sHostEvent;
26import org.onosproject.k8snode.api.K8sHostHandler;
27import org.onosproject.k8snode.api.K8sHostListener;
28import org.onosproject.k8snode.api.K8sHostState;
29import org.onosproject.k8snode.api.K8sNode;
30import org.onosproject.k8snode.api.K8sNodeAdminService;
31import org.onosproject.k8snode.api.K8sTunnelBridge;
32import org.onosproject.net.Device;
33import org.onosproject.net.DeviceId;
34import org.onosproject.net.Port;
35import org.onosproject.net.behaviour.BridgeConfig;
36import org.onosproject.net.behaviour.BridgeDescription;
37import org.onosproject.net.behaviour.ControllerInfo;
38import org.onosproject.net.behaviour.DefaultBridgeDescription;
39import org.onosproject.net.behaviour.DefaultPatchDescription;
40import org.onosproject.net.behaviour.DefaultTunnelDescription;
41import org.onosproject.net.behaviour.InterfaceConfig;
42import org.onosproject.net.behaviour.PatchDescription;
43import org.onosproject.net.behaviour.TunnelDescription;
44import org.onosproject.net.behaviour.TunnelEndPoints;
45import org.onosproject.net.behaviour.TunnelKey;
46import org.onosproject.net.device.DeviceAdminService;
47import org.onosproject.net.device.DeviceEvent;
48import org.onosproject.net.device.DeviceListener;
49import org.onosproject.net.device.DeviceService;
50import org.onosproject.ovsdb.controller.OvsdbClientService;
51import org.onosproject.ovsdb.controller.OvsdbController;
52import org.onosproject.ovsdb.controller.OvsdbNodeId;
53import org.osgi.service.component.annotations.Activate;
54import org.osgi.service.component.annotations.Component;
55import org.osgi.service.component.annotations.Deactivate;
56import org.osgi.service.component.annotations.Reference;
57import org.osgi.service.component.annotations.ReferenceCardinality;
58import org.slf4j.Logger;
59
60import java.util.List;
61import java.util.Objects;
62import java.util.concurrent.ExecutorService;
63import java.util.stream.Collectors;
64
65import static java.lang.Thread.sleep;
66import static java.util.concurrent.Executors.newSingleThreadExecutor;
67import static org.onlab.packet.TpPort.tpPort;
68import static org.onlab.util.Tools.groupedThreads;
69import static org.onosproject.k8snode.api.Constants.GENEVE;
70import static org.onosproject.k8snode.api.Constants.GRE;
71import static org.onosproject.k8snode.api.Constants.VXLAN;
72import static org.onosproject.k8snode.api.K8sHostState.COMPLETE;
73import static org.onosproject.k8snode.api.K8sHostState.DEVICE_CREATED;
74import static org.onosproject.k8snode.api.K8sHostState.INCOMPLETE;
75import static org.onosproject.k8snode.api.K8sHostState.INIT;
76import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
77import static org.onosproject.net.AnnotationKeys.PORT_NAME;
78import static org.slf4j.LoggerFactory.getLogger;
79
80/**
81 * Service bootstraps kubernetes host.
82 */
83@Component(immediate = true)
84public class DefaultK8sHostHandler implements K8sHostHandler {
85
86 private final Logger log = getLogger(getClass());
87
88 private static final String DEFAULT_OF_PROTO = "tcp";
89 private static final int DEFAULT_OFPORT = 6653;
90 private static final int DPID_BEGIN = 3;
91 private static final long SLEEP_MS = 3000; // we wait 3s
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 protected CoreService coreService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected LeadershipService leadershipService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected ClusterService clusterService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected DeviceService deviceService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
106 protected DeviceAdminService deviceAdminService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected OvsdbController ovsdbController;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected K8sHostAdminService k8sHostAdminService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected K8sNodeAdminService k8sNodeAdminService;
116
117
118 private int ovsdbPortNum = 6640;
119
120 private final ExecutorService eventExecutor = newSingleThreadExecutor(
121 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
122
123 private final DeviceListener ovsdbListener = new InternalOvsdbListener();
124 private final DeviceListener bridgeListener = new InternalBridgeListener();
125 private final K8sHostListener k8sHostListener = new InternalK8sHostListener();
126
127 private ApplicationId appId;
128 private NodeId localNode;
129
130 @Activate
131 protected void activate() {
132 appId = coreService.getAppId(APP_ID);
133 localNode = clusterService.getLocalNode().id();
134
135 leadershipService.runForLeadership(appId.name());
136 deviceService.addListener(ovsdbListener);
137 deviceService.addListener(bridgeListener);
138 k8sHostAdminService.addListener(k8sHostListener);
139
140 log.info("Started");
141 }
142
143 @Deactivate
144 protected void deactivate() {
145 k8sHostAdminService.removeListener(k8sHostListener);
146 deviceService.removeListener(bridgeListener);
147 deviceService.removeListener(ovsdbListener);
148 leadershipService.withdraw(appId.name());
149 eventExecutor.shutdown();
150
151 log.info("Stopped");
152 }
153
154 @Override
155 public void processInitState(K8sHost k8sHost) {
156 if (!isOvsdbConnected(k8sHost, ovsdbPortNum, ovsdbController, deviceService)) {
157 ovsdbController.connect(k8sHost.hostIp(), tpPort(ovsdbPortNum));
158 return;
159 }
160
161 for (K8sTunnelBridge tunBridge : k8sHost.tunBridges()) {
162 if (!deviceService.isAvailable(tunBridge.deviceId())) {
163 createBridge(k8sHost.ovsdb(), tunBridge);
164 }
165 }
166 }
167
168 @Override
169 public void processDeviceCreatedState(K8sHost k8sHost) {
170 try {
171 if (!isOvsdbConnected(k8sHost, ovsdbPortNum, ovsdbController, deviceService)) {
172 ovsdbController.connect(k8sHost.hostIp(), tpPort(ovsdbPortNum));
173 return;
174 }
175
176 // create patch ports into tunnel bridge face to integration bridge
177 for (K8sTunnelBridge bridge : k8sHost.tunBridges()) {
178 for (String node : k8sHost.nodeNames()) {
179 K8sNode k8sNode = k8sNodeAdminService.node(node);
180 if (k8sNode.segmentId() == bridge.tunnelId()) {
181 createPatchInterfaces(k8sHost.ovsdb(), bridge, k8sNode);
182 }
183 }
184 }
185
186 // create tunnel ports
187 for (K8sTunnelBridge bridge : k8sHost.tunBridges()) {
188 if (!isTunPortEnabled(bridge, bridge.vxlanPortName())) {
189 createVxlanTunnelInterface(k8sHost.ovsdb(), bridge);
190 }
191
192 if (!isTunPortEnabled(bridge, bridge.grePortName())) {
193 createGreTunnelInterface(k8sHost.ovsdb(), bridge);
194 }
195
196 if (!isTunPortEnabled(bridge, bridge.genevePortName())) {
197 createGeneveTunnelInterface(k8sHost.ovsdb(), bridge);
198 }
199 }
200 } catch (Exception e) {
201 log.error("Exception occurred because of {}", e);
202 }
203 }
204
205 @Override
206 public void processCompleteState(K8sHost k8sHost) {
207 // do something if needed
208 }
209
210 @Override
211 public void processIncompleteState(K8sHost k8sHost) {
212 // do something if needed
213 }
214
215 private void createBridge(DeviceId ovsdb, K8sTunnelBridge bridge) {
216 Device device = deviceService.getDevice(ovsdb);
217
218 List<ControllerInfo> controllers = clusterService.getNodes().stream()
219 .map(n -> new ControllerInfo(n.ip(), DEFAULT_OFPORT, DEFAULT_OF_PROTO))
220 .collect(Collectors.toList());
221
222 String dpid = bridge.dpid().substring(DPID_BEGIN);
223
224 BridgeDescription.Builder builder = DefaultBridgeDescription.builder()
225 .name(bridge.name())
226 .failMode(BridgeDescription.FailMode.SECURE)
227 .datapathId(dpid)
228 .disableInBand()
229 .controllers(controllers);
230
231 BridgeConfig bridgeConfig = device.as(BridgeConfig.class);
232 bridgeConfig.addBridge(builder.build());
233 }
234
235 private void createPatchInterfaces(DeviceId ovsdb, K8sTunnelBridge bridge, K8sNode k8sNode) {
236 Device device = deviceService.getDevice(ovsdb);
237 if (device == null || !device.is(InterfaceConfig.class)) {
238 log.error("Failed to create patch interface on {}", ovsdb);
239 return;
240 }
241
242 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
243
244 // tunnel bridge -> integration bridge
245 PatchDescription brTunIntPatchDesc =
246 DefaultPatchDescription.builder()
247 .deviceId(bridge.name())
248 .ifaceName(k8sNode.tunToIntgPatchPortName())
249 .peer(k8sNode.intgToTunPatchPortName())
250 .build();
251
252 ifaceConfig.addPatchMode(k8sNode.tunToIntgPatchPortName(), brTunIntPatchDesc);
253 }
254
255 private void createVxlanTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
256 createTunnelInterface(ovsdb, bridge, VXLAN, bridge.vxlanPortName());
257 }
258
259 private void createGreTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
260 createTunnelInterface(ovsdb, bridge, GRE, bridge.grePortName());
261 }
262
263 private void createGeneveTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge) {
264 createTunnelInterface(ovsdb, bridge, GENEVE, bridge.genevePortName());
265 }
266
267 private void createTunnelInterface(DeviceId ovsdb, K8sTunnelBridge bridge,
268 String type, String intfName) {
269 if (isTunPortEnabled(bridge, intfName)) {
270 return;
271 }
272
273 Device device = deviceService.getDevice(ovsdb);
274 if (device == null || !device.is(InterfaceConfig.class)) {
275 log.error("Failed to create tunnel interface on {}", ovsdb);
276 return;
277 }
278
279 TunnelDescription tunnelDesc = buildTunnelDesc(bridge, type, intfName);
280
281 InterfaceConfig ifaceConfig = device.as(InterfaceConfig.class);
282 ifaceConfig.addTunnelMode(intfName, tunnelDesc);
283 }
284
285 private TunnelDescription buildTunnelDesc(K8sTunnelBridge bridge, String type, String intfName) {
286 TunnelKey<String> key = new TunnelKey<>(String.valueOf(bridge.tunnelId()));
287
288 if (VXLAN.equals(type) || GRE.equals(type) || GENEVE.equals(type)) {
289 TunnelDescription.Builder tdBuilder =
290 DefaultTunnelDescription.builder()
291 .deviceId(bridge.name())
292 .ifaceName(intfName)
293 .remote(TunnelEndPoints.flowTunnelEndpoint())
294 .key(key);
295
296 switch (type) {
297 case VXLAN:
298 tdBuilder.type(TunnelDescription.Type.VXLAN);
299 break;
300 case GRE:
301 tdBuilder.type(TunnelDescription.Type.GRE);
302 break;
303 case GENEVE:
304 tdBuilder.type(TunnelDescription.Type.GENEVE);
305 break;
306 default:
307 return null;
308 }
309
310 return tdBuilder.build();
311 }
312
313 return null;
314 }
315
316 private boolean isOvsdbConnected(K8sHost host, int ovsdbPort,
317 OvsdbController ovsdbController,
318 DeviceService deviceService) {
319 OvsdbClientService client = getOvsdbClient(host, ovsdbPort, ovsdbController);
320 return deviceService.isAvailable(host.ovsdb()) &&
321 client != null &&
322 client.isConnected();
323 }
324
325 private OvsdbClientService getOvsdbClient(K8sHost host, int ovsdbPort,
326 OvsdbController ovsdbController) {
327 OvsdbNodeId ovsdb = new OvsdbNodeId(host.hostIp(), ovsdbPort);
328 return ovsdbController.getOvsdbClient(ovsdb);
329 }
330
331 private boolean isCurrentStateDone(K8sHost k8sHost) {
332 switch (k8sHost.state()) {
333 case INIT:
334 return isInitStateDone(k8sHost);
335 case DEVICE_CREATED:
336 return isDeviceCreatedStateDone(k8sHost);
337 case COMPLETE:
338 case INCOMPLETE:
339 return false;
340 default:
341 return true;
342 }
343 }
344
345 private boolean isInitStateDone(K8sHost k8sHost) {
346 if (!isOvsdbConnected(k8sHost, ovsdbPortNum,
347 ovsdbController, deviceService)) {
348 return false;
349 }
350
351 try {
352 // we need to wait a while, in case interface and bridge
353 // creation requires some time
354 sleep(SLEEP_MS);
355 } catch (InterruptedException e) {
356 log.error("Exception caused during init state checking...");
357 }
358
359 for (K8sTunnelBridge tunBridge : k8sHost.tunBridges()) {
360 if (!deviceService.isAvailable(tunBridge.deviceId())) {
361 return false;
362 }
363 }
364
365 return true;
366 }
367
368 private boolean isDeviceCreatedStateDone(K8sHost k8sHost) {
369 try {
370 // we need to wait a while, in case interface and bridge
371 // creation requires some time
372 sleep(SLEEP_MS);
373 } catch (InterruptedException e) {
374 log.error("Exception caused during init state checking...");
375 }
376
377 for (K8sTunnelBridge bridge: k8sHost.tunBridges()) {
378 if (!isTunPortEnabled(bridge, bridge.vxlanPortName())) {
379 return false;
380 }
381 if (!isTunPortEnabled(bridge, bridge.grePortName())) {
382 return false;
383 }
384 if (!isTunPortEnabled(bridge, bridge.genevePortName())) {
385 return false;
386 }
387 }
388
389 return true;
390 }
391
392 private boolean isTunPortEnabled(K8sTunnelBridge tunBridge, String intf) {
393 return deviceService.isAvailable(tunBridge.deviceId()) &&
394 deviceService.getPorts(tunBridge.deviceId()).stream()
395 .anyMatch(port -> Objects.equals(
396 port.annotations().value(PORT_NAME), intf) &&
397 port.isEnabled());
398 }
399
400 /**
401 * Configures the kubernetes host with new state.
402 *
403 * @param k8sHost kubernetes host
404 * @param newState a new state
405 */
406 private void setState(K8sHost k8sHost, K8sHostState newState) {
407 if (k8sHost.state() == newState) {
408 return;
409 }
410 K8sHost updated = k8sHost.updateState(newState);
411 k8sHostAdminService.updateHost(updated);
412 log.info("Changed {} state: {}", k8sHost.hostIp(), newState);
413 }
414
415 /**
416 * Bootstraps a new kubernetes host.
417 *
418 * @param k8sHost kubernetes host
419 */
420 private void bootstrapHost(K8sHost k8sHost) {
421 if (isCurrentStateDone(k8sHost)) {
422 setState(k8sHost, k8sHost.state().nextState());
423 } else {
424 log.trace("Processing {} state for {}", k8sHost.state(),
425 k8sHost.hostIp());
426 k8sHost.state().process(this, k8sHost);
427 }
428 }
429
430 private class InternalOvsdbListener implements DeviceListener {
431
432 @Override
433 public boolean isRelevant(DeviceEvent event) {
434 return event.subject().type() == Device.Type.CONTROLLER;
435 }
436
437 private boolean isRelevantHelper() {
438 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
439 }
440
441 @Override
442 public void event(DeviceEvent event) {
443 Device device = event.subject();
444
445 switch (event.type()) {
446 case DEVICE_AVAILABILITY_CHANGED:
447 case DEVICE_ADDED:
448 eventExecutor.execute(() -> {
449 if (!isRelevantHelper()) {
450 return;
451 }
452
453 K8sHost k8sHost = k8sHostAdminService.host(device.id());
454
455 if (k8sHost == null) {
456 return;
457 }
458
459 if (deviceService.isAvailable(device.id())) {
460 log.debug("OVSDB {} detected", device.id());
461 bootstrapHost(k8sHost);
462 }
463 });
464 break;
465 case PORT_ADDED:
466 case PORT_REMOVED:
467 case DEVICE_REMOVED:
468 default:
469 // do nothing
470 break;
471 }
472 }
473 }
474
475 private class InternalBridgeListener implements DeviceListener {
476
477 @Override
478 public boolean isRelevant(DeviceEvent event) {
479 return event.subject().type() == Device.Type.SWITCH;
480 }
481
482 private boolean isRelevantHelper() {
483 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
484 }
485
486 @Override
487 public void event(DeviceEvent event) {
488 Device device = event.subject();
489
490 switch (event.type()) {
491 case DEVICE_AVAILABILITY_CHANGED:
492 case DEVICE_ADDED:
493 eventExecutor.execute(() -> {
494 if (!isRelevantHelper()) {
495 return;
496 }
497
498 K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
499
500 if (k8sHost == null) {
501 return;
502 }
503
504 if (deviceService.isAvailable(device.id())) {
505 log.debug("Tunnel bridge created on {}",
506 k8sHost.hostIp());
507 log.debug("OVSDB {} detected", device.id());
508 bootstrapHost(k8sHost);
509 } else if (k8sHost.state() == COMPLETE) {
510 log.info("Device {} disconnected", device.id());
511 setState(k8sHost, INCOMPLETE);
512 }
513
514 if (k8sHost.state() == INCOMPLETE ||
515 k8sHost.state() == DEVICE_CREATED) {
516 log.info("Device {} is reconnected", device.id());
517 k8sHostAdminService.updateHost(
518 k8sHost.updateState(INIT));
519 }
520 });
521 break;
522 case PORT_UPDATED:
523 case PORT_ADDED:
524 eventExecutor.execute(() -> {
525 if (!isRelevantHelper()) {
526 return;
527 }
528
529 K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
530
531 if (k8sHost == null) {
532 return;
533 }
534
535 Port port = event.port();
536 String portName = port.annotations().value(PORT_NAME);
537 if (k8sHost.state() == DEVICE_CREATED) {
538
539 K8sTunnelBridge tunBridge = k8sHost.tunBridges().stream().filter(
540 br -> br.deviceId().equals(device.id())
541 ).findAny().orElse(null);
542
543 if (tunBridge != null) {
544 if (Objects.equals(portName, tunBridge.vxlanPortName()) ||
545 Objects.equals(portName, tunBridge.grePortName()) ||
546 Objects.equals(portName, tunBridge.genevePortName())) {
547 log.info("Interface {} added or updated to {}",
548 portName, device.id());
549 bootstrapHost(k8sHost);
550 }
551 }
552 }
553 });
554 break;
555 case PORT_REMOVED:
556 eventExecutor.execute(() -> {
557 if (!isRelevantHelper()) {
558 return;
559 }
560
561 K8sHost k8sHost = k8sHostAdminService.hostByTunBridge(device.id());
562
563 if (k8sHost == null) {
564 return;
565 }
566
567 Port port = event.port();
568 String portName = port.annotations().value(PORT_NAME);
569 if (k8sHost.state() == COMPLETE) {
570 K8sTunnelBridge tunBridge = k8sHost.tunBridges().stream().filter(
571 br -> br.deviceId().equals(device.id())
572 ).findAny().orElse(null);
573
574 if (tunBridge != null) {
575 if (Objects.equals(portName, tunBridge.vxlanPortName()) ||
576 Objects.equals(portName, tunBridge.grePortName()) ||
577 Objects.equals(portName, tunBridge.genevePortName())) {
578 log.warn("Interface {} removed from {}",
579 portName, event.subject().id());
580 setState(k8sHost, INCOMPLETE);
581 }
582 }
583 }
584 });
585 break;
586 case DEVICE_REMOVED:
587 default:
588 // do nothing
589 break;
590 }
591 }
592 }
593
594 private class InternalK8sHostListener implements K8sHostListener {
595
596 private boolean isRelevantHelper() {
597 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
598 }
599
600 @Override
601 public void event(K8sHostEvent event) {
602 switch (event.type()) {
603 case K8S_HOST_CREATED:
604 case K8S_HOST_UPDATED:
605 eventExecutor.execute(() -> {
606 if (!isRelevantHelper()) {
607 return;
608 }
609
610 bootstrapHost(event.subject());
611 });
612 break;
613 case K8S_HOST_REMOVED:
614 case K8S_HOST_INCOMPLETE:
615 default:
616 break;
617 }
618 }
619 }
620}