blob: 2fb53a969b92455af4302d8b4dc1033e40758bf0 [file] [log] [blame]
Jian Li1c10cf22021-03-05 01:32:04 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ObjectNode;
21import io.fabric8.kubernetes.client.KubernetesClient;
22import io.fabric8.kubernetes.client.Watcher;
23import io.fabric8.kubernetes.client.WatcherException;
24import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.kubevirtnetworking.api.AbstractWatcher;
31import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroup;
32import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupAdminService;
33import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupRule;
34import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
35import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
36import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
37import org.onosproject.mastership.MastershipService;
38import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Reference;
42import org.osgi.service.component.annotations.ReferenceCardinality;
43import org.slf4j.Logger;
44
45import java.io.IOException;
46import java.util.Objects;
47import java.util.concurrent.ExecutorService;
48
49import static java.util.concurrent.Executors.newSingleThreadExecutor;
50import static org.onlab.util.Tools.groupedThreads;
51import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
52import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
53import static org.slf4j.LoggerFactory.getLogger;
54
55/**
56 * Kubevirt security group watcher used for feeding kubevirt security group information.
57 */
58@Component(immediate = true)
59public class KubevirtSecurityGroupWatcher extends AbstractWatcher {
60
61 private final Logger log = getLogger(getClass());
62
63 @Reference(cardinality = ReferenceCardinality.MANDATORY)
64 protected CoreService coreService;
65
66 @Reference(cardinality = ReferenceCardinality.MANDATORY)
67 protected MastershipService mastershipService;
68
69 @Reference(cardinality = ReferenceCardinality.MANDATORY)
70 protected ClusterService clusterService;
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY)
73 protected LeadershipService leadershipService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY)
76 protected KubevirtSecurityGroupAdminService adminService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected KubevirtApiConfigService configService;
80
81 private final ExecutorService eventExecutor = newSingleThreadExecutor(
82 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
83
84 private final InternalSecurityGroupWatcher
85 sgWatcher = new InternalSecurityGroupWatcher();
86 private final InternalSecurityGroupRuleWatcher
87 sgrWatcher = new InternalSecurityGroupRuleWatcher();
88 private final InternalKubevirtApiConfigListener
89 configListener = new InternalKubevirtApiConfigListener();
90
91 CustomResourceDefinitionContext securityGroupCrdCxt = new CustomResourceDefinitionContext
92 .Builder()
93 .withGroup("kubevirt.io")
94 .withScope("Cluster")
95 .withVersion("v1")
96 .withPlural("securitygroups")
97 .build();
98
99 CustomResourceDefinitionContext securityGroupRuleCrdCxt = new CustomResourceDefinitionContext
100 .Builder()
101 .withGroup("kubevirt.io")
102 .withScope("Cluster")
103 .withVersion("v1")
104 .withPlural("securitygrouprules")
105 .build();
106
107 private ApplicationId appId;
108 private NodeId localNodeId;
109
110 @Activate
111 protected void activate() {
112 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
113 localNodeId = clusterService.getLocalNode().id();
114 leadershipService.runForLeadership(appId.name());
115 configService.addListener(configListener);
116
117 log.info("Started");
118 }
119
120 @Deactivate
121 protected void deactivate() {
122 configService.removeListener(configListener);
123 leadershipService.withdraw(appId.name());
124 eventExecutor.shutdown();
125
126 log.info("Stopped");
127 }
128
129 private void instantiateSgWatcher() {
130 KubernetesClient client = k8sClient(configService);
131
132 if (client != null) {
133 try {
134 client.customResource(securityGroupCrdCxt).watch(sgWatcher);
135 } catch (IOException e) {
136 e.printStackTrace();
137 }
138 }
139 }
140
141 private void instantiateSgrWatcher() {
142 KubernetesClient client = k8sClient(configService);
143
144 if (client != null) {
145 try {
146 client.customResource(securityGroupRuleCrdCxt).watch(sgrWatcher);
147 } catch (IOException e) {
148 e.printStackTrace();
149 }
150 }
151 }
152
153 private KubevirtSecurityGroup parseSecurityGroup(String resource) {
154 try {
155 ObjectMapper mapper = new ObjectMapper();
156 JsonNode json = mapper.readTree(resource);
157 ObjectNode spec = (ObjectNode) json.get("spec");
158 return codec(KubevirtSecurityGroup.class).decode(spec, this);
159 } catch (IOException e) {
160 log.error("Failed to parse kubevirt security group object");
161 }
162
163 return null;
164 }
165
166 private KubevirtSecurityGroupRule parseSecurityGroupRule(String resource) {
167 try {
168 ObjectMapper mapper = new ObjectMapper();
169 JsonNode json = mapper.readTree(resource);
170 ObjectNode spec = (ObjectNode) json.get("spec");
171 return codec(KubevirtSecurityGroupRule.class).decode(spec, this);
172 } catch (IOException e) {
173 log.error("Failed to parse kubevirt security group rule object");
174 }
175
176 return null;
177 }
178
179 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
180
181 private boolean isRelevantHelper() {
182 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
183 }
184
185 @Override
186 public void event(KubevirtApiConfigEvent event) {
187
188 switch (event.type()) {
189 case KUBEVIRT_API_CONFIG_UPDATED:
190 eventExecutor.execute(this::processConfigUpdate);
191 break;
192 case KUBEVIRT_API_CONFIG_CREATED:
193 case KUBEVIRT_API_CONFIG_REMOVED:
194 default:
195 // do nothing
196 break;
197 }
198 }
199
200 private void processConfigUpdate() {
201 if (!isRelevantHelper()) {
202 return;
203 }
204
205 instantiateSgWatcher();
206 instantiateSgrWatcher();
207 }
208 }
209
210 private class InternalSecurityGroupWatcher implements Watcher<String> {
211
212 @Override
213 public void eventReceived(Action action, String resource) {
214 switch (action) {
215 case ADDED:
216 eventExecutor.execute(() -> processAddition(resource));
217 break;
218 case MODIFIED:
219 eventExecutor.execute(() -> processModification(resource));
220 break;
221 case DELETED:
222 eventExecutor.execute(() -> processDeletion(resource));
223 break;
224 default:
225 // do nothing
226 break;
227 }
228 }
229
230 @Override
231 public void onClose(WatcherException e) {
232 // due to the bugs in fabric8, the watcher might be closed,
233 // we will re-instantiate the watcher in this case
234 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
235 log.warn("Security Group watcher OnClose, re-instantiate the watcher...");
236
237 instantiateSgWatcher();
238 }
239
240 private void processAddition(String resource) {
241 if (!isMaster()) {
242 return;
243 }
244
245 KubevirtSecurityGroup sg = parseSecurityGroup(resource);
246
247 if (sg != null) {
248 log.trace("Process Security Group {} creating event from API server.", sg.name());
249
250 if (adminService.securityGroup(sg.id()) == null) {
251 adminService.createSecurityGroup(sg);
252 }
253 }
254 }
255
256 private void processModification(String resource) {
257 if (!isMaster()) {
258 return;
259 }
260
261 KubevirtSecurityGroup sg = parseSecurityGroup(resource);
262
263 if (sg != null) {
264 log.trace("Process Security Group {} updating event from API server.", sg.name());
265
266 // since Security Group CRD does not contains any rules information,
267 // we need to manually add all rules from original to the updated one
268 KubevirtSecurityGroup orig = adminService.securityGroup(sg.id());
269 if (orig != null) {
270 KubevirtSecurityGroup updated = sg.updateRules(orig.rules());
271 adminService.updateSecurityGroup(updated);
272 }
273 }
274 }
275
276 private void processDeletion(String resource) {
277 if (!isMaster()) {
278 return;
279 }
280
281 KubevirtSecurityGroup sg = parseSecurityGroup(resource);
282
283 if (sg != null) {
284 log.trace("Process Security Group {} removal event from API server.", sg.name());
285
286 adminService.removeSecurityGroup(sg.id());
287 }
288 }
289 }
290
291 private class InternalSecurityGroupRuleWatcher implements Watcher<String> {
292
293 @Override
294 public void eventReceived(Action action, String resource) {
295 switch (action) {
296 case ADDED:
297 eventExecutor.execute(() -> processAddition(resource));
298 break;
299 case MODIFIED:
300 eventExecutor.execute(() -> processModification(resource));
301 break;
302 case DELETED:
303 eventExecutor.execute(() -> processDeletion(resource));
304 break;
305 default:
306 // do nothing
307 break;
308 }
309 }
310
311 @Override
312 public void onClose(WatcherException e) {
313 // due to the bugs in fabric8, the watcher might be closed,
314 // we will re-instantiate the watcher in this case
315 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
316 log.warn("Security Group Rule watcher OnClose, re-instantiate the watcher...");
317
318 instantiateSgrWatcher();
319 }
320
321 private void processAddition(String resource) {
322 if (!isMaster()) {
323 return;
324 }
325
326 KubevirtSecurityGroupRule sgr = parseSecurityGroupRule(resource);
327
328 if (sgr != null) {
329 log.trace("Process Security Group Rule {} creating event from API server.", sgr.id());
330
331 if (adminService.securityGroupRule(sgr.id()) == null) {
332 adminService.createSecurityGroupRule(sgr);
333 }
334 }
335 }
336
337 private void processModification(String resource) {
338 if (!isMaster()) {
339 return;
340 }
341
342 // we do not handle the update case, as we assume the security group rule
343 // object is immutable
344 }
345
346 private void processDeletion(String resource) {
347 if (!isMaster()) {
348 return;
349 }
350
351 KubevirtSecurityGroupRule sgr = parseSecurityGroupRule(resource);
352
353 if (sgr != null) {
354 log.trace("Process Security Group Rule {} removal event from API server.", sgr.id());
355
356 adminService.removeSecurityGroupRule(sgr.id());
357 }
358 }
359 }
360
361 private boolean isMaster() {
362 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
363 }
364}