blob: bd517676757de8cbecc8dd04cefef4b3e110f7c6 [file] [log] [blame]
Jian Lif4523d82019-07-07 01:06:09 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.web;
17
Jian Lif4523d82019-07-07 01:06:09 +090018import io.fabric8.kubernetes.client.KubernetesClient;
Jian Li32a28ad2020-12-01 00:35:50 +090019import org.onlab.packet.IpAddress;
Jian Lif4523d82019-07-07 01:06:09 +090020import org.onlab.util.ItemNotFoundException;
Jian Lif4523d82019-07-07 01:06:09 +090021import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
22import org.onosproject.k8snetworking.api.K8sIngressAdminService;
Jian Li324d6dc2019-07-10 10:55:15 +090023import org.onosproject.k8snetworking.api.K8sNamespaceAdminService;
Jian Lif4523d82019-07-07 01:06:09 +090024import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
25import org.onosproject.k8snetworking.api.K8sNetworkPolicyAdminService;
26import org.onosproject.k8snetworking.api.K8sPodAdminService;
Jian Li32a28ad2020-12-01 00:35:50 +090027import org.onosproject.k8snetworking.api.K8sPort;
Jian Lif4523d82019-07-07 01:06:09 +090028import org.onosproject.k8snetworking.api.K8sServiceAdminService;
29import org.onosproject.k8snetworking.util.K8sNetworkingUtil;
30import org.onosproject.k8snode.api.K8sApiConfig;
31import org.onosproject.k8snode.api.K8sApiConfigService;
Jian Li32a28ad2020-12-01 00:35:50 +090032import org.onosproject.k8snode.api.K8sHost;
33import org.onosproject.k8snode.api.K8sHostAdminService;
Jian Lif4523d82019-07-07 01:06:09 +090034import org.onosproject.k8snode.api.K8sNode;
35import org.onosproject.k8snode.api.K8sNodeAdminService;
36import org.onosproject.k8snode.api.K8sNodeState;
Jian Lif4523d82019-07-07 01:06:09 +090037import org.onosproject.rest.AbstractWebResource;
38import org.slf4j.Logger;
39import org.slf4j.LoggerFactory;
40
Jian Li32a28ad2020-12-01 00:35:50 +090041import javax.ws.rs.Consumes;
42import javax.ws.rs.DELETE;
Jian Lif4523d82019-07-07 01:06:09 +090043import javax.ws.rs.GET;
44import javax.ws.rs.Path;
45import javax.ws.rs.Produces;
46import javax.ws.rs.core.MediaType;
47import javax.ws.rs.core.Response;
Jian Li32a28ad2020-12-01 00:35:50 +090048import java.util.Set;
49import java.util.stream.Collectors;
Jian Lif4523d82019-07-07 01:06:09 +090050
51import static java.lang.Thread.sleep;
Jian Li4bd6f2b2019-08-16 21:39:27 +090052import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.syncPortFromPod;
Jian Lif4523d82019-07-07 01:06:09 +090053import static org.onosproject.k8snode.api.K8sNode.Type.MASTER;
54import static org.onosproject.k8snode.api.K8sNode.Type.MINION;
55import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
56
57/**
58 * REST interface for synchronizing kubernetes network states and rules.
59 */
60@Path("management")
61public class K8sManagementWebResource extends AbstractWebResource {
62 private final Logger log = LoggerFactory.getLogger(getClass());
63
Jian Li32a28ad2020-12-01 00:35:50 +090064 private static final long MID_SLEEP_MS = 3000; // we wait 3s
Jian Li5abc9f02020-09-04 19:38:37 +090065 private static final long SLEEP_MS = 10000; // we wait 10s
66 private static final long TIMEOUT_MS = 30000; // we wait 30s
Jian Lif4523d82019-07-07 01:06:09 +090067
Jian Li32a28ad2020-12-01 00:35:50 +090068 private static final String MESSAGE_ALL = "Received all %s request";
69 private static final String REMOVE = "REMOVE";
70
Jian Lif4523d82019-07-07 01:06:09 +090071 private final K8sApiConfigService configService = get(K8sApiConfigService.class);
72 private final K8sPodAdminService podAdminService = get(K8sPodAdminService.class);
Jian Li324d6dc2019-07-10 10:55:15 +090073 private final K8sNamespaceAdminService namespaceAdminService =
74 get(K8sNamespaceAdminService.class);
Jian Lif4523d82019-07-07 01:06:09 +090075 private final K8sServiceAdminService serviceAdminService =
76 get(K8sServiceAdminService.class);
77 private final K8sIngressAdminService ingressAdminService =
78 get(K8sIngressAdminService.class);
79 private final K8sEndpointsAdminService endpointsAdminService =
80 get(K8sEndpointsAdminService.class);
81 private final K8sNetworkAdminService networkAdminService =
82 get(K8sNetworkAdminService.class);
83 private final K8sNodeAdminService nodeAdminService =
84 get(K8sNodeAdminService.class);
Jian Li32a28ad2020-12-01 00:35:50 +090085 private final K8sHostAdminService hostAdminService =
86 get(K8sHostAdminService.class);
Jian Lif4523d82019-07-07 01:06:09 +090087 private final K8sNetworkPolicyAdminService policyAdminService =
88 get(K8sNetworkPolicyAdminService.class);
89
90 /**
91 * Synchronizes the all states with kubernetes API server.
92 *
93 * @return 200 OK with sync result, 404 not found
94 * @throws InterruptedException exception
95 */
96 @GET
97 @Produces(MediaType.APPLICATION_JSON)
98 @Path("sync/states")
99 public Response syncStates() {
100 K8sApiConfig config =
101 configService.apiConfigs().stream().findAny().orElse(null);
102 if (config == null) {
103 throw new ItemNotFoundException("Failed to find valid kubernetes API configuration.");
104 }
105
106 KubernetesClient client = K8sNetworkingUtil.k8sClient(config);
107
108 if (client == null) {
109 throw new ItemNotFoundException("Failed to connect to kubernetes API server.");
110 }
111
Jian Li324d6dc2019-07-10 10:55:15 +0900112 client.namespaces().list().getItems().forEach(ns -> {
113 if (namespaceAdminService.namespace(ns.getMetadata().getUid()) != null) {
114 namespaceAdminService.updateNamespace(ns);
115 } else {
116 namespaceAdminService.createNamespace(ns);
117 }
118 });
119
Jian Lif4523d82019-07-07 01:06:09 +0900120 client.services().inAnyNamespace().list().getItems().forEach(svc -> {
121 if (serviceAdminService.service(svc.getMetadata().getUid()) != null) {
122 serviceAdminService.updateService(svc);
123 } else {
124 serviceAdminService.createService(svc);
125 }
126 });
127
128 client.endpoints().inAnyNamespace().list().getItems().forEach(ep -> {
129 if (endpointsAdminService.endpoints(ep.getMetadata().getUid()) != null) {
130 endpointsAdminService.updateEndpoints(ep);
131 } else {
132 endpointsAdminService.createEndpoints(ep);
133 }
134 });
135
136 client.pods().inAnyNamespace().list().getItems().forEach(pod -> {
137 if (podAdminService.pod(pod.getMetadata().getUid()) != null) {
138 podAdminService.updatePod(pod);
139 } else {
140 podAdminService.createPod(pod);
141 }
142
143 syncPortFromPod(pod, networkAdminService);
144 });
145
146 client.extensions().ingresses().inAnyNamespace().list().getItems().forEach(ingress -> {
147 if (ingressAdminService.ingress(ingress.getMetadata().getUid()) != null) {
148 ingressAdminService.updateIngress(ingress);
149 } else {
150 ingressAdminService.createIngress(ingress);
151 }
152 });
153
154 client.network().networkPolicies().inAnyNamespace().list().getItems().forEach(policy -> {
155 if (policyAdminService.networkPolicy(policy.getMetadata().getUid()) != null) {
156 policyAdminService.updateNetworkPolicy(policy);
157 } else {
158 policyAdminService.createNetworkPolicy(policy);
159 }
160 });
161
162 return ok(mapper().createObjectNode()).build();
163 }
164
165 /**
166 * Synchronizes the flow rules.
167 *
168 * @return 200 OK with sync result, 404 not found
169 */
170 @GET
171 @Produces(MediaType.APPLICATION_JSON)
172 @Path("sync/rules")
173 public Response syncRules() {
174
175 syncRulesBase();
176 return ok(mapper().createObjectNode()).build();
177 }
178
Jian Li32a28ad2020-12-01 00:35:50 +0900179 /**
180 * Removes all nodes and hosts.
181 *
182 * @return 204 NO_CONTENT, 400 BAD_REQUEST if the JSON is malformed, and
183 * 304 NOT_MODIFIED without the updated config
184 */
185 @DELETE
186 @Path("purge/all")
187 @Consumes(MediaType.APPLICATION_JSON)
188 @Produces(MediaType.APPLICATION_JSON)
189 public Response purgeAll() {
190 log.trace(String.format(MESSAGE_ALL, REMOVE));
191
192 Set<String> portIds = networkAdminService.ports().stream().map(K8sPort::portId).collect(Collectors.toSet());
193 portIds.forEach(networkAdminService::removePort);
194
195 try {
196 sleep(MID_SLEEP_MS);
197 } catch (InterruptedException e) {
198 log.error("Exception caused during node synchronization...");
199 }
200
201 Set<String> masters = nodeAdminService.nodes(K8sNode.Type.MASTER).stream()
202 .map(K8sNode::hostname).collect(Collectors.toSet());
203 Set<String> workers = nodeAdminService.nodes(K8sNode.Type.MINION).stream()
204 .map(K8sNode::hostname).collect(Collectors.toSet());
205
206 for (String hostname : workers) {
207 nodeAdminService.removeNode(hostname);
208 try {
209 sleep(MID_SLEEP_MS);
210 } catch (InterruptedException e) {
211 log.error("Exception caused during node synchronization...");
212 }
213 }
214
215 for (String hostname : masters) {
216 nodeAdminService.removeNode(hostname);
217 try {
218 sleep(MID_SLEEP_MS);
219 } catch (InterruptedException e) {
220 log.error("Exception caused during node synchronization...");
221 }
222 }
223
224 Set<IpAddress> allHosts = hostAdminService.hosts().stream().map(K8sHost::hostIp).collect(Collectors.toSet());
225 for (IpAddress hostIp : allHosts) {
226 hostAdminService.removeHost(hostIp);
227 try {
228 sleep(MID_SLEEP_MS);
229 } catch (InterruptedException e) {
230 log.error("Exception caused during node synchronization...");
231 }
232 }
233
234 return Response.noContent().build();
235 }
236
Jian Lif4523d82019-07-07 01:06:09 +0900237 private void syncRulesBase() {
238 nodeAdminService.completeNodes(MASTER).forEach(this::syncRulesBaseForNode);
239 nodeAdminService.completeNodes(MINION).forEach(this::syncRulesBaseForNode);
240 }
241
242 private void syncRulesBaseForNode(K8sNode k8sNode) {
243 K8sNode updated = k8sNode.updateState(K8sNodeState.INIT);
244 nodeAdminService.updateNode(updated);
245
246 boolean result = true;
247 long timeoutExpiredMs = System.currentTimeMillis() + TIMEOUT_MS;
248
249 while (nodeAdminService.node(k8sNode.hostname()).state() != COMPLETE) {
250
251 long waitMs = timeoutExpiredMs - System.currentTimeMillis();
252
253 try {
Jian Li5abc9f02020-09-04 19:38:37 +0900254 sleep(SLEEP_MS);
Jian Lif4523d82019-07-07 01:06:09 +0900255 } catch (InterruptedException e) {
256 log.error("Exception caused during node synchronization...");
257 }
258
259 if (nodeAdminService.node(k8sNode.hostname()).state() == COMPLETE) {
260 break;
261 } else {
262 nodeAdminService.updateNode(updated);
263 log.info("Failed to synchronize flow rules, retrying...");
264 }
265
266 if (waitMs <= 0) {
267 result = false;
268 break;
269 }
270 }
271
272 if (result) {
273 log.info("Successfully synchronize flow rules for node {}!", k8sNode.hostname());
274 } else {
275 log.warn("Failed to synchronize flow rules for node {}.", k8sNode.hostname());
276 }
277 }
278}