blob: ba352eb8b3d0e688b115d6caec7211120656e5d3 [file] [log] [blame]
Daniel Park05a94582021-05-12 10:57:02 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.google.common.collect.Lists;
19import org.onlab.packet.ARP;
20import org.onlab.packet.EthType;
21import org.onlab.packet.Ethernet;
22import org.onlab.packet.IPv4;
23import org.onlab.packet.IpPrefix;
24import org.onlab.packet.TpPort;
25import org.onosproject.cluster.ClusterService;
26import org.onosproject.cluster.LeadershipService;
27import org.onosproject.cluster.NodeId;
28import org.onosproject.core.ApplicationId;
29import org.onosproject.core.CoreService;
30import org.onosproject.core.GroupId;
31import org.onosproject.kubevirtnetworking.api.KubevirtFloatingIp;
32import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
33import org.onosproject.kubevirtnetworking.api.KubevirtGroupRuleService;
34import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancer;
35import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerEvent;
36import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerListener;
37import org.onosproject.kubevirtnetworking.api.KubevirtLoadBalancerService;
38import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
39import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
40import org.onosproject.kubevirtnetworking.api.KubevirtPort;
41import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
42import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
43import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
44import org.onosproject.kubevirtnetworking.api.KubevirtRouterListener;
45import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
46import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
47import org.onosproject.kubevirtnode.api.KubevirtNode;
48import org.onosproject.kubevirtnode.api.KubevirtNodeService;
49import org.onosproject.net.Device;
50import org.onosproject.net.PortNumber;
51import org.onosproject.net.device.DeviceService;
52import org.onosproject.net.driver.DriverService;
53import org.onosproject.net.flow.DefaultTrafficSelector;
54import org.onosproject.net.flow.DefaultTrafficTreatment;
55import org.onosproject.net.flow.TrafficSelector;
56import org.onosproject.net.flow.TrafficTreatment;
57import org.onosproject.net.group.DefaultGroupBucket;
58import org.onosproject.net.group.GroupBucket;
59import org.onosproject.net.packet.DefaultOutboundPacket;
60import org.onosproject.net.packet.PacketService;
61import org.osgi.service.component.annotations.Activate;
62import org.osgi.service.component.annotations.Component;
63import org.osgi.service.component.annotations.Deactivate;
64import org.osgi.service.component.annotations.Reference;
65import org.osgi.service.component.annotations.ReferenceCardinality;
66import org.slf4j.Logger;
67
68import java.nio.ByteBuffer;
69import java.util.List;
70import java.util.Objects;
71import java.util.Set;
72import java.util.concurrent.ExecutorService;
73
74import static java.util.concurrent.Executors.newSingleThreadExecutor;
75import static org.onlab.util.Tools.groupedThreads;
76import static org.onosproject.kubevirtnetworking.api.Constants.DEFAULT_GATEWAY_MAC;
77import static org.onosproject.kubevirtnetworking.api.Constants.GW_DROP_TABLE;
78import static org.onosproject.kubevirtnetworking.api.Constants.GW_ENTRY_TABLE;
79import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
80import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
81import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_LB_GATEWAY_TUN_BRIDGE_RULE;
82import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_LB_RULE;
83import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
84import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GENEVE;
85import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GRE;
86import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VXLAN;
87import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.buildGarpPacket;
88import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.externalPatchPortNum;
89import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getLoadBalancerSetForRouter;
90import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterForKubevirtNetwork;
91import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
92import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
93import static org.onosproject.net.group.GroupDescription.Type.SELECT;
94import static org.slf4j.LoggerFactory.getLogger;
95
96/**
97 * Handles kubevirt loadbalancer.
98 */
99@Component(immediate = true)
100public class KubevirtLbHandler {
101 protected final Logger log = getLogger(getClass());
102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected CoreService coreService;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
107 protected ClusterService clusterService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected LeadershipService leadershipService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
113 protected DriverService driverService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY)
116 protected DeviceService deviceService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY)
119 protected PacketService packetService;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY)
122 protected KubevirtRouterService routerService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY)
125 protected KubevirtNetworkService networkService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
128 protected KubevirtGroupRuleService groupRuleService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
131 protected KubevirtNodeService nodeService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY)
134 protected KubevirtPortService portService;
135
136 @Reference(cardinality = ReferenceCardinality.MANDATORY)
137 protected KubevirtLoadBalancerService loadBalancerService;
138
139 @Reference(cardinality = ReferenceCardinality.MANDATORY)
140 protected KubevirtFlowRuleService flowRuleService;
141
142 private final InternalLbEventListener lbEventListener =
143 new InternalLbEventListener();
144
145 private final ExecutorService eventExecutor = newSingleThreadExecutor(
146 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
147
148 private ApplicationId appId;
149 private NodeId localNodeId;
150
151 private static final String PROTOCOL_TCP = "TCP";
152 private static final String PROTOCOL_UDP = "UDP";
153 private static final String PROTOCOL_ICMP = "ICMP";
154
155 private final InternalRouterEventListener kubevirtRouterlistener = new InternalRouterEventListener();
156
157 @Activate
158 protected void activate() {
159 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
160 localNodeId = clusterService.getLocalNode().id();
161 leadershipService.runForLeadership(appId.name());
162
163 loadBalancerService.addListener(lbEventListener);
164 routerService.addListener(kubevirtRouterlistener);
165
166
167 log.info("Started");
168 }
169
170 @Deactivate
171 protected void deactivate() {
172 leadershipService.withdraw(appId.name());
173
174 loadBalancerService.removeListener(lbEventListener);
175 routerService.removeListener(kubevirtRouterlistener);
176
177 eventExecutor.shutdown();
178
179 log.info("Stopped");
180 }
181
182 private class InternalLbEventListener implements KubevirtLoadBalancerListener {
183 private boolean isRelevantHelper() {
184 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
185 }
186
187 @Override
188 public void event(KubevirtLoadBalancerEvent event) {
189 switch (event.type()) {
190 case KUBEVIRT_LOAD_BALANCER_CREATED:
191 eventExecutor.execute(() -> processLbCreated(event.subject()));
192 break;
193 case KUBEVIRT_LOAD_BALANCER_UPDATED:
194 eventExecutor.execute(() -> processLbUpdated(event.subject(), event.oldLb()));
195 break;
196 case KUBEVIRT_LOAD_BALANCER_REMOVED:
197 eventExecutor.execute(() -> processLbRemoved(event.subject()));
198 break;
199 default:
200 //do nothing
201 break;
202 }
203 }
204
205 private void processLbCreated(KubevirtLoadBalancer loadBalancer) {
206 if (!isRelevantHelper()) {
207 return;
208 }
209
210 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
211
212 if (network == null) {
213 log.warn("Failed to process processLbCreated because there's no network for lb {}",
214 loadBalancer.name());
215 return;
216 }
217
218 KubevirtRouter router =
219 getRouterForKubevirtNetwork(routerService, network);
220
221 if (router == null) {
222 log.warn("Failed to process processLbCreated because there's no router for lb {}",
223 loadBalancer.name());
224 return;
225 }
226
227 if (router.electedGateway() == null) {
228 log.warn("Failed to process processLbCreated because there's elected gateway for lb {}",
229 loadBalancer.name());
230 return;
231 }
232
233 KubevirtNode gateway = nodeService.node(router.electedGateway());
234
235 setLbGroup(loadBalancer, gateway, true);
236 setBucketsToGroup(loadBalancer, gateway, true);
237 setLbDownstreamRules(loadBalancer, router, gateway, true);
238 setLbUpstreamRules(loadBalancer, router, gateway, true);
239
240 if (network.type() == VXLAN || network.type() == GENEVE || network.type() == GRE) {
241 setLbDownStreamRulesForTunBridge(loadBalancer, gateway, true);
242 }
243 }
244
245 private void processLbUpdated(KubevirtLoadBalancer loadBalancer, KubevirtLoadBalancer old) {
246 if (!isRelevantHelper()) {
247 return;
248 }
249 // clean up buckets and flow rules related to the old loadbalancer
250
251 KubevirtNetwork oldNetwork = networkService.network(loadBalancer.networkId());
252
253 if (oldNetwork == null) {
254 log.warn("Failed to process processLbUpdated because there's no network for lb {}",
255 loadBalancer.name());
256 return;
257 }
258
259 KubevirtRouter oldRouter =
260 getRouterForKubevirtNetwork(routerService, oldNetwork);
261
262 if (oldRouter == null) {
263 log.warn("Failed to process processLbUpdated because there's no router for lb {}",
264 loadBalancer.name());
265 return;
266 }
267
268 if (oldRouter.electedGateway() == null) {
269 log.warn("Failed to process processLbUpdated because there's elected gateway for lb {}",
270 loadBalancer.name());
271 return;
272 }
273 KubevirtNode oldGateway = nodeService.node(oldRouter.electedGateway());
274
275 setLbDownstreamRules(old, oldRouter, oldGateway, false);
276 setLbUpstreamRules(old, oldRouter, oldGateway, false);
277 if (oldNetwork.type() == VXLAN || oldNetwork.type() == GENEVE || oldNetwork.type() == GRE) {
278 setLbDownStreamRulesForTunBridge(loadBalancer, oldGateway, false);
279 }
280 setBucketsToGroup(old, oldGateway, false);
281 setLbGroup(old, oldGateway, false);
282
283
284 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
285
286 if (network == null) {
287 log.warn("Failed to process processLbUpdated because there's no network for lb {}",
288 loadBalancer.name());
289 return;
290 }
291
292 KubevirtRouter router =
293 getRouterForKubevirtNetwork(routerService, network);
294
295 if (router == null) {
296 log.warn("Failed to process processLbUpdated because there's no router for lb {}",
297 loadBalancer.name());
298 return;
299 }
300
301 if (router.electedGateway() == null) {
302 log.warn("Failed to process processLbUpdated because there's elected gateway for lb {}",
303 loadBalancer.name());
304 return;
305 }
306
307 KubevirtNode gateway = nodeService.node(router.electedGateway());
308
309 setLbGroup(loadBalancer, gateway, true);
310 setBucketsToGroup(loadBalancer, gateway, true);
311 setLbDownstreamRules(loadBalancer, router, gateway, true);
312 setLbUpstreamRules(loadBalancer, router, gateway, true);
313 if (network.type() == VXLAN || network.type() == GENEVE || network.type() == GRE) {
314 setLbDownStreamRulesForTunBridge(loadBalancer, gateway, true);
315 }
316 }
317
318 private void processLbRemoved(KubevirtLoadBalancer loadBalancer) {
319 if (!isRelevantHelper()) {
320 return;
321 }
322
323 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
324
325 if (network == null) {
326 log.warn("Failed to process processLbRemoved because there's no network for lb {}",
327 loadBalancer.name());
328 return;
329 }
330
331 KubevirtRouter router =
332 getRouterForKubevirtNetwork(routerService, network);
333
334 if (router == null) {
335 log.warn("Failed to process processLbRemoved because there's no router for lb {}",
336 loadBalancer.name());
337 return;
338 }
339
340 if (router.electedGateway() == null) {
341 log.warn("Failed to process processLbRemoved because there's elected gateway for lb {}",
342 loadBalancer.name());
343 return;
344 }
345
346 KubevirtNode gateway = nodeService.node(router.electedGateway());
347
348 setLbDownstreamRules(loadBalancer, router, gateway, false);
349 setLbUpstreamRules(loadBalancer, router, gateway, false);
350 setBucketsToGroup(loadBalancer, gateway, false);
351 setLbGroup(loadBalancer, gateway, false);
352
353 if (network.type() == VXLAN || network.type() == GENEVE || network.type() == GRE) {
354 setLbDownStreamRulesForTunBridge(loadBalancer, gateway, false);
355 }
356 }
357 }
358
359 private void setLbGroup(KubevirtLoadBalancer loadBalancer, KubevirtNode gateway, boolean install) {
360
361 int groupId = loadBalancer.hashCode();
362
363 groupRuleService.setRule(appId, gateway.intgBridge(), groupId,
364 SELECT, Lists.newArrayList(), install);
365 }
366
367 private void setBucketsToGroup(KubevirtLoadBalancer loadBalancer, KubevirtNode gateway, boolean install) {
368 int groupId = loadBalancer.hashCode();
369
370 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
371
372 Set<KubevirtPort> ports = portService.ports(loadBalancer.networkId());
373
374 List<GroupBucket> bkts = Lists.newArrayList();
375 loadBalancer.members().forEach(ip -> {
376 ports.stream().filter(port -> port.ipAddress().equals(ip) && port.macAddress() != null)
377 .findAny().ifPresent(port -> bkts.add(buildGroupBucket(port)));
378 });
379
380 groupRuleService.setBuckets(appId, gateway.intgBridge(),
381 groupId, bkts, install);
382 }
383
384 private void setLbDownstreamRules(KubevirtLoadBalancer loadBalancer,
385 KubevirtRouter router,
386 KubevirtNode gateway, boolean install) {
387
388 int groupId = loadBalancer.hashCode();
389
390 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
391 .setEthSrc(router.mac())
392 .group(GroupId.valueOf(groupId))
393 .build();
394
395 loadBalancer.rules().forEach(rule -> {
396 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
397 switch (rule.protocol().toUpperCase()) {
398 case PROTOCOL_TCP:
399 sBuilder.matchIPDst(loadBalancer.vip().toIpPrefix())
400 .matchEthType(Ethernet.TYPE_IPV4)
401 .matchIPProtocol(IPv4.PROTOCOL_TCP)
402 .matchTcpDst(TpPort.tpPort(rule.portRangeMin().intValue()));
403 break;
404 case PROTOCOL_UDP:
405 sBuilder.matchIPDst(loadBalancer.vip().toIpPrefix())
406 .matchEthType(Ethernet.TYPE_IPV4)
407 .matchIPProtocol(IPv4.PROTOCOL_UDP)
408 .matchUdpDst(TpPort.tpPort(rule.portRangeMin().intValue()));
409 break;
410 case PROTOCOL_ICMP:
411 sBuilder.matchIPDst(loadBalancer.vip().toIpPrefix())
412 .matchEthType(Ethernet.TYPE_IPV4)
413 .matchIPProtocol(IPv4.PROTOCOL_ICMP);
414 break;
415 default:
416 break;
417 }
418
419 flowRuleService.setRule(
420 appId,
421 gateway.intgBridge(),
422 sBuilder.build(),
423 treatment,
424 PRIORITY_LB_RULE,
425 GW_DROP_TABLE,
426 install
427 );
428 });
429 }
430
431 private void setLbDownstreamRulesForFloatingIp(KubevirtNode gateway,
432 KubevirtFloatingIp floatingIp,
433 boolean install) {
434
435 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
436 .matchEthType(Ethernet.TYPE_IPV4)
437 .matchIPDst(floatingIp.floatingIp().toIpPrefix());
438
439 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
440 .setIpDst(floatingIp.fixedIp())
441 .transition(GW_DROP_TABLE);
442
443 flowRuleService.setRule(
444 appId,
445 gateway.intgBridge(),
446 sBuilder.build(),
447 tBuilder.build(),
448 PRIORITY_LB_RULE,
449 GW_ENTRY_TABLE,
450 install);
451 }
452
453 private void setLbDownStreamRulesForTunBridge(KubevirtLoadBalancer loadBalancer,
454 KubevirtNode gateway, boolean install) {
455 Set<KubevirtPort> ports = portService.ports(loadBalancer.networkId());
456 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
457
458 PortNumber tunnelPortNumber = tunnelPort(gateway, network);
459 if (tunnelPortNumber == null) {
460 return;
461 }
462
463 loadBalancer.members().forEach(ip -> {
464 ports.stream().filter(port -> port.ipAddress().equals(ip) && port.macAddress() != null)
465 .findAny().ifPresent(port -> {
466
467 KubevirtNode workerNode = nodeService.node(port.deviceId());
468
469 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
470 .matchEthType(Ethernet.TYPE_IPV4)
471 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32))
472 .matchEthDst(port.macAddress());
473
474 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
475 .setTunnelId(Long.parseLong(network.segmentId()))
476 .extension(buildExtension(
477 deviceService,
478 gateway.tunBridge(),
479 workerNode.dataIp().getIp4Address()),
480 gateway.tunBridge())
481 .setOutput(tunnelPortNumber);
482
483 flowRuleService.setRule(
484 appId,
485 gateway.tunBridge(),
486 sBuilder.build(),
487 tBuilder.build(),
488 PRIORITY_LB_GATEWAY_TUN_BRIDGE_RULE,
489 TUNNEL_DEFAULT_TABLE,
490 install);
491 });
492 });
493 }
494 private void setLbUpstreamRules(KubevirtLoadBalancer loadBalancer,
495 KubevirtRouter router,
496 KubevirtNode gateway, boolean install) {
497
498 Set<KubevirtPort> ports = portService.ports(loadBalancer.networkId());
499
500 loadBalancer.members().forEach(ip -> {
501 ports.stream().filter(port -> port.ipAddress().equals(ip) && port.macAddress() != null)
502 .findAny().ifPresent(port -> {
503 loadBalancer.rules().forEach(rule -> {
504 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
505 .matchEthType(Ethernet.TYPE_IPV4)
506 .matchEthSrc(port.macAddress())
507 .matchIPSrc(port.ipAddress().toIpPrefix());
508
509 switch (rule.protocol().toUpperCase()) {
510 case PROTOCOL_TCP:
511 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP)
512 .matchTcpSrc(TpPort.tpPort(rule.portRangeMin()));
513 break;
514 case PROTOCOL_UDP:
515 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP)
516 .matchUdpSrc(TpPort.tpPort(rule.portRangeMin()));
517 break;
518 case PROTOCOL_ICMP:
519 sBuilder.matchIPProtocol(IPv4.PROTOCOL_ICMP);
520 break;
521 default:
522 break;
523 }
524
525 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
526 .setEthSrc(router.mac())
527 .setIpSrc(loadBalancer.vip())
528 .transition(GW_DROP_TABLE);
529
530 flowRuleService.setRule(
531 appId,
532 gateway.intgBridge(),
533 sBuilder.build(),
534 tBuilder.build(),
535 PRIORITY_LB_RULE,
536 GW_ENTRY_TABLE,
537 install);
538 });
539 });
540 });
541 }
542
543 private void setArpResponseRuleForFloatingIp(KubevirtNode gateway,
544 KubevirtFloatingIp floatingIp,
545 boolean install) {
546 TrafficSelector selector = DefaultTrafficSelector.builder()
547 .matchInPort(externalPatchPortNum(deviceService, gateway))
548 .matchEthType(EthType.EtherType.ARP.ethType().toShort())
549 .matchArpOp(ARP.OP_REQUEST)
550 .matchArpTpa(floatingIp.floatingIp().getIp4Address())
551 .build();
552
553 Device device = deviceService.getDevice(gateway.intgBridge());
554
555 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
556 .extension(RulePopulatorUtil.buildMoveEthSrcToDstExtension(device), device.id())
557 .extension(RulePopulatorUtil.buildMoveArpShaToThaExtension(device), device.id())
558 .extension(RulePopulatorUtil.buildMoveArpSpaToTpaExtension(device), device.id())
559 .setArpOp(ARP.OP_REPLY)
560 .setEthSrc(DEFAULT_GATEWAY_MAC)
561 .setArpSha(DEFAULT_GATEWAY_MAC)
562 .setArpSpa(floatingIp.floatingIp().getIp4Address())
563 .setOutput(PortNumber.IN_PORT)
564 .build();
565
566 flowRuleService.setRule(
567 appId,
568 gateway.intgBridge(),
569 selector,
570 treatment,
571 PRIORITY_ARP_GATEWAY_RULE,
572 GW_ENTRY_TABLE,
573 install);
574 }
575 private void setLbUpstreamRulesForFloatingIp(KubevirtRouter router,
576 KubevirtNode gateway,
577 KubevirtFloatingIp floatingIp,
578 boolean install) {
579 if (router.peerRouter().macAddress() == null) {
580 return;
581 }
582
583 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
584 .matchEthType(Ethernet.TYPE_IPV4)
585 .matchEthSrc(router.mac())
586 .matchIPSrc(floatingIp.fixedIp().toIpPrefix());
587
588 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
589 .setEthSrc(DEFAULT_GATEWAY_MAC)
590 .setIpSrc(floatingIp.floatingIp())
591 .setEthDst(router.peerRouter().macAddress())
592 .setOutput(externalPatchPortNum(deviceService, gateway));
593
594 flowRuleService.setRule(
595 appId,
596 gateway.intgBridge(),
597 sBuilder.build(),
598 tBuilder.build(),
599 PRIORITY_LB_RULE,
600 GW_DROP_TABLE,
601 install);
602 }
603
604 private GroupBucket buildGroupBucket(KubevirtPort port) {
605 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
606 tBuilder.setEthDst(port.macAddress())
607 .setIpDst(port.ipAddress())
608 .setOutput(PortNumber.NORMAL);
609
610 return DefaultGroupBucket.createSelectGroupBucket(tBuilder.build());
611 }
612
613 private class InternalRouterEventListener implements KubevirtRouterListener {
614 private boolean isRelevantHelper() {
615 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
616 }
617
618 @Override
619 public void event(KubevirtRouterEvent event) {
620 switch (event.type()) {
621 case KUBEVIRT_GATEWAY_NODE_ATTACHED:
622 eventExecutor.execute(() -> processRouterGatewayNodeAttached(event.subject(),
623 event.gateway()));
624 break;
625 case KUBEVIRT_GATEWAY_NODE_CHANGED:
626 eventExecutor.execute(() -> processRouterGatewayNodeChanged(event.subject(),
627 event.gateway()));
628 break;
629 case KUBEVIRT_GATEWAY_NODE_DETACHED:
630 eventExecutor.execute(() -> processRouterGatewayNodeDetached(event.subject(),
631 event.gateway()));
632 break;
633
634 case KUBEVIRT_FLOATING_IP_LB_ASSOCIATED:
635 eventExecutor.execute(() -> processFloatingIpAssociated(event.subject(),
636 event.floatingIp()));
637 break;
638 case KUBEVIRT_FLOATING_IP_LB_DISASSOCIATED:
639 eventExecutor.execute(() -> processFloatingIpDisAssociated(event.subject(),
640 event.floatingIp()));
641 break;
642 case KUBEVIRT_PEER_ROUTER_MAC_RETRIEVED:
643 eventExecutor.execute(() -> processPeerRouterRetrieved(event.subject()));
644 break;
645 default:
646 //do nothing
647 break;
648 }
649 }
650
651 private void processPeerRouterRetrieved(KubevirtRouter router) {
652 if (!isRelevantHelper()) {
653 return;
654 }
655
656 if (router.peerRouter().macAddress() == null) {
657 return;
658 }
659
660 if (router.electedGateway() == null) {
661 return;
662 }
663
664 processRouterGatewayNodeAttached(router, router.electedGateway());
665 }
666
667 private void processFloatingIpAssociated(KubevirtRouter router, KubevirtFloatingIp floatingIp) {
668 if (!isRelevantHelper()) {
669 return;
670 }
671
672 if (router.electedGateway() == null) {
673 log.warn("Failed to process processFloatingIpAssociated because there's elected gateway for fip {}",
674 floatingIp.floatingIp());
675 return;
676 }
677
678 KubevirtNode gateway = nodeService.node(router.electedGateway());
679
680 loadBalancerService.loadBalancers().stream()
681 .filter(lb -> lb.vip().equals(floatingIp.fixedIp()))
682 .findAny()
683 .ifPresent(lb -> {
684 setLbUpstreamRulesForFloatingIp(router, gateway, floatingIp, true);
685 setLbDownstreamRulesForFloatingIp(gateway, floatingIp, true);
686 setArpResponseRuleForFloatingIp(gateway, floatingIp, true);
687 processGarpPacketForFloatingIp(floatingIp, gateway);
688 });
689 }
690
691 private void processGarpPacketForFloatingIp(KubevirtFloatingIp floatingIp, KubevirtNode electedGw) {
692 if (floatingIp == null) {
693 return;
694 }
695
696 Ethernet ethernet = buildGarpPacket(DEFAULT_GATEWAY_MAC, floatingIp.floatingIp());
697 if (ethernet == null) {
698 return;
699 }
700
701 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
702 .setOutput(externalPatchPortNum(deviceService, electedGw)).build();
703
704 packetService.emit(new DefaultOutboundPacket(electedGw.intgBridge(), treatment,
705 ByteBuffer.wrap(ethernet.serialize())));
706 }
707
708 private void processFloatingIpDisAssociated(KubevirtRouter router, KubevirtFloatingIp floatingIp) {
709 if (!isRelevantHelper()) {
710 return;
711 }
712
713 if (router.electedGateway() == null) {
714 log.warn("Failed to process processFloatingIpDisAssociated because there's elected gateway for fip {}",
715 floatingIp.floatingIp());
716 return;
717 }
718
719 KubevirtNode gateway = nodeService.node(router.electedGateway());
720
721
722 loadBalancerService.loadBalancers().stream()
723 .filter(lb -> lb.vip().equals(floatingIp.fixedIp()))
724 .findAny()
725 .ifPresent(lb -> {
726 setLbUpstreamRulesForFloatingIp(router, gateway, floatingIp, false);
727 setLbDownstreamRulesForFloatingIp(gateway, floatingIp, false);
728 setArpResponseRuleForFloatingIp(gateway, floatingIp, false);
729 });
730 }
731
732 private void processRouterGatewayNodeAttached(KubevirtRouter router,
733 String associatedGateway) {
734 if (!isRelevantHelper()) {
735 return;
736 }
737
738 KubevirtNode gatewayNode = nodeService.node(associatedGateway);
739 if (gatewayNode == null) {
740 return;
741 }
742
743 getLoadBalancerSetForRouter(router, loadBalancerService).forEach(loadBalancer -> {
744 setLbGroup(loadBalancer, gatewayNode, true);
745 setBucketsToGroup(loadBalancer, gatewayNode, true);
746 setLbDownstreamRules(loadBalancer, router, gatewayNode, true);
747 setLbUpstreamRules(loadBalancer, router, gatewayNode, true);
748
749 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
750 if (network.type() == VXLAN || network.type() == GENEVE || network.type() == GRE) {
751 setLbDownStreamRulesForTunBridge(loadBalancer, gatewayNode, true);
752 }
753
754 routerService.floatingIpsByRouter(router.name())
755 .stream()
756 .filter(fip -> fip.fixedIp() != null && fip.fixedIp().equals(loadBalancer.vip()))
757 .findAny()
758 .ifPresent(fip -> {
759 setLbDownstreamRulesForFloatingIp(gatewayNode, fip, true);
760 setLbUpstreamRulesForFloatingIp(router, gatewayNode, fip, true);
761 setArpResponseRuleForFloatingIp(gatewayNode, fip, true);
762 });
763 });
764 }
765
766 private void processRouterGatewayNodeDetached(KubevirtRouter router,
767 String disAssociatedGateway) {
768 if (!isRelevantHelper()) {
769 return;
770 }
771
772 KubevirtNode gatewayNode = nodeService.node(disAssociatedGateway);
773 if (gatewayNode == null) {
774 return;
775 }
776
777 getLoadBalancerSetForRouter(router, loadBalancerService).forEach(loadBalancer -> {
778 setLbDownstreamRules(loadBalancer, router, gatewayNode, false);
779 setLbUpstreamRules(loadBalancer, router, gatewayNode, false);
780 setBucketsToGroup(loadBalancer, gatewayNode, false);
781 setLbGroup(loadBalancer, gatewayNode, false);
782
783 KubevirtNetwork network = networkService.network(loadBalancer.networkId());
784 if (network.type() == VXLAN || network.type() == GENEVE || network.type() == GRE) {
785 setLbDownStreamRulesForTunBridge(loadBalancer, gatewayNode, false);
786 }
787
788 routerService.floatingIpsByRouter(router.name())
789 .stream()
790 .filter(fip -> fip.fixedIp() != null && fip.fixedIp().equals(loadBalancer.vip()))
791 .findAny()
792 .ifPresent(fip -> {
793 setLbDownstreamRulesForFloatingIp(gatewayNode, fip, false);
794 setLbUpstreamRulesForFloatingIp(router, gatewayNode, fip, false);
795 setArpResponseRuleForFloatingIp(gatewayNode, fip, false);
796 });
797 });
798 }
799
800 private void processRouterGatewayNodeChanged(KubevirtRouter router,
801 String disAssociatedGateway) {
802 if (!isRelevantHelper()) {
803 return;
804 }
805
806 processRouterGatewayNodeDetached(router, disAssociatedGateway);
807
808 KubevirtNode newGatewayNode = nodeService.node(router.electedGateway());
809 if (newGatewayNode == null) {
810 return;
811 }
812 processRouterGatewayNodeAttached(router, newGatewayNode.hostname());
813 }
814 }
815}