blob: 7111fc0dbd7b78f580f63a43b80d71fa11dbfe01 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.onosproject.openstacktelemetry.impl;
import com.google.common.collect.Sets;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.exporter.MetricsServlet;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.onlab.packet.TpPort;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.PrometheusTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.PrometheusTelemetryConfig;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Set;
import static org.onosproject.openstacktelemetry.api.Constants.PROMETHEUS_SCHEME;
import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.PROMETHEUS;
import static org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig.fromTelemetryConfig;
/**
* Prometheus telemetry manager.
*/
@Component(immediate = true, service = PrometheusTelemetryAdminService.class)
public class PrometheusTelemetryManager implements PrometheusTelemetryAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
private Set<Server> prometheusExporters = Sets.newConcurrentHashSet();
private static final String FLOW_TYPE = "flowType";
private static final String DEVICE_ID = "deviceId";
private static final String INPUT_INTERFACE_ID = "inputInterfaceId";
private static final String OUTPUT_INTERFACE_ID = "outputInterfaceId";
private static final String VLAN_ID = "vlanId";
private static final String VXLAN_ID = "vxlanId";
private static final String SRC_IP = "srcIp";
private static final String DST_IP = "dstIp";
private static final String SRC_PORT = "srcPort";
private static final String DST_PORT = "dstPort";
private static final String PROTOCOL = "protocol";
private static final String[] LABEL_TAGS = {
FLOW_TYPE, DEVICE_ID, INPUT_INTERFACE_ID, OUTPUT_INTERFACE_ID,
VLAN_ID, VXLAN_ID, SRC_IP, DST_IP, SRC_PORT, DST_PORT, PROTOCOL
};
private static final String STAT_NAME_VM2VM_BYTE = "vm2vm_byte";
private static final String STAT_NAME_VM2VM_BYTE_PREV = "vm2vm_byte_prev";
private static final String STAT_NAME_VM2VM_BYTE_CURR = "vm2vm_byte_curr";
private static final String STAT_NAME_VM2VM_PKT = "vm2vm_pkt";
private static final String STAT_NAME_VM2VM_PKT_PREV = "vm2vm_pkt_prev";
private static final String STAT_NAME_VM2VM_PKT_CURR = "vm2vm_pkt_curr";
private static final String STAT_NAME_ERROR_PKT = "error_pkt";
private static final String STAT_NAME_DROP_PKT = "drop_pkt";
private static final String HELP_MSG_VM2VM_BYTE =
"SONA flow bytes statistics for VM to VM";
private static final String HELP_MSG_VM2VM_BYTE_PREV =
HELP_MSG_VM2VM_BYTE + " [Accumulated previous byte]";
private static final String HELP_MSG_VM2VM_BYTE_CURR =
HELP_MSG_VM2VM_BYTE + " [Accumulated current byte]";
private static final String HELP_MSG_VM2VM_PKT =
"SONA flow packets statistics for VM to VM";
private static final String HELP_MSG_VM2VM_PKT_PREV =
HELP_MSG_VM2VM_PKT + " [Accumulated previous pkt]";
private static final String HELP_MSG_VM2VM_PKT_CURR =
HELP_MSG_VM2VM_PKT + " [Accumulated current pkt]";
private static final String HELP_MSG_ERROR = "SONA error statistics";
private static final String HELP_MSG_DROP = "SONA drop statistics";
private static Gauge byteVM2VM = Gauge.build()
.name(STAT_NAME_VM2VM_BYTE)
.help(HELP_MSG_VM2VM_BYTE)
.labelNames(LABEL_TAGS)
.register();
private static Gauge byteVM2VMPrev = Gauge.build()
.name(STAT_NAME_VM2VM_BYTE_PREV)
.help(HELP_MSG_VM2VM_BYTE_PREV)
.labelNames(LABEL_TAGS)
.register();
private static Gauge byteVM2VMCurr = Gauge.build()
.name(STAT_NAME_VM2VM_BYTE_CURR)
.help(HELP_MSG_VM2VM_BYTE_CURR)
.labelNames(LABEL_TAGS)
.register();
private static Gauge pktVM2VM = Gauge.build()
.name(STAT_NAME_VM2VM_PKT)
.help(HELP_MSG_VM2VM_PKT)
.labelNames(LABEL_TAGS)
.register();
private static Gauge pktVM2VMPrev = Gauge.build()
.name(STAT_NAME_VM2VM_PKT_PREV)
.help(HELP_MSG_VM2VM_PKT_PREV)
.labelNames(LABEL_TAGS)
.register();
private static Gauge pktVM2VMCurr = Gauge.build()
.name(STAT_NAME_VM2VM_PKT_CURR)
.help(HELP_MSG_VM2VM_PKT_CURR)
.labelNames(LABEL_TAGS)
.register();
private static Counter pktError = Counter.build()
.name(STAT_NAME_ERROR_PKT)
.help(HELP_MSG_ERROR)
.labelNames(LABEL_TAGS)
.register();
private static Counter pktDrop = Counter.build()
.name(STAT_NAME_DROP_PKT)
.help(HELP_MSG_DROP)
.labelNames(LABEL_TAGS)
.register();
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected TelemetryConfigService telemetryConfigService;
@Activate
protected void activate() {
openstackTelemetryService.addTelemetryService(this);
log.info("Started");
}
@Deactivate
protected void deactivate() {
stop();
openstackTelemetryService.removeTelemetryService(this);
log.info("Stopped");
}
@Override
public void start() {
log.info("Prometheus exporter starts.");
telemetryConfigService.getConfigsByType(PROMETHEUS).forEach(c -> {
PrometheusTelemetryConfig prometheusConfig = fromTelemetryConfig(c);
if (prometheusConfig != null &&
!c.name().equals(PROMETHEUS_SCHEME) && c.enabled()) {
try {
// TODO Offer a 'Authentication'
Server prometheusExporter = new Server(prometheusConfig.port());
ServletContextHandler context = new ServletContextHandler();
context.setContextPath("/");
prometheusExporter.setHandler(context);
context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
log.info("Prometheus server start");
prometheusExporter.start();
prometheusExporters.add(prometheusExporter);
} catch (Exception ex) {
log.warn("Exception: {}", ex);
}
}
});
}
@Override
public void stop() {
prometheusExporters.forEach(pe -> {
try {
pe.stop();
} catch (Exception e) {
log.warn("Failed to stop prometheus server due to {}", e);
}
});
log.info("Prometheus exporter has stopped");
}
@Override
public void restart() {
stop();
start();
}
@Override
public void publish(Set<FlowInfo> flowInfos) {
if (flowInfos.isEmpty()) {
log.debug("No FlowInfo record to publish");
return;
}
long flowByte;
int flowPkt;
String[] labelValues;
for (FlowInfo flowInfo: flowInfos) {
flowByte = flowInfo.statsInfo().currAccBytes() - flowInfo.statsInfo().prevAccBytes();
flowPkt = flowInfo.statsInfo().currAccPkts() - flowInfo.statsInfo().prevAccPkts();
labelValues = getLabelValues(flowInfo);
byteVM2VM.labels(labelValues).set(flowByte);
byteVM2VMPrev.labels(labelValues).set(flowInfo.statsInfo().prevAccBytes());
byteVM2VMCurr.labels(labelValues).set(flowInfo.statsInfo().currAccBytes());
pktVM2VM.labels(labelValues).set(flowPkt);
pktVM2VMPrev.labels(labelValues).set(flowInfo.statsInfo().prevAccPkts());
pktVM2VMCurr.labels(labelValues).set(flowInfo.statsInfo().currAccPkts());
pktError.labels(labelValues).inc(flowInfo.statsInfo().errorPkts());
pktDrop.labels(labelValues).inc(flowInfo.statsInfo().dropPkts());
}
}
private String[] getLabelValues(FlowInfo flowInfo) {
String[] labelValues = new String[LABEL_TAGS.length];
labelValues[Arrays.asList(LABEL_TAGS).indexOf(FLOW_TYPE)]
= String.valueOf(flowInfo.flowType());
labelValues[Arrays.asList(LABEL_TAGS).indexOf(DEVICE_ID)]
= flowInfo.deviceId().toString();
labelValues[Arrays.asList(LABEL_TAGS).indexOf(INPUT_INTERFACE_ID)]
= String.valueOf(flowInfo.inputInterfaceId());
labelValues[Arrays.asList(LABEL_TAGS).indexOf(OUTPUT_INTERFACE_ID)]
= String.valueOf(flowInfo.outputInterfaceId());
labelValues[Arrays.asList(LABEL_TAGS).indexOf(VXLAN_ID)]
= String.valueOf(flowInfo.vxlanId());
labelValues[Arrays.asList(LABEL_TAGS).indexOf(SRC_IP)]
= flowInfo.srcIp().toString();
labelValues[Arrays.asList(LABEL_TAGS).indexOf(DST_IP)]
= flowInfo.dstIp().toString();
labelValues[Arrays.asList(LABEL_TAGS).indexOf(SRC_PORT)]
= getTpPort(flowInfo.srcPort());
labelValues[Arrays.asList(LABEL_TAGS).indexOf(DST_PORT)]
= getTpPort(flowInfo.dstPort());
labelValues[Arrays.asList(LABEL_TAGS).indexOf(PROTOCOL)]
= String.valueOf(flowInfo.protocol());
if (flowInfo.vlanId() != null) {
labelValues[Arrays.asList(LABEL_TAGS).indexOf(VLAN_ID)]
= flowInfo.vlanId().toString();
}
return labelValues;
}
private String getTpPort(TpPort tpPort) {
if (tpPort == null) {
return "";
}
return tpPort.toString();
}
@Override
public boolean isRunning() {
return !prometheusExporters.isEmpty();
}
}