Fix: resolve NPE caused by non-existence of IP protocol

Change-Id: I8f2233637986089b9347da03221db2852faa7fe7
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java
index 8b28797..73258ac 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java
@@ -41,11 +41,21 @@
     public ByteBuffer encode(FlowInfo flowInfo) {
 
         ByteBuffer byteBuffer = ByteBuffer.allocate(MESSAGE_SIZE);
+        int srcPort = 0;
+        int dstPort = 0;
 
         String  deviceId = flowInfo.deviceId().toString();
         short switchId = (short) Integer.parseInt(deviceId.substring(3,
                                                   deviceId.length()), 16);
 
+        if (flowInfo.srcPort() != null) {
+            srcPort = flowInfo.srcPort().toInt();
+        }
+
+        if (flowInfo.dstPort() != null) {
+            dstPort = flowInfo.dstPort().toInt();
+        }
+
         byteBuffer.put(flowInfo.flowType())
                 .putShort(switchId)
                 .putInt(flowInfo.inputInterfaceId())
@@ -53,10 +63,10 @@
                 .putShort(flowInfo.vlanId().toShort())
                 .put(flowInfo.srcIp().address().toOctets())
                 .put((byte) flowInfo.srcIp().prefixLength())
-                .putShort((short) flowInfo.srcPort().toInt())
+                .putShort((short) srcPort)
                 .put(flowInfo.dstIp().address().toOctets())
                 .put((byte) flowInfo.dstIp().prefixLength())
-                .putShort((short) flowInfo.dstPort().toInt())
+                .putShort((short) dstPort)
                 .put(flowInfo.protocol())
                 .put(flowInfo.srcMac().toBytes())
                 .put(flowInfo.dstMac().toBytes());
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
index 3f56362..a631abc 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
@@ -142,12 +142,13 @@
 
     @Override
     public boolean roughEquals(FlowInfo flowInfo) {
-        return deviceId.equals(flowInfo.deviceId()) &&
-                srcIp.equals(flowInfo.srcIp()) &&
-                dstIp.equals(flowInfo.dstIp()) &&
-                srcPort.equals(flowInfo.srcPort()) &&
-                dstPort.equals(flowInfo.dstPort()) &&
-                (protocol == flowInfo.protocol());
+        final DefaultFlowInfo other = (DefaultFlowInfo) flowInfo;
+        return Objects.equals(this.deviceId, other.deviceId) &&
+                Objects.equals(this.srcIp, other.srcIp) &&
+                Objects.equals(this.dstIp, other.dstIp) &&
+                Objects.equals(this.srcPort, other.srcPort) &&
+                Objects.equals(this.dstPort, other.dstPort) &&
+                Objects.equals(this.protocol, other.protocol);
     }
 
     @Override
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index 8d6f62f..d1de54a 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -128,6 +128,7 @@
             return null;
         }
 
+        log.debug("Send telemetry record to kafka server...");
         return producer.send(record);
     }
 
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index 54f454e..3d609c6 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -28,7 +28,7 @@
 import org.onlab.packet.VlanId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
-import org.onosproject.net.Device;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Host;
 import org.onosproject.net.device.DeviceService;
@@ -56,6 +56,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Optional;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -105,6 +106,9 @@
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected OpenstackTelemetryService telemetryService;
 
     private Timer timer;
@@ -275,37 +279,40 @@
 
             fBuilder.withFlowType(FLOW_TYPE_SONA)
                     .withSrcIp(srcIp.ip())
-                    .withDstIp(dstIp.ip())
-                    .withProtocol((byte) ipProtocol.protocol());
+                    .withDstIp(dstIp.ip());
 
-            if (ipProtocol.protocol() == PROTOCOL_TCP) {
-                TcpPortCriterion tcpSrc =
-                                (TcpPortCriterion) selector.getCriterion(TCP_SRC);
-                TcpPortCriterion tcpDst =
-                                (TcpPortCriterion) selector.getCriterion(TCP_DST);
+            if (ipProtocol != null) {
+                fBuilder.withProtocol((byte) ipProtocol.protocol());
 
-                log.debug("TCP SRC Port: {}, DST Port: {}",
-                                                    tcpSrc.tcpPort().toInt(),
-                                                    tcpDst.tcpPort().toInt());
+                if (ipProtocol.protocol() == PROTOCOL_TCP) {
+                    TcpPortCriterion tcpSrc =
+                            (TcpPortCriterion) selector.getCriterion(TCP_SRC);
+                    TcpPortCriterion tcpDst =
+                            (TcpPortCriterion) selector.getCriterion(TCP_DST);
 
-                fBuilder.withSrcPort(tcpSrc.tcpPort());
-                fBuilder.withDstPort(tcpDst.tcpPort());
+                    log.debug("TCP SRC Port: {}, DST Port: {}",
+                            tcpSrc.tcpPort().toInt(),
+                            tcpDst.tcpPort().toInt());
 
-            } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
+                    fBuilder.withSrcPort(tcpSrc.tcpPort());
+                    fBuilder.withDstPort(tcpDst.tcpPort());
 
-                UdpPortCriterion udpSrc =
-                                (UdpPortCriterion) selector.getCriterion(UDP_SRC);
-                UdpPortCriterion udpDst =
-                                (UdpPortCriterion) selector.getCriterion(UDP_DST);
+                } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
 
-                log.debug("UDP SRC Port: {}, DST Port: {}",
-                                                    udpSrc.udpPort().toInt(),
-                                                    udpDst.udpPort().toInt());
+                    UdpPortCriterion udpSrc =
+                            (UdpPortCriterion) selector.getCriterion(UDP_SRC);
+                    UdpPortCriterion udpDst =
+                            (UdpPortCriterion) selector.getCriterion(UDP_DST);
 
-                fBuilder.withSrcPort(udpSrc.udpPort());
-                fBuilder.withDstPort(udpDst.udpPort());
-            } else {
-                log.debug("Other protocol: {}", ipProtocol.protocol());
+                    log.debug("UDP SRC Port: {}, DST Port: {}",
+                            udpSrc.udpPort().toInt(),
+                            udpDst.udpPort().toInt());
+
+                    fBuilder.withSrcPort(udpSrc.udpPort());
+                    fBuilder.withDstPort(udpDst.udpPort());
+                } else {
+                    log.debug("Other protocol: {}", ipProtocol.protocol());
+                }
             }
 
             fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
@@ -363,13 +370,13 @@
 
                 gFlowInfoSet.remove(gFlowInfo);
                 gFlowInfoSet.add(newFlowInfo);
-                log.info("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
+                log.debug("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
                 return newFlowInfo;
             }
         }
 
         // No such record, then build the FlowInfo object and return this object.
-        log.info("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
+        log.debug("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
         FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
         gFlowInfoSet.add(newFlowInfo);
         return newFlowInfo;
@@ -384,18 +391,34 @@
                                         .dstTpPort(statsFlowRule.srcTpPort())
                                         .build();
 
-        // FIXME: install stat flow rules for all devices for now
-        // need to query the device where the host with the given IP located
-        for (Device d : deviceService.getDevices()) {
-            if (d.type() == Device.Type.CONTROLLER) {
-                log.info("Not provide stats for 'CONTROLLER' ({})", d.id().toString());
-                continue;
-            }
+        DeviceId srcDeviceId = getDeviceId(statsFlowRule.srcIpPrefix().address());
+        DeviceId dstDeviceId = getDeviceId(statsFlowRule.dstIpPrefix().address());
 
-            connectTables(d.id(), STAT_INBOUND_TABLE, DHCP_ARP_TABLE,
-                            statsFlowRule, METRIC_PRIORITY_SOURCE, install);
-            connectTables(d.id(), STAT_OUTBOUND_TABLE, FORWARDING_TABLE,
-                            inverseFlowRule, METRIC_PRIORITY_TARGET, install);
+        if (srcDeviceId == null || dstDeviceId == null) {
+            return;
+        }
+
+        connectTables(srcDeviceId, STAT_INBOUND_TABLE, DHCP_ARP_TABLE,
+                statsFlowRule, METRIC_PRIORITY_SOURCE, install);
+        connectTables(dstDeviceId, STAT_OUTBOUND_TABLE, FORWARDING_TABLE,
+                inverseFlowRule, METRIC_PRIORITY_TARGET, install);
+    }
+
+    /**
+     * Get Device ID which the VM is located.
+     *
+     * @param ipAddress IP Address of host
+     * @return Device ID
+     */
+    private DeviceId getDeviceId(IpAddress ipAddress) {
+        if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+            Optional<Host> host = hostService.getHostsByIp(ipAddress).stream().findAny();
+            return host.map(host1 -> host1.location().deviceId()).orElse(null);
+        } else {
+            log.error("Failed to get DeviceID which is connected to {}. " +
+                            "The VM is not instantiated correctly now.",
+                    ipAddress.toString());
+            return null;
         }
     }
 
@@ -446,8 +469,20 @@
         @Override
         public void run() {
             log.debug("Timer Task Thread Starts ({})", loopCount++);
+
+            Set<FlowInfo> filteredFlowInfos = Sets.newConcurrentHashSet();
+
+            // we only let the master controller of the device where the
+            // stats flow rules are installed send kafka message
+            getFlowInfo().forEach(f -> {
+                DeviceId deviceId = getDeviceId(f.srcIp().address());
+                if (mastershipService.isLocalMaster(deviceId)) {
+                    filteredFlowInfos.add(f);
+                }
+            });
+
             try {
-                telemetryService.publish(getFlowInfo());
+                telemetryService.publish(filteredFlowInfos);
             } catch (Exception ex) {
                 log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
             }