blob: c8a08024df6c82c0a9928bb9df1898bbfaca01b7 [file] [log] [blame]
Jian Li140d8a22019-04-24 23:41:44 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import io.fabric8.kubernetes.api.model.Service;
19import io.fabric8.kubernetes.api.model.ServicePort;
20import org.onlab.packet.Ethernet;
21import org.onlab.packet.IPv4;
22import org.onlab.packet.IpAddress;
23import org.onlab.packet.IpPrefix;
24import org.onlab.packet.TpPort;
25import org.onosproject.cfg.ComponentConfigService;
26import org.onosproject.cfg.ConfigProperty;
27import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.k8snetworking.api.K8sFlowRuleService;
33import org.onosproject.k8snetworking.api.K8sNetworkService;
34import org.onosproject.k8snetworking.api.K8sServiceEvent;
35import org.onosproject.k8snetworking.api.K8sServiceListener;
36import org.onosproject.k8snetworking.api.K8sServiceService;
37import org.onosproject.k8snode.api.K8sNode;
38import org.onosproject.k8snode.api.K8sNodeEvent;
39import org.onosproject.k8snode.api.K8sNodeListener;
40import org.onosproject.k8snode.api.K8sNodeService;
41import org.onosproject.mastership.MastershipService;
42import org.onosproject.net.DeviceId;
43import org.onosproject.net.PortNumber;
44import org.onosproject.net.device.DeviceService;
45import org.onosproject.net.driver.DriverService;
46import org.onosproject.net.flow.DefaultTrafficSelector;
47import org.onosproject.net.flow.DefaultTrafficTreatment;
48import org.onosproject.net.flow.TrafficSelector;
49import org.onosproject.net.flow.TrafficTreatment;
50import org.onosproject.net.flow.instructions.ExtensionTreatment;
51import org.osgi.service.component.annotations.Activate;
52import org.osgi.service.component.annotations.Component;
53import org.osgi.service.component.annotations.Deactivate;
54import org.osgi.service.component.annotations.Reference;
55import org.osgi.service.component.annotations.ReferenceCardinality;
56import org.slf4j.Logger;
57
58import java.util.Objects;
59import java.util.Set;
60import java.util.concurrent.ExecutorService;
61
62import static java.util.concurrent.Executors.newSingleThreadExecutor;
63import static org.onlab.util.Tools.groupedThreads;
64import static org.onosproject.k8snetworking.api.Constants.B_CLASS;
65import static org.onosproject.k8snetworking.api.Constants.DST;
66import static org.onosproject.k8snetworking.api.Constants.EXT_ENTRY_TABLE;
67import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
68import static org.onosproject.k8snetworking.api.Constants.NODE_IP_PREFIX;
69import static org.onosproject.k8snetworking.api.Constants.PRIORITY_CIDR_RULE;
70import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NODE_PORT_INTER_RULE;
71import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NODE_PORT_REMOTE_RULE;
72import static org.onosproject.k8snetworking.api.Constants.PRIORITY_NODE_PORT_RULE;
73import static org.onosproject.k8snetworking.api.Constants.ROUTING_TABLE;
74import static org.onosproject.k8snetworking.api.Constants.SRC;
75import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getBclassIpPrefixFromCidr;
76import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getPropertyValue;
77import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.unshiftIpDomain;
78import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildExtension;
79import static org.onosproject.k8snetworking.util.RulePopulatorUtil.buildLoadExtension;
80import static org.slf4j.LoggerFactory.getLogger;
81
82/**
83 * Provides service port exposure using node port.
84 */
85@Component(immediate = true)
86public class K8sNodePortHandler {
87
88 private final Logger log = getLogger(getClass());
89
90 private static final String NODE_PORT_TYPE = "NodePort";
91 private static final String TCP = "TCP";
92 private static final String UDP = "UDP";
93 private static final int HOST_CIDR = 32;
94 private static final String SERVICE_CIDR = "serviceCidr";
95 private static final String B_CLASS_SUFFIX = "0.0/16";
96 private static final String C_CLASS_SUFFIX = ".0/24";
97
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected CoreService coreService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected ComponentConfigService configService;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
106 protected DeviceService deviceService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected DriverService driverService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected LeadershipService leadershipService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected MastershipService mastershipService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected ClusterService clusterService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
121 protected K8sNodeService k8sNodeService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
124 protected K8sNetworkService k8sNetworkService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
127 protected K8sServiceService k8sServiceService;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
130 protected K8sFlowRuleService k8sFlowRuleService;
131
132 private final InternalK8sServiceListener k8sServiceListener =
133 new InternalK8sServiceListener();
134 private final InternalK8sNodeListener k8sNodeListener =
135 new InternalK8sNodeListener();
136 private final ExecutorService eventExecutor = newSingleThreadExecutor(
137 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
138
139 private ApplicationId appId;
140 private NodeId localNodeId;
141
142 @Activate
143 protected void activate() {
144 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
145
146 localNodeId = clusterService.getLocalNode().id();
147 leadershipService.runForLeadership(appId.name());
148 k8sNodeService.addListener(k8sNodeListener);
149 k8sServiceService.addListener(k8sServiceListener);
150
151 log.info("Started");
152 }
153
154 @Deactivate
155 protected void deactivate() {
156 k8sNodeService.removeListener(k8sNodeListener);
157 k8sServiceService.removeListener(k8sServiceListener);
158 leadershipService.withdraw(appId.name());
159 eventExecutor.shutdown();
160
161 log.info("Stopped");
162 }
163
164 private void processNodePortEvent(K8sNode k8sNode, Service service, boolean install) {
165
166 String clusterIp = service.getSpec().getClusterIP();
167 for (ServicePort servicePort : service.getSpec().getPorts()) {
168 setNodeToServiceRules(k8sNode, clusterIp, servicePort, install);
169 setServiceToNodeLocalRules(k8sNode, clusterIp, servicePort, install);
170 setServiceToNodeRemoteRules(k8sNode, clusterIp, servicePort, install);
171 setExtToIngrRules(k8sNode, servicePort, install);
172 }
173
174 k8sNodeService.completeNodes().forEach(n -> {
175 String podCidr = k8sNetworkService.network(n.hostname()).cidr();
176 String fullCidr = NODE_IP_PREFIX + "." +
177 podCidr.split("\\.")[2] + "." + B_CLASS_SUFFIX;
178
179 if (n.equals(k8sNode)) {
180 setIntgToExtLocalRules(k8sNode, getServiceCidr(), fullCidr, install);
181 } else {
182 setIntgToExtRemoteRules(k8sNode, n, getServiceCidr(), fullCidr, install);
183 }
184 });
185
186 setDefaultExtEgrRule(k8sNode, install);
187 }
188
189 private void setDefaultExtEgrRule(K8sNode k8sNode, boolean install) {
190 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
191 .matchInPort(PortNumber.LOCAL)
192 .matchEthType(Ethernet.TYPE_IPV4);
193
194 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
195 .setOutput(k8sNode.extBridgePortNum());
196
197 k8sFlowRuleService.setRule(
198 appId,
199 k8sNode.extBridge(),
200 sBuilder.build(),
201 tBuilder.build(),
202 PRIORITY_NODE_PORT_INTER_RULE,
203 EXT_ENTRY_TABLE,
204 install);
205 }
206
207 private void setIntgToExtLocalRules(K8sNode k8sNode, String serviceCidr,
208 String shiftedCidr, boolean install) {
209 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
210 .matchEthType(Ethernet.TYPE_IPV4)
211 .matchIPSrc(IpPrefix.valueOf(serviceCidr))
212 .matchIPDst(IpPrefix.valueOf(shiftedCidr));
213
214 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
215 .setOutput(k8sNode.intgToExtPatchPortNum());
216
217 k8sFlowRuleService.setRule(
218 appId,
219 k8sNode.intgBridge(),
220 sBuilder.build(),
221 tBuilder.build(),
222 PRIORITY_CIDR_RULE,
223 ROUTING_TABLE,
224 install);
225 }
226
227 private void setIntgToExtRemoteRules(K8sNode k8sNodeLocal, K8sNode k8sNodeRemote,
228 String serviceCidr, String shiftedCidr,
229 boolean install) {
230 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
231 .matchEthType(Ethernet.TYPE_IPV4)
232 .matchIPSrc(IpPrefix.valueOf(serviceCidr))
233 .matchIPDst(IpPrefix.valueOf(shiftedCidr));
234
235 ExtensionTreatment remote = buildExtension(deviceService,
236 k8sNodeLocal.intgBridge(), k8sNodeRemote.dataIp().getIp4Address());
237
238 // TODO: need to consider other network types
239 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
240 .extension(remote, k8sNodeLocal.intgBridge())
241 .setOutput(k8sNodeLocal.vxlanPortNum());
242
243 k8sFlowRuleService.setRule(
244 appId,
245 k8sNodeLocal.intgBridge(),
246 sBuilder.build(),
247 tBuilder.build(),
248 PRIORITY_CIDR_RULE,
249 ROUTING_TABLE,
250 install);
251 }
252
253 private void setExtToIngrRules(K8sNode k8sNode, ServicePort servicePort,
254 boolean install) {
255 String protocol = servicePort.getProtocol();
256 int nodePort = servicePort.getNodePort();
257 DeviceId deviceId = k8sNode.extBridge();
258
259 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
260 .matchEthType(Ethernet.TYPE_IPV4)
261 .matchIPDst(IpPrefix.valueOf(k8sNode.extBridgeIp(), HOST_CIDR));
262
263 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
264 .setOutput(PortNumber.LOCAL);
265
266 if (TCP.equals(protocol)) {
267 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
268 .matchTcpSrc(TpPort.tpPort(nodePort));
269 } else if (UDP.equals(protocol)) {
270 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
271 .matchUdpSrc(TpPort.tpPort(nodePort));
272 }
273
274 k8sFlowRuleService.setRule(
275 appId,
276 deviceId,
277 sBuilder.build(),
278 tBuilder.build(),
279 PRIORITY_NODE_PORT_RULE,
280 EXT_ENTRY_TABLE,
281 install);
282 }
283
284 private void setNodeToServiceRules(K8sNode k8sNode,
285 String clusterIp,
286 ServicePort servicePort,
287 boolean install) {
288 String protocol = servicePort.getProtocol();
289 int nodePort = servicePort.getNodePort();
290 int svcPort = servicePort.getPort();
291 DeviceId deviceId = k8sNode.extBridge();
292
293 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
294 .matchEthType(Ethernet.TYPE_IPV4)
295 .matchIPDst(IpPrefix.valueOf(k8sNode.extBridgeIp(), HOST_CIDR));
296
297 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
298 .setIpDst(IpAddress.valueOf(clusterIp));
299
300 if (TCP.equals(protocol)) {
301 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
302 .matchTcpDst(TpPort.tpPort(nodePort));
303 tBuilder.setTcpDst(TpPort.tpPort(svcPort));
304 } else if (UDP.equals(protocol)) {
305 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
306 .matchUdpDst(TpPort.tpPort(nodePort));
307 tBuilder.setUdpDst(TpPort.tpPort(svcPort));
308 }
309
310 String podCidr = k8sNetworkService.network(k8sNode.hostname()).cidr();
311 String prefix = NODE_IP_PREFIX + "." + podCidr.split("\\.")[2];
312
313 ExtensionTreatment loadTreatment = buildLoadExtension(
314 deviceService.getDevice(deviceId), B_CLASS, SRC, prefix);
315 tBuilder.extension(loadTreatment, deviceId)
316 .setOutput(k8sNode.extToIntgPatchPortNum());
317
318 k8sFlowRuleService.setRule(
319 appId,
320 k8sNode.extBridge(),
321 sBuilder.build(),
322 tBuilder.build(),
323 PRIORITY_NODE_PORT_RULE,
324 EXT_ENTRY_TABLE,
325 install);
326 }
327
328 private void setServiceToNodeLocalRules(K8sNode k8sNode,
329 String clusterIp,
330 ServicePort servicePort,
331 boolean install) {
332 String protocol = servicePort.getProtocol();
333 int nodePort = servicePort.getNodePort();
334 int svcPort = servicePort.getPort();
335 DeviceId deviceId = k8sNode.extBridge();
336
337 String extBridgeIp = k8sNode.extBridgeIp().toString();
338 String extBridgePrefix = getBclassIpPrefixFromCidr(extBridgeIp);
339
340 String podCidr = k8sNetworkService.network(k8sNode.hostname()).cidr();
341 String nodePrefix = NODE_IP_PREFIX + "." + podCidr.split("\\.")[2];
342
343 if (extBridgePrefix == null) {
344 return;
345 }
346
347 String shiftedIp = unshiftIpDomain(extBridgeIp, extBridgePrefix, nodePrefix);
348
349 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
350 .matchEthType(Ethernet.TYPE_IPV4)
351 .matchInPort(k8sNode.extToIntgPatchPortNum())
352 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(clusterIp), HOST_CIDR))
353 .matchIPDst(IpPrefix.valueOf(IpAddress.valueOf(shiftedIp), HOST_CIDR));
354
355 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
356 .setIpSrc(k8sNode.extBridgeIp())
357 .setEthSrc(k8sNode.extBridgeMac());
358
359 if (TCP.equals(protocol)) {
360 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
361 .matchTcpSrc(TpPort.tpPort(svcPort));
362 tBuilder.setTcpSrc(TpPort.tpPort(nodePort));
363 } else if (UDP.equals(protocol)) {
364 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
365 .matchUdpSrc(TpPort.tpPort(svcPort));
366 tBuilder.setUdpSrc(TpPort.tpPort(nodePort));
367 }
368
369 String gatewayIp = k8sNode.extGatewayIp().toString();
370 String gatewayPrefix = getBclassIpPrefixFromCidr(gatewayIp);
371
372 if (gatewayPrefix == null) {
373 return;
374 }
375
376 ExtensionTreatment loadTreatment = buildLoadExtension(
377 deviceService.getDevice(deviceId), B_CLASS, DST, gatewayPrefix);
378 tBuilder.extension(loadTreatment, deviceId)
379 .setOutput(PortNumber.LOCAL);
380
381 k8sFlowRuleService.setRule(
382 appId,
383 deviceId,
384 sBuilder.build(),
385 tBuilder.build(),
386 PRIORITY_NODE_PORT_RULE,
387 EXT_ENTRY_TABLE,
388 install);
389 }
390
391 private void setServiceToNodeRemoteRules(K8sNode k8sNode,
392 String clusterIp,
393 ServicePort servicePort,
394 boolean install) {
395 String protocol = servicePort.getProtocol();
396 int nodePort = servicePort.getNodePort();
397 int svcPort = servicePort.getPort();
398 DeviceId deviceId = k8sNode.extBridge();
399
400 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
401 .matchEthType(Ethernet.TYPE_IPV4)
402 .matchInPort(k8sNode.extToIntgPatchPortNum())
403 .matchIPSrc(IpPrefix.valueOf(IpAddress.valueOf(clusterIp), HOST_CIDR));
404
405 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
406 .setIpSrc(k8sNode.extBridgeIp())
407 .setEthSrc(k8sNode.extBridgeMac());
408
409 if (TCP.equals(protocol)) {
410 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
411 .matchTcpSrc(TpPort.tpPort(svcPort));
412 tBuilder.setTcpSrc(TpPort.tpPort(nodePort));
413 } else if (UDP.equals(protocol)) {
414 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
415 .matchUdpSrc(TpPort.tpPort(svcPort));
416 tBuilder.setUdpSrc(TpPort.tpPort(nodePort));
417 }
418
419 String gatewayIp = k8sNode.extGatewayIp().toString();
420 String prefix = getBclassIpPrefixFromCidr(gatewayIp);
421
422 if (prefix == null) {
423 return;
424 }
425
426 ExtensionTreatment loadTreatment = buildLoadExtension(
427 deviceService.getDevice(deviceId), B_CLASS, DST, prefix);
428 tBuilder.extension(loadTreatment, deviceId)
429 .setOutput(k8sNode.extBridgePortNum());
430
431 k8sFlowRuleService.setRule(
432 appId,
433 deviceId,
434 sBuilder.build(),
435 tBuilder.build(),
436 PRIORITY_NODE_PORT_REMOTE_RULE,
437 EXT_ENTRY_TABLE,
438 install);
439 }
440
441 private String getServiceCidr() {
442 Set<ConfigProperty> properties =
443 configService.getProperties(K8sServiceHandler.class.getName());
444 return getPropertyValue(properties, SERVICE_CIDR);
445 }
446
447 private class InternalK8sServiceListener implements K8sServiceListener {
448
449 private boolean isRelevantHelper() {
450 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
451 }
452
453 @Override
454 public void event(K8sServiceEvent event) {
455 switch (event.type()) {
456 case K8S_SERVICE_CREATED:
457 case K8S_SERVICE_UPDATED:
458 eventExecutor.execute(() -> processServiceCreation(event.subject()));
459 break;
460 default:
461 break;
462 }
463 }
464
465 private void processServiceCreation(Service service) {
466 if (!isRelevantHelper()) {
467 return;
468 }
469
470 if (NODE_PORT_TYPE.equals(service.getSpec().getType())) {
471 k8sNodeService.completeNodes().forEach(n ->
472 processNodePortEvent(n, service, true)
473 );
474 }
475 }
476 }
477
478 private class InternalK8sNodeListener implements K8sNodeListener {
479
480 private boolean isRelevantHelper() {
481 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
482 }
483
484 @Override
485 public void event(K8sNodeEvent event) {
486 switch (event.type()) {
487 case K8S_NODE_COMPLETE:
488 eventExecutor.execute(() -> processNodeCompletion(event.subject()));
489 break;
490 case K8S_NODE_INCOMPLETE:
491 default:
492 break;
493 }
494 }
495
496 private void processNodeCompletion(K8sNode k8sNode) {
497 if (!isRelevantHelper()) {
498 return;
499 }
500
501 k8sServiceService.services().stream()
502 .filter(s -> NODE_PORT_TYPE.equals(s.getSpec().getType()))
503 .forEach(s -> processNodePortEvent(k8sNode, s, true));
504 }
505 }
506}