blob: 0a549cf8f78a5a257d7c1c6891b76f50f43027e2 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.k8snetworking.impl;
17
18import io.fabric8.kubernetes.api.model.Service;
19import io.fabric8.kubernetes.api.model.ServicePort;
20import org.onlab.packet.Ethernet;
21import org.onlab.packet.IPv4;
22import org.onlab.packet.IpAddress;
23import org.onlab.packet.IpPrefix;
24import org.onlab.packet.TpPort;
25import org.onosproject.cfg.ComponentConfigService;
26import org.onosproject.cfg.ConfigProperty;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.k8snetworking.api.K8sFlowRuleService;
33import org.onosproject.k8snetworking.api.K8sNetworkService;
34import org.onosproject.k8snetworking.api.K8sServiceEvent;
35import org.onosproject.k8snetworking.api.K8sServiceListener;
36import org.onosproject.k8snetworking.api.K8sServiceService;
37import org.onosproject.k8snode.api.K8sNode;
38import org.onosproject.k8snode.api.K8sNodeEvent;
39import org.onosproject.k8snode.api.K8sNodeListener;
40import org.onosproject.k8snode.api.K8sNodeService;
41import org.onosproject.mastership.MastershipService;
42import org.onosproject.net.DeviceId;
43import org.onosproject.net.PortNumber;
44import org.onosproject.net.device.DeviceService;
45import org.onosproject.net.driver.DriverService;
46import org.onosproject.net.flow.DefaultTrafficSelector;
47import org.onosproject.net.flow.DefaultTrafficTreatment;
48import org.onosproject.net.flow.TrafficSelector;
49import org.onosproject.net.flow.TrafficTreatment;
50import org.onosproject.net.flow.instructions.ExtensionTreatment;
51import org.osgi.service.component.annotations.Activate;
52import org.osgi.service.component.annotations.Component;
53import org.osgi.service.component.annotations.Deactivate;
54import org.osgi.service.component.annotations.Reference;
55import org.osgi.service.component.annotations.ReferenceCardinality;
56import org.slf4j.Logger;
57
58import java.util.Objects;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
61
62import static java.util.concurrent.Executors.newSingleThreadExecutor;
63import static org.onlab.util.Tools.groupedThreads;
64import static org.onosproject.k8snetworking.api.Constants.B_CLASS;
65import static org.onosproject.k8snetworking.api.Constants.DST;
66import static org.onosproject.k8snetworking.api.Constants.EXT_ENTRY_TABLE;
67import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
68import static org.onosproject.k8snetworking.api.Constants.NODE_IP_PREFIX;
69import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
70import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NODE_PORT_INTER_RULE;
71import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NODE_PORT_REMOTE_RULE;
72import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NODE_PORT_RULE;
73import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
74import static org.onosproject.k8snetworking.api.Constants.SRC;
75import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getBclassIpPrefixFromCidr;
76import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getPropertyValue;
Jian Libecf1d92019-06-09 20:28:53 +090077import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.tunnelPortNumByNetId;
Jian Li140d8a22019-04-24 23:41:44 +090078import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.unshiftIpDomain;
79import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension;
80import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
81import static org.slf4j.LoggerFactory.getLogger;
82
83/**
84 * Provides service port exposure using node port.
85 */
86@Component(immediate = true)
87public class K8sNodePortHandler {
88
89 private final Logger log = getLogger(getClass());
90
91 private static final String NODE_PORT_TYPE = "NodePort";
92 private static final String TCP = "TCP";
93 private static final String UDP = "UDP";
94 private static final int HOST_CIDR = 32;
95 private static final String SERVICE_CIDR = "serviceCidr";
96 private static final String B_CLASS_SUFFIX = "0.0/16";
97 private static final String C_CLASS_SUFFIX = ".0/24";
98
99
100 @Reference(cardinality = ReferenceCardinality.MANDATORY)
101 protected CoreService coreService;
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected ComponentConfigService configService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
107 protected DeviceService deviceService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected DriverService driverService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
113 protected LeadershipService leadershipService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY)
116 protected MastershipService mastershipService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY)
119 protected ClusterService clusterService;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY)
122 protected K8sNodeService k8sNodeService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY)
125 protected K8sNetworkService k8sNetworkService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
128 protected K8sServiceService k8sServiceService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
131 protected K8sFlowRuleService k8sFlowRuleService;
132
133 private final InternalK8sServiceListener k8sServiceListener =
134 new InternalK8sServiceListener();
135 private final InternalK8sNodeListener k8sNodeListener =
136 new InternalK8sNodeListener();
137 private final ExecutorService eventExecutor = newSingleThreadExecutor(
138 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
139
140 private ApplicationId appId;
141 private NodeId localNodeId;
142
143 @Activate
144 protected void activate() {
145 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
146
147 localNodeId = clusterService.getLocalNode().id();
148 leadershipService.runForLeadership(appId.name());
149 k8sNodeService.addListener(k8sNodeListener);
150 k8sServiceService.addListener(k8sServiceListener);
151
152 log.info("Started");
153 }
154
155 @Deactivate
156 protected void deactivate() {
157 k8sNodeService.removeListener(k8sNodeListener);
158 k8sServiceService.removeListener(k8sServiceListener);
159 leadershipService.withdraw(appId.name());
160 eventExecutor.shutdown();
161
162 log.info("Stopped");
163 }
164
165 private void processNodePortEvent(K8sNode k8sNode, Service service, boolean install) {
166
167 String clusterIp = service.getSpec().getClusterIP();
168 for (ServicePort servicePort : service.getSpec().getPorts()) {
169 setNodeToServiceRules(k8sNode, clusterIp, servicePort, install);
170 setServiceToNodeLocalRules(k8sNode, clusterIp, servicePort, install);
171 setServiceToNodeRemoteRules(k8sNode, clusterIp, servicePort, install);
172 setExtToIngrRules(k8sNode, servicePort, install);
173 }
174
175 k8sNodeService.completeNodes().forEach(n -> {
176 String podCidr = k8sNetworkService.network(n.hostname()).cidr();
177 String fullCidr = NODE_IP_PREFIX + "." +
178 podCidr.split("\\.")[2] + "." + B_CLASS_SUFFIX;
179
180 if (n.equals(k8sNode)) {
181 setIntgToExtLocalRules(k8sNode, getServiceCidr(), fullCidr, install);
182 } else {
183 setIntgToExtRemoteRules(k8sNode, n, getServiceCidr(), fullCidr, install);
184 }
185 });
186
187 setDefaultExtEgrRule(k8sNode, install);
188 }
189
190 private void setDefaultExtEgrRule(K8sNode k8sNode, boolean install) {
191 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
192 .matchInPort(PortNumber.LOCAL)
193 .matchEthType(Ethernet.TYPE_IPV4);
194
195 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
196 .setOutput(k8sNode.extBridgePortNum());
197
198 k8sFlowRuleService.setRule(
199 appId,
200 k8sNode.extBridge(),
201 sBuilder.build(),
202 tBuilder.build(),
203 PRIORITY_NODE_PORT_INTER_RULE,
204 EXT_ENTRY_TABLE,
205 install);
206 }
207
208 private void setIntgToExtLocalRules(K8sNode k8sNode, String serviceCidr,
209 String shiftedCidr, boolean install) {
210 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
211 .matchEthType(Ethernet.TYPE_IPV4)
212 .matchIPSrc(IpPrefix.valueOf(serviceCidr))
213 .matchIPDst(IpPrefix.valueOf(shiftedCidr));
214
215 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
216 .setOutput(k8sNode.intgToExtPatchPortNum());
217
218 k8sFlowRuleService.setRule(
219 appId,
220 k8sNode.intgBridge(),
221 sBuilder.build(),
222 tBuilder.build(),
223 PRIORITY_CIDR_RULE,
224 ROUTING_TABLE,
225 install);
226 }
227
228 private void setIntgToExtRemoteRules(K8sNode k8sNodeLocal, K8sNode k8sNodeRemote,
229 String serviceCidr, String shiftedCidr,
230 boolean install) {
231 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
232 .matchEthType(Ethernet.TYPE_IPV4)
233 .matchIPSrc(IpPrefix.valueOf(serviceCidr))
234 .matchIPDst(IpPrefix.valueOf(shiftedCidr));
235
236 ExtensionTreatment remote = buildExtension(deviceService,
237 k8sNodeLocal.intgBridge(), k8sNodeRemote.dataIp().getIp4Address());
238
Jian Libecf1d92019-06-09 20:28:53 +0900239 PortNumber portNumber = tunnelPortNumByNetId(
240 k8sNodeLocal.hostname(), k8sNetworkService, k8sNodeLocal);
241
Jian Li140d8a22019-04-24 23:41:44 +0900242 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
243 .extension(remote, k8sNodeLocal.intgBridge())
Jian Libecf1d92019-06-09 20:28:53 +0900244 .setOutput(portNumber);
Jian Li140d8a22019-04-24 23:41:44 +0900245
246 k8sFlowRuleService.setRule(
247 appId,
248 k8sNodeLocal.intgBridge(),
249 sBuilder.build(),
250 tBuilder.build(),
251 PRIORITY_CIDR_RULE,
252 ROUTING_TABLE,
253 install);
254 }
255
256 private void setExtToIngrRules(K8sNode k8sNode, ServicePort servicePort,
257 boolean install) {
258 String protocol = servicePort.getProtocol();
259 int nodePort = servicePort.getNodePort();
260 DeviceId deviceId = k8sNode.extBridge();
261
262 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
263 .matchEthType(Ethernet.TYPE_IPV4)
264 .matchIPDst(IpPrefix.valueOf(k8sNode.extBridgeIp(), HOST_CIDR));
265
266 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
267 .setOutput(PortNumber.LOCAL);
268
269 if (TCP.equals(protocol)) {
270 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
271 .matchTcpSrc(TpPort.tpPort(nodePort));
272 } else if (UDP.equals(protocol)) {
273 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
274 .matchUdpSrc(TpPort.tpPort(nodePort));
275 }
276
277 k8sFlowRuleService.setRule(
278 appId,
279 deviceId,
280 sBuilder.build(),
281 tBuilder.build(),
282 PRIORITY_NODE_PORT_RULE,
283 EXT_ENTRY_TABLE,
284 install);
285 }
286
287 private void setNodeToServiceRules(K8sNode k8sNode,
288 String clusterIp,
289 ServicePort servicePort,
290 boolean install) {
291 String protocol = servicePort.getProtocol();
292 int nodePort = servicePort.getNodePort();
293 int svcPort = servicePort.getPort();
294 DeviceId deviceId = k8sNode.extBridge();
295
296 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
297 .matchEthType(Ethernet.TYPE_IPV4)
298 .matchIPDst(IpPrefix.valueOf(k8sNode.extBridgeIp(), HOST_CIDR));
299
300 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
301 .setIpDst(IpAddress.valueOf(clusterIp));
302
303 if (TCP.equals(protocol)) {
304 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
305 .matchTcpDst(TpPort.tpPort(nodePort));
306 tBuilder.setTcpDst(TpPort.tpPort(svcPort));
307 } else if (UDP.equals(protocol)) {
308 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
309 .matchUdpDst(TpPort.tpPort(nodePort));
310 tBuilder.setUdpDst(TpPort.tpPort(svcPort));
311 }
312
313 String podCidr = k8sNetworkService.network(k8sNode.hostname()).cidr();
314 String prefix = NODE_IP_PREFIX + "." + podCidr.split("\\.")[2];
315
316 ExtensionTreatment loadTreatment = buildLoadExtension(
317 deviceService.getDevice(deviceId), B_CLASS, SRC, prefix);
318 tBuilder.extension(loadTreatment, deviceId)
319 .setOutput(k8sNode.extToIntgPatchPortNum());
320
321 k8sFlowRuleService.setRule(
322 appId,
323 k8sNode.extBridge(),
324 sBuilder.build(),
325 tBuilder.build(),
326 PRIORITY_NODE_PORT_RULE,
327 EXT_ENTRY_TABLE,
328 install);
329 }
330
331 private void setServiceToNodeLocalRules(K8sNode k8sNode,
332 String clusterIp,
333 ServicePort servicePort,
334 boolean install) {
335 String protocol = servicePort.getProtocol();
336 int nodePort = servicePort.getNodePort();
337 int svcPort = servicePort.getPort();
338 DeviceId deviceId = k8sNode.extBridge();
339
340 String extBridgeIp = k8sNode.extBridgeIp().toString();
341 String extBridgePrefix = getBclassIpPrefixFromCidr(extBridgeIp);
342
343 String podCidr = k8sNetworkService.network(k8sNode.hostname()).cidr();
344 String nodePrefix = NODE_IP_PREFIX + "." + podCidr.split("\\.")[2];
345
346 if (extBridgePrefix == null) {
347 return;
348 }
349
350 String shiftedIp = unshiftIpDomain(extBridgeIp, extBridgePrefix, nodePrefix);
351
352 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
353 .matchEthType(Ethernet.TYPE_IPV4)
354 .matchInPort(k8sNode.extToIntgPatchPortNum())
355 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(clusterIp), HOST_CIDR))
356 .matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(shiftedIp), HOST_CIDR));
357
358 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
359 .setIpSrc(k8sNode.extBridgeIp())
360 .setEthSrc(k8sNode.extBridgeMac());
361
362 if (TCP.equals(protocol)) {
363 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
364 .matchTcpSrc(TpPort.tpPort(svcPort));
365 tBuilder.setTcpSrc(TpPort.tpPort(nodePort));
366 } else if (UDP.equals(protocol)) {
367 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
368 .matchUdpSrc(TpPort.tpPort(svcPort));
369 tBuilder.setUdpSrc(TpPort.tpPort(nodePort));
370 }
371
372 String gatewayIp = k8sNode.extGatewayIp().toString();
373 String gatewayPrefix = getBclassIpPrefixFromCidr(gatewayIp);
374
375 if (gatewayPrefix == null) {
376 return;
377 }
378
379 ExtensionTreatment loadTreatment = buildLoadExtension(
380 deviceService.getDevice(deviceId), B_CLASS, DST, gatewayPrefix);
381 tBuilder.extension(loadTreatment, deviceId)
382 .setOutput(PortNumber.LOCAL);
383
384 k8sFlowRuleService.setRule(
385 appId,
386 deviceId,
387 sBuilder.build(),
388 tBuilder.build(),
389 PRIORITY_NODE_PORT_RULE,
390 EXT_ENTRY_TABLE,
391 install);
392 }
393
394 private void setServiceToNodeRemoteRules(K8sNode k8sNode,
395 String clusterIp,
396 ServicePort servicePort,
397 boolean install) {
398 String protocol = servicePort.getProtocol();
399 int nodePort = servicePort.getNodePort();
400 int svcPort = servicePort.getPort();
401 DeviceId deviceId = k8sNode.extBridge();
402
403 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
404 .matchEthType(Ethernet.TYPE_IPV4)
405 .matchInPort(k8sNode.extToIntgPatchPortNum())
406 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(clusterIp), HOST_CIDR));
407
408 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
409 .setIpSrc(k8sNode.extBridgeIp())
410 .setEthSrc(k8sNode.extBridgeMac());
411
412 if (TCP.equals(protocol)) {
413 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
414 .matchTcpSrc(TpPort.tpPort(svcPort));
415 tBuilder.setTcpSrc(TpPort.tpPort(nodePort));
416 } else if (UDP.equals(protocol)) {
417 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
418 .matchUdpSrc(TpPort.tpPort(svcPort));
419 tBuilder.setUdpSrc(TpPort.tpPort(nodePort));
420 }
421
422 String gatewayIp = k8sNode.extGatewayIp().toString();
423 String prefix = getBclassIpPrefixFromCidr(gatewayIp);
424
425 if (prefix == null) {
426 return;
427 }
428
429 ExtensionTreatment loadTreatment = buildLoadExtension(
430 deviceService.getDevice(deviceId), B_CLASS, DST, prefix);
431 tBuilder.extension(loadTreatment, deviceId)
432 .setOutput(k8sNode.extBridgePortNum());
433
434 k8sFlowRuleService.setRule(
435 appId,
436 deviceId,
437 sBuilder.build(),
438 tBuilder.build(),
439 PRIORITY_NODE_PORT_REMOTE_RULE,
440 EXT_ENTRY_TABLE,
441 install);
442 }
443
444 private String getServiceCidr() {
445 Set<ConfigProperty> properties =
446 configService.getProperties(K8sServiceHandler.class.getName());
447 return getPropertyValue(properties, SERVICE_CIDR);
448 }
449
450 private class InternalK8sServiceListener implements K8sServiceListener {
451
452 private boolean isRelevantHelper() {
453 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
454 }
455
456 @Override
457 public void event(K8sServiceEvent event) {
458 switch (event.type()) {
459 case K8S_SERVICE_CREATED:
460 case K8S_SERVICE_UPDATED:
461 eventExecutor.execute(() -> processServiceCreation(event.subject()));
462 break;
463 default:
464 break;
465 }
466 }
467
468 private void processServiceCreation(Service service) {
469 if (!isRelevantHelper()) {
470 return;
471 }
472
473 if (NODE_PORT_TYPE.equals(service.getSpec().getType())) {
474 k8sNodeService.completeNodes().forEach(n ->
475 processNodePortEvent(n, service, true)
476 );
477 }
478 }
479 }
480
481 private class InternalK8sNodeListener implements K8sNodeListener {
482
483 private boolean isRelevantHelper() {
484 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
485 }
486
487 @Override
488 public void event(K8sNodeEvent event) {
489 switch (event.type()) {
490 case K8S_NODE_COMPLETE:
491 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
492 break;
493 case K8S_NODE_INCOMPLETE:
494 default:
495 break;
496 }
497 }
498
499 private void processNodeCompletion(K8sNode k8sNode) {
500 if (!isRelevantHelper()) {
501 return;
502 }
503
504 k8sServiceService.services().stream()
505 .filter(s -> NODE_PORT_TYPE.equals(s.getSpec().getType()))
506 .forEach(s -> processNodePortEvent(k8sNode, s, true));
507 }
508 }
509}