blob: 55b84f1085ca0ada78db08d01543c1f5a676e3e4 [file] [log] [blame]
/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.google.common.collect.Lists;
19import org.onlab.packet.ARP;
20import org.onlab.packet.EthType;
21import org.onlab.packet.Ethernet;
22import org.onlab.packet.IPv4;
23import org.onlab.packet.IpPrefix;
24import org.onlab.packet.TpPort;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.core.GroupId;
31import org.onosproject.kubevirtnetworking.api.KubevirtFloatingIp;
32import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
33import org.onosproject.kubevirtnetworking.api.KubevirtGroupRuleService;
34import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancer;
35import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerEvent;
36import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerListener;
37import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerService;
38import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
39import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
40import org.onosproject.kubevirtnetworking.api.KubevirtPort;
41import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
42import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
43import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
44import org.onosproject.kubevirtnetworking.api.KubevirtRouterListener;
45import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
46import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
47import org.onosproject.kubevirtnode.api.KubevirtNode;
48import org.onosproject.kubevirtnode.api.KubevirtNodeService;
49import org.onosproject.net.Device;
50import org.onosproject.net.PortNumber;
51import org.onosproject.net.device.DeviceService;
52import org.onosproject.net.driver.DriverService;
53import org.onosproject.net.flow.DefaultTrafficSelector;
54import org.onosproject.net.flow.DefaultTrafficTreatment;
55import org.onosproject.net.flow.TrafficSelector;
56import org.onosproject.net.flow.TrafficTreatment;
57import org.onosproject.net.group.DefaultGroupBucket;
58import org.onosproject.net.group.GroupBucket;
59import org.onosproject.net.packet.DefaultOutboundPacket;
60import org.onosproject.net.packet.PacketService;
61import org.osgi.service.component.annotations.Activate;
62import org.osgi.service.component.annotations.Component;
63import org.osgi.service.component.annotations.Deactivate;
64import org.osgi.service.component.annotations.Reference;
65import org.osgi.service.component.annotations.ReferenceCardinality;
66import org.slf4j.Logger;
67
68import java.nio.ByteBuffer;
69import java.util.List;
70import java.util.Objects;
71import java.util.Set;
72import java.util.concurrent.ExecutorService;
73
74import static java.util.concurrent.Executors.newSingleThreadExecutor;
75import static org.onlab.util.Tools.groupedThreads;
76import static org.onosproject.kubevirtnetworking.api.Constants.DEFAULT_GATEWAY_MAC;
77import static org.onosproject.kubevirtnetworking.api.Constants.GW_DROP_TABLE;
78import static org.onosproject.kubevirtnetworking.api.Constants.GW_ENTRY_TABLE;
79import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
80import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
81import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_LB_GATEWAY_TUN_BRIDGE_RULE;
82import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_LB_RULE;
83import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
84import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GENEVE;
85import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GRE;
Jian Li4b3436a2022-03-23 13:07:19 +090086import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.STT;
Daniel Park05a94582021-05-12 10:57:02 +090087import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VXLAN;
88import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.buildGarpPacket;
89import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.externalPatchPortNum;
90import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getLoadBalancerSetForRouter;
91import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterForKubevirtNetwork;
92import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
93import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
94import static org.onosproject.net.group.GroupDescription.Type.SELECT;
95import static org.slf4j.LoggerFactory.getLogger;
96
/**
 * Handles kubevirt load balancers.
 */
100@Component(immediate = true)
101public class KubevirtLbHandler {
102 protected final Logger log = getLogger(getClass());
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected CoreService coreService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected ClusterService clusterService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
111 protected LeadershipService leadershipService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
114 protected DriverService driverService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
117 protected DeviceService deviceService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
120 protected PacketService packetService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
123 protected KubevirtRouterService routerService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
126 protected KubevirtNetworkService networkService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
129 protected KubevirtGroupRuleService groupRuleService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
132 protected KubevirtNodeService nodeService;
133
134 @Reference(cardinality = ReferenceCardinality.MANDATORY)
135 protected KubevirtPortService portService;
136
137 @Reference(cardinality = ReferenceCardinality.MANDATORY)
138 protected KubevirtLoadBalancerService loadBalancerService;
139
140 @Reference(cardinality = ReferenceCardinality.MANDATORY)
141 protected KubevirtFlowRuleService flowRuleService;
142
143 private final InternalLbEventListener lbEventListener =
144 new InternalLbEventListener();
145
146 private final ExecutorService eventExecutor = newSingleThreadExecutor(
147 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
148
149 private ApplicationId appId;
150 private NodeId localNodeId;
151
152 private static final String PROTOCOL_TCP = "TCP";
153 private static final String PROTOCOL_UDP = "UDP";
154 private static final String PROTOCOL_ICMP = "ICMP";
155
156 private final InternalRouterEventListener kubevirtRouterlistener = new InternalRouterEventListener();
157
158 @Activate
159 protected void activate() {
160 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
161 localNodeId = clusterService.getLocalNode().id();
162 leadershipService.runForLeadership(appId.name());
163
164 loadBalancerService.addListener(lbEventListener);
165 routerService.addListener(kubevirtRouterlistener);
166
167
168 log.info("Started");
169 }
170
171 @Deactivate
172 protected void deactivate() {
173 leadershipService.withdraw(appId.name());
174
175 loadBalancerService.removeListener(lbEventListener);
176 routerService.removeListener(kubevirtRouterlistener);
177
178 eventExecutor.shutdown();
179
180 log.info("Stopped");
181 }
182
183 private class InternalLbEventListener implements KubevirtLoadBalancerListener {
184 private boolean isRelevantHelper() {
185 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
186 }
187
188 @Override
189 public void event(KubevirtLoadBalancerEvent event) {
190 switch (event.type()) {
191 case KUBEVIRT_LOAD_BALANCER_CREATED:
192 eventExecutor.execute(() -> processLbCreated(event.subject()));
193 break;
194 case KUBEVIRT_LOAD_BALANCER_UPDATED:
195 eventExecutor.execute(() -> processLbUpdated(event.subject(), event.oldLb()));
196 break;
197 case KUBEVIRT_LOAD_BALANCER_REMOVED:
198 eventExecutor.execute(() -> processLbRemoved(event.subject()));
199 break;
200 default:
201 //do nothing
202 break;
203 }
204 }
205
206 private void processLbCreated(KubevirtLoadBalancer loadBalancer) {
207 if (!isRelevantHelper()) {
208 return;
209 }
210
211 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
212
213 if (network == null) {
214 log.warn("Failed to process processLbCreated because there's no network for lb {}",
215 loadBalancer.name());
216 return;
217 }
218
219 KubevirtRouter router =
220 getRouterForKubevirtNetwork(routerService, network);
221
222 if (router == null) {
223 log.warn("Failed to process processLbCreated because there's no router for lb {}",
224 loadBalancer.name());
225 return;
226 }
227
228 if (router.electedGateway() == null) {
229 log.warn("Failed to process processLbCreated because there's elected gateway for lb {}",
230 loadBalancer.name());
231 return;
232 }
233
234 KubevirtNode gateway = nodeService.node(router.electedGateway());
235
236 setLbGroup(loadBalancer, gateway, true);
237 setBucketsToGroup(loadBalancer, gateway, true);
238 setLbDownstreamRules(loadBalancer, router, gateway, true);
239 setLbUpstreamRules(loadBalancer, router, gateway, true);
240
Jian Li4b3436a2022-03-23 13:07:19 +0900241 if (network.type() == VXLAN || network.type() == GENEVE || network.type() == GRE || network.type() == STT) {
Daniel Park05a94582021-05-12 10:57:02 +0900242 setLbDownStreamRulesForTunBridge(loadBalancer, gateway, true);
243 }
244 }
245
246 private void processLbUpdated(KubevirtLoadBalancer loadBalancer, KubevirtLoadBalancer old) {
247 if (!isRelevantHelper()) {
248 return;
249 }
250 // clean up buckets and flow rules related to the old loadbalancer
251
252 KubevirtNetwork oldNetwork = networkService.network(loadBalancer.networkId());
253
254 if (oldNetwork == null) {
255 log.warn("Failed to process processLbUpdated because there's no network for lb {}",
256 loadBalancer.name());
257 return;
258 }
259
260 KubevirtRouter oldRouter =
261 getRouterForKubevirtNetwork(routerService, oldNetwork);
262
263 if (oldRouter == null) {
264 log.warn("Failed to process processLbUpdated because there's no router for lb {}",
265 loadBalancer.name());
266 return;
267 }
268
269 if (oldRouter.electedGateway() == null) {
270 log.warn("Failed to process processLbUpdated because there's elected gateway for lb {}",
271 loadBalancer.name());
272 return;
273 }
274 KubevirtNode oldGateway = nodeService.node(oldRouter.electedGateway());
275
276 setLbDownstreamRules(old, oldRouter, oldGateway, false);
277 setLbUpstreamRules(old, oldRouter, oldGateway, false);
Jian Li4b3436a2022-03-23 13:07:19 +0900278 if (oldNetwork.type() == VXLAN || oldNetwork.type() == GENEVE ||
279 oldNetwork.type() == GRE || oldNetwork.type() == STT) {
Daniel Park05a94582021-05-12 10:57:02 +0900280 setLbDownStreamRulesForTunBridge(loadBalancer, oldGateway, false);
281 }
282 setBucketsToGroup(old, oldGateway, false);
283 setLbGroup(old, oldGateway, false);
284
285
286 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
287
288 if (network == null) {
289 log.warn("Failed to process processLbUpdated because there's no network for lb {}",
290 loadBalancer.name());
291 return;
292 }
293
294 KubevirtRouter router =
295 getRouterForKubevirtNetwork(routerService, network);
296
297 if (router == null) {
298 log.warn("Failed to process processLbUpdated because there's no router for lb {}",
299 loadBalancer.name());
300 return;
301 }
302
303 if (router.electedGateway() == null) {
304 log.warn("Failed to process processLbUpdated because there's elected gateway for lb {}",
305 loadBalancer.name());
306 return;
307 }
308
309 KubevirtNode gateway = nodeService.node(router.electedGateway());
310
311 setLbGroup(loadBalancer, gateway, true);
312 setBucketsToGroup(loadBalancer, gateway, true);
313 setLbDownstreamRules(loadBalancer, router, gateway, true);
314 setLbUpstreamRules(loadBalancer, router, gateway, true);
Jian Li4b3436a2022-03-23 13:07:19 +0900315 if (network.type() == VXLAN || network.type() == GENEVE ||
316 network.type() == GRE || network.type() == STT) {
Daniel Park05a94582021-05-12 10:57:02 +0900317 setLbDownStreamRulesForTunBridge(loadBalancer, gateway, true);
318 }
319 }
320
321 private void processLbRemoved(KubevirtLoadBalancer loadBalancer) {
322 if (!isRelevantHelper()) {
323 return;
324 }
325
326 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
327
328 if (network == null) {
329 log.warn("Failed to process processLbRemoved because there's no network for lb {}",
330 loadBalancer.name());
331 return;
332 }
333
334 KubevirtRouter router =
335 getRouterForKubevirtNetwork(routerService, network);
336
337 if (router == null) {
338 log.warn("Failed to process processLbRemoved because there's no router for lb {}",
339 loadBalancer.name());
340 return;
341 }
342
343 if (router.electedGateway() == null) {
344 log.warn("Failed to process processLbRemoved because there's elected gateway for lb {}",
345 loadBalancer.name());
346 return;
347 }
348
349 KubevirtNode gateway = nodeService.node(router.electedGateway());
350
351 setLbDownstreamRules(loadBalancer, router, gateway, false);
352 setLbUpstreamRules(loadBalancer, router, gateway, false);
353 setBucketsToGroup(loadBalancer, gateway, false);
354 setLbGroup(loadBalancer, gateway, false);
355
Jian Li4b3436a2022-03-23 13:07:19 +0900356 if (network.type() == VXLAN || network.type() == GENEVE || network.type() == GRE || network.type() == STT) {
Daniel Park05a94582021-05-12 10:57:02 +0900357 setLbDownStreamRulesForTunBridge(loadBalancer, gateway, false);
358 }
359 }
360 }
361
362 private void setLbGroup(KubevirtLoadBalancer loadBalancer, KubevirtNode gateway, boolean install) {
363
364 int groupId = loadBalancer.hashCode();
365
366 groupRuleService.setRule(appId, gateway.intgBridge(), groupId,
367 SELECT, Lists.newArrayList(), install);
368 }
369
370 private void setBucketsToGroup(KubevirtLoadBalancer loadBalancer, KubevirtNode gateway, boolean install) {
371 int groupId = loadBalancer.hashCode();
372
373 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
374
375 Set<KubevirtPort> ports = portService.ports(loadBalancer.networkId());
376
377 List<GroupBucket> bkts = Lists.newArrayList();
378 loadBalancer.members().forEach(ip -> {
379 ports.stream().filter(port -> port.ipAddress().equals(ip) && port.macAddress() != null)
380 .findAny().ifPresent(port -> bkts.add(buildGroupBucket(port)));
381 });
382
383 groupRuleService.setBuckets(appId, gateway.intgBridge(),
384 groupId, bkts, install);
385 }
386
387 private void setLbDownstreamRules(KubevirtLoadBalancer loadBalancer,
388 KubevirtRouter router,
389 KubevirtNode gateway, boolean install) {
390
391 int groupId = loadBalancer.hashCode();
392
393 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
394 .setEthSrc(router.mac())
395 .group(GroupId.valueOf(groupId))
396 .build();
397
398 loadBalancer.rules().forEach(rule -> {
399 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
400 switch (rule.protocol().toUpperCase()) {
401 case PROTOCOL_TCP:
402 sBuilder.matchIPDst(loadBalancer.vip().toIpPrefix())
403 .matchEthType(Ethernet.TYPE_IPV4)
404 .matchIPProtocol(IPv4.PROTOCOL_TCP)
405 .matchTcpDst(TpPort.tpPort(rule.portRangeMin().intValue()));
406 break;
407 case PROTOCOL_UDP:
408 sBuilder.matchIPDst(loadBalancer.vip().toIpPrefix())
409 .matchEthType(Ethernet.TYPE_IPV4)
410 .matchIPProtocol(IPv4.PROTOCOL_UDP)
411 .matchUdpDst(TpPort.tpPort(rule.portRangeMin().intValue()));
412 break;
413 case PROTOCOL_ICMP:
414 sBuilder.matchIPDst(loadBalancer.vip().toIpPrefix())
415 .matchEthType(Ethernet.TYPE_IPV4)
416 .matchIPProtocol(IPv4.PROTOCOL_ICMP);
417 break;
418 default:
419 break;
420 }
421
422 flowRuleService.setRule(
423 appId,
424 gateway.intgBridge(),
425 sBuilder.build(),
426 treatment,
427 PRIORITY_LB_RULE,
428 GW_DROP_TABLE,
429 install
430 );
431 });
432 }
433
434 private void setLbDownstreamRulesForFloatingIp(KubevirtNode gateway,
435 KubevirtFloatingIp floatingIp,
436 boolean install) {
437
438 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
439 .matchEthType(Ethernet.TYPE_IPV4)
440 .matchIPDst(floatingIp.floatingIp().toIpPrefix());
441
442 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
443 .setIpDst(floatingIp.fixedIp())
444 .transition(GW_DROP_TABLE);
445
446 flowRuleService.setRule(
447 appId,
448 gateway.intgBridge(),
449 sBuilder.build(),
450 tBuilder.build(),
451 PRIORITY_LB_RULE,
452 GW_ENTRY_TABLE,
453 install);
454 }
455
456 private void setLbDownStreamRulesForTunBridge(KubevirtLoadBalancer loadBalancer,
457 KubevirtNode gateway, boolean install) {
458 Set<KubevirtPort> ports = portService.ports(loadBalancer.networkId());
459 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
460
461 PortNumber tunnelPortNumber = tunnelPort(gateway, network);
462 if (tunnelPortNumber == null) {
463 return;
464 }
465
466 loadBalancer.members().forEach(ip -> {
467 ports.stream().filter(port -> port.ipAddress().equals(ip) && port.macAddress() != null)
468 .findAny().ifPresent(port -> {
469
470 KubevirtNode workerNode = nodeService.node(port.deviceId());
471
472 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
473 .matchEthType(Ethernet.TYPE_IPV4)
474 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32))
475 .matchEthDst(port.macAddress());
476
477 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
478 .setTunnelId(Long.parseLong(network.segmentId()))
479 .extension(buildExtension(
480 deviceService,
481 gateway.tunBridge(),
482 workerNode.dataIp().getIp4Address()),
483 gateway.tunBridge())
484 .setOutput(tunnelPortNumber);
485
486 flowRuleService.setRule(
487 appId,
488 gateway.tunBridge(),
489 sBuilder.build(),
490 tBuilder.build(),
491 PRIORITY_LB_GATEWAY_TUN_BRIDGE_RULE,
492 TUNNEL_DEFAULT_TABLE,
493 install);
494 });
495 });
496 }
497 private void setLbUpstreamRules(KubevirtLoadBalancer loadBalancer,
498 KubevirtRouter router,
499 KubevirtNode gateway, boolean install) {
500
501 Set<KubevirtPort> ports = portService.ports(loadBalancer.networkId());
502
503 loadBalancer.members().forEach(ip -> {
504 ports.stream().filter(port -> port.ipAddress().equals(ip) && port.macAddress() != null)
505 .findAny().ifPresent(port -> {
506 loadBalancer.rules().forEach(rule -> {
507 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
508 .matchEthType(Ethernet.TYPE_IPV4)
509 .matchEthSrc(port.macAddress())
510 .matchIPSrc(port.ipAddress().toIpPrefix());
511
512 switch (rule.protocol().toUpperCase()) {
513 case PROTOCOL_TCP:
514 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
515 .matchTcpSrc(TpPort.tpPort(rule.portRangeMin()));
516 break;
517 case PROTOCOL_UDP:
518 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
519 .matchUdpSrc(TpPort.tpPort(rule.portRangeMin()));
520 break;
521 case PROTOCOL_ICMP:
522 sBuilder.matchIPProtocol(IPv4.PROTOCOL_ICMP);
523 break;
524 default:
525 break;
526 }
527
528 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
529 .setEthSrc(router.mac())
530 .setIpSrc(loadBalancer.vip())
531 .transition(GW_DROP_TABLE);
532
533 flowRuleService.setRule(
534 appId,
535 gateway.intgBridge(),
536 sBuilder.build(),
537 tBuilder.build(),
538 PRIORITY_LB_RULE,
539 GW_ENTRY_TABLE,
540 install);
541 });
542 });
543 });
544 }
545
546 private void setArpResponseRuleForFloatingIp(KubevirtNode gateway,
547 KubevirtFloatingIp floatingIp,
548 boolean install) {
549 TrafficSelector selector = DefaultTrafficSelector.builder()
550 .matchInPort(externalPatchPortNum(deviceService, gateway))
551 .matchEthType(EthType.EtherType.ARP.ethType().toShort())
552 .matchArpOp(ARP.OP_REQUEST)
553 .matchArpTpa(floatingIp.floatingIp().getIp4Address())
554 .build();
555
556 Device device = deviceService.getDevice(gateway.intgBridge());
557
558 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
559 .extension(RulePopulatorUtil.buildMoveEthSrcToDstExtension(device), device.id())
560 .extension(RulePopulatorUtil.buildMoveArpShaToThaExtension(device), device.id())
561 .extension(RulePopulatorUtil.buildMoveArpSpaToTpaExtension(device), device.id())
562 .setArpOp(ARP.OP_REPLY)
563 .setEthSrc(DEFAULT_GATEWAY_MAC)
564 .setArpSha(DEFAULT_GATEWAY_MAC)
565 .setArpSpa(floatingIp.floatingIp().getIp4Address())
566 .setOutput(PortNumber.IN_PORT)
567 .build();
568
569 flowRuleService.setRule(
570 appId,
571 gateway.intgBridge(),
572 selector,
573 treatment,
574 PRIORITY_ARP_GATEWAY_RULE,
575 GW_ENTRY_TABLE,
576 install);
577 }
578 private void setLbUpstreamRulesForFloatingIp(KubevirtRouter router,
579 KubevirtNode gateway,
580 KubevirtFloatingIp floatingIp,
581 boolean install) {
582 if (router.peerRouter().macAddress() == null) {
583 return;
584 }
585
586 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
587 .matchEthType(Ethernet.TYPE_IPV4)
588 .matchEthSrc(router.mac())
589 .matchIPSrc(floatingIp.fixedIp().toIpPrefix());
590
591 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
592 .setEthSrc(DEFAULT_GATEWAY_MAC)
593 .setIpSrc(floatingIp.floatingIp())
594 .setEthDst(router.peerRouter().macAddress())
595 .setOutput(externalPatchPortNum(deviceService, gateway));
596
597 flowRuleService.setRule(
598 appId,
599 gateway.intgBridge(),
600 sBuilder.build(),
601 tBuilder.build(),
602 PRIORITY_LB_RULE,
603 GW_DROP_TABLE,
604 install);
605 }
606
607 private GroupBucket buildGroupBucket(KubevirtPort port) {
608 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
609 tBuilder.setEthDst(port.macAddress())
610 .setIpDst(port.ipAddress())
611 .setOutput(PortNumber.NORMAL);
612
613 return DefaultGroupBucket.createSelectGroupBucket(tBuilder.build());
614 }
615
616 private class InternalRouterEventListener implements KubevirtRouterListener {
617 private boolean isRelevantHelper() {
618 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
619 }
620
621 @Override
622 public void event(KubevirtRouterEvent event) {
623 switch (event.type()) {
624 case KUBEVIRT_GATEWAY_NODE_ATTACHED:
625 eventExecutor.execute(() -> processRouterGatewayNodeAttached(event.subject(),
626 event.gateway()));
627 break;
628 case KUBEVIRT_GATEWAY_NODE_CHANGED:
629 eventExecutor.execute(() -> processRouterGatewayNodeChanged(event.subject(),
630 event.gateway()));
631 break;
632 case KUBEVIRT_GATEWAY_NODE_DETACHED:
633 eventExecutor.execute(() -> processRouterGatewayNodeDetached(event.subject(),
634 event.gateway()));
635 break;
636
637 case KUBEVIRT_FLOATING_IP_LB_ASSOCIATED:
638 eventExecutor.execute(() -> processFloatingIpAssociated(event.subject(),
639 event.floatingIp()));
640 break;
641 case KUBEVIRT_FLOATING_IP_LB_DISASSOCIATED:
642 eventExecutor.execute(() -> processFloatingIpDisAssociated(event.subject(),
643 event.floatingIp()));
644 break;
645 case KUBEVIRT_PEER_ROUTER_MAC_RETRIEVED:
646 eventExecutor.execute(() -> processPeerRouterRetrieved(event.subject()));
647 break;
648 default:
649 //do nothing
650 break;
651 }
652 }
653
654 private void processPeerRouterRetrieved(KubevirtRouter router) {
655 if (!isRelevantHelper()) {
656 return;
657 }
658
659 if (router.peerRouter().macAddress() == null) {
660 return;
661 }
662
663 if (router.electedGateway() == null) {
664 return;
665 }
666
667 processRouterGatewayNodeAttached(router, router.electedGateway());
668 }
669
670 private void processFloatingIpAssociated(KubevirtRouter router, KubevirtFloatingIp floatingIp) {
671 if (!isRelevantHelper()) {
672 return;
673 }
674
675 if (router.electedGateway() == null) {
676 log.warn("Failed to process processFloatingIpAssociated because there's elected gateway for fip {}",
677 floatingIp.floatingIp());
678 return;
679 }
680
681 KubevirtNode gateway = nodeService.node(router.electedGateway());
682
683 loadBalancerService.loadBalancers().stream()
684 .filter(lb -> lb.vip().equals(floatingIp.fixedIp()))
685 .findAny()
686 .ifPresent(lb -> {
687 setLbUpstreamRulesForFloatingIp(router, gateway, floatingIp, true);
688 setLbDownstreamRulesForFloatingIp(gateway, floatingIp, true);
689 setArpResponseRuleForFloatingIp(gateway, floatingIp, true);
690 processGarpPacketForFloatingIp(floatingIp, gateway);
691 });
692 }
693
694 private void processGarpPacketForFloatingIp(KubevirtFloatingIp floatingIp, KubevirtNode electedGw) {
695 if (floatingIp == null) {
696 return;
697 }
698
699 Ethernet ethernet = buildGarpPacket(DEFAULT_GATEWAY_MAC, floatingIp.floatingIp());
700 if (ethernet == null) {
701 return;
702 }
703
704 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
705 .setOutput(externalPatchPortNum(deviceService, electedGw)).build();
706
707 packetService.emit(new DefaultOutboundPacket(electedGw.intgBridge(), treatment,
708 ByteBuffer.wrap(ethernet.serialize())));
709 }
710
711 private void processFloatingIpDisAssociated(KubevirtRouter router, KubevirtFloatingIp floatingIp) {
712 if (!isRelevantHelper()) {
713 return;
714 }
715
716 if (router.electedGateway() == null) {
717 log.warn("Failed to process processFloatingIpDisAssociated because there's elected gateway for fip {}",
718 floatingIp.floatingIp());
719 return;
720 }
721
722 KubevirtNode gateway = nodeService.node(router.electedGateway());
723
724
725 loadBalancerService.loadBalancers().stream()
726 .filter(lb -> lb.vip().equals(floatingIp.fixedIp()))
727 .findAny()
728 .ifPresent(lb -> {
729 setLbUpstreamRulesForFloatingIp(router, gateway, floatingIp, false);
730 setLbDownstreamRulesForFloatingIp(gateway, floatingIp, false);
731 setArpResponseRuleForFloatingIp(gateway, floatingIp, false);
732 });
733 }
734
735 private void processRouterGatewayNodeAttached(KubevirtRouter router,
736 String associatedGateway) {
737 if (!isRelevantHelper()) {
738 return;
739 }
740
741 KubevirtNode gatewayNode = nodeService.node(associatedGateway);
742 if (gatewayNode == null) {
743 return;
744 }
745
746 getLoadBalancerSetForRouter(router, loadBalancerService).forEach(loadBalancer -> {
747 setLbGroup(loadBalancer, gatewayNode, true);
748 setBucketsToGroup(loadBalancer, gatewayNode, true);
749 setLbDownstreamRules(loadBalancer, router, gatewayNode, true);
750 setLbUpstreamRules(loadBalancer, router, gatewayNode, true);
751
752 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
Jian Li4b3436a2022-03-23 13:07:19 +0900753 if (network.type() == VXLAN || network.type() == GENEVE ||
754 network.type() == GRE || network.type() == STT) {
Daniel Park05a94582021-05-12 10:57:02 +0900755 setLbDownStreamRulesForTunBridge(loadBalancer, gatewayNode, true);
756 }
757
758 routerService.floatingIpsByRouter(router.name())
759 .stream()
760 .filter(fip -> fip.fixedIp() != null && fip.fixedIp().equals(loadBalancer.vip()))
761 .findAny()
762 .ifPresent(fip -> {
763 setLbDownstreamRulesForFloatingIp(gatewayNode, fip, true);
764 setLbUpstreamRulesForFloatingIp(router, gatewayNode, fip, true);
765 setArpResponseRuleForFloatingIp(gatewayNode, fip, true);
766 });
767 });
768 }
769
770 private void processRouterGatewayNodeDetached(KubevirtRouter router,
771 String disAssociatedGateway) {
772 if (!isRelevantHelper()) {
773 return;
774 }
775
776 KubevirtNode gatewayNode = nodeService.node(disAssociatedGateway);
777 if (gatewayNode == null) {
778 return;
779 }
780
781 getLoadBalancerSetForRouter(router, loadBalancerService).forEach(loadBalancer -> {
782 setLbDownstreamRules(loadBalancer, router, gatewayNode, false);
783 setLbUpstreamRules(loadBalancer, router, gatewayNode, false);
784 setBucketsToGroup(loadBalancer, gatewayNode, false);
785 setLbGroup(loadBalancer, gatewayNode, false);
786
787 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
Jian Li4b3436a2022-03-23 13:07:19 +0900788 if (network.type() == VXLAN || network.type() == GENEVE ||
789 network.type() == GRE || network.type() == STT) {
Daniel Park05a94582021-05-12 10:57:02 +0900790 setLbDownStreamRulesForTunBridge(loadBalancer, gatewayNode, false);
791 }
792
793 routerService.floatingIpsByRouter(router.name())
794 .stream()
795 .filter(fip -> fip.fixedIp() != null && fip.fixedIp().equals(loadBalancer.vip()))
796 .findAny()
797 .ifPresent(fip -> {
798 setLbDownstreamRulesForFloatingIp(gatewayNode, fip, false);
799 setLbUpstreamRulesForFloatingIp(router, gatewayNode, fip, false);
800 setArpResponseRuleForFloatingIp(gatewayNode, fip, false);
801 });
802 });
803 }
804
805 private void processRouterGatewayNodeChanged(KubevirtRouter router,
806 String disAssociatedGateway) {
807 if (!isRelevantHelper()) {
808 return;
809 }
810
811 processRouterGatewayNodeDetached(router, disAssociatedGateway);
812
813 KubevirtNode newGatewayNode = nodeService.node(router.electedGateway());
814 if (newGatewayNode == null) {
815 return;
816 }
817 processRouterGatewayNodeAttached(router, newGatewayNode.hostname());
818 }
819 }
820}