blob: 5b0dd7564c9ae351e79d3c06a74379b1753e0030 [file] [log] [blame]
Jian Lidaa7d6a2021-04-13 17:22:56 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ObjectNode;
21import io.fabric8.kubernetes.client.KubernetesClient;
22import io.fabric8.kubernetes.client.Watcher;
23import io.fabric8.kubernetes.client.WatcherException;
24import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.kubevirtnetworking.api.AbstractWatcher;
31import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancer;
32import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerAdminService;
33import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
34import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
35import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
36import org.onosproject.mastership.MastershipService;
37import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Reference;
41import org.osgi.service.component.annotations.ReferenceCardinality;
42import org.slf4j.Logger;
43
44import java.io.IOException;
45import java.util.Objects;
46import java.util.concurrent.ExecutorService;
47
48import static java.util.concurrent.Executors.newSingleThreadExecutor;
49import static org.onlab.util.Tools.groupedThreads;
50import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
51import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
52import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.parseResourceName;
53import static org.slf4j.LoggerFactory.getLogger;
54
55/**
56 * Kubevirt load balancer watcher used for feeding kubevirt load balancer information.
57 */
58@Component(immediate = true)
59public class KubevirtLoadBalancerWatcher extends AbstractWatcher {
60
61 private final Logger log = getLogger(getClass());
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY)
64 protected CoreService coreService;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY)
67 protected MastershipService mastershipService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY)
70 protected ClusterService clusterService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY)
73 protected LeadershipService leadershipService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY)
76 protected KubevirtLoadBalancerAdminService adminService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected KubevirtApiConfigService configService;
80
81 private final ExecutorService eventExecutor = newSingleThreadExecutor(
82 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
83
84 private final InternalLoadBalancerWatcher watcher = new InternalLoadBalancerWatcher();
85 private final InternalKubevirtApiConfigListener
86 configListener = new InternalKubevirtApiConfigListener();
87
88 CustomResourceDefinitionContext lbCrdCxt = new CustomResourceDefinitionContext
89 .Builder()
90 .withGroup("kubevirt.io")
91 .withScope("Cluster")
92 .withVersion("v1")
93 .withPlural("loadbalancers")
94 .build();
95
96 private ApplicationId appId;
97 private NodeId localNodeId;
98
99 @Activate
100 protected void activate() {
101 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
102 localNodeId = clusterService.getLocalNode().id();
103 leadershipService.runForLeadership(appId.name());
104 configService.addListener(configListener);
105
106 log.info("Started");
107 }
108
109 @Deactivate
110 protected void deactivate() {
111 configService.removeListener(configListener);
112 leadershipService.withdraw(appId.name());
113 eventExecutor.shutdown();
114
115 log.info("Stopped");
116 }
117
118 private void instantiateWatcher() {
119 KubernetesClient client = k8sClient(configService);
120
121 if (client != null) {
122 try {
123 client.customResource(lbCrdCxt).watch(watcher);
124 } catch (IOException e) {
125 e.printStackTrace();
126 }
127 }
128 }
129
130 private KubevirtLoadBalancer parseKubevirtLoadBalancer(String resource) {
131 try {
132 ObjectMapper mapper = new ObjectMapper();
133 JsonNode json = mapper.readTree(resource);
134 ObjectNode spec = (ObjectNode) json.get("spec");
135 return codec(KubevirtLoadBalancer.class).decode(spec, this);
136 } catch (IOException e) {
137 log.error("Failed to parse kubevirt load balancer object");
138 }
139
140 return null;
141 }
142
143 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
144
145 private boolean isRelevantHelper() {
146 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
147 }
148
149 @Override
150 public void event(KubevirtApiConfigEvent event) {
151
152 switch (event.type()) {
153 case KUBEVIRT_API_CONFIG_UPDATED:
154 eventExecutor.execute(this::processConfigUpdate);
155 break;
156 case KUBEVIRT_API_CONFIG_CREATED:
157 case KUBEVIRT_API_CONFIG_REMOVED:
158 default:
159 // do nothing
160 break;
161 }
162 }
163
164 private void processConfigUpdate() {
165 if (!isRelevantHelper()) {
166 return;
167 }
168
169 instantiateWatcher();
170 }
171 }
172
173 private class InternalLoadBalancerWatcher implements Watcher<String> {
174
175 @Override
176 public void eventReceived(Action action, String resource) {
177 switch (action) {
178 case ADDED:
179 eventExecutor.execute(() -> processAddition(resource));
180 break;
181 case MODIFIED:
182 eventExecutor.execute(() -> processModification(resource));
183 break;
184 case DELETED:
185 eventExecutor.execute(() -> processDeletion(resource));
186 break;
187 case ERROR:
188 log.warn("Failures processing load balancer manipulation.");
189 break;
190 default:
191 // do nothing
192 break;
193 }
194 }
195
196 @Override
197 public void onClose(WatcherException e) {
198 // due to the bugs in fabric8, the watcher might be closed,
199 // we will re-instantiate the watcher in this case
200 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
201 log.warn("Load Balancer watcher OnClose, re-instantiate the watcher...");
202
203 instantiateWatcher();
204 }
205
206 private void processAddition(String resource) {
207 if (!isMaster()) {
208 return;
209 }
210
211 String name = parseResourceName(resource);
212
213 log.trace("Process Load Balancer {} creating event from API server.",
214 name);
215
216 KubevirtLoadBalancer lb = parseKubevirtLoadBalancer(resource);
217 if (lb != null) {
218 if (adminService.loadBalancer(lb.name()) == null) {
219 adminService.createLoadBalancer(lb);
220 }
221 }
222 }
223
224 private void processModification(String resource) {
225 if (!isMaster()) {
226 return;
227 }
228
229 String name = parseResourceName(resource);
230
231 log.trace("Process Load Balancer {} updating event from API server.",
232 name);
233
234 KubevirtLoadBalancer lb = parseKubevirtLoadBalancer(resource);
235 if (lb != null) {
236 adminService.updateLoadBalancer(lb);
237 }
238 }
239
240 private void processDeletion(String resource) {
241 if (!isMaster()) {
242 return;
243 }
244
245 String name = parseResourceName(resource);
246
247 log.trace("Process Load Balancer {} removal event from API server.",
248 name);
249
250 adminService.removeLoadBalancer(name);
251 }
252
253 private boolean isMaster() {
254 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
255 }
256 }
257}