blob: f4f0156bf984e354a5927ae7f3726345dc188456 [file] [log] [blame]
Jian Lib6dc08f2021-03-24 15:24:18 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ArrayNode;
Jian Lia7c909a2022-03-03 18:42:49 +090021import com.google.common.collect.ImmutableMap;
Jian Lib6dc08f2021-03-24 15:24:18 +090022import io.fabric8.kubernetes.client.KubernetesClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090023import io.fabric8.kubernetes.client.Watcher;
24import io.fabric8.kubernetes.client.WatcherException;
25import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
26import org.apache.commons.lang3.StringUtils;
27import org.onlab.packet.IpAddress;
28import org.onlab.packet.MacAddress;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.LeadershipService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.core.ApplicationId;
33import org.onosproject.core.CoreService;
34import org.onosproject.kubevirtnetworking.api.DefaultKubevirtPort;
Jian Lib6dc08f2021-03-24 15:24:18 +090035import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
36import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
37import org.onosproject.kubevirtnetworking.api.KubevirtPort;
38import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
39import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
40import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
41import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
42import org.onosproject.kubevirtnode.api.KubevirtNodeService;
43import org.onosproject.mastership.MastershipService;
Jian Lib6dc08f2021-03-24 15:24:18 +090044import org.osgi.service.component.annotations.Activate;
45import org.osgi.service.component.annotations.Component;
46import org.osgi.service.component.annotations.Deactivate;
47import org.osgi.service.component.annotations.Reference;
48import org.osgi.service.component.annotations.ReferenceCardinality;
49import org.slf4j.Logger;
50
51import java.io.IOException;
52import java.util.HashMap;
53import java.util.HashSet;
54import java.util.Map;
55import java.util.Objects;
56import java.util.Set;
57import java.util.concurrent.ExecutorService;
Jian Lib6dc08f2021-03-24 15:24:18 +090058
Jian Lib6dc08f2021-03-24 15:24:18 +090059import static java.util.concurrent.Executors.newSingleThreadExecutor;
60import static org.onlab.util.Tools.groupedThreads;
61import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
Jian Lib6dc08f2021-03-24 15:24:18 +090062import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090063import static org.slf4j.LoggerFactory.getLogger;
64
65/**
66 * Kubernetes VM watcher used for feeding VM information.
67 */
68@Component(immediate = true)
69public class KubevirtVmWatcher {
70
71 private final Logger log = getLogger(getClass());
72
Jian Lib6dc08f2021-03-24 15:24:18 +090073 private static final String SPEC = "spec";
74 private static final String TEMPLATE = "template";
75 private static final String METADATA = "metadata";
76 private static final String ANNOTATIONS = "annotations";
77 private static final String DOMAIN = "domain";
78 private static final String DEVICES = "devices";
79 private static final String INTERFACES = "interfaces";
Jian Li8f944d42021-03-23 00:43:29 +090080 private static final String SECURITY_GROUPS = "securityGroups";
Jian Lib6dc08f2021-03-24 15:24:18 +090081 private static final String NAME = "name";
82 private static final String NETWORK = "network";
83 private static final String MAC = "macAddress";
84 private static final String IP = "ipAddress";
85 private static final String DEFAULT = "default";
Jian Li46592cf2021-05-11 18:12:55 +090086 private static final String CNI_ZERO = "cni0";
Jian Lib6dc08f2021-03-24 15:24:18 +090087 private static final String NETWORK_SUFFIX = "-net";
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected CoreService coreService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected MastershipService mastershipService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected ClusterService clusterService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected LeadershipService leadershipService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected KubevirtNodeService nodeService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected KubevirtNetworkAdminService networkAdminService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected KubevirtPortAdminService portAdminService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
111 protected KubevirtPodService podService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
114 protected KubevirtApiConfigService configService;
115
116 private final ExecutorService eventExecutor = newSingleThreadExecutor(
117 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
118
119 private final InternalKubevirtVmWatcher watcher = new InternalKubevirtVmWatcher();
120 private final InternalKubevirtApiConfigListener
121 configListener = new InternalKubevirtApiConfigListener();
122
123 CustomResourceDefinitionContext vmCrdCxt = new CustomResourceDefinitionContext
124 .Builder()
125 .withGroup("kubevirt.io")
126 .withScope("Namespaced")
127 .withVersion("v1")
128 .withPlural("virtualmachines")
129 .build();
130
131 private ApplicationId appId;
132 private NodeId localNodeId;
133
134 @Activate
135 protected void activate() {
136 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
137 localNodeId = clusterService.getLocalNode().id();
138 leadershipService.runForLeadership(appId.name());
139 configService.addListener(configListener);
140
141 log.info("Started");
142 }
143
144 @Deactivate
145 protected void deactivate() {
146 configService.removeListener(configListener);
147 leadershipService.withdraw(appId.name());
148 eventExecutor.shutdown();
149
150 log.info("Stopped");
151 }
152
153 private void instantiateWatcher() {
154 KubernetesClient client = k8sClient(configService);
155
156 if (client != null) {
157 try {
158 client.customResource(vmCrdCxt).watch(watcher);
159 } catch (IOException e) {
160 e.printStackTrace();
161 }
162 }
163 }
164
165 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
166
167 private boolean isRelevantHelper() {
168 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
169 }
170
171 @Override
172 public void event(KubevirtApiConfigEvent event) {
173
174 switch (event.type()) {
175 case KUBEVIRT_API_CONFIG_UPDATED:
176 eventExecutor.execute(this::processConfigUpdate);
177 break;
178 case KUBEVIRT_API_CONFIG_CREATED:
179 case KUBEVIRT_API_CONFIG_REMOVED:
180 default:
181 // do nothing
182 break;
183 }
184 }
185
186 private void processConfigUpdate() {
187 if (!isRelevantHelper()) {
188 return;
189 }
190
191 instantiateWatcher();
192 }
193 }
194
195 private class InternalKubevirtVmWatcher implements Watcher<String> {
196
197 @Override
198 public void eventReceived(Action action, String resource) {
199 switch (action) {
200 case ADDED:
201 eventExecutor.execute(() -> processAddition(resource));
202 break;
203 case DELETED:
204 eventExecutor.execute(() -> processDeletion(resource));
205 break;
Jian Li8f944d42021-03-23 00:43:29 +0900206 case MODIFIED:
207 eventExecutor.execute(() -> processModification(resource));
208 break;
Jian Lib6dc08f2021-03-24 15:24:18 +0900209 case ERROR:
210 log.warn("Failures processing VM manipulation.");
211 break;
212 default:
213 break;
214 }
215 }
216
217 @Override
218 public void onClose(WatcherException e) {
Jian Lic9e29242022-12-20 15:07:04 +0900219 // due to the bugs in fabric8, the watcher might be closed,
220 // we will re-instantiate the watcher in this case
221 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
222 log.info("VM watcher OnClose, re-instantiate the VM watcher...");
Jian Lib6dc08f2021-03-24 15:24:18 +0900223 instantiateWatcher();
224 }
225
226 private void processAddition(String resource) {
227 if (!isMaster()) {
228 return;
229 }
230
Jian Li9557e902021-06-08 10:12:52 +0900231 String vmName = parseVmName(resource);
232
Jian Lib6dc08f2021-03-24 15:24:18 +0900233 parseMacAddresses(resource).forEach((mac, net) -> {
234 KubevirtPort port = DefaultKubevirtPort.builder()
Jian Li9557e902021-06-08 10:12:52 +0900235 .vmName(vmName)
Jian Lib6dc08f2021-03-24 15:24:18 +0900236 .macAddress(mac)
237 .networkId(net)
238 .build();
239
Jian Li8f944d42021-03-23 00:43:29 +0900240 Set<String> sgs = parseSecurityGroups(resource);
241 port = port.updateSecurityGroups(sgs);
242
Jian Lib6dc08f2021-03-24 15:24:18 +0900243 Map<String, IpAddress> ips = parseIpAddresses(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900244 IpAddress ip = ips.get(port.networkId());
Jian Lib6dc08f2021-03-24 15:24:18 +0900245
Jian Li7388cdc2021-05-16 16:41:13 +0900246 port = port.updateIpAddress(ip);
247
Jian Liea1ead72021-05-28 11:00:07 +0900248 if (portAdminService.port(port.macAddress()) == null) {
249 portAdminService.createPort(port);
250 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900251 });
252 }
253
Jian Li8f944d42021-03-23 00:43:29 +0900254 private void processModification(String resource) {
255 if (!isMaster()) {
256 return;
257 }
258
Jian Li9557e902021-06-08 10:12:52 +0900259 String vmName = parseVmName(resource);
260
Jian Li8f944d42021-03-23 00:43:29 +0900261 parseMacAddresses(resource).forEach((mac, net) -> {
262 KubevirtPort port = DefaultKubevirtPort.builder()
Jian Li9557e902021-06-08 10:12:52 +0900263 .vmName(vmName)
Jian Li8f944d42021-03-23 00:43:29 +0900264 .macAddress(mac)
265 .networkId(net)
266 .build();
267
268 KubevirtPort existing = portAdminService.port(port.macAddress());
Jian Li8f944d42021-03-23 00:43:29 +0900269 Set<String> sgs = parseSecurityGroups(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900270
Jian Li14541f52023-07-28 20:06:04 +0900271 Map<String, IpAddress> ips = parseIpAddresses(resource);
272 IpAddress ip = ips.get(port.networkId());
273
Jian Li5c6cf8f2022-03-16 11:46:45 +0900274 if (existing == null) {
275 // if the network related information is filled with VM update event,
276 // and there is no port found in the store
277 // we try to add port by extracting network related info from VM
278 port = port.updateSecurityGroups(sgs);
Jian Li5c6cf8f2022-03-16 11:46:45 +0900279 port = port.updateIpAddress(ip);
280 portAdminService.createPort(port);
281 } else {
Jian Li14541f52023-07-28 20:06:04 +0900282 // we only update the port, if either the newly updated
283 // security groups have different values compared to existing
284 // ones or the newly updated IP address has been changed
285 KubevirtPort updatedPort = existing;
286 if (!existing.securityGroups().equals(sgs)) {
287 updatedPort = updatedPort.updateSecurityGroups(sgs);
288 }
289 if (!existing.ipAddress().equals(ip)) {
290 updatedPort = updatedPort.updateIpAddress(ip);
291 }
Jian Lifd151922023-11-30 21:54:17 +0900292 if (!existing.securityGroups().equals(sgs) ||
293 !existing.ipAddress().equals(ip)) {
Jian Li14541f52023-07-28 20:06:04 +0900294 portAdminService.updatePort(updatedPort);
Jian Li5c6cf8f2022-03-16 11:46:45 +0900295 }
Jian Li7388cdc2021-05-16 16:41:13 +0900296 }
Jian Li8f944d42021-03-23 00:43:29 +0900297 });
298 }
299
Jian Lib6dc08f2021-03-24 15:24:18 +0900300 private void processDeletion(String resource) {
301 if (!isMaster()) {
302 return;
303 }
304
305 parseMacAddresses(resource).forEach((mac, net) -> {
306 KubevirtPort port = portAdminService.port(mac);
307 if (port != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900308 portAdminService.removePort(mac);
309 }
310 });
311 }
312
313 private boolean isMaster() {
314 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
315 }
316
Jian Li9557e902021-06-08 10:12:52 +0900317 private String parseVmName(String resource) {
318 String vmName = null;
319 try {
320 ObjectMapper mapper = new ObjectMapper();
321 JsonNode json = mapper.readTree(resource);
322 JsonNode nameJson = json.get(METADATA).get(NAME);
323 if (nameJson != null) {
324 vmName = nameJson.asText();
325 }
326 } catch (IOException e) {
327 log.error("Failed to parse kubevirt VM name");
328 }
329
330 return vmName;
331 }
332
Jian Lib6dc08f2021-03-24 15:24:18 +0900333 private Map<String, IpAddress> parseIpAddresses(String resource) {
334 try {
335 ObjectMapper mapper = new ObjectMapper();
336 JsonNode json = mapper.readTree(resource);
337 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
338
339 JsonNode annots = metadata.get(ANNOTATIONS);
340 if (annots == null) {
341 return new HashMap<>();
342 }
343
344 JsonNode interfacesJson = annots.get(INTERFACES);
345 if (interfacesJson == null) {
346 return new HashMap<>();
347 }
348
349 Map<String, IpAddress> result = new HashMap<>();
350
351 String interfacesString = interfacesJson.asText();
352 ArrayNode interfaces = (ArrayNode) mapper.readTree(interfacesString);
353 for (JsonNode intf : interfaces) {
354 String network = intf.get(NETWORK).asText();
355 String ip = intf.get(IP).asText();
356 result.put(network, IpAddress.valueOf(ip));
357 }
358
359 return result;
360 } catch (IOException e) {
361 log.error("Failed to parse kubevirt VM IP addresses");
362 }
363
364 return new HashMap<>();
365 }
366
Jian Li8f944d42021-03-23 00:43:29 +0900367 private Set<String> parseSecurityGroups(String resource) {
368 try {
369 ObjectMapper mapper = new ObjectMapper();
370 JsonNode json = mapper.readTree(resource);
371 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
372
373 JsonNode annots = metadata.get(ANNOTATIONS);
374 if (annots == null) {
375 return new HashSet<>();
376 }
377
378 JsonNode sgsJson = annots.get(SECURITY_GROUPS);
379 if (sgsJson == null) {
380 return new HashSet<>();
381 }
382
383 Set<String> result = new HashSet<>();
384 ArrayNode sgs = (ArrayNode) mapper.readTree(sgsJson.asText());
385 for (JsonNode sg : sgs) {
386 result.add(sg.asText());
387 }
388
389 return result;
390
391 } catch (IOException e) {
392 log.error("Failed to parse kubevirt security group IDs.");
393 }
394
395 return new HashSet<>();
396 }
397
Jian Lib6dc08f2021-03-24 15:24:18 +0900398 private Map<MacAddress, String> parseMacAddresses(String resource) {
399 try {
400 ObjectMapper mapper = new ObjectMapper();
401 JsonNode json = mapper.readTree(resource);
402 JsonNode spec = json.get(SPEC).get(TEMPLATE).get(SPEC);
403 ArrayNode interfaces = (ArrayNode) spec.get(DOMAIN).get(DEVICES).get(INTERFACES);
404
Jian Lia7c909a2022-03-03 18:42:49 +0900405 // if the VM is not associated with any network, we skip parsing MAC address
406 if (interfaces == null) {
407 return ImmutableMap.of();
408 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900409 Map<MacAddress, String> result = new HashMap<>();
410 for (JsonNode intf : interfaces) {
411 String network = intf.get(NAME).asText();
412 JsonNode macJson = intf.get(MAC);
413
Jian Li46592cf2021-05-11 18:12:55 +0900414 if (!DEFAULT.equals(network) && !CNI_ZERO.equals(network) && macJson != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900415 String compact = StringUtils.substringBeforeLast(network, NETWORK_SUFFIX);
416 MacAddress mac = MacAddress.valueOf(macJson.asText());
417 result.put(mac, compact);
418 }
419 }
420
421 return result;
422 } catch (IOException e) {
423 log.error("Failed to parse kubevirt VM MAC addresses");
424 }
425
426 return new HashMap<>();
427 }
428 }
429}