blob: 8323f9891c597e1a30d4b1b92220ac33946bf8bd [file] [log] [blame]
Daniel Park2884b232021-03-04 18:58:47 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import org.onlab.packet.ARP;
19import org.onlab.packet.EthType;
20import org.onlab.packet.Ethernet;
21import org.onlab.packet.Ip4Address;
22import org.onlab.packet.IpAddress;
23import org.onlab.packet.IpPrefix;
24import org.onlab.packet.MacAddress;
25import org.onlab.packet.TpPort;
Daniel Parkf3136042021-03-10 07:49:11 +090026import org.onlab.packet.VlanId;
Daniel Park2884b232021-03-04 18:58:47 +090027import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
33import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
Daniel Park2884b232021-03-04 18:58:47 +090034import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
35import org.onosproject.kubevirtnetworking.api.KubevirtPort;
36import org.onosproject.kubevirtnetworking.api.KubevirtPortEvent;
37import org.onosproject.kubevirtnetworking.api.KubevirtPortListener;
38import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
39import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
40import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
41import org.onosproject.kubevirtnetworking.api.KubevirtRouterListener;
42import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
43import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
44import org.onosproject.kubevirtnode.api.KubevirtNode;
Jian Li517597a2021-03-22 11:04:52 +090045import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
46import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
Daniel Park2884b232021-03-04 18:58:47 +090047import org.onosproject.kubevirtnode.api.KubevirtNodeService;
48import org.onosproject.net.Device;
Daniel Park2884b232021-03-04 18:58:47 +090049import org.onosproject.net.PortNumber;
50import org.onosproject.net.device.DeviceAdminService;
51import org.onosproject.net.driver.DriverService;
52import org.onosproject.net.flow.DefaultTrafficSelector;
53import org.onosproject.net.flow.DefaultTrafficTreatment;
54import org.onosproject.net.flow.TrafficSelector;
55import org.onosproject.net.flow.TrafficTreatment;
56import org.onosproject.net.flow.instructions.ExtensionTreatment;
57import org.osgi.service.component.annotations.Activate;
58import org.osgi.service.component.annotations.Component;
59import org.osgi.service.component.annotations.Deactivate;
60import org.osgi.service.component.annotations.Reference;
61import org.osgi.service.component.annotations.ReferenceCardinality;
62import org.slf4j.Logger;
63
64import java.util.Objects;
65import java.util.Set;
66import java.util.concurrent.ExecutorService;
67
68import static java.util.concurrent.Executors.newSingleThreadExecutor;
69import static org.onlab.util.Tools.groupedThreads;
70import static org.onosproject.kubevirtnetworking.api.Constants.DEFAULT_GATEWAY_MAC;
71import static org.onosproject.kubevirtnetworking.api.Constants.FLAT_TABLE;
72import static org.onosproject.kubevirtnetworking.api.Constants.FORWARDING_TABLE;
73import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
74import static org.onosproject.kubevirtnetworking.api.Constants.PRE_FLAT_TABLE;
75import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
Daniel Parkf3136042021-03-10 07:49:11 +090076import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FORWARDING_RULE;
Daniel Park2884b232021-03-04 18:58:47 +090077import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_STATEFUL_SNAT_RULE;
Daniel Parkf3136042021-03-10 07:49:11 +090078import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
79import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GENEVE;
80import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GRE;
81import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
82import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VXLAN;
83import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.externalPatchPortNum;
Daniel Park2884b232021-03-04 18:58:47 +090084import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.gatewayNodeForSpecifiedRouter;
Daniel Parkbabde9c2021-03-09 13:37:42 +090085import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterForKubevirtPort;
Daniel Parkf3136042021-03-10 07:49:11 +090086import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterMacAddress;
Daniel Park5a3e9392021-03-23 08:00:00 +090087import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterSnatIpAddress;
Daniel Parkf3136042021-03-10 07:49:11 +090088import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
Daniel Park2884b232021-03-04 18:58:47 +090089import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.CT_NAT_SRC_FLAG;
Daniel Parkf3136042021-03-10 07:49:11 +090090import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
Daniel Park2884b232021-03-04 18:58:47 +090091import static org.slf4j.LoggerFactory.getLogger;
92
93/**
94 * Handles kubevirt routing snat.
95 */
96
97@Component(immediate = true)
98public class KubevirtRoutingSnatHandler {
99 protected final Logger log = getLogger(getClass());
100 private static final int DEFAULT_TTL = 0xff;
101
102 private static final int TP_PORT_MINIMUM_NUM = 1025;
103 private static final int TP_PORT_MAXIMUM_NUM = 65535;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
106 protected CoreService coreService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected ClusterService clusterService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected LeadershipService leadershipService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected DeviceAdminService deviceService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected KubevirtPortService kubevirtPortService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
121 protected KubevirtNodeService kubevirtNodeService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
124 protected KubevirtNetworkService kubevirtNetworkService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
127 protected KubevirtFlowRuleService flowService;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
130 protected DriverService driverService;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
133 protected KubevirtRouterService kubevirtRouterService;
134
135 private final ExecutorService eventExecutor = newSingleThreadExecutor(
136 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
137
138 private final InternalKubevirtPortListener kubevirtPortListener =
139 new InternalKubevirtPortListener();
140
141 private final InternalRouterEventListener kubevirtRouterlistener =
142 new InternalRouterEventListener();
143
Jian Li517597a2021-03-22 11:04:52 +0900144 private final InternalNodeEventListener kubevirtNodeListener =
145 new InternalNodeEventListener();
146
Daniel Park2884b232021-03-04 18:58:47 +0900147 private ApplicationId appId;
148 private NodeId localNodeId;
149
150 @Activate
151 protected void activate() {
152 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
153 localNodeId = clusterService.getLocalNode().id();
154 leadershipService.runForLeadership(appId.name());
155
156 kubevirtPortService.addListener(kubevirtPortListener);
157 kubevirtRouterService.addListener(kubevirtRouterlistener);
Jian Li517597a2021-03-22 11:04:52 +0900158 kubevirtNodeService.addListener(kubevirtNodeListener);
Daniel Park2884b232021-03-04 18:58:47 +0900159
160 log.info("Started");
161 }
162
163 @Deactivate
164 protected void deactivate() {
165 leadershipService.withdraw(appId.name());
Jian Li517597a2021-03-22 11:04:52 +0900166 kubevirtNodeService.removeListener(kubevirtNodeListener);
Daniel Park2884b232021-03-04 18:58:47 +0900167 kubevirtPortService.removeListener(kubevirtPortListener);
168 kubevirtRouterService.removeListener(kubevirtRouterlistener);
Daniel Park2884b232021-03-04 18:58:47 +0900169
170 eventExecutor.shutdown();
171
172 log.info("Stopped");
173 }
174
175 private void initGatewayNodeSnatForRouter(KubevirtRouter router, boolean install) {
Daniel Park8ad7c3b2021-04-09 15:45:59 +0900176 if (router.electedGateway() == null) {
177 log.warn("Fail to initialize gateway node snat for router {} " +
178 "because there's no gateway assigned to it", router.name());
179 return;
180 }
181
182 KubevirtNode electedGw = kubevirtNodeService.node(router.electedGateway());
Daniel Park2884b232021-03-04 18:58:47 +0900183
184 if (electedGw == null) {
185 log.warn("Fail to initialize gateway node snat for router {} " +
Daniel Park8ad7c3b2021-04-09 15:45:59 +0900186 "because there's no gateway assigned to it", router.name());
Daniel Park2884b232021-03-04 18:58:47 +0900187 return;
188 }
189
190 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
191
192 if (routerSnatIp == null) {
193 log.warn("Fail to initialize gateway node snat for router {} " +
Daniel Park8ad7c3b2021-04-09 15:45:59 +0900194 "because there's no gateway snat ip assigned to it", router.name());
Daniel Park2884b232021-03-04 18:58:47 +0900195 return;
196 }
197
Daniel Park5a3e9392021-03-23 08:00:00 +0900198 String externalNet = router.external().values().stream().findAny().orElse(null);
199 if (externalNet == null) {
200 return;
201 }
202
203 if (router.peerRouter() != null &&
204 router.peerRouter().ipAddress() != null && router.peerRouter().macAddress() != null) {
205 setArpResponseToPeerRouter(electedGw, Ip4Address.valueOf(routerSnatIp), install);
206 setStatefulSnatUpstreamRules(electedGw, router, Ip4Address.valueOf(routerSnatIp),
207 router.peerRouter().macAddress(), install);
208 setStatefulSnatDownstreamRuleForRouter(electedGw, router, Ip4Address.valueOf(routerSnatIp),
209 kubevirtNetworkService.network(externalNet), install);
210 }
Daniel Park2884b232021-03-04 18:58:47 +0900211 }
212
213 private void setArpResponseToPeerRouter(KubevirtNode gatewayNode, Ip4Address ip4Address, boolean install) {
214
215 TrafficSelector selector = DefaultTrafficSelector.builder()
Daniel Parkf3136042021-03-10 07:49:11 +0900216 .matchInPort(externalPatchPortNum(deviceService, gatewayNode))
Daniel Park2884b232021-03-04 18:58:47 +0900217 .matchEthType(EthType.EtherType.ARP.ethType().toShort())
218 .matchArpOp(ARP.OP_REQUEST)
219 .matchArpTpa(ip4Address)
220 .build();
221
222 Device device = deviceService.getDevice(gatewayNode.intgBridge());
223
224 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
225 .extension(RulePopulatorUtil.buildMoveEthSrcToDstExtension(device), device.id())
226 .extension(RulePopulatorUtil.buildMoveArpShaToThaExtension(device), device.id())
227 .extension(RulePopulatorUtil.buildMoveArpSpaToTpaExtension(device), device.id())
228 .setArpOp(ARP.OP_REPLY)
229 .setEthSrc(DEFAULT_GATEWAY_MAC)
230 .setArpSha(DEFAULT_GATEWAY_MAC)
231 .setArpSpa(ip4Address)
232 .setOutput(PortNumber.IN_PORT)
233 .build();
234
235 flowService.setRule(
236 appId,
237 gatewayNode.intgBridge(),
238 selector,
239 treatment,
240 PRIORITY_ARP_GATEWAY_RULE,
241 PRE_FLAT_TABLE,
242 install);
243 }
244
Daniel Parkf3136042021-03-10 07:49:11 +0900245 private void setStatefulSnatUpstreamRules(KubevirtNode gatewayNode,
246 KubevirtRouter router,
Daniel Park5a3e9392021-03-23 08:00:00 +0900247 Ip4Address routerSnatIp,
248 MacAddress peerRouterMacAddress,
Daniel Parkf3136042021-03-10 07:49:11 +0900249 boolean install) {
250 MacAddress routerMacAddress = getRouterMacAddress(router);
251 if (routerMacAddress == null) {
252 return;
253 }
Daniel Park5a3e9392021-03-23 08:00:00 +0900254
255 if (routerSnatIp == null || peerRouterMacAddress == null) {
Daniel Parkf3136042021-03-10 07:49:11 +0900256 return;
257 }
Daniel Park2884b232021-03-04 18:58:47 +0900258
Daniel Parkf3136042021-03-10 07:49:11 +0900259 TrafficSelector.Builder selector = DefaultTrafficSelector.builder()
Daniel Park2884b232021-03-04 18:58:47 +0900260 .matchEthType(Ethernet.TYPE_IPV4)
Daniel Parkf3136042021-03-10 07:49:11 +0900261 .matchEthDst(routerMacAddress);
Daniel Park2884b232021-03-04 18:58:47 +0900262
263 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
264
265 ExtensionTreatment natTreatment = RulePopulatorUtil
266 .niciraConnTrackTreatmentBuilder(driverService, gatewayNode.intgBridge())
267 .commit(true)
268 .natFlag(CT_NAT_SRC_FLAG)
269 .natAction(true)
Daniel Park5a3e9392021-03-23 08:00:00 +0900270 .natIp(routerSnatIp)
Daniel Park2884b232021-03-04 18:58:47 +0900271 .natPortMin(TpPort.tpPort(TP_PORT_MINIMUM_NUM))
272 .natPortMax(TpPort.tpPort(TP_PORT_MAXIMUM_NUM))
273 .build();
274
275 tBuilder.extension(natTreatment, gatewayNode.intgBridge())
Daniel Park5a3e9392021-03-23 08:00:00 +0900276 .setEthDst(peerRouterMacAddress)
Daniel Park2884b232021-03-04 18:58:47 +0900277 .setEthSrc(DEFAULT_GATEWAY_MAC)
Daniel Parkf3136042021-03-10 07:49:11 +0900278 .setOutput(externalPatchPortNum(deviceService, gatewayNode));
Daniel Park2884b232021-03-04 18:58:47 +0900279
280 flowService.setRule(
281 appId,
282 gatewayNode.intgBridge(),
Daniel Parkf3136042021-03-10 07:49:11 +0900283 selector.build(),
Daniel Park2884b232021-03-04 18:58:47 +0900284 tBuilder.build(),
285 PRIORITY_STATEFUL_SNAT_RULE,
Daniel Parkf3136042021-03-10 07:49:11 +0900286 PRE_FLAT_TABLE,
Daniel Park2884b232021-03-04 18:58:47 +0900287 install);
288 }
289
Daniel Parkf3136042021-03-10 07:49:11 +0900290 private void setStatefulSnatDownStreamRuleForKubevirtPort(KubevirtRouter router,
291 KubevirtNode gatewayNode,
292 KubevirtPort kubevirtPort,
293 boolean install) {
294 MacAddress routerMacAddress = getRouterMacAddress(router);
Daniel Park2884b232021-03-04 18:58:47 +0900295
Daniel Parkf3136042021-03-10 07:49:11 +0900296 if (routerMacAddress == null) {
Daniel Park2884b232021-03-04 18:58:47 +0900297 log.error("Failed to set stateful snat downstream rule because " +
298 "there's no br-int port for device {}", gatewayNode.intgBridge());
299 return;
300 }
301
302 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
303 .matchEthType(Ethernet.TYPE_IPV4)
Daniel Parkf3136042021-03-10 07:49:11 +0900304 .matchEthSrc(routerMacAddress)
Daniel Park2884b232021-03-04 18:58:47 +0900305 .matchIPDst(IpPrefix.valueOf(kubevirtPort.ipAddress(), 32));
306
Daniel Parkf3136042021-03-10 07:49:11 +0900307 KubevirtNetwork network = kubevirtNetworkService.network(kubevirtPort.networkId());
308
309 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Daniel Park2884b232021-03-04 18:58:47 +0900310 .setEthDst(kubevirtPort.macAddress())
Daniel Parkf3136042021-03-10 07:49:11 +0900311 .transition(FORWARDING_TABLE);
Daniel Park2884b232021-03-04 18:58:47 +0900312
313 flowService.setRule(
314 appId,
315 gatewayNode.intgBridge(),
316 sBuilder.build(),
Daniel Parkf3136042021-03-10 07:49:11 +0900317 tBuilder.build(),
Daniel Park2884b232021-03-04 18:58:47 +0900318 PRIORITY_STATEFUL_SNAT_RULE,
319 FLAT_TABLE,
320 install);
Daniel Parkf3136042021-03-10 07:49:11 +0900321
322 if (network.type() == VXLAN || network.type() == GENEVE || network.type() == GRE) {
323 setDownStreamRulesToGatewayTunBridge(router, network, kubevirtPort, install);
324 }
Daniel Park2884b232021-03-04 18:58:47 +0900325 }
326
Daniel Parkf3136042021-03-10 07:49:11 +0900327 private void setDownStreamRulesToGatewayTunBridge(KubevirtRouter router,
328 KubevirtNetwork network,
329 KubevirtPort port, boolean install) {
330 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
Daniel Park2884b232021-03-04 18:58:47 +0900331
Daniel Parkf3136042021-03-10 07:49:11 +0900332 if (electedGw == null) {
333 return;
334 }
Daniel Park2884b232021-03-04 18:58:47 +0900335
Daniel Parkf3136042021-03-10 07:49:11 +0900336 KubevirtNode workerNode = kubevirtNodeService.node(port.deviceId());
337 if (workerNode == null) {
338 return;
339 }
340
341 PortNumber tunnelPortNumber = tunnelPort(electedGw, network);
342 if (tunnelPortNumber == null) {
Daniel Park2884b232021-03-04 18:58:47 +0900343 return;
344 }
345
346 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
347 .matchEthType(Ethernet.TYPE_IPV4)
Daniel Parkf3136042021-03-10 07:49:11 +0900348 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32))
349 .matchEthDst(port.macAddress());
350
351 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
352 .setTunnelId(Long.parseLong(network.segmentId()))
353 .extension(buildExtension(
354 deviceService,
355 electedGw.tunBridge(),
356 workerNode.dataIp().getIp4Address()),
357 electedGw.tunBridge())
358 .setOutput(tunnelPortNumber);
359
360 flowService.setRule(
361 appId,
362 electedGw.tunBridge(),
363 sBuilder.build(),
364 tBuilder.build(),
365 PRIORITY_FORWARDING_RULE,
366 TUNNEL_DEFAULT_TABLE,
367 install);
368 }
369
370 private void setStatefulSnatDownstreamRuleForRouter(KubevirtNode gatewayNode,
371 KubevirtRouter router,
Daniel Park5a3e9392021-03-23 08:00:00 +0900372 IpAddress routerSnatIp,
373 KubevirtNetwork externalNetwork,
Daniel Parkf3136042021-03-10 07:49:11 +0900374 boolean install) {
375
376 MacAddress routerMacAddress = getRouterMacAddress(router);
377
378 if (routerMacAddress == null) {
379 log.warn("Failed to set stateful snat downstream rule because " +
380 "there's no br-int port for device {}", gatewayNode.intgBridge());
381 return;
382 }
383
Daniel Parkf3136042021-03-10 07:49:11 +0900384 if (externalNetwork == null) {
385 log.warn("Failed to set stateful snat downstream rule because " +
386 "there's no external network router {}", router.name());
387 return;
388 }
389
390 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
391 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
392
393 if (externalNetwork.type() == VLAN) {
394 sBuilder.matchEthType(Ethernet.TYPE_VLAN)
395 .matchVlanId(VlanId.vlanId(externalNetwork.segmentId()));
396 tBuilder.popVlan();
397 } else {
398 sBuilder.matchEthType(Ethernet.TYPE_IPV4);
399 }
400
Daniel Park5a3e9392021-03-23 08:00:00 +0900401 sBuilder.matchIPDst(IpPrefix.valueOf(routerSnatIp, 32));
Daniel Park2884b232021-03-04 18:58:47 +0900402
403 ExtensionTreatment natTreatment = RulePopulatorUtil
404 .niciraConnTrackTreatmentBuilder(driverService, gatewayNode.intgBridge())
405 .commit(false)
406 .natAction(true)
Daniel Parkbabde9c2021-03-09 13:37:42 +0900407 .table((short) FLAT_TABLE)
Daniel Park2884b232021-03-04 18:58:47 +0900408 .build();
409
Daniel Parkf3136042021-03-10 07:49:11 +0900410 tBuilder.setEthSrc(routerMacAddress)
411 .extension(natTreatment, gatewayNode.intgBridge());
Daniel Park2884b232021-03-04 18:58:47 +0900412
413 flowService.setRule(
414 appId,
415 gatewayNode.intgBridge(),
416 sBuilder.build(),
Daniel Parkf3136042021-03-10 07:49:11 +0900417 tBuilder.build(),
Daniel Park2884b232021-03-04 18:58:47 +0900418 PRIORITY_STATEFUL_SNAT_RULE,
Daniel Parkf3136042021-03-10 07:49:11 +0900419 PRE_FLAT_TABLE,
Daniel Park2884b232021-03-04 18:58:47 +0900420 install);
Daniel Park2884b232021-03-04 18:58:47 +0900421 }
422
423 private class InternalRouterEventListener implements KubevirtRouterListener {
424 private boolean isRelevantHelper() {
425 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
426 }
427
428 @Override
429 public void event(KubevirtRouterEvent event) {
430 switch (event.type()) {
431 case KUBEVIRT_ROUTER_CREATED:
432 eventExecutor.execute(() -> processRouterCreation(event.subject()));
433 break;
Daniel Park8ad7c3b2021-04-09 15:45:59 +0900434 case KUBEVIRT_SNAT_STATUS_DISABLED:
Daniel Park2884b232021-03-04 18:58:47 +0900435 case KUBEVIRT_ROUTER_REMOVED:
436 eventExecutor.execute(() -> processRouterDeletion(event.subject()));
437 break;
438 case KUBEVIRT_ROUTER_UPDATED:
439 eventExecutor.execute(() -> processRouterUpdate(event.subject()));
440 break;
441 case KUBEVIRT_ROUTER_INTERNAL_NETWORKS_ATTACHED:
442 eventExecutor.execute(() -> processRouterInternalNetworksAttached(event.subject(),
443 event.internal()));
444 break;
445 case KUBEVIRT_ROUTER_INTERNAL_NETWORKS_DETACHED:
446 eventExecutor.execute(() -> processRouterInternalNetworksDetached(event.subject(),
447 event.internal()));
448 break;
Daniel Parkf3136042021-03-10 07:49:11 +0900449 case KUBEVIRT_GATEWAY_NODE_ATTACHED:
450 eventExecutor.execute(() -> processRouterGatewayNodeAttached(event.subject(),
451 event.gateway()));
452 break;
453 case KUBEVIRT_GATEWAY_NODE_DETACHED:
454 eventExecutor.execute(() -> processRouterGatewayNodeDetached(event.subject(),
455 event.gateway()));
456 break;
457 case KUBEVIRT_ROUTER_EXTERNAL_NETWORK_ATTACHED:
Daniel Park5a3e9392021-03-23 08:00:00 +0900458 eventExecutor.execute(() -> processRouterExternalNetAttached(event.subject(),
459 event.externalIp(), event.externalNet(),
460 event.externalPeerRouterIp(), event.peerRouterMac()));
Daniel Parkf3136042021-03-10 07:49:11 +0900461 break;
462 case KUBEVIRT_ROUTER_EXTERNAL_NETWORK_DETACHED:
Daniel Park5a3e9392021-03-23 08:00:00 +0900463 eventExecutor.execute(() -> processRouterExternalNetDetached(event.subject(),
464 event.externalIp(), event.externalNet(),
465 event.externalPeerRouterIp(), event.peerRouterMac()));
Daniel Parkf3136042021-03-10 07:49:11 +0900466 break;
Daniel Park2884b232021-03-04 18:58:47 +0900467 default:
468 //do nothing
469 break;
470 }
471 }
Daniel Parkf3136042021-03-10 07:49:11 +0900472
Daniel Park5a3e9392021-03-23 08:00:00 +0900473 private void processRouterExternalNetAttached(KubevirtRouter router, String externalIp, String externalNet,
474 String peerRouterIp, MacAddress peerRouterMac) {
Daniel Parkf3136042021-03-10 07:49:11 +0900475 if (!isRelevantHelper()) {
476 return;
477 }
478 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
479
480 if (electedGw == null) {
481 log.warn("Fail to process router external network attached gateway node snat for router {} " +
482 "there's no gateway assigned to it", router.name());
483 return;
484 }
485
Daniel Park5a3e9392021-03-23 08:00:00 +0900486 if (router.enableSnat() &&
487 peerRouterIp != null && peerRouterMac != null && externalIp != null && externalNet != null) {
Daniel Parkf3136042021-03-10 07:49:11 +0900488 setArpResponseToPeerRouter(electedGw, Ip4Address.valueOf(externalIp), true);
Daniel Park5a3e9392021-03-23 08:00:00 +0900489 setStatefulSnatUpstreamRules(electedGw, router, Ip4Address.valueOf(externalIp),
490 peerRouterMac, true);
491 setStatefulSnatDownstreamRuleForRouter(electedGw, router, Ip4Address.valueOf(externalIp),
492 kubevirtNetworkService.network(externalNet), true);
Daniel Parkf3136042021-03-10 07:49:11 +0900493 }
494
495 router.internal()
496 .stream()
497 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
498 .map(kubevirtNetworkService::network)
499 .forEach(network -> {
Daniel Parkf3136042021-03-10 07:49:11 +0900500 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
501 setStatefulSnatDownStreamRuleForKubevirtPort(router,
502 electedGw, kubevirtPort, true);
503 });
504 });
505 }
506
Daniel Park5a3e9392021-03-23 08:00:00 +0900507 private void processRouterExternalNetDetached(KubevirtRouter router, String externalIp, String externalNet,
508 String peerRouterIp, MacAddress peerRouterMac) {
Daniel Parkf3136042021-03-10 07:49:11 +0900509 if (!isRelevantHelper()) {
510 return;
511 }
512 if (!isRelevantHelper()) {
513 return;
514 }
515 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
516
517 if (electedGw == null) {
518 log.warn("Fail to process router external network attached gateway node snat for router {} " +
519 "there's no gateway assigned to it", router.name());
520 return;
521 }
522
Daniel Park5a3e9392021-03-23 08:00:00 +0900523 if (router.enableSnat() &&
524 peerRouterIp != null && peerRouterMac != null && externalIp != null && externalNet != null) {
Daniel Parkf3136042021-03-10 07:49:11 +0900525 setArpResponseToPeerRouter(electedGw, Ip4Address.valueOf(externalIp), false);
Daniel Park5a3e9392021-03-23 08:00:00 +0900526 setStatefulSnatUpstreamRules(electedGw, router,
527 Ip4Address.valueOf(externalIp), peerRouterMac, false);
528 setStatefulSnatDownstreamRuleForRouter(electedGw, router, Ip4Address.valueOf(externalIp),
529 kubevirtNetworkService.network(externalNet), false);
Daniel Parkf3136042021-03-10 07:49:11 +0900530 }
531
532 router.internal()
533 .stream()
534 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
535 .map(kubevirtNetworkService::network)
536 .forEach(network -> {
Daniel Parkf3136042021-03-10 07:49:11 +0900537 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
538 setStatefulSnatDownStreamRuleForKubevirtPort(router,
539 electedGw, kubevirtPort, false);
540 });
541 });
542 }
543
544 private void processRouterGatewayNodeAttached(KubevirtRouter router, String attachedGatewayId) {
545 if (!isRelevantHelper()) {
546 return;
547 }
548
549 KubevirtNode attachedGateway = kubevirtNodeService.node(attachedGatewayId);
550 if (attachedGateway == null) {
551 return;
552 }
553
Daniel Park8ad7c3b2021-04-09 15:45:59 +0900554 if (!router.enableSnat()) {
555 return;
556 }
557
Daniel Parkf3136042021-03-10 07:49:11 +0900558 router.internal()
559 .stream()
560 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
561 .map(kubevirtNetworkService::network)
562 .forEach(network -> {
563 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
564 if (routerSnatIp == null) {
565 return;
566 }
567 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
568 setStatefulSnatDownStreamRuleForKubevirtPort(router,
569 attachedGateway, kubevirtPort, true);
570 });
571 });
572 }
573
574 private void processRouterGatewayNodeDetached(KubevirtRouter router, String detachedGatewayId) {
575 if (!isRelevantHelper()) {
576 return;
577 }
578
579 KubevirtNode detachedGateway = kubevirtNodeService.node(detachedGatewayId);
580 if (detachedGateway == null) {
581 return;
582 }
583
Daniel Park8ad7c3b2021-04-09 15:45:59 +0900584 if (!router.enableSnat()) {
585 return;
586 }
587
Daniel Parkf3136042021-03-10 07:49:11 +0900588 router.internal()
589 .stream()
590 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
591 .map(kubevirtNetworkService::network)
592 .forEach(network -> {
593 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
594 if (routerSnatIp == null) {
595 return;
596 }
597
598 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
599 setStatefulSnatDownStreamRuleForKubevirtPort(router,
600 detachedGateway, kubevirtPort, false);
601 });
602 });
603 }
604
Daniel Park2884b232021-03-04 18:58:47 +0900605 private void processRouterInternalNetworksAttached(KubevirtRouter router,
606 Set<String> attachedInternalNetworks) {
607 if (!isRelevantHelper()) {
608 return;
609 }
610
611 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
612 if (gwNode == null) {
613 return;
614 }
615
Daniel Park8ad7c3b2021-04-09 15:45:59 +0900616 if (!router.enableSnat()) {
617 return;
618 }
619
Daniel Park2884b232021-03-04 18:58:47 +0900620 attachedInternalNetworks.forEach(networkId -> {
Daniel Parkf3136042021-03-10 07:49:11 +0900621 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
622 if (routerSnatIp == null) {
623 return;
624 }
625
Daniel Park2884b232021-03-04 18:58:47 +0900626 kubevirtPortService.ports(networkId).forEach(kubevirtPort -> {
Daniel Parkf3136042021-03-10 07:49:11 +0900627 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, true);
Daniel Park2884b232021-03-04 18:58:47 +0900628 });
629 });
630 }
631
632 private void processRouterInternalNetworksDetached(KubevirtRouter router,
633 Set<String> detachedInternalNetworks) {
634 if (!isRelevantHelper()) {
635 return;
636 }
637
638 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
639 if (gwNode == null) {
640 return;
641 }
642
Daniel Park8ad7c3b2021-04-09 15:45:59 +0900643 if (!router.enableSnat()) {
644 return;
645 }
646
Daniel Park2884b232021-03-04 18:58:47 +0900647 detachedInternalNetworks.forEach(networkId -> {
Daniel Parkf3136042021-03-10 07:49:11 +0900648 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
649 if (routerSnatIp == null) {
650 log.info("snatIp is null");
651 return;
652 }
653
Daniel Park2884b232021-03-04 18:58:47 +0900654 kubevirtPortService.ports(networkId).forEach(kubevirtPort -> {
Daniel Parkf3136042021-03-10 07:49:11 +0900655 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, false);
Daniel Park2884b232021-03-04 18:58:47 +0900656 });
657 });
658 }
659 private void processRouterCreation(KubevirtRouter router) {
660 if (!isRelevantHelper()) {
661 return;
662 }
663 if (router.enableSnat() && !router.external().isEmpty() && router.peerRouter() != null) {
664 initGatewayNodeSnatForRouter(router, true);
665 }
666 }
667
668 private void processRouterDeletion(KubevirtRouter router) {
669 if (!isRelevantHelper()) {
670 return;
671 }
Daniel Park8ad7c3b2021-04-09 15:45:59 +0900672 if (!router.external().isEmpty() && router.peerRouter() != null) {
Daniel Park2884b232021-03-04 18:58:47 +0900673 initGatewayNodeSnatForRouter(router, false);
674 }
675 }
676
677 private void processRouterUpdate(KubevirtRouter router) {
678 if (!isRelevantHelper()) {
679 return;
680 }
681 if (router.enableSnat() && !router.external().isEmpty() && router.peerRouter() != null) {
682 initGatewayNodeSnatForRouter(router, true);
683 }
684 }
685 }
686
Daniel Park2884b232021-03-04 18:58:47 +0900687 private class InternalKubevirtPortListener implements KubevirtPortListener {
688
689 private boolean isRelevantHelper() {
690 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
691 }
692
693 @Override
694 public void event(KubevirtPortEvent event) {
695 switch (event.type()) {
696 case KUBEVIRT_PORT_CREATED:
697 eventExecutor.execute(() -> processPortCreation(event.subject()));
698 break;
699 case KUBEVIRT_PORT_UPDATED:
700 eventExecutor.execute(() -> processPortUpdate(event.subject()));
701 break;
702 case KUBEVIRT_PORT_REMOVED:
703 eventExecutor.execute(() -> processPortDeletion(event.subject()));
704 break;
705 default:
706 //do nothing
707 break;
708 }
709 }
710
711 private void processPortCreation(KubevirtPort kubevirtPort) {
712 if (!isRelevantHelper()) {
713 return;
714 }
715
Daniel Parkbabde9c2021-03-09 13:37:42 +0900716 KubevirtRouter router = getRouterForKubevirtPort(kubevirtRouterService, kubevirtPort);
Daniel Park2884b232021-03-04 18:58:47 +0900717 if (router == null) {
718 return;
719 }
720
721 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
722
723 if (gwNode != null) {
724 IpAddress gatewaySnatIp = getRouterSnatIpAddress(kubevirtRouterService, kubevirtPort.networkId());
725 if (gatewaySnatIp == null) {
726 return;
727 }
Daniel Parkf3136042021-03-10 07:49:11 +0900728 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, true);
Daniel Park2884b232021-03-04 18:58:47 +0900729 }
730 }
731
732 private void processPortUpdate(KubevirtPort kubevirtPort) {
733 if (!isRelevantHelper()) {
734 return;
735 }
736
Daniel Parkbabde9c2021-03-09 13:37:42 +0900737 KubevirtRouter router = getRouterForKubevirtPort(kubevirtRouterService, kubevirtPort);
Daniel Park2884b232021-03-04 18:58:47 +0900738 if (router == null) {
739 return;
740 }
741
742 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
743
744 if (gwNode != null) {
Daniel Parkf3136042021-03-10 07:49:11 +0900745 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, true);
Daniel Park2884b232021-03-04 18:58:47 +0900746 }
747 }
748
749 private void processPortDeletion(KubevirtPort kubevirtPort) {
750 if (!isRelevantHelper()) {
751 return;
752 }
753
Daniel Parkbabde9c2021-03-09 13:37:42 +0900754 KubevirtRouter router = getRouterForKubevirtPort(kubevirtRouterService, kubevirtPort);
Daniel Park2884b232021-03-04 18:58:47 +0900755 if (router == null) {
756 return;
757 }
758
759 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
760
761 if (gwNode != null) {
Daniel Parkf3136042021-03-10 07:49:11 +0900762 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, false);
Daniel Park2884b232021-03-04 18:58:47 +0900763 }
764 }
Daniel Park2884b232021-03-04 18:58:47 +0900765 }
Jian Li517597a2021-03-22 11:04:52 +0900766
767 private class InternalNodeEventListener implements KubevirtNodeListener {
768
769 @Override
770 public void event(KubevirtNodeEvent event) {
771
772 }
773 }
Daniel Park2884b232021-03-04 18:58:47 +0900774}