blob: 9e0ce3e0c95599a7c65df2338990cadf0e629240 [file] [log] [blame]
Jian Li0c656f02021-06-07 13:32:39 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import io.fabric8.kubernetes.client.KubernetesClient;
21import io.fabric8.kubernetes.client.Watcher;
22import io.fabric8.kubernetes.client.WatcherException;
23import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
24import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.LeadershipService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.core.ApplicationId;
28import org.onosproject.core.CoreService;
29import org.onosproject.kubevirtnetworking.api.KubevirtNetworkAdminService;
30import org.onosproject.kubevirtnetworking.api.KubevirtPort;
31import org.onosproject.kubevirtnetworking.api.KubevirtPortAdminService;
32import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
33import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
34import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
35import org.onosproject.kubevirtnode.api.KubevirtNode;
36import org.onosproject.kubevirtnode.api.KubevirtNodeService;
37import org.onosproject.mastership.MastershipService;
38import org.osgi.service.component.annotations.Activate;
39import org.osgi.service.component.annotations.Component;
40import org.osgi.service.component.annotations.Deactivate;
41import org.osgi.service.component.annotations.Reference;
42import org.osgi.service.component.annotations.ReferenceCardinality;
43import org.slf4j.Logger;
44
45import java.io.IOException;
46import java.util.Objects;
47import java.util.Set;
48import java.util.concurrent.ExecutorService;
49
50import static java.util.concurrent.Executors.newSingleThreadExecutor;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
53import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPorts;
54import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.k8sClient;
55import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.waitFor;
56import static org.slf4j.LoggerFactory.getLogger;
57
58/**
59 * Kubernetes VMI watcher used for feeding VMI information.
60 */
61@Component(immediate = true)
62public class KubevirtVmiWatcher {
63
64 private final Logger log = getLogger(getClass());
65
66 private static final String STATUS = "status";
67 private static final String NODE_NAME = "nodeName";
68 private static final String METADATA = "metadata";
69 private static final String NAME = "name";
70
71 @Reference(cardinality = ReferenceCardinality.MANDATORY)
72 protected CoreService coreService;
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY)
75 protected MastershipService mastershipService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY)
78 protected ClusterService clusterService;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY)
81 protected LeadershipService leadershipService;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 protected KubevirtNodeService nodeService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected KubevirtNetworkAdminService networkAdminService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected KubevirtPortAdminService portAdminService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected KubevirtApiConfigService configService;
94
95 private final ExecutorService eventExecutor = newSingleThreadExecutor(
96 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
97
98 private final InternalKubevirtVmiWatcher watcher = new InternalKubevirtVmiWatcher();
99 private final InternalKubevirtApiConfigListener
100 configListener = new InternalKubevirtApiConfigListener();
101
102 CustomResourceDefinitionContext vmiCrdCxt = new CustomResourceDefinitionContext
103 .Builder()
104 .withGroup("kubevirt.io")
105 .withScope("Namespaced")
106 .withVersion("v1")
107 .withPlural("virtualmachineinstances")
108 .build();
109
110 private ApplicationId appId;
111 private NodeId localNodeId;
112
113 @Activate
114 protected void activate() {
115 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
116 localNodeId = clusterService.getLocalNode().id();
117 leadershipService.runForLeadership(appId.name());
118 configService.addListener(configListener);
119
120 log.info("Started");
121 }
122
123 @Deactivate
124 protected void deactivate() {
125 configService.removeListener(configListener);
126 leadershipService.withdraw(appId.name());
127 eventExecutor.shutdown();
128
129 log.info("Stopped");
130 }
131
132 private void instantiateWatcher() {
133 KubernetesClient client = k8sClient(configService);
134
135 if (client != null) {
136 try {
137 client.customResource(vmiCrdCxt).watch(watcher);
138 } catch (IOException e) {
139 e.printStackTrace();
140 }
141 }
142 }
143
144 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
145
146 private boolean isRelevantHelper() {
147 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
148 }
149
150 @Override
151 public void event(KubevirtApiConfigEvent event) {
152
153 switch (event.type()) {
154 case KUBEVIRT_API_CONFIG_UPDATED:
155 eventExecutor.execute(this::processConfigUpdate);
156 break;
157 case KUBEVIRT_API_CONFIG_CREATED:
158 case KUBEVIRT_API_CONFIG_REMOVED:
159 default:
160 // do nothing
161 break;
162 }
163 }
164
165 private void processConfigUpdate() {
166 if (!isRelevantHelper()) {
167 return;
168 }
169
170 instantiateWatcher();
171 }
172 }
173
174 private class InternalKubevirtVmiWatcher implements Watcher<String> {
175
176 @Override
177 public void eventReceived(Action action, String s) {
178 switch (action) {
179 case ADDED:
180 case MODIFIED:
181 eventExecutor.execute(() -> processAddition(s));
182 break;
183 case ERROR:
184 log.warn("Failures processing VM manipulation.");
185 break;
186 default:
187 break;
188 }
189 }
190
191 @Override
192 public void onClose(WatcherException e) {
193 log.warn("VM watcher OnClose, re-instantiate the VM watcher...");
194 instantiateWatcher();
195 }
196
197 private void processAddition(String resource) {
198 if (!isMaster()) {
199 return;
200 }
201
202 String nodeName = parseNodeName(resource);
203 String vmiName = parseVmiName(resource);
204
205 if (nodeName == null) {
206 return;
207 }
208
209 KubevirtNode node = nodeService.node(nodeName);
210
211 if (node == null) {
212 log.warn("VMI {} scheduled on node {} is not ready, " +
213 "we wait for a while...", vmiName, nodeName);
214 waitFor(2);
215 }
216
217 Set<KubevirtPort> ports = getPorts(nodeService,
218 networkAdminService.networks(), resource);
219
220 if (ports.size() == 0) {
221 return;
222 }
223
224 ports.forEach(port -> {
225 KubevirtPort existing = portAdminService.port(port.macAddress());
226
227 if (existing != null) {
228 if (port.deviceId() != null && existing.deviceId() == null) {
229 KubevirtPort updated = existing.updateDeviceId(port.deviceId());
230 // internal we update device ID of kubevirt port
231 portAdminService.updatePort(updated);
232 }
233 }
234 });
235 }
236
237 private boolean isMaster() {
238 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
239 }
240
241 private String parseVmiName(String resource) {
242 String vmiName = null;
243
244 try {
245 ObjectMapper mapper = new ObjectMapper();
246 JsonNode json = mapper.readTree(resource);
247 JsonNode metadataJson = json.get(METADATA);
248 JsonNode vmiNameJson = metadataJson.get(NAME);
249 vmiName = vmiNameJson != null ? vmiNameJson.asText() : null;
250 } catch (IOException e) {
251 log.error("Failed to parse kubevirt VMI name");
252 }
253
254 return vmiName;
255 }
256
257 private String parseNodeName(String resource) {
258 String nodeName = null;
259 try {
260 ObjectMapper mapper = new ObjectMapper();
261 JsonNode json = mapper.readTree(resource);
262 JsonNode statusJson = json.get(STATUS);
263 JsonNode nodeNameJson = statusJson.get(NODE_NAME);
264 nodeName = nodeNameJson != null ? nodeNameJson.asText() : null;
265 } catch (IOException e) {
266 log.error("Failed to parse kubevirt VMI nodename");
267 }
268
269 return nodeName;
270 }
271 }
272}