Implements Kubevirt Prometheus Assurance exporter in kubevirt networking app.

Change-Id: Ief89fe71f24c2999ae02d94b4f0f82b3822ae19f
(cherry picked from commit c1ba1f4b1d389b051c64e0bbc4c2217e50fd78a8)
diff --git a/apps/kubevirt-networking/BUILD b/apps/kubevirt-networking/BUILD
index 580cb60..ec4607d 100644
--- a/apps/kubevirt-networking/BUILD
+++ b/apps/kubevirt-networking/BUILD
@@ -35,6 +35,15 @@
     "//apps/kubevirt-networking/api:onos-apps-kubevirt-networking-api",
     "//apps/kubevirt-networking/app:onos-apps-kubevirt-networking-app",
     "//apps/kubevirt-networking/web:onos-apps-kubevirt-networking-web",
+    # prometheus deps
+    "@simpleclient//jar",
+    "@simpleclient_common//jar",
+    "@simpleclient_hotspot//jar",
+    "@simpleclient_servlet//jar",
+    "@jetty_servlet//jar",
+    "@jetty_server//jar",
+    "@jetty_util//jar",
+    "@jetty_websocket//jar",
 ]
 
 onos_app(
diff --git a/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/Constants.java b/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/Constants.java
index e3e9bce..0675c93 100644
--- a/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/Constants.java
+++ b/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/Constants.java
@@ -104,7 +104,9 @@
     public static final int CLI_MARGIN_LENGTH = 2;
 
     public static final int PRIORITY_STATEFUL_SNAT_RULE = 40500;
-    public static final int PRIORITY_FLOATING_IP_RULE = 40800;
+    public static final int PRIORITY_FLOATING_IP_UPSTREAM_RULE = 40800;
+    public static final int PRIORITY_FLOATING_IP_DOWNSTREAM_RULE = 40700;
+
     public static final int PRIORITY_INTERNAL_ROUTING_RULE = 41000;
     public static final int PRIORITY_LB_RULE = 41500;
     public static final int PRIORITY_LB_FIP_RULE = 41500;
diff --git a/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/KubevirtPrometheusAssuranceService.java b/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/KubevirtPrometheusAssuranceService.java
new file mode 100644
index 0000000..4e10fff
--- /dev/null
+++ b/apps/kubevirt-networking/api/src/main/java/org/onosproject/kubevirtnetworking/api/KubevirtPrometheusAssuranceService.java
@@ -0,0 +1,21 @@
+/*
+ * Copyright 2022-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.api;
+
+public interface KubevirtPrometheusAssuranceService {
+    void startPrometheusExporter();
+    void stopPrometheusExporter();
+}
diff --git a/apps/kubevirt-networking/app/BUILD b/apps/kubevirt-networking/app/BUILD
index 0358e6e..6b37d90 100644
--- a/apps/kubevirt-networking/app/BUILD
+++ b/apps/kubevirt-networking/app/BUILD
@@ -37,6 +37,16 @@
     "@zjsonpatch//jar",
     "@validation_api//jar",
     "@dns_java//jar",
+    "@simpleclient//jar",
+    "@simpleclient_common//jar",
+    "@simpleclient_hotspot//jar",
+    "@simpleclient_servlet//jar",
+    "@jetty_servlet//jar",
+    "@jetty_http//jar",
+    "@jetty_server//jar",
+    "@jetty_util//jar",
+    "@jetty_websocket//jar",
+    "@servlet_api//jar",
 ]
 
 TEST_DEPS = TEST_ADAPTERS + [
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpHandler.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpHandler.java
index 2d5b87d..53fefc9 100644
--- a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpHandler.java
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtFloatingIpHandler.java
@@ -68,7 +68,8 @@
 import static org.onosproject.kubevirtnetworking.api.Constants.KUBEVIRT_NETWORKING_APP_ID;
 import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_ARP_GATEWAY_RULE;
 import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FLOATING_GATEWAY_TUN_BRIDGE_RULE;
-import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FLOATING_IP_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FLOATING_IP_DOWNSTREAM_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FLOATING_IP_UPSTREAM_RULE;
 import static org.onosproject.kubevirtnetworking.api.Constants.TUNNEL_DEFAULT_TABLE;
 import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GENEVE;
 import static org.onosproject.kubevirtnetworking.api.KubevirtNetwork.Type.GRE;
@@ -255,7 +256,7 @@
                 electedGw.intgBridge(),
                 selector,
                 treatment,
-                PRIORITY_FLOATING_IP_RULE,
+                PRIORITY_FLOATING_IP_UPSTREAM_RULE,
                 GW_ENTRY_TABLE,
                 install);
     }
@@ -285,7 +286,7 @@
                 electedGw.intgBridge(),
                 selector,
                 treatment,
-                PRIORITY_FLOATING_IP_RULE,
+                PRIORITY_FLOATING_IP_DOWNSTREAM_RULE,
                 GW_ENTRY_TABLE,
                 install);
     }
diff --git a/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPrometheusAssuranceManager.java b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPrometheusAssuranceManager.java
new file mode 100644
index 0000000..f9abc78
--- /dev/null
+++ b/apps/kubevirt-networking/app/src/main/java/org/onosproject/kubevirtnetworking/impl/KubevirtPrometheusAssuranceManager.java
@@ -0,0 +1,387 @@
+/*
+ * Copyright 2022-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.kubevirtnetworking.impl;
+
+import io.prometheus.client.Gauge;
+import io.prometheus.client.exporter.MetricsServlet;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.util.SharedScheduledExecutors;
+import org.onosproject.core.CoreService;
+import org.onosproject.kubevirtnetworking.api.KubevirtFloatingIp;
+import org.onosproject.kubevirtnetworking.api.KubevirtPort;
+import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
+import org.onosproject.kubevirtnetworking.api.KubevirtPrometheusAssuranceService;
+import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
+import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
+import org.onosproject.kubevirtnode.api.KubevirtNode;
+import org.onosproject.kubevirtnode.api.KubevirtNodeService;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.flow.FlowEntry;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.IndexTableId;
+import org.onosproject.net.flow.TrafficSelector;
+import org.onosproject.net.flow.criteria.EthCriterion;
+import org.onosproject.net.flow.criteria.IPCriterion;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.onosproject.kubevirtnetworking.api.Constants.GW_ENTRY_TABLE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FLOATING_IP_DOWNSTREAM_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FLOATING_IP_UPSTREAM_RULE;
+import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_STATEFUL_SNAT_RULE;
+import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.GATEWAY;
+import static org.onosproject.net.flow.criteria.Criterion.Type.ETH_DST;
+import static org.onosproject.net.flow.criteria.Criterion.Type.ETH_SRC;
+import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Implementation of Kubevirt Prometheus Assurance Service.
+ */
+@Component(immediate = true, service = KubevirtPrometheusAssuranceService.class)
+public class KubevirtPrometheusAssuranceManager implements KubevirtPrometheusAssuranceService {
+    private final Logger log = getLogger(getClass());
+
+    private static final String APP_ID = "org.onosproject.kubevirtnetwork";
+
+
+    private static final String SRC_PORT = "srcPort";
+
+    private static final String FIP_ID = "fipId";
+    private static final String FIP_ADDRESS = "fipAddress";
+    private static final String MER_NAME = "merName";
+    private static final String NETWORK_NAME = "networkName";
+    private static final String ROUTER_NAME = "routerName";
+    private static final String ROUTER_SNAT_IP = "routerSnatIp";
+    private static final String VM_NAME = "vmName";
+    private static final String RX_BYTE = "rxByte";
+    private static final String TX_BYTE = "txByte";
+    private static final String RX_PKTS = "rxPkts";
+    private static final String TX_PKTS = "txPkts";
+
+
+    private static final String DST_PORT = "dstPort";
+    private static final String PROTOCOL = "protocol";
+
+
+    private static final long INITIAL_DELAY = 10L;
+    private static final long REFRESH_INTERVAL = 10L;
+    private static final TimeUnit TIME_UNIT_SECOND = TimeUnit.SECONDS;
+    private static final boolean RECOVER_FROM_FAILURE = true;
+
+    private static final String[] FIP_LABEL_TAGS = {FIP_ID, FIP_ADDRESS, MER_NAME, VM_NAME, NETWORK_NAME};
+    private static final String[] SNAT_LABEL_TAGS = {ROUTER_NAME, ROUTER_SNAT_IP, MER_NAME};
+
+    private Server prometheusExporter;
+    private StatsCollector collector;
+    private ScheduledFuture result;
+
+    private static Gauge byteFIPTx = Gauge.build()
+            .name("fip_tx_byte")
+            .help("fip_tx_byte")
+            .labelNames(FIP_LABEL_TAGS)
+            .register();
+    private static Gauge pktFIPTx = Gauge.build()
+            .name("fip_tx_pkts")
+            .help("fip_tx_pkts")
+            .labelNames(FIP_LABEL_TAGS)
+            .register();
+    private static Gauge byteFIPRx = Gauge.build()
+            .name("fip_rx_byte")
+            .help("fip_rx_byte")
+            .labelNames(FIP_LABEL_TAGS)
+            .register();
+    private static Gauge pktFIPRx = Gauge.build()
+            .name("fip_rx_pkts")
+            .help("fip_rx_pkts")
+            .labelNames(FIP_LABEL_TAGS)
+            .register();
+
+    private static Gauge byteSNATTx = Gauge.build()
+            .name("snat_tx_byte")
+            .help("snat_rx_pkts")
+            .labelNames(SNAT_LABEL_TAGS)
+            .register();
+    private static Gauge pktSNATTx = Gauge.build()
+            .name("snat_tx_pkts")
+            .help("snat_tx_pkts")
+            .labelNames(SNAT_LABEL_TAGS)
+            .register();
+
+    private static Gauge byteSNATRx = Gauge.build()
+            .name("snat_rx_byte")
+            .help("snat_rx_byte")
+            .labelNames(SNAT_LABEL_TAGS)
+            .register();
+    private static Gauge pktSNATRx = Gauge.build()
+            .name("snat_rx_pkts")
+            .help("snat_rx_pkts")
+            .labelNames(SNAT_LABEL_TAGS)
+            .register();
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected CoreService coreService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected FlowRuleService flowRuleService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtRouterService routerService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtNodeService nodeService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected KubevirtPortService portService;
+
+    @Activate
+    protected void activate() {
+        startPrometheusExporter();
+        log.info("Started");
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        stopPrometheusExporter();
+        log.info("Stopped");
+    }
+
+    @Override
+    public void startPrometheusExporter() {
+        try {
+            prometheusExporter = new Server(9300);
+            ServletContextHandler context = new ServletContextHandler();
+            context.setContextPath("/");
+            prometheusExporter.setHandler(context);
+            context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
+
+            prometheusExporter.start();
+
+            collector = new StatsCollector();
+
+            result = SharedScheduledExecutors.getSingleThreadExecutor()
+                    .scheduleAtFixedRate(collector, INITIAL_DELAY,
+                            REFRESH_INTERVAL, TIME_UNIT_SECOND, RECOVER_FROM_FAILURE);
+
+            log.info("Prometheus server start");
+        } catch (Exception ex) {
+            log.warn("Failed to start prometheus server due to {}", ex);
+        }
+    }
+
+    @Override
+    public void stopPrometheusExporter() {
+        try {
+            if (prometheusExporter != null) {
+                prometheusExporter.stop();
+            }
+        } catch (Exception e) {
+            log.warn("Failed to stop prometheus server due to {}", e);
+        }
+
+        result.cancel(true);
+        log.info("Prometheus exporter has stopped");
+    }
+
+    private void publish() {
+        publishFipMetrics();
+        publishSnatMetrics();
+    }
+
+    private void publishFipMetrics() {
+        if (prometheusExporter == null) {
+            log.error("Prometheus Server isn't ready.");
+            return;
+        }
+
+        String[] fipLabelValues = new String[5];
+
+        nodeService.completeNodes(GATEWAY).forEach(node -> {
+            flowRuleService.getFlowEntries(node.intgBridge()).forEach(flowEntry -> {
+
+
+                if (((IndexTableId) flowEntry.table()).id() == GW_ENTRY_TABLE &&
+                        flowEntry.priority() == PRIORITY_FLOATING_IP_UPSTREAM_RULE) {
+
+                    KubevirtFloatingIp floatingIp = floatingIpByUpstreamFlowEntry(flowEntry);
+                    if (floatingIp == null || floatingIp.vmName() == null) {
+                        return;
+                    }
+
+                    fipLabelValues[0] = floatingIp.id();
+                    fipLabelValues[1] = floatingIp.floatingIp().toString();
+                    fipLabelValues[2] = node.hostname();
+                    fipLabelValues[3] = floatingIp.vmName();
+                    fipLabelValues[4] = floatingIp.networkName();
+
+                    pktFIPTx.labels(fipLabelValues).set(flowEntry.packets());
+                    byteFIPTx.labels(fipLabelValues).set(flowEntry.bytes());
+
+                } else if (((IndexTableId) flowEntry.table()).id() == GW_ENTRY_TABLE &&
+                        flowEntry.priority() == PRIORITY_FLOATING_IP_DOWNSTREAM_RULE) {
+                    KubevirtFloatingIp floatingIp = floatingIpByDownstreamFlowEntry(flowEntry);
+                    if (floatingIp == null || floatingIp.vmName() == null) {
+                        return;
+                    }
+
+                    fipLabelValues[0] = floatingIp.id();
+                    fipLabelValues[1] = floatingIp.floatingIp().toString();
+                    fipLabelValues[2] = node.hostname();
+                    fipLabelValues[3] = floatingIp.vmName();
+                    fipLabelValues[4] = floatingIp.networkName();
+
+                    pktFIPRx.labels(fipLabelValues).set(flowEntry.packets());
+                    byteFIPRx.labels(fipLabelValues).set(flowEntry.bytes());
+                }
+            });
+        });
+    }
+
+
+    private void publishSnatMetrics() {
+        if (prometheusExporter == null) {
+            log.error("Prometheus Server isn't ready.");
+            return;
+        }
+
+        String[] snatLabelValues = new String[3];
+
+        routerService.routers().stream().filter(router -> router.enableSnat() &&
+                        router.electedGateway() != null &&
+                        router.peerRouter() != null &&
+                        router.peerRouter().ipAddress() != null &&
+                        router.peerRouter().macAddress() != null)
+                .forEach(router -> {
+                    KubevirtNode gateway = nodeService.node(router.electedGateway());
+                    if (gateway == null) {
+                        return;
+                    }
+
+                    String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
+                    if (routerSnatIp == null) {
+                        return;
+                    }
+
+                    flowRuleService.getFlowEntries(gateway.intgBridge()).forEach(flowEntry -> {
+                        if (((IndexTableId) flowEntry.table()).id() == GW_ENTRY_TABLE &&
+                                flowEntry.priority() == PRIORITY_STATEFUL_SNAT_RULE) {
+                            snatLabelValues[0] = router.name();
+                            snatLabelValues[1] = routerSnatIp;
+                            snatLabelValues[2] = gateway.hostname();
+                            if (isSnatUpstreamFlorEntryForRouter(router, flowEntry)) {
+                                pktSNATTx.labels(snatLabelValues).set(flowEntry.packets());
+                                byteSNATTx.labels(snatLabelValues).set(flowEntry.bytes());
+
+                            } else if (isSnatDownstreamFlorEntryForRouter(routerSnatIp, flowEntry)) {
+                                pktSNATRx.labels(snatLabelValues).set(flowEntry.packets());
+                                byteSNATRx.labels(snatLabelValues).set(flowEntry.bytes());
+                            }
+                        }
+                    });
+                });
+    }
+
+    private boolean isSnatUpstreamFlorEntryForRouter(KubevirtRouter router, FlowEntry flowEntry) {
+        TrafficSelector selector = flowEntry.selector();
+
+        EthCriterion ethCriterion = (EthCriterion) selector.getCriterion(ETH_DST);
+        if (ethCriterion == null) {
+            return false;
+        }
+        MacAddress macAddress = ethCriterion.mac();
+
+        if (router.mac().equals(macAddress)) {
+            return true;
+        }
+
+        return false;
+    }
+
+    private boolean isSnatDownstreamFlorEntryForRouter(String routerSnatIp,  FlowEntry flowEntry) {
+        TrafficSelector selector = flowEntry.selector();
+
+        IPCriterion ipCriterion = (IPCriterion) selector.getCriterion(IPV4_DST);
+        if (ipCriterion == null) {
+            return false;
+        }
+
+        IpAddress dstIp = ipCriterion.ip().address();
+
+        if (dstIp.toString().equals(routerSnatIp)) {
+            return true;
+        }
+        return false;
+    }
+
+    private KubevirtFloatingIp floatingIpByUpstreamFlowEntry(FlowEntry flowEntry) {
+        TrafficSelector selector = flowEntry.selector();
+
+        EthCriterion ethCriterion = (EthCriterion) selector.getCriterion(ETH_SRC);
+
+        if (ethCriterion == null) {
+            return null;
+        }
+        MacAddress macAddress = ethCriterion.mac();
+
+        KubevirtPort port = portService.port(macAddress);
+
+        if (port == null) {
+            return null;
+        }
+
+        return routerService.floatingIps()
+                .stream()
+                .filter(ip -> ip.vmName() != null && ip.vmName().equals(port.vmName()))
+                .findAny().orElse(null);
+    }
+
+    private KubevirtFloatingIp floatingIpByDownstreamFlowEntry(FlowEntry flowEntry) {
+        TrafficSelector selector = flowEntry.selector();
+
+        IPCriterion ipCriterion = (IPCriterion) selector.getCriterion(IPV4_DST);
+        if (ipCriterion == null) {
+            return null;
+        }
+
+        IpAddress dstIp = ipCriterion.ip().address();
+
+        return routerService.floatingIps()
+                .stream()
+                .filter(ip -> ip.floatingIp().equals(dstIp))
+                .findAny().orElse(null);
+    }
+
+    private class StatsCollector implements Runnable {
+        @Override
+        public void run() {
+            publish();
+        }
+    }
+}