Add a message handler for Openstack Telemetry view
Change-Id: I2803ac6e8f3c90e005bc73c43a5b867934daa80f
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
index 6033483..2f67050 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
@@ -32,6 +32,8 @@
* Implementation class of FlowInfo.
*/
public final class DefaultFlowInfo implements FlowInfo {
+ private static final String INGRESS_STATS = "Ingress Port :";
+ private static final String EGRESS_STATS = "Egress Port :";
private final byte flowType;
private final DeviceId deviceId;
@@ -185,6 +187,20 @@
}
@Override
+ public String uniqueFlowInfoKey() {
+ if (srcIp.address().isZero() || dstIp.address().isZero()) {
+ if (!srcIp.address().isZero()) {
+ return INGRESS_STATS + srcIp.toString();
+ }
+ if (!dstIp.address().isZero()) {
+ return EGRESS_STATS + dstIp.toString();
+ }
+ }
+ return srcIp.toString() + ":" + srcPort.toString() + " -> " +
+ dstIp.toString() + ":" + dstPort.toString();
+ }
+
+ @Override
public String toString() {
return toStringHelper(this)
.add("flowType", flowType)
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index 3a92bdf..79cb4c7 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -46,7 +47,6 @@
import org.onosproject.net.flow.FlowRuleOperations;
import org.onosproject.net.flow.FlowRuleOperationsContext;
import org.onosproject.net.flow.FlowRuleService;
-import org.onosproject.net.flow.IndexTableId;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
import org.onosproject.net.flow.criteria.IPCriterion;
@@ -69,8 +69,11 @@
import org.slf4j.LoggerFactory;
import java.util.Dictionary;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -93,6 +96,7 @@
import static org.onosproject.openstacknetworking.api.Constants.VTAP_INBOUND_TABLE;
import static org.onosproject.openstacknetworking.api.Constants.VTAP_OUTBOUND_TABLE;
import static org.onosproject.openstacknode.api.OpenstackNode.NodeType.COMPUTE;
+import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_DATA_POINT_SIZE;
import static org.onosproject.openstacktelemetry.api.Constants.FLAT;
import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
import static org.onosproject.openstacktelemetry.api.Constants.VLAN;
@@ -123,11 +127,10 @@
private static final boolean DEFAULT_EGRESS_STATS = false;
private static final boolean DEFAULT_PORT_STATS = true;
- private static final String MAC_NOT_NULL = "MAC should not be null";
-
private static final String ARBITRARY_IP = "0.0.0.0/32";
private static final int ARBITRARY_LENGTH = 32;
private static final String ARBITRARY_MAC = "00:00:00:00:00:00";
+ private static final MacAddress NO_HOST_MAC = MacAddress.valueOf(ARBITRARY_MAC);
private static final int ARBITRARY_IN_INTF = 0;
private static final int ARBITRARY_OUT_INTF = 0;
@@ -182,6 +185,7 @@
private ScheduledFuture result;
private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
+ private final Map<String, Queue<FlowInfo>> flowInfoMap = Maps.newConcurrentMap();
private static final int SOURCE_ID = 1;
private static final int TARGET_ID = 2;
@@ -189,8 +193,6 @@
private static final int METRIC_PRIORITY_SOURCE = SOURCE_ID * PRIORITY_BASE;
private static final int METRIC_PRIORITY_TARGET = TARGET_ID * PRIORITY_BASE;
- private static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
-
@Activate
protected void activate() {
appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
@@ -264,13 +266,6 @@
IPProtocolCriterion ipProtocol =
(IPProtocolCriterion) selector.getCriterion(IP_PROTO);
- log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
- ((IndexTableId) entry.table()).id(),
- srcIp.ip().toString(),
- dstIp.ip().toString(),
- entry.packets(),
- entry.bytes());
-
fBuilder.withFlowType(FLOW_TYPE_SONA)
.withSrcIp(srcIp.ip())
.withDstIp(dstIp.ip());
@@ -283,25 +278,13 @@
(TcpPortCriterion) selector.getCriterion(TCP_SRC);
TcpPortCriterion tcpDst =
(TcpPortCriterion) selector.getCriterion(TCP_DST);
-
- log.debug("TCP SRC Port: {}, DST Port: {}",
- tcpSrc.tcpPort().toInt(),
- tcpDst.tcpPort().toInt());
-
fBuilder.withSrcPort(tcpSrc.tcpPort());
fBuilder.withDstPort(tcpDst.tcpPort());
-
} else if (ipProtocol.protocol() == PROTOCOL_UDP) {
-
UdpPortCriterion udpSrc =
(UdpPortCriterion) selector.getCriterion(UDP_SRC);
UdpPortCriterion udpDst =
(UdpPortCriterion) selector.getCriterion(UDP_DST);
-
- log.debug("UDP SRC Port: {}, DST Port: {}",
- udpSrc.udpPort().toInt(),
- udpDst.udpPort().toInt());
-
fBuilder.withSrcPort(udpSrc.udpPort());
fBuilder.withDstPort(udpDst.udpPort());
} else {
@@ -387,7 +370,7 @@
.withSrcIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
.withDstIp(IpPrefix.valueOf(ARBITRARY_IP))
.withSrcMac(instPort.macAddress())
- .withDstMac(MacAddress.valueOf(ARBITRARY_MAC))
+ .withDstMac(NO_HOST_MAC)
.withDeviceId(instPort.deviceId())
.withInputInterfaceId(ARBITRARY_IN_INTF)
.withOutputInterfaceId(ARBITRARY_OUT_INTF)
@@ -420,7 +403,7 @@
fBuilder.withFlowType(FLOW_TYPE_SONA)
.withSrcIp(IpPrefix.valueOf(ARBITRARY_IP))
.withDstIp(IpPrefix.valueOf(instPort.ipAddress(), ARBITRARY_LENGTH))
- .withSrcMac(MacAddress.valueOf(ARBITRARY_MAC))
+ .withSrcMac(NO_HOST_MAC)
.withDstMac(instPort.macAddress())
.withDeviceId(instPort.deviceId())
.withInputInterfaceId(ARBITRARY_IN_INTF)
@@ -460,7 +443,6 @@
StatsFlowRule statsFlowRule, int rulePriority,
boolean install) {
- log.debug("Table Transition: {} -> {}", fromTable, toTable);
int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
@@ -711,6 +693,26 @@
return NO_HOST_MAC;
}
+ private void enqFlowInfo(FlowInfo flowInfo) {
+ String key = flowInfo.uniqueFlowInfoKey();
+ Queue<FlowInfo> queue = flowInfoMap.get(key);
+ if (queue == null) {
+ Queue<FlowInfo> newQueue = new LinkedList<FlowInfo>();
+ newQueue.offer(flowInfo);
+ flowInfoMap.put(key, newQueue);
+ return;
+ }
+ queue.offer(flowInfo);
+
+ while (queue.size() > DEFAULT_DATA_POINT_SIZE) {
+ queue.remove(); // Removes a garbage data in the queue.
+ }
+ }
+
+ public Map<String, Queue<FlowInfo>> getFlowInfoMap() {
+ return flowInfoMap;
+ }
+
/**
* Extracts properties from the component configuration context.
*
@@ -775,6 +777,11 @@
}
telemetryService.publish(filteredFlowInfos);
+
+ // TODO: Refactor the following code to "TelemetryService" style.
+ filteredFlowInfos.forEach(flowInfo -> {
+ enqFlowInfo(flowInfo);
+ });
}
private boolean checkSrcDstLocalMaster(FlowInfo info) {