blob: 186ca90a1d2de1eac61667f5f4a30c38a0f973e2 [file] [log] [blame]
Jian Li16eed162021-01-15 16:58:48 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import io.fabric8.kubernetes.client.KubernetesClient;
19import io.fabric8.kubernetes.client.Watcher;
20import io.fabric8.kubernetes.client.WatcherException;
21import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
22import org.json.JSONException;
23import org.json.JSONObject;
24import org.onlab.packet.IpAddress;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.kubevirtnetworking.api.DefaultKubevirtNetwork;
31import org.onosproject.kubevirtnetworking.api.KubevirtIpPool;
32import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
33import org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type;
34import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
35import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
36import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
37import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
38import org.onosproject.mastership.MastershipService;
39import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
41import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Reference;
43import org.osgi.service.component.annotations.ReferenceCardinality;
44import org.slf4j.Logger;
45
46import java.io.IOException;
47import java.util.Objects;
48import java.util.concurrent.ExecutorService;
49
50import static java.util.concurrent.Executors.newSingleThreadExecutor;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
53import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.FLAT;
54import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
55import static org.slf4j.LoggerFactory.getLogger;
56
57/**
58 * Kubernetes network-attachment-definition watcher used for feeding kubevirt network information.
59 */
60@Component(immediate = true)
61public class NetworkAttachmentDefinitionWatcher {
62
63 private final Logger log = getLogger(getClass());
64
65 private static final String NETWORK_CONFIG = "network-config";
66 private static final String TYPE = "type";
67 private static final String MTU = "mtu";
68 private static final String SEGMENT_ID = "segmentId";
69 private static final String GATEWAY_IP = "gatewayIp";
70 private static final String CIDR = "cidr";
71 private static final String HOST_ROUTES = "hostRoutes";
72 private static final String IP_POOL = "ipPool";
73 private static final String START = "start";
74 private static final String END = "end";
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY)
77 protected CoreService coreService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY)
80 protected MastershipService mastershipService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected ClusterService clusterService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected LeadershipService leadershipService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected KubevirtNetworkAdminService adminService;
90
91 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected KubevirtApiConfigService configService;
93
94 private final ExecutorService eventExecutor = newSingleThreadExecutor(
95 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
96
97 private final InternalNetworkAttachmentDefinitionWatcher
98 watcher = new InternalNetworkAttachmentDefinitionWatcher();
99 private final InternalKubevirtApiConfigListener
100 configListener = new InternalKubevirtApiConfigListener();
101
102 CustomResourceDefinitionContext nadCrdCxt = new CustomResourceDefinitionContext
103 .Builder()
104 .withGroup("k8s.cni.cncf.io")
105 .withScope("Namespaced")
106 .withVersion("v1")
107 .withPlural("network-attachment-definitions")
108 .build();
109
110 private ApplicationId appId;
111 private NodeId localNodeId;
112
113 @Activate
114 protected void activate() {
115 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
116 localNodeId = clusterService.getLocalNode().id();
117 leadershipService.runForLeadership(appId.name());
118 configService.addListener(configListener);
119
120 log.info("Started");
121 }
122
123 @Deactivate
124 protected void deactivate() {
125 configService.removeListener(configListener);
126 leadershipService.withdraw(appId.name());
127 eventExecutor.shutdown();
128
129 log.info("Stopped");
130 }
131
132 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
133
134 private boolean isRelevantHelper() {
135 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
136 }
137
138 @Override
139 public void event(KubevirtApiConfigEvent event) {
140
141 switch (event.type()) {
142 case KUBEVIRT_API_CONFIG_UPDATED:
143 eventExecutor.execute(this::processConfigUpdate);
144 break;
145 case KUBEVIRT_API_CONFIG_CREATED:
146 case KUBEVIRT_API_CONFIG_REMOVED:
147 default:
148 // do nothing
149 break;
150 }
151 }
152
153 private void processConfigUpdate() {
154 if (!isRelevantHelper()) {
155 return;
156 }
157
158 KubernetesClient client = k8sClient(configService);
159
160 if (client != null) {
161 try {
162 client.customResource(nadCrdCxt).watch(watcher);
163 } catch (IOException e) {
164 e.printStackTrace();
165 }
166 }
167 }
168 }
169
170 private class InternalNetworkAttachmentDefinitionWatcher
171 implements Watcher<String> {
172
173 @Override
174 public void eventReceived(Action action, String resource) {
175 switch (action) {
176 case ADDED:
177 eventExecutor.execute(() -> processAddition(resource));
178 break;
179 case MODIFIED:
180 eventExecutor.execute(() -> processModification(resource));
181 break;
182 case DELETED:
183 eventExecutor.execute(() -> processDeletion(resource));
184 break;
185 case ERROR:
186 log.warn("Failures processing network-attachment-definition manipulation.");
187 break;
188 default:
189 break;
190 }
191 }
192
193 @Override
194 public void onClose(WatcherException e) {
195 log.warn("Network-attachment-definition watcher OnClose", e);
196 }
197
198 private void processAddition(String resource) {
199 if (!isMaster()) {
200 return;
201 }
202
203 String name = parseName(resource);
204
205 log.trace("Process NetworkAttachmentDefinition {} creating event from API server.",
206 name);
207
208 KubevirtNetwork network = parseKubevirtNetwork(resource);
209 if (network != null) {
Jian Li9ca07f52021-01-19 23:42:55 +0900210 if (adminService.network(network.networkId()) == null) {
211 adminService.createNetwork(network);
212 }
Jian Li16eed162021-01-15 16:58:48 +0900213 }
214 }
215
216 private void processModification(String resource) {
217 if (!isMaster()) {
218 return;
219 }
220
221 String name = parseName(resource);
222
223 log.trace("Process NetworkAttachmentDefinition {} updating event from API server.",
224 name);
225
226 KubevirtNetwork network = parseKubevirtNetwork(resource);
227 if (network != null) {
228 adminService.updateNetwork(network);
229 }
230 }
231
232 private void processDeletion(String resource) {
233 if (!isMaster()) {
234 return;
235 }
236
237 String name = parseName(resource);
238
239 log.trace("Process NetworkAttachmentDefinition {} removal event from API server.",
240 name);
241
242 adminService.removeNetwork(name);
243 }
244
245 private boolean isMaster() {
246 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
247 }
248
249 private String parseName(String resource) {
250 try {
251 JSONObject json = new JSONObject(resource);
252 return json.getJSONObject("metadata").getString("name");
253 } catch (JSONException e) {
254 log.error("");
255 }
256 return "";
257 }
258
259 private KubevirtNetwork parseKubevirtNetwork(String resource) {
260 try {
261 JSONObject json = new JSONObject(resource);
262 String name = parseName(resource);
263 JSONObject annots = json.getJSONObject("metadata").getJSONObject("annotations");
264 String networkConfig = annots.getString(NETWORK_CONFIG);
265 if (networkConfig != null) {
266 KubevirtNetwork.Builder builder = DefaultKubevirtNetwork.builder();
267
268 JSONObject configJson = new JSONObject(networkConfig);
269 String type = configJson.getString(TYPE);
270 Integer mtu = configJson.getInt(MTU);
271 String gatewayIp = configJson.getString(GATEWAY_IP);
272
273 if (!type.equalsIgnoreCase(FLAT.name())) {
274 builder.segmentId(configJson.getString(SEGMENT_ID));
275 }
276
277 String cidr = configJson.getString(CIDR);
278
279 JSONObject poolJson = configJson.getJSONObject(IP_POOL);
280 if (poolJson != null) {
281 String start = poolJson.getString(START);
282 String end = poolJson.getString(END);
283 builder.ipPool(new KubevirtIpPool(
284 IpAddress.valueOf(start), IpAddress.valueOf(end)));
285 }
286
287 builder.networkId(name).name(name).type(Type.valueOf(type))
288 .mtu(mtu).gatewayIp(IpAddress.valueOf(gatewayIp)).cidr(cidr);
289
290 return builder.build();
291 }
292 } catch (JSONException e) {
293 log.error("Failed to parse network attachment definition object");
294 }
295
296 return null;
297 }
298 }
299}