blob: 1b96e70c3a7c77e84c0aa4f6d0cce963e4b1e658 [file] [log] [blame]
Jian Lif4523d82019-07-07 01:06:09 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.web;
17
Jian Lif4523d82019-07-07 01:06:09 +090018import io.fabric8.kubernetes.client.KubernetesClient;
Jian Lif4523d82019-07-07 01:06:09 +090019import org.onlab.util.ItemNotFoundException;
Jian Lif4523d82019-07-07 01:06:09 +090020import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
21import org.onosproject.k8snetworking.api.K8sIngressAdminService;
Jian Li324d6dc2019-07-10 10:55:15 +090022import org.onosproject.k8snetworking.api.K8sNamespaceAdminService;
Jian Lif4523d82019-07-07 01:06:09 +090023import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
24import org.onosproject.k8snetworking.api.K8sNetworkPolicyAdminService;
25import org.onosproject.k8snetworking.api.K8sPodAdminService;
Jian Lif4523d82019-07-07 01:06:09 +090026import org.onosproject.k8snetworking.api.K8sServiceAdminService;
27import org.onosproject.k8snetworking.util.K8sNetworkingUtil;
28import org.onosproject.k8snode.api.K8sApiConfig;
29import org.onosproject.k8snode.api.K8sApiConfigService;
30import org.onosproject.k8snode.api.K8sNode;
31import org.onosproject.k8snode.api.K8sNodeAdminService;
32import org.onosproject.k8snode.api.K8sNodeState;
Jian Lif4523d82019-07-07 01:06:09 +090033import org.onosproject.rest.AbstractWebResource;
34import org.slf4j.Logger;
35import org.slf4j.LoggerFactory;
36
37import javax.ws.rs.GET;
38import javax.ws.rs.Path;
39import javax.ws.rs.Produces;
40import javax.ws.rs.core.MediaType;
41import javax.ws.rs.core.Response;
Jian Lif4523d82019-07-07 01:06:09 +090042
43import static java.lang.Thread.sleep;
Jian Li4bd6f2b2019-08-16 21:39:27 +090044import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.syncPortFromPod;
Jian Lif4523d82019-07-07 01:06:09 +090045import static org.onosproject.k8snode.api.K8sNode.Type.MASTER;
46import static org.onosproject.k8snode.api.K8sNode.Type.MINION;
47import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
48
49/**
50 * REST interface for synchronizing kubernetes network states and rules.
51 */
52@Path("management")
53public class K8sManagementWebResource extends AbstractWebResource {
54 private final Logger log = LoggerFactory.getLogger(getClass());
55
Jian Li5abc9f02020-09-04 19:38:37 +090056 private static final long SLEEP_MS = 10000; // we wait 10s
57 private static final long TIMEOUT_MS = 30000; // we wait 30s
Jian Lif4523d82019-07-07 01:06:09 +090058
59 private final K8sApiConfigService configService = get(K8sApiConfigService.class);
60 private final K8sPodAdminService podAdminService = get(K8sPodAdminService.class);
Jian Li324d6dc2019-07-10 10:55:15 +090061 private final K8sNamespaceAdminService namespaceAdminService =
62 get(K8sNamespaceAdminService.class);
Jian Lif4523d82019-07-07 01:06:09 +090063 private final K8sServiceAdminService serviceAdminService =
64 get(K8sServiceAdminService.class);
65 private final K8sIngressAdminService ingressAdminService =
66 get(K8sIngressAdminService.class);
67 private final K8sEndpointsAdminService endpointsAdminService =
68 get(K8sEndpointsAdminService.class);
69 private final K8sNetworkAdminService networkAdminService =
70 get(K8sNetworkAdminService.class);
71 private final K8sNodeAdminService nodeAdminService =
72 get(K8sNodeAdminService.class);
73 private final K8sNetworkPolicyAdminService policyAdminService =
74 get(K8sNetworkPolicyAdminService.class);
75
76 /**
77 * Synchronizes the all states with kubernetes API server.
78 *
79 * @return 200 OK with sync result, 404 not found
80 * @throws InterruptedException exception
81 */
82 @GET
83 @Produces(MediaType.APPLICATION_JSON)
84 @Path("sync/states")
85 public Response syncStates() {
86 K8sApiConfig config =
87 configService.apiConfigs().stream().findAny().orElse(null);
88 if (config == null) {
89 throw new ItemNotFoundException("Failed to find valid kubernetes API configuration.");
90 }
91
92 KubernetesClient client = K8sNetworkingUtil.k8sClient(config);
93
94 if (client == null) {
95 throw new ItemNotFoundException("Failed to connect to kubernetes API server.");
96 }
97
Jian Li324d6dc2019-07-10 10:55:15 +090098 client.namespaces().list().getItems().forEach(ns -> {
99 if (namespaceAdminService.namespace(ns.getMetadata().getUid()) != null) {
100 namespaceAdminService.updateNamespace(ns);
101 } else {
102 namespaceAdminService.createNamespace(ns);
103 }
104 });
105
Jian Lif4523d82019-07-07 01:06:09 +0900106 client.services().inAnyNamespace().list().getItems().forEach(svc -> {
107 if (serviceAdminService.service(svc.getMetadata().getUid()) != null) {
108 serviceAdminService.updateService(svc);
109 } else {
110 serviceAdminService.createService(svc);
111 }
112 });
113
114 client.endpoints().inAnyNamespace().list().getItems().forEach(ep -> {
115 if (endpointsAdminService.endpoints(ep.getMetadata().getUid()) != null) {
116 endpointsAdminService.updateEndpoints(ep);
117 } else {
118 endpointsAdminService.createEndpoints(ep);
119 }
120 });
121
122 client.pods().inAnyNamespace().list().getItems().forEach(pod -> {
123 if (podAdminService.pod(pod.getMetadata().getUid()) != null) {
124 podAdminService.updatePod(pod);
125 } else {
126 podAdminService.createPod(pod);
127 }
128
129 syncPortFromPod(pod, networkAdminService);
130 });
131
132 client.extensions().ingresses().inAnyNamespace().list().getItems().forEach(ingress -> {
133 if (ingressAdminService.ingress(ingress.getMetadata().getUid()) != null) {
134 ingressAdminService.updateIngress(ingress);
135 } else {
136 ingressAdminService.createIngress(ingress);
137 }
138 });
139
140 client.network().networkPolicies().inAnyNamespace().list().getItems().forEach(policy -> {
141 if (policyAdminService.networkPolicy(policy.getMetadata().getUid()) != null) {
142 policyAdminService.updateNetworkPolicy(policy);
143 } else {
144 policyAdminService.createNetworkPolicy(policy);
145 }
146 });
147
148 return ok(mapper().createObjectNode()).build();
149 }
150
151 /**
152 * Synchronizes the flow rules.
153 *
154 * @return 200 OK with sync result, 404 not found
155 */
156 @GET
157 @Produces(MediaType.APPLICATION_JSON)
158 @Path("sync/rules")
159 public Response syncRules() {
160
161 syncRulesBase();
162 return ok(mapper().createObjectNode()).build();
163 }
164
Jian Lif4523d82019-07-07 01:06:09 +0900165 private void syncRulesBase() {
166 nodeAdminService.completeNodes(MASTER).forEach(this::syncRulesBaseForNode);
167 nodeAdminService.completeNodes(MINION).forEach(this::syncRulesBaseForNode);
168 }
169
170 private void syncRulesBaseForNode(K8sNode k8sNode) {
171 K8sNode updated = k8sNode.updateState(K8sNodeState.INIT);
172 nodeAdminService.updateNode(updated);
173
174 boolean result = true;
175 long timeoutExpiredMs = System.currentTimeMillis() + TIMEOUT_MS;
176
177 while (nodeAdminService.node(k8sNode.hostname()).state() != COMPLETE) {
178
179 long waitMs = timeoutExpiredMs - System.currentTimeMillis();
180
181 try {
Jian Li5abc9f02020-09-04 19:38:37 +0900182 sleep(SLEEP_MS);
Jian Lif4523d82019-07-07 01:06:09 +0900183 } catch (InterruptedException e) {
184 log.error("Exception caused during node synchronization...");
185 }
186
187 if (nodeAdminService.node(k8sNode.hostname()).state() == COMPLETE) {
188 break;
189 } else {
190 nodeAdminService.updateNode(updated);
191 log.info("Failed to synchronize flow rules, retrying...");
192 }
193
194 if (waitMs <= 0) {
195 result = false;
196 break;
197 }
198 }
199
200 if (result) {
201 log.info("Successfully synchronize flow rules for node {}!", k8sNode.hostname());
202 } else {
203 log.warn("Failed to synchronize flow rules for node {}.", k8sNode.hostname());
204 }
205 }
206}