blob: 32bed582daa06358c24058f10cfb6e3536799489 [file] [log] [blame]
Jian Li16eed162021-01-15 16:58:48 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import io.fabric8.kubernetes.client.KubernetesClient;
19import io.fabric8.kubernetes.client.Watcher;
20import io.fabric8.kubernetes.client.WatcherException;
21import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
Jian Li97e6fc32021-02-01 20:36:45 +090022import org.apache.commons.lang.StringUtils;
23import org.json.JSONArray;
Jian Li16eed162021-01-15 16:58:48 +090024import org.json.JSONException;
25import org.json.JSONObject;
26import org.onlab.packet.IpAddress;
Jian Li97e6fc32021-02-01 20:36:45 +090027import org.onlab.packet.IpPrefix;
Jian Li16eed162021-01-15 16:58:48 +090028import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.LeadershipService;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
33import org.onosproject.kubevirtnetworking.api.DefaultKubevirtNetwork;
Jian Li97e6fc32021-02-01 20:36:45 +090034import org.onosproject.kubevirtnetworking.api.KubevirtHostRoute;
Jian Li16eed162021-01-15 16:58:48 +090035import org.onosproject.kubevirtnetworking.api.KubevirtIpPool;
36import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
37import org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type;
38import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
39import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
40import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
41import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
42import org.onosproject.mastership.MastershipService;
43import org.osgi.service.component.annotations.Activate;
44import org.osgi.service.component.annotations.Component;
45import org.osgi.service.component.annotations.Deactivate;
46import org.osgi.service.component.annotations.Reference;
47import org.osgi.service.component.annotations.ReferenceCardinality;
48import org.slf4j.Logger;
49
50import java.io.IOException;
Jian Li97e6fc32021-02-01 20:36:45 +090051import java.util.HashSet;
Jian Li16eed162021-01-15 16:58:48 +090052import java.util.Objects;
Jian Li97e6fc32021-02-01 20:36:45 +090053import java.util.Set;
Jian Li16eed162021-01-15 16:58:48 +090054import java.util.concurrent.ExecutorService;
55
56import static java.util.concurrent.Executors.newSingleThreadExecutor;
57import static org.onlab.util.Tools.groupedThreads;
58import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
59import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
60import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
61import static org.slf4j.LoggerFactory.getLogger;
62
63/**
64 * Kubernetes network-attachment-definition watcher used for feeding kubevirt network information.
65 */
66@Component(immediate = true)
67public class NetworkAttachmentDefinitionWatcher {
68
69 private final Logger log = getLogger(getClass());
70
71 private static final String NETWORK_CONFIG = "network-config";
72 private static final String TYPE = "type";
73 private static final String MTU = "mtu";
74 private static final String SEGMENT_ID = "segmentId";
75 private static final String GATEWAY_IP = "gatewayIp";
76 private static final String CIDR = "cidr";
77 private static final String HOST_ROUTES = "hostRoutes";
Jian Li97e6fc32021-02-01 20:36:45 +090078 private static final String DESTINATION = "destination";
79 private static final String NEXTHOP = "nexthop";
Jian Li16eed162021-01-15 16:58:48 +090080 private static final String IP_POOL = "ipPool";
81 private static final String START = "start";
82 private static final String END = "end";
Jian Li97e6fc32021-02-01 20:36:45 +090083 private static final String DNSES = "dnses";
Jian Li16eed162021-01-15 16:58:48 +090084
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected CoreService coreService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected MastershipService mastershipService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected ClusterService clusterService;
93
94 @Reference(cardinality = ReferenceCardinality.MANDATORY)
95 protected LeadershipService leadershipService;
96
97 @Reference(cardinality = ReferenceCardinality.MANDATORY)
98 protected KubevirtNetworkAdminService adminService;
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected KubevirtApiConfigService configService;
102
103 private final ExecutorService eventExecutor = newSingleThreadExecutor(
104 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
105
106 private final InternalNetworkAttachmentDefinitionWatcher
107 watcher = new InternalNetworkAttachmentDefinitionWatcher();
108 private final InternalKubevirtApiConfigListener
109 configListener = new InternalKubevirtApiConfigListener();
110
111 CustomResourceDefinitionContext nadCrdCxt = new CustomResourceDefinitionContext
112 .Builder()
113 .withGroup("k8s.cni.cncf.io")
114 .withScope("Namespaced")
115 .withVersion("v1")
116 .withPlural("network-attachment-definitions")
117 .build();
118
119 private ApplicationId appId;
120 private NodeId localNodeId;
121
122 @Activate
123 protected void activate() {
124 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
125 localNodeId = clusterService.getLocalNode().id();
126 leadershipService.runForLeadership(appId.name());
127 configService.addListener(configListener);
128
129 log.info("Started");
130 }
131
132 @Deactivate
133 protected void deactivate() {
134 configService.removeListener(configListener);
135 leadershipService.withdraw(appId.name());
136 eventExecutor.shutdown();
137
138 log.info("Stopped");
139 }
140
Jian Li0201a022021-02-19 14:00:07 +0900141 private void instantiateWatcher() {
142 KubernetesClient client = k8sClient(configService);
143
144 if (client != null) {
145 try {
146 client.customResource(nadCrdCxt).watch(watcher);
147 } catch (IOException e) {
148 e.printStackTrace();
149 }
150 }
151 }
152
Jian Li16eed162021-01-15 16:58:48 +0900153 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
154
155 private boolean isRelevantHelper() {
156 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
157 }
158
159 @Override
160 public void event(KubevirtApiConfigEvent event) {
161
162 switch (event.type()) {
163 case KUBEVIRT_API_CONFIG_UPDATED:
164 eventExecutor.execute(this::processConfigUpdate);
165 break;
166 case KUBEVIRT_API_CONFIG_CREATED:
167 case KUBEVIRT_API_CONFIG_REMOVED:
168 default:
169 // do nothing
170 break;
171 }
172 }
173
174 private void processConfigUpdate() {
175 if (!isRelevantHelper()) {
176 return;
177 }
178
Jian Li0201a022021-02-19 14:00:07 +0900179 instantiateWatcher();
Jian Li16eed162021-01-15 16:58:48 +0900180 }
181 }
182
183 private class InternalNetworkAttachmentDefinitionWatcher
184 implements Watcher<String> {
185
186 @Override
187 public void eventReceived(Action action, String resource) {
188 switch (action) {
189 case ADDED:
190 eventExecutor.execute(() -> processAddition(resource));
191 break;
192 case MODIFIED:
193 eventExecutor.execute(() -> processModification(resource));
194 break;
195 case DELETED:
196 eventExecutor.execute(() -> processDeletion(resource));
197 break;
198 case ERROR:
199 log.warn("Failures processing network-attachment-definition manipulation.");
200 break;
201 default:
202 break;
203 }
204 }
205
206 @Override
207 public void onClose(WatcherException e) {
Jian Li0201a022021-02-19 14:00:07 +0900208 // due to the bugs in fabric8, the watcher might be closed,
209 // we will re-instantiate the watcher in this case
210 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
211 log.warn("Network-attachment-definition watcher OnClose, re-instantiate the watcher...");
212
213 instantiateWatcher();
Jian Li16eed162021-01-15 16:58:48 +0900214 }
215
216 private void processAddition(String resource) {
217 if (!isMaster()) {
218 return;
219 }
220
221 String name = parseName(resource);
222
223 log.trace("Process NetworkAttachmentDefinition {} creating event from API server.",
224 name);
225
226 KubevirtNetwork network = parseKubevirtNetwork(resource);
227 if (network != null) {
Jian Li9ca07f52021-01-19 23:42:55 +0900228 if (adminService.network(network.networkId()) == null) {
229 adminService.createNetwork(network);
230 }
Jian Li16eed162021-01-15 16:58:48 +0900231 }
232 }
233
234 private void processModification(String resource) {
235 if (!isMaster()) {
236 return;
237 }
238
239 String name = parseName(resource);
240
241 log.trace("Process NetworkAttachmentDefinition {} updating event from API server.",
242 name);
243
244 KubevirtNetwork network = parseKubevirtNetwork(resource);
245 if (network != null) {
246 adminService.updateNetwork(network);
247 }
248 }
249
250 private void processDeletion(String resource) {
251 if (!isMaster()) {
252 return;
253 }
254
255 String name = parseName(resource);
256
257 log.trace("Process NetworkAttachmentDefinition {} removal event from API server.",
258 name);
259
260 adminService.removeNetwork(name);
261 }
262
263 private boolean isMaster() {
264 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
265 }
266
267 private String parseName(String resource) {
268 try {
269 JSONObject json = new JSONObject(resource);
270 return json.getJSONObject("metadata").getString("name");
271 } catch (JSONException e) {
272 log.error("");
273 }
274 return "";
275 }
276
277 private KubevirtNetwork parseKubevirtNetwork(String resource) {
278 try {
279 JSONObject json = new JSONObject(resource);
280 String name = parseName(resource);
281 JSONObject annots = json.getJSONObject("metadata").getJSONObject("annotations");
282 String networkConfig = annots.getString(NETWORK_CONFIG);
283 if (networkConfig != null) {
284 KubevirtNetwork.Builder builder = DefaultKubevirtNetwork.builder();
285
286 JSONObject configJson = new JSONObject(networkConfig);
287 String type = configJson.getString(TYPE);
288 Integer mtu = configJson.getInt(MTU);
289 String gatewayIp = configJson.getString(GATEWAY_IP);
290
291 if (!type.equalsIgnoreCase(FLAT.name())) {
292 builder.segmentId(configJson.getString(SEGMENT_ID));
293 }
294
295 String cidr = configJson.getString(CIDR);
296
297 JSONObject poolJson = configJson.getJSONObject(IP_POOL);
298 if (poolJson != null) {
299 String start = poolJson.getString(START);
300 String end = poolJson.getString(END);
301 builder.ipPool(new KubevirtIpPool(
302 IpAddress.valueOf(start), IpAddress.valueOf(end)));
303 }
304
Jian Li2417ab72021-02-02 17:35:12 +0900305 if (configJson.has(HOST_ROUTES)) {
306 JSONArray routesJson = configJson.getJSONArray(HOST_ROUTES);
307 Set<KubevirtHostRoute> hostRoutes = new HashSet<>();
308 if (routesJson != null) {
309 for (int i = 0; i < routesJson.length(); i++) {
310 JSONObject route = routesJson.getJSONObject(i);
311 String destinationStr = route.getString(DESTINATION);
312 String nexthopStr = route.getString(NEXTHOP);
Jian Li97e6fc32021-02-01 20:36:45 +0900313
Jian Li2417ab72021-02-02 17:35:12 +0900314 if (StringUtils.isNotEmpty(destinationStr) &&
315 StringUtils.isNotEmpty(nexthopStr)) {
316 hostRoutes.add(new KubevirtHostRoute(
317 IpPrefix.valueOf(destinationStr),
318 IpAddress.valueOf(nexthopStr)));
319 }
Jian Li97e6fc32021-02-01 20:36:45 +0900320 }
321 }
Jian Li2417ab72021-02-02 17:35:12 +0900322 builder.hostRoutes(hostRoutes);
Jian Li97e6fc32021-02-01 20:36:45 +0900323 }
Jian Li97e6fc32021-02-01 20:36:45 +0900324
Jian Li2417ab72021-02-02 17:35:12 +0900325 if (configJson.has(DNSES)) {
326 JSONArray dnsesJson = configJson.getJSONArray(DNSES);
327 Set<IpAddress> dnses = new HashSet<>();
328 if (dnsesJson != null) {
329 for (int i = 0; i < dnsesJson.length(); i++) {
330 String dns = dnsesJson.getString(i);
331 if (StringUtils.isNotEmpty(dns)) {
332 dnses.add(IpAddress.valueOf(dns));
333 }
334 }
Jian Li97e6fc32021-02-01 20:36:45 +0900335 }
Jian Li2417ab72021-02-02 17:35:12 +0900336 builder.dnses(dnses);
Jian Li97e6fc32021-02-01 20:36:45 +0900337 }
Jian Li97e6fc32021-02-01 20:36:45 +0900338
Jian Li16eed162021-01-15 16:58:48 +0900339 builder.networkId(name).name(name).type(Type.valueOf(type))
340 .mtu(mtu).gatewayIp(IpAddress.valueOf(gatewayIp)).cidr(cidr);
341
342 return builder.build();
343 }
344 } catch (JSONException e) {
345 log.error("Failed to parse network attachment definition object");
346 }
347
348 return null;
349 }
350 }
351}