blob: c34103712a0fca44f4a08ae3a86054846ebe29db [file] [log] [blame]
Daniel Parkb9a22022021-03-04 18:58:47 +09001/*
2 * Copyright 2021-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import org.onlab.packet.ARP;
19import org.onlab.packet.EthType;
20import org.onlab.packet.Ethernet;
21import org.onlab.packet.Ip4Address;
22import org.onlab.packet.IpAddress;
23import org.onlab.packet.IpPrefix;
24import org.onlab.packet.MacAddress;
25import org.onlab.packet.TpPort;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090026import org.onlab.packet.VlanId;
Daniel Parkb9a22022021-03-04 18:58:47 +090027import org.onosproject.cluster.ClusterService;
28import org.onosproject.cluster.LeadershipService;
29import org.onosproject.cluster.NodeId;
30import org.onosproject.core.ApplicationId;
31import org.onosproject.core.CoreService;
32import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
33import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
Daniel Parkb9a22022021-03-04 18:58:47 +090034import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
35import org.onosproject.kubevirtnetworking.api.KubevirtPort;
36import org.onosproject.kubevirtnetworking.api.KubevirtPortEvent;
37import org.onosproject.kubevirtnetworking.api.KubevirtPortListener;
38import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
39import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
40import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
41import org.onosproject.kubevirtnetworking.api.KubevirtRouterListener;
42import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
43import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
44import org.onosproject.kubevirtnode.api.KubevirtNode;
Jian Li91358d62021-03-22 11:04:52 +090045import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
46import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
Daniel Parkb9a22022021-03-04 18:58:47 +090047import org.onosproject.kubevirtnode.api.KubevirtNodeService;
48import org.onosproject.net.Device;
Daniel Parkb9a22022021-03-04 18:58:47 +090049import org.onosproject.net.PortNumber;
50import org.onosproject.net.device.DeviceAdminService;
51import org.onosproject.net.driver.DriverService;
52import org.onosproject.net.flow.DefaultTrafficSelector;
53import org.onosproject.net.flow.DefaultTrafficTreatment;
54import org.onosproject.net.flow.TrafficSelector;
55import org.onosproject.net.flow.TrafficTreatment;
56import org.onosproject.net.flow.instructions.ExtensionTreatment;
57import org.osgi.service.component.annotations.Activate;
58import org.osgi.service.component.annotations.Component;
59import org.osgi.service.component.annotations.Deactivate;
60import org.osgi.service.component.annotations.Reference;
61import org.osgi.service.component.annotations.ReferenceCardinality;
62import org.slf4j.Logger;
63
64import java.util.Objects;
65import java.util.Set;
66import java.util.concurrent.ExecutorService;
67
68import static java.util.concurrent.Executors.newSingleThreadExecutor;
69import static org.onlab.util.Tools.groupedThreads;
70import static org.onosproject.kubevirtnetworking.api.Constants.DEFAULT_GATEWAY_MAC;
71import static org.onosproject.kubevirtnetworking.api.Constants.FLAT_TABLE;
72import static org.onosproject.kubevirtnetworking.api.Constants.FORWARDING_TABLE;
73import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
74import static org.onosproject.kubevirtnetworking.api.Constants.PRE_FLAT_TABLE;
75import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090076import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FORWARDING_RULE;
Daniel Parkb9a22022021-03-04 18:58:47 +090077import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_STATEFUL_SNAT_RULE;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090078import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
79import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GENEVE;
80import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GRE;
81import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
82import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VXLAN;
83import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.externalPatchPortNum;
Daniel Parkb9a22022021-03-04 18:58:47 +090084import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.gatewayNodeForSpecifiedRouter;
Daniel Parkcc8e7462021-03-09 13:37:42 +090085import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterForKubevirtPort;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090086import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterMacAddress;
Daniel Park54205272021-03-23 08:00:00 +090087import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterSnatIpAddress;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090088import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
Daniel Parkb9a22022021-03-04 18:58:47 +090089import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.CT_NAT_SRC_FLAG;
Daniel Parke7e3d6a2021-03-10 07:49:11 +090090import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
Daniel Parkb9a22022021-03-04 18:58:47 +090091import static org.slf4j.LoggerFactory.getLogger;
92
93/**
94 * Handles kubevirt routing snat.
95 */
96
97@Component(immediate = true)
98public class KubevirtRoutingSnatHandler {
99 protected final Logger log = getLogger(getClass());
100 private static final int DEFAULT_TTL = 0xff;
101
102 private static final int TP_PORT_MINIMUM_NUM = 1025;
103 private static final int TP_PORT_MAXIMUM_NUM = 65535;
104
105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
106 protected CoreService coreService;
107
108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected ClusterService clusterService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected LeadershipService leadershipService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected DeviceAdminService deviceService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected KubevirtPortService kubevirtPortService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
121 protected KubevirtNodeService kubevirtNodeService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
124 protected KubevirtNetworkService kubevirtNetworkService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
127 protected KubevirtFlowRuleService flowService;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
130 protected DriverService driverService;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
133 protected KubevirtRouterService kubevirtRouterService;
134
135 private final ExecutorService eventExecutor = newSingleThreadExecutor(
136 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
137
138 private final InternalKubevirtPortListener kubevirtPortListener =
139 new InternalKubevirtPortListener();
140
141 private final InternalRouterEventListener kubevirtRouterlistener =
142 new InternalRouterEventListener();
143
Jian Li91358d62021-03-22 11:04:52 +0900144 private final InternalNodeEventListener kubevirtNodeListener =
145 new InternalNodeEventListener();
146
Daniel Parkb9a22022021-03-04 18:58:47 +0900147 private ApplicationId appId;
148 private NodeId localNodeId;
149
150 @Activate
151 protected void activate() {
152 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
153 localNodeId = clusterService.getLocalNode().id();
154 leadershipService.runForLeadership(appId.name());
155
156 kubevirtPortService.addListener(kubevirtPortListener);
157 kubevirtRouterService.addListener(kubevirtRouterlistener);
Jian Li91358d62021-03-22 11:04:52 +0900158 kubevirtNodeService.addListener(kubevirtNodeListener);
Daniel Parkb9a22022021-03-04 18:58:47 +0900159
160 log.info("Started");
161 }
162
163 @Deactivate
164 protected void deactivate() {
165 leadershipService.withdraw(appId.name());
Jian Li91358d62021-03-22 11:04:52 +0900166 kubevirtNodeService.removeListener(kubevirtNodeListener);
Daniel Parkb9a22022021-03-04 18:58:47 +0900167 kubevirtPortService.removeListener(kubevirtPortListener);
168 kubevirtRouterService.removeListener(kubevirtRouterlistener);
Daniel Parkb9a22022021-03-04 18:58:47 +0900169
170 eventExecutor.shutdown();
171
172 log.info("Stopped");
173 }
174
175 private void initGatewayNodeSnatForRouter(KubevirtRouter router, boolean install) {
176 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
177
178 if (electedGw == null) {
179 log.warn("Fail to initialize gateway node snat for router {} " +
180 "there's no gateway assigned to it", router.name());
181 return;
182 }
183
184 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
185
186 if (routerSnatIp == null) {
187 log.warn("Fail to initialize gateway node snat for router {} " +
188 "there's no gateway snat ip assigned to it", router.name());
189 return;
190 }
191
Daniel Park54205272021-03-23 08:00:00 +0900192 String externalNet = router.external().values().stream().findAny().orElse(null);
193 if (externalNet == null) {
194 return;
195 }
196
197 if (router.peerRouter() != null &&
198 router.peerRouter().ipAddress() != null && router.peerRouter().macAddress() != null) {
199 setArpResponseToPeerRouter(electedGw, Ip4Address.valueOf(routerSnatIp), install);
200 setStatefulSnatUpstreamRules(electedGw, router, Ip4Address.valueOf(routerSnatIp),
201 router.peerRouter().macAddress(), install);
202 setStatefulSnatDownstreamRuleForRouter(electedGw, router, Ip4Address.valueOf(routerSnatIp),
203 kubevirtNetworkService.network(externalNet), install);
204 }
Daniel Parkb9a22022021-03-04 18:58:47 +0900205 }
206
207 private void setArpResponseToPeerRouter(KubevirtNode gatewayNode, Ip4Address ip4Address, boolean install) {
208
209 TrafficSelector selector = DefaultTrafficSelector.builder()
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900210 .matchInPort(externalPatchPortNum(deviceService, gatewayNode))
Daniel Parkb9a22022021-03-04 18:58:47 +0900211 .matchEthType(EthType.EtherType.ARP.ethType().toShort())
212 .matchArpOp(ARP.OP_REQUEST)
213 .matchArpTpa(ip4Address)
214 .build();
215
216 Device device = deviceService.getDevice(gatewayNode.intgBridge());
217
218 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
219 .extension(RulePopulatorUtil.buildMoveEthSrcToDstExtension(device), device.id())
220 .extension(RulePopulatorUtil.buildMoveArpShaToThaExtension(device), device.id())
221 .extension(RulePopulatorUtil.buildMoveArpSpaToTpaExtension(device), device.id())
222 .setArpOp(ARP.OP_REPLY)
223 .setEthSrc(DEFAULT_GATEWAY_MAC)
224 .setArpSha(DEFAULT_GATEWAY_MAC)
225 .setArpSpa(ip4Address)
226 .setOutput(PortNumber.IN_PORT)
227 .build();
228
229 flowService.setRule(
230 appId,
231 gatewayNode.intgBridge(),
232 selector,
233 treatment,
234 PRIORITY_ARP_GATEWAY_RULE,
235 PRE_FLAT_TABLE,
236 install);
237 }
238
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900239 private void setStatefulSnatUpstreamRules(KubevirtNode gatewayNode,
240 KubevirtRouter router,
Daniel Park54205272021-03-23 08:00:00 +0900241 Ip4Address routerSnatIp,
242 MacAddress peerRouterMacAddress,
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900243 boolean install) {
244 MacAddress routerMacAddress = getRouterMacAddress(router);
245 if (routerMacAddress == null) {
246 return;
247 }
Daniel Park54205272021-03-23 08:00:00 +0900248
249 if (routerSnatIp == null || peerRouterMacAddress == null) {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900250 return;
251 }
Daniel Parkb9a22022021-03-04 18:58:47 +0900252
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900253 TrafficSelector.Builder selector = DefaultTrafficSelector.builder()
Daniel Parkb9a22022021-03-04 18:58:47 +0900254 .matchEthType(Ethernet.TYPE_IPV4)
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900255 .matchEthDst(routerMacAddress);
Daniel Parkb9a22022021-03-04 18:58:47 +0900256
257 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
258
259 ExtensionTreatment natTreatment = RulePopulatorUtil
260 .niciraConnTrackTreatmentBuilder(driverService, gatewayNode.intgBridge())
261 .commit(true)
262 .natFlag(CT_NAT_SRC_FLAG)
263 .natAction(true)
Daniel Park54205272021-03-23 08:00:00 +0900264 .natIp(routerSnatIp)
Daniel Parkb9a22022021-03-04 18:58:47 +0900265 .natPortMin(TpPort.tpPort(TP_PORT_MINIMUM_NUM))
266 .natPortMax(TpPort.tpPort(TP_PORT_MAXIMUM_NUM))
267 .build();
268
269 tBuilder.extension(natTreatment, gatewayNode.intgBridge())
Daniel Park54205272021-03-23 08:00:00 +0900270 .setEthDst(peerRouterMacAddress)
Daniel Parkb9a22022021-03-04 18:58:47 +0900271 .setEthSrc(DEFAULT_GATEWAY_MAC)
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900272 .setOutput(externalPatchPortNum(deviceService, gatewayNode));
Daniel Parkb9a22022021-03-04 18:58:47 +0900273
274 flowService.setRule(
275 appId,
276 gatewayNode.intgBridge(),
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900277 selector.build(),
Daniel Parkb9a22022021-03-04 18:58:47 +0900278 tBuilder.build(),
279 PRIORITY_STATEFUL_SNAT_RULE,
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900280 PRE_FLAT_TABLE,
Daniel Parkb9a22022021-03-04 18:58:47 +0900281 install);
282 }
283
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900284 private void setStatefulSnatDownStreamRuleForKubevirtPort(KubevirtRouter router,
285 KubevirtNode gatewayNode,
286 KubevirtPort kubevirtPort,
287 boolean install) {
288 MacAddress routerMacAddress = getRouterMacAddress(router);
Daniel Parkb9a22022021-03-04 18:58:47 +0900289
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900290 if (routerMacAddress == null) {
Daniel Parkb9a22022021-03-04 18:58:47 +0900291 log.error("Failed to set stateful snat downstream rule because " +
292 "there's no br-int port for device {}", gatewayNode.intgBridge());
293 return;
294 }
295
296 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
297 .matchEthType(Ethernet.TYPE_IPV4)
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900298 .matchEthSrc(routerMacAddress)
Daniel Parkb9a22022021-03-04 18:58:47 +0900299 .matchIPDst(IpPrefix.valueOf(kubevirtPort.ipAddress(), 32));
300
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900301 KubevirtNetwork network = kubevirtNetworkService.network(kubevirtPort.networkId());
302
303 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
Daniel Parkb9a22022021-03-04 18:58:47 +0900304 .setEthDst(kubevirtPort.macAddress())
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900305 .transition(FORWARDING_TABLE);
Daniel Parkb9a22022021-03-04 18:58:47 +0900306
307 flowService.setRule(
308 appId,
309 gatewayNode.intgBridge(),
310 sBuilder.build(),
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900311 tBuilder.build(),
Daniel Parkb9a22022021-03-04 18:58:47 +0900312 PRIORITY_STATEFUL_SNAT_RULE,
313 FLAT_TABLE,
314 install);
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900315
316 if (network.type() == VXLAN || network.type() == GENEVE || network.type() == GRE) {
317 setDownStreamRulesToGatewayTunBridge(router, network, kubevirtPort, install);
318 }
Daniel Parkb9a22022021-03-04 18:58:47 +0900319 }
320
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900321 private void setDownStreamRulesToGatewayTunBridge(KubevirtRouter router,
322 KubevirtNetwork network,
323 KubevirtPort port, boolean install) {
324 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
Daniel Parkb9a22022021-03-04 18:58:47 +0900325
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900326 if (electedGw == null) {
327 return;
328 }
Daniel Parkb9a22022021-03-04 18:58:47 +0900329
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900330 KubevirtNode workerNode = kubevirtNodeService.node(port.deviceId());
331 if (workerNode == null) {
332 return;
333 }
334
335 PortNumber tunnelPortNumber = tunnelPort(electedGw, network);
336 if (tunnelPortNumber == null) {
Daniel Parkb9a22022021-03-04 18:58:47 +0900337 return;
338 }
339
340 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
341 .matchEthType(Ethernet.TYPE_IPV4)
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900342 .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32))
343 .matchEthDst(port.macAddress());
344
345 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
346 .setTunnelId(Long.parseLong(network.segmentId()))
347 .extension(buildExtension(
348 deviceService,
349 electedGw.tunBridge(),
350 workerNode.dataIp().getIp4Address()),
351 electedGw.tunBridge())
352 .setOutput(tunnelPortNumber);
353
354 flowService.setRule(
355 appId,
356 electedGw.tunBridge(),
357 sBuilder.build(),
358 tBuilder.build(),
359 PRIORITY_FORWARDING_RULE,
360 TUNNEL_DEFAULT_TABLE,
361 install);
362 }
363
364 private void setStatefulSnatDownstreamRuleForRouter(KubevirtNode gatewayNode,
365 KubevirtRouter router,
Daniel Park54205272021-03-23 08:00:00 +0900366 IpAddress routerSnatIp,
367 KubevirtNetwork externalNetwork,
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900368 boolean install) {
369
370 MacAddress routerMacAddress = getRouterMacAddress(router);
371
372 if (routerMacAddress == null) {
373 log.warn("Failed to set stateful snat downstream rule because " +
374 "there's no br-int port for device {}", gatewayNode.intgBridge());
375 return;
376 }
377
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900378 if (externalNetwork == null) {
379 log.warn("Failed to set stateful snat downstream rule because " +
380 "there's no external network router {}", router.name());
381 return;
382 }
383
384 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
385 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
386
387 if (externalNetwork.type() == VLAN) {
388 sBuilder.matchEthType(Ethernet.TYPE_VLAN)
389 .matchVlanId(VlanId.vlanId(externalNetwork.segmentId()));
390 tBuilder.popVlan();
391 } else {
392 sBuilder.matchEthType(Ethernet.TYPE_IPV4);
393 }
394
Daniel Park54205272021-03-23 08:00:00 +0900395 sBuilder.matchIPDst(IpPrefix.valueOf(routerSnatIp, 32));
Daniel Parkb9a22022021-03-04 18:58:47 +0900396
397 ExtensionTreatment natTreatment = RulePopulatorUtil
398 .niciraConnTrackTreatmentBuilder(driverService, gatewayNode.intgBridge())
399 .commit(false)
400 .natAction(true)
Daniel Parkcc8e7462021-03-09 13:37:42 +0900401 .table((short) FLAT_TABLE)
Daniel Parkb9a22022021-03-04 18:58:47 +0900402 .build();
403
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900404 tBuilder.setEthSrc(routerMacAddress)
405 .extension(natTreatment, gatewayNode.intgBridge());
Daniel Parkb9a22022021-03-04 18:58:47 +0900406
407 flowService.setRule(
408 appId,
409 gatewayNode.intgBridge(),
410 sBuilder.build(),
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900411 tBuilder.build(),
Daniel Parkb9a22022021-03-04 18:58:47 +0900412 PRIORITY_STATEFUL_SNAT_RULE,
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900413 PRE_FLAT_TABLE,
Daniel Parkb9a22022021-03-04 18:58:47 +0900414 install);
Daniel Parkb9a22022021-03-04 18:58:47 +0900415 }
416
417 private class InternalRouterEventListener implements KubevirtRouterListener {
418 private boolean isRelevantHelper() {
419 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
420 }
421
422 @Override
423 public void event(KubevirtRouterEvent event) {
424 switch (event.type()) {
425 case KUBEVIRT_ROUTER_CREATED:
426 eventExecutor.execute(() -> processRouterCreation(event.subject()));
427 break;
428 case KUBEVIRT_ROUTER_REMOVED:
429 eventExecutor.execute(() -> processRouterDeletion(event.subject()));
430 break;
431 case KUBEVIRT_ROUTER_UPDATED:
432 eventExecutor.execute(() -> processRouterUpdate(event.subject()));
433 break;
434 case KUBEVIRT_ROUTER_INTERNAL_NETWORKS_ATTACHED:
435 eventExecutor.execute(() -> processRouterInternalNetworksAttached(event.subject(),
436 event.internal()));
437 break;
438 case KUBEVIRT_ROUTER_INTERNAL_NETWORKS_DETACHED:
439 eventExecutor.execute(() -> processRouterInternalNetworksDetached(event.subject(),
440 event.internal()));
441 break;
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900442 case KUBEVIRT_GATEWAY_NODE_ATTACHED:
443 eventExecutor.execute(() -> processRouterGatewayNodeAttached(event.subject(),
444 event.gateway()));
445 break;
446 case KUBEVIRT_GATEWAY_NODE_DETACHED:
447 eventExecutor.execute(() -> processRouterGatewayNodeDetached(event.subject(),
448 event.gateway()));
449 break;
450 case KUBEVIRT_ROUTER_EXTERNAL_NETWORK_ATTACHED:
Daniel Park54205272021-03-23 08:00:00 +0900451 eventExecutor.execute(() -> processRouterExternalNetAttached(event.subject(),
452 event.externalIp(), event.externalNet(),
453 event.externalPeerRouterIp(), event.peerRouterMac()));
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900454 break;
455 case KUBEVIRT_ROUTER_EXTERNAL_NETWORK_DETACHED:
Daniel Park54205272021-03-23 08:00:00 +0900456 eventExecutor.execute(() -> processRouterExternalNetDetached(event.subject(),
457 event.externalIp(), event.externalNet(),
458 event.externalPeerRouterIp(), event.peerRouterMac()));
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900459 break;
Daniel Parkb9a22022021-03-04 18:58:47 +0900460 default:
461 //do nothing
462 break;
463 }
464 }
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900465
Daniel Park54205272021-03-23 08:00:00 +0900466 private void processRouterExternalNetAttached(KubevirtRouter router, String externalIp, String externalNet,
467 String peerRouterIp, MacAddress peerRouterMac) {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900468 if (!isRelevantHelper()) {
469 return;
470 }
471 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
472
473 if (electedGw == null) {
474 log.warn("Fail to process router external network attached gateway node snat for router {} " +
475 "there's no gateway assigned to it", router.name());
476 return;
477 }
478
Daniel Park54205272021-03-23 08:00:00 +0900479 if (router.enableSnat() &&
480 peerRouterIp != null && peerRouterMac != null && externalIp != null && externalNet != null) {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900481 setArpResponseToPeerRouter(electedGw, Ip4Address.valueOf(externalIp), true);
Daniel Park54205272021-03-23 08:00:00 +0900482 setStatefulSnatUpstreamRules(electedGw, router, Ip4Address.valueOf(externalIp),
483 peerRouterMac, true);
484 setStatefulSnatDownstreamRuleForRouter(electedGw, router, Ip4Address.valueOf(externalIp),
485 kubevirtNetworkService.network(externalNet), true);
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900486 }
487
488 router.internal()
489 .stream()
490 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
491 .map(kubevirtNetworkService::network)
492 .forEach(network -> {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900493 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
494 setStatefulSnatDownStreamRuleForKubevirtPort(router,
495 electedGw, kubevirtPort, true);
496 });
497 });
498 }
499
Daniel Park54205272021-03-23 08:00:00 +0900500 private void processRouterExternalNetDetached(KubevirtRouter router, String externalIp, String externalNet,
501 String peerRouterIp, MacAddress peerRouterMac) {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900502 if (!isRelevantHelper()) {
503 return;
504 }
505 if (!isRelevantHelper()) {
506 return;
507 }
508 KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
509
510 if (electedGw == null) {
511 log.warn("Fail to process router external network attached gateway node snat for router {} " +
512 "there's no gateway assigned to it", router.name());
513 return;
514 }
515
Daniel Park54205272021-03-23 08:00:00 +0900516 if (router.enableSnat() &&
517 peerRouterIp != null && peerRouterMac != null && externalIp != null && externalNet != null) {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900518 setArpResponseToPeerRouter(electedGw, Ip4Address.valueOf(externalIp), false);
Daniel Park54205272021-03-23 08:00:00 +0900519 setStatefulSnatUpstreamRules(electedGw, router,
520 Ip4Address.valueOf(externalIp), peerRouterMac, false);
521 setStatefulSnatDownstreamRuleForRouter(electedGw, router, Ip4Address.valueOf(externalIp),
522 kubevirtNetworkService.network(externalNet), false);
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900523 }
524
525 router.internal()
526 .stream()
527 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
528 .map(kubevirtNetworkService::network)
529 .forEach(network -> {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900530 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
531 setStatefulSnatDownStreamRuleForKubevirtPort(router,
532 electedGw, kubevirtPort, false);
533 });
534 });
535 }
536
537 private void processRouterGatewayNodeAttached(KubevirtRouter router, String attachedGatewayId) {
538 if (!isRelevantHelper()) {
539 return;
540 }
541
542 KubevirtNode attachedGateway = kubevirtNodeService.node(attachedGatewayId);
543 if (attachedGateway == null) {
544 return;
545 }
546
547 router.internal()
548 .stream()
549 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
550 .map(kubevirtNetworkService::network)
551 .forEach(network -> {
552 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
553 if (routerSnatIp == null) {
554 return;
555 }
556 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
557 setStatefulSnatDownStreamRuleForKubevirtPort(router,
558 attachedGateway, kubevirtPort, true);
559 });
560 });
561 }
562
563 private void processRouterGatewayNodeDetached(KubevirtRouter router, String detachedGatewayId) {
564 if (!isRelevantHelper()) {
565 return;
566 }
567
568 KubevirtNode detachedGateway = kubevirtNodeService.node(detachedGatewayId);
569 if (detachedGateway == null) {
570 return;
571 }
572
573 router.internal()
574 .stream()
575 .filter(networkId -> kubevirtNetworkService.network(networkId) != null)
576 .map(kubevirtNetworkService::network)
577 .forEach(network -> {
578 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
579 if (routerSnatIp == null) {
580 return;
581 }
582
583 kubevirtPortService.ports(network.networkId()).forEach(kubevirtPort -> {
584 setStatefulSnatDownStreamRuleForKubevirtPort(router,
585 detachedGateway, kubevirtPort, false);
586 });
587 });
588 }
589
Daniel Parkb9a22022021-03-04 18:58:47 +0900590 private void processRouterInternalNetworksAttached(KubevirtRouter router,
591 Set<String> attachedInternalNetworks) {
592 if (!isRelevantHelper()) {
593 return;
594 }
595
596 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
597 if (gwNode == null) {
598 return;
599 }
600
601 attachedInternalNetworks.forEach(networkId -> {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900602 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
603 if (routerSnatIp == null) {
604 return;
605 }
606
Daniel Parkb9a22022021-03-04 18:58:47 +0900607 kubevirtPortService.ports(networkId).forEach(kubevirtPort -> {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900608 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, true);
Daniel Parkb9a22022021-03-04 18:58:47 +0900609 });
610 });
611 }
612
613 private void processRouterInternalNetworksDetached(KubevirtRouter router,
614 Set<String> detachedInternalNetworks) {
615 if (!isRelevantHelper()) {
616 return;
617 }
618
619 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
620 if (gwNode == null) {
621 return;
622 }
623
624 detachedInternalNetworks.forEach(networkId -> {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900625 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
626 if (routerSnatIp == null) {
627 log.info("snatIp is null");
628 return;
629 }
630
Daniel Parkb9a22022021-03-04 18:58:47 +0900631 kubevirtPortService.ports(networkId).forEach(kubevirtPort -> {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900632 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, false);
Daniel Parkb9a22022021-03-04 18:58:47 +0900633 });
634 });
635 }
636 private void processRouterCreation(KubevirtRouter router) {
637 if (!isRelevantHelper()) {
638 return;
639 }
640 if (router.enableSnat() && !router.external().isEmpty() && router.peerRouter() != null) {
641 initGatewayNodeSnatForRouter(router, true);
642 }
643 }
644
645 private void processRouterDeletion(KubevirtRouter router) {
646 if (!isRelevantHelper()) {
647 return;
648 }
649 if (router.enableSnat() && !router.external().isEmpty() && router.peerRouter() != null) {
650 initGatewayNodeSnatForRouter(router, false);
651 }
652 }
653
654 private void processRouterUpdate(KubevirtRouter router) {
655 if (!isRelevantHelper()) {
656 return;
657 }
658 if (router.enableSnat() && !router.external().isEmpty() && router.peerRouter() != null) {
659 initGatewayNodeSnatForRouter(router, true);
660 }
661 }
662 }
663
Daniel Parkb9a22022021-03-04 18:58:47 +0900664 private class InternalKubevirtPortListener implements KubevirtPortListener {
665
666 private boolean isRelevantHelper() {
667 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
668 }
669
670 @Override
671 public void event(KubevirtPortEvent event) {
672 switch (event.type()) {
673 case KUBEVIRT_PORT_CREATED:
674 eventExecutor.execute(() -> processPortCreation(event.subject()));
675 break;
676 case KUBEVIRT_PORT_UPDATED:
677 eventExecutor.execute(() -> processPortUpdate(event.subject()));
678 break;
679 case KUBEVIRT_PORT_REMOVED:
680 eventExecutor.execute(() -> processPortDeletion(event.subject()));
681 break;
682 default:
683 //do nothing
684 break;
685 }
686 }
687
688 private void processPortCreation(KubevirtPort kubevirtPort) {
689 if (!isRelevantHelper()) {
690 return;
691 }
692
Daniel Parkcc8e7462021-03-09 13:37:42 +0900693 KubevirtRouter router = getRouterForKubevirtPort(kubevirtRouterService, kubevirtPort);
Daniel Parkb9a22022021-03-04 18:58:47 +0900694 if (router == null) {
695 return;
696 }
697
698 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
699
700 if (gwNode != null) {
701 IpAddress gatewaySnatIp = getRouterSnatIpAddress(kubevirtRouterService, kubevirtPort.networkId());
702 if (gatewaySnatIp == null) {
703 return;
704 }
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900705 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, true);
Daniel Parkb9a22022021-03-04 18:58:47 +0900706 }
707 }
708
709 private void processPortUpdate(KubevirtPort kubevirtPort) {
710 if (!isRelevantHelper()) {
711 return;
712 }
713
Daniel Parkcc8e7462021-03-09 13:37:42 +0900714 KubevirtRouter router = getRouterForKubevirtPort(kubevirtRouterService, kubevirtPort);
Daniel Parkb9a22022021-03-04 18:58:47 +0900715 if (router == null) {
716 return;
717 }
718
719 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
720
721 if (gwNode != null) {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900722 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, true);
Daniel Parkb9a22022021-03-04 18:58:47 +0900723 }
724 }
725
726 private void processPortDeletion(KubevirtPort kubevirtPort) {
727 if (!isRelevantHelper()) {
728 return;
729 }
730
Daniel Parkcc8e7462021-03-09 13:37:42 +0900731 KubevirtRouter router = getRouterForKubevirtPort(kubevirtRouterService, kubevirtPort);
Daniel Parkb9a22022021-03-04 18:58:47 +0900732 if (router == null) {
733 return;
734 }
735
736 KubevirtNode gwNode = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
737
738 if (gwNode != null) {
Daniel Parke7e3d6a2021-03-10 07:49:11 +0900739 setStatefulSnatDownStreamRuleForKubevirtPort(router, gwNode, kubevirtPort, false);
Daniel Parkb9a22022021-03-04 18:58:47 +0900740 }
741 }
Daniel Parkb9a22022021-03-04 18:58:47 +0900742 }
Jian Li91358d62021-03-22 11:04:52 +0900743
744 private class InternalNodeEventListener implements KubevirtNodeListener {
745
746 @Override
747 public void event(KubevirtNodeEvent event) {
748
749 }
750 }
Daniel Parkb9a22022021-03-04 18:58:47 +0900751}