blob: 78a7f511b2a5cac395bfacdcc25a49116b5f34b1 [file] [log] [blame]
Jian Lib6dc08f2021-03-24 15:24:18 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ArrayNode;
Jian Lib6dc08f2021-03-24 15:24:18 +090021import io.fabric8.kubernetes.api.model.Pod;
22import io.fabric8.kubernetes.client.KubernetesClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090023import io.fabric8.kubernetes.client.Watcher;
24import io.fabric8.kubernetes.client.WatcherException;
25import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
26import org.apache.commons.lang3.StringUtils;
27import org.onlab.packet.IpAddress;
28import org.onlab.packet.MacAddress;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.LeadershipService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.core.ApplicationId;
33import org.onosproject.core.CoreService;
34import org.onosproject.kubevirtnetworking.api.DefaultKubevirtPort;
Jian Lib6dc08f2021-03-24 15:24:18 +090035import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
36import org.onosproject.kubevirtnetworking.api.KubevirtPodService;
37import org.onosproject.kubevirtnetworking.api.KubevirtPort;
38import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
39import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
40import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
41import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
42import org.onosproject.kubevirtnode.api.KubevirtNodeService;
43import org.onosproject.mastership.MastershipService;
44import org.onosproject.net.DeviceId;
45import org.osgi.service.component.annotations.Activate;
46import org.osgi.service.component.annotations.Component;
47import org.osgi.service.component.annotations.Deactivate;
48import org.osgi.service.component.annotations.Reference;
49import org.osgi.service.component.annotations.ReferenceCardinality;
50import org.slf4j.Logger;
51
52import java.io.IOException;
53import java.util.HashMap;
54import java.util.HashSet;
55import java.util.Map;
56import java.util.Objects;
57import java.util.Set;
58import java.util.concurrent.ExecutorService;
59import java.util.stream.Collectors;
60
Jian Lib6dc08f2021-03-24 15:24:18 +090061import static java.util.concurrent.Executors.newSingleThreadExecutor;
62import static org.onlab.util.Tools.groupedThreads;
63import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
64import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
65import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
Jian Lib6dc08f2021-03-24 15:24:18 +090066import static org.slf4j.LoggerFactory.getLogger;
67
68/**
69 * Kubernetes VM watcher used for feeding VM information.
70 */
71@Component(immediate = true)
72public class KubevirtVmWatcher {
73
74 private final Logger log = getLogger(getClass());
75
76 private static final long SLEEP_MS = 3000; // we wait 3s
77
78 private static final String SPEC = "spec";
79 private static final String TEMPLATE = "template";
80 private static final String METADATA = "metadata";
81 private static final String ANNOTATIONS = "annotations";
82 private static final String DOMAIN = "domain";
83 private static final String DEVICES = "devices";
84 private static final String INTERFACES = "interfaces";
Jian Li8f944d42021-03-23 00:43:29 +090085 private static final String SECURITY_GROUPS = "securityGroups";
Jian Lib6dc08f2021-03-24 15:24:18 +090086 private static final String NAME = "name";
87 private static final String NETWORK = "network";
88 private static final String MAC = "macAddress";
89 private static final String IP = "ipAddress";
90 private static final String DEFAULT = "default";
Jian Li46592cf2021-05-11 18:12:55 +090091 private static final String CNI_ZERO = "cni0";
Jian Lib6dc08f2021-03-24 15:24:18 +090092 private static final String NETWORK_SUFFIX = "-net";
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected CoreService coreService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected MastershipService mastershipService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected ClusterService clusterService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected LeadershipService leadershipService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
107 protected KubevirtNodeService nodeService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected KubevirtNetworkAdminService networkAdminService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
113 protected KubevirtPortAdminService portAdminService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY)
116 protected KubevirtPodService podService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY)
119 protected KubevirtApiConfigService configService;
120
121 private final ExecutorService eventExecutor = newSingleThreadExecutor(
122 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
123
124 private final InternalKubevirtVmWatcher watcher = new InternalKubevirtVmWatcher();
125 private final InternalKubevirtApiConfigListener
126 configListener = new InternalKubevirtApiConfigListener();
127
128 CustomResourceDefinitionContext vmCrdCxt = new CustomResourceDefinitionContext
129 .Builder()
130 .withGroup("kubevirt.io")
131 .withScope("Namespaced")
132 .withVersion("v1")
133 .withPlural("virtualmachines")
134 .build();
135
136 private ApplicationId appId;
137 private NodeId localNodeId;
138
139 @Activate
140 protected void activate() {
141 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
142 localNodeId = clusterService.getLocalNode().id();
143 leadershipService.runForLeadership(appId.name());
144 configService.addListener(configListener);
145
146 log.info("Started");
147 }
148
149 @Deactivate
150 protected void deactivate() {
151 configService.removeListener(configListener);
152 leadershipService.withdraw(appId.name());
153 eventExecutor.shutdown();
154
155 log.info("Stopped");
156 }
157
158 private void instantiateWatcher() {
159 KubernetesClient client = k8sClient(configService);
160
161 if (client != null) {
162 try {
163 client.customResource(vmCrdCxt).watch(watcher);
164 } catch (IOException e) {
165 e.printStackTrace();
166 }
167 }
168 }
169
170 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
171
172 private boolean isRelevantHelper() {
173 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
174 }
175
176 @Override
177 public void event(KubevirtApiConfigEvent event) {
178
179 switch (event.type()) {
180 case KUBEVIRT_API_CONFIG_UPDATED:
181 eventExecutor.execute(this::processConfigUpdate);
182 break;
183 case KUBEVIRT_API_CONFIG_CREATED:
184 case KUBEVIRT_API_CONFIG_REMOVED:
185 default:
186 // do nothing
187 break;
188 }
189 }
190
191 private void processConfigUpdate() {
192 if (!isRelevantHelper()) {
193 return;
194 }
195
196 instantiateWatcher();
197 }
198 }
199
200 private class InternalKubevirtVmWatcher implements Watcher<String> {
201
202 @Override
203 public void eventReceived(Action action, String resource) {
204 switch (action) {
205 case ADDED:
206 eventExecutor.execute(() -> processAddition(resource));
207 break;
208 case DELETED:
209 eventExecutor.execute(() -> processDeletion(resource));
210 break;
Jian Li8f944d42021-03-23 00:43:29 +0900211 case MODIFIED:
212 eventExecutor.execute(() -> processModification(resource));
213 break;
Jian Lib6dc08f2021-03-24 15:24:18 +0900214 case ERROR:
215 log.warn("Failures processing VM manipulation.");
216 break;
217 default:
218 break;
219 }
220 }
221
222 @Override
223 public void onClose(WatcherException e) {
224 log.warn("VM watcher OnClose, re-instantiate the VM watcher...");
225 instantiateWatcher();
226 }
227
228 private void processAddition(String resource) {
229 if (!isMaster()) {
230 return;
231 }
232
233 parseMacAddresses(resource).forEach((mac, net) -> {
234 KubevirtPort port = DefaultKubevirtPort.builder()
235 .macAddress(mac)
236 .networkId(net)
237 .build();
238
Jian Li8f944d42021-03-23 00:43:29 +0900239 Set<String> sgs = parseSecurityGroups(resource);
240 port = port.updateSecurityGroups(sgs);
241
Jian Lib6dc08f2021-03-24 15:24:18 +0900242 Map<String, IpAddress> ips = parseIpAddresses(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900243 IpAddress ip = ips.get(port.networkId());
Jian Lib6dc08f2021-03-24 15:24:18 +0900244
Jian Li7388cdc2021-05-16 16:41:13 +0900245 port = port.updateIpAddress(ip);
246
247 DeviceId deviceId = getDeviceId(podService.pods(), port);
248
249 if (deviceId != null) {
250 port = port.updateDeviceId(deviceId);
Jian Lib6dc08f2021-03-24 15:24:18 +0900251 }
252
Jian Li7388cdc2021-05-16 16:41:13 +0900253 portAdminService.createPort(port);
Jian Lib6dc08f2021-03-24 15:24:18 +0900254 });
255 }
256
Jian Li8f944d42021-03-23 00:43:29 +0900257 private void processModification(String resource) {
258 if (!isMaster()) {
259 return;
260 }
261
262 parseMacAddresses(resource).forEach((mac, net) -> {
263 KubevirtPort port = DefaultKubevirtPort.builder()
264 .macAddress(mac)
265 .networkId(net)
266 .build();
267
268 KubevirtPort existing = portAdminService.port(port.macAddress());
269
270 if (existing == null) {
271 return;
272 }
273
274 Set<String> sgs = parseSecurityGroups(resource);
Jian Li7388cdc2021-05-16 16:41:13 +0900275
276 // we only update the port, if the newly updated security groups
277 // have different values compared to existing ones
278 if (!port.securityGroups().equals(sgs)) {
279 portAdminService.updatePort(existing.updateSecurityGroups(sgs));
280 }
Jian Li8f944d42021-03-23 00:43:29 +0900281 });
282 }
283
Jian Lib6dc08f2021-03-24 15:24:18 +0900284 private void processDeletion(String resource) {
285 if (!isMaster()) {
286 return;
287 }
288
289 parseMacAddresses(resource).forEach((mac, net) -> {
290 KubevirtPort port = portAdminService.port(mac);
291 if (port != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900292 portAdminService.removePort(mac);
293 }
294 });
295 }
296
297 private boolean isMaster() {
298 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
299 }
300
301 // FIXME: to obtains the device ID, we have to search through
302 // existing POD inventory, need to find a better wat to obtain device ID
303 private DeviceId getDeviceId(Set<Pod> pods, KubevirtPort port) {
304 Set<Pod> defaultPods = pods.stream()
305 .filter(pod -> pod.getMetadata().getNamespace().equals(DEFAULT))
306 .collect(Collectors.toSet());
307
308 Set<KubevirtPort> allPorts = new HashSet<>();
309 for (Pod pod : defaultPods) {
310 allPorts.addAll(getPorts(nodeService, networkAdminService.networks(), pod));
311 }
312
313 return allPorts.stream().filter(p -> p.macAddress().equals(port.macAddress()))
314 .map(KubevirtPort::deviceId).findFirst().orElse(null);
315 }
316
317 private Map<String, IpAddress> parseIpAddresses(String resource) {
318 try {
319 ObjectMapper mapper = new ObjectMapper();
320 JsonNode json = mapper.readTree(resource);
321 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
322
323 JsonNode annots = metadata.get(ANNOTATIONS);
324 if (annots == null) {
325 return new HashMap<>();
326 }
327
328 JsonNode interfacesJson = annots.get(INTERFACES);
329 if (interfacesJson == null) {
330 return new HashMap<>();
331 }
332
333 Map<String, IpAddress> result = new HashMap<>();
334
335 String interfacesString = interfacesJson.asText();
336 ArrayNode interfaces = (ArrayNode) mapper.readTree(interfacesString);
337 for (JsonNode intf : interfaces) {
338 String network = intf.get(NETWORK).asText();
339 String ip = intf.get(IP).asText();
340 result.put(network, IpAddress.valueOf(ip));
341 }
342
343 return result;
344 } catch (IOException e) {
345 log.error("Failed to parse kubevirt VM IP addresses");
346 }
347
348 return new HashMap<>();
349 }
350
Jian Li8f944d42021-03-23 00:43:29 +0900351 private Set<String> parseSecurityGroups(String resource) {
352 try {
353 ObjectMapper mapper = new ObjectMapper();
354 JsonNode json = mapper.readTree(resource);
355 JsonNode metadata = json.get(SPEC).get(TEMPLATE).get(METADATA);
356
357 JsonNode annots = metadata.get(ANNOTATIONS);
358 if (annots == null) {
359 return new HashSet<>();
360 }
361
362 JsonNode sgsJson = annots.get(SECURITY_GROUPS);
363 if (sgsJson == null) {
364 return new HashSet<>();
365 }
366
367 Set<String> result = new HashSet<>();
368 ArrayNode sgs = (ArrayNode) mapper.readTree(sgsJson.asText());
369 for (JsonNode sg : sgs) {
370 result.add(sg.asText());
371 }
372
373 return result;
374
375 } catch (IOException e) {
376 log.error("Failed to parse kubevirt security group IDs.");
377 }
378
379 return new HashSet<>();
380 }
381
Jian Lib6dc08f2021-03-24 15:24:18 +0900382 private Map<MacAddress, String> parseMacAddresses(String resource) {
383 try {
384 ObjectMapper mapper = new ObjectMapper();
385 JsonNode json = mapper.readTree(resource);
386 JsonNode spec = json.get(SPEC).get(TEMPLATE).get(SPEC);
387 ArrayNode interfaces = (ArrayNode) spec.get(DOMAIN).get(DEVICES).get(INTERFACES);
388
389 Map<MacAddress, String> result = new HashMap<>();
390 for (JsonNode intf : interfaces) {
391 String network = intf.get(NAME).asText();
392 JsonNode macJson = intf.get(MAC);
393
Jian Li46592cf2021-05-11 18:12:55 +0900394 if (!DEFAULT.equals(network) && !CNI_ZERO.equals(network) && macJson != null) {
Jian Lib6dc08f2021-03-24 15:24:18 +0900395 String compact = StringUtils.substringBeforeLast(network, NETWORK_SUFFIX);
396 MacAddress mac = MacAddress.valueOf(macJson.asText());
397 result.put(mac, compact);
398 }
399 }
400
401 return result;
402 } catch (IOException e) {
403 log.error("Failed to parse kubevirt VM MAC addresses");
404 }
405
406 return new HashMap<>();
407 }
408 }
409}