Deserialize sFlow packet
Change-Id: Ib3cbdab53436391dfeeee2344834aa600e015a2d
diff --git a/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPacket.java b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPacket.java
index 80c1ea4..8ceb6f7 100644
--- a/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPacket.java
+++ b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPacket.java
@@ -18,10 +18,18 @@
import java.util.LinkedList;
import java.util.List;
+
import com.google.common.base.MoreObjects;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.Objects;
+import java.util.function.BiPredicate;
+import java.nio.ByteBuffer;
import org.onosproject.sflow.SflowSample;
+import org.onlab.packet.DeserializationException;
import org.onlab.packet.Deserializer;
import org.onlab.packet.BasePacket;
@@ -59,14 +67,17 @@
public final class SflowPacket extends BasePacket {
+ public static final int SFLOW_HEADER_LENGTH = 28;
+ public static final int SAMPLE_HEADER_MIN_LENGTH = 8;
+
private int version; //current supported version is 5
- private int agentIpVersion; // current supported version 1=v4, 2=v6
+ private IpVersion agentIpVersion; // current supported version 1=v4, 2=v6
private String agentAddress; // sflow agent IP address
private int subAgentID; //sflow implemented on distributed device
private int seqNumber; // To overcome spoofed attacks of spoofed sflow dgrams
private int sysUptime; // Milliseconds since device last booted
- private int numberOfSamples; // number of samplings
- private List<SflowSample> sFlowsample; // List of sampling data headers
+ private int numberOfSamples; // in datagrams
+ private List<SflowSample> sFlowsample; // List of Sample data headers
private SflowPacket(Builder builder) {
this.version = builder.version;
@@ -79,6 +90,39 @@
this.sFlowsample = builder.sflowSample;
}
+ /**
+ * Interface ip version.
+ * ip type : ipv4 and ipv6.
+ */
+ public enum IpVersion {
+
+ IPv4(1),
+ IPv6(2);
+
+ private final int version;
+
+ IpVersion(int version) {
+ this.version = version;
+ }
+
+ private static Map<Integer, IpVersion> parser = new ConcurrentHashMap<>();
+
+ static {
+ Arrays.stream(IpVersion.values()).forEach(ipVersion -> parser.put(ipVersion.version, ipVersion));
+ }
+
+ public static IpVersion getVersion(int type) throws DeserializationException {
+ if ((type < 1) || (type > 2)) {
+ throw new DeserializationException("Invalid ip type");
+ }
+ return Optional.of(type)
+ .filter(id -> parser.containsKey(id))
+ .map(id -> parser.get(id))
+ .orElse(IPv4);
+ }
+
+ }
+
/**
* Returns Version of Flow entry ; Supported Version 2,4,5.
@@ -95,7 +139,7 @@
*
* @return version number
*/
- public int getAgentIpVersion() {
+ public IpVersion getAgentIpVersion() {
return agentIpVersion;
}
@@ -146,9 +190,9 @@
}
/**
- * Returns number of samplings generated in sFlow datagram.
+ * Returns number of samples generated in sFlow datagram.
*
- * @return Number of samplings
+ * @return Number of Samples
**/
public int getNumberSample() {
@@ -156,7 +200,7 @@
}
/**
- * Returns number of samplings generated in sFlow datagram. An sFlow Datagram.
+ * Returns number of samples generated in sFlow datagram. An sFlow Datagram.
* contains lists of Packet Flow Records and counter records. The format of each
* Packet Flow Record is identified by a data_format value. The data_format name
* space is extensible, allowing for the addition of standard record types as
@@ -174,7 +218,39 @@
* @return deserializer function
*/
public static Deserializer<SflowPacket> deserializer() {
- return (data, offset, length) -> null;
+ return (data, offset, length) -> {
+ BiPredicate<ByteBuffer, Integer> isValidBuffer = (b, l)
+ -> b.hasRemaining() && b.remaining() >= l;
+
+ ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
+ if (!isValidBuffer.test(bb, (SFLOW_HEADER_LENGTH + SAMPLE_HEADER_MIN_LENGTH))) {
+ throw new IllegalStateException("Invalid sflow packet buffer size.");
+ }
+ Builder builder = new Builder();
+ builder.version(bb.getInt())
+ .agentIpVersion(IpVersion.getVersion(bb.getInt()))
+ .agentAddress(bb.getInt())
+ .subAgentID(bb.getInt())
+ .seqNumber(bb.getInt())
+ .sysUptime(bb.getInt())
+ .numberOfSamples(bb.getInt());
+
+ while (bb.hasRemaining()) {
+ int sampleType = bb.getInt();
+ int sampleLength = bb.getInt();
+ bb.position(bb.position() - SAMPLE_HEADER_MIN_LENGTH);
+ int sflowSampleLenght = sampleLength + SAMPLE_HEADER_MIN_LENGTH;
+ if (bb.remaining() < sflowSampleLenght) {
+ break;
+ }
+ byte[] sflowSampleBytes = new byte[sflowSampleLenght];
+ bb.get(sflowSampleBytes);
+ builder.sflowSample((SflowSample) SflowSample.Type.getType(sampleType).getDecoder()
+ .deserialize(sflowSampleBytes, 0, sflowSampleLenght));
+ }
+ return builder.build();
+
+ };
}
@Override
@@ -186,7 +262,6 @@
public int hashCode() {
int hash = 3;
hash = 59 * hash + this.version;
- hash = 59 * hash + this.agentIpVersion;
hash = 59 * hash + this.seqNumber;
hash = 59 * hash + this.subAgentID;
hash = 59 * hash + this.sysUptime;
@@ -208,9 +283,7 @@
return false;
}
final SflowPacket other = (SflowPacket) obj;
- if (this.version != other.version) {
- return false;
- }
+
if (this.agentIpVersion != other.agentIpVersion) {
return false;
}
@@ -247,7 +320,7 @@
*/
private static class Builder {
private int version;
- private int agentIpVersion;
+ private IpVersion agentIpVersion;
private String agentAddress;
private int subAgentID;
private int seqNumber;
@@ -272,7 +345,7 @@
* @param agent ip version.
* @return this class builder.
*/
- public Builder agentIpVersion(int agentIpVersion) {
+ public Builder agentIpVersion(IpVersion agentIpVersion) {
this.agentIpVersion = agentIpVersion;
return this;
}
@@ -283,8 +356,17 @@
* @param IP Address.
* @return this class builder.
*/
- public Builder agentAddress(String agentAddress) {
- this.agentAddress = agentAddress;
+ public Builder agentAddress(int agentAddress) {
+ StringBuilder sb = new StringBuilder();
+ int result = 0;
+ for (int i = 0; i < 4; ++i) {
+ result = agentAddress >> (3 - i) * 8 & 0xff;
+ sb.append(result);
+ if (i != 3) {
+ sb.append(".");
+ }
+ }
+ this.agentAddress = sb.toString();
return this;
}
@@ -300,9 +382,9 @@
}
/**
- * Setter for number of samplings generated in sFlow datagram.
+ * Setter for number of samples generated in sFlow datagram.
*
- * @param Number of samplings.
+ * @param Number of Samples.
* @return this class builder.
*/
public Builder numberOfSamples(int numberOfSamples) {
@@ -322,7 +404,7 @@
}
/**
- * Returns Sequence number of flow entry . Incremented with each flow sampling.
+ * Returns Sequence number of flow entry . Incremented with each flow sample.
* generated by this source_id
*
* @param sequence number.
@@ -334,9 +416,9 @@
}
/**
- * Setter for list of samplings.
+ * Setter for list of SampleDataHeaders.
*
- * @param sflowSample list of samplings.
+ * @param sampleDataHeaders list of sampleDataHeaders.
* @return this class builder.
*/
public Builder sflowSample(SflowSample sflowSample) {
@@ -350,7 +432,7 @@
*/
private void checkArguments() {
checkState(version != 0, "Invalid Version.");
- checkState(agentIpVersion != 0, "Invalid ipVersionAgent.");
+ checkState(agentIpVersion != null, "Invalid ipVersionAgent.");
checkState(subAgentID != 0, "Invalid subAgentID.");
checkState(seqNumber != 0, "Invalid SeqNumber.");
checkState(sysUptime != 0, "Invalid sysUptime.");