blob: d9076da410bc0c6f11179117f6e9658f1df52625 [file] [log] [blame]
Jian Lid5e8ea82021-01-18 00:19:31 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import io.fabric8.kubernetes.api.model.Pod;
19import io.fabric8.kubernetes.api.model.PodBuilder;
20import io.fabric8.kubernetes.client.KubernetesClient;
21import org.json.JSONArray;
22import org.json.JSONObject;
23import org.onlab.packet.IpAddress;
24import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.LeadershipService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.core.ApplicationId;
28import org.onosproject.core.CoreService;
29import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
30import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
31import org.onosproject.kubevirtnetworking.api.KubevirtPodAdminService;
32import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
33import org.onosproject.kubevirtnetworking.api.KubevirtPodListener;
34import org.onosproject.kubevirtnetworking.api.KubevirtPort;
35import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
36import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
37import org.onosproject.mastership.MastershipService;
38import org.onosproject.net.device.DeviceService;
39import org.onosproject.net.driver.DriverService;
40import org.osgi.service.component.annotations.Activate;
41import org.osgi.service.component.annotations.Component;
42import org.osgi.service.component.annotations.Deactivate;
43import org.osgi.service.component.annotations.Reference;
44import org.osgi.service.component.annotations.ReferenceCardinality;
45import org.slf4j.Logger;
46
47import java.util.Map;
48import java.util.Objects;
49import java.util.concurrent.ExecutorService;
50
51import static java.util.concurrent.Executors.newSingleThreadExecutor;
52import static org.onlab.util.Tools.groupedThreads;
53import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
54import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPort;
55import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
56import static org.slf4j.LoggerFactory.getLogger;
57
58/**
59 * Associates the kubevirt container port and pod.
60 */
61@Component(immediate = true)
62public class KubevirtPodPortMapper {
63
64 private final Logger log = getLogger(getClass());
65
66 private static final String NETWORK_STATUS_KEY = "k8s.v1.cni.cncf.io/network-status";
67 private static final String NAME = "name";
68 private static final String IPS = "ips";
69 private static final String NETWORK_PREFIX = "default/";
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY)
72 protected CoreService coreService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY)
75 protected MastershipService mastershipService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY)
78 protected ClusterService clusterService;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY)
81 protected LeadershipService leadershipService;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 protected DeviceService deviceService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected DriverService driverService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected KubevirtPortAdminService kubevirtPortAdminService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected KubevirtNetworkAdminService kubevirtNetworkAdminService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected KubevirtPodAdminService kubevirtPodAdminService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected KubevirtApiConfigService kubevirtApiConfigService;
100
101 private final ExecutorService eventExecutor = newSingleThreadExecutor(
102 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
103
104 private final InternalKubevirtPodListener kubevirtPodListener =
105 new InternalKubevirtPodListener();
106
107 private ApplicationId appId;
108 private NodeId localNodeId;
109
110 @Activate
111 protected void activate() {
112 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
113 localNodeId = clusterService.getLocalNode().id();
114 leadershipService.runForLeadership(appId.name());
115 kubevirtPodAdminService.addListener(kubevirtPodListener);
116
117 log.info("Started");
118 }
119
120 @Deactivate
121 protected void deactivate() {
122 kubevirtPodAdminService.removeListener(kubevirtPodListener);
123 leadershipService.withdraw(appId.name());
124 eventExecutor.shutdown();
125
126 log.info("Stopped");
127 }
128
129 private class InternalKubevirtPodListener implements KubevirtPodListener {
130
131 private boolean isRelevantHelper() {
132 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
133 }
134
135 @Override
136 public void event(KubevirtPodEvent event) {
137 switch (event.type()) {
138 case KUBEVIRT_POD_UPDATED:
139 eventExecutor.execute(() -> processPodUpdate(event.subject()));
140 break;
141 case KUBEVIRT_POD_REMOVED:
142 eventExecutor.execute(() -> processPodDeletion(event.subject()));
143 break;
144 case KUBEVIRT_POD_CREATED:
Jian Li2417ab72021-02-02 17:35:12 +0900145 eventExecutor.execute(() -> processPodCreation(event.subject()));
146 break;
Jian Lid5e8ea82021-01-18 00:19:31 +0900147 default:
148 // do nothing
149 break;
150 }
151 }
152
Jian Li2417ab72021-02-02 17:35:12 +0900153 private void processPodCreation(Pod pod) {
154 if (!isRelevantHelper()) {
155 return;
156 }
157
158 KubernetesClient client = k8sClient(kubevirtApiConfigService);
159
160 if (client == null) {
161 return;
162 }
163
164 Map<String, String> annots = pod.getMetadata().getAnnotations();
165 if (annots == null) {
166 return;
167 }
168
169 if (!annots.containsKey(NETWORK_STATUS_KEY)) {
170 return;
171 }
172
173 try {
174 String networkStatusStr = pod.getMetadata().getAnnotations().get(NETWORK_STATUS_KEY);
175 JSONArray networkStatus = new JSONArray(networkStatusStr);
176 for (int i = 0; i < networkStatus.length(); i++) {
177 JSONObject object = networkStatus.getJSONObject(i);
178 String name = object.getString(NAME);
179 KubevirtNetwork jsonNetwork = kubevirtNetworkAdminService.networks().stream()
180 .filter(n -> (NETWORK_PREFIX + n.name()).equals(name))
181 .findAny().orElse(null);
182 if (jsonNetwork != null) {
183 JSONArray ips = object.getJSONArray(IPS);
184 if (ips != null && ips.length() > 0) {
185 IpAddress ip = IpAddress.valueOf(ips.getString(0));
186 kubevirtNetworkAdminService.reserveIp(jsonNetwork.networkId(), ip);
187 }
188 }
189 }
190 } catch (Exception e) {
191 log.error("Failed to reserve IP address", e);
192 }
193
194 KubevirtPort port = getPort(kubevirtNetworkAdminService.networks(), pod);
195 if (port == null) {
196 return;
197 }
198
199 if (kubevirtPortAdminService.port(port.macAddress()) == null) {
200 kubevirtPortAdminService.createPort(port);
201 }
202 }
203
Jian Lid5e8ea82021-01-18 00:19:31 +0900204 private void processPodUpdate(Pod pod) {
205 if (!isRelevantHelper()) {
206 return;
207 }
208
209 KubernetesClient client = k8sClient(kubevirtApiConfigService);
210
211 if (client == null) {
212 return;
213 }
214
215 KubevirtPort port = getPort(kubevirtNetworkAdminService.networks(), pod);
216 if (port == null) {
217 return;
218 }
219
220 if (kubevirtPortAdminService.port(port.macAddress()) != null) {
221 return;
222 }
223
224 if (port.ipAddress() == null) {
225 try {
226 IpAddress ip = kubevirtNetworkAdminService.allocateIp(port.networkId());
227 port = port.updateIpAddress(ip);
228
229 // update the POD annotation to inject the allocated IP address
230 String networkStatusStr = pod.getMetadata().getAnnotations().get(NETWORK_STATUS_KEY);
231 JSONArray networkStatus = new JSONArray(networkStatusStr);
232 for (int i = 0; i < networkStatus.length(); i++) {
233 JSONObject object = networkStatus.getJSONObject(i);
234 String name = object.getString(NAME);
235 KubevirtNetwork jsonNetwork = kubevirtNetworkAdminService.networks().stream()
236 .filter(n -> (NETWORK_PREFIX + n.name()).equals(name))
237 .findAny().orElse(null);
238 if (jsonNetwork != null) {
239 JSONArray ipsJson = new JSONArray();
240 ipsJson.put(ip.toString());
241 object.put(IPS, ipsJson);
242 }
243 }
244 Map<String, String> annots = pod.getMetadata().getAnnotations();
245 annots.put(NETWORK_STATUS_KEY, networkStatus.toString(4));
246
247 client.pods().inNamespace(pod.getMetadata().getNamespace())
248 .withName(pod.getMetadata().getName())
249 .edit(r -> new PodBuilder(r)
250 .editMetadata()
251 .addToAnnotations(annots)
252 .endMetadata().build()
253 );
254 } catch (Exception e) {
255 log.error("Failed to allocate IP address", e);
256 }
257 }
258 kubevirtPortAdminService.createPort(port);
259 }
260
261 private void processPodDeletion(Pod pod) {
262 if (!isRelevantHelper()) {
263 return;
264 }
265
266 KubernetesClient client = k8sClient(kubevirtApiConfigService);
267
268 if (client == null) {
269 return;
270 }
271
272 KubevirtPort port = getPort(kubevirtNetworkAdminService.networks(), pod);
273 if (port == null) {
274 return;
275 }
276
277 if (port.ipAddress() != null) {
278 kubevirtNetworkAdminService.releaseIp(port.networkId(), port.ipAddress());
279 }
280
281 kubevirtPortAdminService.removePort(port.macAddress());
282 }
283 }
284}