blob: d5c047f378de8965efe4f86bda36772611c0c12a [file] [log] [blame]
Jian Li4aa17642019-01-30 00:01:11 +09001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.k8snetworking.impl;
17
18import org.onlab.packet.ARP;
19import org.onlab.packet.EthType;
20import org.onlab.packet.Ethernet;
21import org.onlab.packet.Ip4Address;
22import org.onlab.packet.IpAddress;
23import org.onlab.packet.MacAddress;
Jian Li44c2b122019-05-03 14:46:34 +090024import org.onlab.packet.VlanId;
25import org.onlab.util.KryoNamespace;
Jian Li4aa17642019-01-30 00:01:11 +090026import org.onlab.util.Tools;
27import org.onosproject.cfg.ComponentConfigService;
28import org.onosproject.cfg.ConfigProperty;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.LeadershipService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.core.ApplicationId;
33import org.onosproject.core.CoreService;
34import org.onosproject.k8snetworking.api.K8sFlowRuleService;
Jian Li140d8a22019-04-24 23:41:44 +090035import org.onosproject.k8snetworking.api.K8sNetwork;
Jian Li4aa17642019-01-30 00:01:11 +090036import org.onosproject.k8snetworking.api.K8sNetworkService;
37import org.onosproject.k8snetworking.api.K8sPort;
Jian Li7d111d72019-04-12 13:58:44 +090038import org.onosproject.k8snetworking.api.K8sServiceService;
Jian Lieab51352020-09-11 03:29:16 +090039import org.onosproject.k8snode.api.K8sHostService;
Jian Li4aa17642019-01-30 00:01:11 +090040import org.onosproject.k8snode.api.K8sNode;
41import org.onosproject.k8snode.api.K8sNodeEvent;
42import org.onosproject.k8snode.api.K8sNodeListener;
43import org.onosproject.k8snode.api.K8sNodeService;
44import org.onosproject.mastership.MastershipService;
Jian Li44c2b122019-05-03 14:46:34 +090045import org.onosproject.net.ConnectPoint;
Jian Lieab51352020-09-11 03:29:16 +090046import org.onosproject.net.DeviceId;
Jian Li4aa17642019-01-30 00:01:11 +090047import org.onosproject.net.PortNumber;
48import org.onosproject.net.device.DeviceService;
49import org.onosproject.net.flow.DefaultTrafficSelector;
50import org.onosproject.net.flow.DefaultTrafficTreatment;
51import org.onosproject.net.flow.TrafficSelector;
52import org.onosproject.net.flow.TrafficTreatment;
53import org.onosproject.net.packet.DefaultOutboundPacket;
54import org.onosproject.net.packet.PacketContext;
55import org.onosproject.net.packet.PacketProcessor;
56import org.onosproject.net.packet.PacketService;
Jian Li44c2b122019-05-03 14:46:34 +090057import org.onosproject.store.serializers.KryoNamespaces;
58import org.onosproject.store.service.ConsistentMap;
59import org.onosproject.store.service.Serializer;
60import org.onosproject.store.service.StorageService;
Jian Li4aa17642019-01-30 00:01:11 +090061import org.osgi.service.component.ComponentContext;
62import org.osgi.service.component.annotations.Activate;
63import org.osgi.service.component.annotations.Component;
64import org.osgi.service.component.annotations.Deactivate;
65import org.osgi.service.component.annotations.Modified;
66import org.osgi.service.component.annotations.Reference;
67import org.osgi.service.component.annotations.ReferenceCardinality;
68import org.slf4j.Logger;
69import org.slf4j.LoggerFactory;
70
71import java.nio.ByteBuffer;
72import java.util.Dictionary;
73import java.util.Objects;
74import java.util.Set;
75import java.util.concurrent.ExecutorService;
Jian Li7d111d72019-04-12 13:58:44 +090076import java.util.stream.Collectors;
Jian Li4aa17642019-01-30 00:01:11 +090077
78import static java.util.concurrent.Executors.newSingleThreadExecutor;
79import static org.onlab.util.Tools.groupedThreads;
80import static org.onosproject.k8snetworking.api.Constants.ARP_BROADCAST_MODE;
81import static org.onosproject.k8snetworking.api.Constants.ARP_PROXY_MODE;
82import static org.onosproject.k8snetworking.api.Constants.ARP_TABLE;
83import static org.onosproject.k8snetworking.api.Constants.K8S_NETWORKING_APP_ID;
Jian Li140d8a22019-04-24 23:41:44 +090084import static org.onosproject.k8snetworking.api.Constants.NODE_IP_PREFIX;
Jian Li4aa17642019-01-30 00:01:11 +090085import static org.onosproject.k8snetworking.api.Constants.PRIORITY_ARP_CONTROL_RULE;
Jian Li7d111d72019-04-12 13:58:44 +090086import static org.onosproject.k8snetworking.api.Constants.SERVICE_FAKE_MAC_STR;
Jian Lieab51352020-09-11 03:29:16 +090087import static org.onosproject.k8snetworking.api.Constants.SHIFTED_IP_PREFIX;
Jian Li4aa17642019-01-30 00:01:11 +090088import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE;
89import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.ARP_MODE_DEFAULT;
90import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC;
91import static org.onosproject.k8snetworking.impl.OsgiPropertyConstants.GATEWAY_MAC_DEFAULT;
Jian Lieab51352020-09-11 03:29:16 +090092import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.allK8sDevices;
Jian Li4aa17642019-01-30 00:01:11 +090093import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.getPropertyValue;
Jian Li004526d2019-02-25 16:26:27 +090094import static org.onosproject.k8snetworking.util.K8sNetworkingUtil.unshiftIpDomain;
Jian Li4aa17642019-01-30 00:01:11 +090095
96/**
97 * Handles ARP packet from containers.
98 */
99@Component(
100 immediate = true,
101 property = {
102 GATEWAY_MAC + "=" + GATEWAY_MAC_DEFAULT,
103 ARP_MODE + "=" + ARP_MODE_DEFAULT
104 }
105)
106public class K8sSwitchingArpHandler {
107
108 private final Logger log = LoggerFactory.getLogger(getClass());
109
Jian Li7d111d72019-04-12 13:58:44 +0900110 private static final String GATEWAY_MAC = "gatewayMac";
111 private static final String ARP_MODE = "arpMode";
112
Jian Li44c2b122019-05-03 14:46:34 +0900113 private static final KryoNamespace SERIALIZER_HOST_MAC = KryoNamespace.newBuilder()
114 .register(KryoNamespaces.API)
115 .build();
116
Jian Li4aa17642019-01-30 00:01:11 +0900117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected CoreService coreService;
119
120 @Reference(cardinality = ReferenceCardinality.MANDATORY)
121 protected PacketService packetService;
122
123 @Reference(cardinality = ReferenceCardinality.MANDATORY)
124 protected ComponentConfigService configService;
125
126 @Reference(cardinality = ReferenceCardinality.MANDATORY)
127 protected ClusterService clusterService;
128
129 @Reference(cardinality = ReferenceCardinality.MANDATORY)
130 protected LeadershipService leadershipService;
131
132 @Reference(cardinality = ReferenceCardinality.MANDATORY)
133 protected DeviceService deviceService;
134
135 @Reference(cardinality = ReferenceCardinality.MANDATORY)
136 protected MastershipService mastershipService;
137
138 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li44c2b122019-05-03 14:46:34 +0900139 protected StorageService storageService;
140
141 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4aa17642019-01-30 00:01:11 +0900142 protected K8sNodeService k8sNodeService;
143
144 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Lieab51352020-09-11 03:29:16 +0900145 protected K8sHostService k8sHostService;
146
147 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jian Li4aa17642019-01-30 00:01:11 +0900148 protected K8sNetworkService k8sNetworkService;
149
150 @Reference(cardinality = ReferenceCardinality.MANDATORY)
151 protected K8sFlowRuleService k8sFlowRuleService;
152
Jian Li7d111d72019-04-12 13:58:44 +0900153 @Reference(cardinality = ReferenceCardinality.MANDATORY)
154 protected K8sServiceService k8sServiceService;
155
Jian Li4aa17642019-01-30 00:01:11 +0900156 /** Fake MAC address for virtual network subnet gateway. */
157 private String gatewayMac = GATEWAY_MAC_DEFAULT;
158
159 /** ARP processing mode, broadcast | proxy (default). */
160 protected String arpMode = ARP_MODE_DEFAULT;
161
162 private MacAddress gwMacAddress;
163
Jian Li44c2b122019-05-03 14:46:34 +0900164 private ConsistentMap<IpAddress, MacAddress> extHostMacStore;
165
Jian Li4aa17642019-01-30 00:01:11 +0900166 private final ExecutorService eventExecutor = newSingleThreadExecutor(
167 groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
168
169 private final InternalPacketProcessor packetProcessor = new InternalPacketProcessor();
170 private final InternalNodeEventListener k8sNodeListener = new InternalNodeEventListener();
171
172 private ApplicationId appId;
173 private NodeId localNodeId;
174
175 @Activate
176 void activate() {
177 appId = coreService.registerApplication(K8S_NETWORKING_APP_ID);
178 configService.registerProperties(getClass());
179 localNodeId = clusterService.getLocalNode().id();
180 leadershipService.runForLeadership(appId.name());
181 k8sNodeService.addListener(k8sNodeListener);
182 packetService.addProcessor(packetProcessor, PacketProcessor.director(0));
183
Jian Li44c2b122019-05-03 14:46:34 +0900184 extHostMacStore = storageService.<IpAddress, MacAddress>consistentMapBuilder()
185 .withSerializer(Serializer.using(SERIALIZER_HOST_MAC))
186 .withName("k8s-host-mac-store")
187 .withApplicationId(appId)
188 .build();
189
Jian Li4aa17642019-01-30 00:01:11 +0900190 log.info("Started");
191 }
192
193 @Deactivate
194 void deactivate() {
195 packetService.removeProcessor(packetProcessor);
196 k8sNodeService.removeListener(k8sNodeListener);
197 leadershipService.withdraw(appId.name());
198 configService.unregisterProperties(getClass(), false);
199 eventExecutor.shutdown();
200
201 log.info("Stopped");
202 }
203
204 @Modified
205 void modified(ComponentContext context) {
206 readComponentConfiguration(context);
207
208 log.info("Modified");
209 }
210
211 /**
212 * Processes ARP request packets.
213 *
214 * @param context packet context
215 * @param ethPacket ethernet packet
216 */
217 private void processPacketIn(PacketContext context, Ethernet ethPacket) {
218 // if the ARP mode is configured as broadcast mode, we simply ignore ARP packet_in
219 if (ARP_BROADCAST_MODE.equals(getArpMode())) {
220 return;
221 }
222
Jian Lieab51352020-09-11 03:29:16 +0900223 DeviceId deviceId = context.inPacket().receivedFrom().deviceId();
224
225 if (!allK8sDevices(k8sNodeService, k8sHostService).contains(deviceId)) {
226 return;
227 }
228
Jian Li4aa17642019-01-30 00:01:11 +0900229 ARP arpPacket = (ARP) ethPacket.getPayload();
Jian Li44c2b122019-05-03 14:46:34 +0900230 if (arpPacket.getOpCode() == ARP.OP_REQUEST) {
231 processArpRequest(context, ethPacket);
232 } else if (arpPacket.getOpCode() == ARP.OP_REPLY) {
233 processArpReply(context, ethPacket);
Jian Li4aa17642019-01-30 00:01:11 +0900234 }
Jian Li44c2b122019-05-03 14:46:34 +0900235 }
Jian Li4aa17642019-01-30 00:01:11 +0900236
Jian Li44c2b122019-05-03 14:46:34 +0900237 private void processArpRequest(PacketContext context, Ethernet ethPacket) {
238 ARP arpPacket = (ARP) ethPacket.getPayload();
Jian Li4b5048a2020-10-08 02:57:45 +0900239 K8sPort srcK8sPort = k8sNetworkService.ports().stream()
Jian Li4aa17642019-01-30 00:01:11 +0900240 .filter(p -> p.macAddress().equals(ethPacket.getSourceMAC()))
241 .findAny().orElse(null);
242
Jian Li4b5048a2020-10-08 02:57:45 +0900243 PortNumber srcPortNum = context.inPacket().receivedFrom().port();
244 DeviceId srcDeviceId = context.inPacket().receivedFrom().deviceId();
245 boolean isEntryPort = false;
246
247 for (K8sNode node : k8sNodeService.completeNodes()) {
248 if (srcDeviceId.equals(node.intgBridge()) &&
249 srcPortNum.equals(node.intgEntryPortNum())) {
250 isEntryPort = true;
251 }
252 }
253
254 // if the ARP request is not initiated from regular k8s ports nor
255 // integration bridge entry port, we simply ignore the ARP request...
256 if (srcK8sPort == null && !isEntryPort) {
Jian Li4aa17642019-01-30 00:01:11 +0900257 log.warn("Failed to find source port(MAC:{})", ethPacket.getSourceMAC());
258 return;
259 }
260
Jian Li4aa17642019-01-30 00:01:11 +0900261 IpAddress targetIp = Ip4Address.valueOf(arpPacket.getTargetProtocolAddress());
262
Jian Li4b5048a2020-10-08 02:57:45 +0900263 // look up the MAC address from regular k8s ports
Jian Li4aa17642019-01-30 00:01:11 +0900264 MacAddress replyMac = k8sNetworkService.ports().stream()
Jian Li44c2b122019-05-03 14:46:34 +0900265 // .filter(p -> p.networkId().equals(srcPort.networkId()))
Jian Li4aa17642019-01-30 00:01:11 +0900266 .filter(p -> p.ipAddress().equals(targetIp))
267 .map(K8sPort::macAddress)
268 .findAny().orElse(null);
269
Jian Li4b5048a2020-10-08 02:57:45 +0900270 // look up the MAC address from special integration entry port (e.g., LOCAL, k8s-int-os)
271 for (K8sNetwork network : k8sNetworkService.networks()) {
272 if (network.gatewayIp().equals(targetIp)) {
273 K8sNode node = k8sNodeService.node(network.name());
274 replyMac = node.intgEntryPortMac();
275 }
Jian Li4aa17642019-01-30 00:01:11 +0900276 }
277
278 if (replyMac == null) {
Jian Li140d8a22019-04-24 23:41:44 +0900279 String cidr = k8sNetworkService.networks().stream()
280 .map(K8sNetwork::cidr).findAny().orElse(null);
281
282 if (cidr != null) {
283 String unshiftedIp = unshiftIpDomain(targetIp.toString(),
284 SHIFTED_IP_PREFIX, cidr);
285
Jian Li004526d2019-02-25 16:26:27 +0900286 replyMac = k8sNetworkService.ports().stream()
Jian Li140d8a22019-04-24 23:41:44 +0900287 .filter(p -> p.ipAddress().equals(IpAddress.valueOf(unshiftedIp)))
Jian Li004526d2019-02-25 16:26:27 +0900288 .map(K8sPort::macAddress)
289 .findAny().orElse(null);
Jian Li004526d2019-02-25 16:26:27 +0900290 }
291 }
292
293 if (replyMac == null) {
Jian Li7d111d72019-04-12 13:58:44 +0900294 Set<String> serviceIps = k8sServiceService.services().stream()
295 .map(s -> s.getSpec().getClusterIP())
296 .collect(Collectors.toSet());
297 if (serviceIps.contains(targetIp.toString())) {
298 replyMac = MacAddress.valueOf(SERVICE_FAKE_MAC_STR);
299 }
300 }
301
302 if (replyMac == null) {
Jian Li140d8a22019-04-24 23:41:44 +0900303
Jian Li44c2b122019-05-03 14:46:34 +0900304 if (targetIp.toString().startsWith(NODE_IP_PREFIX)) {
305 String targetIpPrefix = targetIp.toString().split("\\.")[1];
306 String nodePrefix = NODE_IP_PREFIX + "." + targetIpPrefix;
Jian Li140d8a22019-04-24 23:41:44 +0900307
Jian Li44c2b122019-05-03 14:46:34 +0900308 String exBridgeCidr = k8sNodeService.completeNodes().stream()
309 .map(n -> n.extBridgeIp().toString()).findAny().orElse(null);
Jian Li140d8a22019-04-24 23:41:44 +0900310
Jian Li44c2b122019-05-03 14:46:34 +0900311 if (exBridgeCidr != null) {
312 String extBridgeIp = unshiftIpDomain(targetIp.toString(),
313 nodePrefix, exBridgeCidr);
314
315 replyMac = k8sNodeService.completeNodes().stream()
316 .filter(n -> extBridgeIp.equals(n.extBridgeIp().toString()))
317 .map(K8sNode::extBridgeMac).findAny().orElse(null);
318
319 if (replyMac == null) {
320 replyMac = extHostMacStore.asJavaMap().get(
321 IpAddress.valueOf(extBridgeIp));
322 }
323
324 // if the source hosts are not in k8s cluster range,
325 // we need to manually learn their MAC addresses
326 if (replyMac == null) {
327 ConnectPoint cp = context.inPacket().receivedFrom();
328 K8sNode k8sNode = k8sNodeService.node(cp.deviceId());
329
330 if (k8sNode != null) {
331 setArpRequest(k8sNode.extBridgeMac().toBytes(),
332 k8sNode.extBridgeIp().toOctets(),
333 IpAddress.valueOf(extBridgeIp).toOctets(),
334 k8sNode);
335 context.block();
336 return;
337 }
338 }
339 }
Jian Li140d8a22019-04-24 23:41:44 +0900340 }
341 }
342
343 if (replyMac == null) {
Jian Li1c68e9a2020-10-11 02:45:16 +0900344 replyMac = MacAddress.valueOf(gatewayMac);
Jian Li4aa17642019-01-30 00:01:11 +0900345 }
346
347 Ethernet ethReply = ARP.buildArpReply(
348 targetIp.getIp4Address(),
349 replyMac,
350 ethPacket);
351
352 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
353 .setOutput(context.inPacket().receivedFrom().port())
354 .build();
355
356 packetService.emit(new DefaultOutboundPacket(
357 context.inPacket().receivedFrom().deviceId(),
358 treatment,
359 ByteBuffer.wrap(ethReply.serialize())));
Jian Li1b08d652019-05-02 17:28:09 +0900360
361 context.block();
Jian Li4aa17642019-01-30 00:01:11 +0900362 }
363
Jian Li44c2b122019-05-03 14:46:34 +0900364 private void processArpReply(PacketContext context, Ethernet ethPacket) {
365 ARP arpPacket = (ARP) ethPacket.getPayload();
366 ConnectPoint cp = context.inPacket().receivedFrom();
367 K8sNode k8sNode = k8sNodeService.node(cp.deviceId());
368
369 if (k8sNode != null &&
370 ethPacket.getDestinationMAC().equals(k8sNode.extBridgeMac())) {
371 IpAddress srcIp = IpAddress.valueOf(IpAddress.Version.INET,
372 arpPacket.getSenderProtocolAddress());
373 MacAddress srcMac = MacAddress.valueOf(arpPacket.getSenderHardwareAddress());
374
375 // we only add the host IP - MAC map store once,
376 // mutable MAP scenario is not considered for now
377 if (!extHostMacStore.containsKey(srcIp)) {
378 extHostMacStore.put(srcIp, srcMac);
379 }
380 }
381 }
382
383 private void setArpRequest(byte[] senderMac, byte[] senderIp,
384 byte[] targetIp, K8sNode k8sNode) {
385 Ethernet ethRequest = ARP.buildArpRequest(senderMac,
386 senderIp, targetIp, VlanId.NO_VID);
387
388 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
389 .setOutput(k8sNode.intgToExtPatchPortNum())
390 .build();
391
392 packetService.emit(new DefaultOutboundPacket(
393 k8sNode.intgBridge(),
394 treatment,
395 ByteBuffer.wrap(ethRequest.serialize())));
396 }
397
Jian Li4aa17642019-01-30 00:01:11 +0900398 private String getArpMode() {
399 Set<ConfigProperty> properties = configService.getProperties(this.getClass().getName());
400 return getPropertyValue(properties, ARP_MODE);
401 }
402
403 /**
404 * Extracts properties from the component configuration context.
405 *
406 * @param context the component context
407 */
408 private void readComponentConfiguration(ComponentContext context) {
409 Dictionary<?, ?> properties = context.getProperties();
410
411 String updatedMac = Tools.get(properties, GATEWAY_MAC);
412 gatewayMac = updatedMac != null ? updatedMac : GATEWAY_MAC_DEFAULT;
413 log.info("Configured. Gateway MAC is {}", gatewayMac);
414 }
415
416 /**
417 * An internal packet processor which processes ARP request, and results in
418 * packet-out ARP reply.
419 */
420 private class InternalPacketProcessor implements PacketProcessor {
421
422 @Override
423 public void process(PacketContext context) {
424 if (context.isHandled()) {
425 return;
426 }
427
428 Ethernet ethPacket = context.inPacket().parsed();
429 if (ethPacket == null || ethPacket.getEtherType() != Ethernet.TYPE_ARP) {
430 return;
431 }
432
433 eventExecutor.execute(() -> processPacketIn(context, ethPacket));
434 }
435 }
436
437 /**
438 * An internal kubernetes node listener which is used for listening kubernetes
439 * node activity. As long as a node is in complete state, we will install
440 * default ARP rule to handle ARP request.
441 */
442 private class InternalNodeEventListener implements K8sNodeListener {
443
444 private boolean isRelevantHelper() {
445 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
446 }
447
448 @Override
449 public void event(K8sNodeEvent event) {
450 K8sNode k8sNode = event.subject();
451 switch (event.type()) {
452 case K8S_NODE_COMPLETE:
453 eventExecutor.execute(() -> processNodeCompletion(k8sNode));
454 break;
455 case K8S_NODE_INCOMPLETE:
456 eventExecutor.execute(() -> processNodeIncompletion(k8sNode));
457 break;
458 default:
459 break;
460 }
461 }
462
463 private void processNodeCompletion(K8sNode node) {
464 if (!isRelevantHelper()) {
465 return;
466 }
467
468 setDefaultArpRule(node, true);
469 }
470
471 private void processNodeIncompletion(K8sNode node) {
472 if (!isRelevantHelper()) {
473 return;
474 }
475
476 setDefaultArpRule(node, false);
477 }
478
479 private void setDefaultArpRule(K8sNode node, boolean install) {
480
481 if (getArpMode() == null) {
482 return;
483 }
484
485 switch (getArpMode()) {
486 case ARP_PROXY_MODE:
487 setDefaultArpRuleForProxyMode(node, install);
488 break;
489 case ARP_BROADCAST_MODE:
490 // TODO: need to implement broadcast mode
491 log.warn("Not implemented yet.");
492 break;
493 default:
494 log.warn("Invalid ARP mode {}. Please use either " +
495 "broadcast or proxy mode.", getArpMode());
496 break;
497 }
498 }
499
500 private void setDefaultArpRuleForProxyMode(K8sNode node, boolean install) {
501 TrafficSelector selector = DefaultTrafficSelector.builder()
502 .matchEthType(EthType.EtherType.ARP.ethType().toShort())
503 .build();
504
505 TrafficTreatment treatment = DefaultTrafficTreatment.builder()
506 .punt()
507 .build();
508
509 k8sFlowRuleService.setRule(
510 appId,
511 node.intgBridge(),
512 selector,
513 treatment,
514 PRIORITY_ARP_CONTROL_RULE,
515 ARP_TABLE,
516 install
517 );
518 }
519 }
520}