blob: 387494a2c145b76e4a5ab823d08c800f133b29d1 [file] [log] [blame]
Jian Li140d8a22019-04-24 23:41:44 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import io.fabric8.kubernetes.api.model.Service;
19import io.fabric8.kubernetes.api.model.ServicePort;
20import org.onlab.packet.Ethernet;
21import org.onlab.packet.IPv4;
22import org.onlab.packet.IpAddress;
23import org.onlab.packet.IpPrefix;
24import org.onlab.packet.TpPort;
25import org.onosproject.cfg.ComponentConfigService;
26import org.onosproject.cfg.ConfigProperty;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.k8snetworking.api.K8sFlowRuleService;
33import org.onosproject.k8snetworking.api.K8sNetworkService;
34import org.onosproject.k8snetworking.api.K8sServiceEvent;
35import org.onosproject.k8snetworking.api.K8sServiceListener;
36import org.onosproject.k8snetworking.api.K8sServiceService;
37import org.onosproject.k8snode.api.K8sNode;
38import org.onosproject.k8snode.api.K8sNodeEvent;
39import org.onosproject.k8snode.api.K8sNodeListener;
40import org.onosproject.k8snode.api.K8sNodeService;
41import org.onosproject.mastership.MastershipService;
42import org.onosproject.net.DeviceId;
43import org.onosproject.net.PortNumber;
44import org.onosproject.net.device.DeviceService;
45import org.onosproject.net.driver.DriverService;
46import org.onosproject.net.flow.DefaultTrafficSelector;
47import org.onosproject.net.flow.DefaultTrafficTreatment;
48import org.onosproject.net.flow.TrafficSelector;
49import org.onosproject.net.flow.TrafficTreatment;
50import org.onosproject.net.flow.instructions.ExtensionTreatment;
51import org.osgi.service.component.annotations.Activate;
52import org.osgi.service.component.annotations.Component;
53import org.osgi.service.component.annotations.Deactivate;
54import org.osgi.service.component.annotations.Reference;
55import org.osgi.service.component.annotations.ReferenceCardinality;
56import org.slf4j.Logger;
57
58import java.util.Objects;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
61
Jian Li897d92d2020-11-15 01:02:59 +090062import static java.lang.Thread.sleep;
Jian Li140d8a22019-04-24 23:41:44 +090063import static java.util.concurrent.Executors.newSingleThreadExecutor;
64import static org.onlab.util.Tools.groupedThreads;
65import static org.onosproject.k8snetworking.api.Constants.B_CLASS;
66import static org.onosproject.k8snetworking.api.Constants.DST;
67import static org.onosproject.k8snetworking.api.Constants.EXT_ENTRY_TABLE;
68import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
69import static org.onosproject.k8snetworking.api.Constants.NODE_IP_PREFIX;
70import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
Jian Li6d2ffbf2020-11-04 15:58:18 +090071import static org.onosproject.k8snetworking.api.Constants.PRIORITY_INTER_ROUTING_RULE;
Jian Li140d8a22019-04-24 23:41:44 +090072import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NODE_PORT_RULE;
73import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
74import static org.onosproject.k8snetworking.api.Constants.SRC;
Jian Li6d2ffbf2020-11-04 15:58:18 +090075import static org.onosproject.k8snetworking.api.Constants.TUN_ENTRY_TABLE;
Jian Li140d8a22019-04-24 23:41:44 +090076import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getBclassIpPrefixFromCidr;
77import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getPropertyValue;
Jian Li140d8a22019-04-24 23:41:44 +090078import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
Jian Li6d2ffbf2020-11-04 15:58:18 +090079import static org.onosproject.k8snode.api.K8sApiConfig.Mode.PASSTHROUGH;
Jian Li140d8a22019-04-24 23:41:44 +090080import static org.slf4j.LoggerFactory.getLogger;
81
82/**
83 * Provides service port exposure using node port.
84 */
85@Component(immediate = true)
86public class K8sNodePortHandler {
87
88 private final Logger log = getLogger(getClass());
89
90 private static final String NODE_PORT_TYPE = "NodePort";
Jian Li897d92d2020-11-15 01:02:59 +090091 private static final String LOAD_BALANCER_TYPE = "LoadBalancer";
Jian Li140d8a22019-04-24 23:41:44 +090092 private static final String TCP = "TCP";
93 private static final String UDP = "UDP";
94 private static final int HOST_CIDR = 32;
95 private static final String SERVICE_CIDR = "serviceCidr";
96 private static final String B_CLASS_SUFFIX = "0.0/16";
Jian Li140d8a22019-04-24 23:41:44 +090097
Jian Li897d92d2020-11-15 01:02:59 +090098 private static final long SLEEP_MS = 3000;
99
Jian Li140d8a22019-04-24 23:41:44 +0900100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected CoreService coreService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected ComponentConfigService configService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
107 protected DeviceService deviceService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected DriverService driverService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
113 protected LeadershipService leadershipService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY)
116 protected MastershipService mastershipService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY)
119 protected ClusterService clusterService;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY)
122 protected K8sNodeService k8sNodeService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY)
125 protected K8sNetworkService k8sNetworkService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
128 protected K8sServiceService k8sServiceService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
131 protected K8sFlowRuleService k8sFlowRuleService;
132
133 private final InternalK8sServiceListener k8sServiceListener =
134 new InternalK8sServiceListener();
135 private final InternalK8sNodeListener k8sNodeListener =
136 new InternalK8sNodeListener();
137 private final ExecutorService eventExecutor = newSingleThreadExecutor(
138 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
139
140 private ApplicationId appId;
141 private NodeId localNodeId;
142
143 @Activate
144 protected void activate() {
145 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
146
147 localNodeId = clusterService.getLocalNode().id();
148 leadershipService.runForLeadership(appId.name());
149 k8sNodeService.addListener(k8sNodeListener);
150 k8sServiceService.addListener(k8sServiceListener);
151
152 log.info("Started");
153 }
154
155 @Deactivate
156 protected void deactivate() {
157 k8sNodeService.removeListener(k8sNodeListener);
158 k8sServiceService.removeListener(k8sServiceListener);
159 leadershipService.withdraw(appId.name());
160 eventExecutor.shutdown();
161
162 log.info("Stopped");
163 }
164
165 private void processNodePortEvent(K8sNode k8sNode, Service service, boolean install) {
166
167 String clusterIp = service.getSpec().getClusterIP();
168 for (ServicePort servicePort : service.getSpec().getPorts()) {
169 setNodeToServiceRules(k8sNode, clusterIp, servicePort, install);
Jian Li6d2ffbf2020-11-04 15:58:18 +0900170 setServiceToNodeRules(k8sNode, clusterIp, servicePort, install);
Jian Li140d8a22019-04-24 23:41:44 +0900171 }
Jian Li6d2ffbf2020-11-04 15:58:18 +0900172 }
Jian Li140d8a22019-04-24 23:41:44 +0900173
Jian Li6d2ffbf2020-11-04 15:58:18 +0900174 private void setIntgToExtRules(K8sNode k8sNode, String serviceCidr,
175 boolean install) {
176 // for local traffic, we add default flow rules for steering traffic from
177 // integration bridge to external bridge through patch port
178 // for remote traffic, we add default flow rules for steering traffic from
179 // integration bridge to tun bridge through patch port
Jian Li140d8a22019-04-24 23:41:44 +0900180 k8sNodeService.completeNodes().forEach(n -> {
181 String podCidr = k8sNetworkService.network(n.hostname()).cidr();
182 String fullCidr = NODE_IP_PREFIX + "." +
183 podCidr.split("\\.")[2] + "." + B_CLASS_SUFFIX;
184
Jian Li6d2ffbf2020-11-04 15:58:18 +0900185 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
186 .matchEthType(Ethernet.TYPE_IPV4)
187 .matchIPSrc(IpPrefix.valueOf(serviceCidr))
188 .matchIPDst(IpPrefix.valueOf(fullCidr));
189
190 PortNumber output;
191 if (n.hostname().equals(k8sNode.hostname())) {
192 output = k8sNode.intgToExtPatchPortNum();
Jian Li140d8a22019-04-24 23:41:44 +0900193 } else {
Jian Li6d2ffbf2020-11-04 15:58:18 +0900194 output = k8sNode.intgToTunPortNum();
Jian Li140d8a22019-04-24 23:41:44 +0900195 }
Jian Li6d2ffbf2020-11-04 15:58:18 +0900196
197 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
198 .setOutput(output);
199
200 k8sFlowRuleService.setRule(
201 appId,
202 k8sNode.intgBridge(),
203 sBuilder.build(),
204 tBuilder.build(),
205 PRIORITY_CIDR_RULE,
206 ROUTING_TABLE,
207 install);
Jian Li140d8a22019-04-24 23:41:44 +0900208 });
Jian Li140d8a22019-04-24 23:41:44 +0900209 }
210
Jian Li6d2ffbf2020-11-04 15:58:18 +0900211 private void setTunToIntgRules(K8sNode k8sNode, boolean install) {
212 String podCidr = k8sNetworkService.network(k8sNode.hostname()).cidr();
213 String fullCidr = NODE_IP_PREFIX + "." +
214 podCidr.split("\\.")[2] + "." + B_CLASS_SUFFIX;
Jian Li140d8a22019-04-24 23:41:44 +0900215
Jian Li6d2ffbf2020-11-04 15:58:18 +0900216 TrafficSelector selector = DefaultTrafficSelector.builder()
Jian Li140d8a22019-04-24 23:41:44 +0900217 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li6d2ffbf2020-11-04 15:58:18 +0900218 .matchIPDst(IpPrefix.valueOf(fullCidr))
219 .build();
Jian Li140d8a22019-04-24 23:41:44 +0900220
Jian Li6d2ffbf2020-11-04 15:58:18 +0900221 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
222 .setOutput(k8sNode.tunToIntgPortNum())
223 .build();
Jian Li140d8a22019-04-24 23:41:44 +0900224
225 k8sFlowRuleService.setRule(
226 appId,
Jian Li6d2ffbf2020-11-04 15:58:18 +0900227 k8sNode.tunBridge(),
228 selector,
229 treatment,
230 PRIORITY_INTER_ROUTING_RULE,
231 TUN_ENTRY_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900232 install);
233 }
234
235 private void setNodeToServiceRules(K8sNode k8sNode,
236 String clusterIp,
237 ServicePort servicePort,
238 boolean install) {
239 String protocol = servicePort.getProtocol();
240 int nodePort = servicePort.getNodePort();
241 int svcPort = servicePort.getPort();
242 DeviceId deviceId = k8sNode.extBridge();
243
244 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
245 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li6d2ffbf2020-11-04 15:58:18 +0900246 .matchIPDst(IpPrefix.valueOf(k8sNode.nodeIp(), HOST_CIDR));
Jian Li140d8a22019-04-24 23:41:44 +0900247
248 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
249 .setIpDst(IpAddress.valueOf(clusterIp));
250
251 if (TCP.equals(protocol)) {
252 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
253 .matchTcpDst(TpPort.tpPort(nodePort));
254 tBuilder.setTcpDst(TpPort.tpPort(svcPort));
255 } else if (UDP.equals(protocol)) {
256 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
257 .matchUdpDst(TpPort.tpPort(nodePort));
258 tBuilder.setUdpDst(TpPort.tpPort(svcPort));
259 }
260
261 String podCidr = k8sNetworkService.network(k8sNode.hostname()).cidr();
262 String prefix = NODE_IP_PREFIX + "." + podCidr.split("\\.")[2];
263
264 ExtensionTreatment loadTreatment = buildLoadExtension(
265 deviceService.getDevice(deviceId), B_CLASS, SRC, prefix);
266 tBuilder.extension(loadTreatment, deviceId)
267 .setOutput(k8sNode.extToIntgPatchPortNum());
268
269 k8sFlowRuleService.setRule(
270 appId,
271 k8sNode.extBridge(),
272 sBuilder.build(),
273 tBuilder.build(),
274 PRIORITY_NODE_PORT_RULE,
275 EXT_ENTRY_TABLE,
276 install);
277 }
278
Jian Li6d2ffbf2020-11-04 15:58:18 +0900279 private void setServiceToNodeRules(K8sNode k8sNode,
280 String clusterIp,
281 ServicePort servicePort,
282 boolean install) {
Jian Li140d8a22019-04-24 23:41:44 +0900283 String protocol = servicePort.getProtocol();
284 int nodePort = servicePort.getNodePort();
285 int svcPort = servicePort.getPort();
286 DeviceId deviceId = k8sNode.extBridge();
287
Jian Li6d2ffbf2020-11-04 15:58:18 +0900288 String nodeIp = k8sNode.nodeIp().toString();
289 String nodeIpPrefix = getBclassIpPrefixFromCidr(nodeIp);
Jian Li140d8a22019-04-24 23:41:44 +0900290
Jian Li6d2ffbf2020-11-04 15:58:18 +0900291 if (nodeIpPrefix == null) {
Jian Li140d8a22019-04-24 23:41:44 +0900292 return;
293 }
294
Jian Li140d8a22019-04-24 23:41:44 +0900295 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
296 .matchEthType(Ethernet.TYPE_IPV4)
297 .matchInPort(k8sNode.extToIntgPatchPortNum())
298 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(clusterIp), HOST_CIDR));
299
300 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li6d2ffbf2020-11-04 15:58:18 +0900301 .setIpSrc(k8sNode.nodeIp())
302 .setEthSrc(k8sNode.nodeMac());
Jian Li140d8a22019-04-24 23:41:44 +0900303
304 if (TCP.equals(protocol)) {
305 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
306 .matchTcpSrc(TpPort.tpPort(svcPort));
307 tBuilder.setTcpSrc(TpPort.tpPort(nodePort));
308 } else if (UDP.equals(protocol)) {
309 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
310 .matchUdpSrc(TpPort.tpPort(svcPort));
311 tBuilder.setUdpSrc(TpPort.tpPort(nodePort));
312 }
313
Jian Li140d8a22019-04-24 23:41:44 +0900314 ExtensionTreatment loadTreatment = buildLoadExtension(
Jian Li6d2ffbf2020-11-04 15:58:18 +0900315 deviceService.getDevice(deviceId), B_CLASS, DST, nodeIpPrefix);
316 tBuilder.extension(loadTreatment, deviceId);
317
318 // in passthrough mode, we steer the traffic to the openstack intg bridge
319 // in normal mode, we steer the traffic to the local port
320 if (k8sNode.mode() == PASSTHROUGH) {
321 PortNumber output = k8sNode.portNumByName(k8sNode.extBridge(),
322 k8sNode.k8sExtToOsPatchPortName());
323 if (output == null) {
324 log.warn("Kubernetes external to OpenStack patch port is null");
325 return;
326 }
327 tBuilder.setOutput(output);
328 } else {
329 tBuilder.setOutput(PortNumber.LOCAL);
330 }
Jian Li140d8a22019-04-24 23:41:44 +0900331
332 k8sFlowRuleService.setRule(
333 appId,
334 deviceId,
335 sBuilder.build(),
336 tBuilder.build(),
Jian Li6d2ffbf2020-11-04 15:58:18 +0900337 PRIORITY_NODE_PORT_RULE,
Jian Li140d8a22019-04-24 23:41:44 +0900338 EXT_ENTRY_TABLE,
339 install);
340 }
341
342 private String getServiceCidr() {
343 Set<ConfigProperty> properties =
344 configService.getProperties(K8sServiceHandler.class.getName());
345 return getPropertyValue(properties, SERVICE_CIDR);
346 }
347
348 private class InternalK8sServiceListener implements K8sServiceListener {
349
350 private boolean isRelevantHelper() {
351 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
352 }
353
354 @Override
355 public void event(K8sServiceEvent event) {
356 switch (event.type()) {
357 case K8S_SERVICE_CREATED:
358 case K8S_SERVICE_UPDATED:
359 eventExecutor.execute(() -> processServiceCreation(event.subject()));
360 break;
361 default:
362 break;
363 }
364 }
365
366 private void processServiceCreation(Service service) {
367 if (!isRelevantHelper()) {
368 return;
369 }
370
Jian Li897d92d2020-11-15 01:02:59 +0900371 if (NODE_PORT_TYPE.equals(service.getSpec().getType()) ||
372 LOAD_BALANCER_TYPE.equals(service.getSpec().getType())) {
373 k8sNodeService.completeNodes().forEach(n -> {
374 // we need to wait, until we resolve the valid MAC address of the node
375 while (k8sNodeService.node(n.hostname()).nodeMac() == null) {
376 log.warn("Node {} MAC address is not resolved, " +
377 "wait until resolving it", n.hostname());
378 try {
379 sleep(SLEEP_MS);
380 } catch (InterruptedException e) {
381 log.error("Exception caused by", e);
382 }
383 }
384 K8sNode updatedNode = k8sNodeService.node(n.hostname());
385 processNodePortEvent(updatedNode, service, true);
386 });
Jian Li140d8a22019-04-24 23:41:44 +0900387 }
388 }
389 }
390
391 private class InternalK8sNodeListener implements K8sNodeListener {
392
393 private boolean isRelevantHelper() {
394 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
395 }
396
397 @Override
398 public void event(K8sNodeEvent event) {
399 switch (event.type()) {
400 case K8S_NODE_COMPLETE:
401 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
402 break;
403 case K8S_NODE_INCOMPLETE:
404 default:
405 break;
406 }
407 }
408
409 private void processNodeCompletion(K8sNode k8sNode) {
410 if (!isRelevantHelper()) {
411 return;
412 }
413
Jian Li897d92d2020-11-15 01:02:59 +0900414 // we need to wait, until we resolve the valid MAC address of the node
415 while (k8sNodeService.node(k8sNode.hostname()).nodeMac() == null) {
416 log.warn("Node {} MAC address is not resolved, " +
417 "wait until resolving it", k8sNode.hostname());
418 try {
419 sleep(SLEEP_MS);
420 } catch (InterruptedException e) {
421 log.error("Exception caused by", e);
422 }
423 }
Jian Li6d2ffbf2020-11-04 15:58:18 +0900424
Jian Li897d92d2020-11-15 01:02:59 +0900425 K8sNode updatedNode = k8sNodeService.node(k8sNode.hostname());
426
427 k8sServiceService.services().stream()
428 .filter(s -> NODE_PORT_TYPE.equals(s.getSpec().getType()) ||
429 LOAD_BALANCER_TYPE.equals(s.getSpec().getType()))
430 .forEach(s -> processNodePortEvent(updatedNode, s, true));
431
432 setIntgToExtRules(updatedNode, getServiceCidr(), true);
433 setTunToIntgRules(updatedNode, true);
Jian Li140d8a22019-04-24 23:41:44 +0900434 }
Jian Li140d8a22019-04-24 23:41:44 +0900435 }
436}