blob: 20fa0cb2636a7b090deaa1617ca63a21a126fe2d [file] [log] [blame]
Daniel Park2884b232021-03-04 18:58:47 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import org.onlab.packet.ARP;
19import org.onlab.packet.EthType;
20import org.onlab.packet.Ethernet;
21import org.onlab.packet.Ip4Address;
22import org.onlab.packet.IpAddress;
23import org.onlab.packet.IpPrefix;
24import org.onlab.packet.MacAddress;
25import org.onlab.packet.TpPort;
Daniel Parkf3136042021-03-10 07:49:11 +090026import org.onlab.packet.VlanId;
Daniel Park2884b232021-03-04 18:58:47 +090027import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
33import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
Daniel Park2884b232021-03-04 18:58:47 +090034import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
35import org.onosproject.kubevirtnetworking.api.KubevirtPort;
36import org.onosproject.kubevirtnetworking.api.KubevirtPortEvent;
37import org.onosproject.kubevirtnetworking.api.KubevirtPortListener;
38import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
39import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
40import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
41import org.onosproject.kubevirtnetworking.api.KubevirtRouterListener;
42import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
43import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
44import org.onosproject.kubevirtnode.api.KubevirtNode;
Jian Li517597a2021-03-22 11:04:52 +090045import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
46import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
Daniel Park2884b232021-03-04 18:58:47 +090047import org.onosproject.kubevirtnode.api.KubevirtNodeService;
48import org.onosproject.net.Device;
Daniel Park2884b232021-03-04 18:58:47 +090049import org.onosproject.net.PortNumber;
50import org.onosproject.net.device.DeviceAdminService;
51import org.onosproject.net.driver.DriverService;
52import org.onosproject.net.flow.DefaultTrafficSelector;
53import org.onosproject.net.flow.DefaultTrafficTreatment;
54import org.onosproject.net.flow.TrafficSelector;
55import org.onosproject.net.flow.TrafficTreatment;
56import org.onosproject.net.flow.instructions.ExtensionTreatment;
57import org.osgi.service.component.annotations.Activate;
58import org.osgi.service.component.annotations.Component;
59import org.osgi.service.component.annotations.Deactivate;
60import org.osgi.service.component.annotations.Reference;
61import org.osgi.service.component.annotations.ReferenceCardinality;
62import org.slf4j.Logger;
63
64import java.util.Objects;
65import java.util.Set;
66import java.util.concurrent.ExecutorService;
67
68import static java.util.concurrent.Executors.newSingleThreadExecutor;
69import static org.onlab.util.Tools.groupedThreads;
70import static org.onosproject.kubevirtnetworking.api.Constants.DEFAULT_GATEWAY_MAC;
71import static org.onosproject.kubevirtnetworking.api.Constants.FLAT_TABLE;
72import static org.onosproject.kubevirtnetworking.api.Constants.FORWARDING_TABLE;
73import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
74import static org.onosproject.kubevirtnetworking.api.Constants.PRE_FLAT_TABLE;
75import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
Daniel Parkf3136042021-03-10 07:49:11 +090076import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FORWARDING_RULE;
Daniel Park2884b232021-03-04 18:58:47 +090077import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_STATEFUL_SNAT_RULE;
Daniel Parkf3136042021-03-10 07:49:11 +090078import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
79import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GENEVE;
80import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GRE;
81import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
82import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VXLAN;
83import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.externalPatchPortNum;
Daniel Park2884b232021-03-04 18:58:47 +090084import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.gatewayNodeForSpecifiedRouter;
Daniel Parkf3136042021-03-10 07:49:11 +090085import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getExternalNetworkByRouter;
Daniel Parkbabde9c2021-03-09 13:37:42 +090086import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterForKubevirtPort;
Daniel Parkf3136042021-03-10 07:49:11 +090087import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterSnatIpAddress;
88import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterMacAddress;
89import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
Daniel Park2884b232021-03-04 18:58:47 +090090import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.CT_NAT_SRC_FLAG;
Daniel Parkf3136042021-03-10 07:49:11 +090091import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
Daniel Park2884b232021-03-04 18:58:47 +090092import static org.slf4j.LoggerFactory.getLogger;
93
94/**
95 * Handles kubevirt routing snat.
96 */
97
98@Component(immediate = true)
99public class KubevirtRoutingSnatHandler {
100 protected final Logger log = getLogger(getClass());
101 private static final int DEFAULT_TTL = 0xff;
102
103 private static final int TP_PORT_MINIMUM_NUM = 1025;
104 private static final int TP_PORT_MAXIMUM_NUM = 65535;
105
106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
107 protected CoreService coreService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected ClusterService clusterService;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
113 protected LeadershipService leadershipService;
114
115 @Reference(cardinality = ReferenceCardinality.MANDATORY)
116 protected DeviceAdminService deviceService;
117
118 @Reference(cardinality = ReferenceCardinality.MANDATORY)
119 protected KubevirtPortService kubevirtPortService;
120
121 @Reference(cardinality = ReferenceCardinality.MANDATORY)
122 protected KubevirtNodeService kubevirtNodeService;
123
124 @Reference(cardinality = ReferenceCardinality.MANDATORY)
125 protected KubevirtNetworkService kubevirtNetworkService;
126
127 @Reference(cardinality = ReferenceCardinality.MANDATORY)
128 protected KubevirtFlowRuleService flowService;
129
130 @Reference(cardinality = ReferenceCardinality.MANDATORY)
131 protected DriverService driverService;
132
133 @Reference(cardinality = ReferenceCardinality.MANDATORY)
134 protected KubevirtRouterService kubevirtRouterService;
135
136 private final ExecutorService eventExecutor = newSingleThreadExecutor(
137 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
138
139 private final InternalKubevirtPortListener kubevirtPortListener =
140 new InternalKubevirtPortListener();
141
142 private final InternalRouterEventListener kubevirtRouterlistener =
143 new InternalRouterEventListener();
144
Jian Li517597a2021-03-22 11:04:52 +0900145 private final InternalNodeEventListener kubevirtNodeListener =
146 new InternalNodeEventListener();
147
Daniel Park2884b232021-03-04 18:58:47 +0900148 private ApplicationId appId;
149 private NodeId localNodeId;
150
151 @Activate
152 protected void activate() {
153 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
154 localNodeId = clusterService.getLocalNode().id();
155 leadershipService.runForLeadership(appId.name());
156
157 kubevirtPortService.addListener(kubevirtPortListener);
158 kubevirtRouterService.addListener(kubevirtRouterlistener);
Jian Li517597a2021-03-22 11:04:52 +0900159 kubevirtNodeService.addListener(kubevirtNodeListener);
Daniel Park2884b232021-03-04 18:58:47 +0900160
161 log.info("Started");
162 }
163
164 @Deactivate
165 protected void deactivate() {
166 leadershipService.withdraw(appId.name());
Jian Li517597a2021-03-22 11:04:52 +0900167 kubevirtNodeService.removeListener(kubevirtNodeListener);
Daniel Park2884b232021-03-04 18:58:47 +0900168 kubevirtPortService.removeListener(kubevirtPortListener);
169 kubevirtRouterService.removeListener(kubevirtRouterlistener);
Daniel Park2884b232021-03-04 18:58:47 +0900170
171 eventExecutor.shutdown();
172
173 log.info("Stopped");
174 }
175
176 private void initGatewayNodeSnatForRouter(KubevirtRouter router, boolean install) {
177 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
178
179 if (electedGw == null) {
180 log.warn("Fail to initialize gateway node snat for router {} " +
181 "there's no gateway assigned to it", router.name());
182 return;
183 }
184
185 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
186
187 if (routerSnatIp == null) {
188 log.warn("Fail to initialize gateway node snat for router {} " +
189 "there's no gateway snat ip assigned to it", router.name());
190 return;
191 }
192
193 setArpResponseToPeerRouter(electedGw, Ip4Address.valueOf(routerSnatIp), install);
194 setStatefulSnatUpstreamRules(electedGw, router, Ip4Address.valueOf(routerSnatIp), install);
Daniel Parkf3136042021-03-10 07:49:11 +0900195 setStatefulSnatDownstreamRuleForRouter(electedGw, router, Ip4Address.valueOf(routerSnatIp), install);
Daniel Park2884b232021-03-04 18:58:47 +0900196 }
197
198 private void setArpResponseToPeerRouter(KubevirtNode gatewayNode, Ip4Address ip4Address, boolean install) {
199
200 TrafficSelector selector = DefaultTrafficSelector.builder()
Daniel Parkf3136042021-03-10 07:49:11 +0900201 .matchInPort(externalPatchPortNum(deviceService, gatewayNode))
Daniel Park2884b232021-03-04 18:58:47 +0900202 .matchEthType(EthType.EtherType.ARP.ethType().toShort())
203 .matchArpOp(ARP.OP_REQUEST)
204 .matchArpTpa(ip4Address)
205 .build();
206
207 Device device = deviceService.getDevice(gatewayNode.intgBridge());
208
209 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
210 .extension(RulePopulatorUtil.buildMoveEthSrcToDstExtension(device), device.id())
211 .extension(RulePopulatorUtil.buildMoveArpShaToThaExtension(device), device.id())
212 .extension(RulePopulatorUtil.buildMoveArpSpaToTpaExtension(device), device.id())
213 .setArpOp(ARP.OP_REPLY)
214 .setEthSrc(DEFAULT_GATEWAY_MAC)
215 .setArpSha(DEFAULT_GATEWAY_MAC)
216 .setArpSpa(ip4Address)
217 .setOutput(PortNumber.IN_PORT)
218 .build();
219
220 flowService.setRule(
221 appId,
222 gatewayNode.intgBridge(),
223 selector,
224 treatment,
225 PRIORITY_ARP_GATEWAY_RULE,
226 PRE_FLAT_TABLE,
227 install);
228 }
229
Daniel Parkf3136042021-03-10 07:49:11 +0900230 private void setStatefulSnatUpstreamRules(KubevirtNode gatewayNode,
231 KubevirtRouter router,
232 Ip4Address ip4Address,
233 boolean install) {
234 MacAddress routerMacAddress = getRouterMacAddress(router);
235 if (routerMacAddress == null) {
236 return;
237 }
238 MacAddress peerRouterMacAddres = router.peerRouter().macAddress();
239 if (peerRouterMacAddres == null) {
240 return;
241 }
Daniel Park2884b232021-03-04 18:58:47 +0900242
Daniel Parkf3136042021-03-10 07:49:11 +0900243 TrafficSelector.Builder selector = DefaultTrafficSelector.builder()
Daniel Park2884b232021-03-04 18:58:47 +0900244 .matchEthType(Ethernet.TYPE_IPV4)
Daniel Parkf3136042021-03-10 07:49:11 +0900245 .matchEthDst(routerMacAddress);
Daniel Park2884b232021-03-04 18:58:47 +0900246
247 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
248
249 ExtensionTreatment natTreatment = RulePopulatorUtil
250 .niciraConnTrackTreatmentBuilder(driverService, gatewayNode.intgBridge())
251 .commit(true)
252 .natFlag(CT_NAT_SRC_FLAG)
253 .natAction(true)
254 .natIp(ip4Address)
255 .natPortMin(TpPort.tpPort(TP_PORT_MINIMUM_NUM))
256 .natPortMax(TpPort.tpPort(TP_PORT_MAXIMUM_NUM))
257 .build();
258
259 tBuilder.extension(natTreatment, gatewayNode.intgBridge())
Daniel Parkf3136042021-03-10 07:49:11 +0900260 .setEthDst(peerRouterMacAddres)
Daniel Park2884b232021-03-04 18:58:47 +0900261 .setEthSrc(DEFAULT_GATEWAY_MAC)
Daniel Parkf3136042021-03-10 07:49:11 +0900262 .setOutput(externalPatchPortNum(deviceService, gatewayNode));
Daniel Park2884b232021-03-04 18:58:47 +0900263
264 flowService.setRule(
265 appId,
266 gatewayNode.intgBridge(),
Daniel Parkf3136042021-03-10 07:49:11 +0900267 selector.build(),
Daniel Park2884b232021-03-04 18:58:47 +0900268 tBuilder.build(),
269 PRIORITY_STATEFUL_SNAT_RULE,
Daniel Parkf3136042021-03-10 07:49:11 +0900270 PRE_FLAT_TABLE,
Daniel Park2884b232021-03-04 18:58:47 +0900271 install);
272 }
273
Daniel Parkf3136042021-03-10 07:49:11 +0900274 private void setStatefulSnatDownStreamRuleForKubevirtPort(KubevirtRouter router,
275 KubevirtNode gatewayNode,
276 KubevirtPort kubevirtPort,
277 boolean install) {
278 MacAddress routerMacAddress = getRouterMacAddress(router);
Daniel Park2884b232021-03-04 18:58:47 +0900279
Daniel Parkf3136042021-03-10 07:49:11 +0900280 if (routerMacAddress == null) {
Daniel Park2884b232021-03-04 18:58:47 +0900281 log.error("Failed to set stateful snat downstream rule because " +
282 "there's no br-int port for device {}", gatewayNode.intgBridge());
283 return;
284 }
285
286 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
287 .matchEthType(Ethernet.TYPE_IPV4)
Daniel Parkf3136042021-03-10 07:49:11 +0900288 .matchEthSrc(routerMacAddress)
Daniel Park2884b232021-03-04 18:58:47 +0900289 .matchIPDst(IpPrefix.valueOf(kubevirtPort.ipAddress(), 32));
290
Daniel Parkf3136042021-03-10 07:49:11 +0900291 KubevirtNetwork network = kubevirtNetworkService.network(kubevirtPort.networkId());
292
293 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Daniel Park2884b232021-03-04 18:58:47 +0900294 .setEthDst(kubevirtPort.macAddress())
Daniel Parkf3136042021-03-10 07:49:11 +0900295 .transition(FORWARDING_TABLE);
Daniel Park2884b232021-03-04 18:58:47 +0900296
297 flowService.setRule(
298 appId,
299 gatewayNode.intgBridge(),
300 sBuilder.build(),
Daniel Parkf3136042021-03-10 07:49:11 +0900301 tBuilder.build(),
Daniel Park2884b232021-03-04 18:58:47 +0900302 PRIORITY_STATEFUL_SNAT_RULE,
303 FLAT_TABLE,
304 install);
Daniel Parkf3136042021-03-10 07:49:11 +0900305
306 if (network.type() == VXLAN || network.type() == GENEVE || network.type() == GRE) {
307 setDownStreamRulesToGatewayTunBridge(router, network, kubevirtPort, install);
308 }
Daniel Park2884b232021-03-04 18:58:47 +0900309 }
310
Daniel Parkf3136042021-03-10 07:49:11 +0900311 private void setDownStreamRulesToGatewayTunBridge(KubevirtRouter router,
312 KubevirtNetwork network,
313 KubevirtPort port, boolean install) {
314 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
Daniel Park2884b232021-03-04 18:58:47 +0900315
Daniel Parkf3136042021-03-10 07:49:11 +0900316 if (electedGw == null) {
317 return;
318 }
Daniel Park2884b232021-03-04 18:58:47 +0900319
Daniel Parkf3136042021-03-10 07:49:11 +0900320 KubevirtNode workerNode = kubevirtNodeService.node(port.deviceId());
321 if (workerNode == null) {
322 return;
323 }
324
325 PortNumber tunnelPortNumber = tunnelPort(electedGw, network);
326 if (tunnelPortNumber == null) {
Daniel Park2884b232021-03-04 18:58:47 +0900327 return;
328 }
329
330 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
331 .matchEthType(Ethernet.TYPE_IPV4)
Daniel Parkf3136042021-03-10 07:49:11 +0900332 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32))
333 .matchEthDst(port.macAddress());
334
335 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
336 .setTunnelId(Long.parseLong(network.segmentId()))
337 .extension(buildExtension(
338 deviceService,
339 electedGw.tunBridge(),
340 workerNode.dataIp().getIp4Address()),
341 electedGw.tunBridge())
342 .setOutput(tunnelPortNumber);
343
344 flowService.setRule(
345 appId,
346 electedGw.tunBridge(),
347 sBuilder.build(),
348 tBuilder.build(),
349 PRIORITY_FORWARDING_RULE,
350 TUNNEL_DEFAULT_TABLE,
351 install);
352 }
353
354 private void setStatefulSnatDownstreamRuleForRouter(KubevirtNode gatewayNode,
355 KubevirtRouter router,
356 IpAddress gatewaySnatIp,
357 boolean install) {
358
359 MacAddress routerMacAddress = getRouterMacAddress(router);
360
361 if (routerMacAddress == null) {
362 log.warn("Failed to set stateful snat downstream rule because " +
363 "there's no br-int port for device {}", gatewayNode.intgBridge());
364 return;
365 }
366
367 KubevirtNetwork externalNetwork = getExternalNetworkByRouter(kubevirtNetworkService, router);
368
369 if (externalNetwork == null) {
370 log.warn("Failed to set stateful snat downstream rule because " +
371 "there's no external network router {}", router.name());
372 return;
373 }
374
375 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
376 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
377
378 if (externalNetwork.type() == VLAN) {
379 sBuilder.matchEthType(Ethernet.TYPE_VLAN)
380 .matchVlanId(VlanId.vlanId(externalNetwork.segmentId()));
381 tBuilder.popVlan();
382 } else {
383 sBuilder.matchEthType(Ethernet.TYPE_IPV4);
384 }
385
386 sBuilder.matchIPDst(IpPrefix.valueOf(gatewaySnatIp, 32));
Daniel Park2884b232021-03-04 18:58:47 +0900387
388 ExtensionTreatment natTreatment = RulePopulatorUtil
389 .niciraConnTrackTreatmentBuilder(driverService, gatewayNode.intgBridge())
390 .commit(false)
391 .natAction(true)
Daniel Parkbabde9c2021-03-09 13:37:42 +0900392 .table((short) FLAT_TABLE)
Daniel Park2884b232021-03-04 18:58:47 +0900393 .build();
394
Daniel Parkf3136042021-03-10 07:49:11 +0900395 tBuilder.setEthSrc(routerMacAddress)
396 .extension(natTreatment, gatewayNode.intgBridge());
Daniel Park2884b232021-03-04 18:58:47 +0900397
398 flowService.setRule(
399 appId,
400 gatewayNode.intgBridge(),
401 sBuilder.build(),
Daniel Parkf3136042021-03-10 07:49:11 +0900402 tBuilder.build(),
Daniel Park2884b232021-03-04 18:58:47 +0900403 PRIORITY_STATEFUL_SNAT_RULE,
Daniel Parkf3136042021-03-10 07:49:11 +0900404 PRE_FLAT_TABLE,
Daniel Park2884b232021-03-04 18:58:47 +0900405 install);
Daniel Park2884b232021-03-04 18:58:47 +0900406 }
407
408 private class InternalRouterEventListener implements KubevirtRouterListener {
409 private boolean isRelevantHelper() {
410 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
411 }
412
413 @Override
414 public void event(KubevirtRouterEvent event) {
415 switch (event.type()) {
416 case KUBEVIRT_ROUTER_CREATED:
417 eventExecutor.execute(() -> processRouterCreation(event.subject()));
418 break;
419 case KUBEVIRT_ROUTER_REMOVED:
420 eventExecutor.execute(() -> processRouterDeletion(event.subject()));
421 break;
422 case KUBEVIRT_ROUTER_UPDATED:
423 eventExecutor.execute(() -> processRouterUpdate(event.subject()));
424 break;
425 case KUBEVIRT_ROUTER_INTERNAL_NETWORKS_ATTACHED:
426 eventExecutor.execute(() -> processRouterInternalNetworksAttached(event.subject(),
427 event.internal()));
428 break;
429 case KUBEVIRT_ROUTER_INTERNAL_NETWORKS_DETACHED:
430 eventExecutor.execute(() -> processRouterInternalNetworksDetached(event.subject(),
431 event.internal()));
432 break;
Daniel Parkf3136042021-03-10 07:49:11 +0900433 case KUBEVIRT_GATEWAY_NODE_ATTACHED:
434 eventExecutor.execute(() -> processRouterGatewayNodeAttached(event.subject(),
435 event.gateway()));
436 break;
437 case KUBEVIRT_GATEWAY_NODE_DETACHED:
438 eventExecutor.execute(() -> processRouterGatewayNodeDetached(event.subject(),
439 event.gateway()));
440 break;
441 case KUBEVIRT_ROUTER_EXTERNAL_NETWORK_ATTACHED:
442 eventExecutor.execute(() -> processRouterExternalNetAttached(event.subject(), event.externalIp()));
443 break;
444 case KUBEVIRT_ROUTER_EXTERNAL_NETWORK_DETACHED:
445 eventExecutor.execute(() -> processRouterExternalNetDetached(event.subject(), event.externalIp()));
446 break;
Daniel Park2884b232021-03-04 18:58:47 +0900447 default:
448 //do nothing
449 break;
450 }
451 }
Daniel Parkf3136042021-03-10 07:49:11 +0900452
453 private void processRouterExternalNetAttached(KubevirtRouter router, String externalIp) {
454 if (!isRelevantHelper()) {
455 return;
456 }
457 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
458
459 if (electedGw == null) {
460 log.warn("Fail to process router external network attached gateway node snat for router {} " +
461 "there's no gateway assigned to it", router.name());
462 return;
463 }
464
465 if (router.enableSnat() && router.peerRouter() != null && externalIp != null) {
466 setArpResponseToPeerRouter(electedGw, Ip4Address.valueOf(externalIp), true);
467 setStatefulSnatUpstreamRules(electedGw, router, Ip4Address.valueOf(externalIp), true);
468 setStatefulSnatDownstreamRuleForRouter(electedGw, router, Ip4Address.valueOf(externalIp), true);
469 }
470
471 router.internal()
472 .stream()
473 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
474 .map(kubevirtNetworkService::network)
475 .forEach(network -> {
476 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
477 if (routerSnatIp == null) {
478 return;
479 }
480 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
481 setStatefulSnatDownStreamRuleForKubevirtPort(router,
482 electedGw, kubevirtPort, true);
483 });
484 });
485 }
486
487 private void processRouterExternalNetDetached(KubevirtRouter router, String externalIp) {
488 if (!isRelevantHelper()) {
489 return;
490 }
491 if (!isRelevantHelper()) {
492 return;
493 }
494 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
495
496 if (electedGw == null) {
497 log.warn("Fail to process router external network attached gateway node snat for router {} " +
498 "there's no gateway assigned to it", router.name());
499 return;
500 }
501
502 if (router.enableSnat() && router.peerRouter() != null && externalIp != null) {
503 setArpResponseToPeerRouter(electedGw, Ip4Address.valueOf(externalIp), false);
504 setStatefulSnatUpstreamRules(electedGw, router, Ip4Address.valueOf(externalIp), false);
505 setStatefulSnatDownstreamRuleForRouter(electedGw, router, Ip4Address.valueOf(externalIp), false);
506 }
507
508 router.internal()
509 .stream()
510 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
511 .map(kubevirtNetworkService::network)
512 .forEach(network -> {
513 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
514 if (routerSnatIp == null) {
515 return;
516 }
517 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
518 setStatefulSnatDownStreamRuleForKubevirtPort(router,
519 electedGw, kubevirtPort, false);
520 });
521 });
522 }
523
524 private void processRouterGatewayNodeAttached(KubevirtRouter router, String attachedGatewayId) {
525 if (!isRelevantHelper()) {
526 return;
527 }
528
529 KubevirtNode attachedGateway = kubevirtNodeService.node(attachedGatewayId);
530 if (attachedGateway == null) {
531 return;
532 }
533
534 router.internal()
535 .stream()
536 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
537 .map(kubevirtNetworkService::network)
538 .forEach(network -> {
539 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
540 if (routerSnatIp == null) {
541 return;
542 }
543 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
544 setStatefulSnatDownStreamRuleForKubevirtPort(router,
545 attachedGateway, kubevirtPort, true);
546 });
547 });
548 }
549
550 private void processRouterGatewayNodeDetached(KubevirtRouter router, String detachedGatewayId) {
551 if (!isRelevantHelper()) {
552 return;
553 }
554
555 KubevirtNode detachedGateway = kubevirtNodeService.node(detachedGatewayId);
556 if (detachedGateway == null) {
557 return;
558 }
559
560 router.internal()
561 .stream()
562 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
563 .map(kubevirtNetworkService::network)
564 .forEach(network -> {
565 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
566 if (routerSnatIp == null) {
567 return;
568 }
569
570 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
571 setStatefulSnatDownStreamRuleForKubevirtPort(router,
572 detachedGateway, kubevirtPort, false);
573 });
574 });
575 }
576
Daniel Park2884b232021-03-04 18:58:47 +0900577 private void processRouterInternalNetworksAttached(KubevirtRouter router,
578 Set<String> attachedInternalNetworks) {
579 if (!isRelevantHelper()) {
580 return;
581 }
582
583 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
584 if (gwNode == null) {
585 return;
586 }
587
588 attachedInternalNetworks.forEach(networkId -> {
Daniel Parkf3136042021-03-10 07:49:11 +0900589 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
590 if (routerSnatIp == null) {
591 return;
592 }
593
Daniel Park2884b232021-03-04 18:58:47 +0900594 kubevirtPortService.ports(networkId).forEach(kubevirtPort -> {
Daniel Parkf3136042021-03-10 07:49:11 +0900595 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, true);
Daniel Park2884b232021-03-04 18:58:47 +0900596 });
597 });
598 }
599
600 private void processRouterInternalNetworksDetached(KubevirtRouter router,
601 Set<String> detachedInternalNetworks) {
602 if (!isRelevantHelper()) {
603 return;
604 }
605
606 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
607 if (gwNode == null) {
608 return;
609 }
610
611 detachedInternalNetworks.forEach(networkId -> {
Daniel Parkf3136042021-03-10 07:49:11 +0900612 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
613 if (routerSnatIp == null) {
614 log.info("snatIp is null");
615 return;
616 }
617
Daniel Park2884b232021-03-04 18:58:47 +0900618 kubevirtPortService.ports(networkId).forEach(kubevirtPort -> {
Daniel Parkf3136042021-03-10 07:49:11 +0900619 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, false);
Daniel Park2884b232021-03-04 18:58:47 +0900620 });
621 });
622 }
623 private void processRouterCreation(KubevirtRouter router) {
624 if (!isRelevantHelper()) {
625 return;
626 }
627 if (router.enableSnat() && !router.external().isEmpty() && router.peerRouter() != null) {
628 initGatewayNodeSnatForRouter(router, true);
629 }
630 }
631
632 private void processRouterDeletion(KubevirtRouter router) {
633 if (!isRelevantHelper()) {
634 return;
635 }
636 if (router.enableSnat() && !router.external().isEmpty() && router.peerRouter() != null) {
637 initGatewayNodeSnatForRouter(router, false);
638 }
639 }
640
641 private void processRouterUpdate(KubevirtRouter router) {
642 if (!isRelevantHelper()) {
643 return;
644 }
645 if (router.enableSnat() && !router.external().isEmpty() && router.peerRouter() != null) {
646 initGatewayNodeSnatForRouter(router, true);
647 }
648 }
649 }
650
Daniel Park2884b232021-03-04 18:58:47 +0900651 private class InternalKubevirtPortListener implements KubevirtPortListener {
652
653 private boolean isRelevantHelper() {
654 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
655 }
656
657 @Override
658 public void event(KubevirtPortEvent event) {
659 switch (event.type()) {
660 case KUBEVIRT_PORT_CREATED:
661 eventExecutor.execute(() -> processPortCreation(event.subject()));
662 break;
663 case KUBEVIRT_PORT_UPDATED:
664 eventExecutor.execute(() -> processPortUpdate(event.subject()));
665 break;
666 case KUBEVIRT_PORT_REMOVED:
667 eventExecutor.execute(() -> processPortDeletion(event.subject()));
668 break;
669 default:
670 //do nothing
671 break;
672 }
673 }
674
675 private void processPortCreation(KubevirtPort kubevirtPort) {
676 if (!isRelevantHelper()) {
677 return;
678 }
679
Daniel Parkbabde9c2021-03-09 13:37:42 +0900680 KubevirtRouter router = getRouterForKubevirtPort(kubevirtRouterService, kubevirtPort);
Daniel Park2884b232021-03-04 18:58:47 +0900681 if (router == null) {
682 return;
683 }
684
685 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
686
687 if (gwNode != null) {
688 IpAddress gatewaySnatIp = getRouterSnatIpAddress(kubevirtRouterService, kubevirtPort.networkId());
689 if (gatewaySnatIp == null) {
690 return;
691 }
Daniel Parkf3136042021-03-10 07:49:11 +0900692 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, true);
Daniel Park2884b232021-03-04 18:58:47 +0900693 }
694 }
695
696 private void processPortUpdate(KubevirtPort kubevirtPort) {
697 if (!isRelevantHelper()) {
698 return;
699 }
700
Daniel Parkbabde9c2021-03-09 13:37:42 +0900701 KubevirtRouter router = getRouterForKubevirtPort(kubevirtRouterService, kubevirtPort);
Daniel Park2884b232021-03-04 18:58:47 +0900702 if (router == null) {
703 return;
704 }
705
706 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
707
708 if (gwNode != null) {
Daniel Parkf3136042021-03-10 07:49:11 +0900709 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, true);
Daniel Park2884b232021-03-04 18:58:47 +0900710 }
711 }
712
713 private void processPortDeletion(KubevirtPort kubevirtPort) {
714 if (!isRelevantHelper()) {
715 return;
716 }
717
Daniel Parkbabde9c2021-03-09 13:37:42 +0900718 KubevirtRouter router = getRouterForKubevirtPort(kubevirtRouterService, kubevirtPort);
Daniel Park2884b232021-03-04 18:58:47 +0900719 if (router == null) {
720 return;
721 }
722
723 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
724
725 if (gwNode != null) {
Daniel Parkf3136042021-03-10 07:49:11 +0900726 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, false);
Daniel Park2884b232021-03-04 18:58:47 +0900727 }
728 }
Daniel Park2884b232021-03-04 18:58:47 +0900729 }
Jian Li517597a2021-03-22 11:04:52 +0900730
731 private class InternalNodeEventListener implements KubevirtNodeListener {
732
733 @Override
734 public void event(KubevirtNodeEvent event) {
735
736 }
737 }
Daniel Park2884b232021-03-04 18:58:47 +0900738}