blob: 8f38c4a7d7bc7b558c3f2f59a906f9af6a63d4e5 [file] [log] [blame]
Jian Lib6dc08f2021-03-24 15:24:18 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ArrayNode;
Jian Lib6dc08f2021-03-24 15:24:18 +090021import io.fabric8.kubernetes.client.KubernetesClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090022import io.fabric8.kubernetes.client.Watcher;
23import io.fabric8.kubernetes.client.WatcherException;
24import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
25import org.apache.commons.lang3.StringUtils;
26import org.onlab.packet.IpAddress;
27import org.onlab.packet.MacAddress;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.LeadershipService;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
33import org.onosproject.kubevirtnetworking.api.DefaultKubevirtPort;
Jian Lib6dc08f2021-03-24 15:24:18 +090034import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
35import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
36import org.onosproject.kubevirtnetworking.api.KubevirtPort;
37import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
38import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
39import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
40import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
41import org.onosproject.kubevirtnode.api.KubevirtNodeService;
42import org.onosproject.mastership.MastershipService;
Jian Lib6dc08f2021-03-24 15:24:18 +090043import org.osgi.service.component.annotations.Activate;
44import org.osgi.service.component.annotations.Component;
45import org.osgi.service.component.annotations.Deactivate;
46import org.osgi.service.component.annotations.Reference;
47import org.osgi.service.component.annotations.ReferenceCardinality;
48import org.slf4j.Logger;
49
50import java.io.IOException;
51import java.util.HashMap;
52import java.util.HashSet;
53import java.util.Map;
54import java.util.Objects;
55import java.util.Set;
56import java.util.concurrent.ExecutorService;
Jian Lib6dc08f2021-03-24 15:24:18 +090057
Jian Lib6dc08f2021-03-24 15:24:18 +090058import static java.util.concurrent.Executors.newSingleThreadExecutor;
59import static org.onlab.util.Tools.groupedThreads;
60import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
Jian Lib6dc08f2021-03-24 15:24:18 +090061import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090062import static org.slf4j.LoggerFactory.getLogger;
63
64/**
65 * Kubernetes VM watcher used for feeding VM information.
66 */
67@Component(immediate = true)
68public class KubevirtVmWatcher {
69
70 private final Logger log = getLogger(getClass());
71
Jian Lib6dc08f2021-03-24 15:24:18 +090072 private static final String SPEC = "spec";
73 private static final String TEMPLATE = "template";
74 private static final String METADATA = "metadata";
75 private static final String ANNOTATIONS = "annotations";
76 private static final String DOMAIN = "domain";
77 private static final String DEVICES = "devices";
78 private static final String INTERFACES = "interfaces";
Jian Li8f944d42021-03-23 00:43:29 +090079 private static final String SECURITY_GROUPS = "securityGroups";
Jian Lib6dc08f2021-03-24 15:24:18 +090080 private static final String NAME = "name";
81 private static final String NETWORK = "network";
82 private static final String MAC = "macAddress";
83 private static final String IP = "ipAddress";
84 private static final String DEFAULT = "default";
Jian Li46592cf2021-05-11 18:12:55 +090085 private static final String CNI_ZERO = "cni0";
Jian Lib6dc08f2021-03-24 15:24:18 +090086 private static final String NETWORK_SUFFIX = "-net";
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected CoreService coreService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected MastershipService mastershipService;
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected ClusterService clusterService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected LeadershipService leadershipService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected KubevirtNodeService nodeService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected KubevirtNetworkAdminService networkAdminService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
107 protected KubevirtPortAdminService portAdminService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected KubevirtPodService podService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
113 protected KubevirtApiConfigService configService;
114
115 private final ExecutorService eventExecutor = newSingleThreadExecutor(
116 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
117
118 private final InternalKubevirtVmWatcher watcher = new InternalKubevirtVmWatcher();
119 private final InternalKubevirtApiConfigListener
120 configListener = new InternalKubevirtApiConfigListener();
121
122 CustomResourceDefinitionContext vmCrdCxt = new CustomResourceDefinitionContext
123 .Builder()
124 .withGroup("kubevirt.io")
125 .withScope("Namespaced")
126 .withVersion("v1")
127 .withPlural("virtualmachines")
128 .build();
129
130 private ApplicationId appId;
131 private NodeId localNodeId;
132
133 @Activate
134 protected void activate() {
135 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
136 localNodeId = clusterService.getLocalNode().id();
137 leadershipService.runForLeadership(appId.name());
138 configService.addListener(configListener);
139
140 log.info("Started");
141 }
142
143 @Deactivate
144 protected void deactivate() {
145 configService.removeListener(configListener);
146 leadershipService.withdraw(appId.name());
147 eventExecutor.shutdown();
148
149 log.info("Stopped");
150 }
151
152 private void instantiateWatcher() {
153 KubernetesClient client = k8sClient(configService);
154
155 if (client != null) {
156 try {
157 client.customResource(vmCrdCxt).watch(watcher);
158 } catch (IOException e) {
159 e.printStackTrace();
160 }
161 }
162 }
163
164 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
165
166 private boolean isRelevantHelper() {
167 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
168 }
169
170 @Override
171 public void event(KubevirtApiConfigEvent event) {
172
173 switch (event.type()) {
174 case KUBEVIRT_API_CONFIG_UPDATED:
175 eventExecutor.execute(this::processConfigUpdate);
176 break;
177 case KUBEVIRT_API_CONFIG_CREATED:
178 case KUBEVIRT_API_CONFIG_REMOVED:
179 default:
180 // do nothing
181 break;
182 }
183 }
184
185 private void processConfigUpdate() {
186 if (!isRelevantHelper()) {
187 return;
188 }
189
190 instantiateWatcher();
191 }
192 }
193
194 private class InternalKubevirtVmWatcher implements Watcher<String> {
195
196 @Override
197 public void eventReceived(Action action, String resource) {
198 switch (action) {
199 case ADDED:
200 eventExecutor.execute(() -> processAddition(resource));
201 break;
202 case DELETED:
203 eventExecutor.execute(() -> processDeletion(resource));
204 break;
Jian Li8f944d42021-03-23 00:43:29 +0900205 case MODIFIED:
206 eventExecutor.execute(() -> processModification(resource));
207 break;
Jian Lib6dc08f2021-03-24 15:24:18 +0900208 case ERROR:
209 log.warn("Failures processing VM manipulation.");
210 break;
211 default:
212 break;
213 }
214 }
215
216 @Override
217 public void onClose(WatcherException e) {
218 log.warn("VM watcher OnClose, re-instantiate the VM watcher...");
219 instantiateWatcher();
220 }
221
222 private void processAddition(String resource) {
223 if (!isMaster()) {
224 return;
225 }
226
227 parseMacAddresses(resource).forEach((mac, net) -> {
228 KubevirtPort port = DefaultKubevirtPort.builder()
229 .macAddress(mac)
230 .networkId(net)
231 .build();
232
Jian Li8f944d42021-03-23 00:43:29 +0900233 Set<String> sgs = parseSecurityGroups(resource);
234 port = port.updateSecurityGroups(sgs);
235
Jian Lib6dc08f2021-03-24 15:24:18 +0900236 Map<String, IpAddress> ips = parseIpAddresses(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900237 IpAddress ip = ips.get(port.networkId());
Jian Lib6dc08f2021-03-24 15:24:18 +0900238
Jian Li7388cdc2021-05-16 16:41:13 +0900239 port = port.updateIpAddress(ip);
240
Jian Liea1ead72021-05-28 11:00:07 +0900241 if (portAdminService.port(port.macAddress()) == null) {
242 portAdminService.createPort(port);
243 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900244 });
245 }
246
Jian Li8f944d42021-03-23 00:43:29 +0900247 private void processModification(String resource) {
248 if (!isMaster()) {
249 return;
250 }
251
252 parseMacAddresses(resource).forEach((mac, net) -> {
253 KubevirtPort port = DefaultKubevirtPort.builder()
254 .macAddress(mac)
255 .networkId(net)
256 .build();
257
258 KubevirtPort existing = portAdminService.port(port.macAddress());
259
260 if (existing == null) {
261 return;
262 }
263
264 Set<String> sgs = parseSecurityGroups(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900265
266 // we only update the port, if the newly updated security groups
267 // have different values compared to existing ones
268 if (!port.securityGroups().equals(sgs)) {
269 portAdminService.updatePort(existing.updateSecurityGroups(sgs));
270 }
Jian Li8f944d42021-03-23 00:43:29 +0900271 });
272 }
273
Jian Lib6dc08f2021-03-24 15:24:18 +0900274 private void processDeletion(String resource) {
275 if (!isMaster()) {
276 return;
277 }
278
279 parseMacAddresses(resource).forEach((mac, net) -> {
280 KubevirtPort port = portAdminService.port(mac);
281 if (port != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900282 portAdminService.removePort(mac);
283 }
284 });
285 }
286
287 private boolean isMaster() {
288 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
289 }
290
Jian Lib6dc08f2021-03-24 15:24:18 +0900291 private Map<String, IpAddress> parseIpAddresses(String resource) {
292 try {
293 ObjectMapper mapper = new ObjectMapper();
294 JsonNode json = mapper.readTree(resource);
295 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
296
297 JsonNode annots = metadata.get(ANNOTATIONS);
298 if (annots == null) {
299 return new HashMap<>();
300 }
301
302 JsonNode interfacesJson = annots.get(INTERFACES);
303 if (interfacesJson == null) {
304 return new HashMap<>();
305 }
306
307 Map<String, IpAddress> result = new HashMap<>();
308
309 String interfacesString = interfacesJson.asText();
310 ArrayNode interfaces = (ArrayNode) mapper.readTree(interfacesString);
311 for (JsonNode intf : interfaces) {
312 String network = intf.get(NETWORK).asText();
313 String ip = intf.get(IP).asText();
314 result.put(network, IpAddress.valueOf(ip));
315 }
316
317 return result;
318 } catch (IOException e) {
319 log.error("Failed to parse kubevirt VM IP addresses");
320 }
321
322 return new HashMap<>();
323 }
324
Jian Li8f944d42021-03-23 00:43:29 +0900325 private Set<String> parseSecurityGroups(String resource) {
326 try {
327 ObjectMapper mapper = new ObjectMapper();
328 JsonNode json = mapper.readTree(resource);
329 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
330
331 JsonNode annots = metadata.get(ANNOTATIONS);
332 if (annots == null) {
333 return new HashSet<>();
334 }
335
336 JsonNode sgsJson = annots.get(SECURITY_GROUPS);
337 if (sgsJson == null) {
338 return new HashSet<>();
339 }
340
341 Set<String> result = new HashSet<>();
342 ArrayNode sgs = (ArrayNode) mapper.readTree(sgsJson.asText());
343 for (JsonNode sg : sgs) {
344 result.add(sg.asText());
345 }
346
347 return result;
348
349 } catch (IOException e) {
350 log.error("Failed to parse kubevirt security group IDs.");
351 }
352
353 return new HashSet<>();
354 }
355
Jian Lib6dc08f2021-03-24 15:24:18 +0900356 private Map<MacAddress, String> parseMacAddresses(String resource) {
357 try {
358 ObjectMapper mapper = new ObjectMapper();
359 JsonNode json = mapper.readTree(resource);
360 JsonNode spec = json.get(SPEC).get(TEMPLATE).get(SPEC);
361 ArrayNode interfaces = (ArrayNode) spec.get(DOMAIN).get(DEVICES).get(INTERFACES);
362
363 Map<MacAddress, String> result = new HashMap<>();
364 for (JsonNode intf : interfaces) {
365 String network = intf.get(NAME).asText();
366 JsonNode macJson = intf.get(MAC);
367
Jian Li46592cf2021-05-11 18:12:55 +0900368 if (!DEFAULT.equals(network) && !CNI_ZERO.equals(network) && macJson != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900369 String compact = StringUtils.substringBeforeLast(network, NETWORK_SUFFIX);
370 MacAddress mac = MacAddress.valueOf(macJson.asText());
371 result.put(mac, compact);
372 }
373 }
374
375 return result;
376 } catch (IOException e) {
377 log.error("Failed to parse kubevirt VM MAC addresses");
378 }
379
380 return new HashMap<>();
381 }
382 }
383}