blob: 77b60d904c9cb91ba3dea6f23b09bc530e2a08da [file] [log] [blame]
Jian Li8f944d42021-03-23 00:43:29 +09001/*
2 * Copyright 2020-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import com.google.common.collect.Sets;
19import org.onlab.packet.Ethernet;
20import org.onlab.packet.IPv4;
21import org.onlab.packet.Ip4Address;
22import org.onlab.packet.Ip4Prefix;
23import org.onlab.packet.IpPrefix;
24import org.onlab.packet.TpPort;
25import org.onlab.packet.VlanId;
26import org.onlab.util.Tools;
27import org.onosproject.cfg.ComponentConfigService;
28import org.onosproject.cfg.ConfigProperty;
29import org.onosproject.cluster.ClusterService;
30import org.onosproject.cluster.LeadershipService;
31import org.onosproject.cluster.NodeId;
32import org.onosproject.core.ApplicationId;
33import org.onosproject.core.CoreService;
34import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
35import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
36import org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type;
37import org.onosproject.kubevirtnetworking.api.KubevirtNetworkEvent;
38import org.onosproject.kubevirtnetworking.api.KubevirtNetworkListener;
39import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
40import org.onosproject.kubevirtnetworking.api.KubevirtPort;
41import org.onosproject.kubevirtnetworking.api.KubevirtPortEvent;
42import org.onosproject.kubevirtnetworking.api.KubevirtPortListener;
43import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
44import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroup;
45import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupEvent;
46import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupListener;
47import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupRule;
48import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupService;
49import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
50import org.onosproject.kubevirtnode.api.KubevirtNode;
51import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
52import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
53import org.onosproject.kubevirtnode.api.KubevirtNodeService;
54import org.onosproject.mastership.MastershipService;
55import org.onosproject.net.DeviceId;
56import org.onosproject.net.device.DeviceService;
57import org.onosproject.net.driver.DriverService;
58import org.onosproject.net.flow.DefaultTrafficSelector;
59import org.onosproject.net.flow.DefaultTrafficTreatment;
60import org.onosproject.net.flow.TrafficSelector;
61import org.onosproject.net.flow.TrafficTreatment;
62import org.onosproject.net.flow.criteria.ExtensionSelector;
63import org.onosproject.net.flow.instructions.ExtensionTreatment;
64import org.onosproject.store.service.StorageService;
65import org.osgi.service.component.ComponentContext;
66import org.osgi.service.component.annotations.Activate;
67import org.osgi.service.component.annotations.Component;
68import org.osgi.service.component.annotations.Deactivate;
69import org.osgi.service.component.annotations.Modified;
70import org.osgi.service.component.annotations.Reference;
71import org.osgi.service.component.annotations.ReferenceCardinality;
72import org.slf4j.Logger;
73
74import java.util.Dictionary;
75import java.util.Map;
76import java.util.Objects;
77import java.util.Set;
78import java.util.concurrent.ExecutorService;
79import java.util.stream.Collectors;
80
81import static java.lang.Thread.sleep;
82import static java.util.concurrent.Executors.newSingleThreadExecutor;
83import static org.onlab.util.Tools.groupedThreads;
84import static org.onosproject.kubevirtnetworking.api.Constants.ERROR_TABLE;
85import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
86import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ACL_INGRESS_RULE;
87import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ACL_RULE;
88import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_CT_DROP_RULE;
89import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_CT_HOOK_RULE;
90import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_CT_RULE;
91import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_ACL_CT_TABLE;
92import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_ACL_EGRESS_TABLE;
93import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_ACL_INGRESS_TABLE;
94import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_ACL_RECIRC_TABLE;
95import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_FORWARDING_TABLE;
96import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GENEVE;
97import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GRE;
98import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
99import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VXLAN;
100import static org.onosproject.kubevirtnetworking.impl.OsgiPropertyConstants.USE_SECURITY_GROUP;
101import static org.onosproject.kubevirtnetworking.impl.OsgiPropertyConstants.USE_SECURITY_GROUP_DEFAULT;
102import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPropertyValueAsBoolean;
103import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildPortRangeMatches;
104import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.computeCtMaskFlag;
105import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.computeCtStateFlag;
106import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
107import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
108import static org.slf4j.LoggerFactory.getLogger;
109
110/**
111 * Populates flow rules to handle EdgeStack SecurityGroups.
112 */
113@Component(
114 immediate = true,
115 property = {
116 USE_SECURITY_GROUP + ":Boolean=" + USE_SECURITY_GROUP_DEFAULT
117 }
118)
119public class KubevirtSecurityGroupHandler {
120
121 private final Logger log = getLogger(getClass());
122
123 private static final int VM_IP_PREFIX = 32;
124
125 private static final String STR_NULL = "null";
126 private static final String PROTO_ICMP = "ICMP";
127 private static final String PROTO_ICMP_NUM = "1";
128 private static final String PROTO_TCP = "TCP";
129 private static final String PROTO_TCP_NUM = "6";
130 private static final String PROTO_UDP = "UDP";
131 private static final String PROTO_UDP_NUM = "17";
132 private static final String PROTO_SCTP = "SCTP";
133 private static final String PROTO_SCTP_NUM = "132";
134 private static final byte PROTOCOL_SCTP = (byte) 0x84;
135 private static final String PROTO_ANY = "ANY";
136 private static final String PROTO_ANY_NUM = "0";
137 private static final String ETHTYPE_IPV4 = "IPV4";
138 private static final String EGRESS = "EGRESS";
139 private static final String INGRESS = "INGRESS";
140 private static final IpPrefix IP_PREFIX_ANY = Ip4Prefix.valueOf("0.0.0.0/0");
141
142 private static final int ICMP_CODE_MIN = 0;
143 private static final int ICMP_CODE_MAX = 255;
144 private static final int ICMP_TYPE_MIN = 0;
145 private static final int ICMP_TYPE_MAX = 255;
146
147 private static final int CT_COMMIT = 0;
148 private static final int CT_NO_COMMIT = 1;
149 private static final short CT_NO_RECIRC = -1;
150
151 private static final int ACTION_NONE = 0;
152 private static final int ACTION_DROP = -1;
153
154 private static final long SLEEP_MS = 5000;
155
156 /** Apply EdgeStack security group rule for VM traffic. */
157 private boolean useSecurityGroup = USE_SECURITY_GROUP_DEFAULT;
158
159 @Reference(cardinality = ReferenceCardinality.MANDATORY)
160 protected CoreService coreService;
161
162 @Reference(cardinality = ReferenceCardinality.MANDATORY)
163 protected MastershipService mastershipService;
164
165 @Reference(cardinality = ReferenceCardinality.MANDATORY)
166 protected DriverService driverService;
167
168 @Reference(cardinality = ReferenceCardinality.MANDATORY)
169 protected DeviceService deviceService;
170
171 @Reference(cardinality = ReferenceCardinality.MANDATORY)
172 protected LeadershipService leadershipService;
173
174 @Reference(cardinality = ReferenceCardinality.MANDATORY)
175 protected ClusterService clusterService;
176
177 @Reference(cardinality = ReferenceCardinality.MANDATORY)
178 protected StorageService storageService;
179
180 @Reference(cardinality = ReferenceCardinality.MANDATORY)
181 protected ComponentConfigService configService;
182
183 @Reference(cardinality = ReferenceCardinality.MANDATORY)
184 protected KubevirtNodeService nodeService;
185
186 @Reference(cardinality = ReferenceCardinality.MANDATORY)
187 protected KubevirtNetworkService networkService;
188
189 @Reference(cardinality = ReferenceCardinality.MANDATORY)
190 protected KubevirtPortService portService;
191
192 @Reference(cardinality = ReferenceCardinality.MANDATORY)
193 protected KubevirtFlowRuleService flowRuleService;
194
195 @Reference(cardinality = ReferenceCardinality.MANDATORY)
196 protected KubevirtSecurityGroupService securityGroupService;
197
198 private final KubevirtPortListener portListener =
199 new InternalKubevirtPortListener();
200 private final KubevirtSecurityGroupListener securityGroupListener =
201 new InternalSecurityGroupListener();
202 private final KubevirtNodeListener nodeListener =
203 new InternalNodeListener();
204 private final KubevirtNetworkListener networkListener =
205 new InternalNetworkListener();
206
207 private final ExecutorService eventExecutor = newSingleThreadExecutor(
208 groupedThreads(this.getClass().getSimpleName(), "event-handler"));
209
210 private ApplicationId appId;
211 private NodeId localNodeId;
212
213 @Activate
214 protected void activate() {
215 appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
216 localNodeId = clusterService.getLocalNode().id();
217 securityGroupService.addListener(securityGroupListener);
218 portService.addListener(portListener);
219 networkService.addListener(networkListener);
220 configService.registerProperties(getClass());
221 nodeService.addListener(nodeListener);
222
223 log.info("Started");
224 }
225
226 @Deactivate
227 protected void deactivate() {
228 securityGroupService.removeListener(securityGroupListener);
229 portService.removeListener(portListener);
230 configService.unregisterProperties(getClass(), false);
231 nodeService.removeListener(nodeListener);
232 networkService.removeListener(networkListener);
233 eventExecutor.shutdown();
234
235 log.info("Stopped");
236 }
237
238 @Modified
239 protected void modified(ComponentContext context) {
240 Dictionary<?, ?> properties = context.getProperties();
241 Boolean flag;
242
243 flag = Tools.isPropertyEnabled(properties, USE_SECURITY_GROUP);
244 if (flag == null) {
245 log.info("useSecurityGroup is not configured, " +
246 "using current value of {}", useSecurityGroup);
247 } else {
248 useSecurityGroup = flag;
249 log.info("Configured. useSecurityGroup is {}",
250 useSecurityGroup ? "enabled" : "disabled");
251 }
252
253 securityGroupService.setSecurityGroupEnabled(useSecurityGroup);
254 resetSecurityGroupRules();
255 }
256
257 private boolean getUseSecurityGroupFlag() {
258 Set<ConfigProperty> properties =
259 configService.getProperties(getClass().getName());
260 return getPropertyValueAsBoolean(properties, USE_SECURITY_GROUP);
261 }
262
263 private void initializeConnTrackTable(DeviceId deviceId, boolean install) {
264
265 // table={ACL_INGRESS_TABLE(44)},ip,ct_state=-trk, actions=ct(table:{ACL_CT_TABLE(45)})
266 long ctState = computeCtStateFlag(false, false, false);
267 long ctMask = computeCtMaskFlag(true, false, false);
268 setConnTrackRule(deviceId, ctState, ctMask, CT_NO_COMMIT, (short) TENANT_ACL_CT_TABLE,
269 ACTION_NONE, PRIORITY_CT_HOOK_RULE, install);
270
271 //table={ACL_CT_TABLE(45)},ip,nw_dst=10.10.0.2,ct_state=+trk+est,action=goto_table:{NORMAL_TABLE(80)}
272 ctState = computeCtStateFlag(true, false, true);
273 ctMask = computeCtMaskFlag(true, false, true);
274 setConnTrackRule(deviceId, ctState, ctMask, CT_NO_COMMIT, CT_NO_RECIRC,
275 TENANT_FORWARDING_TABLE, PRIORITY_CT_RULE, install);
276
277 //table={ACL_CT_TABLE(45)},ip,nw_dst=10.10.0.2,ct_state=+trk+new,action=drop
278 ctState = computeCtStateFlag(true, true, false);
279 ctMask = computeCtMaskFlag(true, true, false);
280 setConnTrackRule(deviceId, ctState, ctMask, CT_NO_COMMIT, CT_NO_RECIRC,
281 ACTION_DROP, PRIORITY_CT_DROP_RULE, install);
282 }
283
284 private void initializeAclTable(DeviceId deviceId, boolean install) {
285
286 ExtensionTreatment ctTreatment =
287 niciraConnTrackTreatmentBuilder(driverService, deviceId)
288 .commit(true)
289 .build();
290
291 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
292 sBuilder.matchEthType(Ethernet.TYPE_IPV4);
293
294 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
295 tBuilder.extension(ctTreatment, deviceId)
296 .transition(TENANT_FORWARDING_TABLE);
297
298 flowRuleService.setRule(appId,
299 deviceId,
300 sBuilder.build(),
301 tBuilder.build(),
302 PRIORITY_ACL_INGRESS_RULE,
303 TENANT_ACL_RECIRC_TABLE,
304 install);
305 }
306
307 private void initializeEgressTable(DeviceId deviceId, boolean install) {
308 if (install) {
309 flowRuleService.setUpTableMissEntry(deviceId, TENANT_ACL_EGRESS_TABLE);
310 } else {
311 flowRuleService.connectTables(deviceId, TENANT_ACL_EGRESS_TABLE, TENANT_FORWARDING_TABLE);
312 }
313 }
314
315 private void initializeIngressTable(DeviceId deviceId, boolean install) {
316 if (install) {
317 flowRuleService.setUpTableMissEntry(deviceId, TENANT_ACL_INGRESS_TABLE);
318 } else {
319 flowRuleService.connectTables(deviceId, TENANT_ACL_INGRESS_TABLE, TENANT_FORWARDING_TABLE);
320 }
321 }
322
323 private void updateSecurityGroupRule(KubevirtPort port,
324 KubevirtSecurityGroupRule sgRule, boolean install) {
325
326 if (port == null || sgRule == null) {
327 return;
328 }
329
330 if (sgRule.remoteGroupId() != null && !sgRule.remoteGroupId().isEmpty()) {
331 getRemotePorts(port, sgRule.remoteGroupId())
332 .forEach(rPort -> {
333 populateSecurityGroupRule(sgRule, port,
334 rPort.ipAddress().toIpPrefix(), install);
335 populateSecurityGroupRule(sgRule, rPort,
336 port.ipAddress().toIpPrefix(), install);
337
338 KubevirtSecurityGroupRule rSgRule = sgRule.updateDirection(
339 sgRule.direction().equalsIgnoreCase(EGRESS) ? INGRESS : EGRESS);
340 populateSecurityGroupRule(rSgRule, port,
341 rPort.ipAddress().toIpPrefix(), install);
342 populateSecurityGroupRule(rSgRule, rPort,
343 port.ipAddress().toIpPrefix(), install);
344 });
345 } else {
346 populateSecurityGroupRule(sgRule, port,
347 sgRule.remoteIpPrefix() == null ? IP_PREFIX_ANY :
348 sgRule.remoteIpPrefix(), install);
349 }
350 }
351
352 private boolean checkProtocol(String protocol) {
353 if (protocol == null) {
354 log.debug("No protocol was specified, use default IP(v4/v6) protocol.");
355 return true;
356 } else {
357 String protocolUpper = protocol.toUpperCase();
358 if (protocolUpper.equals(PROTO_TCP) ||
359 protocolUpper.equals(PROTO_UDP) ||
360 protocolUpper.equals(PROTO_ICMP) ||
361 protocolUpper.equals(PROTO_SCTP) ||
362 protocolUpper.equals(PROTO_ANY) ||
363 protocol.equals(PROTO_TCP_NUM) ||
364 protocol.equals(PROTO_UDP_NUM) ||
365 protocol.equals(PROTO_ICMP_NUM) ||
366 protocol.equals(PROTO_SCTP_NUM) ||
367 protocol.equals(PROTO_ANY_NUM)) {
368 return true;
369 } else {
370 log.error("Unsupported protocol {}, we only support " +
371 "TCP/UDP/ICMP/SCTP protocols.", protocol);
372 return false;
373 }
374 }
375 }
376
377 private void populateSecurityGroupRule(KubevirtSecurityGroupRule sgRule,
378 KubevirtPort port,
379 IpPrefix remoteIp,
380 boolean install) {
381 if (!checkProtocol(sgRule.protocol())) {
382 return;
383 }
384
385 DeviceId deviceId = port.isTenant() ? port.tenantDeviceId() : port.deviceId();
386
387 Set<TrafficSelector> selectors = buildSelectors(sgRule,
388 Ip4Address.valueOf(port.ipAddress().toInetAddress()),
389 remoteIp, port.networkId());
390 if (selectors == null || selectors.isEmpty()) {
391 return;
392 }
393
394 // if the device is not available we do not perform any action
395 if (deviceId == null || !deviceService.isAvailable(deviceId)) {
396 return;
397 }
398
399 // XXX All egress traffic needs to go through connection tracking module,
400 // which might hurt its performance.
401 ExtensionTreatment ctTreatment =
402 niciraConnTrackTreatmentBuilder(driverService, deviceId)
403 .commit(true)
404 .build();
405
406 TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
407
408 int aclTable;
409 if (sgRule.direction().equalsIgnoreCase(EGRESS)) {
410 aclTable = TENANT_ACL_EGRESS_TABLE;
411 tBuilder.transition(TENANT_ACL_RECIRC_TABLE);
412 } else {
413 aclTable = TENANT_ACL_INGRESS_TABLE;
414 tBuilder.extension(ctTreatment, deviceId)
415 .transition(TENANT_FORWARDING_TABLE);
416 }
417
418 int finalAclTable = aclTable;
419 selectors.forEach(selector -> {
420 flowRuleService.setRule(appId,
421 deviceId,
422 selector, tBuilder.build(),
423 PRIORITY_ACL_RULE,
424 finalAclTable,
425 install);
426 });
427 }
428
429 /**
430 * Sets connection tracking rule using OVS extension commands.
431 * It is not so graceful, but I don't want to make it more general because
432 * it is going to be used only here.
433 * The following is the usage of the function.
434 *
435 * @param deviceId Device ID
436 * @param ctState ctState: please use RulePopulatorUtil.computeCtStateFlag()
437 * to build the value
438 * @param ctMask crMask: please use RulePopulatorUtil.computeCtMaskFlag()
439 * to build the value
440 * @param commit CT_COMMIT for commit action, CT_NO_COMMIT otherwise
441 * @param recircTable table number for recirculation after CT actions.
442 * CT_NO_RECIRC with no recirculation
443 * @param action Additional actions. ACTION_DROP, ACTION_NONE,
444 * GOTO_XXX_TABLE are supported.
445 * @param priority priority value for the rule
446 * @param install true for insertion, false for removal
447 */
448 private void setConnTrackRule(DeviceId deviceId, long ctState, long ctMask,
449 int commit, short recircTable,
450 int action, int priority, boolean install) {
451
452 ExtensionSelector esCtSate = RulePopulatorUtil
453 .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
454 TrafficSelector selector = DefaultTrafficSelector.builder()
455 .extension(esCtSate, deviceId)
456 .matchEthType(Ethernet.TYPE_IPV4)
457 .build();
458
459 TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
460
461 if (commit == CT_COMMIT || recircTable > 0) {
462 RulePopulatorUtil.NiciraConnTrackTreatmentBuilder natTreatmentBuilder =
463 niciraConnTrackTreatmentBuilder(driverService, deviceId);
464 natTreatmentBuilder.natAction(false);
465 natTreatmentBuilder.commit(commit == CT_COMMIT);
466 if (recircTable > 0) {
467 natTreatmentBuilder.table(recircTable);
468 }
469 tb.extension(natTreatmentBuilder.build(), deviceId);
470 } else if (action == ACTION_DROP) {
471 tb.drop();
472 }
473
474 if (action != ACTION_NONE && action != ACTION_DROP) {
475 tb.transition(action);
476 }
477
478 int tableType = ERROR_TABLE;
479 if (priority == PRIORITY_CT_RULE || priority == PRIORITY_CT_DROP_RULE) {
480 tableType = TENANT_ACL_CT_TABLE;
481 } else if (priority == PRIORITY_CT_HOOK_RULE) {
482 tableType = TENANT_ACL_INGRESS_TABLE;
483 } else {
484 log.error("Cannot an appropriate table for the conn track rule.");
485 }
486
487 flowRuleService.setRule(
488 appId,
489 deviceId,
490 selector,
491 tb.build(),
492 priority,
493 tableType,
494 install);
495 }
496
497 /**
498 * Returns a set of host IP addresses engaged with supplied security group ID.
499 * It only searches a VM in the same tenant boundary.
500 *
501 * @param srcPort edgestack port
502 * @param sgId security group id
503 * @return set of ip addresses
504 */
505 private Set<KubevirtPort> getRemotePorts(KubevirtPort srcPort, String sgId) {
506 return portService.ports().stream()
507 .filter(port -> !port.macAddress().equals(srcPort.macAddress()))
508 .filter(port -> port.securityGroups().contains(sgId))
509 .filter(port -> port.ipAddress() != null)
510 .collect(Collectors.toSet());
511 }
512
513 private Set<TrafficSelector> buildSelectors(KubevirtSecurityGroupRule sgRule,
514 Ip4Address vmIp,
515 IpPrefix remoteIp,
516 String netId) {
517 if (remoteIp != null && remoteIp.equals(IpPrefix.valueOf(vmIp, VM_IP_PREFIX))) {
518 // do nothing if the remote IP is my IP
519 return null;
520 }
521
522 Set<TrafficSelector> selectorSet = Sets.newHashSet();
523
524 if (sgRule.portRangeMax() != null && sgRule.portRangeMin() != null &&
525 sgRule.portRangeMin() < sgRule.portRangeMax()) {
526 Map<TpPort, TpPort> portRangeMatchMap =
527 buildPortRangeMatches(sgRule.portRangeMin(),
528 sgRule.portRangeMax());
529 portRangeMatchMap.forEach((key, value) -> {
530
531 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
532 buildMatches(sBuilder, sgRule, vmIp, remoteIp, netId);
533
534 if (sgRule.protocol().equalsIgnoreCase(PROTO_TCP) ||
535 sgRule.protocol().equals(PROTO_TCP_NUM)) {
536 if (sgRule.direction().equalsIgnoreCase(EGRESS)) {
537 if (value.toInt() == TpPort.MAX_PORT) {
538 sBuilder.matchTcpSrc(key);
539 } else {
540 sBuilder.matchTcpSrcMasked(key, value);
541 }
542 } else {
543 if (value.toInt() == TpPort.MAX_PORT) {
544 sBuilder.matchTcpDst(key);
545 } else {
546 sBuilder.matchTcpDstMasked(key, value);
547 }
548 }
549 } else if (sgRule.protocol().equalsIgnoreCase(PROTO_UDP) ||
550 sgRule.protocol().equals(PROTO_UDP_NUM)) {
551 if (sgRule.direction().equalsIgnoreCase(EGRESS)) {
552 if (value.toInt() == TpPort.MAX_PORT) {
553 sBuilder.matchUdpSrc(key);
554 } else {
555 sBuilder.matchUdpSrcMasked(key, value);
556 }
557 } else {
558 if (value.toInt() == TpPort.MAX_PORT) {
559 sBuilder.matchUdpDst(key);
560 } else {
561 sBuilder.matchUdpDstMasked(key, value);
562 }
563 }
564 } else if (sgRule.protocol().equalsIgnoreCase(PROTO_SCTP) ||
565 sgRule.protocol().equals(PROTO_SCTP_NUM)) {
566 if (sgRule.direction().equalsIgnoreCase(EGRESS)) {
567 if (value.toInt() == TpPort.MAX_PORT) {
568 sBuilder.matchSctpSrc(key);
569 } else {
570 sBuilder.matchSctpSrcMasked(key, value);
571 }
572 } else {
573 if (value.toInt() == TpPort.MAX_PORT) {
574 sBuilder.matchSctpDst(key);
575 } else {
576 sBuilder.matchSctpDstMasked(key, value);
577 }
578 }
579 }
580
581 selectorSet.add(sBuilder.build());
582 });
583 } else {
584
585 TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
586 buildMatches(sBuilder, sgRule, vmIp, remoteIp, netId);
587
588 selectorSet.add(sBuilder.build());
589 }
590
591 return selectorSet;
592 }
593
594 private void buildMatches(TrafficSelector.Builder sBuilder,
595 KubevirtSecurityGroupRule sgRule, Ip4Address vmIp,
596 IpPrefix remoteIp, String netId) {
597 buildTunnelId(sBuilder, netId);
598 buildMatchEthType(sBuilder, sgRule.etherType());
599 buildMatchDirection(sBuilder, sgRule.direction(), vmIp);
600 buildMatchProto(sBuilder, sgRule.protocol());
601 buildMatchPort(sBuilder, sgRule.protocol(), sgRule.direction(),
602 sgRule.portRangeMin() == null ? 0 : sgRule.portRangeMin(),
603 sgRule.portRangeMax() == null ? 0 : sgRule.portRangeMax());
604 buildMatchIcmp(sBuilder, sgRule.protocol(),
605 sgRule.portRangeMin(), sgRule.portRangeMax());
606 buildMatchRemoteIp(sBuilder, remoteIp, sgRule.direction());
607 }
608
609 private void buildTunnelId(TrafficSelector.Builder sBuilder, String netId) {
610 KubevirtNetwork network = networkService.network(netId);
611
612 if (network == null) {
613 log.warn("Network {} not found!", netId);
614 return;
615 }
616
617 String segId = network.segmentId();
618 Type netType = network.type();
619
620 if (netType == VLAN) {
621 sBuilder.matchVlanId(VlanId.vlanId(segId));
622 } else if (netType == VXLAN || netType == GRE || netType == GENEVE) {
623 // sBuilder.matchTunnelId(Long.parseLong(segId));
624 log.trace("{} typed match rules are installed for security group", netType);
625 } else {
626 log.debug("Cannot tag the VID as it is unsupported vnet type {}", netType);
627 }
628
629
630 }
631
632 private void buildMatchDirection(TrafficSelector.Builder sBuilder,
633 String direction,
634 Ip4Address vmIp) {
635 if (direction.equalsIgnoreCase(EGRESS)) {
636 sBuilder.matchIPSrc(IpPrefix.valueOf(vmIp, VM_IP_PREFIX));
637 } else {
638 sBuilder.matchIPDst(IpPrefix.valueOf(vmIp, VM_IP_PREFIX));
639 }
640 }
641
642 private void buildMatchEthType(TrafficSelector.Builder sBuilder, String etherType) {
643 // Either IpSrc or IpDst (or both) is set by default, and we need to set EthType as IPv4.
644 sBuilder.matchEthType(Ethernet.TYPE_IPV4);
645 if (etherType != null && !Objects.equals(etherType, STR_NULL) &&
646 !etherType.equalsIgnoreCase(ETHTYPE_IPV4)) {
647 log.debug("EthType {} is not supported yet in Security Group", etherType);
648 }
649 }
650
651 private void buildMatchRemoteIp(TrafficSelector.Builder sBuilder,
652 IpPrefix remoteIpPrefix, String direction) {
653 if (remoteIpPrefix != null &&
654 !remoteIpPrefix.getIp4Prefix().equals(IP_PREFIX_ANY)) {
655 if (direction.equalsIgnoreCase(EGRESS)) {
656 sBuilder.matchIPDst(remoteIpPrefix);
657 } else {
658 sBuilder.matchIPSrc(remoteIpPrefix);
659 }
660 }
661 }
662
663 private void buildMatchProto(TrafficSelector.Builder sBuilder, String protocol) {
664 if (protocol != null) {
665 switch (protocol.toUpperCase()) {
666 case PROTO_ICMP:
667 case PROTO_ICMP_NUM:
668 sBuilder.matchIPProtocol(IPv4.PROTOCOL_ICMP);
669 break;
670 case PROTO_TCP:
671 case PROTO_TCP_NUM:
672 sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP);
673 break;
674 case PROTO_UDP:
675 case PROTO_UDP_NUM:
676 sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP);
677 break;
678 case PROTO_SCTP:
679 case PROTO_SCTP_NUM:
680 sBuilder.matchIPProtocol(PROTOCOL_SCTP);
681 break;
682 default:
683 break;
684 }
685 }
686 }
687
688 private void buildMatchPort(TrafficSelector.Builder sBuilder,
689 String protocol, String direction,
690 int portMin, int portMax) {
691 if (portMax > 0 && portMin == portMax) {
692 if (protocol.equalsIgnoreCase(PROTO_TCP) ||
693 protocol.equals(PROTO_TCP_NUM)) {
694 if (direction.equalsIgnoreCase(EGRESS)) {
695 sBuilder.matchTcpSrc(TpPort.tpPort(portMax));
696 } else {
697 sBuilder.matchTcpDst(TpPort.tpPort(portMax));
698 }
699 } else if (protocol.equalsIgnoreCase(PROTO_UDP) ||
700 protocol.equals(PROTO_UDP_NUM)) {
701 if (direction.equalsIgnoreCase(EGRESS)) {
702 sBuilder.matchUdpSrc(TpPort.tpPort(portMax));
703 } else {
704 sBuilder.matchUdpDst(TpPort.tpPort(portMax));
705 }
706 } else if (protocol.equalsIgnoreCase(PROTO_SCTP) ||
707 protocol.equals(PROTO_SCTP_NUM)) {
708 if (direction.equalsIgnoreCase(EGRESS)) {
709 sBuilder.matchSctpSrc(TpPort.tpPort(portMax));
710 } else {
711 sBuilder.matchSctpDst(TpPort.tpPort(portMax));
712 }
713 }
714 }
715 }
716
717 private void buildMatchIcmp(TrafficSelector.Builder sBuilder,
718 String protocol, Integer icmpCode, Integer icmpType) {
719 if (protocol != null) {
720 if (protocol.equalsIgnoreCase(PROTO_ICMP) ||
721 protocol.equals(PROTO_ICMP_NUM)) {
722 if (icmpCode != null && icmpCode >= ICMP_CODE_MIN &&
723 icmpCode <= ICMP_CODE_MAX) {
724 sBuilder.matchIcmpCode(icmpCode.byteValue());
725 }
726 if (icmpType != null && icmpType >= ICMP_TYPE_MIN &&
727 icmpType <= ICMP_TYPE_MAX) {
728 sBuilder.matchIcmpType(icmpType.byteValue());
729 }
730 }
731 }
732 }
733
734 private void resetSecurityGroupRules() {
735
736 if (getUseSecurityGroupFlag()) {
737 nodeService.completeNodes(WORKER).forEach(node -> {
738 initializeEgressTable(node.intgBridge(), true);
739 initializeConnTrackTable(node.intgBridge(), true);
740 initializeAclTable(node.intgBridge(), true);
741 initializeIngressTable(node.intgBridge(), true);
742
743 for (KubevirtNetwork network : networkService.tenantNetworks()) {
744 initializeEgressTable(network.tenantDeviceId(node.hostname()), true);
745 initializeIngressTable(network.tenantDeviceId(node.hostname()), true);
746 initializeConnTrackTable(network.tenantDeviceId(node.hostname()), true);
747 initializeAclTable(network.tenantDeviceId(node.hostname()), true);
748 }
749 });
750
751 securityGroupService.securityGroups().forEach(securityGroup ->
752 securityGroup.rules().forEach(this::securityGroupRuleAdded));
753 } else {
754 nodeService.completeNodes(WORKER).forEach(node -> {
755 initializeEgressTable(node.intgBridge(), false);
756 initializeConnTrackTable(node.intgBridge(), false);
757 initializeAclTable(node.intgBridge(), false);
758 initializeIngressTable(node.intgBridge(), false);
759
760 for (KubevirtNetwork network : networkService.tenantNetworks()) {
761 initializeEgressTable(network.tenantDeviceId(node.hostname()), false);
762 initializeIngressTable(network.tenantDeviceId(node.hostname()), false);
763 initializeConnTrackTable(network.tenantDeviceId(node.hostname()), false);
764 initializeAclTable(network.tenantDeviceId(node.hostname()), false);
765 }
766 });
767
768 securityGroupService.securityGroups().forEach(securityGroup ->
769 securityGroup.rules().forEach(this::securityGroupRuleRemoved));
770 }
771
772 log.info("Reset security group info " +
773 (getUseSecurityGroupFlag() ? "with" : "without") + " Security Group");
774 }
775
776 private void securityGroupRuleAdded(KubevirtSecurityGroupRule sgRule) {
777 portService.ports().stream()
778 .filter(port -> port.securityGroups().contains(sgRule.securityGroupId()))
779 .forEach(port -> {
780 updateSecurityGroupRule(port, sgRule, true);
781 log.info("Applied security group rule {} to port {}",
782 sgRule.id(), port.macAddress());
783 });
784 }
785
786 private void securityGroupRuleRemoved(KubevirtSecurityGroupRule sgRule) {
787 portService.ports().stream()
788 .filter(port -> port.securityGroups().contains(sgRule.securityGroupId()))
789 .forEach(port -> {
790 updateSecurityGroupRule(port, sgRule, false);
791 log.info("Removed security group rule {} from port {}",
792 sgRule.id(), port.macAddress());
793 });
794 }
795
796 private class InternalKubevirtPortListener implements KubevirtPortListener {
797
798 @Override
799 public boolean isRelevant(KubevirtPortEvent event) {
800 return getUseSecurityGroupFlag();
801 }
802
803 private boolean isRelevantHelper(KubevirtPortEvent event) {
804 DeviceId deviceId = event.subject().deviceId();
805
806 if (deviceId == null) {
807 return false;
808 }
809
810 return mastershipService.isLocalMaster(deviceId);
811 }
812
813 @Override
814 public void event(KubevirtPortEvent event) {
815 log.debug("security group event received {}", event);
816
817 switch (event.type()) {
818 case KUBEVIRT_PORT_SECURITY_GROUP_ADDED:
819 eventExecutor.execute(() -> processPortSgAdd(event));
820 break;
821 case KUBEVIRT_PORT_SECURITY_GROUP_REMOVED:
822 eventExecutor.execute(() -> processPortSgRemove(event));
823 break;
824 case KUBEVIRT_PORT_REMOVED:
825 eventExecutor.execute(() -> processPortRemove(event));
826 break;
827 case KUBEVIRT_PORT_DEVICE_ADDED:
828 eventExecutor.execute(() -> processPortDeviceAdded(event));
829 break;
830 default:
831 // do nothing for the other events
832 break;
833 }
834 }
835
836 private void processPortSgAdd(KubevirtPortEvent event) {
837 if (!isRelevantHelper(event)) {
838 return;
839 }
840
841 if (event.securityGroupId() == null ||
842 securityGroupService.securityGroup(event.securityGroupId()) == null) {
843 return;
844 }
845
846 KubevirtPort port = event.subject();
847 KubevirtSecurityGroup sg = securityGroupService.securityGroup(event.securityGroupId());
848
849 sg.rules().forEach(sgRule -> {
850 updateSecurityGroupRule(port, sgRule, true);
851 });
852 log.info("Added security group {} to port {}",
853 event.securityGroupId(), event.subject().macAddress());
854 }
855
856 private void processPortSgRemove(KubevirtPortEvent event) {
857 if (!isRelevantHelper(event)) {
858 return;
859 }
860
861 if (event.securityGroupId() == null ||
862 securityGroupService.securityGroup(event.securityGroupId()) == null) {
863 return;
864 }
865
866 KubevirtPort port = event.subject();
867 KubevirtSecurityGroup sg = securityGroupService.securityGroup(event.securityGroupId());
868
869 sg.rules().forEach(sgRule -> {
870 updateSecurityGroupRule(port, sgRule, false);
871 });
872 log.info("Removed security group {} from port {}",
873 event.securityGroupId(), event.subject().macAddress());
874 }
875
876 private void processPortRemove(KubevirtPortEvent event) {
877 if (!isRelevantHelper(event)) {
878 return;
879 }
880
881 KubevirtPort port = event.subject();
882 for (String sgStr : port.securityGroups()) {
883 KubevirtSecurityGroup sg = securityGroupService.securityGroup(sgStr);
884 sg.rules().forEach(sgRule -> {
885 updateSecurityGroupRule(port, sgRule, false);
886 });
887 log.info("Removed security group {} from port {}",
888 sgStr, event.subject().macAddress());
889 }
890 }
891
892 private void processPortDeviceAdded(KubevirtPortEvent event) {
893 if (!isRelevantHelper(event)) {
894 return;
895 }
896
897 for (String sgId : event.subject().securityGroups()) {
898 KubevirtSecurityGroup sg = securityGroupService.securityGroup(sgId);
899
900 sg.rules().forEach(sgRule -> {
901 updateSecurityGroupRule(event.subject(), sgRule, true);
902 });
903 log.info("Added security group {} to port {}",
904 event.securityGroupId(), event.subject().macAddress());
905 }
906 }
907 }
908
909 private class InternalNetworkListener implements KubevirtNetworkListener {
910
911 private boolean isRelevantHelper() {
912 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
913 }
914
915 @Override
916 public void event(KubevirtNetworkEvent event) {
917 switch (event.type()) {
918 case KUBEVIRT_NETWORK_CREATED:
919 eventExecutor.execute(() -> processNetworkCreation(event.subject()));
920 break;
921 case KUBEVIRT_NETWORK_REMOVED:
922 case KUBEVIRT_NETWORK_UPDATED:
923 default:
924 // do thing
925 break;
926 }
927 }
928
929 private void processNetworkCreation(KubevirtNetwork network) {
930 if (!isRelevantHelper()) {
931 return;
932 }
933
934 Set<KubevirtNode> nodes = nodeService.completeNodes(WORKER);
935
936 if (nodes.size() > 0) {
937 // now we wait 5s for all tenant bridges are created,
938 // FIXME: we need to fina a better way to wait all tenant bridges
939 // are created before installing default security group rules
940 try {
941 sleep(SLEEP_MS);
942 } catch (InterruptedException e) {
943 log.error("Failed to install security group default rules.");
944 }
945
946 for (KubevirtNode node : nodes) {
947 initializeEgressTable(network.tenantDeviceId(node.hostname()), true);
948 initializeIngressTable(network.tenantDeviceId(node.hostname()), true);
949 initializeConnTrackTable(network.tenantDeviceId(node.hostname()), true);
950 initializeAclTable(network.tenantDeviceId(node.hostname()), true);
951 }
952 }
953 }
954 }
955
956 private class InternalSecurityGroupListener implements KubevirtSecurityGroupListener {
957
958 @Override
959 public boolean isRelevant(KubevirtSecurityGroupEvent event) {
960 return getUseSecurityGroupFlag();
961 }
962
963 private boolean isRelevantHelper() {
964 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
965 }
966
967 @Override
968 public void event(KubevirtSecurityGroupEvent event) {
969 switch (event.type()) {
970 case KUBEVIRT_SECURITY_GROUP_RULE_CREATED:
971 eventExecutor.execute(() -> processSgRuleCreate(event));
972 break;
973 case KUBEVIRT_SECURITY_GROUP_RULE_REMOVED:
974 eventExecutor.execute(() -> processSgRuleRemove(event));
975 break;
976 default:
977 // do nothing
978 break;
979 }
980 }
981
982 private void processSgRuleCreate(KubevirtSecurityGroupEvent event) {
983 if (!isRelevantHelper()) {
984 return;
985 }
986
987 KubevirtSecurityGroupRule sgRuleToAdd = event.rule();
988 securityGroupRuleAdded(sgRuleToAdd);
989 log.info("Applied new security group rule {} to ports", sgRuleToAdd.id());
990 }
991
992 private void processSgRuleRemove(KubevirtSecurityGroupEvent event) {
993 if (!isRelevantHelper()) {
994 return;
995 }
996
997 KubevirtSecurityGroupRule sgRuleToRemove = event.rule();
998 securityGroupRuleRemoved(sgRuleToRemove);
999 log.info("Removed security group rule {} from ports", sgRuleToRemove.id());
1000 }
1001 }
1002
1003 private class InternalNodeListener implements KubevirtNodeListener {
1004
1005 @Override
1006 public boolean isRelevant(KubevirtNodeEvent event) {
1007 return event.subject().type() == WORKER;
1008 }
1009
1010 private boolean isRelevantHelper() {
1011 return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
1012 }
1013
1014 @Override
1015 public void event(KubevirtNodeEvent event) {
1016 switch (event.type()) {
1017 case KUBEVIRT_NODE_COMPLETE:
1018 eventExecutor.execute(() -> processNodeComplete(event.subject()));
1019 break;
1020 default:
1021 break;
1022 }
1023 }
1024
1025 private void processNodeComplete(KubevirtNode node) {
1026 if (!isRelevantHelper()) {
1027 return;
1028 }
1029
1030 resetSecurityGroupRulesByNode(node);
1031 }
1032
1033 private void resetSecurityGroupRulesByNode(KubevirtNode node) {
1034 if (getUseSecurityGroupFlag()) {
1035 initializeEgressTable(node.intgBridge(), true);
1036 initializeConnTrackTable(node.intgBridge(), true);
1037 initializeAclTable(node.intgBridge(), true);
1038 initializeIngressTable(node.intgBridge(), true);
1039
1040 for (KubevirtNetwork network : networkService.tenantNetworks()) {
1041 initializeEgressTable(network.tenantDeviceId(node.hostname()), true);
1042 initializeIngressTable(network.tenantDeviceId(node.hostname()), true);
1043 initializeConnTrackTable(network.tenantDeviceId(node.hostname()), true);
1044 initializeAclTable(network.tenantDeviceId(node.hostname()), true);
1045 }
1046
1047 securityGroupService.securityGroups().forEach(securityGroup ->
1048 securityGroup.rules().forEach(
1049 KubevirtSecurityGroupHandler.this::securityGroupRuleAdded));
1050 } else {
1051 initializeEgressTable(node.intgBridge(), false);
1052 initializeConnTrackTable(node.intgBridge(), false);
1053 initializeAclTable(node.intgBridge(), false);
1054 initializeIngressTable(node.intgBridge(), false);
1055
1056 for (KubevirtNetwork network : networkService.tenantNetworks()) {
1057 initializeEgressTable(network.tenantDeviceId(node.hostname()), false);
1058 initializeIngressTable(network.tenantDeviceId(node.hostname()), false);
1059 initializeConnTrackTable(network.tenantDeviceId(node.hostname()), false);
1060 initializeAclTable(network.tenantDeviceId(node.hostname()), false);
1061 }
1062
1063 securityGroupService.securityGroups().forEach(securityGroup ->
1064 securityGroup.rules().forEach(
1065 KubevirtSecurityGroupHandler.this::securityGroupRuleRemoved));
1066 }
1067
1068 log.info("Reset security group info " +
1069 (getUseSecurityGroupFlag() ? "with" : "without") + " Security Group");
1070 }
1071 }
1072}