blob: 43d3e64044696080f2a134e3061cf04af7ad1d32 [file] [log] [blame]
Jian Lib6dc08f2021-03-24 15:24:18 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ArrayNode;
Jian Lib6dc08f2021-03-24 15:24:18 +090021import io.fabric8.kubernetes.api.model.Pod;
22import io.fabric8.kubernetes.client.KubernetesClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090023import io.fabric8.kubernetes.client.Watcher;
24import io.fabric8.kubernetes.client.WatcherException;
25import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
26import org.apache.commons.lang3.StringUtils;
27import org.onlab.packet.IpAddress;
28import org.onlab.packet.MacAddress;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.LeadershipService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.core.ApplicationId;
33import org.onosproject.core.CoreService;
34import org.onosproject.kubevirtnetworking.api.DefaultKubevirtPort;
Jian Lib6dc08f2021-03-24 15:24:18 +090035import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
36import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
37import org.onosproject.kubevirtnetworking.api.KubevirtPort;
38import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
39import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
40import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
41import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
42import org.onosproject.kubevirtnode.api.KubevirtNodeService;
43import org.onosproject.mastership.MastershipService;
44import org.onosproject.net.DeviceId;
45import org.osgi.service.component.annotations.Activate;
46import org.osgi.service.component.annotations.Component;
47import org.osgi.service.component.annotations.Deactivate;
48import org.osgi.service.component.annotations.Reference;
49import org.osgi.service.component.annotations.ReferenceCardinality;
50import org.slf4j.Logger;
51
52import java.io.IOException;
53import java.util.HashMap;
54import java.util.HashSet;
55import java.util.Map;
56import java.util.Objects;
57import java.util.Set;
58import java.util.concurrent.ExecutorService;
59import java.util.stream.Collectors;
60
Jian Lib6dc08f2021-03-24 15:24:18 +090061import static java.util.concurrent.Executors.newSingleThreadExecutor;
62import static org.onlab.util.Tools.groupedThreads;
63import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
64import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
65import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090066import static org.slf4j.LoggerFactory.getLogger;
67
68/**
69 * Kubernetes VM watcher used for feeding VM information.
70 */
71@Component(immediate = true)
72public class KubevirtVmWatcher {
73
74 private final Logger log = getLogger(getClass());
75
Jian Lib6dc08f2021-03-24 15:24:18 +090076 private static final String SPEC = "spec";
77 private static final String TEMPLATE = "template";
78 private static final String METADATA = "metadata";
79 private static final String ANNOTATIONS = "annotations";
80 private static final String DOMAIN = "domain";
81 private static final String DEVICES = "devices";
82 private static final String INTERFACES = "interfaces";
Jian Li8f944d42021-03-23 00:43:29 +090083 private static final String SECURITY_GROUPS = "securityGroups";
Jian Lib6dc08f2021-03-24 15:24:18 +090084 private static final String NAME = "name";
85 private static final String NETWORK = "network";
86 private static final String MAC = "macAddress";
87 private static final String IP = "ipAddress";
88 private static final String DEFAULT = "default";
Jian Li46592cf2021-05-11 18:12:55 +090089 private static final String CNI_ZERO = "cni0";
Jian Lib6dc08f2021-03-24 15:24:18 +090090 private static final String NETWORK_SUFFIX = "-net";
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected CoreService coreService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected MastershipService mastershipService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected ClusterService clusterService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected LeadershipService leadershipService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected KubevirtNodeService nodeService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected KubevirtNetworkAdminService networkAdminService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
111 protected KubevirtPortAdminService portAdminService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
114 protected KubevirtPodService podService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
117 protected KubevirtApiConfigService configService;
118
119 private final ExecutorService eventExecutor = newSingleThreadExecutor(
120 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
121
122 private final InternalKubevirtVmWatcher watcher = new InternalKubevirtVmWatcher();
123 private final InternalKubevirtApiConfigListener
124 configListener = new InternalKubevirtApiConfigListener();
125
126 CustomResourceDefinitionContext vmCrdCxt = new CustomResourceDefinitionContext
127 .Builder()
128 .withGroup("kubevirt.io")
129 .withScope("Namespaced")
130 .withVersion("v1")
131 .withPlural("virtualmachines")
132 .build();
133
134 private ApplicationId appId;
135 private NodeId localNodeId;
136
137 @Activate
138 protected void activate() {
139 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
140 localNodeId = clusterService.getLocalNode().id();
141 leadershipService.runForLeadership(appId.name());
142 configService.addListener(configListener);
143
144 log.info("Started");
145 }
146
147 @Deactivate
148 protected void deactivate() {
149 configService.removeListener(configListener);
150 leadershipService.withdraw(appId.name());
151 eventExecutor.shutdown();
152
153 log.info("Stopped");
154 }
155
156 private void instantiateWatcher() {
157 KubernetesClient client = k8sClient(configService);
158
159 if (client != null) {
160 try {
161 client.customResource(vmCrdCxt).watch(watcher);
162 } catch (IOException e) {
163 e.printStackTrace();
164 }
165 }
166 }
167
168 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
169
170 private boolean isRelevantHelper() {
171 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
172 }
173
174 @Override
175 public void event(KubevirtApiConfigEvent event) {
176
177 switch (event.type()) {
178 case KUBEVIRT_API_CONFIG_UPDATED:
179 eventExecutor.execute(this::processConfigUpdate);
180 break;
181 case KUBEVIRT_API_CONFIG_CREATED:
182 case KUBEVIRT_API_CONFIG_REMOVED:
183 default:
184 // do nothing
185 break;
186 }
187 }
188
189 private void processConfigUpdate() {
190 if (!isRelevantHelper()) {
191 return;
192 }
193
194 instantiateWatcher();
195 }
196 }
197
198 private class InternalKubevirtVmWatcher implements Watcher<String> {
199
200 @Override
201 public void eventReceived(Action action, String resource) {
202 switch (action) {
203 case ADDED:
204 eventExecutor.execute(() -> processAddition(resource));
205 break;
206 case DELETED:
207 eventExecutor.execute(() -> processDeletion(resource));
208 break;
Jian Li8f944d42021-03-23 00:43:29 +0900209 case MODIFIED:
210 eventExecutor.execute(() -> processModification(resource));
211 break;
Jian Lib6dc08f2021-03-24 15:24:18 +0900212 case ERROR:
213 log.warn("Failures processing VM manipulation.");
214 break;
215 default:
216 break;
217 }
218 }
219
220 @Override
221 public void onClose(WatcherException e) {
222 log.warn("VM watcher OnClose, re-instantiate the VM watcher...");
223 instantiateWatcher();
224 }
225
226 private void processAddition(String resource) {
227 if (!isMaster()) {
228 return;
229 }
230
231 parseMacAddresses(resource).forEach((mac, net) -> {
232 KubevirtPort port = DefaultKubevirtPort.builder()
233 .macAddress(mac)
234 .networkId(net)
235 .build();
236
Jian Li8f944d42021-03-23 00:43:29 +0900237 Set<String> sgs = parseSecurityGroups(resource);
238 port = port.updateSecurityGroups(sgs);
239
Jian Lib6dc08f2021-03-24 15:24:18 +0900240 Map<String, IpAddress> ips = parseIpAddresses(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900241 IpAddress ip = ips.get(port.networkId());
Jian Lib6dc08f2021-03-24 15:24:18 +0900242
Jian Li7388cdc2021-05-16 16:41:13 +0900243 port = port.updateIpAddress(ip);
244
245 DeviceId deviceId = getDeviceId(podService.pods(), port);
246
247 if (deviceId != null) {
248 port = port.updateDeviceId(deviceId);
Jian Lib6dc08f2021-03-24 15:24:18 +0900249 }
250
Jian Li7388cdc2021-05-16 16:41:13 +0900251 portAdminService.createPort(port);
Jian Lib6dc08f2021-03-24 15:24:18 +0900252 });
253 }
254
Jian Li8f944d42021-03-23 00:43:29 +0900255 private void processModification(String resource) {
256 if (!isMaster()) {
257 return;
258 }
259
260 parseMacAddresses(resource).forEach((mac, net) -> {
261 KubevirtPort port = DefaultKubevirtPort.builder()
262 .macAddress(mac)
263 .networkId(net)
264 .build();
265
266 KubevirtPort existing = portAdminService.port(port.macAddress());
267
268 if (existing == null) {
269 return;
270 }
271
272 Set<String> sgs = parseSecurityGroups(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900273
274 // we only update the port, if the newly updated security groups
275 // have different values compared to existing ones
276 if (!port.securityGroups().equals(sgs)) {
277 portAdminService.updatePort(existing.updateSecurityGroups(sgs));
278 }
Jian Li8f944d42021-03-23 00:43:29 +0900279 });
280 }
281
Jian Lib6dc08f2021-03-24 15:24:18 +0900282 private void processDeletion(String resource) {
283 if (!isMaster()) {
284 return;
285 }
286
287 parseMacAddresses(resource).forEach((mac, net) -> {
288 KubevirtPort port = portAdminService.port(mac);
289 if (port != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900290 portAdminService.removePort(mac);
291 }
292 });
293 }
294
295 private boolean isMaster() {
296 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
297 }
298
299 // FIXME: to obtains the device ID, we have to search through
300 // existing POD inventory, need to find a better wat to obtain device ID
301 private DeviceId getDeviceId(Set<Pod> pods, KubevirtPort port) {
302 Set<Pod> defaultPods = pods.stream()
303 .filter(pod -> pod.getMetadata().getNamespace().equals(DEFAULT))
304 .collect(Collectors.toSet());
305
306 Set<KubevirtPort> allPorts = new HashSet<>();
307 for (Pod pod : defaultPods) {
308 allPorts.addAll(getPorts(nodeService, networkAdminService.networks(), pod));
309 }
310
311 return allPorts.stream().filter(p -> p.macAddress().equals(port.macAddress()))
312 .map(KubevirtPort::deviceId).findFirst().orElse(null);
313 }
314
315 private Map<String, IpAddress> parseIpAddresses(String resource) {
316 try {
317 ObjectMapper mapper = new ObjectMapper();
318 JsonNode json = mapper.readTree(resource);
319 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
320
321 JsonNode annots = metadata.get(ANNOTATIONS);
322 if (annots == null) {
323 return new HashMap<>();
324 }
325
326 JsonNode interfacesJson = annots.get(INTERFACES);
327 if (interfacesJson == null) {
328 return new HashMap<>();
329 }
330
331 Map<String, IpAddress> result = new HashMap<>();
332
333 String interfacesString = interfacesJson.asText();
334 ArrayNode interfaces = (ArrayNode) mapper.readTree(interfacesString);
335 for (JsonNode intf : interfaces) {
336 String network = intf.get(NETWORK).asText();
337 String ip = intf.get(IP).asText();
338 result.put(network, IpAddress.valueOf(ip));
339 }
340
341 return result;
342 } catch (IOException e) {
343 log.error("Failed to parse kubevirt VM IP addresses");
344 }
345
346 return new HashMap<>();
347 }
348
Jian Li8f944d42021-03-23 00:43:29 +0900349 private Set<String> parseSecurityGroups(String resource) {
350 try {
351 ObjectMapper mapper = new ObjectMapper();
352 JsonNode json = mapper.readTree(resource);
353 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
354
355 JsonNode annots = metadata.get(ANNOTATIONS);
356 if (annots == null) {
357 return new HashSet<>();
358 }
359
360 JsonNode sgsJson = annots.get(SECURITY_GROUPS);
361 if (sgsJson == null) {
362 return new HashSet<>();
363 }
364
365 Set<String> result = new HashSet<>();
366 ArrayNode sgs = (ArrayNode) mapper.readTree(sgsJson.asText());
367 for (JsonNode sg : sgs) {
368 result.add(sg.asText());
369 }
370
371 return result;
372
373 } catch (IOException e) {
374 log.error("Failed to parse kubevirt security group IDs.");
375 }
376
377 return new HashSet<>();
378 }
379
Jian Lib6dc08f2021-03-24 15:24:18 +0900380 private Map<MacAddress, String> parseMacAddresses(String resource) {
381 try {
382 ObjectMapper mapper = new ObjectMapper();
383 JsonNode json = mapper.readTree(resource);
384 JsonNode spec = json.get(SPEC).get(TEMPLATE).get(SPEC);
385 ArrayNode interfaces = (ArrayNode) spec.get(DOMAIN).get(DEVICES).get(INTERFACES);
386
387 Map<MacAddress, String> result = new HashMap<>();
388 for (JsonNode intf : interfaces) {
389 String network = intf.get(NAME).asText();
390 JsonNode macJson = intf.get(MAC);
391
Jian Li46592cf2021-05-11 18:12:55 +0900392 if (!DEFAULT.equals(network) && !CNI_ZERO.equals(network) && macJson != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900393 String compact = StringUtils.substringBeforeLast(network, NETWORK_SUFFIX);
394 MacAddress mac = MacAddress.valueOf(macJson.asText());
395 result.put(mac, compact);
396 }
397 }
398
399 return result;
400 } catch (IOException e) {
401 log.error("Failed to parse kubevirt VM MAC addresses");
402 }
403
404 return new HashMap<>();
405 }
406 }
407}