blob: 10328ee9c7b5993b213104562415cef4016a3d96 [file] [log] [blame]
Jian Lib6dc08f2021-03-24 15:24:18 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ArrayNode;
Jian Li8944d7c2022-03-03 18:42:49 +090021import com.google.common.collect.ImmutableMap;
Jian Lib6dc08f2021-03-24 15:24:18 +090022import io.fabric8.kubernetes.client.KubernetesClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090023import io.fabric8.kubernetes.client.Watcher;
24import io.fabric8.kubernetes.client.WatcherException;
25import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
26import org.apache.commons.lang3.StringUtils;
27import org.onlab.packet.IpAddress;
28import org.onlab.packet.MacAddress;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.LeadershipService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.core.ApplicationId;
33import org.onosproject.core.CoreService;
34import org.onosproject.kubevirtnetworking.api.DefaultKubevirtPort;
Jian Lib6dc08f2021-03-24 15:24:18 +090035import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
36import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
37import org.onosproject.kubevirtnetworking.api.KubevirtPort;
38import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
39import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
40import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
41import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
42import org.onosproject.kubevirtnode.api.KubevirtNodeService;
43import org.onosproject.mastership.MastershipService;
Jian Lib6dc08f2021-03-24 15:24:18 +090044import org.osgi.service.component.annotations.Activate;
45import org.osgi.service.component.annotations.Component;
46import org.osgi.service.component.annotations.Deactivate;
47import org.osgi.service.component.annotations.Reference;
48import org.osgi.service.component.annotations.ReferenceCardinality;
49import org.slf4j.Logger;
50
51import java.io.IOException;
52import java.util.HashMap;
53import java.util.HashSet;
54import java.util.Map;
55import java.util.Objects;
56import java.util.Set;
57import java.util.concurrent.ExecutorService;
Jian Lib6dc08f2021-03-24 15:24:18 +090058
Jian Lib6dc08f2021-03-24 15:24:18 +090059import static java.util.concurrent.Executors.newSingleThreadExecutor;
60import static org.onlab.util.Tools.groupedThreads;
61import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
Jian Lib6dc08f2021-03-24 15:24:18 +090062import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090063import static org.slf4j.LoggerFactory.getLogger;
64
65/**
66 * Kubernetes VM watcher used for feeding VM information.
67 */
68@Component(immediate = true)
69public class KubevirtVmWatcher {
70
71 private final Logger log = getLogger(getClass());
72
Jian Lib6dc08f2021-03-24 15:24:18 +090073 private static final String SPEC = "spec";
74 private static final String TEMPLATE = "template";
75 private static final String METADATA = "metadata";
76 private static final String ANNOTATIONS = "annotations";
77 private static final String DOMAIN = "domain";
78 private static final String DEVICES = "devices";
79 private static final String INTERFACES = "interfaces";
Jian Li8f944d42021-03-23 00:43:29 +090080 private static final String SECURITY_GROUPS = "securityGroups";
Jian Lib6dc08f2021-03-24 15:24:18 +090081 private static final String NAME = "name";
82 private static final String NETWORK = "network";
83 private static final String MAC = "macAddress";
84 private static final String IP = "ipAddress";
85 private static final String DEFAULT = "default";
Jian Li46592cf2021-05-11 18:12:55 +090086 private static final String CNI_ZERO = "cni0";
Jian Lib6dc08f2021-03-24 15:24:18 +090087 private static final String NETWORK_SUFFIX = "-net";
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected CoreService coreService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected MastershipService mastershipService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected ClusterService clusterService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected LeadershipService leadershipService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected KubevirtNodeService nodeService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected KubevirtNetworkAdminService networkAdminService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected KubevirtPortAdminService portAdminService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
111 protected KubevirtPodService podService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
114 protected KubevirtApiConfigService configService;
115
116 private final ExecutorService eventExecutor = newSingleThreadExecutor(
117 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
118
119 private final InternalKubevirtVmWatcher watcher = new InternalKubevirtVmWatcher();
120 private final InternalKubevirtApiConfigListener
121 configListener = new InternalKubevirtApiConfigListener();
122
123 CustomResourceDefinitionContext vmCrdCxt = new CustomResourceDefinitionContext
124 .Builder()
125 .withGroup("kubevirt.io")
126 .withScope("Namespaced")
127 .withVersion("v1")
128 .withPlural("virtualmachines")
129 .build();
130
131 private ApplicationId appId;
132 private NodeId localNodeId;
133
134 @Activate
135 protected void activate() {
136 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
137 localNodeId = clusterService.getLocalNode().id();
138 leadershipService.runForLeadership(appId.name());
139 configService.addListener(configListener);
140
141 log.info("Started");
142 }
143
144 @Deactivate
145 protected void deactivate() {
146 configService.removeListener(configListener);
147 leadershipService.withdraw(appId.name());
148 eventExecutor.shutdown();
149
150 log.info("Stopped");
151 }
152
153 private void instantiateWatcher() {
154 KubernetesClient client = k8sClient(configService);
155
156 if (client != null) {
157 try {
158 client.customResource(vmCrdCxt).watch(watcher);
159 } catch (IOException e) {
160 e.printStackTrace();
161 }
162 }
163 }
164
165 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
166
167 private boolean isRelevantHelper() {
168 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
169 }
170
171 @Override
172 public void event(KubevirtApiConfigEvent event) {
173
174 switch (event.type()) {
175 case KUBEVIRT_API_CONFIG_UPDATED:
176 eventExecutor.execute(this::processConfigUpdate);
177 break;
178 case KUBEVIRT_API_CONFIG_CREATED:
179 case KUBEVIRT_API_CONFIG_REMOVED:
180 default:
181 // do nothing
182 break;
183 }
184 }
185
186 private void processConfigUpdate() {
187 if (!isRelevantHelper()) {
188 return;
189 }
190
191 instantiateWatcher();
192 }
193 }
194
195 private class InternalKubevirtVmWatcher implements Watcher<String> {
196
197 @Override
198 public void eventReceived(Action action, String resource) {
199 switch (action) {
200 case ADDED:
201 eventExecutor.execute(() -> processAddition(resource));
202 break;
203 case DELETED:
204 eventExecutor.execute(() -> processDeletion(resource));
205 break;
Jian Li8f944d42021-03-23 00:43:29 +0900206 case MODIFIED:
207 eventExecutor.execute(() -> processModification(resource));
208 break;
Jian Lib6dc08f2021-03-24 15:24:18 +0900209 case ERROR:
210 log.warn("Failures processing VM manipulation.");
211 break;
212 default:
213 break;
214 }
215 }
216
217 @Override
218 public void onClose(WatcherException e) {
219 log.warn("VM watcher OnClose, re-instantiate the VM watcher...");
220 instantiateWatcher();
221 }
222
223 private void processAddition(String resource) {
224 if (!isMaster()) {
225 return;
226 }
227
Jian Li9557e902021-06-08 10:12:52 +0900228 String vmName = parseVmName(resource);
229
Jian Lib6dc08f2021-03-24 15:24:18 +0900230 parseMacAddresses(resource).forEach((mac, net) -> {
231 KubevirtPort port = DefaultKubevirtPort.builder()
Jian Li9557e902021-06-08 10:12:52 +0900232 .vmName(vmName)
Jian Lib6dc08f2021-03-24 15:24:18 +0900233 .macAddress(mac)
234 .networkId(net)
235 .build();
236
Jian Li8f944d42021-03-23 00:43:29 +0900237 Set<String> sgs = parseSecurityGroups(resource);
238 port = port.updateSecurityGroups(sgs);
239
Jian Lib6dc08f2021-03-24 15:24:18 +0900240 Map<String, IpAddress> ips = parseIpAddresses(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900241 IpAddress ip = ips.get(port.networkId());
Jian Lib6dc08f2021-03-24 15:24:18 +0900242
Jian Li7388cdc2021-05-16 16:41:13 +0900243 port = port.updateIpAddress(ip);
244
Jian Liea1ead72021-05-28 11:00:07 +0900245 if (portAdminService.port(port.macAddress()) == null) {
246 portAdminService.createPort(port);
247 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900248 });
249 }
250
Jian Li8f944d42021-03-23 00:43:29 +0900251 private void processModification(String resource) {
252 if (!isMaster()) {
253 return;
254 }
255
Jian Li9557e902021-06-08 10:12:52 +0900256 String vmName = parseVmName(resource);
257
Jian Li8f944d42021-03-23 00:43:29 +0900258 parseMacAddresses(resource).forEach((mac, net) -> {
259 KubevirtPort port = DefaultKubevirtPort.builder()
Jian Li9557e902021-06-08 10:12:52 +0900260 .vmName(vmName)
Jian Li8f944d42021-03-23 00:43:29 +0900261 .macAddress(mac)
262 .networkId(net)
263 .build();
264
265 KubevirtPort existing = portAdminService.port(port.macAddress());
Jian Li8f944d42021-03-23 00:43:29 +0900266 Set<String> sgs = parseSecurityGroups(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900267
Jian Li14968b52022-03-16 11:46:45 +0900268 if (existing == null) {
269 // if the network related information is filled with VM update event,
270 // and there is no port found in the store
271 // we try to add port by extracting network related info from VM
272 port = port.updateSecurityGroups(sgs);
273 Map<String, IpAddress> ips = parseIpAddresses(resource);
274 IpAddress ip = ips.get(port.networkId());
275 port = port.updateIpAddress(ip);
276 portAdminService.createPort(port);
277 } else {
278 // we only update the port, if the newly updated security groups
279 // have different values compared to existing ones
280 if (!port.securityGroups().equals(sgs)) {
281 portAdminService.updatePort(existing.updateSecurityGroups(sgs));
282 }
Jian Li7388cdc2021-05-16 16:41:13 +0900283 }
Jian Li8f944d42021-03-23 00:43:29 +0900284 });
285 }
286
Jian Lib6dc08f2021-03-24 15:24:18 +0900287 private void processDeletion(String resource) {
288 if (!isMaster()) {
289 return;
290 }
291
292 parseMacAddresses(resource).forEach((mac, net) -> {
293 KubevirtPort port = portAdminService.port(mac);
294 if (port != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900295 portAdminService.removePort(mac);
296 }
297 });
298 }
299
300 private boolean isMaster() {
301 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
302 }
303
Jian Li9557e902021-06-08 10:12:52 +0900304 private String parseVmName(String resource) {
305 String vmName = null;
306 try {
307 ObjectMapper mapper = new ObjectMapper();
308 JsonNode json = mapper.readTree(resource);
309 JsonNode nameJson = json.get(METADATA).get(NAME);
310 if (nameJson != null) {
311 vmName = nameJson.asText();
312 }
313 } catch (IOException e) {
314 log.error("Failed to parse kubevirt VM name");
315 }
316
317 return vmName;
318 }
319
Jian Lib6dc08f2021-03-24 15:24:18 +0900320 private Map<String, IpAddress> parseIpAddresses(String resource) {
321 try {
322 ObjectMapper mapper = new ObjectMapper();
323 JsonNode json = mapper.readTree(resource);
324 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
325
326 JsonNode annots = metadata.get(ANNOTATIONS);
327 if (annots == null) {
328 return new HashMap<>();
329 }
330
331 JsonNode interfacesJson = annots.get(INTERFACES);
332 if (interfacesJson == null) {
333 return new HashMap<>();
334 }
335
336 Map<String, IpAddress> result = new HashMap<>();
337
338 String interfacesString = interfacesJson.asText();
339 ArrayNode interfaces = (ArrayNode) mapper.readTree(interfacesString);
340 for (JsonNode intf : interfaces) {
341 String network = intf.get(NETWORK).asText();
342 String ip = intf.get(IP).asText();
343 result.put(network, IpAddress.valueOf(ip));
344 }
345
346 return result;
347 } catch (IOException e) {
348 log.error("Failed to parse kubevirt VM IP addresses");
349 }
350
351 return new HashMap<>();
352 }
353
Jian Li8f944d42021-03-23 00:43:29 +0900354 private Set<String> parseSecurityGroups(String resource) {
355 try {
356 ObjectMapper mapper = new ObjectMapper();
357 JsonNode json = mapper.readTree(resource);
358 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
359
360 JsonNode annots = metadata.get(ANNOTATIONS);
361 if (annots == null) {
362 return new HashSet<>();
363 }
364
365 JsonNode sgsJson = annots.get(SECURITY_GROUPS);
366 if (sgsJson == null) {
367 return new HashSet<>();
368 }
369
370 Set<String> result = new HashSet<>();
371 ArrayNode sgs = (ArrayNode) mapper.readTree(sgsJson.asText());
372 for (JsonNode sg : sgs) {
373 result.add(sg.asText());
374 }
375
376 return result;
377
378 } catch (IOException e) {
379 log.error("Failed to parse kubevirt security group IDs.");
380 }
381
382 return new HashSet<>();
383 }
384
Jian Lib6dc08f2021-03-24 15:24:18 +0900385 private Map<MacAddress, String> parseMacAddresses(String resource) {
386 try {
387 ObjectMapper mapper = new ObjectMapper();
388 JsonNode json = mapper.readTree(resource);
389 JsonNode spec = json.get(SPEC).get(TEMPLATE).get(SPEC);
390 ArrayNode interfaces = (ArrayNode) spec.get(DOMAIN).get(DEVICES).get(INTERFACES);
391
Jian Li8944d7c2022-03-03 18:42:49 +0900392 // if the VM is not associated with any network, we skip parsing MAC address
393 if (interfaces == null) {
394 return ImmutableMap.of();
395 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900396 Map<MacAddress, String> result = new HashMap<>();
397 for (JsonNode intf : interfaces) {
398 String network = intf.get(NAME).asText();
399 JsonNode macJson = intf.get(MAC);
400
Jian Li46592cf2021-05-11 18:12:55 +0900401 if (!DEFAULT.equals(network) && !CNI_ZERO.equals(network) && macJson != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900402 String compact = StringUtils.substringBeforeLast(network, NETWORK_SUFFIX);
403 MacAddress mac = MacAddress.valueOf(macJson.asText());
404 result.put(mac, compact);
405 }
406 }
407
408 return result;
409 } catch (IOException e) {
410 log.error("Failed to parse kubevirt VM MAC addresses");
411 }
412
413 return new HashMap<>();
414 }
415 }
416}