Initial implementation of security group for kubevirt tenant network

Change-Id: If49d03021408a134be01267cc4eee9e0091e3c3d
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
new file mode 100644
index 0000000..77b60d9
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtSecurityGroupHandler.java
@@ -0,0 +1,1072 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.impl;
+
+import com.google.common.collect.Sets;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.Ip4Address;
+import org.onlab.packet.Ip4Prefix;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.TpPort;
+import org.onlab.packet.VlanId;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cfg.ConfigProperty;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkEvent;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkListener;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
+import org.onosproject.kubevirtnetworking.api.KubevirtPort;
+import org.onosproject.kubevirtnetworking.api.KubevirtPortEvent;
+import org.onosproject.kubevirtnetworking.api.KubevirtPortListener;
+import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
+import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroup;
+import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupEvent;
+import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupListener;
+import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupRule;
+import org.onosproject.kubevirtnetworking.api.KubevirtSecurityGroupService;
+import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeEvent;
+import org.onosproject.kubevirtnode.api.KubevirtNodeListener;
+import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.ExtensionSelector;
+import org.onosproject.net.flow.instructions.ExtensionTreatment;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Dictionary;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static java.lang.Thread.sleep;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.Constants.ERROR_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ACL_INGRESS_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ACL_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_CT_DROP_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_CT_HOOK_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_CT_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_ACL_CT_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_ACL_EGRESS_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_ACL_INGRESS_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_ACL_RECIRC_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TENANT_FORWARDING_TABLE;
+import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GENEVE;
+import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GRE;
+import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VLAN;
+import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VXLAN;
+import static org.onosproject.kubevirtnetworking.impl.OsgiPropertyConstants.USE_SECURITY_GROUP;
+import static org.onosproject.kubevirtnetworking.impl.OsgiPropertyConstants.USE_SECURITY_GROUP_DEFAULT;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getPropertyValueAsBoolean;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildPortRangeMatches;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.computeCtMaskFlag;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.computeCtStateFlag;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.niciraConnTrackTreatmentBuilder;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.WORKER;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Populates flow rules to handle EdgeStack SecurityGroups.
+ */
+@Component(
+        immediate = true,
+        property = {
+                USE_SECURITY_GROUP + ":Boolean=" + USE_SECURITY_GROUP_DEFAULT
+        }
+)
+public class KubevirtSecurityGroupHandler {
+
+    private final Logger log = getLogger(getClass());
+
+    // Prefix length used to turn a single VM IPv4 address into a host (/32) prefix.
+    private static final int VM_IP_PREFIX = 32;
+
+    // Literal "null" string; buildMatchEthType() treats it like an unspecified ether type.
+    private static final String STR_NULL = "null";
+    // Protocol identifiers accepted by checkProtocol(): each protocol may be
+    // given either by name or by its IP protocol number in string form.
+    private static final String PROTO_ICMP = "ICMP";
+    private static final String PROTO_ICMP_NUM = "1";
+    private static final String PROTO_TCP = "TCP";
+    private static final String PROTO_TCP_NUM = "6";
+    private static final String PROTO_UDP = "UDP";
+    private static final String PROTO_UDP_NUM = "17";
+    private static final String PROTO_SCTP = "SCTP";
+    private static final String PROTO_SCTP_NUM = "132";
+    // IP protocol number for SCTP (0x84 == 132) handed to matchIPProtocol().
+    private static final byte PROTOCOL_SCTP = (byte) 0x84;
+    private static final String PROTO_ANY = "ANY";
+    private static final String PROTO_ANY_NUM = "0";
+    private static final String ETHTYPE_IPV4 = "IPV4";
+    // Rule directions; compared case-insensitively against the rule's direction.
+    private static final String EGRESS = "EGRESS";
+    private static final String INGRESS = "INGRESS";
+    // Substituted when a rule carries no remote prefix; buildMatchRemoteIp()
+    // adds no remote IP criterion for this value.
+    private static final IpPrefix IP_PREFIX_ANY = Ip4Prefix.valueOf("0.0.0.0/0");
+
+    // Inclusive bounds checked by buildMatchIcmp() before an ICMP code/type is matched.
+    private static final int ICMP_CODE_MIN = 0;
+    private static final int ICMP_CODE_MAX = 255;
+    private static final int ICMP_TYPE_MIN = 0;
+    private static final int ICMP_TYPE_MAX = 255;
+
+    // Values for the "commit" argument of setConnTrackRule().
+    private static final int CT_COMMIT = 0;
+    private static final int CT_NO_COMMIT = 1;
+    // "recircTable" value meaning no recirculation (setConnTrackRule() only
+    // recirculates when the table number is greater than zero).
+    private static final short CT_NO_RECIRC = -1;
+
+    // Values for the "action" argument of setConnTrackRule(); any other value
+    // is used there as the table number to transition to.
+    private static final int ACTION_NONE = 0;
+    private static final int ACTION_DROP = -1;
+
+    // NOTE(review): not referenced in the portion of the class reviewed here;
+    // presumably a wait interval in milliseconds - confirm against its users.
+    private static final long SLEEP_MS = 5000;
+
+    /** Apply EdgeStack security group rule for VM traffic. */
+    private boolean useSecurityGroup = USE_SECURITY_GROUP_DEFAULT;
+
+    // Service dependencies declared with mandatory cardinality.
+    // NOTE(review): mastershipService, leadershipService and storageService are
+    // not used in the portion of the class reviewed here - confirm they are
+    // needed further down.
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DriverService driverService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService configService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtNodeService nodeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtNetworkService networkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtPortService portService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtFlowRuleService flowRuleService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtSecurityGroupService securityGroupService;
+
+    // Listener instances; they are added to their services in activate() and
+    // removed again in deactivate().
+    private final KubevirtPortListener portListener =
+            new InternalKubevirtPortListener();
+    private final KubevirtSecurityGroupListener securityGroupListener =
+            new InternalSecurityGroupListener();
+    private final KubevirtNodeListener nodeListener =
+            new InternalNodeListener();
+    private final KubevirtNetworkListener networkListener =
+            new InternalNetworkListener();
+
+    // Single-threaded executor named after this class; shut down in deactivate().
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
+    // Both are assigned in activate().
+    private ApplicationId appId;
+    private NodeId localNodeId;
+
+    /**
+     * Activates the component: registers the application ID, records the
+     * local cluster node ID, subscribes the security group, port, network and
+     * node listeners, and registers this component's configurable properties.
+     */
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+        localNodeId = clusterService.getLocalNode().id();
+        securityGroupService.addListener(securityGroupListener);
+        portService.addListener(portListener);
+        networkService.addListener(networkListener);
+        configService.registerProperties(getClass());
+        nodeService.addListener(nodeListener);
+
+        log.info("Started");
+    }
+
+    /**
+     * Deactivates the component: removes every listener added in
+     * {@code activate()}, unregisters the component properties and shuts
+     * down the event executor.
+     */
+    @Deactivate
+    protected void deactivate() {
+        securityGroupService.removeListener(securityGroupListener);
+        portService.removeListener(portListener);
+        configService.unregisterProperties(getClass(), false);
+        nodeService.removeListener(nodeListener);
+        networkService.removeListener(networkListener);
+        // Initiates shutdown only; this call does not wait for queued tasks.
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    /**
+     * Handles a component configuration change.
+     * Picks up the new value of the security group switch when one is
+     * configured, pushes the effective value to the security group service
+     * and re-evaluates the installed security group rules.
+     *
+     * @param context component context carrying the updated properties
+     */
+    @Modified
+    protected void modified(ComponentContext context) {
+        Boolean configured =
+                Tools.isPropertyEnabled(context.getProperties(), USE_SECURITY_GROUP);
+
+        if (configured != null) {
+            useSecurityGroup = configured;
+            log.info("Configured. useSecurityGroup is {}",
+                    useSecurityGroup ? "enabled" : "disabled");
+        } else {
+            // property absent: keep whatever value is currently in effect
+            log.info("useSecurityGroup is not configured, " +
+                    "using current value of {}", useSecurityGroup);
+        }
+
+        securityGroupService.setSecurityGroupEnabled(useSecurityGroup);
+        resetSecurityGroupRules();
+    }
+
+    /**
+     * Reads the security group switch from the component configuration
+     * service rather than from the locally cached field.
+     *
+     * @return value of the {@code USE_SECURITY_GROUP} property
+     */
+    private boolean getUseSecurityGroupFlag() {
+        return getPropertyValueAsBoolean(
+                configService.getProperties(getClass().getName()),
+                USE_SECURITY_GROUP);
+    }
+
+    /**
+     * Installs or removes the three base connection tracking rules of a device.
+     * NOTE(review): the flag arguments of computeCtStateFlag/computeCtMaskFlag
+     * are presumably (tracked, new, established) - confirm in RulePopulatorUtil.
+     *
+     * @param deviceId device to program
+     * @param install  true to install the rules, false to remove them
+     */
+    private void initializeConnTrackTable(DeviceId deviceId, boolean install) {
+
+        // Hook rule (lands in TENANT_ACL_INGRESS_TABLE): IPv4 with ct_state=-trk
+        // is sent through conntrack and recirculated to TENANT_ACL_CT_TABLE.
+        setConnTrackRule(deviceId,
+                computeCtStateFlag(false, false, false),
+                computeCtMaskFlag(true, false, false),
+                CT_NO_COMMIT, (short) TENANT_ACL_CT_TABLE,
+                ACTION_NONE, PRIORITY_CT_HOOK_RULE, install);
+
+        // TENANT_ACL_CT_TABLE: IPv4 with ct_state=+trk+est goes on to
+        // TENANT_FORWARDING_TABLE.
+        setConnTrackRule(deviceId,
+                computeCtStateFlag(true, false, true),
+                computeCtMaskFlag(true, false, true),
+                CT_NO_COMMIT, CT_NO_RECIRC,
+                TENANT_FORWARDING_TABLE, PRIORITY_CT_RULE, install);
+
+        // TENANT_ACL_CT_TABLE: IPv4 with ct_state=+trk+new is dropped.
+        setConnTrackRule(deviceId,
+                computeCtStateFlag(true, true, false),
+                computeCtMaskFlag(true, true, false),
+                CT_NO_COMMIT, CT_NO_RECIRC,
+                ACTION_DROP, PRIORITY_CT_DROP_RULE, install);
+    }
+
+    /**
+     * Installs or removes the rule in {@code TENANT_ACL_RECIRC_TABLE} that
+     * commits every IPv4 packet to connection tracking and then hands it to
+     * {@code TENANT_FORWARDING_TABLE}.
+     *
+     * @param deviceId device to program
+     * @param install  true to install the rule, false to remove it
+     */
+    private void initializeAclTable(DeviceId deviceId, boolean install) {
+
+        ExtensionTreatment commitTreatment =
+                niciraConnTrackTreatmentBuilder(driverService, deviceId)
+                        .commit(true)
+                        .build();
+
+        TrafficSelector ipv4Selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .build();
+
+        TrafficTreatment commitAndForward = DefaultTrafficTreatment.builder()
+                .extension(commitTreatment, deviceId)
+                .transition(TENANT_FORWARDING_TABLE)
+                .build();
+
+        flowRuleService.setRule(appId, deviceId,
+                ipv4Selector, commitAndForward,
+                PRIORITY_ACL_INGRESS_RULE, TENANT_ACL_RECIRC_TABLE,
+                install);
+    }
+
+    /**
+     * Prepares the egress ACL table of a device.
+     *
+     * @param deviceId device to program
+     * @param install  true to set up the table-miss entry of
+     *                 {@code TENANT_ACL_EGRESS_TABLE}; false to connect that
+     *                 table straight to {@code TENANT_FORWARDING_TABLE}
+     */
+    private void initializeEgressTable(DeviceId deviceId, boolean install) {
+        if (!install) {
+            flowRuleService.connectTables(deviceId,
+                    TENANT_ACL_EGRESS_TABLE, TENANT_FORWARDING_TABLE);
+            return;
+        }
+        flowRuleService.setUpTableMissEntry(deviceId, TENANT_ACL_EGRESS_TABLE);
+    }
+
+    /**
+     * Prepares the ingress ACL table of a device.
+     *
+     * @param deviceId device to program
+     * @param install  true to set up the table-miss entry of
+     *                 {@code TENANT_ACL_INGRESS_TABLE}; false to connect that
+     *                 table straight to {@code TENANT_FORWARDING_TABLE}
+     */
+    private void initializeIngressTable(DeviceId deviceId, boolean install) {
+        if (!install) {
+            flowRuleService.connectTables(deviceId,
+                    TENANT_ACL_INGRESS_TABLE, TENANT_FORWARDING_TABLE);
+            return;
+        }
+        flowRuleService.setUpTableMissEntry(deviceId, TENANT_ACL_INGRESS_TABLE);
+    }
+
+    /**
+     * Installs or removes the flow rules of one security group rule for a port.
+     * A rule that names a remote security group is expanded pairwise against
+     * every remote port of that group, in both the rule's direction and the
+     * opposite one; otherwise the rule is applied against its remote IP
+     * prefix, or against any address when no prefix is given.
+     *
+     * @param port    port the rule applies to; nothing happens when null
+     * @param sgRule  security group rule; nothing happens when null
+     * @param install true to install, false to remove
+     */
+    private void updateSecurityGroupRule(KubevirtPort port,
+                                         KubevirtSecurityGroupRule sgRule, boolean install) {
+
+        if (port == null || sgRule == null) {
+            return;
+        }
+
+        String remoteGroupId = sgRule.remoteGroupId();
+        if (remoteGroupId == null || remoteGroupId.isEmpty()) {
+            IpPrefix remotePrefix = sgRule.remoteIpPrefix() == null
+                    ? IP_PREFIX_ANY : sgRule.remoteIpPrefix();
+            populateSecurityGroupRule(sgRule, port, remotePrefix, install);
+            return;
+        }
+
+        for (KubevirtPort rPort : getRemotePorts(port, remoteGroupId)) {
+            // rule as given, applied on both ends of the pair
+            populateSecurityGroupRule(sgRule, port,
+                    rPort.ipAddress().toIpPrefix(), install);
+            populateSecurityGroupRule(sgRule, rPort,
+                    port.ipAddress().toIpPrefix(), install);
+
+            // same rule with the direction flipped, again on both ends
+            String flipped = sgRule.direction().equalsIgnoreCase(EGRESS) ? INGRESS : EGRESS;
+            KubevirtSecurityGroupRule rSgRule = sgRule.updateDirection(flipped);
+            populateSecurityGroupRule(rSgRule, port,
+                    rPort.ipAddress().toIpPrefix(), install);
+            populateSecurityGroupRule(rSgRule, rPort,
+                    port.ipAddress().toIpPrefix(), install);
+        }
+    }
+
+    /**
+     * Checks whether a security group rule protocol is supported.
+     * Protocol names are compared case-insensitively with
+     * {@code equalsIgnoreCase}, which does not depend on the JVM default
+     * locale (unlike {@code toUpperCase()}, which e.g. maps "icmp" to "İCMP"
+     * under a Turkish locale); numeric forms must match exactly.
+     *
+     * @param protocol protocol name or number, may be null
+     * @return true when the protocol is null (meaning any IP protocol) or one
+     *         of TCP/UDP/ICMP/SCTP/ANY in name or numeric form, false otherwise
+     */
+    private boolean checkProtocol(String protocol) {
+        if (protocol == null) {
+            log.debug("No protocol was specified, use default IP(v4/v6) protocol.");
+            return true;
+        }
+
+        boolean supported = PROTO_TCP.equalsIgnoreCase(protocol) ||
+                PROTO_UDP.equalsIgnoreCase(protocol) ||
+                PROTO_ICMP.equalsIgnoreCase(protocol) ||
+                PROTO_SCTP.equalsIgnoreCase(protocol) ||
+                PROTO_ANY.equalsIgnoreCase(protocol) ||
+                PROTO_TCP_NUM.equals(protocol) ||
+                PROTO_UDP_NUM.equals(protocol) ||
+                PROTO_ICMP_NUM.equals(protocol) ||
+                PROTO_SCTP_NUM.equals(protocol) ||
+                PROTO_ANY_NUM.equals(protocol);
+
+        if (!supported) {
+            log.error("Unsupported protocol {}, we only support " +
+                    "TCP/UDP/ICMP/SCTP protocols.", protocol);
+        }
+        return supported;
+    }
+
+    private void populateSecurityGroupRule(KubevirtSecurityGroupRule sgRule,
+                                           KubevirtPort port,
+                                           IpPrefix remoteIp,
+                                           boolean install) {
+        if (!checkProtocol(sgRule.protocol())) {
+            return;
+        }
+
+        DeviceId deviceId = port.isTenant() ? port.tenantDeviceId() : port.deviceId();
+
+        Set<TrafficSelector> selectors = buildSelectors(sgRule,
+                Ip4Address.valueOf(port.ipAddress().toInetAddress()),
+                remoteIp, port.networkId());
+        if (selectors == null || selectors.isEmpty()) {
+            return;
+        }
+
+        // if the device is not available we do not perform any action
+        if (deviceId == null || !deviceService.isAvailable(deviceId)) {
+            return;
+        }
+
+        // XXX All egress traffic needs to go through connection tracking module,
+        // which might hurt its performance.
+        ExtensionTreatment ctTreatment =
+                niciraConnTrackTreatmentBuilder(driverService, deviceId)
+                        .commit(true)
+                        .build();
+
+        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder();
+
+        int aclTable;
+        if (sgRule.direction().equalsIgnoreCase(EGRESS)) {
+            aclTable = TENANT_ACL_EGRESS_TABLE;
+            tBuilder.transition(TENANT_ACL_RECIRC_TABLE);
+        } else {
+            aclTable = TENANT_ACL_INGRESS_TABLE;
+            tBuilder.extension(ctTreatment, deviceId)
+                    .transition(TENANT_FORWARDING_TABLE);
+        }
+
+        int finalAclTable = aclTable;
+        selectors.forEach(selector -> {
+            flowRuleService.setRule(appId,
+                    deviceId,
+                    selector, tBuilder.build(),
+                    PRIORITY_ACL_RULE,
+                    finalAclTable,
+                    install);
+        });
+    }
+
+    /**
+     * Sets connection tracking rule using OVS extension commands.
+     * It is not so graceful, but I don't want to make it more general because
+     * it is going to be used only here.
+     * The following is the usage of the function.
+     *
+     * @param deviceId Device ID
+     * @param ctState ctState: please use RulePopulatorUtil.computeCtStateFlag()
+     *                to build the value
+     * @param ctMask crMask: please use RulePopulatorUtil.computeCtMaskFlag()
+     *               to build the value
+     * @param commit CT_COMMIT for commit action, CT_NO_COMMIT otherwise
+     * @param recircTable table number for recirculation after CT actions.
+     *                    CT_NO_RECIRC with no recirculation
+     * @param action Additional actions. ACTION_DROP, ACTION_NONE,
+     *               GOTO_XXX_TABLE are supported.
+     * @param priority priority value for the rule
+     * @param install true for insertion, false for removal
+     */
+    private void setConnTrackRule(DeviceId deviceId, long ctState, long ctMask,
+                                  int commit, short recircTable,
+                                  int action, int priority, boolean install) {
+
+        ExtensionSelector esCtSate = RulePopulatorUtil
+                .buildCtExtensionSelector(driverService, deviceId, ctState, ctMask);
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .extension(esCtSate, deviceId)
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .build();
+
+        TrafficTreatment.Builder tb = DefaultTrafficTreatment.builder();
+
+        if (commit == CT_COMMIT || recircTable > 0) {
+            RulePopulatorUtil.NiciraConnTrackTreatmentBuilder natTreatmentBuilder =
+                    niciraConnTrackTreatmentBuilder(driverService, deviceId);
+            natTreatmentBuilder.natAction(false);
+            natTreatmentBuilder.commit(commit == CT_COMMIT);
+            if (recircTable > 0) {
+                natTreatmentBuilder.table(recircTable);
+            }
+            tb.extension(natTreatmentBuilder.build(), deviceId);
+        } else if (action == ACTION_DROP) {
+            tb.drop();
+        }
+
+        if (action != ACTION_NONE && action != ACTION_DROP) {
+            tb.transition(action);
+        }
+
+        int tableType = ERROR_TABLE;
+        if (priority == PRIORITY_CT_RULE || priority == PRIORITY_CT_DROP_RULE) {
+            tableType = TENANT_ACL_CT_TABLE;
+        } else if (priority == PRIORITY_CT_HOOK_RULE) {
+            tableType = TENANT_ACL_INGRESS_TABLE;
+        } else {
+            log.error("Cannot an appropriate table for the conn track rule.");
+        }
+
+        flowRuleService.setRule(
+                appId,
+                deviceId,
+                selector,
+                tb.build(),
+                priority,
+                tableType,
+                install);
+    }
+
+    /**
+     * Returns the ports that are members of the given security group.
+     * The source port itself (identified by MAC address) and ports without
+     * an IP address are left out.
+     *
+     * @param srcPort port whose peers are looked up
+     * @param sgId security group id
+     * @return set of matching remote ports
+     */
+    private Set<KubevirtPort> getRemotePorts(KubevirtPort srcPort, String sgId) {
+        return portService.ports().stream()
+                .filter(candidate ->
+                        !candidate.macAddress().equals(srcPort.macAddress())
+                                && candidate.securityGroups().contains(sgId)
+                                && candidate.ipAddress() != null)
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Builds the traffic selectors for one security group rule and one VM.
+     * When the rule carries a real port range (min &lt; max) the range is
+     * split into value/mask pairs and one selector is built per pair;
+     * otherwise a single selector is built.
+     * The protocol may be null (any protocol); in that case, as for every
+     * protocol other than TCP/UDP/SCTP, no port criterion is added.
+     *
+     * @param sgRule   security group rule
+     * @param vmIp     IP address of the VM the rule is applied to
+     * @param remoteIp remote IP prefix, may be null
+     * @param netId    network ID of the VM port
+     * @return set of selectors, or null when the remote IP is the VM's own IP
+     */
+    private Set<TrafficSelector> buildSelectors(KubevirtSecurityGroupRule sgRule,
+                                                Ip4Address vmIp,
+                                                IpPrefix remoteIp,
+                                                String netId) {
+        if (remoteIp != null && remoteIp.equals(IpPrefix.valueOf(vmIp, VM_IP_PREFIX))) {
+            // do nothing if the remote IP is my IP
+            return null;
+        }
+
+        Set<TrafficSelector> selectorSet = Sets.newHashSet();
+
+        Integer portMin = sgRule.portRangeMin();
+        Integer portMax = sgRule.portRangeMax();
+
+        if (portMax != null && portMin != null && portMin < portMax) {
+            String protocol = sgRule.protocol();
+            boolean egress = sgRule.direction().equalsIgnoreCase(EGRESS);
+
+            Map<TpPort, TpPort> portRangeMatchMap =
+                    buildPortRangeMatches(portMin, portMax);
+            portRangeMatchMap.forEach((port, mask) -> {
+                TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
+                buildMatches(sBuilder, sgRule, vmIp, remoteIp, netId);
+                applyPortRangeMatch(sBuilder, protocol, egress, port, mask);
+                selectorSet.add(sBuilder.build());
+            });
+        } else {
+            TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder();
+            buildMatches(sBuilder, sgRule, vmIp, remoteIp, netId);
+            selectorSet.add(sBuilder.build());
+        }
+
+        return selectorSet;
+    }
+
+    /**
+     * Adds the transport port criterion for one value/mask pair of a port range.
+     * Egress rules match the source port, all other rules the destination
+     * port. A mask equal to {@code TpPort.MAX_PORT} is matched exactly,
+     * any other mask as a masked match. Protocols other than TCP/UDP/SCTP
+     * (including null) add nothing.
+     */
+    private void applyPortRangeMatch(TrafficSelector.Builder sBuilder,
+                                     String protocol, boolean egress,
+                                     TpPort port, TpPort mask) {
+        boolean exact = mask.toInt() == TpPort.MAX_PORT;
+
+        // constant-first comparisons keep a null protocol from raising an NPE
+        if (PROTO_TCP.equalsIgnoreCase(protocol) || PROTO_TCP_NUM.equals(protocol)) {
+            if (egress) {
+                if (exact) {
+                    sBuilder.matchTcpSrc(port);
+                } else {
+                    sBuilder.matchTcpSrcMasked(port, mask);
+                }
+            } else {
+                if (exact) {
+                    sBuilder.matchTcpDst(port);
+                } else {
+                    sBuilder.matchTcpDstMasked(port, mask);
+                }
+            }
+        } else if (PROTO_UDP.equalsIgnoreCase(protocol) || PROTO_UDP_NUM.equals(protocol)) {
+            if (egress) {
+                if (exact) {
+                    sBuilder.matchUdpSrc(port);
+                } else {
+                    sBuilder.matchUdpSrcMasked(port, mask);
+                }
+            } else {
+                if (exact) {
+                    sBuilder.matchUdpDst(port);
+                } else {
+                    sBuilder.matchUdpDstMasked(port, mask);
+                }
+            }
+        } else if (PROTO_SCTP.equalsIgnoreCase(protocol) || PROTO_SCTP_NUM.equals(protocol)) {
+            if (egress) {
+                if (exact) {
+                    sBuilder.matchSctpSrc(port);
+                } else {
+                    sBuilder.matchSctpSrcMasked(port, mask);
+                }
+            } else {
+                if (exact) {
+                    sBuilder.matchSctpDst(port);
+                } else {
+                    sBuilder.matchSctpDstMasked(port, mask);
+                }
+            }
+        }
+    }
+
+    /**
+     * Adds every match criterion derived from a security group rule to the
+     * given selector builder: network segment, ether type, VM address
+     * according to direction, IP protocol, single transport port, ICMP
+     * fields and the remote IP prefix.
+     *
+     * @param sBuilder selector builder to add the criteria to
+     * @param sgRule   security group rule
+     * @param vmIp     IP address of the VM the rule is applied to
+     * @param remoteIp remote IP prefix
+     * @param netId    network ID of the VM port
+     */
+    private void buildMatches(TrafficSelector.Builder sBuilder,
+                              KubevirtSecurityGroupRule sgRule, Ip4Address vmIp,
+                              IpPrefix remoteIp, String netId) {
+        buildTunnelId(sBuilder, netId);
+        buildMatchEthType(sBuilder, sgRule.etherType());
+
+        String ruleDirection = sgRule.direction();
+        buildMatchDirection(sBuilder, ruleDirection, vmIp);
+
+        String ruleProtocol = sgRule.protocol();
+        buildMatchProto(sBuilder, ruleProtocol);
+
+        // unset port bounds are passed on as 0
+        int minPort = sgRule.portRangeMin() == null ? 0 : sgRule.portRangeMin();
+        int maxPort = sgRule.portRangeMax() == null ? 0 : sgRule.portRangeMax();
+        buildMatchPort(sBuilder, ruleProtocol, ruleDirection, minPort, maxPort);
+
+        // NOTE(review): portRangeMin is handed over as the ICMP code and
+        // portRangeMax as the ICMP type. OpenStack-style rules use min=type
+        // and max=code - confirm which convention KubevirtSecurityGroupRule follows.
+        buildMatchIcmp(sBuilder, ruleProtocol,
+                sgRule.portRangeMin(), sgRule.portRangeMax());
+
+        buildMatchRemoteIp(sBuilder, remoteIp, ruleDirection);
+    }
+
+    /**
+     * Adds the network segment criterion for the given network.
+     * Only VLAN networks get a criterion (the VLAN ID taken from the segment
+     * ID). For VXLAN, GRE and GENEVE networks no tunnel ID match is added at
+     * present; any other type is only logged.
+     *
+     * @param sBuilder selector builder to add the criterion to
+     * @param netId    network ID; a warning is logged when it is unknown
+     */
+    private void buildTunnelId(TrafficSelector.Builder sBuilder, String netId) {
+        KubevirtNetwork network = networkService.network(netId);
+        if (network == null) {
+            log.warn("Network {} not found!", netId);
+            return;
+        }
+
+        Type netType = network.type();
+        boolean tunneled = netType == VXLAN || netType == GRE || netType == GENEVE;
+
+        if (netType == VLAN) {
+            sBuilder.matchVlanId(VlanId.vlanId(network.segmentId()));
+        } else if (tunneled) {
+            // matching on the tunnel ID is currently disabled for tunneled networks
+            log.trace("{} typed match rules are installed for security group", netType);
+        } else {
+            log.debug("Cannot tag the VID as it is unsupported vnet type {}", netType);
+        }
+    }
+
+    /**
+     * Adds the VM address criterion: the VM's /32 prefix is matched as IP
+     * source for egress rules and as IP destination for all other rules.
+     *
+     * @param sBuilder  selector builder to add the criterion to
+     * @param direction rule direction
+     * @param vmIp      IP address of the VM
+     */
+    private void buildMatchDirection(TrafficSelector.Builder sBuilder,
+                                     String direction,
+                                     Ip4Address vmIp) {
+        boolean egress = direction.equalsIgnoreCase(EGRESS);
+        IpPrefix vmPrefix = IpPrefix.valueOf(vmIp, VM_IP_PREFIX);
+
+        if (egress) {
+            sBuilder.matchIPSrc(vmPrefix);
+            return;
+        }
+        sBuilder.matchIPDst(vmPrefix);
+    }
+
+    /**
+     * Adds the ether type criterion. IPv4 is always matched, because an IP
+     * source or destination criterion is always present; any other requested
+     * ether type is only reported in the debug log.
+     *
+     * @param sBuilder  selector builder to add the criterion to
+     * @param etherType ether type of the rule; null and the literal "null"
+     *                  count as unspecified
+     */
+    private void buildMatchEthType(TrafficSelector.Builder sBuilder, String etherType) {
+        sBuilder.matchEthType(Ethernet.TYPE_IPV4);
+
+        boolean specified = etherType != null && !STR_NULL.equals(etherType);
+        if (specified && !ETHTYPE_IPV4.equalsIgnoreCase(etherType)) {
+            log.debug("EthType {} is not supported yet in Security Group", etherType);
+        }
+    }
+
+    /**
+     * Adds the remote address criterion: the remote prefix is matched as IP
+     * destination for egress rules and as IP source for all other rules.
+     * No criterion is added for a null prefix or for 0.0.0.0/0.
+     *
+     * @param sBuilder       selector builder to add the criterion to
+     * @param remoteIpPrefix remote IP prefix, may be null
+     * @param direction      rule direction
+     */
+    private void buildMatchRemoteIp(TrafficSelector.Builder sBuilder,
+                                    IpPrefix remoteIpPrefix, String direction) {
+        // NOTE(review): getIp4Prefix() presumably returns null for a non-IPv4
+        // prefix, which would raise an NPE here - confirm how IPv6 rules are
+        // meant to be handled.
+        if (remoteIpPrefix == null ||
+                remoteIpPrefix.getIp4Prefix().equals(IP_PREFIX_ANY)) {
+            return;
+        }
+
+        boolean egress = direction.equalsIgnoreCase(EGRESS);
+        if (egress) {
+            sBuilder.matchIPDst(remoteIpPrefix);
+        } else {
+            sBuilder.matchIPSrc(remoteIpPrefix);
+        }
+    }
+
+    /**
+     * Adds the IP protocol criterion for ICMP, TCP, UDP or SCTP.
+     * Protocol names are compared with {@code equalsIgnoreCase}, which does
+     * not depend on the JVM default locale (a {@code toUpperCase()} based
+     * comparison fails for e.g. "icmp" under a Turkish locale); numeric forms
+     * must match exactly. A null or any other protocol adds no criterion.
+     *
+     * @param sBuilder selector builder to add the criterion to
+     * @param protocol protocol name or number, may be null
+     */
+    private void buildMatchProto(TrafficSelector.Builder sBuilder, String protocol) {
+        if (protocol == null) {
+            return;
+        }
+
+        if (PROTO_ICMP.equalsIgnoreCase(protocol) || PROTO_ICMP_NUM.equals(protocol)) {
+            sBuilder.matchIPProtocol(IPv4.PROTOCOL_ICMP);
+        } else if (PROTO_TCP.equalsIgnoreCase(protocol) || PROTO_TCP_NUM.equals(protocol)) {
+            sBuilder.matchIPProtocol(IPv4.PROTOCOL_TCP);
+        } else if (PROTO_UDP.equalsIgnoreCase(protocol) || PROTO_UDP_NUM.equals(protocol)) {
+            sBuilder.matchIPProtocol(IPv4.PROTOCOL_UDP);
+        } else if (PROTO_SCTP.equalsIgnoreCase(protocol) || PROTO_SCTP_NUM.equals(protocol)) {
+            sBuilder.matchIPProtocol(PROTOCOL_SCTP);
+        }
+    }
+
+    /**
+     * Adds a transport port criterion when the rule names exactly one port
+     * (portMin equals portMax and is greater than zero).
+     * Egress rules match the source port, all other rules the destination
+     * port. Protocols other than TCP/UDP/SCTP add nothing; this includes a
+     * null protocol, in line with buildMatchProto() and buildMatchIcmp().
+     *
+     * @param sBuilder  selector builder to add the criterion to
+     * @param protocol  protocol name or number, may be null
+     * @param direction rule direction
+     * @param portMin   lower port bound, 0 when unset
+     * @param portMax   upper port bound, 0 when unset
+     */
+    private void buildMatchPort(TrafficSelector.Builder sBuilder,
+                                String protocol, String direction,
+                                int portMin, int portMax) {
+        if (protocol == null || portMax <= 0 || portMin != portMax) {
+            return;
+        }
+
+        if (PROTO_TCP.equalsIgnoreCase(protocol) || PROTO_TCP_NUM.equals(protocol)) {
+            if (direction.equalsIgnoreCase(EGRESS)) {
+                sBuilder.matchTcpSrc(TpPort.tpPort(portMax));
+            } else {
+                sBuilder.matchTcpDst(TpPort.tpPort(portMax));
+            }
+        } else if (PROTO_UDP.equalsIgnoreCase(protocol) || PROTO_UDP_NUM.equals(protocol)) {
+            if (direction.equalsIgnoreCase(EGRESS)) {
+                sBuilder.matchUdpSrc(TpPort.tpPort(portMax));
+            } else {
+                sBuilder.matchUdpDst(TpPort.tpPort(portMax));
+            }
+        } else if (PROTO_SCTP.equalsIgnoreCase(protocol) || PROTO_SCTP_NUM.equals(protocol)) {
+            if (direction.equalsIgnoreCase(EGRESS)) {
+                sBuilder.matchSctpSrc(TpPort.tpPort(portMax));
+            } else {
+                sBuilder.matchSctpDst(TpPort.tpPort(portMax));
+            }
+        }
+    }
+
+    /**
+     * Adds ICMP code and type matches to the given selector builder.
+     * Matches are only added for ICMP rules, and only for values that are
+     * non-null and within the configured code/type bounds.
+     *
+     * @param sBuilder traffic selector builder to update
+     * @param protocol protocol name or number string of the rule; may be null
+     * @param icmpCode ICMP code of the rule; may be null
+     * @param icmpType ICMP type of the rule; may be null
+     */
+    private void buildMatchIcmp(TrafficSelector.Builder sBuilder,
+                                String protocol, Integer icmpCode, Integer icmpType) {
+        if (protocol == null) {
+            return;
+        }
+
+        boolean icmpRule = protocol.equalsIgnoreCase(PROTO_ICMP) ||
+                protocol.equals(PROTO_ICMP_NUM);
+        if (!icmpRule) {
+            return;
+        }
+
+        boolean codeInRange = icmpCode != null &&
+                icmpCode >= ICMP_CODE_MIN && icmpCode <= ICMP_CODE_MAX;
+        if (codeInRange) {
+            sBuilder.matchIcmpCode(icmpCode.byteValue());
+        }
+
+        boolean typeInRange = icmpType != null &&
+                icmpType >= ICMP_TYPE_MIN && icmpType <= ICMP_TYPE_MAX;
+        if (typeInRange) {
+            sBuilder.matchIcmpType(icmpType.byteValue());
+        }
+    }
+
+    private void resetSecurityGroupRules() {
+
+        if (getUseSecurityGroupFlag()) {
+            nodeService.completeNodes(WORKER).forEach(node -> {
+                initializeEgressTable(node.intgBridge(), true);
+                initializeConnTrackTable(node.intgBridge(), true);
+                initializeAclTable(node.intgBridge(), true);
+                initializeIngressTable(node.intgBridge(), true);
+
+                for (KubevirtNetwork network : networkService.tenantNetworks()) {
+                    initializeEgressTable(network.tenantDeviceId(node.hostname()), true);
+                    initializeIngressTable(network.tenantDeviceId(node.hostname()), true);
+                    initializeConnTrackTable(network.tenantDeviceId(node.hostname()), true);
+                    initializeAclTable(network.tenantDeviceId(node.hostname()), true);
+                }
+            });
+
+            securityGroupService.securityGroups().forEach(securityGroup ->
+                    securityGroup.rules().forEach(this::securityGroupRuleAdded));
+        } else {
+            nodeService.completeNodes(WORKER).forEach(node -> {
+                initializeEgressTable(node.intgBridge(), false);
+                initializeConnTrackTable(node.intgBridge(), false);
+                initializeAclTable(node.intgBridge(), false);
+                initializeIngressTable(node.intgBridge(), false);
+
+                for (KubevirtNetwork network : networkService.tenantNetworks()) {
+                    initializeEgressTable(network.tenantDeviceId(node.hostname()), false);
+                    initializeIngressTable(network.tenantDeviceId(node.hostname()), false);
+                    initializeConnTrackTable(network.tenantDeviceId(node.hostname()), false);
+                    initializeAclTable(network.tenantDeviceId(node.hostname()), false);
+                }
+            });
+
+            securityGroupService.securityGroups().forEach(securityGroup ->
+                    securityGroup.rules().forEach(this::securityGroupRuleRemoved));
+        }
+
+        log.info("Reset security group info " +
+                (getUseSecurityGroupFlag() ? "with" : "without") + " Security Group");
+    }
+
+    /**
+     * Applies the given rule to every port that references the security
+     * group the rule belongs to.
+     *
+     * @param sgRule security group rule to apply
+     */
+    private void securityGroupRuleAdded(KubevirtSecurityGroupRule sgRule) {
+        for (KubevirtPort candidate : portService.ports()) {
+            // skip ports which are not members of the rule's security group
+            if (!candidate.securityGroups().contains(sgRule.securityGroupId())) {
+                continue;
+            }
+
+            updateSecurityGroupRule(candidate, sgRule, true);
+            log.info("Applied security group rule {} to port {}",
+                    sgRule.id(), candidate.macAddress());
+        }
+    }
+
+    /**
+     * Removes the given rule from every port that references the security
+     * group the rule belongs to.
+     *
+     * @param sgRule security group rule to remove
+     */
+    private void securityGroupRuleRemoved(KubevirtSecurityGroupRule sgRule) {
+        for (KubevirtPort candidate : portService.ports()) {
+            // skip ports which are not members of the rule's security group
+            if (!candidate.securityGroups().contains(sgRule.securityGroupId())) {
+                continue;
+            }
+
+            updateSecurityGroupRule(candidate, sgRule, false);
+            log.info("Removed security group rule {} from port {}",
+                    sgRule.id(), candidate.macAddress());
+        }
+    }
+
+    /**
+     * Listens to kubevirt port events and installs or removes the security
+     * group rules of the affected port. Events are only considered while the
+     * security group flag is enabled, and are processed on the event executor.
+     */
+    private class InternalKubevirtPortListener implements KubevirtPortListener {
+
+        @Override
+        public boolean isRelevant(KubevirtPortEvent event) {
+            return getUseSecurityGroupFlag();
+        }
+
+        /**
+         * Checks whether the mastership service reports the local instance as
+         * master of the device the event's port is attached to.
+         *
+         * @param event kubevirt port event
+         * @return true if the port has a device and this instance is its
+         *         master, false otherwise
+         */
+        private boolean isRelevantHelper(KubevirtPortEvent event) {
+            DeviceId deviceId = event.subject().deviceId();
+
+            if (deviceId == null) {
+                return false;
+            }
+
+            return mastershipService.isLocalMaster(deviceId);
+        }
+
+        @Override
+        public void event(KubevirtPortEvent event) {
+            log.debug("security group event received {}", event);
+
+            switch (event.type()) {
+                case KUBEVIRT_PORT_SECURITY_GROUP_ADDED:
+                    eventExecutor.execute(() -> processPortSgAdd(event));
+                    break;
+                case KUBEVIRT_PORT_SECURITY_GROUP_REMOVED:
+                    eventExecutor.execute(() -> processPortSgRemove(event));
+                    break;
+                case KUBEVIRT_PORT_REMOVED:
+                    eventExecutor.execute(() -> processPortRemove(event));
+                    break;
+                case KUBEVIRT_PORT_DEVICE_ADDED:
+                    eventExecutor.execute(() -> processPortDeviceAdded(event));
+                    break;
+                default:
+                    // do nothing for the other events
+                    break;
+            }
+        }
+
+        /**
+         * Installs all rules of the security group carried by the event on
+         * the event's port.
+         *
+         * @param event kubevirt port event
+         */
+        private void processPortSgAdd(KubevirtPortEvent event) {
+            if (!isRelevantHelper(event)) {
+                return;
+            }
+
+            if (event.securityGroupId() == null) {
+                return;
+            }
+
+            // look the group up only once so that the null check and the use
+            // below cannot observe different results
+            KubevirtSecurityGroup sg = securityGroupService.securityGroup(event.securityGroupId());
+            if (sg == null) {
+                return;
+            }
+
+            KubevirtPort port = event.subject();
+
+            sg.rules().forEach(sgRule -> updateSecurityGroupRule(port, sgRule, true));
+            log.info("Added security group {} to port {}",
+                    event.securityGroupId(), port.macAddress());
+        }
+
+        /**
+         * Removes all rules of the security group carried by the event from
+         * the event's port.
+         *
+         * @param event kubevirt port event
+         */
+        private void processPortSgRemove(KubevirtPortEvent event) {
+            if (!isRelevantHelper(event)) {
+                return;
+            }
+
+            if (event.securityGroupId() == null) {
+                return;
+            }
+
+            // look the group up only once so that the null check and the use
+            // below cannot observe different results
+            KubevirtSecurityGroup sg = securityGroupService.securityGroup(event.securityGroupId());
+            if (sg == null) {
+                return;
+            }
+
+            KubevirtPort port = event.subject();
+
+            sg.rules().forEach(sgRule -> updateSecurityGroupRule(port, sgRule, false));
+            log.info("Removed security group {} from port {}",
+                    event.securityGroupId(), port.macAddress());
+        }
+
+        /**
+         * Removes the rules of every security group referenced by the removed
+         * port. Groups that can no longer be resolved are skipped.
+         *
+         * @param event kubevirt port event
+         */
+        private void processPortRemove(KubevirtPortEvent event) {
+            if (!isRelevantHelper(event)) {
+                return;
+            }
+
+            KubevirtPort port = event.subject();
+            for (String sgId : port.securityGroups()) {
+                KubevirtSecurityGroup sg = securityGroupService.securityGroup(sgId);
+                if (sg == null) {
+                    // the port references a group unknown to the service;
+                    // skip it instead of failing the remaining groups
+                    log.warn("Security group {} of port {} is not found",
+                            sgId, port.macAddress());
+                    continue;
+                }
+
+                sg.rules().forEach(sgRule -> updateSecurityGroupRule(port, sgRule, false));
+                log.info("Removed security group {} from port {}",
+                        sgId, port.macAddress());
+            }
+        }
+
+        /**
+         * Installs the rules of every security group referenced by the port
+         * once the port got its device. Groups that cannot be resolved are
+         * skipped.
+         *
+         * @param event kubevirt port event
+         */
+        private void processPortDeviceAdded(KubevirtPortEvent event) {
+            if (!isRelevantHelper(event)) {
+                return;
+            }
+
+            KubevirtPort port = event.subject();
+            for (String sgId : port.securityGroups()) {
+                KubevirtSecurityGroup sg = securityGroupService.securityGroup(sgId);
+                if (sg == null) {
+                    // the port references a group unknown to the service;
+                    // skip it instead of failing the remaining groups
+                    log.warn("Security group {} of port {} is not found",
+                            sgId, port.macAddress());
+                    continue;
+                }
+
+                sg.rules().forEach(sgRule -> updateSecurityGroupRule(port, sgRule, true));
+                // log the group being iterated, not the ID carried by the event
+                log.info("Added security group {} to port {}",
+                        sgId, port.macAddress());
+            }
+        }
+    }
+
+    private class InternalNetworkListener implements KubevirtNetworkListener {
+
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(KubevirtNetworkEvent event) {
+            switch (event.type()) {
+                case KUBEVIRT_NETWORK_CREATED:
+                    eventExecutor.execute(() -> processNetworkCreation(event.subject()));
+                    break;
+                case KUBEVIRT_NETWORK_REMOVED:
+                case KUBEVIRT_NETWORK_UPDATED:
+                default:
+                    // do thing
+                    break;
+            }
+        }
+
+        private void processNetworkCreation(KubevirtNetwork network) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            Set<KubevirtNode> nodes = nodeService.completeNodes(WORKER);
+
+            if (nodes.size() > 0) {
+                // now we wait 5s for all tenant bridges are created,
+                // FIXME: we need to fina a better way to wait all tenant bridges
+                // are created before installing default security group rules
+                try {
+                    sleep(SLEEP_MS);
+                } catch (InterruptedException e) {
+                    log.error("Failed to install security group default rules.");
+                }
+
+                for (KubevirtNode node : nodes) {
+                    initializeEgressTable(network.tenantDeviceId(node.hostname()), true);
+                    initializeIngressTable(network.tenantDeviceId(node.hostname()), true);
+                    initializeConnTrackTable(network.tenantDeviceId(node.hostname()), true);
+                    initializeAclTable(network.tenantDeviceId(node.hostname()), true);
+                }
+            }
+        }
+    }
+
+    /**
+     * Listens to security group events and applies rule creations and
+     * removals to the ports. Events are only considered while the security
+     * group flag is enabled, and are processed on the event executor.
+     */
+    private class InternalSecurityGroupListener implements KubevirtSecurityGroupListener {
+
+        @Override
+        public boolean isRelevant(KubevirtSecurityGroupEvent event) {
+            return getUseSecurityGroupFlag();
+        }
+
+        /**
+         * Checks whether the local node is the leader of this application's
+         * leadership topic.
+         *
+         * @return true if the local node is the leader, false otherwise
+         */
+        private boolean isRelevantHelper() {
+            NodeId leader = leadershipService.getLeader(appId.name());
+            return Objects.equals(localNodeId, leader);
+        }
+
+        @Override
+        public void event(KubevirtSecurityGroupEvent event) {
+            switch (event.type()) {
+                case KUBEVIRT_SECURITY_GROUP_RULE_CREATED:
+                    eventExecutor.execute(() -> processSgRuleCreate(event));
+                    break;
+                case KUBEVIRT_SECURITY_GROUP_RULE_REMOVED:
+                    eventExecutor.execute(() -> processSgRuleRemove(event));
+                    break;
+                default:
+                    // the remaining event types need no handling
+                    break;
+            }
+        }
+
+        /**
+         * Applies the rule carried by the event, when the local node is the leader.
+         *
+         * @param event security group event
+         */
+        private void processSgRuleCreate(KubevirtSecurityGroupEvent event) {
+            if (isRelevantHelper()) {
+                KubevirtSecurityGroupRule createdRule = event.rule();
+                securityGroupRuleAdded(createdRule);
+                log.info("Applied new security group rule {} to ports", createdRule.id());
+            }
+        }
+
+        /**
+         * Removes the rule carried by the event, when the local node is the leader.
+         *
+         * @param event security group event
+         */
+        private void processSgRuleRemove(KubevirtSecurityGroupEvent event) {
+            if (isRelevantHelper()) {
+                KubevirtSecurityGroupRule removedRule = event.rule();
+                securityGroupRuleRemoved(removedRule);
+                log.info("Removed security group rule {} from ports", removedRule.id());
+            }
+        }
+    }
+
+    /**
+     * Listens to kubevirt node events of worker nodes and resets the
+     * security group tables and rules once a node becomes complete.
+     */
+    private class InternalNodeListener implements KubevirtNodeListener {
+
+        @Override
+        public boolean isRelevant(KubevirtNodeEvent event) {
+            return event.subject().type() == WORKER;
+        }
+
+        /**
+         * Checks whether the local node is the leader of this application's
+         * leadership topic.
+         *
+         * @return true if the local node is the leader, false otherwise
+         */
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+        @Override
+        public void event(KubevirtNodeEvent event) {
+            switch (event.type()) {
+                case KUBEVIRT_NODE_COMPLETE:
+                    eventExecutor.execute(() -> processNodeComplete(event.subject()));
+                    break;
+                default:
+                    // do nothing for the other events
+                    break;
+            }
+        }
+
+        /**
+         * Resets the security group state for the completed node, when the
+         * local node is the leader.
+         *
+         * @param node kubevirt node which became complete
+         */
+        private void processNodeComplete(KubevirtNode node) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+
+            resetSecurityGroupRulesByNode(node);
+        }
+
+        /**
+         * Re-initializes the security group tables on the integration bridge
+         * and on every tenant network device of the given node, then applies
+         * (flag enabled) or removes (flag disabled) every known security
+         * group rule.
+         *
+         * @param node kubevirt node whose tables are reset
+         */
+        private void resetSecurityGroupRulesByNode(KubevirtNode node) {
+            // read the flag once so that the table setup, the rule updates and
+            // the log line below all reflect the same value
+            final boolean useSecurityGroup = getUseSecurityGroupFlag();
+
+            initializeEgressTable(node.intgBridge(), useSecurityGroup);
+            initializeConnTrackTable(node.intgBridge(), useSecurityGroup);
+            initializeAclTable(node.intgBridge(), useSecurityGroup);
+            initializeIngressTable(node.intgBridge(), useSecurityGroup);
+
+            for (KubevirtNetwork network : networkService.tenantNetworks()) {
+                initializeEgressTable(network.tenantDeviceId(node.hostname()), useSecurityGroup);
+                initializeIngressTable(network.tenantDeviceId(node.hostname()), useSecurityGroup);
+                initializeConnTrackTable(network.tenantDeviceId(node.hostname()), useSecurityGroup);
+                initializeAclTable(network.tenantDeviceId(node.hostname()), useSecurityGroup);
+            }
+
+            securityGroupService.securityGroups().forEach(securityGroup ->
+                    securityGroup.rules().forEach(sgRule -> {
+                        if (useSecurityGroup) {
+                            securityGroupRuleAdded(sgRule);
+                        } else {
+                            securityGroupRuleRemoved(sgRule);
+                        }
+                    }));
+
+            log.info("Reset security group info {} Security Group",
+                    useSecurityGroup ? "with" : "without");
+        }
+    }
+}
\ No newline at end of file