blob: fc3baabdc044b17e06963d5de2a4af17cb75d6e3 [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ArrayNode;
Jian Lib6dc08f2021-03-24 15:24:18 +090021import io.fabric8.kubernetes.api.model.Pod;
22import io.fabric8.kubernetes.client.KubernetesClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090023import io.fabric8.kubernetes.client.Watcher;
24import io.fabric8.kubernetes.client.WatcherException;
25import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
26import org.apache.commons.lang3.StringUtils;
27import org.onlab.packet.IpAddress;
28import org.onlab.packet.MacAddress;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.LeadershipService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.core.ApplicationId;
33import org.onosproject.core.CoreService;
34import org.onosproject.kubevirtnetworking.api.DefaultKubevirtPort;
Jian Lib6dc08f2021-03-24 15:24:18 +090035import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
36import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
37import org.onosproject.kubevirtnetworking.api.KubevirtPort;
38import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
39import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
40import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
41import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
42import org.onosproject.kubevirtnode.api.KubevirtNodeService;
43import org.onosproject.mastership.MastershipService;
44import org.onosproject.net.DeviceId;
45import org.osgi.service.component.annotations.Activate;
46import org.osgi.service.component.annotations.Component;
47import org.osgi.service.component.annotations.Deactivate;
48import org.osgi.service.component.annotations.Reference;
49import org.osgi.service.component.annotations.ReferenceCardinality;
50import org.slf4j.Logger;
51
52import java.io.IOException;
53import java.util.HashMap;
54import java.util.HashSet;
55import java.util.Map;
56import java.util.Objects;
57import java.util.Set;
58import java.util.concurrent.ExecutorService;
59import java.util.stream.Collectors;
60
Jian Lib6dc08f2021-03-24 15:24:18 +090061import static java.util.concurrent.Executors.newSingleThreadExecutor;
62import static org.onlab.util.Tools.groupedThreads;
63import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
64import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
65import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090066import static org.slf4j.LoggerFactory.getLogger;
67
68/**
69 * Kubernetes VM watcher used for feeding VM information.
70 */
71@Component(immediate = true)
72public class KubevirtVmWatcher {
73
74 private final Logger log = getLogger(getClass());
75
Jian Lib6dc08f2021-03-24 15:24:18 +090076 private static final String SPEC = "spec";
77 private static final String TEMPLATE = "template";
78 private static final String METADATA = "metadata";
79 private static final String ANNOTATIONS = "annotations";
80 private static final String DOMAIN = "domain";
81 private static final String DEVICES = "devices";
82 private static final String INTERFACES = "interfaces";
Jian Li8f944d42021-03-23 00:43:29 +090083 private static final String SECURITY_GROUPS = "securityGroups";
Jian Lib6dc08f2021-03-24 15:24:18 +090084 private static final String NAME = "name";
85 private static final String NETWORK = "network";
86 private static final String MAC = "macAddress";
87 private static final String IP = "ipAddress";
88 private static final String DEFAULT = "default";
Jian Li46592cf2021-05-11 18:12:55 +090089 private static final String CNI_ZERO = "cni0";
Jian Lib6dc08f2021-03-24 15:24:18 +090090 private static final String NETWORK_SUFFIX = "-net";
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected CoreService coreService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected MastershipService mastershipService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected ClusterService clusterService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected LeadershipService leadershipService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected KubevirtNodeService nodeService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected KubevirtNetworkAdminService networkAdminService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
111 protected KubevirtPortAdminService portAdminService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
114 protected KubevirtPodService podService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
117 protected KubevirtApiConfigService configService;
118
119 private final ExecutorService eventExecutor = newSingleThreadExecutor(
120 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
121
122 private final InternalKubevirtVmWatcher watcher = new InternalKubevirtVmWatcher();
123 private final InternalKubevirtApiConfigListener
124 configListener = new InternalKubevirtApiConfigListener();
125
126 CustomResourceDefinitionContext vmCrdCxt = new CustomResourceDefinitionContext
127 .Builder()
128 .withGroup("kubevirt.io")
129 .withScope("Namespaced")
130 .withVersion("v1")
131 .withPlural("virtualmachines")
132 .build();
133
134 private ApplicationId appId;
135 private NodeId localNodeId;
136
137 @Activate
138 protected void activate() {
139 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
140 localNodeId = clusterService.getLocalNode().id();
141 leadershipService.runForLeadership(appId.name());
142 configService.addListener(configListener);
143
144 log.info("Started");
145 }
146
147 @Deactivate
148 protected void deactivate() {
149 configService.removeListener(configListener);
150 leadershipService.withdraw(appId.name());
151 eventExecutor.shutdown();
152
153 log.info("Stopped");
154 }
155
156 private void instantiateWatcher() {
157 KubernetesClient client = k8sClient(configService);
158
159 if (client != null) {
160 try {
161 client.customResource(vmCrdCxt).watch(watcher);
162 } catch (IOException e) {
163 e.printStackTrace();
164 }
165 }
166 }
167
168 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
169
170 private boolean isRelevantHelper() {
171 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
172 }
173
174 @Override
175 public void event(KubevirtApiConfigEvent event) {
176
177 switch (event.type()) {
178 case KUBEVIRT_API_CONFIG_UPDATED:
179 eventExecutor.execute(this::processConfigUpdate);
180 break;
181 case KUBEVIRT_API_CONFIG_CREATED:
182 case KUBEVIRT_API_CONFIG_REMOVED:
183 default:
184 // do nothing
185 break;
186 }
187 }
188
189 private void processConfigUpdate() {
190 if (!isRelevantHelper()) {
191 return;
192 }
193
194 instantiateWatcher();
195 }
196 }
197
198 private class InternalKubevirtVmWatcher implements Watcher<String> {
199
200 @Override
201 public void eventReceived(Action action, String resource) {
202 switch (action) {
203 case ADDED:
204 eventExecutor.execute(() -> processAddition(resource));
205 break;
206 case DELETED:
207 eventExecutor.execute(() -> processDeletion(resource));
208 break;
Jian Li8f944d42021-03-23 00:43:29 +0900209 case MODIFIED:
210 eventExecutor.execute(() -> processModification(resource));
211 break;
Jian Lib6dc08f2021-03-24 15:24:18 +0900212 case ERROR:
213 log.warn("Failures processing VM manipulation.");
214 break;
215 default:
216 break;
217 }
218 }
219
220 @Override
221 public void onClose(WatcherException e) {
222 log.warn("VM watcher OnClose, re-instantiate the VM watcher...");
223 instantiateWatcher();
224 }
225
226 private void processAddition(String resource) {
227 if (!isMaster()) {
228 return;
229 }
230
231 parseMacAddresses(resource).forEach((mac, net) -> {
232 KubevirtPort port = DefaultKubevirtPort.builder()
233 .macAddress(mac)
234 .networkId(net)
235 .build();
236
Jian Li8f944d42021-03-23 00:43:29 +0900237 Set<String> sgs = parseSecurityGroups(resource);
238 port = port.updateSecurityGroups(sgs);
239
Jian Lib6dc08f2021-03-24 15:24:18 +0900240 Map<String, IpAddress> ips = parseIpAddresses(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900241 IpAddress ip = ips.get(port.networkId());
Jian Lib6dc08f2021-03-24 15:24:18 +0900242
Jian Li7388cdc2021-05-16 16:41:13 +0900243 port = port.updateIpAddress(ip);
244
245 DeviceId deviceId = getDeviceId(podService.pods(), port);
246
247 if (deviceId != null) {
248 port = port.updateDeviceId(deviceId);
Jian Lib6dc08f2021-03-24 15:24:18 +0900249 }
250
Jian Liea1ead72021-05-28 11:00:07 +0900251 if (portAdminService.port(port.macAddress()) == null) {
252 portAdminService.createPort(port);
253 }
Jian Lib6dc08f2021-03-24 15:24:18 +0900254 });
255 }
256
Jian Li8f944d42021-03-23 00:43:29 +0900257 private void processModification(String resource) {
258 if (!isMaster()) {
259 return;
260 }
261
262 parseMacAddresses(resource).forEach((mac, net) -> {
263 KubevirtPort port = DefaultKubevirtPort.builder()
264 .macAddress(mac)
265 .networkId(net)
266 .build();
267
268 KubevirtPort existing = portAdminService.port(port.macAddress());
269
270 if (existing == null) {
271 return;
272 }
273
274 Set<String> sgs = parseSecurityGroups(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900275
276 // we only update the port, if the newly updated security groups
277 // have different values compared to existing ones
278 if (!port.securityGroups().equals(sgs)) {
279 portAdminService.updatePort(existing.updateSecurityGroups(sgs));
280 }
Jian Li8f944d42021-03-23 00:43:29 +0900281 });
282 }
283
Jian Lib6dc08f2021-03-24 15:24:18 +0900284 private void processDeletion(String resource) {
285 if (!isMaster()) {
286 return;
287 }
288
289 parseMacAddresses(resource).forEach((mac, net) -> {
290 KubevirtPort port = portAdminService.port(mac);
291 if (port != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900292 portAdminService.removePort(mac);
293 }
294 });
295 }
296
297 private boolean isMaster() {
298 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
299 }
300
301 // FIXME: to obtains the device ID, we have to search through
302 // existing POD inventory, need to find a better wat to obtain device ID
303 private DeviceId getDeviceId(Set<Pod> pods, KubevirtPort port) {
304 Set<Pod> defaultPods = pods.stream()
305 .filter(pod -> pod.getMetadata().getNamespace().equals(DEFAULT))
306 .collect(Collectors.toSet());
307
308 Set<KubevirtPort> allPorts = new HashSet<>();
309 for (Pod pod : defaultPods) {
310 allPorts.addAll(getPorts(nodeService, networkAdminService.networks(), pod));
311 }
312
313 return allPorts.stream().filter(p -> p.macAddress().equals(port.macAddress()))
314 .map(KubevirtPort::deviceId).findFirst().orElse(null);
315 }
316
317 private Map<String, IpAddress> parseIpAddresses(String resource) {
318 try {
319 ObjectMapper mapper = new ObjectMapper();
320 JsonNode json = mapper.readTree(resource);
321 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
322
323 JsonNode annots = metadata.get(ANNOTATIONS);
324 if (annots == null) {
325 return new HashMap<>();
326 }
327
328 JsonNode interfacesJson = annots.get(INTERFACES);
329 if (interfacesJson == null) {
330 return new HashMap<>();
331 }
332
333 Map<String, IpAddress> result = new HashMap<>();
334
335 String interfacesString = interfacesJson.asText();
336 ArrayNode interfaces = (ArrayNode) mapper.readTree(interfacesString);
337 for (JsonNode intf : interfaces) {
338 String network = intf.get(NETWORK).asText();
339 String ip = intf.get(IP).asText();
340 result.put(network, IpAddress.valueOf(ip));
341 }
342
343 return result;
344 } catch (IOException e) {
345 log.error("Failed to parse kubevirt VM IP addresses");
346 }
347
348 return new HashMap<>();
349 }
350
Jian Li8f944d42021-03-23 00:43:29 +0900351 private Set<String> parseSecurityGroups(String resource) {
352 try {
353 ObjectMapper mapper = new ObjectMapper();
354 JsonNode json = mapper.readTree(resource);
355 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
356
357 JsonNode annots = metadata.get(ANNOTATIONS);
358 if (annots == null) {
359 return new HashSet<>();
360 }
361
362 JsonNode sgsJson = annots.get(SECURITY_GROUPS);
363 if (sgsJson == null) {
364 return new HashSet<>();
365 }
366
367 Set<String> result = new HashSet<>();
368 ArrayNode sgs = (ArrayNode) mapper.readTree(sgsJson.asText());
369 for (JsonNode sg : sgs) {
370 result.add(sg.asText());
371 }
372
373 return result;
374
375 } catch (IOException e) {
376 log.error("Failed to parse kubevirt security group IDs.");
377 }
378
379 return new HashSet<>();
380 }
381
Jian Lib6dc08f2021-03-24 15:24:18 +0900382 private Map<MacAddress, String> parseMacAddresses(String resource) {
383 try {
384 ObjectMapper mapper = new ObjectMapper();
385 JsonNode json = mapper.readTree(resource);
386 JsonNode spec = json.get(SPEC).get(TEMPLATE).get(SPEC);
387 ArrayNode interfaces = (ArrayNode) spec.get(DOMAIN).get(DEVICES).get(INTERFACES);
388
389 Map<MacAddress, String> result = new HashMap<>();
390 for (JsonNode intf : interfaces) {
391 String network = intf.get(NAME).asText();
392 JsonNode macJson = intf.get(MAC);
393
Jian Li46592cf2021-05-11 18:12:55 +0900394 if (!DEFAULT.equals(network) && !CNI_ZERO.equals(network) && macJson != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900395 String compact = StringUtils.substringBeforeLast(network, NETWORK_SUFFIX);
396 MacAddress mac = MacAddress.valueOf(macJson.asText());
397 result.put(mac, compact);
398 }
399 }
400
401 return result;
402 } catch (IOException e) {
403 log.error("Failed to parse kubevirt VM MAC addresses");
404 }
405
406 return new HashMap<>();
407 }
408 }
409}