/*
 * Copyright 2020-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnode.impl;
17
18import io.fabric8.kubernetes.api.model.Node;
19import io.fabric8.kubernetes.api.model.NodeAddress;
20import io.fabric8.kubernetes.client.KubernetesClient;
Jian Li9ca07f52021-01-19 23:42:55 +090021import org.json.JSONArray;
22import org.json.JSONException;
23import org.json.JSONObject;
Jian Li304dca42020-12-27 23:22:46 +090024import org.onlab.packet.IpAddress;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.kubevirtnode.api.DefaultKubevirtNode;
Jian Li138f51f2021-01-06 03:29:58 +090031import org.onosproject.kubevirtnode.api.DefaultKubevirtPhyInterface;
Jian Li304dca42020-12-27 23:22:46 +090032import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
33import org.onosproject.kubevirtnode.api.KubevirtApiConfigAdminService;
34import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
35import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
36import org.onosproject.kubevirtnode.api.KubevirtNode;
37import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
38import org.onosproject.kubevirtnode.api.KubevirtNodeState;
Jian Li138f51f2021-01-06 03:29:58 +090039import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
Jian Li304dca42020-12-27 23:22:46 +090040import org.osgi.service.component.annotations.Activate;
41import org.osgi.service.component.annotations.Component;
42import org.osgi.service.component.annotations.Deactivate;
43import org.osgi.service.component.annotations.Reference;
44import org.osgi.service.component.annotations.ReferenceCardinality;
45import org.slf4j.Logger;
46
Jian Li138f51f2021-01-06 03:29:58 +090047import java.util.HashSet;
48import java.util.Map;
Jian Li304dca42020-12-27 23:22:46 +090049import java.util.Objects;
Jian Li138f51f2021-01-06 03:29:58 +090050import java.util.Set;
Jian Li304dca42020-12-27 23:22:46 +090051import java.util.concurrent.ExecutorService;
52
53import static java.util.concurrent.Executors.newSingleThreadExecutor;
54import static org.onlab.util.Tools.groupedThreads;
55import static org.onosproject.kubevirtnode.api.KubevirtApiConfig.State.CONNECTED;
56import static org.onosproject.kubevirtnode.api.KubevirtApiConfigService.APP_ID;
57import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
58import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
59import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.k8sClient;
60import static org.slf4j.LoggerFactory.getLogger;
61
62/**
63 * Handles the state of KubeVirt API server configuration.
64 */
65@Component(immediate = true)
66public class DefaultKubevirtApiConfigHandler {
67
68 private final Logger log = getLogger(getClass());
69
70 private static final String INTERNAL_IP = "InternalIP";
71 private static final String K8S_ROLE = "node-role.kubernetes.io";
Jian Li9ca07f52021-01-19 23:42:55 +090072 private static final String PHYSNET_CONFIG_KEY = "physnet-config";
73 private static final String NETWORK_KEY = "network";
74 private static final String INTERFACE_KEY = "interface";
Jian Li304dca42020-12-27 23:22:46 +090075
76 private static final long SLEEP_MS = 10000; // we wait 10s
77
78 @Reference(cardinality = ReferenceCardinality.MANDATORY)
79 protected CoreService coreService;
80
81 @Reference(cardinality = ReferenceCardinality.MANDATORY)
82 protected LeadershipService leadershipService;
83
84 @Reference(cardinality = ReferenceCardinality.MANDATORY)
85 protected ClusterService clusterService;
86
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
88 protected KubevirtApiConfigAdminService configAdminService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected KubevirtNodeAdminService nodeAdminService;
92
93 private final ExecutorService eventExecutor = newSingleThreadExecutor(
94 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
95
96 private final KubevirtApiConfigListener configListener = new InternalKubevirtApiConfigListener();
97
98 private ApplicationId appId;
99 private NodeId localNode;
100
101 @Activate
102 protected void activate() {
103 appId = coreService.getAppId(APP_ID);
104 localNode = clusterService.getLocalNode().id();
105 leadershipService.runForLeadership(appId.name());
106 configAdminService.addListener(configListener);
107
108 log.info("Started");
109 }
110
111 @Deactivate
112 protected void deactivate() {
113 configAdminService.removeListener(configListener);
114 leadershipService.withdraw(appId.name());
115 eventExecutor.shutdown();
116
117 log.info("Stopped");
118 }
119
120 /**
121 * Checks the validity of the given kubernetes API server configuration.
122 *
123 * @param config kubernetes API server configuration
124 * @return validity result
125 */
126 private boolean checkApiServerConfig(KubevirtApiConfig config) {
127 KubernetesClient k8sClient = k8sClient(config);
128 return k8sClient != null && k8sClient.getApiVersion() != null;
129 }
130
131 private void bootstrapKubevirtNodes(KubevirtApiConfig config) {
132 KubernetesClient k8sClient = k8sClient(config);
133
134 if (k8sClient == null) {
135 log.warn("Failed to connect to kubernetes API server");
136 return;
137 }
138
139 for (Node node : k8sClient.nodes().list().getItems()) {
140 KubevirtNode kubevirtNode = buildKubevirtNode(node);
141 nodeAdminService.createNode(kubevirtNode);
142 }
143 }
144
145 private KubevirtNode buildKubevirtNode(Node node) {
146 String hostname = node.getMetadata().getName();
147 IpAddress managementIp = null;
148 IpAddress dataIp = null;
149
150 for (NodeAddress nodeAddress:node.getStatus().getAddresses()) {
151 if (nodeAddress.getType().equals(INTERNAL_IP)) {
152 managementIp = IpAddress.valueOf(nodeAddress.getAddress());
153 dataIp = IpAddress.valueOf(nodeAddress.getAddress());
154 }
155 }
156
157 String roleStr = node.getMetadata().getLabels().keySet().stream()
158 .filter(l -> l.contains(K8S_ROLE))
159 .findFirst().orElse(null);
160
161 KubevirtNode.Type nodeType = WORKER;
162 if (roleStr != null) {
163 String role = roleStr.split("/")[1];
164 if (MASTER.name().equalsIgnoreCase(role)) {
165 nodeType = MASTER;
166 } else {
167 nodeType = WORKER;
168 }
169 }
170
Jian Li138f51f2021-01-06 03:29:58 +0900171 // start to parse kubernetes annotation
172 Map<String, String> annots = node.getMetadata().getAnnotations();
Jian Li9ca07f52021-01-19 23:42:55 +0900173 String physnetConfig = annots.get(PHYSNET_CONFIG_KEY);
Jian Li138f51f2021-01-06 03:29:58 +0900174 Set<KubevirtPhyInterface> phys = new HashSet<>();
Jian Li9ca07f52021-01-19 23:42:55 +0900175 try {
176 if (physnetConfig != null) {
177 JSONArray configJson = new JSONArray(physnetConfig);
178
179 for (int i = 0; i < configJson.length(); i++) {
180 JSONObject object = configJson.getJSONObject(i);
181 String network = object.getString(NETWORK_KEY);
182 String intf = object.getString(INTERFACE_KEY);
183
184 if (network != null && intf != null) {
185 phys.add(DefaultKubevirtPhyInterface.builder()
186 .network(network).intf(intf).build());
187 }
188
189 }
190 }
191 } catch (JSONException e) {
192 log.error("Failed to parse network status object", e);
Jian Li138f51f2021-01-06 03:29:58 +0900193 }
194
Jian Li304dca42020-12-27 23:22:46 +0900195 return DefaultKubevirtNode.builder()
196 .hostname(hostname)
197 .managementIp(managementIp)
198 .dataIp(dataIp)
199 .type(nodeType)
200 .state(KubevirtNodeState.ON_BOARDED)
Jian Li138f51f2021-01-06 03:29:58 +0900201 .phyIntfs(phys)
Jian Li304dca42020-12-27 23:22:46 +0900202 .build();
203 }
204
205 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
206
207 private boolean isRelevantHelper() {
208 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
209 }
210
211 @Override
212 public void event(KubevirtApiConfigEvent event) {
213 switch (event.type()) {
214 case KUBEVIRT_API_CONFIG_CREATED:
215 eventExecutor.execute(() -> processConfigCreation(event.subject()));
216 break;
217 default:
218 break;
219 }
220 }
221
222 private void processConfigCreation(KubevirtApiConfig config) {
223 if (!isRelevantHelper()) {
224 return;
225 }
226
227 if (checkApiServerConfig(config)) {
228 KubevirtApiConfig newConfig = config.updateState(CONNECTED);
229 configAdminService.updateApiConfig(newConfig);
230
231 bootstrapKubevirtNodes(config);
232 }
233 }
234 }
235}