blob: 8529b5cf841e9c12874ee3949ebd445ac72d6991 [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import io.fabric8.kubernetes.api.model.Pod;
19import io.fabric8.kubernetes.api.model.PodBuilder;
20import io.fabric8.kubernetes.client.KubernetesClient;
21import org.json.JSONArray;
22import org.json.JSONObject;
23import org.onlab.packet.IpAddress;
24import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.LeadershipService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.core.ApplicationId;
28import org.onosproject.core.CoreService;
29import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
30import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
31import org.onosproject.kubevirtnetworking.api.KubevirtPodAdminService;
32import org.onosproject.kubevirtnetworking.api.KubevirtPodEvent;
33import org.onosproject.kubevirtnetworking.api.KubevirtPodListener;
34import org.onosproject.kubevirtnetworking.api.KubevirtPort;
35import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
36import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090037import org.onosproject.kubevirtnode.api.KubevirtNodeService;
Jian Lid5e8ea82021-01-18 00:19:31 +090038import org.onosproject.mastership.MastershipService;
39import org.onosproject.net.device.DeviceService;
40import org.onosproject.net.driver.DriverService;
41import org.osgi.service.component.annotations.Activate;
42import org.osgi.service.component.annotations.Component;
43import org.osgi.service.component.annotations.Deactivate;
44import org.osgi.service.component.annotations.Reference;
45import org.osgi.service.component.annotations.ReferenceCardinality;
46import org.slf4j.Logger;
47
48import java.util.Map;
49import java.util.Objects;
Jian Li3831f0c2021-03-12 18:03:58 +090050import java.util.Set;
Jian Lid5e8ea82021-01-18 00:19:31 +090051import java.util.concurrent.ExecutorService;
52
53import static java.util.concurrent.Executors.newSingleThreadExecutor;
54import static org.onlab.util.Tools.groupedThreads;
55import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
Jian Li3831f0c2021-03-12 18:03:58 +090056import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
Jian Lid5e8ea82021-01-18 00:19:31 +090057import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
58import static org.slf4j.LoggerFactory.getLogger;
59
60/**
61 * Associates the kubevirt container port and pod.
62 */
63@Component(immediate = true)
64public class KubevirtPodPortMapper {
65
66 private final Logger log = getLogger(getClass());
67
68 private static final String NETWORK_STATUS_KEY = "k8s.v1.cni.cncf.io/network-status";
69 private static final String NAME = "name";
70 private static final String IPS = "ips";
71 private static final String NETWORK_PREFIX = "default/";
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY)
74 protected CoreService coreService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY)
77 protected MastershipService mastershipService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY)
80 protected ClusterService clusterService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected LeadershipService leadershipService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected DeviceService deviceService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected DriverService driverService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected KubevirtPortAdminService kubevirtPortAdminService;
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected KubevirtNetworkAdminService kubevirtNetworkAdminService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected KubevirtPodAdminService kubevirtPodAdminService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected KubevirtApiConfigService kubevirtApiConfigService;
102
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected KubevirtNodeService kubevirtNodeService;
105
Jian Lid5e8ea82021-01-18 00:19:31 +0900106 private final ExecutorService eventExecutor = newSingleThreadExecutor(
107 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
108
109 private final InternalKubevirtPodListener kubevirtPodListener =
110 new InternalKubevirtPodListener();
111
112 private ApplicationId appId;
113 private NodeId localNodeId;
114
115 @Activate
116 protected void activate() {
117 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
118 localNodeId = clusterService.getLocalNode().id();
119 leadershipService.runForLeadership(appId.name());
120 kubevirtPodAdminService.addListener(kubevirtPodListener);
121
122 log.info("Started");
123 }
124
125 @Deactivate
126 protected void deactivate() {
127 kubevirtPodAdminService.removeListener(kubevirtPodListener);
128 leadershipService.withdraw(appId.name());
129 eventExecutor.shutdown();
130
131 log.info("Stopped");
132 }
133
134 private class InternalKubevirtPodListener implements KubevirtPodListener {
135
136 private boolean isRelevantHelper() {
137 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
138 }
139
140 @Override
141 public void event(KubevirtPodEvent event) {
142 switch (event.type()) {
143 case KUBEVIRT_POD_UPDATED:
144 eventExecutor.execute(() -> processPodUpdate(event.subject()));
145 break;
146 case KUBEVIRT_POD_REMOVED:
147 eventExecutor.execute(() -> processPodDeletion(event.subject()));
148 break;
149 case KUBEVIRT_POD_CREATED:
Jian Li2417ab72021-02-02 17:35:12 +0900150 eventExecutor.execute(() -> processPodCreation(event.subject()));
151 break;
Jian Lid5e8ea82021-01-18 00:19:31 +0900152 default:
153 // do nothing
154 break;
155 }
156 }
157
Jian Li2417ab72021-02-02 17:35:12 +0900158 private void processPodCreation(Pod pod) {
159 if (!isRelevantHelper()) {
160 return;
161 }
162
Jian Li2417ab72021-02-02 17:35:12 +0900163 Map<String, String> annots = pod.getMetadata().getAnnotations();
164 if (annots == null) {
165 return;
166 }
167
168 if (!annots.containsKey(NETWORK_STATUS_KEY)) {
169 return;
170 }
171
172 try {
173 String networkStatusStr = pod.getMetadata().getAnnotations().get(NETWORK_STATUS_KEY);
174 JSONArray networkStatus = new JSONArray(networkStatusStr);
175 for (int i = 0; i < networkStatus.length(); i++) {
176 JSONObject object = networkStatus.getJSONObject(i);
177 String name = object.getString(NAME);
178 KubevirtNetwork jsonNetwork = kubevirtNetworkAdminService.networks().stream()
179 .filter(n -> (NETWORK_PREFIX + n.name()).equals(name))
180 .findAny().orElse(null);
181 if (jsonNetwork != null) {
182 JSONArray ips = object.getJSONArray(IPS);
183 if (ips != null && ips.length() > 0) {
184 IpAddress ip = IpAddress.valueOf(ips.getString(0));
185 kubevirtNetworkAdminService.reserveIp(jsonNetwork.networkId(), ip);
186 }
187 }
188 }
189 } catch (Exception e) {
190 log.error("Failed to reserve IP address", e);
191 }
192
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900193 Set<KubevirtPort> ports = getPorts(kubevirtNodeService, kubevirtNetworkAdminService.networks(), pod);
Jian Li3831f0c2021-03-12 18:03:58 +0900194 if (ports.size() == 0) {
Jian Li2417ab72021-02-02 17:35:12 +0900195 return;
196 }
197
Jian Li3831f0c2021-03-12 18:03:58 +0900198 ports.forEach(port -> {
199 if (kubevirtPortAdminService.port(port.macAddress()) == null) {
200 kubevirtPortAdminService.createPort(port);
201 }
202 });
Jian Li2417ab72021-02-02 17:35:12 +0900203 }
204
Jian Lid5e8ea82021-01-18 00:19:31 +0900205 private void processPodUpdate(Pod pod) {
206 if (!isRelevantHelper()) {
207 return;
208 }
209
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900210 Set<KubevirtPort> ports = getPorts(kubevirtNodeService, kubevirtNetworkAdminService.networks(), pod);
Jian Li3831f0c2021-03-12 18:03:58 +0900211 if (ports.size() == 0) {
Jian Lid5e8ea82021-01-18 00:19:31 +0900212 return;
213 }
214
Jian Li3831f0c2021-03-12 18:03:58 +0900215 for (KubevirtPort port : ports) {
216 if (kubevirtPortAdminService.port(port.macAddress()) != null) {
217 continue;
Jian Lid5e8ea82021-01-18 00:19:31 +0900218 }
Jian Li3831f0c2021-03-12 18:03:58 +0900219
220 if (port.ipAddress() == null) {
221 try {
222 IpAddress ip = kubevirtNetworkAdminService.allocateIp(port.networkId());
223 log.info("IP address {} is allocated from network {}", ip, port.networkId());
224 port = port.updateIpAddress(ip);
225
226 // update the POD annotation to inject the allocated IP address
227 String networkStatusStr = pod.getMetadata().getAnnotations().get(NETWORK_STATUS_KEY);
228 JSONArray networkStatus = new JSONArray(networkStatusStr);
229 for (int i = 0; i < networkStatus.length(); i++) {
230 JSONObject object = networkStatus.getJSONObject(i);
231 String name = object.getString(NAME);
232
233 if (name.equals(NETWORK_PREFIX + port.networkId())) {
234 JSONArray ipsJson = new JSONArray();
235 ipsJson.put(ip.toString());
236 object.put(IPS, ipsJson);
237 }
238 }
239 Map<String, String> annots = pod.getMetadata().getAnnotations();
240 annots.put(NETWORK_STATUS_KEY, networkStatus.toString(4));
241
242 KubernetesClient client = k8sClient(kubevirtApiConfigService);
243
244 if (client == null) {
245 return;
246 }
247
248 client.pods().inNamespace(pod.getMetadata().getNamespace())
249 .withName(pod.getMetadata().getName())
250 .edit(r -> new PodBuilder(r)
251 .editMetadata()
252 .addToAnnotations(annots)
253 .endMetadata().build()
254 );
255 } catch (Exception e) {
256 log.error("Failed to allocate IP address", e);
257 }
258 }
259 kubevirtPortAdminService.createPort(port);
Jian Lid5e8ea82021-01-18 00:19:31 +0900260 }
Jian Lid5e8ea82021-01-18 00:19:31 +0900261 }
262
263 private void processPodDeletion(Pod pod) {
264 if (!isRelevantHelper()) {
265 return;
266 }
267
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900268 Set<KubevirtPort> ports = getPorts(kubevirtNodeService, kubevirtNetworkAdminService.networks(), pod);
Jian Li3831f0c2021-03-12 18:03:58 +0900269 if (ports.size() == 0) {
Jian Lid5e8ea82021-01-18 00:19:31 +0900270 return;
271 }
272
Jian Li3831f0c2021-03-12 18:03:58 +0900273 ports.forEach(port -> {
274 if (port.ipAddress() != null) {
275 kubevirtNetworkAdminService.releaseIp(port.networkId(), port.ipAddress());
276 }
Jian Lid5e8ea82021-01-18 00:19:31 +0900277
Jian Li3831f0c2021-03-12 18:03:58 +0900278 kubevirtPortAdminService.removePort(port.macAddress());
279 });
Jian Lid5e8ea82021-01-18 00:19:31 +0900280 }
281 }
282}