blob: e48bb18d670c221c8985d7f3d0b91b2f50661b46 [file] [log] [blame]
Jian Lib6dc08f2021-03-24 15:24:18 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ArrayNode;
Jian Lib6dc08f2021-03-24 15:24:18 +090021import io.fabric8.kubernetes.client.KubernetesClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090022import io.fabric8.kubernetes.client.Watcher;
23import io.fabric8.kubernetes.client.WatcherException;
24import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
25import org.apache.commons.lang3.StringUtils;
26import org.onlab.packet.IpAddress;
27import org.onlab.packet.MacAddress;
28import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.LeadershipService;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
33import org.onosproject.kubevirtnetworking.api.DefaultKubevirtPort;
Jian Lib6dc08f2021-03-24 15:24:18 +090034import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
35import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
36import org.onosproject.kubevirtnetworking.api.KubevirtPort;
37import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
38import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
39import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
40import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
41import org.onosproject.kubevirtnode.api.KubevirtNodeService;
42import org.onosproject.mastership.MastershipService;
Jian Lib6dc08f2021-03-24 15:24:18 +090043import org.osgi.service.component.annotations.Activate;
44import org.osgi.service.component.annotations.Component;
45import org.osgi.service.component.annotations.Deactivate;
46import org.osgi.service.component.annotations.Reference;
47import org.osgi.service.component.annotations.ReferenceCardinality;
48import org.slf4j.Logger;
49
50import java.io.IOException;
51import java.util.HashMap;
52import java.util.HashSet;
53import java.util.Map;
54import java.util.Objects;
55import java.util.Set;
56import java.util.concurrent.ExecutorService;
Jian Lib6dc08f2021-03-24 15:24:18 +090057
Jian Lib6dc08f2021-03-24 15:24:18 +090058import static java.util.concurrent.Executors.newSingleThreadExecutor;
59import static org.onlab.util.Tools.groupedThreads;
60import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
Jian Lib6dc08f2021-03-24 15:24:18 +090061import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090062import static org.slf4j.LoggerFactory.getLogger;
63
64/**
65 * Kubernetes VM watcher used for feeding VM information.
66 */
67@Component(immediate = true)
68public class KubevirtVmWatcher {
69
70 private final Logger log = getLogger(getClass());
71
Jian Lib6dc08f2021-03-24 15:24:18 +090072 private static final String SPEC = "spec";
73 private static final String TEMPLATE = "template";
74 private static final String METADATA = "metadata";
75 private static final String ANNOTATIONS = "annotations";
76 private static final String DOMAIN = "domain";
77 private static final String DEVICES = "devices";
78 private static final String INTERFACES = "interfaces";
Jian Li8f944d42021-03-23 00:43:29 +090079 private static final String SECURITY_GROUPS = "securityGroups";
Jian Lib6dc08f2021-03-24 15:24:18 +090080 private static final String NAME = "name";
81 private static final String NETWORK = "network";
82 private static final String MAC = "macAddress";
83 private static final String IP = "ipAddress";
84 private static final String DEFAULT = "default";
Jian Li46592cf2021-05-11 18:12:55 +090085 private static final String CNI_ZERO = "cni0";
Jian Lib6dc08f2021-03-24 15:24:18 +090086 private static final String NETWORK_SUFFIX = "-net";
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected CoreService coreService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected MastershipService mastershipService;
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected ClusterService clusterService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected LeadershipService leadershipService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected KubevirtNodeService nodeService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected KubevirtNetworkAdminService networkAdminService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
107 protected KubevirtPortAdminService portAdminService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected KubevirtPodService podService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
113 protected KubevirtApiConfigService configService;
114
115 private final ExecutorService eventExecutor = newSingleThreadExecutor(
116 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
117
118 private final InternalKubevirtVmWatcher watcher = new InternalKubevirtVmWatcher();
119 private final InternalKubevirtApiConfigListener
120 configListener = new InternalKubevirtApiConfigListener();
121
122 CustomResourceDefinitionContext vmCrdCxt = new CustomResourceDefinitionContext
123 .Builder()
124 .withGroup("kubevirt.io")
125 .withScope("Namespaced")
126 .withVersion("v1")
127 .withPlural("virtualmachines")
128 .build();
129
130 private ApplicationId appId;
131 private NodeId localNodeId;
132
133 @Activate
134 protected void activate() {
135 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
136 localNodeId = clusterService.getLocalNode().id();
137 leadershipService.runForLeadership(appId.name());
138 configService.addListener(configListener);
139
140 log.info("Started");
141 }
142
143 @Deactivate
144 protected void deactivate() {
145 configService.removeListener(configListener);
146 leadershipService.withdraw(appId.name());
147 eventExecutor.shutdown();
148
149 log.info("Stopped");
150 }
151
152 private void instantiateWatcher() {
153 KubernetesClient client = k8sClient(configService);
154
155 if (client != null) {
156 try {
157 client.customResource(vmCrdCxt).watch(watcher);
158 } catch (IOException e) {
159 e.printStackTrace();
160 }
161 }
162 }
163
164 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
165
166 private boolean isRelevantHelper() {
167 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
168 }
169
170 @Override
171 public void event(KubevirtApiConfigEvent event) {
172
173 switch (event.type()) {
174 case KUBEVIRT_API_CONFIG_UPDATED:
175 eventExecutor.execute(this::processConfigUpdate);
176 break;
177 case KUBEVIRT_API_CONFIG_CREATED:
178 case KUBEVIRT_API_CONFIG_REMOVED:
179 default:
180 // do nothing
181 break;
182 }
183 }
184
185 private void processConfigUpdate() {
186 if (!isRelevantHelper()) {
187 return;
188 }
189
190 instantiateWatcher();
191 }
192 }
193
194 private class InternalKubevirtVmWatcher implements Watcher<String> {
195
196 @Override
197 public void eventReceived(Action action, String resource) {
198 switch (action) {
199 case ADDED:
200 eventExecutor.execute(() -> processAddition(resource));
201 break;
202 case DELETED:
203 eventExecutor.execute(() -> processDeletion(resource));
204 break;
Jian Li8f944d42021-03-23 00:43:29 +0900205 case MODIFIED:
206 eventExecutor.execute(() -> processModification(resource));
207 break;
Jian Lib6dc08f2021-03-24 15:24:18 +0900208 case ERROR:
209 log.warn("Failures processing VM manipulation.");
210 break;
211 default:
212 break;
213 }
214 }
215
216 @Override
217 public void onClose(WatcherException e) {
218 log.warn("VM watcher OnClose, re-instantiate the VM watcher...");
219 instantiateWatcher();
220 }
221
222 private void processAddition(String resource) {
223 if (!isMaster()) {
224 return;
225 }
226
Jian Li9557e902021-06-08 10:12:52 +0900227 String vmName = parseVmName(resource);
228
Jian Lib6dc08f2021-03-24 15:24:18 +0900229 parseMacAddresses(resource).forEach((mac, net) -> {
230 KubevirtPort port = DefaultKubevirtPort.builder()
Jian Li9557e902021-06-08 10:12:52 +0900231 .vmName(vmName)
Jian Lib6dc08f2021-03-24 15:24:18 +0900232 .macAddress(mac)
233 .networkId(net)
234 .build();
235
Jian Li8f944d42021-03-23 00:43:29 +0900236 Set<String> sgs = parseSecurityGroups(resource);
237 port = port.updateSecurityGroups(sgs);
238
Jian Lib6dc08f2021-03-24 15:24:18 +0900239 Map<String, IpAddress> ips = parseIpAddresses(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900240 IpAddress ip = ips.get(port.networkId());
Jian Lib6dc08f2021-03-24 15:24:18 +0900241
Jian Li7388cdc2021-05-16 16:41:13 +0900242 port = port.updateIpAddress(ip);
243
Jian Liea1ead72021-05-28 11:00:07 +0900244 if (portAdminService.port(port.macAddress()) == null) {
245 portAdminService.createPort(port);
246 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900247 });
248 }
249
Jian Li8f944d42021-03-23 00:43:29 +0900250 private void processModification(String resource) {
251 if (!isMaster()) {
252 return;
253 }
254
Jian Li9557e902021-06-08 10:12:52 +0900255 String vmName = parseVmName(resource);
256
Jian Li8f944d42021-03-23 00:43:29 +0900257 parseMacAddresses(resource).forEach((mac, net) -> {
258 KubevirtPort port = DefaultKubevirtPort.builder()
Jian Li9557e902021-06-08 10:12:52 +0900259 .vmName(vmName)
Jian Li8f944d42021-03-23 00:43:29 +0900260 .macAddress(mac)
261 .networkId(net)
262 .build();
263
264 KubevirtPort existing = portAdminService.port(port.macAddress());
265
266 if (existing == null) {
267 return;
268 }
269
270 Set<String> sgs = parseSecurityGroups(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900271
272 // we only update the port, if the newly updated security groups
273 // have different values compared to existing ones
274 if (!port.securityGroups().equals(sgs)) {
275 portAdminService.updatePort(existing.updateSecurityGroups(sgs));
276 }
Jian Li8f944d42021-03-23 00:43:29 +0900277 });
278 }
279
Jian Lib6dc08f2021-03-24 15:24:18 +0900280 private void processDeletion(String resource) {
281 if (!isMaster()) {
282 return;
283 }
284
285 parseMacAddresses(resource).forEach((mac, net) -> {
286 KubevirtPort port = portAdminService.port(mac);
287 if (port != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900288 portAdminService.removePort(mac);
289 }
290 });
291 }
292
293 private boolean isMaster() {
294 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
295 }
296
Jian Li9557e902021-06-08 10:12:52 +0900297 private String parseVmName(String resource) {
298 String vmName = null;
299 try {
300 ObjectMapper mapper = new ObjectMapper();
301 JsonNode json = mapper.readTree(resource);
302 JsonNode nameJson = json.get(METADATA).get(NAME);
303 if (nameJson != null) {
304 vmName = nameJson.asText();
305 }
306 } catch (IOException e) {
307 log.error("Failed to parse kubevirt VM name");
308 }
309
310 return vmName;
311 }
312
Jian Lib6dc08f2021-03-24 15:24:18 +0900313 private Map<String, IpAddress> parseIpAddresses(String resource) {
314 try {
315 ObjectMapper mapper = new ObjectMapper();
316 JsonNode json = mapper.readTree(resource);
317 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
318
319 JsonNode annots = metadata.get(ANNOTATIONS);
320 if (annots == null) {
321 return new HashMap<>();
322 }
323
324 JsonNode interfacesJson = annots.get(INTERFACES);
325 if (interfacesJson == null) {
326 return new HashMap<>();
327 }
328
329 Map<String, IpAddress> result = new HashMap<>();
330
331 String interfacesString = interfacesJson.asText();
332 ArrayNode interfaces = (ArrayNode) mapper.readTree(interfacesString);
333 for (JsonNode intf : interfaces) {
334 String network = intf.get(NETWORK).asText();
335 String ip = intf.get(IP).asText();
336 result.put(network, IpAddress.valueOf(ip));
337 }
338
339 return result;
340 } catch (IOException e) {
341 log.error("Failed to parse kubevirt VM IP addresses");
342 }
343
344 return new HashMap<>();
345 }
346
Jian Li8f944d42021-03-23 00:43:29 +0900347 private Set<String> parseSecurityGroups(String resource) {
348 try {
349 ObjectMapper mapper = new ObjectMapper();
350 JsonNode json = mapper.readTree(resource);
351 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
352
353 JsonNode annots = metadata.get(ANNOTATIONS);
354 if (annots == null) {
355 return new HashSet<>();
356 }
357
358 JsonNode sgsJson = annots.get(SECURITY_GROUPS);
359 if (sgsJson == null) {
360 return new HashSet<>();
361 }
362
363 Set<String> result = new HashSet<>();
364 ArrayNode sgs = (ArrayNode) mapper.readTree(sgsJson.asText());
365 for (JsonNode sg : sgs) {
366 result.add(sg.asText());
367 }
368
369 return result;
370
371 } catch (IOException e) {
372 log.error("Failed to parse kubevirt security group IDs.");
373 }
374
375 return new HashSet<>();
376 }
377
Jian Lib6dc08f2021-03-24 15:24:18 +0900378 private Map<MacAddress, String> parseMacAddresses(String resource) {
379 try {
380 ObjectMapper mapper = new ObjectMapper();
381 JsonNode json = mapper.readTree(resource);
382 JsonNode spec = json.get(SPEC).get(TEMPLATE).get(SPEC);
383 ArrayNode interfaces = (ArrayNode) spec.get(DOMAIN).get(DEVICES).get(INTERFACES);
384
385 Map<MacAddress, String> result = new HashMap<>();
386 for (JsonNode intf : interfaces) {
387 String network = intf.get(NAME).asText();
388 JsonNode macJson = intf.get(MAC);
389
Jian Li46592cf2021-05-11 18:12:55 +0900390 if (!DEFAULT.equals(network) && !CNI_ZERO.equals(network) && macJson != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900391 String compact = StringUtils.substringBeforeLast(network, NETWORK_SUFFIX);
392 MacAddress mac = MacAddress.valueOf(macJson.asText());
393 result.put(mac, compact);
394 }
395 }
396
397 return result;
398 } catch (IOException e) {
399 log.error("Failed to parse kubevirt VM MAC addresses");
400 }
401
402 return new HashMap<>();
403 }
404 }
405}