/*
 * Copyright 2022-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package org.onosproject.kubevirtnetworking.impl;
17
18import io.prometheus.client.Gauge;
19import io.prometheus.client.exporter.MetricsServlet;
20import org.eclipse.jetty.server.Server;
21import org.eclipse.jetty.servlet.ServletContextHandler;
22import org.eclipse.jetty.servlet.ServletHolder;
23import org.onlab.packet.IpAddress;
24import org.onlab.packet.MacAddress;
25import org.onlab.util.SharedScheduledExecutors;
26import org.onosproject.core.CoreService;
27import org.onosproject.kubevirtnetworking.api.KubevirtFloatingIp;
28import org.onosproject.kubevirtnetworking.api.KubevirtPort;
29import org.onosproject.kubevirtnetworking.api.KubevirtPortService;
30import org.onosproject.kubevirtnetworking.api.KubevirtPrometheusAssuranceService;
31import org.onosproject.kubevirtnetworking.api.KubevirtRouter;
32import org.onosproject.kubevirtnetworking.api.KubevirtRouterService;
33import org.onosproject.kubevirtnode.api.KubevirtNode;
34import org.onosproject.kubevirtnode.api.KubevirtNodeService;
35import org.onosproject.net.device.DeviceService;
36import org.onosproject.net.flow.FlowEntry;
37import org.onosproject.net.flow.FlowRuleService;
38import org.onosproject.net.flow.IndexTableId;
39import org.onosproject.net.flow.TrafficSelector;
40import org.onosproject.net.flow.criteria.EthCriterion;
41import org.onosproject.net.flow.criteria.IPCriterion;
42import org.osgi.service.component.annotations.Activate;
43import org.osgi.service.component.annotations.Component;
44import org.osgi.service.component.annotations.Deactivate;
45import org.osgi.service.component.annotations.Reference;
46import org.osgi.service.component.annotations.ReferenceCardinality;
47import org.slf4j.Logger;
48
49import java.util.concurrent.ScheduledFuture;
50import java.util.concurrent.TimeUnit;
51
52import static org.onosproject.kubevirtnetworking.api.Constants.GW_ENTRY_TABLE;
53import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FLOATING_IP_DOWNSTREAM_RULE;
54import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_FLOATING_IP_UPSTREAM_RULE;
55import static org.onosproject.kubevirtnetworking.api.Constants.PRIORITY_STATEFUL_SNAT_RULE;
56import static org.onosproject.kubevirtnode.api.KubevirtNode.Type.GATEWAY;
57import static org.onosproject.net.flow.criteria.Criterion.Type.ETH_DST;
58import static org.onosproject.net.flow.criteria.Criterion.Type.ETH_SRC;
59import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
60import static org.slf4j.LoggerFactory.getLogger;
61
62/**
63 * Implementation of Kubevirt Prometheus Assurance Service.
64 */
65@Component(immediate = true, service = KubevirtPrometheusAssuranceService.class)
66public class KubevirtPrometheusAssuranceManager implements KubevirtPrometheusAssuranceService {
67 private final Logger log = getLogger(getClass());
68
69 private static final String APP_ID = "org.onosproject.kubevirtnetwork";
70
71
72 private static final String SRC_PORT = "srcPort";
73
74 private static final String FIP_ID = "fipId";
75 private static final String FIP_ADDRESS = "fipAddress";
76 private static final String MER_NAME = "merName";
77 private static final String NETWORK_NAME = "networkName";
78 private static final String ROUTER_NAME = "routerName";
79 private static final String ROUTER_SNAT_IP = "routerSnatIp";
80 private static final String VM_NAME = "vmName";
81 private static final String RX_BYTE = "rxByte";
82 private static final String TX_BYTE = "txByte";
83 private static final String RX_PKTS = "rxPkts";
84 private static final String TX_PKTS = "txPkts";
85
86
87 private static final String DST_PORT = "dstPort";
88 private static final String PROTOCOL = "protocol";
89
90
91 private static final long INITIAL_DELAY = 10L;
92 private static final long REFRESH_INTERVAL = 10L;
93 private static final TimeUnit TIME_UNIT_SECOND = TimeUnit.SECONDS;
94 private static final boolean RECOVER_FROM_FAILURE = true;
95
96 private static final String[] FIP_LABEL_TAGS = {FIP_ID, FIP_ADDRESS, MER_NAME, VM_NAME, NETWORK_NAME};
97 private static final String[] SNAT_LABEL_TAGS = {ROUTER_NAME, ROUTER_SNAT_IP, MER_NAME};
98
99 private Server prometheusExporter;
100 private StatsCollector collector;
101 private ScheduledFuture result;
102
103 private static Gauge byteFIPTx = Gauge.build()
104 .name("fip_tx_byte")
105 .help("fip_tx_byte")
106 .labelNames(FIP_LABEL_TAGS)
107 .register();
108 private static Gauge pktFIPTx = Gauge.build()
109 .name("fip_tx_pkts")
110 .help("fip_tx_pkts")
111 .labelNames(FIP_LABEL_TAGS)
112 .register();
113 private static Gauge byteFIPRx = Gauge.build()
114 .name("fip_rx_byte")
115 .help("fip_rx_byte")
116 .labelNames(FIP_LABEL_TAGS)
117 .register();
118 private static Gauge pktFIPRx = Gauge.build()
119 .name("fip_rx_pkts")
120 .help("fip_rx_pkts")
121 .labelNames(FIP_LABEL_TAGS)
122 .register();
123
124 private static Gauge byteSNATTx = Gauge.build()
125 .name("snat_tx_byte")
126 .help("snat_rx_pkts")
127 .labelNames(SNAT_LABEL_TAGS)
128 .register();
129 private static Gauge pktSNATTx = Gauge.build()
130 .name("snat_tx_pkts")
131 .help("snat_tx_pkts")
132 .labelNames(SNAT_LABEL_TAGS)
133 .register();
134
135 private static Gauge byteSNATRx = Gauge.build()
136 .name("snat_rx_byte")
137 .help("snat_rx_byte")
138 .labelNames(SNAT_LABEL_TAGS)
139 .register();
140 private static Gauge pktSNATRx = Gauge.build()
141 .name("snat_rx_pkts")
142 .help("snat_rx_pkts")
143 .labelNames(SNAT_LABEL_TAGS)
144 .register();
145
146 @Reference(cardinality = ReferenceCardinality.MANDATORY)
147 protected CoreService coreService;
148
149 @Reference(cardinality = ReferenceCardinality.MANDATORY)
150 protected FlowRuleService flowRuleService;
151
152 @Reference(cardinality = ReferenceCardinality.MANDATORY)
153 protected DeviceService deviceService;
154
155 @Reference(cardinality = ReferenceCardinality.MANDATORY)
156 protected KubevirtRouterService routerService;
157
158 @Reference(cardinality = ReferenceCardinality.MANDATORY)
159 protected KubevirtNodeService nodeService;
160
161 @Reference(cardinality = ReferenceCardinality.MANDATORY)
162 protected KubevirtPortService portService;
163
164 @Activate
165 protected void activate() {
166 startPrometheusExporter();
167 log.info("Started");
168 }
169
170 @Deactivate
171 protected void deactivate() {
172 stopPrometheusExporter();
173 log.info("Stopped");
174 }
175
176 @Override
177 public void startPrometheusExporter() {
178 try {
179 prometheusExporter = new Server(9300);
180 ServletContextHandler context = new ServletContextHandler();
181 context.setContextPath("/");
182 prometheusExporter.setHandler(context);
183 context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
184
185 prometheusExporter.start();
186
187 collector = new StatsCollector();
188
189 result = SharedScheduledExecutors.getSingleThreadExecutor()
190 .scheduleAtFixedRate(collector, INITIAL_DELAY,
191 REFRESH_INTERVAL, TIME_UNIT_SECOND, RECOVER_FROM_FAILURE);
192
193 log.info("Prometheus server start");
194 } catch (Exception ex) {
195 log.warn("Failed to start prometheus server due to {}", ex);
196 }
197 }
198
199 @Override
200 public void stopPrometheusExporter() {
201 try {
202 if (prometheusExporter != null) {
203 prometheusExporter.stop();
204 }
205 } catch (Exception e) {
206 log.warn("Failed to stop prometheus server due to {}", e);
207 }
208
209 result.cancel(true);
210 log.info("Prometheus exporter has stopped");
211 }
212
213 private void publish() {
214 publishFipMetrics();
215 publishSnatMetrics();
216 }
217
218 private void publishFipMetrics() {
219 if (prometheusExporter == null) {
220 log.error("Prometheus Server isn't ready.");
221 return;
222 }
223
Daniel Parkc1ba1f42022-05-16 17:30:24 +0900224 nodeService.completeNodes(GATEWAY).forEach(node -> {
225 flowRuleService.getFlowEntries(node.intgBridge()).forEach(flowEntry -> {
226
227
228 if (((IndexTableId) flowEntry.table()).id() == GW_ENTRY_TABLE &&
229 flowEntry.priority() == PRIORITY_FLOATING_IP_UPSTREAM_RULE) {
230
231 KubevirtFloatingIp floatingIp = floatingIpByUpstreamFlowEntry(flowEntry);
232 if (floatingIp == null || floatingIp.vmName() == null) {
233 return;
234 }
235
Daniel Park766a0402022-07-05 18:31:50 +0900236 String[] fipLabelValues = new String[5];
237
Daniel Parkc1ba1f42022-05-16 17:30:24 +0900238 fipLabelValues[0] = floatingIp.id();
239 fipLabelValues[1] = floatingIp.floatingIp().toString();
240 fipLabelValues[2] = node.hostname();
241 fipLabelValues[3] = floatingIp.vmName();
242 fipLabelValues[4] = floatingIp.networkName();
243
244 pktFIPTx.labels(fipLabelValues).set(flowEntry.packets());
245 byteFIPTx.labels(fipLabelValues).set(flowEntry.bytes());
246
247 } else if (((IndexTableId) flowEntry.table()).id() == GW_ENTRY_TABLE &&
248 flowEntry.priority() == PRIORITY_FLOATING_IP_DOWNSTREAM_RULE) {
249 KubevirtFloatingIp floatingIp = floatingIpByDownstreamFlowEntry(flowEntry);
250 if (floatingIp == null || floatingIp.vmName() == null) {
251 return;
252 }
253
Daniel Park766a0402022-07-05 18:31:50 +0900254 String[] fipLabelValues = new String[5];
255
Daniel Parkc1ba1f42022-05-16 17:30:24 +0900256 fipLabelValues[0] = floatingIp.id();
257 fipLabelValues[1] = floatingIp.floatingIp().toString();
258 fipLabelValues[2] = node.hostname();
259 fipLabelValues[3] = floatingIp.vmName();
260 fipLabelValues[4] = floatingIp.networkName();
261
262 pktFIPRx.labels(fipLabelValues).set(flowEntry.packets());
263 byteFIPRx.labels(fipLabelValues).set(flowEntry.bytes());
264 }
265 });
266 });
267 }
268
269
270 private void publishSnatMetrics() {
271 if (prometheusExporter == null) {
272 log.error("Prometheus Server isn't ready.");
273 return;
274 }
275
Daniel Parkc1ba1f42022-05-16 17:30:24 +0900276 routerService.routers().stream().filter(router -> router.enableSnat() &&
277 router.electedGateway() != null &&
278 router.peerRouter() != null &&
279 router.peerRouter().ipAddress() != null &&
280 router.peerRouter().macAddress() != null)
281 .forEach(router -> {
282 KubevirtNode gateway = nodeService.node(router.electedGateway());
283 if (gateway == null) {
284 return;
285 }
286
287 String routerSnatIp = router.external().keySet().stream().findAny().orElse(null);
288 if (routerSnatIp == null) {
289 return;
290 }
291
Daniel Park766a0402022-07-05 18:31:50 +0900292
293
Daniel Parkc1ba1f42022-05-16 17:30:24 +0900294 flowRuleService.getFlowEntries(gateway.intgBridge()).forEach(flowEntry -> {
295 if (((IndexTableId) flowEntry.table()).id() == GW_ENTRY_TABLE &&
296 flowEntry.priority() == PRIORITY_STATEFUL_SNAT_RULE) {
Daniel Park766a0402022-07-05 18:31:50 +0900297
Daniel Parkc1ba1f42022-05-16 17:30:24 +0900298 if (isSnatUpstreamFlorEntryForRouter(router, flowEntry)) {
Daniel Park766a0402022-07-05 18:31:50 +0900299 String[] snatLabelValues = new String[3];
300 snatLabelValues[0] = router.name();
301 snatLabelValues[1] = routerSnatIp;
302 snatLabelValues[2] = gateway.hostname();
Daniel Parkc1ba1f42022-05-16 17:30:24 +0900303 pktSNATTx.labels(snatLabelValues).set(flowEntry.packets());
304 byteSNATTx.labels(snatLabelValues).set(flowEntry.bytes());
305
306 } else if (isSnatDownstreamFlorEntryForRouter(routerSnatIp, flowEntry)) {
Daniel Park766a0402022-07-05 18:31:50 +0900307 String[] snatLabelValues = new String[3];
308 snatLabelValues[0] = router.name();
309 snatLabelValues[1] = routerSnatIp;
310 snatLabelValues[2] = gateway.hostname();
Daniel Parkc1ba1f42022-05-16 17:30:24 +0900311 pktSNATRx.labels(snatLabelValues).set(flowEntry.packets());
312 byteSNATRx.labels(snatLabelValues).set(flowEntry.bytes());
313 }
314 }
315 });
316 });
317 }
318
319 private boolean isSnatUpstreamFlorEntryForRouter(KubevirtRouter router, FlowEntry flowEntry) {
320 TrafficSelector selector = flowEntry.selector();
321
322 EthCriterion ethCriterion = (EthCriterion) selector.getCriterion(ETH_DST);
323 if (ethCriterion == null) {
324 return false;
325 }
326 MacAddress macAddress = ethCriterion.mac();
327
328 if (router.mac().equals(macAddress)) {
329 return true;
330 }
331
332 return false;
333 }
334
335 private boolean isSnatDownstreamFlorEntryForRouter(String routerSnatIp, FlowEntry flowEntry) {
336 TrafficSelector selector = flowEntry.selector();
337
338 IPCriterion ipCriterion = (IPCriterion) selector.getCriterion(IPV4_DST);
339 if (ipCriterion == null) {
340 return false;
341 }
342
343 IpAddress dstIp = ipCriterion.ip().address();
344
345 if (dstIp.toString().equals(routerSnatIp)) {
346 return true;
347 }
348 return false;
349 }
350
351 private KubevirtFloatingIp floatingIpByUpstreamFlowEntry(FlowEntry flowEntry) {
352 TrafficSelector selector = flowEntry.selector();
353
354 EthCriterion ethCriterion = (EthCriterion) selector.getCriterion(ETH_SRC);
355
356 if (ethCriterion == null) {
357 return null;
358 }
359 MacAddress macAddress = ethCriterion.mac();
360
361 KubevirtPort port = portService.port(macAddress);
362
363 if (port == null) {
364 return null;
365 }
366
367 return routerService.floatingIps()
368 .stream()
369 .filter(ip -> ip.vmName() != null && ip.vmName().equals(port.vmName()))
370 .findAny().orElse(null);
371 }
372
373 private KubevirtFloatingIp floatingIpByDownstreamFlowEntry(FlowEntry flowEntry) {
374 TrafficSelector selector = flowEntry.selector();
375
376 IPCriterion ipCriterion = (IPCriterion) selector.getCriterion(IPV4_DST);
377 if (ipCriterion == null) {
378 return null;
379 }
380
381 IpAddress dstIp = ipCriterion.ip().address();
382
383 return routerService.floatingIps()
384 .stream()
385 .filter(ip -> ip.floatingIp().equals(dstIp))
386 .findAny().orElse(null);
387 }
388
389 private class StatsCollector implements Runnable {
390 @Override
391 public void run() {
392 publish();
393 }
394 }
395}