blob: a7d4dc57c63e0a4e19ded25aececaba54701aee6 [file] [log] [blame]
/*
 * Copyright 2022-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18
19import com.google.common.collect.Lists;
20import com.google.common.collect.Sets;
21import io.fabric8.kubernetes.api.model.ConfigMap;
22import io.fabric8.kubernetes.api.model.LoadBalancerIngress;
23import io.fabric8.kubernetes.api.model.LoadBalancerStatus;
24import io.fabric8.kubernetes.api.model.Service;
25import io.fabric8.kubernetes.client.KubernetesClient;
26import io.fabric8.kubernetes.client.Watcher;
27import io.fabric8.kubernetes.client.WatcherException;
28import io.fabric8.kubernetes.client.dsl.Resource;
29import org.onlab.packet.IpAddress;
30import org.onlab.packet.MacAddress;
31import org.onosproject.cluster.ClusterService;
32import org.onosproject.cluster.LeadershipService;
33import org.onosproject.cluster.NodeId;
34import org.onosproject.core.ApplicationId;
35import org.onosproject.core.CoreService;
36import org.onosproject.kubevirtnetworking.api.DefaultKubernetesExternalLb;
Daniel Park6a83ee62022-10-26 11:57:34 +090037import org.onosproject.kubevirtnetworking.api.DefaultKubernetesServicePort;
Daniel Park5ff76b72022-09-26 22:58:53 +090038import org.onosproject.kubevirtnetworking.api.KubernetesExternalLb;
39import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbAdminService;
Daniel Park6a83ee62022-10-26 11:57:34 +090040import org.onosproject.kubevirtnetworking.api.KubernetesServicePort;
Daniel Park5ff76b72022-09-26 22:58:53 +090041import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfig;
42import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfigEvent;
43import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfigListener;
44import org.onosproject.kubevirtnode.api.KubernetesExternalLbConfigService;
45import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
46import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
47import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
48import org.onosproject.kubevirtnode.api.KubevirtNode;
49import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
50import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
51import org.onosproject.kubevirtnode.api.KubevirtNodeService;
52import org.onosproject.mastership.MastershipService;
53import org.osgi.service.component.annotations.Activate;
54import org.osgi.service.component.annotations.Component;
55import org.osgi.service.component.annotations.Deactivate;
56import org.osgi.service.component.annotations.Reference;
57import org.osgi.service.component.annotations.ReferenceCardinality;
58import org.slf4j.Logger;
59
60import java.util.Map;
61import java.util.Objects;
62import java.util.Set;
63import java.util.concurrent.ExecutorService;
64
65import static java.util.concurrent.Executors.newSingleThreadExecutor;
66import static org.onlab.util.Tools.groupedThreads;
67import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
Daniel Park6a83ee62022-10-26 11:57:34 +090068import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.configMapUpdated;
Daniel Park5ff76b72022-09-26 22:58:53 +090069import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.gatewayNodeForSpecifiedService;
70import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
71import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.workerNodeForSpecifiedService;
72import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
73import static org.slf4j.LoggerFactory.getLogger;
74
75/**
76 * Kubernetes service watcher used for external loadbalancing among PODs.
77 */
78@Component(immediate = true)
79public class KubernetesServiceWatcher {
80 private final Logger log = getLogger(getClass());
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected CoreService coreService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected MastershipService mastershipService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected ClusterService clusterService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected LeadershipService leadershipService;
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected KubevirtApiConfigService apiConfigService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected KubernetesExternalLbConfigService lbConfigService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected KubernetesExternalLbAdminService adminService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected KubevirtNodeService nodeService;
105
106 private static final String KUBE_DASH_VIP = "kube-vip";
107 private static final String KUBE_VIP = "kubevip";
108 private static final String LOADBALANCER_IP = "loadBalancerIP";
109 private static final String TYPE_LOADBALANCER = "LoadBalancer";
110 private static final String KUBE_SYSTEM = "kube-system";
111 private static final String GATEWAY_IP = "gateway-ip";
112 private static final String GATEWAY_MAC = "gateway-mac";
113 private static final String DEFAULT = "default";
114
115 private final ExecutorService eventExecutor = newSingleThreadExecutor(
116 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
117
118 private final InternalKubevirtApiConfigListener
119 apiConfigListener = new InternalKubevirtApiConfigListener();
120
121 private final InternalKubernetesServiceWatcher
122 serviceWatcher = new InternalKubernetesServiceWatcher();
123
124 private final InternalKubernetesExternalLbConfigListener
125 lbConfigListener = new InternalKubernetesExternalLbConfigListener();
126
127 private final InternalNodeEventListener
128 nodeEventListener = new InternalNodeEventListener();
129
130
131 private ApplicationId appId;
132 private NodeId localNodeId;
133
134 @Activate
135 protected void activate() {
136 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
137 localNodeId = clusterService.getLocalNode().id();
138 leadershipService.runForLeadership(appId.name());
139
140 apiConfigService.addListener(apiConfigListener);
141 lbConfigService.addListener(lbConfigListener);
142 nodeService.addListener(nodeEventListener);
143
144 log.info("Started");
145 }
146
147
148 @Deactivate
149 protected void deactivate() {
150 leadershipService.withdraw(appId.name());
151
152 apiConfigService.removeListener(apiConfigListener);
153 lbConfigService.removeListener(lbConfigListener);
154 nodeService.removeListener(nodeEventListener);
155
156 eventExecutor.shutdown();
157
158 log.info("Stopped");
159 }
160
161 private void instantiateWatcher() {
162 KubernetesClient client = k8sClient(apiConfigService);
163
164 if (client != null) {
165 client.services().inAnyNamespace().watch(serviceWatcher);
166 }
167 }
168
169 private class InternalKubernetesExternalLbConfigListener
170 implements KubernetesExternalLbConfigListener {
171
172 private boolean isRelevantHelper() {
173 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
174 }
175
176 @Override
177 public void event(KubernetesExternalLbConfigEvent event) {
178 switch (event.type()) {
179 case KUBERNETES_EXTERNAL_LB_CONFIG_CREATED:
180 case KUBERNETES_EXTERNAL_LB_CONFIG_UPDATED:
Daniel Park6a83ee62022-10-26 11:57:34 +0900181 eventExecutor.execute(() -> processConfigUpdate(event.subject()));
Daniel Park5ff76b72022-09-26 22:58:53 +0900182 break;
183 case KUBERNETES_EXTERNAL_LB_CONFIG_REMOVED:
184 default:
185 //do nothing
186 break;
187 }
188 }
189
Daniel Park6a83ee62022-10-26 11:57:34 +0900190 private void processConfigUpdate(KubernetesExternalLbConfig externalLbConfig) {
Daniel Park5ff76b72022-09-26 22:58:53 +0900191 if (!isRelevantHelper()) {
192 return;
193 }
Daniel Park6a83ee62022-10-26 11:57:34 +0900194 if (configMapUpdated(externalLbConfig)) {
195 addOrUpdateExternalLoadBalancers();
196 }
Daniel Park5ff76b72022-09-26 22:58:53 +0900197 }
198 }
199
200 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
201
202 private boolean isRelevantHelper() {
203 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
204 }
205
206 @Override
207 public void event(KubevirtApiConfigEvent event) {
208
209 switch (event.type()) {
210 case KUBEVIRT_API_CONFIG_UPDATED:
211 eventExecutor.execute(this::processConfigUpdate);
212 break;
213 case KUBEVIRT_API_CONFIG_CREATED:
214 case KUBEVIRT_API_CONFIG_REMOVED:
215 default:
216 // do nothing
217 break;
218 }
219 }
220
221 private void processConfigUpdate() {
222 if (!isRelevantHelper()) {
223 return;
224 }
225 instantiateWatcher();
226 addOrUpdateExternalLoadBalancers();
227 }
228 }
229
230 private class InternalKubernetesServiceWatcher implements Watcher<Service> {
231
232 @Override
233 public void eventReceived(Action action, Service service) {
234 switch (action) {
235 case ADDED:
Daniel Park5ff76b72022-09-26 22:58:53 +0900236 case MODIFIED:
Daniel Park5ff76b72022-09-26 22:58:53 +0900237 eventExecutor.execute(() -> processAddOrMod(service));
238 break;
239 case DELETED:
Daniel Park5ff76b72022-09-26 22:58:53 +0900240 eventExecutor.execute(() -> processDeletion(service));
241 break;
242 case ERROR:
243 log.warn("Failures processing pod manipulation.");
244 break;
245 default:
246 break;
247 }
248 }
249
250 @Override
251 public void onClose(WatcherException e) {
252 // due to the bugs in fabric8, pod watcher might be closed,
253 // we will re-instantiate the pod watcher in this case
254 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
255 log.warn("Service watcher OnClose, re-instantiate the POD watcher...");
256 instantiateWatcher();
257 }
258
259 private void processAddOrMod(Service service) {
Daniel Parka8373582022-11-10 13:40:06 +0900260 if (service == null || !isMaster() || !isLoadBalancerType(service)) {
Daniel Park5ff76b72022-09-26 22:58:53 +0900261 return;
262 }
263
Daniel Parka8373582022-11-10 13:40:06 +0900264 log.info("Service event ADDED or MODIFIED received");
265
Daniel Park6a83ee62022-10-26 11:57:34 +0900266 KubernetesExternalLbConfig config = lbConfigService.lbConfigs().stream().findAny().orElse(null);
267
268 if (!configMapUpdated(config)) {
Daniel Park5ff76b72022-09-26 22:58:53 +0900269 log.warn("Config map is not set yet. Stop this task");
270 return;
271 }
272
273 try {
274 if (addOrUpdateExternalLoadBalancer(service) &&
275 !isLoadBalancerStatusAlreadySet(service)) {
276 serviceStatusUpdate(service);
277 }
278 } catch (Exception e) {
279 log.error("Exception occurred because of {}", e.toString());
280 }
281 }
282
283 private void processDeletion(Service service) {
Daniel Parka8373582022-11-10 13:40:06 +0900284 if (service == null || !isMaster() || !isLoadBalancerType(service)) {
Daniel Park5ff76b72022-09-26 22:58:53 +0900285 return;
286 }
Daniel Parka8373582022-11-10 13:40:06 +0900287
288 log.info("Service event DELETED received");
289
290 if (isKubeVipCloudProviderLabelIsSet(service)) {
Daniel Park5ff76b72022-09-26 22:58:53 +0900291 KubernetesExternalLb lb = adminService.loadBalancer(service.getMetadata().getName());
292
293 if (lb == null) {
294 return;
295 }
296
297 adminService.removeExternalLb(lb.serviceName());
298 }
299 }
300 private boolean isMaster() {
301 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
302 }
303 }
304
305
306 //When api config or configmap updated, check every prerequisite and update all external load balancers
307 private void addOrUpdateExternalLoadBalancers() {
308 KubernetesClient client = k8sClient(apiConfigService);
309
310 client.services().inNamespace(DEFAULT).list()
311 .getItems().forEach(this::addOrUpdateExternalLoadBalancer);
312 }
313
314 private boolean addOrUpdateExternalLoadBalancer(Service service) {
315 if (isLoadBalancerType(service) &&
316 isKubeVipCloudProviderLabelIsSet(service)) {
317
318 KubernetesExternalLb lb = parseKubernetesExternalLb(service);
319 if (lb == null) {
320 log.warn("Failed to parse the kubernetes external lb");
321 return false;
322 }
323
324 KubevirtNode electedGatewayNode = gatewayNodeForSpecifiedService(nodeService, lb);
325 if (electedGatewayNode == null) {
326 log.warn("Service created but there's no gateway nodes ready. Stop this task.");
327 return false;
328 }
329
330 lb = lb.updateElectedGateway(electedGatewayNode.hostname());
331
332 KubevirtNode electedWorkerNode = workerNodeForSpecifiedService(nodeService, lb);
333 if (electedWorkerNode == null) {
334 log.warn("Service created but there's no worker nodes ready. Stop this task.");
335 return false;
336 }
337 lb = lb.updateElectedWorker(electedWorkerNode.hostname());
338
339 log.trace("processAddOrMod called and parsed lb is {}", lb);
340
341 if (adminService.loadBalancer(lb.serviceName()) == null) {
342 adminService.createExternalLb(lb);
343 } else {
344 adminService.updateExternalLb(lb);
345 }
346 return true;
347 }
348 return false;
349 }
350
351 private void serviceStatusUpdate(Service service) {
352 KubernetesClient client = k8sClient(apiConfigService);
353
354 String lbIp = service.getSpec().getLoadBalancerIP();
355 if (lbIp == null) {
356 return;
357 }
358
359 LoadBalancerIngress lbIngress = new LoadBalancerIngress(KUBE_VIP, lbIp, Lists.newArrayList());
360
361 service.getStatus().getLoadBalancer().setIngress(Lists.newArrayList(lbIngress));
362
363 //When a service is deleted, the event MODIFED is also along with DELETED event
364 //So filter out this MODIFIED events
365 if (client.services().withName(service.getMetadata().getName()) != null) {
366 client.services().patchStatus(service);
367 }
368 }
369
370 //Only process if the event when the kube-vip-cloud-provider label is set
371 // and loadbalancer status is not set.
372 private boolean isKubeVipCloudProviderLabelIsSet(Service service) {
373 log.trace("isKubeVipCloudProviderLabelIsSet called with labels {}", service.getMetadata().getLabels());
374 if (service.getMetadata().getLabels() == null) {
375 return false;
376 }
377
378 return service.getMetadata().getLabels().containsValue(KUBE_DASH_VIP);
379 }
380
381 private boolean isLoadBalancerStatusAlreadySet(Service service) {
382 log.trace("isLoadBalancerStatusAlreadySet called with status {}", service.getStatus());
383
384 LoadBalancerStatus lbStatus = service.getStatus().getLoadBalancer();
385 if (lbStatus.getIngress().isEmpty()) {
386 return false;
387 }
388
389 String lbIp = service.getSpec().getLoadBalancerIP();
390 if (lbIp == null) {
391 return false;
392 }
393
394 return lbStatus.getIngress().stream()
395 .filter(lbIngress -> Objects.equals(lbIngress.getIp(), lbIp))
396 .findAny().isPresent();
397 }
398
Daniel Park5ff76b72022-09-26 22:58:53 +0900399 //Only process if the event when the service type is LoadBalancer
400 private boolean isLoadBalancerType(Service service) {
401 return service.getSpec().getType().equals(TYPE_LOADBALANCER);
402 }
403
404 private KubernetesExternalLb parseKubernetesExternalLb(Service service) {
405 if (service.getMetadata() == null || service.getSpec() == null) {
406 return null;
407 }
408
409 String serviceName = service.getMetadata().getName();
410 if (serviceName == null) {
411 return null;
412 }
413
414 String lbIp = service.getSpec().getLoadBalancerIP();
415 if (lbIp == null) {
416 return null;
417 }
418
Daniel Park6a83ee62022-10-26 11:57:34 +0900419 Set<KubernetesServicePort> servicePorts = Sets.newHashSet();
Daniel Park5ff76b72022-09-26 22:58:53 +0900420 Set<String> endpointSet = Sets.newHashSet();
421
422 service.getSpec().getPorts().forEach(servicePort -> {
Daniel Park6a83ee62022-10-26 11:57:34 +0900423 if (servicePort.getPort() != null && servicePort.getNodePort() != null) {
424 servicePorts.add(DefaultKubernetesServicePort.builder()
425 .nodePort(servicePort.getNodePort())
426 .port(servicePort.getPort()).build());
427 }
Daniel Park5ff76b72022-09-26 22:58:53 +0900428 });
429
430 nodeService.completeNodes(WORKER).forEach(workerNode -> {
431 endpointSet.add(workerNode.dataIp().toString());
432 });
433
434 String loadbalancerGatewayIp = loadBalancerGatewayIp();
435
436 if (loadbalancerGatewayIp == null) {
437 log.error("Can't find the loadbalancer gateway ip in the kubevip configmap.." +
438 "Failed to parse kubernetes external lb and return null");
439 return null;
440 }
441
442 MacAddress loadBalancerGatewayMac = loadBalancerGatewayMac();
443
444 if (loadbalancerGatewayIp == null) {
445 log.error("Can't find the loadbalancer gateway mac in the kubevip configmap.." +
446 "Failed to parse kubernetes external lb and return null");
447 return null;
448 }
449
450 return DefaultKubernetesExternalLb.builder().serviceName(serviceName)
451 .loadBalancerIp(IpAddress.valueOf(lbIp))
Daniel Park6a83ee62022-10-26 11:57:34 +0900452 .servicePorts(servicePorts)
Daniel Park5ff76b72022-09-26 22:58:53 +0900453 .endpointSet(endpointSet)
454 .loadBalancerGwIp(IpAddress.valueOf(loadbalancerGatewayIp))
455 .loadBalancerGwMac(loadBalancerGatewayMac)
456 .build();
457 }
458
459 private String loadBalancerGatewayIp() {
460 KubernetesClient client = k8sClient(apiConfigService);
461
462 Resource<ConfigMap> kubeVipConfigMapResource =
463 client.configMaps().inNamespace(KUBE_SYSTEM).withName(KUBE_VIP);
464
465 if (kubeVipConfigMapResource == null) {
466 return null;
467 }
468
469 Map<String, String> kubeVipConfigMap = kubeVipConfigMapResource.get().getData();
470
471 if (!kubeVipConfigMap.containsKey(GATEWAY_IP)) {
472 return null;
473 }
474
475 return kubeVipConfigMap.get(GATEWAY_IP);
476 }
477
478 private MacAddress loadBalancerGatewayMac() {
479 KubernetesExternalLbConfig config = lbConfigService.lbConfigs().stream().findAny().orElse(null);
480
481 if (config == null) {
482 return null;
483 }
484
485 return config.loadBalancerGwMac();
486 }
487
488 private class InternalNodeEventListener implements KubevirtNodeListener {
489
490 private boolean isRelevantHelper() {
491 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
492 }
493
494 @Override
495 public void event(KubevirtNodeEvent event) {
496 switch (event.type()) {
497 case KUBEVIRT_NODE_COMPLETE:
498 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
499 break;
500 case KUBEVIRT_NODE_INCOMPLETE:
501 case KUBEVIRT_NODE_REMOVED:
502 eventExecutor.execute(() -> processNodeDeletion(event.subject()));
503 break;
504 case KUBEVIRT_NODE_UPDATED:
505 default:
506 // do nothing
507 break;
508 }
509 }
510
511 private void processNodeCompletion(KubevirtNode node) {
512 if (!isRelevantHelper()) {
513 return;
514 }
515 addOrUpdateExternalLoadBalancers();
516 }
517
518 private void processNodeDeletion(KubevirtNode node) {
519 if (!isRelevantHelper()) {
520 return;
521 }
522 addOrUpdateExternalLoadBalancers();
523 }
524 }
525}