blob: b6475e3e87ad3f2bb3c217c9f3ea289cd62f9395 [file] [log] [blame]
Jian Lif4523d82019-07-07 01:06:09 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.web;
17
18import io.fabric8.kubernetes.api.model.Pod;
19import io.fabric8.kubernetes.client.KubernetesClient;
20import org.onlab.packet.IpAddress;
21import org.onlab.packet.MacAddress;
22import org.onlab.util.ItemNotFoundException;
23import org.onosproject.k8snetworking.api.DefaultK8sPort;
24import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
25import org.onosproject.k8snetworking.api.K8sIngressAdminService;
Jian Li324d6dc2019-07-10 10:55:15 +090026import org.onosproject.k8snetworking.api.K8sNamespaceAdminService;
Jian Lif4523d82019-07-07 01:06:09 +090027import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
28import org.onosproject.k8snetworking.api.K8sNetworkPolicyAdminService;
29import org.onosproject.k8snetworking.api.K8sPodAdminService;
30import org.onosproject.k8snetworking.api.K8sPort;
31import org.onosproject.k8snetworking.api.K8sServiceAdminService;
32import org.onosproject.k8snetworking.util.K8sNetworkingUtil;
33import org.onosproject.k8snode.api.K8sApiConfig;
34import org.onosproject.k8snode.api.K8sApiConfigService;
35import org.onosproject.k8snode.api.K8sNode;
36import org.onosproject.k8snode.api.K8sNodeAdminService;
37import org.onosproject.k8snode.api.K8sNodeState;
38import org.onosproject.net.DeviceId;
39import org.onosproject.net.PortNumber;
40import org.onosproject.rest.AbstractWebResource;
41import org.slf4j.Logger;
42import org.slf4j.LoggerFactory;
43
44import javax.ws.rs.GET;
45import javax.ws.rs.Path;
46import javax.ws.rs.Produces;
47import javax.ws.rs.core.MediaType;
48import javax.ws.rs.core.Response;
49import java.util.Map;
50
51import static java.lang.Thread.sleep;
52import static org.onosproject.k8snetworking.api.K8sPort.State.INACTIVE;
53import static org.onosproject.k8snode.api.K8sNode.Type.MASTER;
54import static org.onosproject.k8snode.api.K8sNode.Type.MINION;
55import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
56
57/**
58 * REST interface for synchronizing kubernetes network states and rules.
59 */
60@Path("management")
61public class K8sManagementWebResource extends AbstractWebResource {
62 private final Logger log = LoggerFactory.getLogger(getClass());
63
64 private static final String PORT_ID = "portId";
65 private static final String DEVICE_ID = "deviceId";
66 private static final String PORT_NUMBER = "portNumber";
67 private static final String IP_ADDRESS = "ipAddress";
68 private static final String MAC_ADDRESS = "macAddress";
69 private static final String NETWORK_ID = "networkId";
70
71 private static final long SLEEP_MIDDLE_MS = 3000; // we wait 3s
72 private static final long TIMEOUT_MS = 10000; // we wait 10s
73
74 private final K8sApiConfigService configService = get(K8sApiConfigService.class);
75 private final K8sPodAdminService podAdminService = get(K8sPodAdminService.class);
Jian Li324d6dc2019-07-10 10:55:15 +090076 private final K8sNamespaceAdminService namespaceAdminService =
77 get(K8sNamespaceAdminService.class);
Jian Lif4523d82019-07-07 01:06:09 +090078 private final K8sServiceAdminService serviceAdminService =
79 get(K8sServiceAdminService.class);
80 private final K8sIngressAdminService ingressAdminService =
81 get(K8sIngressAdminService.class);
82 private final K8sEndpointsAdminService endpointsAdminService =
83 get(K8sEndpointsAdminService.class);
84 private final K8sNetworkAdminService networkAdminService =
85 get(K8sNetworkAdminService.class);
86 private final K8sNodeAdminService nodeAdminService =
87 get(K8sNodeAdminService.class);
88 private final K8sNetworkPolicyAdminService policyAdminService =
89 get(K8sNetworkPolicyAdminService.class);
90
91 /**
92 * Synchronizes the all states with kubernetes API server.
93 *
94 * @return 200 OK with sync result, 404 not found
95 * @throws InterruptedException exception
96 */
97 @GET
98 @Produces(MediaType.APPLICATION_JSON)
99 @Path("sync/states")
100 public Response syncStates() {
101 K8sApiConfig config =
102 configService.apiConfigs().stream().findAny().orElse(null);
103 if (config == null) {
104 throw new ItemNotFoundException("Failed to find valid kubernetes API configuration.");
105 }
106
107 KubernetesClient client = K8sNetworkingUtil.k8sClient(config);
108
109 if (client == null) {
110 throw new ItemNotFoundException("Failed to connect to kubernetes API server.");
111 }
112
Jian Li324d6dc2019-07-10 10:55:15 +0900113 client.namespaces().list().getItems().forEach(ns -> {
114 if (namespaceAdminService.namespace(ns.getMetadata().getUid()) != null) {
115 namespaceAdminService.updateNamespace(ns);
116 } else {
117 namespaceAdminService.createNamespace(ns);
118 }
119 });
120
Jian Lif4523d82019-07-07 01:06:09 +0900121 client.services().inAnyNamespace().list().getItems().forEach(svc -> {
122 if (serviceAdminService.service(svc.getMetadata().getUid()) != null) {
123 serviceAdminService.updateService(svc);
124 } else {
125 serviceAdminService.createService(svc);
126 }
127 });
128
129 client.endpoints().inAnyNamespace().list().getItems().forEach(ep -> {
130 if (endpointsAdminService.endpoints(ep.getMetadata().getUid()) != null) {
131 endpointsAdminService.updateEndpoints(ep);
132 } else {
133 endpointsAdminService.createEndpoints(ep);
134 }
135 });
136
137 client.pods().inAnyNamespace().list().getItems().forEach(pod -> {
138 if (podAdminService.pod(pod.getMetadata().getUid()) != null) {
139 podAdminService.updatePod(pod);
140 } else {
141 podAdminService.createPod(pod);
142 }
143
144 syncPortFromPod(pod, networkAdminService);
145 });
146
147 client.extensions().ingresses().inAnyNamespace().list().getItems().forEach(ingress -> {
148 if (ingressAdminService.ingress(ingress.getMetadata().getUid()) != null) {
149 ingressAdminService.updateIngress(ingress);
150 } else {
151 ingressAdminService.createIngress(ingress);
152 }
153 });
154
155 client.network().networkPolicies().inAnyNamespace().list().getItems().forEach(policy -> {
156 if (policyAdminService.networkPolicy(policy.getMetadata().getUid()) != null) {
157 policyAdminService.updateNetworkPolicy(policy);
158 } else {
159 policyAdminService.createNetworkPolicy(policy);
160 }
161 });
162
163 return ok(mapper().createObjectNode()).build();
164 }
165
166 /**
167 * Synchronizes the flow rules.
168 *
169 * @return 200 OK with sync result, 404 not found
170 */
171 @GET
172 @Produces(MediaType.APPLICATION_JSON)
173 @Path("sync/rules")
174 public Response syncRules() {
175
176 syncRulesBase();
177 return ok(mapper().createObjectNode()).build();
178 }
179
180 private void syncPortFromPod(Pod pod, K8sNetworkAdminService adminService) {
181 Map<String, String> annotations = pod.getMetadata().getAnnotations();
182 if (annotations != null && !annotations.isEmpty() &&
183 annotations.get(PORT_ID) != null) {
184 String portId = annotations.get(PORT_ID);
185
186 K8sPort oldPort = adminService.port(portId);
187
188 String networkId = annotations.get(NETWORK_ID);
189 DeviceId deviceId = DeviceId.deviceId(annotations.get(DEVICE_ID));
190 PortNumber portNumber = PortNumber.portNumber(annotations.get(PORT_NUMBER));
191 IpAddress ipAddress = IpAddress.valueOf(annotations.get(IP_ADDRESS));
192 MacAddress macAddress = MacAddress.valueOf(annotations.get(MAC_ADDRESS));
193
194 K8sPort newPort = DefaultK8sPort.builder()
195 .portId(portId)
196 .networkId(networkId)
197 .deviceId(deviceId)
198 .ipAddress(ipAddress)
199 .macAddress(macAddress)
200 .portNumber(portNumber)
201 .state(INACTIVE)
202 .build();
203
204 if (oldPort == null) {
205 adminService.createPort(newPort);
206 } else {
207 adminService.updatePort(newPort);
208 }
209 }
210 }
211
212 private void syncRulesBase() {
213 nodeAdminService.completeNodes(MASTER).forEach(this::syncRulesBaseForNode);
214 nodeAdminService.completeNodes(MINION).forEach(this::syncRulesBaseForNode);
215 }
216
217 private void syncRulesBaseForNode(K8sNode k8sNode) {
218 K8sNode updated = k8sNode.updateState(K8sNodeState.INIT);
219 nodeAdminService.updateNode(updated);
220
221 boolean result = true;
222 long timeoutExpiredMs = System.currentTimeMillis() + TIMEOUT_MS;
223
224 while (nodeAdminService.node(k8sNode.hostname()).state() != COMPLETE) {
225
226 long waitMs = timeoutExpiredMs - System.currentTimeMillis();
227
228 try {
229 sleep(SLEEP_MIDDLE_MS);
230 } catch (InterruptedException e) {
231 log.error("Exception caused during node synchronization...");
232 }
233
234 if (nodeAdminService.node(k8sNode.hostname()).state() == COMPLETE) {
235 break;
236 } else {
237 nodeAdminService.updateNode(updated);
238 log.info("Failed to synchronize flow rules, retrying...");
239 }
240
241 if (waitMs <= 0) {
242 result = false;
243 break;
244 }
245 }
246
247 if (result) {
248 log.info("Successfully synchronize flow rules for node {}!", k8sNode.hostname());
249 } else {
250 log.warn("Failed to synchronize flow rules for node {}.", k8sNode.hostname());
251 }
252 }
253}