blob: 5d23fab8eb33c8567fc21481aabdc698642b3b66 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.web;
17
Jian Lif4523d82019-07-07 01:06:09 +090018import io.fabric8.kubernetes.client.KubernetesClient;
Jian Lif4523d82019-07-07 01:06:09 +090019import org.onlab.util.ItemNotFoundException;
Jian Lif4523d82019-07-07 01:06:09 +090020import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
21import org.onosproject.k8snetworking.api.K8sIngressAdminService;
Jian Li324d6dc2019-07-10 10:55:15 +090022import org.onosproject.k8snetworking.api.K8sNamespaceAdminService;
Jian Lif4523d82019-07-07 01:06:09 +090023import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
24import org.onosproject.k8snetworking.api.K8sNetworkPolicyAdminService;
25import org.onosproject.k8snetworking.api.K8sPodAdminService;
Jian Lif4523d82019-07-07 01:06:09 +090026import org.onosproject.k8snetworking.api.K8sServiceAdminService;
27import org.onosproject.k8snetworking.util.K8sNetworkingUtil;
28import org.onosproject.k8snode.api.K8sApiConfig;
29import org.onosproject.k8snode.api.K8sApiConfigService;
30import org.onosproject.k8snode.api.K8sNode;
31import org.onosproject.k8snode.api.K8sNodeAdminService;
32import org.onosproject.k8snode.api.K8sNodeState;
Jian Lif4523d82019-07-07 01:06:09 +090033import org.onosproject.rest.AbstractWebResource;
34import org.slf4j.Logger;
35import org.slf4j.LoggerFactory;
36
37import javax.ws.rs.GET;
38import javax.ws.rs.Path;
39import javax.ws.rs.Produces;
40import javax.ws.rs.core.MediaType;
41import javax.ws.rs.core.Response;
Jian Lif4523d82019-07-07 01:06:09 +090042
43import static java.lang.Thread.sleep;
Jian Li4bd6f2b2019-08-16 21:39:27 +090044import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.syncPortFromPod;
Jian Lif4523d82019-07-07 01:06:09 +090045import static org.onosproject.k8snode.api.K8sNode.Type.MASTER;
46import static org.onosproject.k8snode.api.K8sNode.Type.MINION;
47import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
48
49/**
50 * REST interface for synchronizing kubernetes network states and rules.
51 */
52@Path("management")
53public class K8sManagementWebResource extends AbstractWebResource {
54 private final Logger log = LoggerFactory.getLogger(getClass());
55
Jian Li913703c2019-06-20 22:47:32 +090056 private static final String PORT_ID = "portId";
57 private static final String DEVICE_ID = "deviceId";
58 private static final String PORT_NUMBER = "portNumber";
59 private static final String IP_ADDRESS = "ipAddress";
60 private static final String MAC_ADDRESS = "macAddress";
61 private static final String NETWORK_ID = "networkId";
62
Jian Lif4523d82019-07-07 01:06:09 +090063 private static final long SLEEP_MIDDLE_MS = 3000; // we wait 3s
64 private static final long TIMEOUT_MS = 10000; // we wait 10s
65
66 private final K8sApiConfigService configService = get(K8sApiConfigService.class);
67 private final K8sPodAdminService podAdminService = get(K8sPodAdminService.class);
Jian Li324d6dc2019-07-10 10:55:15 +090068 private final K8sNamespaceAdminService namespaceAdminService =
69 get(K8sNamespaceAdminService.class);
Jian Lif4523d82019-07-07 01:06:09 +090070 private final K8sServiceAdminService serviceAdminService =
71 get(K8sServiceAdminService.class);
72 private final K8sIngressAdminService ingressAdminService =
73 get(K8sIngressAdminService.class);
74 private final K8sEndpointsAdminService endpointsAdminService =
75 get(K8sEndpointsAdminService.class);
76 private final K8sNetworkAdminService networkAdminService =
77 get(K8sNetworkAdminService.class);
78 private final K8sNodeAdminService nodeAdminService =
79 get(K8sNodeAdminService.class);
80 private final K8sNetworkPolicyAdminService policyAdminService =
81 get(K8sNetworkPolicyAdminService.class);
82
83 /**
84 * Synchronizes the all states with kubernetes API server.
85 *
86 * @return 200 OK with sync result, 404 not found
87 * @throws InterruptedException exception
88 */
89 @GET
90 @Produces(MediaType.APPLICATION_JSON)
91 @Path("sync/states")
92 public Response syncStates() {
93 K8sApiConfig config =
94 configService.apiConfigs().stream().findAny().orElse(null);
95 if (config == null) {
96 throw new ItemNotFoundException("Failed to find valid kubernetes API configuration.");
97 }
98
99 KubernetesClient client = K8sNetworkingUtil.k8sClient(config);
100
101 if (client == null) {
102 throw new ItemNotFoundException("Failed to connect to kubernetes API server.");
103 }
104
Jian Li324d6dc2019-07-10 10:55:15 +0900105 client.namespaces().list().getItems().forEach(ns -> {
106 if (namespaceAdminService.namespace(ns.getMetadata().getUid()) != null) {
107 namespaceAdminService.updateNamespace(ns);
108 } else {
109 namespaceAdminService.createNamespace(ns);
110 }
111 });
112
Jian Lif4523d82019-07-07 01:06:09 +0900113 client.services().inAnyNamespace().list().getItems().forEach(svc -> {
114 if (serviceAdminService.service(svc.getMetadata().getUid()) != null) {
115 serviceAdminService.updateService(svc);
116 } else {
117 serviceAdminService.createService(svc);
118 }
119 });
120
121 client.endpoints().inAnyNamespace().list().getItems().forEach(ep -> {
122 if (endpointsAdminService.endpoints(ep.getMetadata().getUid()) != null) {
123 endpointsAdminService.updateEndpoints(ep);
124 } else {
125 endpointsAdminService.createEndpoints(ep);
126 }
127 });
128
129 client.pods().inAnyNamespace().list().getItems().forEach(pod -> {
130 if (podAdminService.pod(pod.getMetadata().getUid()) != null) {
131 podAdminService.updatePod(pod);
132 } else {
133 podAdminService.createPod(pod);
134 }
135
136 syncPortFromPod(pod, networkAdminService);
137 });
138
139 client.extensions().ingresses().inAnyNamespace().list().getItems().forEach(ingress -> {
140 if (ingressAdminService.ingress(ingress.getMetadata().getUid()) != null) {
141 ingressAdminService.updateIngress(ingress);
142 } else {
143 ingressAdminService.createIngress(ingress);
144 }
145 });
146
147 client.network().networkPolicies().inAnyNamespace().list().getItems().forEach(policy -> {
148 if (policyAdminService.networkPolicy(policy.getMetadata().getUid()) != null) {
149 policyAdminService.updateNetworkPolicy(policy);
150 } else {
151 policyAdminService.createNetworkPolicy(policy);
152 }
153 });
154
155 return ok(mapper().createObjectNode()).build();
156 }
157
158 /**
159 * Synchronizes the flow rules.
160 *
161 * @return 200 OK with sync result, 404 not found
162 */
163 @GET
164 @Produces(MediaType.APPLICATION_JSON)
165 @Path("sync/rules")
166 public Response syncRules() {
167
168 syncRulesBase();
169 return ok(mapper().createObjectNode()).build();
170 }
171
Jian Lif4523d82019-07-07 01:06:09 +0900172 private void syncRulesBase() {
173 nodeAdminService.completeNodes(MASTER).forEach(this::syncRulesBaseForNode);
174 nodeAdminService.completeNodes(MINION).forEach(this::syncRulesBaseForNode);
175 }
176
177 private void syncRulesBaseForNode(K8sNode k8sNode) {
178 K8sNode updated = k8sNode.updateState(K8sNodeState.INIT);
179 nodeAdminService.updateNode(updated);
180
181 boolean result = true;
182 long timeoutExpiredMs = System.currentTimeMillis() + TIMEOUT_MS;
183
184 while (nodeAdminService.node(k8sNode.hostname()).state() != COMPLETE) {
185
186 long waitMs = timeoutExpiredMs - System.currentTimeMillis();
187
188 try {
189 sleep(SLEEP_MIDDLE_MS);
190 } catch (InterruptedException e) {
191 log.error("Exception caused during node synchronization...");
192 }
193
194 if (nodeAdminService.node(k8sNode.hostname()).state() == COMPLETE) {
195 break;
196 } else {
197 nodeAdminService.updateNode(updated);
198 log.info("Failed to synchronize flow rules, retrying...");
199 }
200
201 if (waitMs <= 0) {
202 result = false;
203 break;
204 }
205 }
206
207 if (result) {
208 log.info("Successfully synchronize flow rules for node {}!", k8sNode.hostname());
209 } else {
210 log.warn("Failed to synchronize flow rules for node {}.", k8sNode.hostname());
211 }
212 }
213}