Fix: resolve NPE caused by non-existence of IP protocol
Change-Id: I8f2233637986089b9347da03221db2852faa7fe7
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java
index 8b28797..73258ac 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java
@@ -41,11 +41,21 @@
public ByteBuffer encode(FlowInfo flowInfo) {
ByteBuffer byteBuffer = ByteBuffer.allocate(MESSAGE_SIZE);
+ int srcPort = 0;
+ int dstPort = 0;
String deviceId = flowInfo.deviceId().toString();
short switchId = (short) Integer.parseInt(deviceId.substring(3,
deviceId.length()), 16);
+ if (flowInfo.srcPort() != null) {
+ srcPort = flowInfo.srcPort().toInt();
+ }
+
+ if (flowInfo.dstPort() != null) {
+ dstPort = flowInfo.dstPort().toInt();
+ }
+
byteBuffer.put(flowInfo.flowType())
.putShort(switchId)
.putInt(flowInfo.inputInterfaceId())
@@ -53,10 +63,10 @@
.putShort(flowInfo.vlanId().toShort())
.put(flowInfo.srcIp().address().toOctets())
.put((byte) flowInfo.srcIp().prefixLength())
- .putShort((short) flowInfo.srcPort().toInt())
+ .putShort((short) srcPort)
.put(flowInfo.dstIp().address().toOctets())
.put((byte) flowInfo.dstIp().prefixLength())
- .putShort((short) flowInfo.dstPort().toInt())
+ .putShort((short) dstPort)
.put(flowInfo.protocol())
.put(flowInfo.srcMac().toBytes())
.put(flowInfo.dstMac().toBytes());
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
index 3f56362..a631abc 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
@@ -142,12 +142,13 @@
@Override
public boolean roughEquals(FlowInfo flowInfo) {
- return deviceId.equals(flowInfo.deviceId()) &&
- srcIp.equals(flowInfo.srcIp()) &&
- dstIp.equals(flowInfo.dstIp()) &&
- srcPort.equals(flowInfo.srcPort()) &&
- dstPort.equals(flowInfo.dstPort()) &&
- (protocol == flowInfo.protocol());
+ final DefaultFlowInfo other = (DefaultFlowInfo) flowInfo;
+ return Objects.equals(this.deviceId, other.deviceId) &&
+ Objects.equals(this.srcIp, other.srcIp) &&
+ Objects.equals(this.dstIp, other.dstIp) &&
+ Objects.equals(this.srcPort, other.srcPort) &&
+ Objects.equals(this.dstPort, other.dstPort) &&
+ Objects.equals(this.protocol, other.protocol);
}
@Override
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index 8d6f62f..d1de54a 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -128,6 +128,7 @@
return null;
}
+ log.debug("Send telemetry record to kafka server...");
return producer.send(record);
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index 54f454e..3d609c6 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -28,7 +28,7 @@
import org.onlab.packet.VlanId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.net.Device;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Host;
import org.onosproject.net.device.DeviceService;
@@ -56,6 +56,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
@@ -105,6 +106,9 @@
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected OpenstackTelemetryService telemetryService;
private Timer timer;
@@ -275,37 +279,40 @@
fBuilder.withFlowType(FLOW_TYPE_SONA)
.withSrcIp(srcIp.ip())
- .withDstIp(dstIp.ip())
- .withProtocol((byte) ipProtocol.protocol());
+ .withDstIp(dstIp.ip());
- if (ipProtocol.protocol() == PROTOCOL_TCP) {
- TcpPortCriterion tcpSrc =
- (TcpPortCriterion) selector.getCriterion(TCP_SRC);
- TcpPortCriterion tcpDst =
- (TcpPortCriterion) selector.getCriterion(TCP_DST);
+ if (ipProtocol != null) {
+ fBuilder.withProtocol((byte) ipProtocol.protocol());
- log.debug("TCP SRC Port: {}, DST Port: {}",
- tcpSrc.tcpPort().toInt(),
- tcpDst.tcpPort().toInt());
+ if (ipProtocol.protocol() == PROTOCOL_TCP) {
+ TcpPortCriterion tcpSrc =
+ (TcpPortCriterion) selector.getCriterion(TCP_SRC);
+ TcpPortCriterion tcpDst =
+ (TcpPortCriterion) selector.getCriterion(TCP_DST);
- fBuilder.withSrcPort(tcpSrc.tcpPort());
- fBuilder.withDstPort(tcpDst.tcpPort());
+ log.debug("TCP SRC Port: {}, DST Port: {}",
+ tcpSrc.tcpPort().toInt(),
+ tcpDst.tcpPort().toInt());
- } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
+ fBuilder.withSrcPort(tcpSrc.tcpPort());
+ fBuilder.withDstPort(tcpDst.tcpPort());
- UdpPortCriterion udpSrc =
- (UdpPortCriterion) selector.getCriterion(UDP_SRC);
- UdpPortCriterion udpDst =
- (UdpPortCriterion) selector.getCriterion(UDP_DST);
+ } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
- log.debug("UDP SRC Port: {}, DST Port: {}",
- udpSrc.udpPort().toInt(),
- udpDst.udpPort().toInt());
+ UdpPortCriterion udpSrc =
+ (UdpPortCriterion) selector.getCriterion(UDP_SRC);
+ UdpPortCriterion udpDst =
+ (UdpPortCriterion) selector.getCriterion(UDP_DST);
- fBuilder.withSrcPort(udpSrc.udpPort());
- fBuilder.withDstPort(udpDst.udpPort());
- } else {
- log.debug("Other protocol: {}", ipProtocol.protocol());
+ log.debug("UDP SRC Port: {}, DST Port: {}",
+ udpSrc.udpPort().toInt(),
+ udpDst.udpPort().toInt());
+
+ fBuilder.withSrcPort(udpSrc.udpPort());
+ fBuilder.withDstPort(udpDst.udpPort());
+ } else {
+ log.debug("Other protocol: {}", ipProtocol.protocol());
+ }
}
fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
@@ -363,13 +370,13 @@
gFlowInfoSet.remove(gFlowInfo);
gFlowInfoSet.add(newFlowInfo);
- log.info("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
+ log.debug("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
return newFlowInfo;
}
}
// No such record, then build the FlowInfo object and return this object.
- log.info("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
+ log.debug("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
gFlowInfoSet.add(newFlowInfo);
return newFlowInfo;
@@ -384,18 +391,34 @@
.dstTpPort(statsFlowRule.srcTpPort())
.build();
- // FIXME: install stat flow rules for all devices for now
- // need to query the device where the host with the given IP located
- for (Device d : deviceService.getDevices()) {
- if (d.type() == Device.Type.CONTROLLER) {
- log.info("Not provide stats for 'CONTROLLER' ({})", d.id().toString());
- continue;
- }
+ DeviceId srcDeviceId = getDeviceId(statsFlowRule.srcIpPrefix().address());
+ DeviceId dstDeviceId = getDeviceId(statsFlowRule.dstIpPrefix().address());
- connectTables(d.id(), STAT_INBOUND_TABLE, DHCP_ARP_TABLE,
- statsFlowRule, METRIC_PRIORITY_SOURCE, install);
- connectTables(d.id(), STAT_OUTBOUND_TABLE, FORWARDING_TABLE,
- inverseFlowRule, METRIC_PRIORITY_TARGET, install);
+ if (srcDeviceId == null || dstDeviceId == null) {
+ return;
+ }
+
+ connectTables(srcDeviceId, STAT_INBOUND_TABLE, DHCP_ARP_TABLE,
+ statsFlowRule, METRIC_PRIORITY_SOURCE, install);
+ connectTables(dstDeviceId, STAT_OUTBOUND_TABLE, FORWARDING_TABLE,
+ inverseFlowRule, METRIC_PRIORITY_TARGET, install);
+ }
+
+ /**
+ * Get Device ID which the VM is located.
+ *
+ * @param ipAddress IP Address of host
+ * @return Device ID
+ */
+ private DeviceId getDeviceId(IpAddress ipAddress) {
+ if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+ Optional<Host> host = hostService.getHostsByIp(ipAddress).stream().findAny();
+ return host.map(host1 -> host1.location().deviceId()).orElse(null);
+ } else {
+ log.error("Failed to get DeviceID which is connected to {}. " +
+ "The VM is not instantiated correctly now.",
+ ipAddress.toString());
+ return null;
}
}
@@ -446,8 +469,20 @@
@Override
public void run() {
log.debug("Timer Task Thread Starts ({})", loopCount++);
+
+ Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
+
+ // we only let the master controller of the device where the
+ // stats flow rules are installed send kafka message
+ getFlowInfo().forEach(f -> {
+ DeviceId deviceId = getDeviceId(f.srcIp().address());
+ if (mastershipService.isLocalMaster(deviceId)) {
+ filteredFlowInfos.add(f);
+ }
+ });
+
try {
- telemetryService.publish(getFlowInfo());
+ telemetryService.publish(filteredFlowInfos);
} catch (Exception ex) {
log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
}