blob: dc3946db45116c58ef558b02e5af629a421247be [file] [log] [blame]
Jian Li304dca42020-12-27 23:22:46 +09001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnode.impl;
17
18import io.fabric8.kubernetes.api.model.Node;
19import io.fabric8.kubernetes.api.model.NodeAddress;
20import io.fabric8.kubernetes.client.KubernetesClient;
Jian Li9ca07f52021-01-19 23:42:55 +090021import org.json.JSONArray;
22import org.json.JSONException;
23import org.json.JSONObject;
Jian Li304dca42020-12-27 23:22:46 +090024import org.onlab.packet.IpAddress;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.kubevirtnode.api.DefaultKubevirtNode;
Jian Li138f51f2021-01-06 03:29:58 +090031import org.onosproject.kubevirtnode.api.DefaultKubevirtPhyInterface;
Jian Li304dca42020-12-27 23:22:46 +090032import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
33import org.onosproject.kubevirtnode.api.KubevirtApiConfigAdminService;
34import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
35import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
36import org.onosproject.kubevirtnode.api.KubevirtNode;
37import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
38import org.onosproject.kubevirtnode.api.KubevirtNodeState;
Jian Li138f51f2021-01-06 03:29:58 +090039import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
Jian Li304dca42020-12-27 23:22:46 +090040import org.osgi.service.component.annotations.Activate;
41import org.osgi.service.component.annotations.Component;
42import org.osgi.service.component.annotations.Deactivate;
43import org.osgi.service.component.annotations.Reference;
44import org.osgi.service.component.annotations.ReferenceCardinality;
45import org.slf4j.Logger;
46
Jian Li138f51f2021-01-06 03:29:58 +090047import java.util.HashSet;
48import java.util.Map;
Jian Li304dca42020-12-27 23:22:46 +090049import java.util.Objects;
Jian Li138f51f2021-01-06 03:29:58 +090050import java.util.Set;
Jian Li304dca42020-12-27 23:22:46 +090051import java.util.concurrent.ExecutorService;
52
53import static java.util.concurrent.Executors.newSingleThreadExecutor;
54import static org.onlab.util.Tools.groupedThreads;
55import static org.onosproject.kubevirtnode.api.KubevirtApiConfig.State.CONNECTED;
56import static org.onosproject.kubevirtnode.api.KubevirtApiConfigService.APP_ID;
57import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
58import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
59import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.k8sClient;
60import static org.slf4j.LoggerFactory.getLogger;
61
62/**
63 * Handles the state of KubeVirt API server configuration.
64 */
65@Component(immediate = true)
66public class DefaultKubevirtApiConfigHandler {
67
68 private final Logger log = getLogger(getClass());
69
70 private static final String INTERNAL_IP = "InternalIP";
71 private static final String K8S_ROLE = "node-role.kubernetes.io";
Jian Li9ca07f52021-01-19 23:42:55 +090072 private static final String PHYSNET_CONFIG_KEY = "physnet-config";
73 private static final String NETWORK_KEY = "network";
74 private static final String INTERFACE_KEY = "interface";
Jian Li304dca42020-12-27 23:22:46 +090075
76 private static final long SLEEP_MS = 10000; // we wait 10s
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected CoreService coreService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY)
82 protected LeadershipService leadershipService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
85 protected ClusterService clusterService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
88 protected KubevirtApiConfigAdminService configAdminService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected KubevirtNodeAdminService nodeAdminService;
92
93 private final ExecutorService eventExecutor = newSingleThreadExecutor(
94 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
95
96 private final KubevirtApiConfigListener configListener = new InternalKubevirtApiConfigListener();
97
98 private ApplicationId appId;
99 private NodeId localNode;
100
101 @Activate
102 protected void activate() {
103 appId = coreService.getAppId(APP_ID);
104 localNode = clusterService.getLocalNode().id();
105 leadershipService.runForLeadership(appId.name());
106 configAdminService.addListener(configListener);
107
108 log.info("Started");
109 }
110
111 @Deactivate
112 protected void deactivate() {
113 configAdminService.removeListener(configListener);
114 leadershipService.withdraw(appId.name());
115 eventExecutor.shutdown();
116
117 log.info("Stopped");
118 }
119
120 /**
121 * Checks the validity of the given kubernetes API server configuration.
122 *
123 * @param config kubernetes API server configuration
124 * @return validity result
125 */
126 private boolean checkApiServerConfig(KubevirtApiConfig config) {
127 KubernetesClient k8sClient = k8sClient(config);
128 return k8sClient != null && k8sClient.getApiVersion() != null;
129 }
130
131 private void bootstrapKubevirtNodes(KubevirtApiConfig config) {
132 KubernetesClient k8sClient = k8sClient(config);
133
134 if (k8sClient == null) {
135 log.warn("Failed to connect to kubernetes API server");
136 return;
137 }
138
139 for (Node node : k8sClient.nodes().list().getItems()) {
140 KubevirtNode kubevirtNode = buildKubevirtNode(node);
Jian Li2b35ec72021-01-21 16:45:02 +0900141 // we always provision VMs to worker nodes, so only need to install
142 // flow rules in worker nodes
143 if (kubevirtNode.type() == WORKER) {
144 nodeAdminService.createNode(kubevirtNode);
145 }
Jian Li304dca42020-12-27 23:22:46 +0900146 }
147 }
148
149 private KubevirtNode buildKubevirtNode(Node node) {
150 String hostname = node.getMetadata().getName();
151 IpAddress managementIp = null;
152 IpAddress dataIp = null;
153
154 for (NodeAddress nodeAddress:node.getStatus().getAddresses()) {
155 if (nodeAddress.getType().equals(INTERNAL_IP)) {
156 managementIp = IpAddress.valueOf(nodeAddress.getAddress());
157 dataIp = IpAddress.valueOf(nodeAddress.getAddress());
158 }
159 }
160
161 String roleStr = node.getMetadata().getLabels().keySet().stream()
162 .filter(l -> l.contains(K8S_ROLE))
163 .findFirst().orElse(null);
164
165 KubevirtNode.Type nodeType = WORKER;
166 if (roleStr != null) {
167 String role = roleStr.split("/")[1];
168 if (MASTER.name().equalsIgnoreCase(role)) {
169 nodeType = MASTER;
170 } else {
171 nodeType = WORKER;
172 }
173 }
174
Jian Li138f51f2021-01-06 03:29:58 +0900175 // start to parse kubernetes annotation
176 Map<String, String> annots = node.getMetadata().getAnnotations();
Jian Li9ca07f52021-01-19 23:42:55 +0900177 String physnetConfig = annots.get(PHYSNET_CONFIG_KEY);
Jian Li138f51f2021-01-06 03:29:58 +0900178 Set<KubevirtPhyInterface> phys = new HashSet<>();
Jian Li9ca07f52021-01-19 23:42:55 +0900179 try {
180 if (physnetConfig != null) {
181 JSONArray configJson = new JSONArray(physnetConfig);
182
183 for (int i = 0; i < configJson.length(); i++) {
184 JSONObject object = configJson.getJSONObject(i);
185 String network = object.getString(NETWORK_KEY);
186 String intf = object.getString(INTERFACE_KEY);
187
188 if (network != null && intf != null) {
189 phys.add(DefaultKubevirtPhyInterface.builder()
190 .network(network).intf(intf).build());
191 }
192
193 }
194 }
195 } catch (JSONException e) {
196 log.error("Failed to parse network status object", e);
Jian Li138f51f2021-01-06 03:29:58 +0900197 }
198
Jian Li304dca42020-12-27 23:22:46 +0900199 return DefaultKubevirtNode.builder()
200 .hostname(hostname)
201 .managementIp(managementIp)
202 .dataIp(dataIp)
203 .type(nodeType)
204 .state(KubevirtNodeState.ON_BOARDED)
Jian Li138f51f2021-01-06 03:29:58 +0900205 .phyIntfs(phys)
Jian Li304dca42020-12-27 23:22:46 +0900206 .build();
207 }
208
209 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
210
211 private boolean isRelevantHelper() {
212 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
213 }
214
215 @Override
216 public void event(KubevirtApiConfigEvent event) {
217 switch (event.type()) {
218 case KUBEVIRT_API_CONFIG_CREATED:
219 eventExecutor.execute(() -> processConfigCreation(event.subject()));
220 break;
221 default:
222 break;
223 }
224 }
225
226 private void processConfigCreation(KubevirtApiConfig config) {
227 if (!isRelevantHelper()) {
228 return;
229 }
230
231 if (checkApiServerConfig(config)) {
232 KubevirtApiConfig newConfig = config.updateState(CONNECTED);
233 configAdminService.updateApiConfig(newConfig);
234
235 bootstrapKubevirtNodes(config);
236 }
237 }
238 }
239}