blob: f13d5b1e469494fe26ed828cc5ebb81f7083443f [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ArrayNode;
Jian Li8944d7c2022-03-03 18:42:49 +090021import com.google.common.collect.ImmutableMap;
Jian Lib6dc08f2021-03-24 15:24:18 +090022import io.fabric8.kubernetes.client.KubernetesClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090023import io.fabric8.kubernetes.client.Watcher;
24import io.fabric8.kubernetes.client.WatcherException;
25import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
26import org.apache.commons.lang3.StringUtils;
27import org.onlab.packet.IpAddress;
28import org.onlab.packet.MacAddress;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.LeadershipService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.core.ApplicationId;
33import org.onosproject.core.CoreService;
34import org.onosproject.kubevirtnetworking.api.DefaultKubevirtPort;
Jian Lib6dc08f2021-03-24 15:24:18 +090035import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
36import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
37import org.onosproject.kubevirtnetworking.api.KubevirtPort;
38import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
39import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
40import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
41import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
42import org.onosproject.kubevirtnode.api.KubevirtNodeService;
43import org.onosproject.mastership.MastershipService;
Jian Lib6dc08f2021-03-24 15:24:18 +090044import org.osgi.service.component.annotations.Activate;
45import org.osgi.service.component.annotations.Component;
46import org.osgi.service.component.annotations.Deactivate;
47import org.osgi.service.component.annotations.Reference;
48import org.osgi.service.component.annotations.ReferenceCardinality;
49import org.slf4j.Logger;
50
51import java.io.IOException;
52import java.util.HashMap;
53import java.util.HashSet;
54import java.util.Map;
55import java.util.Objects;
56import java.util.Set;
57import java.util.concurrent.ExecutorService;
Jian Lib6dc08f2021-03-24 15:24:18 +090058
Jian Lib6dc08f2021-03-24 15:24:18 +090059import static java.util.concurrent.Executors.newSingleThreadExecutor;
60import static org.onlab.util.Tools.groupedThreads;
61import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
Jian Lib6dc08f2021-03-24 15:24:18 +090062import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090063import static org.slf4j.LoggerFactory.getLogger;
64
65/**
66 * Kubernetes VM watcher used for feeding VM information.
67 */
68@Component(immediate = true)
69public class KubevirtVmWatcher {
70
71 private final Logger log = getLogger(getClass());
72
Jian Lib6dc08f2021-03-24 15:24:18 +090073 private static final String SPEC = "spec";
74 private static final String TEMPLATE = "template";
75 private static final String METADATA = "metadata";
76 private static final String ANNOTATIONS = "annotations";
77 private static final String DOMAIN = "domain";
78 private static final String DEVICES = "devices";
79 private static final String INTERFACES = "interfaces";
Jian Li8f944d42021-03-23 00:43:29 +090080 private static final String SECURITY_GROUPS = "securityGroups";
Jian Lib6dc08f2021-03-24 15:24:18 +090081 private static final String NAME = "name";
82 private static final String NETWORK = "network";
83 private static final String MAC = "macAddress";
84 private static final String IP = "ipAddress";
85 private static final String DEFAULT = "default";
Jian Li46592cf2021-05-11 18:12:55 +090086 private static final String CNI_ZERO = "cni0";
Jian Lib6dc08f2021-03-24 15:24:18 +090087 private static final String NETWORK_SUFFIX = "-net";
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected CoreService coreService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected MastershipService mastershipService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected ClusterService clusterService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected LeadershipService leadershipService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected KubevirtNodeService nodeService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected KubevirtNetworkAdminService networkAdminService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected KubevirtPortAdminService portAdminService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
111 protected KubevirtPodService podService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
114 protected KubevirtApiConfigService configService;
115
116 private final ExecutorService eventExecutor = newSingleThreadExecutor(
117 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
118
119 private final InternalKubevirtVmWatcher watcher = new InternalKubevirtVmWatcher();
120 private final InternalKubevirtApiConfigListener
121 configListener = new InternalKubevirtApiConfigListener();
122
123 CustomResourceDefinitionContext vmCrdCxt = new CustomResourceDefinitionContext
124 .Builder()
125 .withGroup("kubevirt.io")
126 .withScope("Namespaced")
127 .withVersion("v1")
128 .withPlural("virtualmachines")
129 .build();
130
131 private ApplicationId appId;
132 private NodeId localNodeId;
133
134 @Activate
135 protected void activate() {
136 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
137 localNodeId = clusterService.getLocalNode().id();
138 leadershipService.runForLeadership(appId.name());
139 configService.addListener(configListener);
140
141 log.info("Started");
142 }
143
144 @Deactivate
145 protected void deactivate() {
146 configService.removeListener(configListener);
147 leadershipService.withdraw(appId.name());
148 eventExecutor.shutdown();
149
150 log.info("Stopped");
151 }
152
153 private void instantiateWatcher() {
154 KubernetesClient client = k8sClient(configService);
155
156 if (client != null) {
157 try {
158 client.customResource(vmCrdCxt).watch(watcher);
159 } catch (IOException e) {
160 e.printStackTrace();
161 }
162 }
163 }
164
165 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
166
167 private boolean isRelevantHelper() {
168 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
169 }
170
171 @Override
172 public void event(KubevirtApiConfigEvent event) {
173
174 switch (event.type()) {
175 case KUBEVIRT_API_CONFIG_UPDATED:
176 eventExecutor.execute(this::processConfigUpdate);
177 break;
178 case KUBEVIRT_API_CONFIG_CREATED:
179 case KUBEVIRT_API_CONFIG_REMOVED:
180 default:
181 // do nothing
182 break;
183 }
184 }
185
186 private void processConfigUpdate() {
187 if (!isRelevantHelper()) {
188 return;
189 }
190
191 instantiateWatcher();
192 }
193 }
194
195 private class InternalKubevirtVmWatcher implements Watcher<String> {
196
197 @Override
198 public void eventReceived(Action action, String resource) {
199 switch (action) {
200 case ADDED:
201 eventExecutor.execute(() -> processAddition(resource));
202 break;
203 case DELETED:
204 eventExecutor.execute(() -> processDeletion(resource));
205 break;
Jian Li8f944d42021-03-23 00:43:29 +0900206 case MODIFIED:
207 eventExecutor.execute(() -> processModification(resource));
208 break;
Jian Lib6dc08f2021-03-24 15:24:18 +0900209 case ERROR:
210 log.warn("Failures processing VM manipulation.");
211 break;
212 default:
213 break;
214 }
215 }
216
217 @Override
218 public void onClose(WatcherException e) {
Jian Li5e012a42022-12-20 15:07:04 +0900219 // due to the bugs in fabric8, the watcher might be closed,
220 // we will re-instantiate the watcher in this case
221 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
222 log.info("VM watcher OnClose, re-instantiate the VM watcher...");
Jian Lib6dc08f2021-03-24 15:24:18 +0900223 instantiateWatcher();
224 }
225
226 private void processAddition(String resource) {
227 if (!isMaster()) {
228 return;
229 }
230
Jian Li9557e902021-06-08 10:12:52 +0900231 String vmName = parseVmName(resource);
232
Jian Lib6dc08f2021-03-24 15:24:18 +0900233 parseMacAddresses(resource).forEach((mac, net) -> {
234 KubevirtPort port = DefaultKubevirtPort.builder()
Jian Li9557e902021-06-08 10:12:52 +0900235 .vmName(vmName)
Jian Lib6dc08f2021-03-24 15:24:18 +0900236 .macAddress(mac)
237 .networkId(net)
238 .build();
239
Jian Li8f944d42021-03-23 00:43:29 +0900240 Set<String> sgs = parseSecurityGroups(resource);
241 port = port.updateSecurityGroups(sgs);
242
Jian Lib6dc08f2021-03-24 15:24:18 +0900243 Map<String, IpAddress> ips = parseIpAddresses(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900244 IpAddress ip = ips.get(port.networkId());
Jian Lib6dc08f2021-03-24 15:24:18 +0900245
Jian Li7388cdc2021-05-16 16:41:13 +0900246 port = port.updateIpAddress(ip);
247
Jian Liea1ead72021-05-28 11:00:07 +0900248 if (portAdminService.port(port.macAddress()) == null) {
249 portAdminService.createPort(port);
250 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900251 });
252 }
253
Jian Li8f944d42021-03-23 00:43:29 +0900254 private void processModification(String resource) {
255 if (!isMaster()) {
256 return;
257 }
258
Jian Li9557e902021-06-08 10:12:52 +0900259 String vmName = parseVmName(resource);
260
Jian Li8f944d42021-03-23 00:43:29 +0900261 parseMacAddresses(resource).forEach((mac, net) -> {
262 KubevirtPort port = DefaultKubevirtPort.builder()
Jian Li9557e902021-06-08 10:12:52 +0900263 .vmName(vmName)
Jian Li8f944d42021-03-23 00:43:29 +0900264 .macAddress(mac)
265 .networkId(net)
266 .build();
267
268 KubevirtPort existing = portAdminService.port(port.macAddress());
Jian Li8f944d42021-03-23 00:43:29 +0900269 Set<String> sgs = parseSecurityGroups(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900270
Jian Li1bbd5002023-07-28 20:06:04 +0900271 Map<String, IpAddress> ips = parseIpAddresses(resource);
272 IpAddress ip = ips.get(port.networkId());
273
Jian Li14968b52022-03-16 11:46:45 +0900274 if (existing == null) {
275 // if the network related information is filled with VM update event,
276 // and there is no port found in the store
277 // we try to add port by extracting network related info from VM
278 port = port.updateSecurityGroups(sgs);
Jian Li14968b52022-03-16 11:46:45 +0900279 port = port.updateIpAddress(ip);
280 portAdminService.createPort(port);
281 } else {
Jian Li1bbd5002023-07-28 20:06:04 +0900282 // we only update the port, if either the newly updated
283 // security groups have different values compared to existing
284 // ones or the newly updated IP address has been changed
285 KubevirtPort updatedPort = existing;
286 if (!existing.securityGroups().equals(sgs)) {
287 updatedPort = updatedPort.updateSecurityGroups(sgs);
288 }
289 if (!existing.ipAddress().equals(ip)) {
290 updatedPort = updatedPort.updateIpAddress(ip);
291 }
292 if (!port.securityGroups().equals(sgs) || !port.ipAddress().equals(ip)) {
293 portAdminService.updatePort(updatedPort);
Jian Li14968b52022-03-16 11:46:45 +0900294 }
Jian Li7388cdc2021-05-16 16:41:13 +0900295 }
Jian Li8f944d42021-03-23 00:43:29 +0900296 });
297 }
298
Jian Lib6dc08f2021-03-24 15:24:18 +0900299 private void processDeletion(String resource) {
300 if (!isMaster()) {
301 return;
302 }
303
304 parseMacAddresses(resource).forEach((mac, net) -> {
305 KubevirtPort port = portAdminService.port(mac);
306 if (port != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900307 portAdminService.removePort(mac);
308 }
309 });
310 }
311
312 private boolean isMaster() {
313 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
314 }
315
Jian Li9557e902021-06-08 10:12:52 +0900316 private String parseVmName(String resource) {
317 String vmName = null;
318 try {
319 ObjectMapper mapper = new ObjectMapper();
320 JsonNode json = mapper.readTree(resource);
321 JsonNode nameJson = json.get(METADATA).get(NAME);
322 if (nameJson != null) {
323 vmName = nameJson.asText();
324 }
325 } catch (IOException e) {
326 log.error("Failed to parse kubevirt VM name");
327 }
328
329 return vmName;
330 }
331
Jian Lib6dc08f2021-03-24 15:24:18 +0900332 private Map<String, IpAddress> parseIpAddresses(String resource) {
333 try {
334 ObjectMapper mapper = new ObjectMapper();
335 JsonNode json = mapper.readTree(resource);
336 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
337
338 JsonNode annots = metadata.get(ANNOTATIONS);
339 if (annots == null) {
340 return new HashMap<>();
341 }
342
343 JsonNode interfacesJson = annots.get(INTERFACES);
344 if (interfacesJson == null) {
345 return new HashMap<>();
346 }
347
348 Map<String, IpAddress> result = new HashMap<>();
349
350 String interfacesString = interfacesJson.asText();
351 ArrayNode interfaces = (ArrayNode) mapper.readTree(interfacesString);
352 for (JsonNode intf : interfaces) {
353 String network = intf.get(NETWORK).asText();
354 String ip = intf.get(IP).asText();
355 result.put(network, IpAddress.valueOf(ip));
356 }
357
358 return result;
359 } catch (IOException e) {
360 log.error("Failed to parse kubevirt VM IP addresses");
361 }
362
363 return new HashMap<>();
364 }
365
Jian Li8f944d42021-03-23 00:43:29 +0900366 private Set<String> parseSecurityGroups(String resource) {
367 try {
368 ObjectMapper mapper = new ObjectMapper();
369 JsonNode json = mapper.readTree(resource);
370 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
371
372 JsonNode annots = metadata.get(ANNOTATIONS);
373 if (annots == null) {
374 return new HashSet<>();
375 }
376
377 JsonNode sgsJson = annots.get(SECURITY_GROUPS);
378 if (sgsJson == null) {
379 return new HashSet<>();
380 }
381
382 Set<String> result = new HashSet<>();
383 ArrayNode sgs = (ArrayNode) mapper.readTree(sgsJson.asText());
384 for (JsonNode sg : sgs) {
385 result.add(sg.asText());
386 }
387
388 return result;
389
390 } catch (IOException e) {
391 log.error("Failed to parse kubevirt security group IDs.");
392 }
393
394 return new HashSet<>();
395 }
396
Jian Lib6dc08f2021-03-24 15:24:18 +0900397 private Map<MacAddress, String> parseMacAddresses(String resource) {
398 try {
399 ObjectMapper mapper = new ObjectMapper();
400 JsonNode json = mapper.readTree(resource);
401 JsonNode spec = json.get(SPEC).get(TEMPLATE).get(SPEC);
402 ArrayNode interfaces = (ArrayNode) spec.get(DOMAIN).get(DEVICES).get(INTERFACES);
403
Jian Li8944d7c2022-03-03 18:42:49 +0900404 // if the VM is not associated with any network, we skip parsing MAC address
405 if (interfaces == null) {
406 return ImmutableMap.of();
407 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900408 Map<MacAddress, String> result = new HashMap<>();
409 for (JsonNode intf : interfaces) {
410 String network = intf.get(NAME).asText();
411 JsonNode macJson = intf.get(MAC);
412
Jian Li46592cf2021-05-11 18:12:55 +0900413 if (!DEFAULT.equals(network) && !CNI_ZERO.equals(network) && macJson != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900414 String compact = StringUtils.substringBeforeLast(network, NETWORK_SUFFIX);
415 MacAddress mac = MacAddress.valueOf(macJson.asText());
416 result.put(mac, compact);
417 }
418 }
419
420 return result;
421 } catch (IOException e) {
422 log.error("Failed to parse kubevirt VM MAC addresses");
423 }
424
425 return new HashMap<>();
426 }
427 }
428}