blob: 9675ca29e50814beffa9390ab048291556f5d6e2 [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import io.fabric8.kubernetes.api.model.Pod;
19import io.fabric8.kubernetes.api.model.PodBuilder;
20import io.fabric8.kubernetes.client.KubernetesClient;
21import org.json.JSONArray;
22import org.json.JSONObject;
23import org.onlab.packet.IpAddress;
24import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.LeadershipService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.core.ApplicationId;
28import org.onosproject.core.CoreService;
29import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
30import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
31import org.onosproject.kubevirtnetworking.api.KubevirtPodAdminService;
32import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
33import org.onosproject.kubevirtnetworking.api.KubevirtPodListener;
34import org.onosproject.kubevirtnetworking.api.KubevirtPort;
35import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
36import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
37import org.onosproject.mastership.MastershipService;
38import org.onosproject.net.device.DeviceService;
39import org.onosproject.net.driver.DriverService;
40import org.osgi.service.component.annotations.Activate;
41import org.osgi.service.component.annotations.Component;
42import org.osgi.service.component.annotations.Deactivate;
43import org.osgi.service.component.annotations.Reference;
44import org.osgi.service.component.annotations.ReferenceCardinality;
45import org.slf4j.Logger;
46
47import java.util.Map;
48import java.util.Objects;
Jian Lid4296d02021-03-12 18:03:58 +090049import java.util.Set;
Jian Lica20b712021-01-18 00:19:31 +090050import java.util.concurrent.ExecutorService;
51
52import static java.util.concurrent.Executors.newSingleThreadExecutor;
53import static org.onlab.util.Tools.groupedThreads;
54import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
Jian Lid4296d02021-03-12 18:03:58 +090055import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
Jian Lica20b712021-01-18 00:19:31 +090056import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
57import static org.slf4j.LoggerFactory.getLogger;
58
59/**
60 * Associates the kubevirt container port and pod.
61 */
62@Component(immediate = true)
63public class KubevirtPodPortMapper {
64
65 private final Logger log = getLogger(getClass());
66
67 private static final String NETWORK_STATUS_KEY = "k8s.v1.cni.cncf.io/network-status";
68 private static final String NAME = "name";
69 private static final String IPS = "ips";
70 private static final String NETWORK_PREFIX = "default/";
71
72 @Reference(cardinality = ReferenceCardinality.MANDATORY)
73 protected CoreService coreService;
74
75 @Reference(cardinality = ReferenceCardinality.MANDATORY)
76 protected MastershipService mastershipService;
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected ClusterService clusterService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY)
82 protected LeadershipService leadershipService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
85 protected DeviceService deviceService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
88 protected DriverService driverService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected KubevirtPortAdminService kubevirtPortAdminService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 protected KubevirtNetworkAdminService kubevirtNetworkAdminService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected KubevirtPodAdminService kubevirtPodAdminService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected KubevirtApiConfigService kubevirtApiConfigService;
101
102 private final ExecutorService eventExecutor = newSingleThreadExecutor(
103 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
104
105 private final InternalKubevirtPodListener kubevirtPodListener =
106 new InternalKubevirtPodListener();
107
108 private ApplicationId appId;
109 private NodeId localNodeId;
110
111 @Activate
112 protected void activate() {
113 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
114 localNodeId = clusterService.getLocalNode().id();
115 leadershipService.runForLeadership(appId.name());
116 kubevirtPodAdminService.addListener(kubevirtPodListener);
117
118 log.info("Started");
119 }
120
121 @Deactivate
122 protected void deactivate() {
123 kubevirtPodAdminService.removeListener(kubevirtPodListener);
124 leadershipService.withdraw(appId.name());
125 eventExecutor.shutdown();
126
127 log.info("Stopped");
128 }
129
130 private class InternalKubevirtPodListener implements KubevirtPodListener {
131
132 private boolean isRelevantHelper() {
133 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
134 }
135
136 @Override
137 public void event(KubevirtPodEvent event) {
138 switch (event.type()) {
139 case KUBEVIRT_POD_UPDATED:
140 eventExecutor.execute(() -> processPodUpdate(event.subject()));
141 break;
142 case KUBEVIRT_POD_REMOVED:
143 eventExecutor.execute(() -> processPodDeletion(event.subject()));
144 break;
145 case KUBEVIRT_POD_CREATED:
Jian Li7a581b12021-02-18 14:24:32 +0900146 eventExecutor.execute(() -> processPodCreation(event.subject()));
147 break;
Jian Lica20b712021-01-18 00:19:31 +0900148 default:
149 // do nothing
150 break;
151 }
152 }
153
Jian Li7a581b12021-02-18 14:24:32 +0900154 private void processPodCreation(Pod pod) {
155 if (!isRelevantHelper()) {
156 return;
157 }
158
Jian Li7a581b12021-02-18 14:24:32 +0900159 Map<String, String> annots = pod.getMetadata().getAnnotations();
160 if (annots == null) {
161 return;
162 }
163
164 if (!annots.containsKey(NETWORK_STATUS_KEY)) {
165 return;
166 }
167
168 try {
169 String networkStatusStr = pod.getMetadata().getAnnotations().get(NETWORK_STATUS_KEY);
170 JSONArray networkStatus = new JSONArray(networkStatusStr);
171 for (int i = 0; i < networkStatus.length(); i++) {
172 JSONObject object = networkStatus.getJSONObject(i);
173 String name = object.getString(NAME);
174 KubevirtNetwork jsonNetwork = kubevirtNetworkAdminService.networks().stream()
175 .filter(n -> (NETWORK_PREFIX + n.name()).equals(name))
176 .findAny().orElse(null);
177 if (jsonNetwork != null) {
178 JSONArray ips = object.getJSONArray(IPS);
179 if (ips != null && ips.length() > 0) {
180 IpAddress ip = IpAddress.valueOf(ips.getString(0));
181 kubevirtNetworkAdminService.reserveIp(jsonNetwork.networkId(), ip);
182 }
183 }
184 }
185 } catch (Exception e) {
186 log.error("Failed to reserve IP address", e);
187 }
188
Jian Lid4296d02021-03-12 18:03:58 +0900189 Set<KubevirtPort> ports = getPorts(kubevirtNetworkAdminService.networks(), pod);
190 if (ports.size() == 0) {
Jian Li7a581b12021-02-18 14:24:32 +0900191 return;
192 }
193
Jian Lid4296d02021-03-12 18:03:58 +0900194 ports.forEach(port -> {
195 if (kubevirtPortAdminService.port(port.macAddress()) == null) {
196 kubevirtPortAdminService.createPort(port);
197 }
198 });
Jian Li7a581b12021-02-18 14:24:32 +0900199 }
200
Jian Lica20b712021-01-18 00:19:31 +0900201 private void processPodUpdate(Pod pod) {
202 if (!isRelevantHelper()) {
203 return;
204 }
205
Jian Lid4296d02021-03-12 18:03:58 +0900206 Set<KubevirtPort> ports = getPorts(kubevirtNetworkAdminService.networks(), pod);
207 if (ports.size() == 0) {
Jian Lica20b712021-01-18 00:19:31 +0900208 return;
209 }
210
Jian Lid4296d02021-03-12 18:03:58 +0900211 for (KubevirtPort port : ports) {
212 if (kubevirtPortAdminService.port(port.macAddress()) != null) {
213 continue;
Jian Lica20b712021-01-18 00:19:31 +0900214 }
Jian Lid4296d02021-03-12 18:03:58 +0900215
216 if (port.ipAddress() == null) {
217 try {
218 IpAddress ip = kubevirtNetworkAdminService.allocateIp(port.networkId());
219 log.info("IP address {} is allocated from network {}", ip, port.networkId());
220 port = port.updateIpAddress(ip);
221
222 // update the POD annotation to inject the allocated IP address
223 String networkStatusStr = pod.getMetadata().getAnnotations().get(NETWORK_STATUS_KEY);
224 JSONArray networkStatus = new JSONArray(networkStatusStr);
225 for (int i = 0; i < networkStatus.length(); i++) {
226 JSONObject object = networkStatus.getJSONObject(i);
227 String name = object.getString(NAME);
228
229 if (name.equals(NETWORK_PREFIX + port.networkId())) {
230 JSONArray ipsJson = new JSONArray();
231 ipsJson.put(ip.toString());
232 object.put(IPS, ipsJson);
233 }
234 }
235 Map<String, String> annots = pod.getMetadata().getAnnotations();
236 annots.put(NETWORK_STATUS_KEY, networkStatus.toString(4));
237
238 KubernetesClient client = k8sClient(kubevirtApiConfigService);
239
240 if (client == null) {
241 return;
242 }
243
244 client.pods().inNamespace(pod.getMetadata().getNamespace())
245 .withName(pod.getMetadata().getName())
246 .edit(r -> new PodBuilder(r)
247 .editMetadata()
248 .addToAnnotations(annots)
249 .endMetadata().build()
250 );
251 } catch (Exception e) {
252 log.error("Failed to allocate IP address", e);
253 }
254 }
255 kubevirtPortAdminService.createPort(port);
Jian Lica20b712021-01-18 00:19:31 +0900256 }
Jian Lica20b712021-01-18 00:19:31 +0900257 }
258
259 private void processPodDeletion(Pod pod) {
260 if (!isRelevantHelper()) {
261 return;
262 }
263
Jian Lid4296d02021-03-12 18:03:58 +0900264 Set<KubevirtPort> ports = getPorts(kubevirtNetworkAdminService.networks(), pod);
265 if (ports.size() == 0) {
Jian Lica20b712021-01-18 00:19:31 +0900266 return;
267 }
268
Jian Lid4296d02021-03-12 18:03:58 +0900269 ports.forEach(port -> {
270 if (port.ipAddress() != null) {
271 kubevirtNetworkAdminService.releaseIp(port.networkId(), port.ipAddress());
272 }
Jian Lica20b712021-01-18 00:19:31 +0900273
Jian Lid4296d02021-03-12 18:03:58 +0900274 kubevirtPortAdminService.removePort(port.macAddress());
275 });
Jian Lica20b712021-01-18 00:19:31 +0900276 }
277 }
278}