blob: 7b1c733f25a350ad52cf55d059f212c62983d7a0 [file] [log] [blame]
Daniel Park5ff76b72022-09-26 22:58:53 +09001/*
2 * Copyright 2022-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import org.onlab.packet.ARP;
19import org.onlab.packet.EthType;
20import org.onlab.packet.Ethernet;
21import org.onlab.packet.IPv4;
22import org.onlab.packet.IpAddress;
23import org.onlab.packet.MacAddress;
24import org.onlab.packet.TpPort;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.kubevirtnetworking.api.KubernetesExternalLb;
31import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbAdminService;
32import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbEvent;
33import org.onosproject.kubevirtnetworking.api.KubernetesExternalLbListener;
34import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
35import org.onosproject.kubevirtnetworking.api.KubevirtGroupRuleService;
36import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
37import org.onosproject.kubevirtnode.api.KubernetesExternalLbInterface;
38import org.onosproject.kubevirtnode.api.KubevirtApiConfigService;
39import org.onosproject.kubevirtnode.api.KubevirtNode;
40import org.onosproject.kubevirtnode.api.KubevirtNodeService;
41import org.onosproject.net.Device;
42import org.onosproject.net.PortNumber;
43import org.onosproject.net.device.DeviceService;
44import org.onosproject.net.driver.DriverService;
45import org.onosproject.net.flow.DefaultTrafficSelector;
46import org.onosproject.net.flow.DefaultTrafficTreatment;
47import org.onosproject.net.flow.TrafficSelector;
48import org.onosproject.net.flow.TrafficTreatment;
49import org.onosproject.net.flow.instructions.ExtensionTreatment;
50import org.onosproject.net.packet.PacketService;
51import org.osgi.service.component.annotations.Activate;
52import org.osgi.service.component.annotations.Component;
53import org.osgi.service.component.annotations.Deactivate;
54import org.osgi.service.component.annotations.Reference;
55import org.osgi.service.component.annotations.ReferenceCardinality;
56import org.slf4j.Logger;
57
58import java.util.Objects;
59import java.util.concurrent.ExecutorService;
60
61import static java.util.concurrent.Executors.newSingleThreadExecutor;
62import static org.onlab.util.Tools.groupedThreads;
63import static org.onosproject.kubevirtnetworking.api.Constants.GW_DROP_TABLE;
64import static org.onosproject.kubevirtnetworking.api.Constants.GW_ENTRY_TABLE;
65import static org.onosproject.kubevirtnetworking.api.Constants.KUBERNETES_EXTERNAL_LB_FAKE_MAC;
66import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
67import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
68import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ELB_DOWNSTREAM_RULE;
69import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ELB_UPSTREAM_RULE;
70import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.elbPatchPortNum;
71import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.externalPatchPortNum;
72import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.kubernetesElbMac;
73import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.CT_NAT_SRC_FLAG;
74import static org.slf4j.LoggerFactory.getLogger;
75
76/**
77 * Handles Kubernetes External load balancer.
78 */
79@Component(immediate = true)
80public class KubernetesExternalLbHandler {
81 protected final Logger log = getLogger(getClass());
82
83 private static final int TP_PORT_MINIMUM_NUM = 1025;
84 private static final int TP_PORT_MAXIMUM_NUM = 65535;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected CoreService coreService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected ClusterService clusterService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected LeadershipService leadershipService;
94
95 @Reference(cardinality = ReferenceCardinality.MANDATORY)
96 protected KubevirtApiConfigService configService;
97
98 @Reference(cardinality = ReferenceCardinality.MANDATORY)
99 protected KubevirtNodeService nodeService;
100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected KubevirtGroupRuleService groupRuleService;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected DeviceService deviceService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected PacketService packetService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
111 protected KubevirtFlowRuleService flowService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
114 protected KubernetesExternalLbAdminService externalLbService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
117 protected DriverService driverService;
118
119 private final ExecutorService eventExecutor = newSingleThreadExecutor(
120 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
121
122 private ApplicationId appId;
123 private NodeId localNodeId;
124
125 private final InternalKubernetesExternalLbListener lbListener =
126 new InternalKubernetesExternalLbListener();
127
128 @Activate
129 protected void activate() {
130 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
131 localNodeId = clusterService.getLocalNode().id();
132 leadershipService.runForLeadership(appId.name());
133 externalLbService.addListener(lbListener);
134
135 log.info("Started");
136 }
137
138 @Deactivate
139 protected void deactivate() {
140 leadershipService.withdraw(appId.name());
141 externalLbService.removeListener(lbListener);
142
143 eventExecutor.shutdown();
144
145 log.info("Stopped");
146 }
147
148
149 private class InternalKubernetesExternalLbListener implements KubernetesExternalLbListener {
150 private boolean isRelevantHelper() {
151 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
152 }
153
154 @Override
155 public void event(KubernetesExternalLbEvent event) {
156 switch (event.type()) {
157 case KUBERNETES_EXTERNAL_LOAD_BALANCER_CREATED:
158 case KUBERNETES_EXTERNAL_LOAD_BALANCER_UPDATED:
159 eventExecutor.execute(() -> processKubernetesExternalLbCreatedOrUpdated(
160 event.subject()));
161 break;
162 case KUBERNETES_EXTERNAL_LOAD_BALANCER_GATEWAY_CHANGED:
163 eventExecutor.execute(() -> processKubernetesExternalLbGatewayChanged(
164 event.subject(), event.oldGateway()));
165 break;
166 case KUBERNETES_EXTERNAL_LOAD_BALANCER_WORKER_CHANGED:
167 eventExecutor.execute(() -> processKubernetesExternalLbWorkerChanged(
168 event.subject(), event.oldWorker()));
169 break;
170 case KUBERNETES_EXTERNAL_LOAD_BALANCER_REMOVED:
171 eventExecutor.execute(() -> processKubernetesExternalLbRemoved(
172 event.subject()));
173 break;
174 default:
175 //do nothing
176 break;
177 }
178 }
179
180 private void processKubernetesExternalLbCreatedOrUpdated(KubernetesExternalLb lb) {
181 if (!isRelevantHelper()) {
182 return;
183 }
184
185 if (lb.electedGateway() == null || lb.electedWorker() == null) {
186 log.warn("processKubernetesExternalLbCreatedOrUpdated called but electedGateway " +
187 "or electedWorker is null. Stop this task.");
188 return;
189 }
190
191 log.info("processKubernetesExternalLbCreatedOrUpdated and updated elb with elecedGateway: {}", lb);
192
193 setExternalLbRulesForService(lb, true);
194 }
195
196 private void processKubernetesExternalLbGatewayChanged(KubernetesExternalLb lb, String oldGatway) {
197 if (!isRelevantHelper()) {
198 return;
199 }
200
201 if (lb.electedWorker() == null || oldGatway == null) {
202 return;
203 }
204
205 log.info("processKubernetesExternalLbGatewayChanged with oldateway: {}", oldGatway);
206
207 setExternalLbRulesForService(lb.updateElectedGateway(oldGatway), false);
208
209 setExternalLbRulesForService(lb, true);
210 }
211
212 private void processKubernetesExternalLbWorkerChanged(KubernetesExternalLb lb, String oldWorker) {
213 if (!isRelevantHelper()) {
214 return;
215 }
216
217 if (lb.electedGateway() == null || oldWorker == null) {
218 return;
219 }
220
221 log.info("processKubernetesExternalLbWorkerChanged with oldworker: {}", oldWorker);
222
223 setExternalLbRulesForService(lb.updateElectedWorker(oldWorker), false);
224
225 setExternalLbRulesForService(lb, true);
226 }
227
228
229 private void processKubernetesExternalLbRemoved(KubernetesExternalLb lb) {
230 if (!isRelevantHelper()) {
231 return;
232 }
233
234 if (lb.electedGateway() == null) {
235 return;
236 }
237
238 setExternalLbRulesForService(lb, false);
239 }
240 }
241
242 private void setExternalLbRulesForService(KubernetesExternalLb lb, boolean install) {
243 if (lb.electedGateway() == null) {
244 return;
245 }
246
247 KubevirtNode gateway = nodeService.node(lb.electedGateway());
248
249 if (gateway == null) {
250 return;
251 }
252
253 setLoadbalanceIpArpResponseRules(lb, gateway, install);
254 setDownstreamRules(lb, gateway, install);
255 setUpstreamRules(lb, gateway, install);
256 }
257
258 private void setLoadbalanceIpArpResponseRules(KubernetesExternalLb lb, KubevirtNode gateway, boolean install) {
259
260 IpAddress loadBalancerIp = lb.loadBalancerIp();
261
262 if (loadBalancerIp == null) {
263 return;
264 }
265
266 TrafficSelector selector = DefaultTrafficSelector.builder()
267 .matchInPort(externalPatchPortNum(deviceService, gateway))
268 .matchEthType(EthType.EtherType.ARP.ethType().toShort())
269 .matchArpOp(ARP.OP_REQUEST)
270 .matchArpTpa(loadBalancerIp.getIp4Address())
271 .build();
272
273 Device device = deviceService.getDevice(gateway.intgBridge());
274
275 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
276 .extension(RulePopulatorUtil.buildMoveEthSrcToDstExtension(device), device.id())
277 .extension(RulePopulatorUtil.buildMoveArpShaToThaExtension(device), device.id())
278 .extension(RulePopulatorUtil.buildMoveArpSpaToTpaExtension(device), device.id())
279 .setArpOp(ARP.OP_REPLY)
280 .setEthSrc(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
281 .setArpSha(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
282 .setArpSpa(loadBalancerIp.getIp4Address())
283 .setOutput(PortNumber.IN_PORT)
284 .build();
285
286 flowService.setRule(
287 appId,
288 gateway.intgBridge(),
289 selector,
290 treatment,
291 PRIORITY_ARP_GATEWAY_RULE,
292 GW_ENTRY_TABLE,
293 install);
294 }
295
296 private void setDownstreamRules(KubernetesExternalLb lb, KubevirtNode gateway, boolean install) {
297
298 IpAddress loadBalancerIp = lb.loadBalancerIp();
299
300 if (loadBalancerIp == null) {
301 log.warn("setDownstreamRules called but loadBalancerIp is null. Stop this task.");
302 return;
303 }
304
305 MacAddress elbIntfMac = kubernetesElbMac(deviceService, gateway);
306 if (elbIntfMac == null) {
307 log.warn("setDownstreamRules called but elbIntfMac is null. Stop this task.");
308 return;
309 }
310
311 PortNumber elbBridgePortNum = elbPatchPortNum(deviceService, gateway);
312 if (elbBridgePortNum == null) {
313 log.warn("setDownstreamRules called but elbBridgePortNum is null. Stop this task.");
314 return;
315 }
316
317 KubernetesExternalLbInterface externalLbInterface = gateway.kubernetesExternalLbInterface();
318 if (externalLbInterface == null) {
319 log.warn("setDownstreamRules called but externalLbInterface is null. Stop this task.");
320 return;
321 }
322
323 KubevirtNode electedWorker = nodeService.node(lb.electedWorker());
324 if (electedWorker == null) {
325 log.warn("setDownstreamRules called but electedWorker is null. Stop this task.");
326 return;
327 }
328
329 lb.nodePortSet().forEach(nodeport -> {
330 TrafficSelector selector = DefaultTrafficSelector.builder()
331 .matchEthType(Ethernet.TYPE_IPV4)
332 .matchEthDst(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
333 .matchIPDst(loadBalancerIp.toIpPrefix())
334 .matchIPProtocol(IPv4.PROTOCOL_TCP)
335 .matchTcpDst(TpPort.tpPort(nodeport.intValue()))
336 .build();
337
338 ExtensionTreatment natTreatment = RulePopulatorUtil
339 .niciraConnTrackTreatmentBuilder(driverService, gateway.intgBridge())
340 .commit(true)
341 .natFlag(CT_NAT_SRC_FLAG)
342 .natAction(true)
343 .natIp(externalLbInterface.externalLbIp())
344 .natPortMin(TpPort.tpPort(TP_PORT_MINIMUM_NUM))
345 .natPortMax(TpPort.tpPort(TP_PORT_MAXIMUM_NUM))
346 .build();
347
348
349 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
350 .extension(natTreatment, gateway.intgBridge())
351 .setEthSrc(elbIntfMac)
352 .setEthDst(externalLbInterface.externalLbGwMac())
353 .setIpDst(electedWorker.dataIp())
354 .setOutput(elbBridgePortNum)
355 .build();
356
357 flowService.setRule(
358 appId,
359 gateway.intgBridge(),
360 selector,
361 treatment,
362 PRIORITY_ELB_DOWNSTREAM_RULE,
363 GW_ENTRY_TABLE,
364 install);
365 });
366 }
367
368 private void setUpstreamRules(KubernetesExternalLb lb, KubevirtNode gateway, boolean install) {
369 IpAddress loadBalancerIp = lb.loadBalancerIp();
370
371 if (loadBalancerIp == null) {
372 log.warn("setUpstreamRules called but loadBalancerIp is null. Stop this task.");
373 return;
374 }
375
376 MacAddress elbIntfMac = kubernetesElbMac(deviceService, gateway);
377 if (elbIntfMac == null) {
378 log.warn("setUpstreamRules called but elbIntfMac is null. Stop this task.");
379 return;
380 }
381
382 PortNumber elbBridgePortNum = elbPatchPortNum(deviceService, gateway);
383 if (elbBridgePortNum == null) {
384 log.warn("setUpstreamRules called but elbBridgePortNum is null. Stop this task.");
385 return;
386 }
387
388 PortNumber externalPatchPortNum = externalPatchPortNum(deviceService, gateway);
389 if (externalPatchPortNum == null) {
390 log.warn("setUpstreamRules called but externalPatchPortNum is null. Stop this task.");
391 return;
392 }
393
394 KubernetesExternalLbInterface externalLbInterface = gateway.kubernetesExternalLbInterface();
395 if (externalLbInterface == null) {
396 log.warn("setUpstreamRules called but externalLbInterface is null. Stop this task.");
397 return;
398 }
399
400
401 KubevirtNode electedWorker = nodeService.node(lb.electedWorker());
402 if (electedWorker == null) {
403 log.warn("setDownstreamRules called but electedWorker is null. Stop this task.");
404 return;
405 }
406
407 lb.nodePortSet().forEach(nodePort -> {
408 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
409 .matchEthType(Ethernet.TYPE_IPV4)
410 .matchIPSrc(electedWorker.dataIp().toIpPrefix())
411 .matchIPDst(externalLbInterface.externalLbIp().toIpPrefix())
412 .matchIPProtocol(IPv4.PROTOCOL_TCP)
413 .matchTcpSrc(TpPort.tpPort(nodePort.intValue()));
414
415 ExtensionTreatment natTreatment = RulePopulatorUtil
416 .niciraConnTrackTreatmentBuilder(driverService, gateway.intgBridge())
417 .commit(false)
418 .natAction(true)
419 .table((short) GW_DROP_TABLE)
420 .build();
421
422 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
423 .setEthSrc(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
424 .setIpSrc(lb.loadBalancerIp())
425 .setEthDst(lb.loadBalancerGwMac())
426 .extension(natTreatment, gateway.intgBridge())
427 .transition(GW_DROP_TABLE);
428
429 flowService.setRule(
430 appId,
431 gateway.intgBridge(),
432 sBuilder.build(),
433 tBuilder.build(),
434 PRIORITY_ELB_UPSTREAM_RULE,
435 GW_ENTRY_TABLE,
436 install);
437
438 sBuilder = DefaultTrafficSelector.builder()
439 .matchEthType(Ethernet.TYPE_IPV4)
440 .matchIPProtocol(IPv4.PROTOCOL_TCP)
441 .matchTcpSrc(TpPort.tpPort(nodePort.intValue()));
442
443
444 tBuilder = DefaultTrafficTreatment.builder()
445 .setEthSrc(KUBERNETES_EXTERNAL_LB_FAKE_MAC)
446 .setIpSrc(lb.loadBalancerIp())
447 .setEthDst(lb.loadBalancerGwMac())
448 .setOutput(externalPatchPortNum);
449
450 flowService.setRule(
451 appId,
452 gateway.intgBridge(),
453 sBuilder.build(),
454 tBuilder.build(),
455 PRIORITY_ELB_UPSTREAM_RULE,
456 GW_DROP_TABLE,
457 install);
458 });
459 }
460}