/*
 * Copyright 2021-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import org.onlab.packet.ARP;
19import org.onlab.packet.EthType;
20import org.onlab.packet.Ethernet;
21import org.onlab.packet.Ip4Address;
22import org.onlab.packet.IpAddress;
23import org.onlab.packet.IpPrefix;
24import org.onlab.packet.MacAddress;
25import org.onlab.packet.TpPort;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090026import org.onlab.packet.VlanId;
Daniel Parkb9a22022021-03-04 18:58:47 +090027import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
33import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
Daniel Parkb9a22022021-03-04 18:58:47 +090034import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
35import org.onosproject.kubevirtnetworking.api.KubevirtPort;
36import org.onosproject.kubevirtnetworking.api.KubevirtPortEvent;
37import org.onosproject.kubevirtnetworking.api.KubevirtPortListener;
38import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
39import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
40import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
41import org.onosproject.kubevirtnetworking.api.KubevirtRouterListener;
42import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
43import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
44import org.onosproject.kubevirtnode.api.KubevirtNode;
45import org.onosproject.kubevirtnode.api.KubevirtNodeService;
46import org.onosproject.net.Device;
Daniel Parkb9a22022021-03-04 18:58:47 +090047import org.onosproject.net.PortNumber;
48import org.onosproject.net.device.DeviceAdminService;
49import org.onosproject.net.driver.DriverService;
50import org.onosproject.net.flow.DefaultTrafficSelector;
51import org.onosproject.net.flow.DefaultTrafficTreatment;
52import org.onosproject.net.flow.TrafficSelector;
53import org.onosproject.net.flow.TrafficTreatment;
54import org.onosproject.net.flow.instructions.ExtensionTreatment;
55import org.osgi.service.component.annotations.Activate;
56import org.osgi.service.component.annotations.Component;
57import org.osgi.service.component.annotations.Deactivate;
58import org.osgi.service.component.annotations.Reference;
59import org.osgi.service.component.annotations.ReferenceCardinality;
60import org.slf4j.Logger;
61
62import java.util.Objects;
63import java.util.Set;
64import java.util.concurrent.ExecutorService;
65
66import static java.util.concurrent.Executors.newSingleThreadExecutor;
67import static org.onlab.util.Tools.groupedThreads;
68import static org.onosproject.kubevirtnetworking.api.Constants.DEFAULT_GATEWAY_MAC;
69import static org.onosproject.kubevirtnetworking.api.Constants.FLAT_TABLE;
70import static org.onosproject.kubevirtnetworking.api.Constants.FORWARDING_TABLE;
71import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
72import static org.onosproject.kubevirtnetworking.api.Constants.PRE_FLAT_TABLE;
73import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090074import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FORWARDING_RULE;
Daniel Parkb9a22022021-03-04 18:58:47 +090075import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_STATEFUL_SNAT_RULE;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090076import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
77import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GENEVE;
78import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GRE;
79import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
80import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VXLAN;
81import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.externalPatchPortNum;
Daniel Parkb9a22022021-03-04 18:58:47 +090082import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.gatewayNodeForSpecifiedRouter;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090083import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getExternalNetworkByRouter;
Daniel Parkcc8e7462021-03-09 13:37:42 +090084import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterForKubevirtPort;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090085import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterSnatIpAddress;
86import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterMacAddress;
87import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
Daniel Parkb9a22022021-03-04 18:58:47 +090088import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.CT_NAT_SRC_FLAG;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090089import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
Daniel Parkb9a22022021-03-04 18:58:47 +090090import static org.slf4j.LoggerFactory.getLogger;
91
92/**
93 * Handles kubevirt routing snat.
94 */
95
96@Component(immediate = true)
97public class KubevirtRoutingSnatHandler {
98 protected final Logger log = getLogger(getClass());
99 private static final int DEFAULT_TTL = 0xff;
100
101 private static final int TP_PORT_MINIMUM_NUM = 1025;
102 private static final int TP_PORT_MAXIMUM_NUM = 65535;
103
104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected CoreService coreService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected ClusterService clusterService;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
111 protected LeadershipService leadershipService;
112
113 @Reference(cardinality = ReferenceCardinality.MANDATORY)
114 protected DeviceAdminService deviceService;
115
116 @Reference(cardinality = ReferenceCardinality.MANDATORY)
117 protected KubevirtPortService kubevirtPortService;
118
119 @Reference(cardinality = ReferenceCardinality.MANDATORY)
120 protected KubevirtNodeService kubevirtNodeService;
121
122 @Reference(cardinality = ReferenceCardinality.MANDATORY)
123 protected KubevirtNetworkService kubevirtNetworkService;
124
125 @Reference(cardinality = ReferenceCardinality.MANDATORY)
126 protected KubevirtFlowRuleService flowService;
127
128 @Reference(cardinality = ReferenceCardinality.MANDATORY)
129 protected DriverService driverService;
130
131 @Reference(cardinality = ReferenceCardinality.MANDATORY)
132 protected KubevirtRouterService kubevirtRouterService;
133
134 private final ExecutorService eventExecutor = newSingleThreadExecutor(
135 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
136
137 private final InternalKubevirtPortListener kubevirtPortListener =
138 new InternalKubevirtPortListener();
139
140 private final InternalRouterEventListener kubevirtRouterlistener =
141 new InternalRouterEventListener();
142
Daniel Parkb9a22022021-03-04 18:58:47 +0900143 private ApplicationId appId;
144 private NodeId localNodeId;
145
146 @Activate
147 protected void activate() {
148 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
149 localNodeId = clusterService.getLocalNode().id();
150 leadershipService.runForLeadership(appId.name());
151
152 kubevirtPortService.addListener(kubevirtPortListener);
153 kubevirtRouterService.addListener(kubevirtRouterlistener);
Daniel Parkb9a22022021-03-04 18:58:47 +0900154
155 log.info("Started");
156 }
157
158 @Deactivate
159 protected void deactivate() {
160 leadershipService.withdraw(appId.name());
161 kubevirtPortService.removeListener(kubevirtPortListener);
162 kubevirtRouterService.removeListener(kubevirtRouterlistener);
Daniel Parkb9a22022021-03-04 18:58:47 +0900163
164 eventExecutor.shutdown();
165
166 log.info("Stopped");
167 }
168
169 private void initGatewayNodeSnatForRouter(KubevirtRouter router, boolean install) {
170 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
171
172 if (electedGw == null) {
173 log.warn("Fail to initialize gateway node snat for router {} " +
174 "there's no gateway assigned to it", router.name());
175 return;
176 }
177
178 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
179
180 if (routerSnatIp == null) {
181 log.warn("Fail to initialize gateway node snat for router {} " +
182 "there's no gateway snat ip assigned to it", router.name());
183 return;
184 }
185
186 setArpResponseToPeerRouter(electedGw, Ip4Address.valueOf(routerSnatIp), install);
187 setStatefulSnatUpstreamRules(electedGw, router, Ip4Address.valueOf(routerSnatIp), install);
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900188 setStatefulSnatDownstreamRuleForRouter(electedGw, router, Ip4Address.valueOf(routerSnatIp), install);
Daniel Parkb9a22022021-03-04 18:58:47 +0900189 }
190
191 private void setArpResponseToPeerRouter(KubevirtNode gatewayNode, Ip4Address ip4Address, boolean install) {
192
193 TrafficSelector selector = DefaultTrafficSelector.builder()
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900194 .matchInPort(externalPatchPortNum(deviceService, gatewayNode))
Daniel Parkb9a22022021-03-04 18:58:47 +0900195 .matchEthType(EthType.EtherType.ARP.ethType().toShort())
196 .matchArpOp(ARP.OP_REQUEST)
197 .matchArpTpa(ip4Address)
198 .build();
199
200 Device device = deviceService.getDevice(gatewayNode.intgBridge());
201
202 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
203 .extension(RulePopulatorUtil.buildMoveEthSrcToDstExtension(device), device.id())
204 .extension(RulePopulatorUtil.buildMoveArpShaToThaExtension(device), device.id())
205 .extension(RulePopulatorUtil.buildMoveArpSpaToTpaExtension(device), device.id())
206 .setArpOp(ARP.OP_REPLY)
207 .setEthSrc(DEFAULT_GATEWAY_MAC)
208 .setArpSha(DEFAULT_GATEWAY_MAC)
209 .setArpSpa(ip4Address)
210 .setOutput(PortNumber.IN_PORT)
211 .build();
212
213 flowService.setRule(
214 appId,
215 gatewayNode.intgBridge(),
216 selector,
217 treatment,
218 PRIORITY_ARP_GATEWAY_RULE,
219 PRE_FLAT_TABLE,
220 install);
221 }
222
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900223 private void setStatefulSnatUpstreamRules(KubevirtNode gatewayNode,
224 KubevirtRouter router,
225 Ip4Address ip4Address,
226 boolean install) {
227 MacAddress routerMacAddress = getRouterMacAddress(router);
228 if (routerMacAddress == null) {
229 return;
230 }
231 MacAddress peerRouterMacAddres = router.peerRouter().macAddress();
232 if (peerRouterMacAddres == null) {
233 return;
234 }
Daniel Parkb9a22022021-03-04 18:58:47 +0900235
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900236 TrafficSelector.Builder selector = DefaultTrafficSelector.builder()
Daniel Parkb9a22022021-03-04 18:58:47 +0900237 .matchEthType(Ethernet.TYPE_IPV4)
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900238 .matchEthDst(routerMacAddress);
Daniel Parkb9a22022021-03-04 18:58:47 +0900239
240 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
241
242 ExtensionTreatment natTreatment = RulePopulatorUtil
243 .niciraConnTrackTreatmentBuilder(driverService, gatewayNode.intgBridge())
244 .commit(true)
245 .natFlag(CT_NAT_SRC_FLAG)
246 .natAction(true)
247 .natIp(ip4Address)
248 .natPortMin(TpPort.tpPort(TP_PORT_MINIMUM_NUM))
249 .natPortMax(TpPort.tpPort(TP_PORT_MAXIMUM_NUM))
250 .build();
251
252 tBuilder.extension(natTreatment, gatewayNode.intgBridge())
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900253 .setEthDst(peerRouterMacAddres)
Daniel Parkb9a22022021-03-04 18:58:47 +0900254 .setEthSrc(DEFAULT_GATEWAY_MAC)
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900255 .setOutput(externalPatchPortNum(deviceService, gatewayNode));
Daniel Parkb9a22022021-03-04 18:58:47 +0900256
257 flowService.setRule(
258 appId,
259 gatewayNode.intgBridge(),
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900260 selector.build(),
Daniel Parkb9a22022021-03-04 18:58:47 +0900261 tBuilder.build(),
262 PRIORITY_STATEFUL_SNAT_RULE,
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900263 PRE_FLAT_TABLE,
Daniel Parkb9a22022021-03-04 18:58:47 +0900264 install);
265 }
266
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900267 private void setStatefulSnatDownStreamRuleForKubevirtPort(KubevirtRouter router,
268 KubevirtNode gatewayNode,
269 KubevirtPort kubevirtPort,
270 boolean install) {
271 MacAddress routerMacAddress = getRouterMacAddress(router);
Daniel Parkb9a22022021-03-04 18:58:47 +0900272
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900273 if (routerMacAddress == null) {
Daniel Parkb9a22022021-03-04 18:58:47 +0900274 log.error("Failed to set stateful snat downstream rule because " +
275 "there's no br-int port for device {}", gatewayNode.intgBridge());
276 return;
277 }
278
279 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
280 .matchEthType(Ethernet.TYPE_IPV4)
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900281 .matchEthSrc(routerMacAddress)
Daniel Parkb9a22022021-03-04 18:58:47 +0900282 .matchIPDst(IpPrefix.valueOf(kubevirtPort.ipAddress(), 32));
283
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900284 KubevirtNetwork network = kubevirtNetworkService.network(kubevirtPort.networkId());
285
286 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Daniel Parkb9a22022021-03-04 18:58:47 +0900287 .setEthDst(kubevirtPort.macAddress())
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900288 .transition(FORWARDING_TABLE);
Daniel Parkb9a22022021-03-04 18:58:47 +0900289
290 flowService.setRule(
291 appId,
292 gatewayNode.intgBridge(),
293 sBuilder.build(),
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900294 tBuilder.build(),
Daniel Parkb9a22022021-03-04 18:58:47 +0900295 PRIORITY_STATEFUL_SNAT_RULE,
296 FLAT_TABLE,
297 install);
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900298
299 if (network.type() == VXLAN || network.type() == GENEVE || network.type() == GRE) {
300 setDownStreamRulesToGatewayTunBridge(router, network, kubevirtPort, install);
301 }
Daniel Parkb9a22022021-03-04 18:58:47 +0900302 }
303
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900304 private void setDownStreamRulesToGatewayTunBridge(KubevirtRouter router,
305 KubevirtNetwork network,
306 KubevirtPort port, boolean install) {
307 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
Daniel Parkb9a22022021-03-04 18:58:47 +0900308
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900309 if (electedGw == null) {
310 return;
311 }
Daniel Parkb9a22022021-03-04 18:58:47 +0900312
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900313 KubevirtNode workerNode = kubevirtNodeService.node(port.deviceId());
314 if (workerNode == null) {
315 return;
316 }
317
318 PortNumber tunnelPortNumber = tunnelPort(electedGw, network);
319 if (tunnelPortNumber == null) {
Daniel Parkb9a22022021-03-04 18:58:47 +0900320 return;
321 }
322
323 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
324 .matchEthType(Ethernet.TYPE_IPV4)
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900325 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32))
326 .matchEthDst(port.macAddress());
327
328 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
329 .setTunnelId(Long.parseLong(network.segmentId()))
330 .extension(buildExtension(
331 deviceService,
332 electedGw.tunBridge(),
333 workerNode.dataIp().getIp4Address()),
334 electedGw.tunBridge())
335 .setOutput(tunnelPortNumber);
336
337 flowService.setRule(
338 appId,
339 electedGw.tunBridge(),
340 sBuilder.build(),
341 tBuilder.build(),
342 PRIORITY_FORWARDING_RULE,
343 TUNNEL_DEFAULT_TABLE,
344 install);
345 }
346
347 private void setStatefulSnatDownstreamRuleForRouter(KubevirtNode gatewayNode,
348 KubevirtRouter router,
349 IpAddress gatewaySnatIp,
350 boolean install) {
351
352 MacAddress routerMacAddress = getRouterMacAddress(router);
353
354 if (routerMacAddress == null) {
355 log.warn("Failed to set stateful snat downstream rule because " +
356 "there's no br-int port for device {}", gatewayNode.intgBridge());
357 return;
358 }
359
360 KubevirtNetwork externalNetwork = getExternalNetworkByRouter(kubevirtNetworkService, router);
361
362 if (externalNetwork == null) {
363 log.warn("Failed to set stateful snat downstream rule because " +
364 "there's no external network router {}", router.name());
365 return;
366 }
367
368 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
369 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
370
371 if (externalNetwork.type() == VLAN) {
372 sBuilder.matchEthType(Ethernet.TYPE_VLAN)
373 .matchVlanId(VlanId.vlanId(externalNetwork.segmentId()));
374 tBuilder.popVlan();
375 } else {
376 sBuilder.matchEthType(Ethernet.TYPE_IPV4);
377 }
378
379 sBuilder.matchIPDst(IpPrefix.valueOf(gatewaySnatIp, 32));
Daniel Parkb9a22022021-03-04 18:58:47 +0900380
381 ExtensionTreatment natTreatment = RulePopulatorUtil
382 .niciraConnTrackTreatmentBuilder(driverService, gatewayNode.intgBridge())
383 .commit(false)
384 .natAction(true)
Daniel Parkcc8e7462021-03-09 13:37:42 +0900385 .table((short) FLAT_TABLE)
Daniel Parkb9a22022021-03-04 18:58:47 +0900386 .build();
387
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900388 tBuilder.setEthSrc(routerMacAddress)
389 .extension(natTreatment, gatewayNode.intgBridge());
Daniel Parkb9a22022021-03-04 18:58:47 +0900390
391 flowService.setRule(
392 appId,
393 gatewayNode.intgBridge(),
394 sBuilder.build(),
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900395 tBuilder.build(),
Daniel Parkb9a22022021-03-04 18:58:47 +0900396 PRIORITY_STATEFUL_SNAT_RULE,
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900397 PRE_FLAT_TABLE,
Daniel Parkb9a22022021-03-04 18:58:47 +0900398 install);
Daniel Parkb9a22022021-03-04 18:58:47 +0900399 }
400
401 private class InternalRouterEventListener implements KubevirtRouterListener {
402 private boolean isRelevantHelper() {
403 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
404 }
405
406 @Override
407 public void event(KubevirtRouterEvent event) {
408 switch (event.type()) {
409 case KUBEVIRT_ROUTER_CREATED:
410 eventExecutor.execute(() -> processRouterCreation(event.subject()));
411 break;
412 case KUBEVIRT_ROUTER_REMOVED:
413 eventExecutor.execute(() -> processRouterDeletion(event.subject()));
414 break;
415 case KUBEVIRT_ROUTER_UPDATED:
416 eventExecutor.execute(() -> processRouterUpdate(event.subject()));
417 break;
418 case KUBEVIRT_ROUTER_INTERNAL_NETWORKS_ATTACHED:
419 eventExecutor.execute(() -> processRouterInternalNetworksAttached(event.subject(),
420 event.internal()));
421 break;
422 case KUBEVIRT_ROUTER_INTERNAL_NETWORKS_DETACHED:
423 eventExecutor.execute(() -> processRouterInternalNetworksDetached(event.subject(),
424 event.internal()));
425 break;
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900426 case KUBEVIRT_GATEWAY_NODE_ATTACHED:
427 eventExecutor.execute(() -> processRouterGatewayNodeAttached(event.subject(),
428 event.gateway()));
429 break;
430 case KUBEVIRT_GATEWAY_NODE_DETACHED:
431 eventExecutor.execute(() -> processRouterGatewayNodeDetached(event.subject(),
432 event.gateway()));
433 break;
434 case KUBEVIRT_ROUTER_EXTERNAL_NETWORK_ATTACHED:
435 eventExecutor.execute(() -> processRouterExternalNetAttached(event.subject(), event.externalIp()));
436 break;
437 case KUBEVIRT_ROUTER_EXTERNAL_NETWORK_DETACHED:
438 eventExecutor.execute(() -> processRouterExternalNetDetached(event.subject(), event.externalIp()));
439 break;
Daniel Parkb9a22022021-03-04 18:58:47 +0900440 default:
441 //do nothing
442 break;
443 }
444 }
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900445
446 private void processRouterExternalNetAttached(KubevirtRouter router, String externalIp) {
447 if (!isRelevantHelper()) {
448 return;
449 }
450 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
451
452 if (electedGw == null) {
453 log.warn("Fail to process router external network attached gateway node snat for router {} " +
454 "there's no gateway assigned to it", router.name());
455 return;
456 }
457
458 if (router.enableSnat() && router.peerRouter() != null && externalIp != null) {
459 setArpResponseToPeerRouter(electedGw, Ip4Address.valueOf(externalIp), true);
460 setStatefulSnatUpstreamRules(electedGw, router, Ip4Address.valueOf(externalIp), true);
461 setStatefulSnatDownstreamRuleForRouter(electedGw, router, Ip4Address.valueOf(externalIp), true);
462 }
463
464 router.internal()
465 .stream()
466 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
467 .map(kubevirtNetworkService::network)
468 .forEach(network -> {
469 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
470 if (routerSnatIp == null) {
471 return;
472 }
473 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
474 setStatefulSnatDownStreamRuleForKubevirtPort(router,
475 electedGw, kubevirtPort, true);
476 });
477 });
478 }
479
480 private void processRouterExternalNetDetached(KubevirtRouter router, String externalIp) {
481 if (!isRelevantHelper()) {
482 return;
483 }
484 if (!isRelevantHelper()) {
485 return;
486 }
487 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
488
489 if (electedGw == null) {
490 log.warn("Fail to process router external network attached gateway node snat for router {} " +
491 "there's no gateway assigned to it", router.name());
492 return;
493 }
494
495 if (router.enableSnat() && router.peerRouter() != null && externalIp != null) {
496 setArpResponseToPeerRouter(electedGw, Ip4Address.valueOf(externalIp), false);
497 setStatefulSnatUpstreamRules(electedGw, router, Ip4Address.valueOf(externalIp), false);
498 setStatefulSnatDownstreamRuleForRouter(electedGw, router, Ip4Address.valueOf(externalIp), false);
499 }
500
501 router.internal()
502 .stream()
503 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
504 .map(kubevirtNetworkService::network)
505 .forEach(network -> {
506 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
507 if (routerSnatIp == null) {
508 return;
509 }
510 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
511 setStatefulSnatDownStreamRuleForKubevirtPort(router,
512 electedGw, kubevirtPort, false);
513 });
514 });
515 }
516
517 private void processRouterGatewayNodeAttached(KubevirtRouter router, String attachedGatewayId) {
518 if (!isRelevantHelper()) {
519 return;
520 }
521
522 KubevirtNode attachedGateway = kubevirtNodeService.node(attachedGatewayId);
523 if (attachedGateway == null) {
524 return;
525 }
526
527 router.internal()
528 .stream()
529 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
530 .map(kubevirtNetworkService::network)
531 .forEach(network -> {
532 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
533 if (routerSnatIp == null) {
534 return;
535 }
536 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
537 setStatefulSnatDownStreamRuleForKubevirtPort(router,
538 attachedGateway, kubevirtPort, true);
539 });
540 });
541 }
542
543 private void processRouterGatewayNodeDetached(KubevirtRouter router, String detachedGatewayId) {
544 if (!isRelevantHelper()) {
545 return;
546 }
547
548 KubevirtNode detachedGateway = kubevirtNodeService.node(detachedGatewayId);
549 if (detachedGateway == null) {
550 return;
551 }
552
553 router.internal()
554 .stream()
555 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
556 .map(kubevirtNetworkService::network)
557 .forEach(network -> {
558 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
559 if (routerSnatIp == null) {
560 return;
561 }
562
563 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
564 setStatefulSnatDownStreamRuleForKubevirtPort(router,
565 detachedGateway, kubevirtPort, false);
566 });
567 });
568 }
569
Daniel Parkb9a22022021-03-04 18:58:47 +0900570 private void processRouterInternalNetworksAttached(KubevirtRouter router,
571 Set<String> attachedInternalNetworks) {
572 if (!isRelevantHelper()) {
573 return;
574 }
575
576 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
577 if (gwNode == null) {
578 return;
579 }
580
581 attachedInternalNetworks.forEach(networkId -> {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900582 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
583 if (routerSnatIp == null) {
584 return;
585 }
586
Daniel Parkb9a22022021-03-04 18:58:47 +0900587 kubevirtPortService.ports(networkId).forEach(kubevirtPort -> {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900588 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, true);
Daniel Parkb9a22022021-03-04 18:58:47 +0900589 });
590 });
591 }
592
593 private void processRouterInternalNetworksDetached(KubevirtRouter router,
594 Set<String> detachedInternalNetworks) {
595 if (!isRelevantHelper()) {
596 return;
597 }
598
599 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
600 if (gwNode == null) {
601 return;
602 }
603
604 detachedInternalNetworks.forEach(networkId -> {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900605 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
606 if (routerSnatIp == null) {
607 log.info("snatIp is null");
608 return;
609 }
610
Daniel Parkb9a22022021-03-04 18:58:47 +0900611 kubevirtPortService.ports(networkId).forEach(kubevirtPort -> {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900612 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, false);
Daniel Parkb9a22022021-03-04 18:58:47 +0900613 });
614 });
615 }
616 private void processRouterCreation(KubevirtRouter router) {
617 if (!isRelevantHelper()) {
618 return;
619 }
620 if (router.enableSnat() && !router.external().isEmpty() && router.peerRouter() != null) {
621 initGatewayNodeSnatForRouter(router, true);
622 }
623 }
624
625 private void processRouterDeletion(KubevirtRouter router) {
626 if (!isRelevantHelper()) {
627 return;
628 }
629 if (router.enableSnat() && !router.external().isEmpty() && router.peerRouter() != null) {
630 initGatewayNodeSnatForRouter(router, false);
631 }
632 }
633
634 private void processRouterUpdate(KubevirtRouter router) {
635 if (!isRelevantHelper()) {
636 return;
637 }
638 if (router.enableSnat() && !router.external().isEmpty() && router.peerRouter() != null) {
639 initGatewayNodeSnatForRouter(router, true);
640 }
641 }
642 }
643
Daniel Parkb9a22022021-03-04 18:58:47 +0900644 private class InternalKubevirtPortListener implements KubevirtPortListener {
645
646 private boolean isRelevantHelper() {
647 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
648 }
649
650 @Override
651 public void event(KubevirtPortEvent event) {
652 switch (event.type()) {
653 case KUBEVIRT_PORT_CREATED:
654 eventExecutor.execute(() -> processPortCreation(event.subject()));
655 break;
656 case KUBEVIRT_PORT_UPDATED:
657 eventExecutor.execute(() -> processPortUpdate(event.subject()));
658 break;
659 case KUBEVIRT_PORT_REMOVED:
660 eventExecutor.execute(() -> processPortDeletion(event.subject()));
661 break;
662 default:
663 //do nothing
664 break;
665 }
666 }
667
668 private void processPortCreation(KubevirtPort kubevirtPort) {
669 if (!isRelevantHelper()) {
670 return;
671 }
672
Daniel Parkcc8e7462021-03-09 13:37:42 +0900673 KubevirtRouter router = getRouterForKubevirtPort(kubevirtRouterService, kubevirtPort);
Daniel Parkb9a22022021-03-04 18:58:47 +0900674 if (router == null) {
675 return;
676 }
677
678 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
679
680 if (gwNode != null) {
681 IpAddress gatewaySnatIp = getRouterSnatIpAddress(kubevirtRouterService, kubevirtPort.networkId());
682 if (gatewaySnatIp == null) {
683 return;
684 }
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900685 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, true);
Daniel Parkb9a22022021-03-04 18:58:47 +0900686 }
687 }
688
689 private void processPortUpdate(KubevirtPort kubevirtPort) {
690 if (!isRelevantHelper()) {
691 return;
692 }
693
Daniel Parkcc8e7462021-03-09 13:37:42 +0900694 KubevirtRouter router = getRouterForKubevirtPort(kubevirtRouterService, kubevirtPort);
Daniel Parkb9a22022021-03-04 18:58:47 +0900695 if (router == null) {
696 return;
697 }
698
699 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
700
701 if (gwNode != null) {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900702 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, true);
Daniel Parkb9a22022021-03-04 18:58:47 +0900703 }
704 }
705
706 private void processPortDeletion(KubevirtPort kubevirtPort) {
707 if (!isRelevantHelper()) {
708 return;
709 }
710
Daniel Parkcc8e7462021-03-09 13:37:42 +0900711 KubevirtRouter router = getRouterForKubevirtPort(kubevirtRouterService, kubevirtPort);
Daniel Parkb9a22022021-03-04 18:58:47 +0900712 if (router == null) {
713 return;
714 }
715
716 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
717
718 if (gwNode != null) {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900719 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, false);
Daniel Parkb9a22022021-03-04 18:58:47 +0900720 }
721 }
Daniel Parkb9a22022021-03-04 18:58:47 +0900722 }
723}