blob: eff1ca4774161a4afcc1b71ad8a5da066b5ce89a [file] [log] [blame]
Jian Li034820d2021-01-15 16:58:48 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import io.fabric8.kubernetes.client.KubernetesClient;
19import io.fabric8.kubernetes.client.Watcher;
20import io.fabric8.kubernetes.client.WatcherException;
21import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
Jian Li4c35a262021-02-01 20:36:45 +090022import org.apache.commons.lang.StringUtils;
23import org.json.JSONArray;
Jian Li034820d2021-01-15 16:58:48 +090024import org.json.JSONException;
25import org.json.JSONObject;
26import org.onlab.packet.IpAddress;
Jian Li4c35a262021-02-01 20:36:45 +090027import org.onlab.packet.IpPrefix;
Jian Li034820d2021-01-15 16:58:48 +090028import org.onosproject.cluster.ClusterService;
29import org.onosproject.cluster.LeadershipService;
30import org.onosproject.cluster.NodeId;
31import org.onosproject.core.ApplicationId;
32import org.onosproject.core.CoreService;
33import org.onosproject.kubevirtnetworking.api.DefaultKubevirtNetwork;
Jian Li4c35a262021-02-01 20:36:45 +090034import org.onosproject.kubevirtnetworking.api.KubevirtHostRoute;
Jian Li034820d2021-01-15 16:58:48 +090035import org.onosproject.kubevirtnetworking.api.KubevirtIpPool;
36import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
37import org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type;
38import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
39import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
40import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
41import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
42import org.onosproject.mastership.MastershipService;
43import org.osgi.service.component.annotations.Activate;
44import org.osgi.service.component.annotations.Component;
45import org.osgi.service.component.annotations.Deactivate;
46import org.osgi.service.component.annotations.Reference;
47import org.osgi.service.component.annotations.ReferenceCardinality;
48import org.slf4j.Logger;
49
50import java.io.IOException;
Jian Li4c35a262021-02-01 20:36:45 +090051import java.util.HashSet;
Jian Li034820d2021-01-15 16:58:48 +090052import java.util.Objects;
Jian Li4c35a262021-02-01 20:36:45 +090053import java.util.Set;
Jian Li034820d2021-01-15 16:58:48 +090054import java.util.concurrent.ExecutorService;
55
56import static java.util.concurrent.Executors.newSingleThreadExecutor;
57import static org.onlab.util.Tools.groupedThreads;
58import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
59import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
60import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
Jian Li810f58c2021-02-27 01:10:50 +090061import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.parseResourceName;
Jian Li034820d2021-01-15 16:58:48 +090062import static org.slf4j.LoggerFactory.getLogger;
63
64/**
65 * Kubernetes network-attachment-definition watcher used for feeding kubevirt network information.
66 */
67@Component(immediate = true)
68public class NetworkAttachmentDefinitionWatcher {
69
70 private final Logger log = getLogger(getClass());
71
72 private static final String NETWORK_CONFIG = "network-config";
73 private static final String TYPE = "type";
74 private static final String MTU = "mtu";
75 private static final String SEGMENT_ID = "segmentId";
76 private static final String GATEWAY_IP = "gatewayIp";
77 private static final String CIDR = "cidr";
78 private static final String HOST_ROUTES = "hostRoutes";
Jian Li4c35a262021-02-01 20:36:45 +090079 private static final String DESTINATION = "destination";
80 private static final String NEXTHOP = "nexthop";
Jian Li034820d2021-01-15 16:58:48 +090081 private static final String IP_POOL = "ipPool";
82 private static final String START = "start";
83 private static final String END = "end";
Jian Li4c35a262021-02-01 20:36:45 +090084 private static final String DNSES = "dnses";
Jian Li034820d2021-01-15 16:58:48 +090085
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected CoreService coreService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected MastershipService mastershipService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected ClusterService clusterService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected LeadershipService leadershipService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected KubevirtNetworkAdminService adminService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected KubevirtApiConfigService configService;
103
104 private final ExecutorService eventExecutor = newSingleThreadExecutor(
105 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
106
107 private final InternalNetworkAttachmentDefinitionWatcher
108 watcher = new InternalNetworkAttachmentDefinitionWatcher();
109 private final InternalKubevirtApiConfigListener
110 configListener = new InternalKubevirtApiConfigListener();
111
112 CustomResourceDefinitionContext nadCrdCxt = new CustomResourceDefinitionContext
113 .Builder()
114 .withGroup("k8s.cni.cncf.io")
115 .withScope("Namespaced")
116 .withVersion("v1")
117 .withPlural("network-attachment-definitions")
118 .build();
119
120 private ApplicationId appId;
121 private NodeId localNodeId;
122
123 @Activate
124 protected void activate() {
125 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
126 localNodeId = clusterService.getLocalNode().id();
127 leadershipService.runForLeadership(appId.name());
128 configService.addListener(configListener);
129
130 log.info("Started");
131 }
132
133 @Deactivate
134 protected void deactivate() {
135 configService.removeListener(configListener);
136 leadershipService.withdraw(appId.name());
137 eventExecutor.shutdown();
138
139 log.info("Stopped");
140 }
141
Jian Lid2ade0f2021-02-19 14:00:07 +0900142 private void instantiateWatcher() {
143 KubernetesClient client = k8sClient(configService);
144
145 if (client != null) {
146 try {
147 client.customResource(nadCrdCxt).watch(watcher);
148 } catch (IOException e) {
149 e.printStackTrace();
150 }
151 }
152 }
153
Jian Li034820d2021-01-15 16:58:48 +0900154 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
155
156 private boolean isRelevantHelper() {
157 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
158 }
159
160 @Override
161 public void event(KubevirtApiConfigEvent event) {
162
163 switch (event.type()) {
164 case KUBEVIRT_API_CONFIG_UPDATED:
165 eventExecutor.execute(this::processConfigUpdate);
166 break;
167 case KUBEVIRT_API_CONFIG_CREATED:
168 case KUBEVIRT_API_CONFIG_REMOVED:
169 default:
170 // do nothing
171 break;
172 }
173 }
174
175 private void processConfigUpdate() {
176 if (!isRelevantHelper()) {
177 return;
178 }
179
Jian Lid2ade0f2021-02-19 14:00:07 +0900180 instantiateWatcher();
Jian Li034820d2021-01-15 16:58:48 +0900181 }
182 }
183
184 private class InternalNetworkAttachmentDefinitionWatcher
185 implements Watcher<String> {
186
187 @Override
188 public void eventReceived(Action action, String resource) {
189 switch (action) {
190 case ADDED:
191 eventExecutor.execute(() -> processAddition(resource));
192 break;
193 case MODIFIED:
194 eventExecutor.execute(() -> processModification(resource));
195 break;
196 case DELETED:
197 eventExecutor.execute(() -> processDeletion(resource));
198 break;
199 case ERROR:
200 log.warn("Failures processing network-attachment-definition manipulation.");
201 break;
202 default:
203 break;
204 }
205 }
206
207 @Override
208 public void onClose(WatcherException e) {
Jian Lid2ade0f2021-02-19 14:00:07 +0900209 // due to the bugs in fabric8, the watcher might be closed,
210 // we will re-instantiate the watcher in this case
211 // FIXME: https://github.com/fabric8io/kubernetes-client/issues/2135
212 log.warn("Network-attachment-definition watcher OnClose, re-instantiate the watcher...");
213
214 instantiateWatcher();
Jian Li034820d2021-01-15 16:58:48 +0900215 }
216
217 private void processAddition(String resource) {
218 if (!isMaster()) {
219 return;
220 }
221
Jian Li810f58c2021-02-27 01:10:50 +0900222 String name = parseResourceName(resource);
Jian Li034820d2021-01-15 16:58:48 +0900223
224 log.trace("Process NetworkAttachmentDefinition {} creating event from API server.",
225 name);
226
227 KubevirtNetwork network = parseKubevirtNetwork(resource);
228 if (network != null) {
Jian Liea585882021-01-19 23:42:55 +0900229 if (adminService.network(network.networkId()) == null) {
230 adminService.createNetwork(network);
231 }
Jian Li034820d2021-01-15 16:58:48 +0900232 }
233 }
234
235 private void processModification(String resource) {
236 if (!isMaster()) {
237 return;
238 }
239
Jian Li810f58c2021-02-27 01:10:50 +0900240 String name = parseResourceName(resource);
Jian Li034820d2021-01-15 16:58:48 +0900241
242 log.trace("Process NetworkAttachmentDefinition {} updating event from API server.",
243 name);
244
245 KubevirtNetwork network = parseKubevirtNetwork(resource);
246 if (network != null) {
247 adminService.updateNetwork(network);
248 }
249 }
250
251 private void processDeletion(String resource) {
252 if (!isMaster()) {
253 return;
254 }
255
Jian Li810f58c2021-02-27 01:10:50 +0900256 String name = parseResourceName(resource);
Jian Li034820d2021-01-15 16:58:48 +0900257
258 log.trace("Process NetworkAttachmentDefinition {} removal event from API server.",
259 name);
260
Jian Li362bd8b2021-06-11 14:47:57 +0900261 if (adminService.network(name) != null) {
262 adminService.removeNetwork(name);
263 }
Jian Li034820d2021-01-15 16:58:48 +0900264 }
265
266 private boolean isMaster() {
267 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
268 }
269
Jian Li034820d2021-01-15 16:58:48 +0900270 private KubevirtNetwork parseKubevirtNetwork(String resource) {
271 try {
272 JSONObject json = new JSONObject(resource);
Jian Li810f58c2021-02-27 01:10:50 +0900273 String name = parseResourceName(resource);
Jian Li034820d2021-01-15 16:58:48 +0900274 JSONObject annots = json.getJSONObject("metadata").getJSONObject("annotations");
275 String networkConfig = annots.getString(NETWORK_CONFIG);
276 if (networkConfig != null) {
277 KubevirtNetwork.Builder builder = DefaultKubevirtNetwork.builder();
278
279 JSONObject configJson = new JSONObject(networkConfig);
280 String type = configJson.getString(TYPE);
281 Integer mtu = configJson.getInt(MTU);
282 String gatewayIp = configJson.getString(GATEWAY_IP);
283
284 if (!type.equalsIgnoreCase(FLAT.name())) {
285 builder.segmentId(configJson.getString(SEGMENT_ID));
286 }
287
288 String cidr = configJson.getString(CIDR);
289
290 JSONObject poolJson = configJson.getJSONObject(IP_POOL);
291 if (poolJson != null) {
292 String start = poolJson.getString(START);
293 String end = poolJson.getString(END);
294 builder.ipPool(new KubevirtIpPool(
295 IpAddress.valueOf(start), IpAddress.valueOf(end)));
296 }
297
Jian Li7a581b12021-02-18 14:24:32 +0900298 if (configJson.has(HOST_ROUTES)) {
299 JSONArray routesJson = configJson.getJSONArray(HOST_ROUTES);
300 Set<KubevirtHostRoute> hostRoutes = new HashSet<>();
301 if (routesJson != null) {
302 for (int i = 0; i < routesJson.length(); i++) {
303 JSONObject route = routesJson.getJSONObject(i);
304 String destinationStr = route.getString(DESTINATION);
305 String nexthopStr = route.getString(NEXTHOP);
Jian Li4c35a262021-02-01 20:36:45 +0900306
Jian Li7a581b12021-02-18 14:24:32 +0900307 if (StringUtils.isNotEmpty(destinationStr) &&
308 StringUtils.isNotEmpty(nexthopStr)) {
309 hostRoutes.add(new KubevirtHostRoute(
310 IpPrefix.valueOf(destinationStr),
311 IpAddress.valueOf(nexthopStr)));
312 }
Jian Li4c35a262021-02-01 20:36:45 +0900313 }
314 }
Jian Li7a581b12021-02-18 14:24:32 +0900315 builder.hostRoutes(hostRoutes);
Jian Li4c35a262021-02-01 20:36:45 +0900316 }
Jian Li4c35a262021-02-01 20:36:45 +0900317
Jian Li7a581b12021-02-18 14:24:32 +0900318 if (configJson.has(DNSES)) {
319 JSONArray dnsesJson = configJson.getJSONArray(DNSES);
320 Set<IpAddress> dnses = new HashSet<>();
321 if (dnsesJson != null) {
322 for (int i = 0; i < dnsesJson.length(); i++) {
323 String dns = dnsesJson.getString(i);
324 if (StringUtils.isNotEmpty(dns)) {
325 dnses.add(IpAddress.valueOf(dns));
326 }
327 }
Jian Li4c35a262021-02-01 20:36:45 +0900328 }
Jian Li7a581b12021-02-18 14:24:32 +0900329 builder.dnses(dnses);
Jian Li4c35a262021-02-01 20:36:45 +0900330 }
Jian Li4c35a262021-02-01 20:36:45 +0900331
Jian Li034820d2021-01-15 16:58:48 +0900332 builder.networkId(name).name(name).type(Type.valueOf(type))
333 .mtu(mtu).gatewayIp(IpAddress.valueOf(gatewayIp)).cidr(cidr);
334
335 return builder.build();
336 }
337 } catch (JSONException e) {
338 log.error("Failed to parse network attachment definition object");
339 }
340
341 return null;
342 }
343 }
344}