blob: 42adb65c180953cc6fd6dcc15a16da6c9540fd7e [file] [log] [blame]
Jian Lif4523d82019-07-07 01:06:09 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.web;
17
18import io.fabric8.kubernetes.api.model.Pod;
19import io.fabric8.kubernetes.client.KubernetesClient;
20import org.onlab.packet.IpAddress;
21import org.onlab.packet.MacAddress;
22import org.onlab.util.ItemNotFoundException;
23import org.onosproject.k8snetworking.api.DefaultK8sPort;
24import org.onosproject.k8snetworking.api.K8sEndpointsAdminService;
25import org.onosproject.k8snetworking.api.K8sIngressAdminService;
26import org.onosproject.k8snetworking.api.K8sNetworkAdminService;
27import org.onosproject.k8snetworking.api.K8sNetworkPolicyAdminService;
28import org.onosproject.k8snetworking.api.K8sPodAdminService;
29import org.onosproject.k8snetworking.api.K8sPort;
30import org.onosproject.k8snetworking.api.K8sServiceAdminService;
31import org.onosproject.k8snetworking.util.K8sNetworkingUtil;
32import org.onosproject.k8snode.api.K8sApiConfig;
33import org.onosproject.k8snode.api.K8sApiConfigService;
34import org.onosproject.k8snode.api.K8sNode;
35import org.onosproject.k8snode.api.K8sNodeAdminService;
36import org.onosproject.k8snode.api.K8sNodeState;
37import org.onosproject.net.DeviceId;
38import org.onosproject.net.PortNumber;
39import org.onosproject.rest.AbstractWebResource;
40import org.slf4j.Logger;
41import org.slf4j.LoggerFactory;
42
43import javax.ws.rs.GET;
44import javax.ws.rs.Path;
45import javax.ws.rs.Produces;
46import javax.ws.rs.core.MediaType;
47import javax.ws.rs.core.Response;
48import java.util.Map;
49
50import static java.lang.Thread.sleep;
51import static org.onosproject.k8snetworking.api.K8sPort.State.INACTIVE;
52import static org.onosproject.k8snode.api.K8sNode.Type.MASTER;
53import static org.onosproject.k8snode.api.K8sNode.Type.MINION;
54import static org.onosproject.k8snode.api.K8sNodeState.COMPLETE;
55
56/**
57 * REST interface for synchronizing kubernetes network states and rules.
58 */
59@Path("management")
60public class K8sManagementWebResource extends AbstractWebResource {
61 private final Logger log = LoggerFactory.getLogger(getClass());
62
63 private static final String PORT_ID = "portId";
64 private static final String DEVICE_ID = "deviceId";
65 private static final String PORT_NUMBER = "portNumber";
66 private static final String IP_ADDRESS = "ipAddress";
67 private static final String MAC_ADDRESS = "macAddress";
68 private static final String NETWORK_ID = "networkId";
69
70 private static final long SLEEP_MIDDLE_MS = 3000; // we wait 3s
71 private static final long TIMEOUT_MS = 10000; // we wait 10s
72
73 private final K8sApiConfigService configService = get(K8sApiConfigService.class);
74 private final K8sPodAdminService podAdminService = get(K8sPodAdminService.class);
75 private final K8sServiceAdminService serviceAdminService =
76 get(K8sServiceAdminService.class);
77 private final K8sIngressAdminService ingressAdminService =
78 get(K8sIngressAdminService.class);
79 private final K8sEndpointsAdminService endpointsAdminService =
80 get(K8sEndpointsAdminService.class);
81 private final K8sNetworkAdminService networkAdminService =
82 get(K8sNetworkAdminService.class);
83 private final K8sNodeAdminService nodeAdminService =
84 get(K8sNodeAdminService.class);
85 private final K8sNetworkPolicyAdminService policyAdminService =
86 get(K8sNetworkPolicyAdminService.class);
87
88 /**
89 * Synchronizes the all states with kubernetes API server.
90 *
91 * @return 200 OK with sync result, 404 not found
92 * @throws InterruptedException exception
93 */
94 @GET
95 @Produces(MediaType.APPLICATION_JSON)
96 @Path("sync/states")
97 public Response syncStates() {
98 K8sApiConfig config =
99 configService.apiConfigs().stream().findAny().orElse(null);
100 if (config == null) {
101 throw new ItemNotFoundException("Failed to find valid kubernetes API configuration.");
102 }
103
104 KubernetesClient client = K8sNetworkingUtil.k8sClient(config);
105
106 if (client == null) {
107 throw new ItemNotFoundException("Failed to connect to kubernetes API server.");
108 }
109
110 client.services().inAnyNamespace().list().getItems().forEach(svc -> {
111 if (serviceAdminService.service(svc.getMetadata().getUid()) != null) {
112 serviceAdminService.updateService(svc);
113 } else {
114 serviceAdminService.createService(svc);
115 }
116 });
117
118 client.endpoints().inAnyNamespace().list().getItems().forEach(ep -> {
119 if (endpointsAdminService.endpoints(ep.getMetadata().getUid()) != null) {
120 endpointsAdminService.updateEndpoints(ep);
121 } else {
122 endpointsAdminService.createEndpoints(ep);
123 }
124 });
125
126 client.pods().inAnyNamespace().list().getItems().forEach(pod -> {
127 if (podAdminService.pod(pod.getMetadata().getUid()) != null) {
128 podAdminService.updatePod(pod);
129 } else {
130 podAdminService.createPod(pod);
131 }
132
133 syncPortFromPod(pod, networkAdminService);
134 });
135
136 client.extensions().ingresses().inAnyNamespace().list().getItems().forEach(ingress -> {
137 if (ingressAdminService.ingress(ingress.getMetadata().getUid()) != null) {
138 ingressAdminService.updateIngress(ingress);
139 } else {
140 ingressAdminService.createIngress(ingress);
141 }
142 });
143
144 client.network().networkPolicies().inAnyNamespace().list().getItems().forEach(policy -> {
145 if (policyAdminService.networkPolicy(policy.getMetadata().getUid()) != null) {
146 policyAdminService.updateNetworkPolicy(policy);
147 } else {
148 policyAdminService.createNetworkPolicy(policy);
149 }
150 });
151
152 return ok(mapper().createObjectNode()).build();
153 }
154
155 /**
156 * Synchronizes the flow rules.
157 *
158 * @return 200 OK with sync result, 404 not found
159 */
160 @GET
161 @Produces(MediaType.APPLICATION_JSON)
162 @Path("sync/rules")
163 public Response syncRules() {
164
165 syncRulesBase();
166 return ok(mapper().createObjectNode()).build();
167 }
168
169 private void syncPortFromPod(Pod pod, K8sNetworkAdminService adminService) {
170 Map<String, String> annotations = pod.getMetadata().getAnnotations();
171 if (annotations != null && !annotations.isEmpty() &&
172 annotations.get(PORT_ID) != null) {
173 String portId = annotations.get(PORT_ID);
174
175 K8sPort oldPort = adminService.port(portId);
176
177 String networkId = annotations.get(NETWORK_ID);
178 DeviceId deviceId = DeviceId.deviceId(annotations.get(DEVICE_ID));
179 PortNumber portNumber = PortNumber.portNumber(annotations.get(PORT_NUMBER));
180 IpAddress ipAddress = IpAddress.valueOf(annotations.get(IP_ADDRESS));
181 MacAddress macAddress = MacAddress.valueOf(annotations.get(MAC_ADDRESS));
182
183 K8sPort newPort = DefaultK8sPort.builder()
184 .portId(portId)
185 .networkId(networkId)
186 .deviceId(deviceId)
187 .ipAddress(ipAddress)
188 .macAddress(macAddress)
189 .portNumber(portNumber)
190 .state(INACTIVE)
191 .build();
192
193 if (oldPort == null) {
194 adminService.createPort(newPort);
195 } else {
196 adminService.updatePort(newPort);
197 }
198 }
199 }
200
201 private void syncRulesBase() {
202 nodeAdminService.completeNodes(MASTER).forEach(this::syncRulesBaseForNode);
203 nodeAdminService.completeNodes(MINION).forEach(this::syncRulesBaseForNode);
204 }
205
206 private void syncRulesBaseForNode(K8sNode k8sNode) {
207 K8sNode updated = k8sNode.updateState(K8sNodeState.INIT);
208 nodeAdminService.updateNode(updated);
209
210 boolean result = true;
211 long timeoutExpiredMs = System.currentTimeMillis() + TIMEOUT_MS;
212
213 while (nodeAdminService.node(k8sNode.hostname()).state() != COMPLETE) {
214
215 long waitMs = timeoutExpiredMs - System.currentTimeMillis();
216
217 try {
218 sleep(SLEEP_MIDDLE_MS);
219 } catch (InterruptedException e) {
220 log.error("Exception caused during node synchronization...");
221 }
222
223 if (nodeAdminService.node(k8sNode.hostname()).state() == COMPLETE) {
224 break;
225 } else {
226 nodeAdminService.updateNode(updated);
227 log.info("Failed to synchronize flow rules, retrying...");
228 }
229
230 if (waitMs <= 0) {
231 result = false;
232 break;
233 }
234 }
235
236 if (result) {
237 log.info("Successfully synchronize flow rules for node {}!", k8sNode.hostname());
238 } else {
239 log.warn("Failed to synchronize flow rules for node {}.", k8sNode.hostname());
240 }
241 }
242}