blob: 5a25db3a9576f9358b93b731de9af112c3771cdc [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ObjectNode;
21import io.fabric8.kubernetes.client.KubernetesClient;
22import io.fabric8.kubernetes.client.Watcher;
23import io.fabric8.kubernetes.client.WatcherException;
24import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.kubevirtnetworking.api.AbstractWatcher;
31import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroup;
32import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupAdminService;
33import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupRule;
34import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
35import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
36import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
37import org.onosproject.mastership.MastershipService;
38import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Reference;
42import org.osgi.service.component.annotations.ReferenceCardinality;
43import org.slf4j.Logger;
44
45import java.io.IOException;
46import java.util.Objects;
47import java.util.concurrent.ExecutorService;
48
49import static java.util.concurrent.Executors.newSingleThreadExecutor;
50import static org.onlab.util.Tools.groupedThreads;
51import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
52import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
Jian Liae43d222021-07-16 23:50:11 +090053import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.waitFor;
Jian Li1c10cf22021-03-05 01:32:04 +090054import static org.slf4j.LoggerFactory.getLogger;
55
56/**
57 * Kubevirt security group watcher used for feeding kubevirt security group information.
58 */
59@Component(immediate = true)
60public class KubevirtSecurityGroupWatcher extends AbstractWatcher {
61
62 private final Logger log = getLogger(getClass());
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY)
65 protected CoreService coreService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY)
68 protected MastershipService mastershipService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY)
71 protected ClusterService clusterService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY)
74 protected LeadershipService leadershipService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY)
77 protected KubevirtSecurityGroupAdminService adminService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY)
80 protected KubevirtApiConfigService configService;
81
82 private final ExecutorService eventExecutor = newSingleThreadExecutor(
83 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
84
85 private final InternalSecurityGroupWatcher
86 sgWatcher = new InternalSecurityGroupWatcher();
87 private final InternalSecurityGroupRuleWatcher
88 sgrWatcher = new InternalSecurityGroupRuleWatcher();
89 private final InternalKubevirtApiConfigListener
90 configListener = new InternalKubevirtApiConfigListener();
91
92 CustomResourceDefinitionContext securityGroupCrdCxt = new CustomResourceDefinitionContext
93 .Builder()
94 .withGroup("kubevirt.io")
95 .withScope("Cluster")
96 .withVersion("v1")
97 .withPlural("securitygroups")
98 .build();
99
100 CustomResourceDefinitionContext securityGroupRuleCrdCxt = new CustomResourceDefinitionContext
101 .Builder()
102 .withGroup("kubevirt.io")
103 .withScope("Cluster")
104 .withVersion("v1")
105 .withPlural("securitygrouprules")
106 .build();
107
108 private ApplicationId appId;
109 private NodeId localNodeId;
110
111 @Activate
112 protected void activate() {
113 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
114 localNodeId = clusterService.getLocalNode().id();
115 leadershipService.runForLeadership(appId.name());
116 configService.addListener(configListener);
117
118 log.info("Started");
119 }
120
121 @Deactivate
122 protected void deactivate() {
123 configService.removeListener(configListener);
124 leadershipService.withdraw(appId.name());
125 eventExecutor.shutdown();
126
127 log.info("Stopped");
128 }
129
130 private void instantiateSgWatcher() {
131 KubernetesClient client = k8sClient(configService);
132
133 if (client != null) {
134 try {
135 client.customResource(securityGroupCrdCxt).watch(sgWatcher);
136 } catch (IOException e) {
137 e.printStackTrace();
138 }
139 }
140 }
141
142 private void instantiateSgrWatcher() {
143 KubernetesClient client = k8sClient(configService);
144
145 if (client != null) {
146 try {
147 client.customResource(securityGroupRuleCrdCxt).watch(sgrWatcher);
148 } catch (IOException e) {
149 e.printStackTrace();
150 }
151 }
152 }
153
154 private KubevirtSecurityGroup parseSecurityGroup(String resource) {
155 try {
156 ObjectMapper mapper = new ObjectMapper();
157 JsonNode json = mapper.readTree(resource);
158 ObjectNode spec = (ObjectNode) json.get("spec");
159 return codec(KubevirtSecurityGroup.class).decode(spec, this);
160 } catch (IOException e) {
161 log.error("Failed to parse kubevirt security group object");
162 }
163
164 return null;
165 }
166
167 private KubevirtSecurityGroupRule parseSecurityGroupRule(String resource) {
168 try {
169 ObjectMapper mapper = new ObjectMapper();
170 JsonNode json = mapper.readTree(resource);
171 ObjectNode spec = (ObjectNode) json.get("spec");
172 return codec(KubevirtSecurityGroupRule.class).decode(spec, this);
173 } catch (IOException e) {
174 log.error("Failed to parse kubevirt security group rule object");
175 }
176
177 return null;
178 }
179
180 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
181
182 private boolean isRelevantHelper() {
183 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
184 }
185
186 @Override
187 public void event(KubevirtApiConfigEvent event) {
188
189 switch (event.type()) {
190 case KUBEVIRT_API_CONFIG_UPDATED:
191 eventExecutor.execute(this::processConfigUpdate);
192 break;
193 case KUBEVIRT_API_CONFIG_CREATED:
194 case KUBEVIRT_API_CONFIG_REMOVED:
195 default:
196 // do nothing
197 break;
198 }
199 }
200
201 private void processConfigUpdate() {
202 if (!isRelevantHelper()) {
203 return;
204 }
205
206 instantiateSgWatcher();
207 instantiateSgrWatcher();
208 }
209 }
210
211 private class InternalSecurityGroupWatcher implements Watcher<String> {
212
213 @Override
214 public void eventReceived(Action action, String resource) {
215 switch (action) {
216 case ADDED:
217 eventExecutor.execute(() -> processAddition(resource));
218 break;
219 case MODIFIED:
220 eventExecutor.execute(() -> processModification(resource));
221 break;
222 case DELETED:
223 eventExecutor.execute(() -> processDeletion(resource));
224 break;
225 default:
226 // do nothing
227 break;
228 }
229 }
230
231 @Override
232 public void onClose(WatcherException e) {
233 // due to the bugs in fabric8, the watcher might be closed,
234 // we will re-instantiate the watcher in this case
235 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
236 log.warn("Security Group watcher OnClose, re-instantiate the watcher...");
237
238 instantiateSgWatcher();
239 }
240
241 private void processAddition(String resource) {
242 if (!isMaster()) {
243 return;
244 }
245
246 KubevirtSecurityGroup sg = parseSecurityGroup(resource);
247
248 if (sg != null) {
249 log.trace("Process Security Group {} creating event from API server.", sg.name());
250
251 if (adminService.securityGroup(sg.id()) == null) {
252 adminService.createSecurityGroup(sg);
253 }
254 }
255 }
256
257 private void processModification(String resource) {
258 if (!isMaster()) {
259 return;
260 }
261
262 KubevirtSecurityGroup sg = parseSecurityGroup(resource);
263
264 if (sg != null) {
265 log.trace("Process Security Group {} updating event from API server.", sg.name());
266
267 // since Security Group CRD does not contains any rules information,
268 // we need to manually add all rules from original to the updated one
269 KubevirtSecurityGroup orig = adminService.securityGroup(sg.id());
270 if (orig != null) {
271 KubevirtSecurityGroup updated = sg.updateRules(orig.rules());
272 adminService.updateSecurityGroup(updated);
273 }
274 }
275 }
276
277 private void processDeletion(String resource) {
278 if (!isMaster()) {
279 return;
280 }
281
282 KubevirtSecurityGroup sg = parseSecurityGroup(resource);
283
284 if (sg != null) {
285 log.trace("Process Security Group {} removal event from API server.", sg.name());
286
287 adminService.removeSecurityGroup(sg.id());
288 }
289 }
290 }
291
292 private class InternalSecurityGroupRuleWatcher implements Watcher<String> {
293
294 @Override
295 public void eventReceived(Action action, String resource) {
296 switch (action) {
297 case ADDED:
298 eventExecutor.execute(() -> processAddition(resource));
299 break;
300 case MODIFIED:
301 eventExecutor.execute(() -> processModification(resource));
302 break;
303 case DELETED:
304 eventExecutor.execute(() -> processDeletion(resource));
305 break;
306 default:
307 // do nothing
308 break;
309 }
310 }
311
312 @Override
313 public void onClose(WatcherException e) {
314 // due to the bugs in fabric8, the watcher might be closed,
315 // we will re-instantiate the watcher in this case
316 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
317 log.warn("Security Group Rule watcher OnClose, re-instantiate the watcher...");
318
319 instantiateSgrWatcher();
320 }
321
322 private void processAddition(String resource) {
323 if (!isMaster()) {
324 return;
325 }
326
327 KubevirtSecurityGroupRule sgr = parseSecurityGroupRule(resource);
328
329 if (sgr != null) {
330 log.trace("Process Security Group Rule {} creating event from API server.", sgr.id());
331
Jian Liae43d222021-07-16 23:50:11 +0900332 KubevirtSecurityGroup sg = adminService.securityGroup(sgr.securityGroupId());
333 if (sg == null) {
334 log.warn("Security Group {} is not found, we wait 5 seconds until " +
335 "the group to be installed.", sgr.securityGroupId());
336 waitFor(5);
337 }
338
Jian Li1c10cf22021-03-05 01:32:04 +0900339 if (adminService.securityGroupRule(sgr.id()) == null) {
340 adminService.createSecurityGroupRule(sgr);
341 }
342 }
343 }
344
345 private void processModification(String resource) {
346 if (!isMaster()) {
347 return;
348 }
349
350 // we do not handle the update case, as we assume the security group rule
351 // object is immutable
352 }
353
354 private void processDeletion(String resource) {
355 if (!isMaster()) {
356 return;
357 }
358
359 KubevirtSecurityGroupRule sgr = parseSecurityGroupRule(resource);
360
361 if (sgr != null) {
362 log.trace("Process Security Group Rule {} removal event from API server.", sgr.id());
363
364 adminService.removeSecurityGroupRule(sgr.id());
365 }
366 }
367 }
368
369 private boolean isMaster() {
370 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
371 }
372}