blob: 33415a895715f49da3133a9e2c12979bb780e0b5 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snode.impl;
17
18import io.fabric8.kubernetes.api.model.Node;
19import io.fabric8.kubernetes.api.model.NodeAddress;
20import io.fabric8.kubernetes.client.KubernetesClient;
21import org.onlab.packet.IpAddress;
22import org.onosproject.cluster.ClusterService;
23import org.onosproject.cluster.LeadershipService;
24import org.onosproject.cluster.NodeId;
25import org.onosproject.core.ApplicationId;
26import org.onosproject.core.CoreService;
27import org.onosproject.k8snode.api.DefaultK8sNode;
Jian Lie2a04ce2020-07-01 19:07:02 +090028import org.onosproject.k8snode.api.ExternalNetworkService;
29import org.onosproject.k8snode.api.HostNodesInfo;
Jian Li1cee9882019-02-13 11:25:25 +090030import org.onosproject.k8snode.api.K8sApiConfig;
31import org.onosproject.k8snode.api.K8sApiConfigAdminService;
32import org.onosproject.k8snode.api.K8sApiConfigEvent;
33import org.onosproject.k8snode.api.K8sApiConfigListener;
34import org.onosproject.k8snode.api.K8sNode;
35import org.onosproject.k8snode.api.K8sNodeAdminService;
36import org.osgi.service.component.annotations.Activate;
37import org.osgi.service.component.annotations.Component;
38import org.osgi.service.component.annotations.Deactivate;
39import org.osgi.service.component.annotations.Reference;
40import org.osgi.service.component.annotations.ReferenceCardinality;
41import org.slf4j.Logger;
42
Jian Li0c632722019-05-08 15:58:04 +090043import java.util.Map;
Jian Li1cee9882019-02-13 11:25:25 +090044import java.util.Objects;
45import java.util.concurrent.ExecutorService;
46
47import static java.util.concurrent.Executors.newSingleThreadExecutor;
48import static org.onlab.util.Tools.groupedThreads;
Jian Lie2a04ce2020-07-01 19:07:02 +090049import static org.onosproject.k8snode.api.Constants.DEFAULT_CLUSTER_NAME;
50import static org.onosproject.k8snode.api.Constants.EXTERNAL_TO_ROUTER;
51import static org.onosproject.k8snode.api.K8sApiConfig.Mode.PASSTHROUGH;
Jian Li1cee9882019-02-13 11:25:25 +090052import static org.onosproject.k8snode.api.K8sNode.Type.MASTER;
53import static org.onosproject.k8snode.api.K8sNode.Type.MINION;
54import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
Jian Li0ee8d0e2019-12-18 11:35:05 +090055import static org.onosproject.k8snode.api.K8sNodeState.PRE_ON_BOARD;
Jian Li1cee9882019-02-13 11:25:25 +090056import static org.onosproject.k8snode.util.K8sNodeUtil.k8sClient;
57import static org.slf4j.LoggerFactory.getLogger;
58
59/**
60 * Handles the state of kubernetes API server configuration.
61 */
62@Component(immediate = true)
63public class DefaultK8sApiConfigHandler {
64
65 private final Logger log = getLogger(getClass());
66
67 private static final String INTERNAL_IP = "InternalIP";
68 private static final String K8S_ROLE = "node-role.kubernetes.io";
Jian Li0c632722019-05-08 15:58:04 +090069 private static final String EXT_BRIDGE_IP = "external.bridge.ip";
70 private static final String EXT_GATEWAY_IP = "external.gateway.ip";
71 private static final String EXT_INTF_NAME = "external.interface.name";
Jian Li1cee9882019-02-13 11:25:25 +090072
Jian Lie2a04ce2020-07-01 19:07:02 +090073 private static final String DEFAULT_GATEWAY_IP = "127.0.0.1";
74 private static final String DEFAULT_BRIDGE_IP = "127.0.0.1";
75
Jian Li1cee9882019-02-13 11:25:25 +090076 @Reference(cardinality = ReferenceCardinality.MANDATORY)
77 protected CoreService coreService;
78
79 @Reference(cardinality = ReferenceCardinality.MANDATORY)
80 protected LeadershipService leadershipService;
81
82 @Reference(cardinality = ReferenceCardinality.MANDATORY)
83 protected ClusterService clusterService;
84
85 @Reference(cardinality = ReferenceCardinality.MANDATORY)
86 protected K8sApiConfigAdminService k8sApiConfigAdminService;
87
88 @Reference(cardinality = ReferenceCardinality.MANDATORY)
89 protected K8sNodeAdminService k8sNodeAdminService;
90
Jian Lie2a04ce2020-07-01 19:07:02 +090091 @Reference(cardinality = ReferenceCardinality.MANDATORY)
92 protected ExternalNetworkService extNetworkService;
93
Jian Li1cee9882019-02-13 11:25:25 +090094 private final ExecutorService eventExecutor = newSingleThreadExecutor(
95 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
96
97 private final K8sApiConfigListener k8sApiConfigListener = new InternalK8sApiConfigListener();
98
99 private ApplicationId appId;
100 private NodeId localNode;
101
102 @Activate
103 protected void activate() {
104 appId = coreService.getAppId(APP_ID);
105 localNode = clusterService.getLocalNode().id();
106 leadershipService.runForLeadership(appId.name());
107 k8sApiConfigAdminService.addListener(k8sApiConfigListener);
108
109 log.info("Started");
110 }
111
112 @Deactivate
113 protected void deactivate() {
114 k8sApiConfigAdminService.removeListener(k8sApiConfigListener);
115 leadershipService.withdraw(appId.name());
116 eventExecutor.shutdown();
117
118 log.info("Stopped");
119 }
120
121 /**
122 * Checks the validity of the given kubernetes API server configuration.
123 *
124 * @param config kubernetes API server configuration
125 * @return validity result
126 */
127 private boolean checkApiServerConfig(K8sApiConfig config) {
128 KubernetesClient k8sClient = k8sClient(config);
129 return k8sClient != null && k8sClient.getApiVersion() != null;
130 }
131
132 private void bootstrapK8sNodes(K8sApiConfig config) {
133 KubernetesClient k8sClient = k8sClient(config);
134
135 if (k8sClient == null) {
136 log.warn("Failed to connect to kubernetes API server");
137 return;
138 }
139
140 k8sClient.nodes().list().getItems().forEach(n ->
Jian Lie2a04ce2020-07-01 19:07:02 +0900141 k8sNodeAdminService.createNode(buildK8sNode(n, config))
Jian Li1cee9882019-02-13 11:25:25 +0900142 );
143 }
144
Jian Lie2a04ce2020-07-01 19:07:02 +0900145 private K8sNode buildK8sNode(Node node, K8sApiConfig config) {
Jian Li1cee9882019-02-13 11:25:25 +0900146 String hostname = node.getMetadata().getName();
147 IpAddress managementIp = null;
148 IpAddress dataIp = null;
149
Jian Lie2a04ce2020-07-01 19:07:02 +0900150 // pass-through mode: we use host IP as the management and data IP
151 // normal mode: we use K8S node's internal IP as the management and data IP
152 if (config.mode() == PASSTHROUGH) {
153 HostNodesInfo info = config.infos().stream().filter(h -> h.nodes()
154 .contains(hostname)).findAny().orElse(null);
155 if (info == null) {
156 log.error("None of the nodes were found in the host nodes info mapping list");
157 } else {
158 managementIp = info.hostIp();
159 dataIp = info.hostIp();
160 }
161 } else {
162 for (NodeAddress nodeAddress:node.getStatus().getAddresses()) {
163 if (nodeAddress.getType().equals(INTERNAL_IP)) {
164 managementIp = IpAddress.valueOf(nodeAddress.getAddress());
165 dataIp = IpAddress.valueOf(nodeAddress.getAddress());
166 }
Jian Li1cee9882019-02-13 11:25:25 +0900167 }
168 }
169
170 String roleStr = node.getMetadata().getLabels().keySet().stream()
171 .filter(l -> l.contains(K8S_ROLE))
172 .findFirst().orElse(null);
173
Jian Li8a988042019-05-03 23:41:19 +0900174 K8sNode.Type nodeType = MINION;
Jian Li1cee9882019-02-13 11:25:25 +0900175
176 if (roleStr != null) {
177 String role = roleStr.split("/")[1];
178 if (MASTER.name().equalsIgnoreCase(role)) {
179 nodeType = MASTER;
180 } else {
181 nodeType = MINION;
182 }
183 }
184
Jian Li0c632722019-05-08 15:58:04 +0900185 Map<String, String> annots = node.getMetadata().getAnnotations();
186
Jian Lie2a04ce2020-07-01 19:07:02 +0900187 String extIntf = "";
188 String extGatewayIpStr = DEFAULT_GATEWAY_IP;
189 String extBridgeIpStr = DEFAULT_BRIDGE_IP;
190
191 if (config.mode() == PASSTHROUGH) {
192 extNetworkService.registerNetwork(config.extNetworkCidr());
193 extIntf = EXTERNAL_TO_ROUTER + "-" + config.clusterShortName();
194 IpAddress gatewayIp = extNetworkService.getGatewayIp(config.extNetworkCidr());
195 IpAddress bridgeIp = extNetworkService.allocateIp(config.extNetworkCidr());
196 if (gatewayIp != null) {
197 extGatewayIpStr = gatewayIp.toString();
198 }
199 if (bridgeIp != null) {
200 extBridgeIpStr = bridgeIp.toString();
201 }
202 } else {
203 extIntf = annots.get(EXT_INTF_NAME);
204 extGatewayIpStr = annots.get(EXT_GATEWAY_IP);
205 extBridgeIpStr = annots.get(EXT_BRIDGE_IP);
206 }
Jian Li0c632722019-05-08 15:58:04 +0900207
Jian Li1cee9882019-02-13 11:25:25 +0900208 return DefaultK8sNode.builder()
Jian Lie2a04ce2020-07-01 19:07:02 +0900209 .clusterName(DEFAULT_CLUSTER_NAME)
Jian Li1cee9882019-02-13 11:25:25 +0900210 .hostname(hostname)
211 .managementIp(managementIp)
212 .dataIp(dataIp)
Jian Li0c632722019-05-08 15:58:04 +0900213 .extIntf(extIntf)
214 .type(nodeType)
Jian Lie2a04ce2020-07-01 19:07:02 +0900215 .segmentId(config.segmentId())
Jian Li0ee8d0e2019-12-18 11:35:05 +0900216 .state(PRE_ON_BOARD)
Jian Lie2a04ce2020-07-01 19:07:02 +0900217 .mode(config.mode())
Jian Li0c632722019-05-08 15:58:04 +0900218 .extBridgeIp(IpAddress.valueOf(extBridgeIpStr))
219 .extGatewayIp(IpAddress.valueOf(extGatewayIpStr))
220 .podCidr(node.getSpec().getPodCIDR())
Jian Li1cee9882019-02-13 11:25:25 +0900221 .build();
222 }
223
224 /**
225 * An internal kubernetes API server config listener.
226 * The notification is triggered by K8sApiConfigStore.
227 */
228 private class InternalK8sApiConfigListener implements K8sApiConfigListener {
229
230 private boolean isRelevantHelper() {
231 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
232 }
233
234 @Override
235 public void event(K8sApiConfigEvent event) {
236
237 switch (event.type()) {
238 case K8S_API_CONFIG_CREATED:
239 eventExecutor.execute(() -> processConfigCreation(event.subject()));
240 break;
241 default:
242 break;
243 }
244 }
245
246 private void processConfigCreation(K8sApiConfig config) {
247 if (!isRelevantHelper()) {
248 return;
249 }
250
251 if (checkApiServerConfig(config)) {
252 K8sApiConfig newConfig = config.updateState(K8sApiConfig.State.CONNECTED);
253 k8sApiConfigAdminService.updateApiConfig(newConfig);
Jian Lie2a04ce2020-07-01 19:07:02 +0900254
Jian Li1cee9882019-02-13 11:25:25 +0900255 bootstrapK8sNodes(config);
256 }
257 }
258 }
259}