Support to publish port TX and RX stats in openstacktelemetry
Change-Id: I368fb676e4817cd01e5782a3b37170e2b9a5c6bd
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java
index 7fed73a..dcf682d 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/StatsFlowRuleAdminService.java
@@ -44,7 +44,7 @@
*
* @return a set of flow infos
*/
- Set<FlowInfo> getFlowInfo();
+ Set<FlowInfo> getFlowInfos();
/**
* Deletes stat flow rule.
diff --git a/apps/openstacktelemetry/app/BUCK b/apps/openstacktelemetry/app/BUCK
index 97bc0f9..4cc3d98 100644
--- a/apps/openstacktelemetry/app/BUCK
+++ b/apps/openstacktelemetry/app/BUCK
@@ -10,6 +10,7 @@
'//lib:jersey-client',
'//cli:onos-cli',
'//lib:org.apache.karaf.shell.console',
+ '//apps/openstacknode/api:onos-apps-openstacknode-api',
'//apps/openstacknetworking/api:onos-apps-openstacknetworking-api',
'//apps/openstacktelemetry/api:onos-apps-openstacktelemetry-api',
'//lib:kafka-clients',
diff --git a/apps/openstacktelemetry/app/pom.xml b/apps/openstacktelemetry/app/pom.xml
index 5b0f8bf..920958e 100644
--- a/apps/openstacktelemetry/app/pom.xml
+++ b/apps/openstacktelemetry/app/pom.xml
@@ -87,6 +87,12 @@
<dependency>
<groupId>org.onosproject</groupId>
+ <artifactId>onos-apps-openstacknode-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
index 228bc2b..2ced23b 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
@@ -25,6 +25,9 @@
*/
public class TinaMessageByteBufferCodec {
+ public static final String KAFKA_TOPIC = "sona.flow";
+ public static final String KAFKA_KEY = "flowdata";
+
private static final int HEADER_SIZE = 8;
private static final int ENTRY_SIZE = 88;
private static final int MILLISECONDS = 1000;
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index d1de54a..56a0e81 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -124,7 +124,7 @@
public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
if (producer == null) {
- log.warn("Kafka telemetry service has not been enabled!");
+ log.debug("Kafka telemetry service has not been enabled!");
return null;
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index da492a9..f3f386f 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -36,6 +36,9 @@
import java.util.List;
import java.util.Set;
+import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_KEY;
+import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_TOPIC;
+
/**
* Openstack telemetry manager.
*/
@@ -45,9 +48,6 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private static final String KAFKA_TOPIC = "sona.flow";
- private static final String KAFKA_KEY = "flowdata";
-
private List<TelemetryService> telemetryServices = Lists.newArrayList();
@Activate
@@ -89,6 +89,8 @@
if (service instanceof RestTelemetryManager) {
invokeRestPublisher((RestTelemetryService) service, flowInfos);
}
+
+ log.trace("Publishing Flow Infos {}", flowInfos);
});
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index 9025224..a7ead8e 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -16,7 +16,6 @@
package org.onosproject.openstacktelemetry.impl;
import com.google.common.collect.Sets;
-import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -29,12 +28,17 @@
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
+import org.onlab.util.SharedScheduledExecutorService;
+import org.onlab.util.SharedScheduledExecutors;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.device.PortStatistics;
import org.onosproject.net.flow.DefaultFlowRule;
import org.onosproject.net.flow.DefaultTrafficSelector;
import org.onosproject.net.flow.DefaultTrafficTreatment;
@@ -51,7 +55,11 @@
import org.onosproject.net.flow.criteria.TcpPortCriterion;
import org.onosproject.net.flow.criteria.UdpPortCriterion;
import org.onosproject.net.host.HostService;
+import org.onosproject.openstacknetworking.api.InstancePort;
+import org.onosproject.openstacknetworking.api.InstancePortService;
import org.onosproject.openstacknetworking.api.OpenstackNetworkService;
+import org.onosproject.openstacknode.api.OpenstackNode;
+import org.onosproject.openstacknode.api.OpenstackNodeService;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.StatsFlowRule;
@@ -62,10 +70,12 @@
import org.slf4j.LoggerFactory;
import java.util.Dictionary;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.onlab.packet.Ethernet.TYPE_IPV4;
import static org.onlab.packet.IPv4.PROTOCOL_TCP;
@@ -83,6 +93,7 @@
import static org.onosproject.openstacknetworking.api.Constants.VTAP_FLAT_OUTBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
+import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
import static org.onosproject.openstacktelemetry.api.Constants.FLAT;
import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
import static org.onosproject.openstacktelemetry.api.Constants.VLAN;
@@ -101,16 +112,28 @@
private static final byte FLOW_TYPE_SONA = 1; // VLAN
private static final long MILLISECONDS = 1000L;
+ private static final long INITIAL_DELAY = 5L;
private static final long REFRESH_INTERVAL = 5L;
+ private static final TimeUnit TIME_UNIT_SECOND = TimeUnit.SECONDS;
private static final String REVERSE_PATH_STATS = "reversePathStats";
private static final String EGRESS_STATS = "egressStats";
+ private static final String PORT_STATS = "portStats";
private static final boolean DEFAULT_REVERSE_PATH_STATS = false;
private static final boolean DEFAULT_EGRESS_STATS = false;
+ private static final boolean DEFAULT_PORT_STATS = true;
private static final String MAC_NOT_NULL = "MAC should not be null";
+ private static final String ARBITRARY_IP = "0.0.0.0/32";
+ private static final int ARBITRARY_LENGTH = 32;
+ private static final String ARBITRARY_MAC = "00:00:00:00:00:00";
+ private static final int ARBITRARY_IN_INTF = 0;
+ private static final int ARBITRARY_OUT_INTF = 0;
+
+ private static final boolean RECOVER_FROM_FAILURE = true;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@@ -121,6 +144,9 @@
protected HostService hostService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ComponentConfigService componentConfigService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -130,6 +156,12 @@
protected OpenstackNetworkService osNetworkService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected InstancePortService instPortService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OpenstackNodeService osNodeService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackTelemetryService telemetryService;
@Property(name = REVERSE_PATH_STATS, boolValue = DEFAULT_REVERSE_PATH_STATS,
@@ -142,12 +174,16 @@
"collecting the flow-based stats for egress port.")
private boolean egressStats = DEFAULT_EGRESS_STATS;
+ @Property(name = PORT_STATS, boolValue = DEFAULT_PORT_STATS,
+ label = "A flag which indicates whether to collect port TX & RX stats.")
+ private boolean portStats = DEFAULT_PORT_STATS;
+
private ApplicationId appId;
- private Timer timer;
- private TimerTask task;
+ private TelemetryCollector collector;
+ private SharedScheduledExecutorService executor;
+ private ScheduledFuture result;
private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
- private int loopCount = 0;
private static final int SOURCE_ID = 1;
private static final int TARGET_ID = 2;
@@ -157,15 +193,12 @@
private static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
- public StatsFlowRuleManager() {
- this.timer = new Timer("openstack-telemetry-sender");
- }
-
@Activate
protected void activate() {
appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
componentConfigService.registerProperties(getClass());
+ executor = SharedScheduledExecutors.getSingleThreadExecutor();
this.start();
@@ -177,6 +210,8 @@
componentConfigService.unregisterProperties(getClass(), false);
+ flowRuleService.removeFlowRulesById(appId);
+
log.info("Stopped");
}
@@ -190,16 +225,17 @@
@Override
public void start() {
log.info("Start publishing thread");
- task = new InternalTimerTask();
- timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL,
- MILLISECONDS * REFRESH_INTERVAL);
+ collector = new TelemetryCollector();
+
+ result = executor.scheduleAtFixedRate(collector, INITIAL_DELAY,
+ REFRESH_INTERVAL, TIME_UNIT_SECOND, RECOVER_FROM_FAILURE);
}
@Override
public void stop() {
log.info("Stop data publishing thread");
- task.cancel();
- task = null;
+ result.cancel(true);
+ collector = null;
}
@Override
@@ -210,12 +246,221 @@
@Override
public void deleteStatFlowRule(StatsFlowRule statsFlowRule) {
- // FIXME: following code might not be necessary
- flowRuleService.removeFlowRulesById(appId);
setStatFlowRule(statsFlowRule, false);
}
+ /**
+ * Gets a set of the flow infos.
+ *
+ * @return a set of flow infos
+ */
+ @Override
+ public Set<FlowInfo> getFlowInfos() {
+ Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
+
+ // obtain all flow rule entries installed by telemetry app
+ for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
+ FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
+ TrafficSelector selector = entry.selector();
+
+ IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
+ IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
+ IPProtocolCriterion ipProtocol =
+ (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
+
+ log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
+ ((IndexTableId) entry.table()).id(),
+ srcIp.ip().toString(),
+ dstIp.ip().toString(),
+ entry.packets(),
+ entry.bytes());
+
+ fBuilder.withFlowType(FLOW_TYPE_SONA)
+ .withSrcIp(srcIp.ip())
+ .withDstIp(dstIp.ip());
+
+ if (ipProtocol != null) {
+ fBuilder.withProtocol((byte) ipProtocol.protocol());
+
+ if (ipProtocol.protocol() == PROTOCOL_TCP) {
+ TcpPortCriterion tcpSrc =
+ (TcpPortCriterion) selector.getCriterion(TCP_SRC);
+ TcpPortCriterion tcpDst =
+ (TcpPortCriterion) selector.getCriterion(TCP_DST);
+
+ log.debug("TCP SRC Port: {}, DST Port: {}",
+ tcpSrc.tcpPort().toInt(),
+ tcpDst.tcpPort().toInt());
+
+ fBuilder.withSrcPort(tcpSrc.tcpPort());
+ fBuilder.withDstPort(tcpDst.tcpPort());
+
+ } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
+
+ UdpPortCriterion udpSrc =
+ (UdpPortCriterion) selector.getCriterion(UDP_SRC);
+ UdpPortCriterion udpDst =
+ (UdpPortCriterion) selector.getCriterion(UDP_DST);
+
+ log.debug("UDP SRC Port: {}, DST Port: {}",
+ udpSrc.udpPort().toInt(),
+ udpDst.udpPort().toInt());
+
+ fBuilder.withSrcPort(udpSrc.udpPort());
+ fBuilder.withDstPort(udpDst.udpPort());
+ } else {
+ log.debug("Other protocol: {}", ipProtocol.protocol());
+ }
+ }
+
+ fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
+ .withDstMac(getMacAddress(dstIp.ip().address()))
+ .withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
+ .withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
+ .withVlanId(getVlanId(srcIp.ip().address()))
+ .withDeviceId(entry.deviceId());
+
+ StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
+
+ // TODO: need to collect error and drop packets stats
+ // TODO: need to make the refresh interval configurable
+ sBuilder.withStartupTime(System.currentTimeMillis())
+ .withFstPktArrTime(System.currentTimeMillis())
+ .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
+ .withCurrAccPkts((int) entry.packets())
+ .withCurrAccBytes(entry.bytes())
+ .withErrorPkts((short) 0)
+ .withDropPkts((short) 0);
+
+ fBuilder.withStatsInfo(sBuilder.build());
+
+ FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
+
+ flowInfos.add(flowInfo);
+
+ log.debug("FlowInfo: \n{}", flowInfo.toString());
+ }
+
+ return flowInfos;
+ }
+
+ /**
+ * Gets a set of flow infos by referring to destination VM port.
+ *
+ * @return flow infos
+ */
+ private Set<FlowInfo> getDstPortBasedFlowInfos() {
+ Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
+ Set<PortNumber> instPortNums = instPortService.instancePorts()
+ .stream()
+ .map(InstancePort::portNumber)
+ .collect(Collectors.toSet());
+ Set<DeviceId> deviceIds = osNodeService.completeNodes(COMPUTE)
+ .stream()
+ .map(OpenstackNode::intgBridge)
+ .collect(Collectors.toSet());
+
+ deviceIds.forEach(d -> {
+ List<PortStatistics> stats =
+ deviceService.getPortStatistics(d)
+ .stream()
+ .filter(s -> instPortNums.contains(s.portNumber()))
+ .collect(Collectors.toList());
+
+ stats.forEach(s -> {
+ InstancePort instPort = getInstancePort(d, s.portNumber());
+ flowInfos.add(buildTxPortInfo(instPort, s));
+ flowInfos.add(buildRxPortInfo(instPort, s));
+ });
+ });
+
+ return flowInfos;
+ }
+
+ /**
+ * Obtains the flow info generated by TX port.
+ *
+ * @param instPort instance port
+ * @param stat port statistics
+ * @return flow info
+ */
+ private FlowInfo buildTxPortInfo(InstancePort instPort, PortStatistics stat) {
+ FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
+
+ fBuilder.withFlowType(FLOW_TYPE_SONA)
+ .withSrcIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
+ .withDstIp(IpPrefix.valueOf(ARBITRARY_IP))
+ .withSrcMac(instPort.macAddress())
+ .withDstMac(MacAddress.valueOf(ARBITRARY_MAC))
+ .withDeviceId(instPort.deviceId())
+ .withInputInterfaceId(ARBITRARY_IN_INTF)
+ .withOutputInterfaceId(ARBITRARY_OUT_INTF)
+ .withVlanId(VlanId.vlanId());
+
+ StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
+ sBuilder.withStartupTime(System.currentTimeMillis())
+ .withFstPktArrTime(System.currentTimeMillis())
+ .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
+ .withCurrAccPkts((int) stat.packetsSent())
+ .withCurrAccBytes(stat.bytesSent())
+ .withErrorPkts((short) stat.packetsTxErrors())
+ .withDropPkts((short) stat.packetsTxDropped());
+
+ fBuilder.withStatsInfo(sBuilder.build());
+
+ return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
+ }
+
+ /**
+ * Obtains the flow info generated by RX port.
+ *
+ * @param instPort instance port
+ * @param stat port statistics
+ * @return flow info
+ */
+ private FlowInfo buildRxPortInfo(InstancePort instPort, PortStatistics stat) {
+ FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
+
+ fBuilder.withFlowType(FLOW_TYPE_SONA)
+ .withSrcIp(IpPrefix.valueOf(ARBITRARY_IP))
+ .withDstIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
+ .withSrcMac(MacAddress.valueOf(ARBITRARY_MAC))
+ .withDstMac(instPort.macAddress())
+ .withDeviceId(instPort.deviceId())
+ .withInputInterfaceId(ARBITRARY_IN_INTF)
+ .withOutputInterfaceId(ARBITRARY_OUT_INTF)
+ .withVlanId(VlanId.vlanId());
+
+ StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
+ sBuilder.withStartupTime(System.currentTimeMillis())
+ .withFstPktArrTime(System.currentTimeMillis())
+ .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
+ .withCurrAccPkts((int) stat.packetsReceived())
+ .withCurrAccBytes(stat.bytesReceived())
+ .withErrorPkts((short) stat.packetsRxErrors())
+ .withDropPkts((short) stat.packetsRxDropped());
+
+ fBuilder.withStatsInfo(sBuilder.build());
+
+ return mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
+ }
+
+ /**
+ * Obtains instance port which associated with the given device identifier
+ * and port number.
+ *
+ * @param deviceId device identifier
+ * @param portNumber port number
+ * @return instance port
+ */
+ private InstancePort getInstancePort(DeviceId deviceId, PortNumber portNumber) {
+ return instPortService.instancePorts().stream()
+ .filter(p -> p.deviceId().equals(deviceId))
+ .filter(p -> p.portNumber().equals(portNumber))
+ .findFirst().orElse(null);
+ }
+
private void connectTables(DeviceId deviceId, int fromTable, int toTable,
StatsFlowRule statsFlowRule, int rulePriority,
boolean install) {
@@ -278,111 +523,19 @@
flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
@Override
public void onSuccess(FlowRuleOperations ops) {
- log.debug("Provisioned vni or forwarding table: \n {}", ops.toString());
+ log.debug("Install rules for telemetry stats: \n {}",
+ ops.toString());
}
@Override
public void onError(FlowRuleOperations ops) {
- log.debug("Failed to provision vni or forwarding table: \n {}", ops.toString());
+ log.debug("Failed to install rules for telemetry stats: \n {}",
+ ops.toString());
}
}));
}
/**
- * Gets a set of the flow infos.
- *
- * @return a set of flow infos
- */
- public Set<FlowInfo> getFlowInfo() {
- Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
-
- // obtain all flow rule entries installed by telemetry app
- for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
- FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
- TrafficSelector selector = entry.selector();
-
- IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
- IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
- IPProtocolCriterion ipProtocol =
- (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
-
- log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
- ((IndexTableId) entry.table()).id(),
- srcIp.ip().toString(),
- dstIp.ip().toString(),
- entry.packets(),
- entry.bytes());
-
- fBuilder.withFlowType(FLOW_TYPE_SONA)
- .withSrcIp(srcIp.ip())
- .withDstIp(dstIp.ip());
-
- if (ipProtocol != null) {
- fBuilder.withProtocol((byte) ipProtocol.protocol());
-
- if (ipProtocol.protocol() == PROTOCOL_TCP) {
- TcpPortCriterion tcpSrc =
- (TcpPortCriterion) selector.getCriterion(TCP_SRC);
- TcpPortCriterion tcpDst =
- (TcpPortCriterion) selector.getCriterion(TCP_DST);
-
- log.debug("TCP SRC Port: {}, DST Port: {}",
- tcpSrc.tcpPort().toInt(),
- tcpDst.tcpPort().toInt());
-
- fBuilder.withSrcPort(tcpSrc.tcpPort());
- fBuilder.withDstPort(tcpDst.tcpPort());
-
- } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
-
- UdpPortCriterion udpSrc =
- (UdpPortCriterion) selector.getCriterion(UDP_SRC);
- UdpPortCriterion udpDst =
- (UdpPortCriterion) selector.getCriterion(UDP_DST);
-
- log.debug("UDP SRC Port: {}, DST Port: {}",
- udpSrc.udpPort().toInt(),
- udpDst.udpPort().toInt());
-
- fBuilder.withSrcPort(udpSrc.udpPort());
- fBuilder.withDstPort(udpDst.udpPort());
- } else {
- log.debug("Other protocol: {}", ipProtocol.protocol());
- }
- }
-
- fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
- .withDstMac(getMacAddress(dstIp.ip().address()))
- .withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
- .withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
- .withVlanId(getVlanId(srcIp.ip().address()))
- .withDeviceId(entry.deviceId());
-
- StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
-
- // TODO: need to collect error and drop packets stats
- // TODO: need to make the refresh interval configurable
- sBuilder.withStartupTime(System.currentTimeMillis())
- .withFstPktArrTime(System.currentTimeMillis())
- .withLstPktOffset((int) (REFRESH_INTERVAL * MILLISECONDS))
- .withCurrAccPkts((int) entry.packets())
- .withCurrAccBytes(entry.bytes())
- .withErrorPkts((short) 0)
- .withDropPkts((short) 0);
-
- fBuilder.withStatsInfo(sBuilder.build());
-
- FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
-
- flowInfos.add(flowInfo);
-
- log.debug("FlowInfo: \n{}", flowInfo.toString());
- }
-
- return flowInfos;
- }
-
- /**
* Merges old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
*
* @param flowInfo current FlowInfo object
@@ -515,9 +668,7 @@
Optional<Host> host = hostService.getHostsByIp(ipAddress).stream().findAny();
return host.map(host1 -> host1.location().deviceId()).orElse(null);
} else {
- log.warn("Failed to get DeviceID which is connected to {}. " +
- "The destination is either a bare-metal or located out of DC",
- ipAddress.toString());
+ log.debug("No DeviceID is associated to {}", ipAddress.toString());
return null;
}
}
@@ -593,29 +744,54 @@
egressStats = egressStatsConfigured;
log.info("Configured. Egress stats flag is {}", egressStats);
}
+
+ Boolean portStatsConfigured = getBooleanProperty(properties, PORT_STATS);
+ if (portStatsConfigured == null) {
+ portStats = DEFAULT_PORT_STATS;
+ log.info("Port stats flag is NOT " +
+ "configured, default value is {}", portStats);
+ } else {
+ portStats = portStatsConfigured;
+ log.info("Configured. Port stats flag is {}", portStats);
+ }
}
- private class InternalTimerTask extends TimerTask {
+ private class TelemetryCollector implements Runnable {
@Override
public void run() {
- log.debug("Timer task thread starts ({})", loopCount++);
-
Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
// we only let the master controller of the device where the
- // stats flow rules are installed send kafka message
- getFlowInfo().forEach(f -> {
- DeviceId deviceId = getDeviceId(f.srcIp().address());
- if (mastershipService.isLocalMaster(deviceId)) {
+ // stats flow rules are installed send stats message
+ getFlowInfos().forEach(f -> {
+ if (checkSrcDstLocalMaster(f)) {
filteredFlowInfos.add(f);
}
});
- try {
- telemetryService.publish(filteredFlowInfos);
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+ // we only let the master controller of the device where the port
+ // is located to send stats message
+ if (portStats) {
+ getDstPortBasedFlowInfos().forEach(f -> {
+ if (checkSrcDstLocalMaster(f)) {
+ filteredFlowInfos.add(f);
+ }
+ });
}
+
+ telemetryService.publish(filteredFlowInfos);
+ }
+
+ private boolean checkSrcDstLocalMaster(FlowInfo info) {
+ DeviceId srcDeviceId = getDeviceId(info.srcIp().address());
+ DeviceId dstDeviceId = getDeviceId(info.dstIp().address());
+
+ boolean isSrcLocalMaster = srcDeviceId != null &&
+ mastershipService.isLocalMaster(srcDeviceId);
+ boolean isDstLocalMaster = dstDeviceId != null &&
+ mastershipService.isLocalMaster(dstDeviceId);
+
+ return isSrcLocalMaster || isDstLocalMaster;
}
}
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
index 2efab8b..419b789 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
@@ -30,6 +30,7 @@
import org.slf4j.LoggerFactory;
import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
@@ -98,6 +99,9 @@
* @param input openstack flow rule JSON input stream
* @return 200 OK if processing is correct.
*/
+ @DELETE
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
public Response deleteBulkFlowRule(InputStream input) {
log.info("DELETE BULK FLOW RULE: {}", input.toString());
@@ -132,7 +136,7 @@
log.info("GET BULK FLOW RULE");
Set<FlowInfo> flowInfoSet;
- flowInfoSet = statsFlowRuleService.getFlowInfo();
+ flowInfoSet = statsFlowRuleService.getFlowInfos();
log.info("\n\n======================================================\n" +
"FlowInfo Set: \n{}" +
@@ -151,11 +155,10 @@
}
@GET
- @Path("list/{src_ip_prefix}/{dst_ip_prefix}")
+ @Path("list/{srcIpPrefix}/{dstIpPrefix}")
@Produces(MediaType.APPLICATION_JSON)
- public Response getFlowRule(
- @PathParam("src_ip_prefix") String srcIpPrefix,
- @PathParam("dst_ip_prefix") String dstIpPrefix) {
+ public Response getFlowRule(@PathParam("srcIpPrefix") String srcIpPrefix,
+ @PathParam("dstIpPrefix") String dstIpPrefix) {
return ok(root).build();
}