Implement floating Ip capability in kubevirt networking app.

Change-Id: Iad9358b321dd0b46a2418c8c2d78ffd91c605e28
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpHandler.java
new file mode 100644
index 0000000..726abf2
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpHandler.java
@@ -0,0 +1,434 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.impl;
+
+import org.onlab.packet.ARP;
+import org.onlab.packet.EthType;
+import org.onlab.packet.Ethernet;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.KubevirtFloatingIp;
+import org.onosproject.kubevirtnetworking.api.KubevirtFlowRuleService;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetwork;
+import org.onosproject.kubevirtnetworking.api.KubevirtNetworkService;
+import org.onosproject.kubevirtnetworking.api.KubevirtPort;
+import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
+import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
+import org.onosproject.kubevirtnetworking.api.KubevirtRouterEvent;
+import org.onosproject.kubevirtnetworking.api.KubevirtRouterListener;
+import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
+import org.onosproject.kubevirtnetworking.util.RulePopulatorUtil;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+import org.onosproject.net.Device;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceAdminService;
+import org.onosproject.net.driver.DriverService;
+import org.onosproject.net.flow.DefaultTrafficSelector;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.kubevirtnetworking.api.Constants.FORWARDING_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRE_FLAT_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FLOATING_IP_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FORWARDING_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
+import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GENEVE;
+import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GRE;
+import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.VXLAN;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.externalPatchPortNum;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.gatewayNodeForSpecifiedRouter;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.getRouterMacAddress;
+import static org.onosproject.kubevirtnetworking.util.KubevirtNetworkingUtil.tunnelPort;
+import static org.onosproject.kubevirtnetworking.util.RulePopulatorUtil.buildExtension;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Handles kubevirt floating ip.
+ */
+@Component(immediate = true)
+public class KubevirtFloatingIpHandler {
+    protected final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceAdminService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtPortService kubevirtPortService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtNodeService kubevirtNodeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtNetworkService kubevirtNetworkService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtFlowRuleService flowService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DriverService driverService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtRouterService kubevirtRouterService;
+
+    private final ExecutorService eventExecutor = newSingleThreadExecutor(
+            groupedThreads(this.getClass().getSimpleName(), "event-handler"));
+
+    private ApplicationId appId;
+    private NodeId localNodeId;
+
+    private final InternalRouterEventListener kubevirtRouterlistener =
+            new InternalRouterEventListener();
+
+    @Activate
+    protected void activate() {
+        appId = coreService.registerApplication(KUBEVIRT_NETWORKING_APP_ID);
+        localNodeId = clusterService.getLocalNode().id();
+        leadershipService.runForLeadership(appId.name());
+        kubevirtRouterService.addListener(kubevirtRouterlistener);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        leadershipService.withdraw(appId.name());
+        kubevirtRouterService.removeListener(kubevirtRouterlistener);
+
+        eventExecutor.shutdown();
+
+        log.info("Stopped");
+    }
+
+    private void setFloatingIpRules(KubevirtRouter router,
+                                    KubevirtFloatingIp floatingIp,
+                                    boolean install) {
+
+        KubevirtPort kubevirtPort = getKubevirtPort(floatingIp);
+        if (kubevirtPort == null) {
+            log.warn("Failed to install floating Ip rules for floating ip {} " +
+                    "because there's no kubevirt port associated to it", floatingIp.floatingIp());
+            return;
+        }
+
+        KubevirtNetwork kubevirtNetwork = kubevirtNetworkService.network(kubevirtPort.networkId());
+        if (kubevirtNetwork.type() == VXLAN || kubevirtNetwork.type() == GENEVE || kubevirtNetwork.type() == GRE) {
+            setFloatingIpDownstreamRulesToGatewayTunBridge(router, floatingIp, kubevirtNetwork, kubevirtPort, install);
+        }
+
+        setFloatingIpArpResponseRules(router, floatingIp, kubevirtPort, install);
+        setFloatingIpUpstreamRules(router, floatingIp, kubevirtPort, install);
+        setFloatingIpDownstreamRules(router, floatingIp, kubevirtPort, install);
+    }
+
+    private void setFloatingIpArpResponseRules(KubevirtRouter router,
+                                               KubevirtFloatingIp floatingIp,
+                                               KubevirtPort port,
+                                               boolean install) {
+
+        KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
+
+        if (electedGw == null) {
+            log.warn("Failed to install floating Ip rules for floating ip {} " +
+                    "because there's no gateway assigned to it", floatingIp.floatingIp());
+            return;
+        }
+
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchInPort(externalPatchPortNum(deviceService, electedGw))
+                .matchEthType(EthType.EtherType.ARP.ethType().toShort())
+                .matchArpOp(ARP.OP_REQUEST)
+                .matchArpTpa(floatingIp.floatingIp().getIp4Address())
+                .build();
+
+        Device device = deviceService.getDevice(electedGw.intgBridge());
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .extension(RulePopulatorUtil.buildMoveEthSrcToDstExtension(device), device.id())
+                .extension(RulePopulatorUtil.buildMoveArpShaToThaExtension(device), device.id())
+                .extension(RulePopulatorUtil.buildMoveArpSpaToTpaExtension(device), device.id())
+                .setArpOp(ARP.OP_REPLY)
+                .setEthSrc(port.macAddress())
+                .setArpSha(port.macAddress())
+                .setArpSpa(floatingIp.floatingIp().getIp4Address())
+                .setOutput(PortNumber.IN_PORT)
+                .build();
+
+        flowService.setRule(
+                appId,
+                electedGw.intgBridge(),
+                selector,
+                treatment,
+                PRIORITY_ARP_GATEWAY_RULE,
+                PRE_FLAT_TABLE,
+                install);
+    }
+    private KubevirtPort getKubevirtPort(KubevirtFloatingIp floatingIp) {
+
+        return kubevirtPortService.ports().stream()
+                .filter(port -> port.ipAddress().equals(floatingIp.fixedIp()))
+                .findAny().orElse(null);
+    }
+
+    private void setFloatingIpUpstreamRules(KubevirtRouter router,
+                                            KubevirtFloatingIp floatingIp,
+                                            KubevirtPort port,
+                                            boolean install) {
+
+        KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
+
+        if (electedGw == null) {
+            log.warn("Failed to install floating Ip rules for floating ip {} " +
+                    "because there's no gateway assigned to it", floatingIp.floatingIp());
+            return;
+        }
+
+        MacAddress peerMacAddress = router.peerRouter().macAddress();
+
+        if (peerMacAddress == null) {
+            log.warn("Failed to install floating Ip rules for floating ip {} and router {}" +
+                    "because there's no peer router mac address", floatingIp.floatingIp(),
+                    router.name());
+            return;
+        }
+
+        MacAddress routerMacAddress = getRouterMacAddress(router);
+
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchEthSrc(port.macAddress())
+                .matchEthDst(routerMacAddress)
+                .matchIPSrc(IpPrefix.valueOf(floatingIp.fixedIp(), 32))
+                .build();
+
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setEthDst(peerMacAddress)
+                .setEthSrc(port.macAddress())
+                .setIpSrc(floatingIp.floatingIp())
+                .setOutput(externalPatchPortNum(deviceService, electedGw))
+                .build();
+
+        flowService.setRule(
+                appId,
+                electedGw.intgBridge(),
+                selector,
+                treatment,
+                PRIORITY_FLOATING_IP_RULE,
+                PRE_FLAT_TABLE,
+                install);
+    }
+
+    private void setFloatingIpDownstreamRules(KubevirtRouter router,
+                                              KubevirtFloatingIp floatingIp,
+                                              KubevirtPort port,
+                                              boolean install) {
+        KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
+
+        if (electedGw == null) {
+            log.warn("Failed to install floating Ip rules for floating ip {} " +
+                    "because there's no gateway assigned to it", floatingIp.floatingIp());
+            return;
+        }
+
+        MacAddress routerMacAddress = getRouterMacAddress(router);
+
+        TrafficSelector selector = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchEthDst(port.macAddress())
+                .matchIPDst(IpPrefix.valueOf(floatingIp.floatingIp(), 32))
+                .build();
+
+        TrafficTreatment treatment = DefaultTrafficTreatment.builder()
+                .setEthSrc(routerMacAddress)
+                .setEthDst(port.macAddress())
+                .setIpDst(floatingIp.fixedIp())
+                .transition(FORWARDING_TABLE)
+                .build();
+
+        flowService.setRule(
+                appId,
+                electedGw.intgBridge(),
+                selector,
+                treatment,
+                PRIORITY_FLOATING_IP_RULE,
+                PRE_FLAT_TABLE,
+                install);
+    }
+
+
+    private void setFloatingIpDownstreamRulesToGatewayTunBridge(KubevirtRouter router,
+                                                                KubevirtFloatingIp floatingIp,
+                                                                KubevirtNetwork network,
+                                                                KubevirtPort port,
+                                                                boolean install) {
+        KubevirtNode electedGw = gatewayNodeForSpecifiedRouter(kubevirtNodeService, router);
+
+        if (electedGw == null) {
+            log.warn("Failed to install floating Ip rules for floating ip {} " +
+                    "because there's no gateway assigned to it", floatingIp.floatingIp());
+            return;
+        }
+
+        KubevirtNode workerNode = kubevirtNodeService.node(port.deviceId());
+        if (workerNode == null) {
+            log.warn("Failed to install floating Ip rules for floating ip {} " +
+                    "because fail to fine the worker node that the associated port is running on",
+                    floatingIp.floatingIp());
+        }
+
+        PortNumber tunnelPortNumber = tunnelPort(electedGw, network);
+        if (tunnelPortNumber == null) {
+            return;
+        }
+
+
+        TrafficSelector.Builder sBuilder = DefaultTrafficSelector.builder()
+                .matchEthType(Ethernet.TYPE_IPV4)
+                .matchIPDst(IpPrefix.valueOf(port.ipAddress(), 32))
+                .matchEthDst(port.macAddress());
+
+        TrafficTreatment.Builder tBuilder = DefaultTrafficTreatment.builder()
+                .setTunnelId(Long.parseLong(network.segmentId()))
+                .extension(buildExtension(
+                        deviceService,
+                        electedGw.tunBridge(),
+                        workerNode.dataIp().getIp4Address()),
+                        electedGw.tunBridge())
+                .setOutput(tunnelPortNumber);
+
+        flowService.setRule(
+                appId,
+                electedGw.tunBridge(),
+                sBuilder.build(),
+                tBuilder.build(),
+                PRIORITY_FORWARDING_RULE,
+                TUNNEL_DEFAULT_TABLE,
+                install);
+    }
+
+    private class InternalRouterEventListener implements KubevirtRouterListener {
+        private boolean isRelevantHelper() {
+            return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+        }
+
+
+        @Override
+        public void event(KubevirtRouterEvent event) {
+            switch (event.type()) {
+                case KUBEVIRT_ROUTER_CREATED:
+                    eventExecutor.execute(() -> processRouterCreation(event.subject()));
+                    break;
+                case KUBEVIRT_ROUTER_UPDATED:
+                    eventExecutor.execute(() -> processRouterUpdate(event.subject()));
+                    break;
+                case KUBEVIRT_ROUTER_REMOVED:
+                    eventExecutor.execute(() -> processRouterDeletion(event.subject()));
+                    break;
+                case KUBEVIRT_FLOATING_IP_ASSOCIATED:
+                    eventExecutor.execute(() -> processFloatingIpAssociation(event.subject(),
+                            event.floatingIp()));
+                    break;
+                case KUBEVIRT_FLOATING_IP_DISASSOCIATED:
+                    eventExecutor.execute(() -> processFloatingIpDisassociation(event.subject(),
+                            event.floatingIp()));
+                    break;
+
+                default:
+                    //do nothing
+                    break;
+            }
+        }
+
+        private void processRouterCreation(KubevirtRouter router) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+            kubevirtRouterService.floatingIpsByRouter(router.name())
+                    .stream()
+                    .filter(fip -> fip.fixedIp() != null)
+                    .forEach(fip -> {
+                        processFloatingIpAssociation(router, fip);
+                    });
+        }
+
+        private void processRouterDeletion(KubevirtRouter router) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+            kubevirtRouterService.floatingIpsByRouter(router.name())
+                    .stream()
+                    .filter(fip -> fip.fixedIp() != null)
+                    .forEach(fip -> {
+                        processFloatingIpDisassociation(router, fip);
+                    });
+        }
+
+        private void processRouterUpdate(KubevirtRouter router) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+            kubevirtRouterService.floatingIpsByRouter(router.name())
+                    .stream()
+                    .filter(fip -> fip.fixedIp() != null)
+                    .forEach(fip -> {
+                        processFloatingIpAssociation(router, fip);
+                    });
+        }
+
+        private void processFloatingIpAssociation(KubevirtRouter router, KubevirtFloatingIp floatingIp) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+            setFloatingIpRules(router, floatingIp, true);
+        }
+
+        private void processFloatingIpDisassociation(KubevirtRouter router, KubevirtFloatingIp floatingIp) {
+            if (!isRelevantHelper()) {
+                return;
+            }
+            setFloatingIpRules(router, floatingIp, false);
+        }
+    }
+}