blob: c87dfe72b1830f9282ff7006fda30b82a50452cf [file] [log] [blame]
Jian Li140d8a22019-04-24 23:41:44 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import io.fabric8.kubernetes.api.model.Service;
19import io.fabric8.kubernetes.api.model.ServicePort;
20import org.onlab.packet.Ethernet;
21import org.onlab.packet.IPv4;
22import org.onlab.packet.IpAddress;
23import org.onlab.packet.IpPrefix;
24import org.onlab.packet.TpPort;
25import org.onosproject.cfg.ComponentConfigService;
26import org.onosproject.cfg.ConfigProperty;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.k8snetworking.api.K8sFlowRuleService;
33import org.onosproject.k8snetworking.api.K8sNetworkService;
34import org.onosproject.k8snetworking.api.K8sServiceEvent;
35import org.onosproject.k8snetworking.api.K8sServiceListener;
36import org.onosproject.k8snetworking.api.K8sServiceService;
37import org.onosproject.k8snode.api.K8sNode;
38import org.onosproject.k8snode.api.K8sNodeEvent;
39import org.onosproject.k8snode.api.K8sNodeListener;
40import org.onosproject.k8snode.api.K8sNodeService;
41import org.onosproject.mastership.MastershipService;
42import org.onosproject.net.DeviceId;
43import org.onosproject.net.PortNumber;
44import org.onosproject.net.device.DeviceService;
45import org.onosproject.net.driver.DriverService;
46import org.onosproject.net.flow.DefaultTrafficSelector;
47import org.onosproject.net.flow.DefaultTrafficTreatment;
48import org.onosproject.net.flow.TrafficSelector;
49import org.onosproject.net.flow.TrafficTreatment;
50import org.onosproject.net.flow.instructions.ExtensionTreatment;
51import org.osgi.service.component.annotations.Activate;
52import org.osgi.service.component.annotations.Component;
53import org.osgi.service.component.annotations.Deactivate;
54import org.osgi.service.component.annotations.Reference;
55import org.osgi.service.component.annotations.ReferenceCardinality;
56import org.slf4j.Logger;
57
58import java.util.Objects;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
61
62import static java.util.concurrent.Executors.newSingleThreadExecutor;
63import static org.onlab.util.Tools.groupedThreads;
64import static org.onosproject.k8snetworking.api.Constants.B_CLASS;
65import static org.onosproject.k8snetworking.api.Constants.DST;
66import static org.onosproject.k8snetworking.api.Constants.EXT_ENTRY_TABLE;
67import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
68import static org.onosproject.k8snetworking.api.Constants.NODE_IP_PREFIX;
69import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
Jian Li6d2ffbf2020-11-04 15:58:18 +090070import static org.onosproject.k8snetworking.api.Constants.PRIORITY_INTER_ROUTING_RULE;
Jian Li140d8a22019-04-24 23:41:44 +090071import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NODE_PORT_RULE;
72import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
73import static org.onosproject.k8snetworking.api.Constants.SRC;
Jian Li6d2ffbf2020-11-04 15:58:18 +090074import static org.onosproject.k8snetworking.api.Constants.TUN_ENTRY_TABLE;
Jian Li140d8a22019-04-24 23:41:44 +090075import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getBclassIpPrefixFromCidr;
76import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getPropertyValue;
Jian Li140d8a22019-04-24 23:41:44 +090077import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
Jian Li6d2ffbf2020-11-04 15:58:18 +090078import static org.onosproject.k8snode.api.K8sApiConfig.Mode.PASSTHROUGH;
Jian Li140d8a22019-04-24 23:41:44 +090079import static org.slf4j.LoggerFactory.getLogger;
80
81/**
82 * Provides service port exposure using node port.
83 */
84@Component(immediate = true)
85public class K8sNodePortHandler {
86
87 private final Logger log = getLogger(getClass());
88
89 private static final String NODE_PORT_TYPE = "NodePort";
90 private static final String TCP = "TCP";
91 private static final String UDP = "UDP";
92 private static final int HOST_CIDR = 32;
93 private static final String SERVICE_CIDR = "serviceCidr";
94 private static final String B_CLASS_SUFFIX = "0.0/16";
Jian Li140d8a22019-04-24 23:41:44 +090095
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected CoreService coreService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected ComponentConfigService configService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected DeviceService deviceService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
106 protected DriverService driverService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected LeadershipService leadershipService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected MastershipService mastershipService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected ClusterService clusterService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected K8sNodeService k8sNodeService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
121 protected K8sNetworkService k8sNetworkService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
124 protected K8sServiceService k8sServiceService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
127 protected K8sFlowRuleService k8sFlowRuleService;
128
129 private final InternalK8sServiceListener k8sServiceListener =
130 new InternalK8sServiceListener();
131 private final InternalK8sNodeListener k8sNodeListener =
132 new InternalK8sNodeListener();
133 private final ExecutorService eventExecutor = newSingleThreadExecutor(
134 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
135
136 private ApplicationId appId;
137 private NodeId localNodeId;
138
139 @Activate
140 protected void activate() {
141 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
142
143 localNodeId = clusterService.getLocalNode().id();
144 leadershipService.runForLeadership(appId.name());
145 k8sNodeService.addListener(k8sNodeListener);
146 k8sServiceService.addListener(k8sServiceListener);
147
148 log.info("Started");
149 }
150
151 @Deactivate
152 protected void deactivate() {
153 k8sNodeService.removeListener(k8sNodeListener);
154 k8sServiceService.removeListener(k8sServiceListener);
155 leadershipService.withdraw(appId.name());
156 eventExecutor.shutdown();
157
158 log.info("Stopped");
159 }
160
161 private void processNodePortEvent(K8sNode k8sNode, Service service, boolean install) {
162
163 String clusterIp = service.getSpec().getClusterIP();
164 for (ServicePort servicePort : service.getSpec().getPorts()) {
165 setNodeToServiceRules(k8sNode, clusterIp, servicePort, install);
Jian Li6d2ffbf2020-11-04 15:58:18 +0900166 setServiceToNodeRules(k8sNode, clusterIp, servicePort, install);
Jian Li140d8a22019-04-24 23:41:44 +0900167 }
Jian Li6d2ffbf2020-11-04 15:58:18 +0900168 }
Jian Li140d8a22019-04-24 23:41:44 +0900169
Jian Li6d2ffbf2020-11-04 15:58:18 +0900170 private void setIntgToExtRules(K8sNode k8sNode, String serviceCidr,
171 boolean install) {
172 // for local traffic, we add default flow rules for steering traffic from
173 // integration bridge to external bridge through patch port
174 // for remote traffic, we add default flow rules for steering traffic from
175 // integration bridge to tun bridge through patch port
Jian Li140d8a22019-04-24 23:41:44 +0900176 k8sNodeService.completeNodes().forEach(n -> {
177 String podCidr = k8sNetworkService.network(n.hostname()).cidr();
178 String fullCidr = NODE_IP_PREFIX + "." +
179 podCidr.split("\\.")[2] + "." + B_CLASS_SUFFIX;
180
Jian Li6d2ffbf2020-11-04 15:58:18 +0900181 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
182 .matchEthType(Ethernet.TYPE_IPV4)
183 .matchIPSrc(IpPrefix.valueOf(serviceCidr))
184 .matchIPDst(IpPrefix.valueOf(fullCidr));
185
186 PortNumber output;
187 if (n.hostname().equals(k8sNode.hostname())) {
188 output = k8sNode.intgToExtPatchPortNum();
Jian Li140d8a22019-04-24 23:41:44 +0900189 } else {
Jian Li6d2ffbf2020-11-04 15:58:18 +0900190 output = k8sNode.intgToTunPortNum();
Jian Li140d8a22019-04-24 23:41:44 +0900191 }
Jian Li6d2ffbf2020-11-04 15:58:18 +0900192
193 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
194 .setOutput(output);
195
196 k8sFlowRuleService.setRule(
197 appId,
198 k8sNode.intgBridge(),
199 sBuilder.build(),
200 tBuilder.build(),
201 PRIORITY_CIDR_RULE,
202 ROUTING_TABLE,
203 install);
Jian Li140d8a22019-04-24 23:41:44 +0900204 });
Jian Li140d8a22019-04-24 23:41:44 +0900205 }
206
Jian Li6d2ffbf2020-11-04 15:58:18 +0900207 private void setTunToIntgRules(K8sNode k8sNode, boolean install) {
208 String podCidr = k8sNetworkService.network(k8sNode.hostname()).cidr();
209 String fullCidr = NODE_IP_PREFIX + "." +
210 podCidr.split("\\.")[2] + "." + B_CLASS_SUFFIX;
Jian Li140d8a22019-04-24 23:41:44 +0900211
Jian Li6d2ffbf2020-11-04 15:58:18 +0900212 TrafficSelector selector = DefaultTrafficSelector.builder()
Jian Li140d8a22019-04-24 23:41:44 +0900213 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li6d2ffbf2020-11-04 15:58:18 +0900214 .matchIPDst(IpPrefix.valueOf(fullCidr))
215 .build();
Jian Li140d8a22019-04-24 23:41:44 +0900216
Jian Li6d2ffbf2020-11-04 15:58:18 +0900217 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
218 .setOutput(k8sNode.tunToIntgPortNum())
219 .build();
Jian Li140d8a22019-04-24 23:41:44 +0900220
221 k8sFlowRuleService.setRule(
222 appId,
Jian Li6d2ffbf2020-11-04 15:58:18 +0900223 k8sNode.tunBridge(),
224 selector,
225 treatment,
226 PRIORITY_INTER_ROUTING_RULE,
227 TUN_ENTRY_TABLE,
Jian Li140d8a22019-04-24 23:41:44 +0900228 install);
229 }
230
231 private void setNodeToServiceRules(K8sNode k8sNode,
232 String clusterIp,
233 ServicePort servicePort,
234 boolean install) {
235 String protocol = servicePort.getProtocol();
236 int nodePort = servicePort.getNodePort();
237 int svcPort = servicePort.getPort();
238 DeviceId deviceId = k8sNode.extBridge();
239
240 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
241 .matchEthType(Ethernet.TYPE_IPV4)
Jian Li6d2ffbf2020-11-04 15:58:18 +0900242 .matchIPDst(IpPrefix.valueOf(k8sNode.nodeIp(), HOST_CIDR));
Jian Li140d8a22019-04-24 23:41:44 +0900243
244 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
245 .setIpDst(IpAddress.valueOf(clusterIp));
246
247 if (TCP.equals(protocol)) {
248 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
249 .matchTcpDst(TpPort.tpPort(nodePort));
250 tBuilder.setTcpDst(TpPort.tpPort(svcPort));
251 } else if (UDP.equals(protocol)) {
252 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
253 .matchUdpDst(TpPort.tpPort(nodePort));
254 tBuilder.setUdpDst(TpPort.tpPort(svcPort));
255 }
256
257 String podCidr = k8sNetworkService.network(k8sNode.hostname()).cidr();
258 String prefix = NODE_IP_PREFIX + "." + podCidr.split("\\.")[2];
259
260 ExtensionTreatment loadTreatment = buildLoadExtension(
261 deviceService.getDevice(deviceId), B_CLASS, SRC, prefix);
262 tBuilder.extension(loadTreatment, deviceId)
263 .setOutput(k8sNode.extToIntgPatchPortNum());
264
265 k8sFlowRuleService.setRule(
266 appId,
267 k8sNode.extBridge(),
268 sBuilder.build(),
269 tBuilder.build(),
270 PRIORITY_NODE_PORT_RULE,
271 EXT_ENTRY_TABLE,
272 install);
273 }
274
Jian Li6d2ffbf2020-11-04 15:58:18 +0900275 private void setServiceToNodeRules(K8sNode k8sNode,
276 String clusterIp,
277 ServicePort servicePort,
278 boolean install) {
Jian Li140d8a22019-04-24 23:41:44 +0900279 String protocol = servicePort.getProtocol();
280 int nodePort = servicePort.getNodePort();
281 int svcPort = servicePort.getPort();
282 DeviceId deviceId = k8sNode.extBridge();
283
Jian Li6d2ffbf2020-11-04 15:58:18 +0900284 String nodeIp = k8sNode.nodeIp().toString();
285 String nodeIpPrefix = getBclassIpPrefixFromCidr(nodeIp);
Jian Li140d8a22019-04-24 23:41:44 +0900286
Jian Li6d2ffbf2020-11-04 15:58:18 +0900287 if (nodeIpPrefix == null) {
Jian Li140d8a22019-04-24 23:41:44 +0900288 return;
289 }
290
Jian Li140d8a22019-04-24 23:41:44 +0900291 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
292 .matchEthType(Ethernet.TYPE_IPV4)
293 .matchInPort(k8sNode.extToIntgPatchPortNum())
294 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(clusterIp), HOST_CIDR));
295
296 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Jian Li6d2ffbf2020-11-04 15:58:18 +0900297 .setIpSrc(k8sNode.nodeIp())
298 .setEthSrc(k8sNode.nodeMac());
Jian Li140d8a22019-04-24 23:41:44 +0900299
300 if (TCP.equals(protocol)) {
301 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
302 .matchTcpSrc(TpPort.tpPort(svcPort));
303 tBuilder.setTcpSrc(TpPort.tpPort(nodePort));
304 } else if (UDP.equals(protocol)) {
305 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
306 .matchUdpSrc(TpPort.tpPort(svcPort));
307 tBuilder.setUdpSrc(TpPort.tpPort(nodePort));
308 }
309
Jian Li140d8a22019-04-24 23:41:44 +0900310 ExtensionTreatment loadTreatment = buildLoadExtension(
Jian Li6d2ffbf2020-11-04 15:58:18 +0900311 deviceService.getDevice(deviceId), B_CLASS, DST, nodeIpPrefix);
312 tBuilder.extension(loadTreatment, deviceId);
313
314 // in passthrough mode, we steer the traffic to the openstack intg bridge
315 // in normal mode, we steer the traffic to the local port
316 if (k8sNode.mode() == PASSTHROUGH) {
317 PortNumber output = k8sNode.portNumByName(k8sNode.extBridge(),
318 k8sNode.k8sExtToOsPatchPortName());
319 if (output == null) {
320 log.warn("Kubernetes external to OpenStack patch port is null");
321 return;
322 }
323 tBuilder.setOutput(output);
324 } else {
325 tBuilder.setOutput(PortNumber.LOCAL);
326 }
Jian Li140d8a22019-04-24 23:41:44 +0900327
328 k8sFlowRuleService.setRule(
329 appId,
330 deviceId,
331 sBuilder.build(),
332 tBuilder.build(),
Jian Li6d2ffbf2020-11-04 15:58:18 +0900333 PRIORITY_NODE_PORT_RULE,
Jian Li140d8a22019-04-24 23:41:44 +0900334 EXT_ENTRY_TABLE,
335 install);
336 }
337
338 private String getServiceCidr() {
339 Set<ConfigProperty> properties =
340 configService.getProperties(K8sServiceHandler.class.getName());
341 return getPropertyValue(properties, SERVICE_CIDR);
342 }
343
344 private class InternalK8sServiceListener implements K8sServiceListener {
345
346 private boolean isRelevantHelper() {
347 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
348 }
349
350 @Override
351 public void event(K8sServiceEvent event) {
352 switch (event.type()) {
353 case K8S_SERVICE_CREATED:
354 case K8S_SERVICE_UPDATED:
355 eventExecutor.execute(() -> processServiceCreation(event.subject()));
356 break;
357 default:
358 break;
359 }
360 }
361
362 private void processServiceCreation(Service service) {
363 if (!isRelevantHelper()) {
364 return;
365 }
366
367 if (NODE_PORT_TYPE.equals(service.getSpec().getType())) {
368 k8sNodeService.completeNodes().forEach(n ->
369 processNodePortEvent(n, service, true)
370 );
371 }
372 }
373 }
374
375 private class InternalK8sNodeListener implements K8sNodeListener {
376
377 private boolean isRelevantHelper() {
378 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
379 }
380
381 @Override
382 public void event(K8sNodeEvent event) {
383 switch (event.type()) {
384 case K8S_NODE_COMPLETE:
385 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
386 break;
387 case K8S_NODE_INCOMPLETE:
388 default:
389 break;
390 }
391 }
392
393 private void processNodeCompletion(K8sNode k8sNode) {
394 if (!isRelevantHelper()) {
395 return;
396 }
397
398 k8sServiceService.services().stream()
399 .filter(s -> NODE_PORT_TYPE.equals(s.getSpec().getType()))
400 .forEach(s -> processNodePortEvent(k8sNode, s, true));
Jian Li6d2ffbf2020-11-04 15:58:18 +0900401
402 setIntgToExtRules(k8sNode, getServiceCidr(), true);
403 setTunToIntgRules(k8sNode, true);
Jian Li140d8a22019-04-24 23:41:44 +0900404 }
405 }
406}