/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.onosproject.k8snode.impl;

import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.NodeAddress;
import io.fabric8.kubernetes.client.KubernetesClient;
import org.onlab.packet.IpAddress;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.k8snode.api.DefaultK8sNode;
import org.onosproject.k8snode.api.K8sApiConfig;
import org.onosproject.k8snode.api.K8sApiConfigAdminService;
import org.onosproject.k8snode.api.K8sApiConfigEvent;
import org.onosproject.k8snode.api.K8sApiConfigListener;
import org.onosproject.k8snode.api.K8sNode;
import org.onosproject.k8snode.api.K8sNodeAdminService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;

import static java.util.concurrent.Executors.newSingleThreadExecutor;
import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.k8snode.api.K8sNode.Type.MASTER;
import static org.onosproject.k8snode.api.K8sNode.Type.MINION;
import static org.onosproject.k8snode.api.K8sNodeService.APP_ID;
import static org.onosproject.k8snode.api.K8sNodeState.INIT;
import static org.onosproject.k8snode.util.K8sNodeUtil.k8sClient;
import static org.slf4j.LoggerFactory.getLogger;
52
53/**
54 * Handles the state of kubernetes API server configuration.
55 */
56@Component(immediate = true)
57public class DefaultK8sApiConfigHandler {
58
59 private final Logger log = getLogger(getClass());
60
61 private static final String INTERNAL_IP = "InternalIP";
62 private static final String K8S_ROLE = "node-role.kubernetes.io";
63
64 @Reference(cardinality = ReferenceCardinality.MANDATORY)
65 protected CoreService coreService;
66
67 @Reference(cardinality = ReferenceCardinality.MANDATORY)
68 protected LeadershipService leadershipService;
69
70 @Reference(cardinality = ReferenceCardinality.MANDATORY)
71 protected ClusterService clusterService;
72
73 @Reference(cardinality = ReferenceCardinality.MANDATORY)
74 protected K8sApiConfigAdminService k8sApiConfigAdminService;
75
76 @Reference(cardinality = ReferenceCardinality.MANDATORY)
77 protected K8sNodeAdminService k8sNodeAdminService;
78
79 private final ExecutorService eventExecutor = newSingleThreadExecutor(
80 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
81
82 private final K8sApiConfigListener k8sApiConfigListener = new InternalK8sApiConfigListener();
83
84 private ApplicationId appId;
85 private NodeId localNode;
86
87 @Activate
88 protected void activate() {
89 appId = coreService.getAppId(APP_ID);
90 localNode = clusterService.getLocalNode().id();
91 leadershipService.runForLeadership(appId.name());
92 k8sApiConfigAdminService.addListener(k8sApiConfigListener);
93
94 log.info("Started");
95 }
96
97 @Deactivate
98 protected void deactivate() {
99 k8sApiConfigAdminService.removeListener(k8sApiConfigListener);
100 leadershipService.withdraw(appId.name());
101 eventExecutor.shutdown();
102
103 log.info("Stopped");
104 }
105
106 /**
107 * Checks the validity of the given kubernetes API server configuration.
108 *
109 * @param config kubernetes API server configuration
110 * @return validity result
111 */
112 private boolean checkApiServerConfig(K8sApiConfig config) {
113 KubernetesClient k8sClient = k8sClient(config);
114 return k8sClient != null && k8sClient.getApiVersion() != null;
115 }
116
117 private void bootstrapK8sNodes(K8sApiConfig config) {
118 KubernetesClient k8sClient = k8sClient(config);
119
120 if (k8sClient == null) {
121 log.warn("Failed to connect to kubernetes API server");
122 return;
123 }
124
125 k8sClient.nodes().list().getItems().forEach(n ->
126 k8sNodeAdminService.createNode(buildK8sNode(n))
127 );
128 }
129
130 private K8sNode buildK8sNode(Node node) {
131 String hostname = node.getMetadata().getName();
132 IpAddress managementIp = null;
133 IpAddress dataIp = null;
134
135 for (NodeAddress nodeAddress:node.getStatus().getAddresses()) {
136 // we need to consider assigning managementIp and dataIp differently
137 // FIXME: ExternalIp is not considered currently
138 if (nodeAddress.getType().equals(INTERNAL_IP)) {
139 managementIp = IpAddress.valueOf(nodeAddress.getAddress());
140 dataIp = IpAddress.valueOf(nodeAddress.getAddress());
141 }
142 }
143
144 String roleStr = node.getMetadata().getLabels().keySet().stream()
145 .filter(l -> l.contains(K8S_ROLE))
146 .findFirst().orElse(null);
147
148 K8sNode.Type nodeType = MASTER;
149
150 if (roleStr != null) {
151 String role = roleStr.split("/")[1];
152 if (MASTER.name().equalsIgnoreCase(role)) {
153 nodeType = MASTER;
154 } else {
155 nodeType = MINION;
156 }
157 }
158
159 return DefaultK8sNode.builder()
160 .hostname(hostname)
161 .managementIp(managementIp)
162 .dataIp(dataIp)
163 .type(nodeType) // need to get correct node type
164 .state(INIT)
165 .build();
166 }
167
168 /**
169 * An internal kubernetes API server config listener.
170 * The notification is triggered by K8sApiConfigStore.
171 */
172 private class InternalK8sApiConfigListener implements K8sApiConfigListener {
173
174 private boolean isRelevantHelper() {
175 return Objects.equals(localNode, leadershipService.getLeader(appId.name()));
176 }
177
178 @Override
179 public void event(K8sApiConfigEvent event) {
180
181 switch (event.type()) {
182 case K8S_API_CONFIG_CREATED:
183 eventExecutor.execute(() -> processConfigCreation(event.subject()));
184 break;
185 default:
186 break;
187 }
188 }
189
190 private void processConfigCreation(K8sApiConfig config) {
191 if (!isRelevantHelper()) {
192 return;
193 }
194
195 if (checkApiServerConfig(config)) {
196 K8sApiConfig newConfig = config.updateState(K8sApiConfig.State.CONNECTED);
197 k8sApiConfigAdminService.updateApiConfig(newConfig);
198 bootstrapK8sNodes(config);
199 }
200 }
201 }
202}