blob: cfd95b6b61f0411ba0b2f91a9c9d0b81db93414e [file] [log] [blame]
Jian Liaaf44b52020-12-27 23:22:46 +09001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnode.impl;
17
18import io.fabric8.kubernetes.api.model.Node;
19import io.fabric8.kubernetes.api.model.NodeAddress;
20import io.fabric8.kubernetes.client.KubernetesClient;
21import org.onlab.packet.IpAddress;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
27import org.onosproject.kubevirtnode.api.DefaultKubevirtNode;
Jian Li4fe40e52021-01-06 03:29:58 +090028import org.onosproject.kubevirtnode.api.DefaultKubevirtPhyInterface;
Jian Liaaf44b52020-12-27 23:22:46 +090029import org.onosproject.kubevirtnode.api.KubevirtApiConfig;
30import org.onosproject.kubevirtnode.api.KubevirtApiConfigAdminService;
31import org.onosproject.kubevirtnode.api.KubevirtApiConfigEvent;
32import org.onosproject.kubevirtnode.api.KubevirtApiConfigListener;
33import org.onosproject.kubevirtnode.api.KubevirtNode;
34import org.onosproject.kubevirtnode.api.KubevirtNodeAdminService;
35import org.onosproject.kubevirtnode.api.KubevirtNodeState;
Jian Li4fe40e52021-01-06 03:29:58 +090036import org.onosproject.kubevirtnode.api.KubevirtPhyInterface;
Jian Liaaf44b52020-12-27 23:22:46 +090037import org.osgi.service.component.annotations.Activate;
38import org.osgi.service.component.annotations.Component;
39import org.osgi.service.component.annotations.Deactivate;
40import org.osgi.service.component.annotations.Reference;
41import org.osgi.service.component.annotations.ReferenceCardinality;
42import org.slf4j.Logger;
43
Jian Li4fe40e52021-01-06 03:29:58 +090044import java.util.HashSet;
45import java.util.Map;
Jian Liaaf44b52020-12-27 23:22:46 +090046import java.util.Objects;
Jian Li4fe40e52021-01-06 03:29:58 +090047import java.util.Set;
Jian Liaaf44b52020-12-27 23:22:46 +090048import java.util.concurrent.ExecutorService;
49
50import static java.util.concurrent.Executors.newSingleThreadExecutor;
51import static org.onlab.util.Tools.groupedThreads;
52import static org.onosproject.kubevirtnode.api.KubevirtApiConfig.State.CONNECTED;
53import static org.onosproject.kubevirtnode.api.KubevirtApiConfigService.APP_ID;
54import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.MASTER;
55import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
56import static org.onosproject.kubevirtnode.util.KubevirtNodeUtil.k8sClient;
57import static org.slf4j.LoggerFactory.getLogger;
58
59/**
60 * Handles the state of KubeVirt API server configuration.
61 */
62@Component(immediate = true)
63public class DefaultKubevirtApiConfigHandler {
64
65 private final Logger log = getLogger(getClass());
66
67 private static final String INTERNAL_IP = "InternalIP";
68 private static final String K8S_ROLE = "node-role.kubernetes.io";
Jian Li4fe40e52021-01-06 03:29:58 +090069 private static final String DEFAULT_PHY_NETWORK = "physical.network";
70 private static final String DEFAULT_PHY_INTERFACE = "physical.interface";
Jian Liaaf44b52020-12-27 23:22:46 +090071
72 private static final long SLEEP_MS = 10000; // we wait 10s
73
74 @Reference(cardinality = ReferenceCardinality.MANDATORY)
75 protected CoreService coreService;
76
77 @Reference(cardinality = ReferenceCardinality.MANDATORY)
78 protected LeadershipService leadershipService;
79
80 @Reference(cardinality = ReferenceCardinality.MANDATORY)
81 protected ClusterService clusterService;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 protected KubevirtApiConfigAdminService configAdminService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected KubevirtNodeAdminService nodeAdminService;
88
89 private final ExecutorService eventExecutor = newSingleThreadExecutor(
90 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
91
92 private final KubevirtApiConfigListener configListener = new InternalKubevirtApiConfigListener();
93
94 private ApplicationId appId;
95 private NodeId localNode;
96
97 @Activate
98 protected void activate() {
99 appId = coreService.getAppId(APP_ID);
100 localNode = clusterService.getLocalNode().id();
101 leadershipService.runForLeadership(appId.name());
102 configAdminService.addListener(configListener);
103
104 log.info("Started");
105 }
106
107 @Deactivate
108 protected void deactivate() {
109 configAdminService.removeListener(configListener);
110 leadershipService.withdraw(appId.name());
111 eventExecutor.shutdown();
112
113 log.info("Stopped");
114 }
115
116 /**
117 * Checks the validity of the given kubernetes API server configuration.
118 *
119 * @param config kubernetes API server configuration
120 * @return validity result
121 */
122 private boolean checkApiServerConfig(KubevirtApiConfig config) {
123 KubernetesClient k8sClient = k8sClient(config);
124 return k8sClient != null && k8sClient.getApiVersion() != null;
125 }
126
127 private void bootstrapKubevirtNodes(KubevirtApiConfig config) {
128 KubernetesClient k8sClient = k8sClient(config);
129
130 if (k8sClient == null) {
131 log.warn("Failed to connect to kubernetes API server");
132 return;
133 }
134
135 for (Node node : k8sClient.nodes().list().getItems()) {
136 KubevirtNode kubevirtNode = buildKubevirtNode(node);
137 nodeAdminService.createNode(kubevirtNode);
138 }
139 }
140
141 private KubevirtNode buildKubevirtNode(Node node) {
142 String hostname = node.getMetadata().getName();
143 IpAddress managementIp = null;
144 IpAddress dataIp = null;
145
146 for (NodeAddress nodeAddress:node.getStatus().getAddresses()) {
147 if (nodeAddress.getType().equals(INTERNAL_IP)) {
148 managementIp = IpAddress.valueOf(nodeAddress.getAddress());
149 dataIp = IpAddress.valueOf(nodeAddress.getAddress());
150 }
151 }
152
153 String roleStr = node.getMetadata().getLabels().keySet().stream()
154 .filter(l -> l.contains(K8S_ROLE))
155 .findFirst().orElse(null);
156
157 KubevirtNode.Type nodeType = WORKER;
158 if (roleStr != null) {
159 String role = roleStr.split("/")[1];
160 if (MASTER.name().equalsIgnoreCase(role)) {
161 nodeType = MASTER;
162 } else {
163 nodeType = WORKER;
164 }
165 }
166
Jian Li4fe40e52021-01-06 03:29:58 +0900167 // start to parse kubernetes annotation
168 Map<String, String> annots = node.getMetadata().getAnnotations();
169 String physnet = annots.get(DEFAULT_PHY_NETWORK);
170 String physintf = annots.get(DEFAULT_PHY_INTERFACE);
171
172 Set<KubevirtPhyInterface> phys = new HashSet<>();
173 if (physnet != null && physintf != null) {
174 phys.add(DefaultKubevirtPhyInterface.builder().network(physnet).intf(physintf).build());
175 }
176
Jian Liaaf44b52020-12-27 23:22:46 +0900177 return DefaultKubevirtNode.builder()
178 .hostname(hostname)
179 .managementIp(managementIp)
180 .dataIp(dataIp)
181 .type(nodeType)
182 .state(KubevirtNodeState.ON_BOARDED)
Jian Li4fe40e52021-01-06 03:29:58 +0900183 .phyIntfs(phys)
Jian Liaaf44b52020-12-27 23:22:46 +0900184 .build();
185 }
186
187 private class InternalKubevirtApiConfigListener implements KubevirtApiConfigListener {
188
189 private boolean isRelevantHelper() {
190 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
191 }
192
193 @Override
194 public void event(KubevirtApiConfigEvent event) {
195 switch (event.type()) {
196 case KUBEVIRT_API_CONFIG_CREATED:
197 eventExecutor.execute(() -> processConfigCreation(event.subject()));
198 break;
199 default:
200 break;
201 }
202 }
203
204 private void processConfigCreation(KubevirtApiConfig config) {
205 if (!isRelevantHelper()) {
206 return;
207 }
208
209 if (checkApiServerConfig(config)) {
210 KubevirtApiConfig newConfig = config.updateState(CONNECTED);
211 configAdminService.updateApiConfig(newConfig);
212
213 bootstrapKubevirtNodes(config);
214 }
215 }
216 }
217}