Deserialize sFlow packet

Change-Id: Ib3cbdab53436391dfeeee2344834aa600e015a2d
diff --git a/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPacket.java b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPacket.java
index 80c1ea4..8ceb6f7 100644
--- a/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPacket.java
+++ b/apps/ipflow-monitor/sflow/app/src/main/java/org/onosproject/sflow/impl/SflowPacket.java
@@ -18,10 +18,18 @@
 
 import java.util.LinkedList;
 import java.util.List;
+
 import com.google.common.base.MoreObjects;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.Objects;
+import java.util.function.BiPredicate;
+import java.nio.ByteBuffer;
 
 import org.onosproject.sflow.SflowSample;
+import org.onlab.packet.DeserializationException;
 import org.onlab.packet.Deserializer;
 import org.onlab.packet.BasePacket;
 
@@ -59,14 +67,17 @@
 
 public final class SflowPacket extends BasePacket {
 
+    public static final int SFLOW_HEADER_LENGTH = 28;
+    public static final int SAMPLE_HEADER_MIN_LENGTH = 8;
+
     private int version; //current supported version is 5
-    private int agentIpVersion; // current supported version 1=v4, 2=v6
+    private IpVersion agentIpVersion; // current supported version 1=v4, 2=v6
     private String agentAddress; // sflow agent IP address
     private int subAgentID; //sflow implemented on distributed device
     private int seqNumber; // To overcome spoofed attacks of spoofed sflow dgrams
     private int sysUptime; // Milliseconds since device last booted
-    private int numberOfSamples; // number of samplings
-    private List<SflowSample> sFlowsample; // List of sampling data headers
+    private int numberOfSamples; // in datagrams
+    private List<SflowSample> sFlowsample; // List of Sample data headers
 
     private SflowPacket(Builder builder) {
         this.version = builder.version;
@@ -79,6 +90,39 @@
         this.sFlowsample = builder.sflowSample;
     }
 
+    /**
+     * Interface ip version.
+     * ip type : ipv4 and ipv6.
+     */
+    public enum IpVersion {
+
+        IPv4(1),
+        IPv6(2);
+
+        private final int version;
+
+        IpVersion(int version) {
+            this.version = version;
+        }
+
+        private static Map<Integer, IpVersion> parser = new ConcurrentHashMap<>();
+
+        static {
+            Arrays.stream(IpVersion.values()).forEach(ipVersion -> parser.put(ipVersion.version, ipVersion));
+        }
+
+        public static IpVersion getVersion(int type) throws DeserializationException {
+            if ((type < 1) || (type > 2)) {
+                throw new DeserializationException("Invalid ip type");
+            }
+            return Optional.of(type)
+                    .filter(id -> parser.containsKey(id))
+                    .map(id -> parser.get(id))
+                    .orElse(IPv4);
+        }
+
+    }
+
 
     /**
      * Returns Version of Flow entry ; Supported Version 2,4,5.
@@ -95,7 +139,7 @@
      *
      * @return version number
      */
-    public int getAgentIpVersion() {
+    public IpVersion getAgentIpVersion() {
         return agentIpVersion;
     }
 
@@ -146,9 +190,9 @@
     }
 
     /**
-     * Returns number of samplings generated in sFlow datagram.
+     * Returns number of samples generated in sFlow datagram.
      *
-     * @return Number of samplings
+     * @return Number of Samples
      **/
 
     public int getNumberSample() {
@@ -156,7 +200,7 @@
     }
 
     /**
-     * Returns number of samplings generated in sFlow datagram. An sFlow Datagram.
+     * Returns number of samples generated in sFlow datagram. An sFlow Datagram.
      * contains lists of Packet Flow Records and counter records. The format of each
      * Packet Flow Record is identified by a data_format value. The data_format name
      * space is extensible, allowing for the addition of standard record types as
@@ -174,7 +218,39 @@
      * @return deserializer function
      */
     public static Deserializer<SflowPacket> deserializer() {
-        return (data, offset, length) -> null;
+        return (data, offset, length) -> {
+            BiPredicate<ByteBuffer, Integer> isValidBuffer = (b, l)
+                    -> b.hasRemaining() && b.remaining() >= l;
+
+            ByteBuffer bb = ByteBuffer.wrap(data, offset, length);
+            if (!isValidBuffer.test(bb, (SFLOW_HEADER_LENGTH + SAMPLE_HEADER_MIN_LENGTH))) {
+                throw new IllegalStateException("Invalid sflow packet buffer size.");
+            }
+            Builder builder = new Builder();
+            builder.version(bb.getInt())
+                    .agentIpVersion(IpVersion.getVersion(bb.getInt()))
+                    .agentAddress(bb.getInt())
+                    .subAgentID(bb.getInt())
+                    .seqNumber(bb.getInt())
+                    .sysUptime(bb.getInt())
+                    .numberOfSamples(bb.getInt());
+
+            while (bb.hasRemaining()) {
+                int sampleType = bb.getInt();
+                int sampleLength = bb.getInt();
+                bb.position(bb.position() - SAMPLE_HEADER_MIN_LENGTH);
+                int sflowSampleLenght = sampleLength + SAMPLE_HEADER_MIN_LENGTH;
+                if (bb.remaining() < sflowSampleLenght) {
+                    break;
+                }
+                byte[] sflowSampleBytes = new byte[sflowSampleLenght];
+                bb.get(sflowSampleBytes);
+                builder.sflowSample((SflowSample) SflowSample.Type.getType(sampleType).getDecoder()
+                        .deserialize(sflowSampleBytes, 0, sflowSampleLenght));
+            }
+            return builder.build();
+
+        };
     }
 
     @Override
@@ -186,7 +262,6 @@
     public int hashCode() {
         int hash = 3;
         hash = 59 * hash + this.version;
-        hash = 59 * hash + this.agentIpVersion;
         hash = 59 * hash + this.seqNumber;
         hash = 59 * hash + this.subAgentID;
         hash = 59 * hash + this.sysUptime;
@@ -208,9 +283,7 @@
             return false;
         }
         final SflowPacket other = (SflowPacket) obj;
-        if (this.version != other.version) {
-            return false;
-        }
+
         if (this.agentIpVersion != other.agentIpVersion) {
             return false;
         }
@@ -247,7 +320,7 @@
      */
     private static class Builder {
         private int version;
-        private int agentIpVersion;
+        private IpVersion agentIpVersion;
         private String agentAddress;
         private int subAgentID;
         private int seqNumber;
@@ -272,7 +345,7 @@
          * @param agent ip version.
          * @return this class builder.
          */
-        public Builder agentIpVersion(int agentIpVersion) {
+        public Builder agentIpVersion(IpVersion agentIpVersion) {
             this.agentIpVersion = agentIpVersion;
             return this;
         }
@@ -283,8 +356,17 @@
          * @param IP Address.
          * @return this class builder.
          */
-        public Builder agentAddress(String agentAddress) {
-            this.agentAddress = agentAddress;
+        public Builder agentAddress(int agentAddress) {
+            StringBuilder sb = new StringBuilder();
+            int result = 0;
+            for (int i = 0; i < 4; ++i) {
+                result = agentAddress >> (3 - i) * 8 & 0xff;
+                sb.append(result);
+                if (i != 3) {
+                    sb.append(".");
+                }
+            }
+            this.agentAddress = sb.toString();
             return this;
         }
 
@@ -300,9 +382,9 @@
         }
 
         /**
-         * Setter for number of samplings generated in sFlow datagram.
+         * Setter for number of samples generated in sFlow datagram.
          *
-         * @param Number of samplings.
+         * @param Number of Samples.
          * @return this class builder.
          */
         public Builder numberOfSamples(int numberOfSamples) {
@@ -322,7 +404,7 @@
         }
 
         /**
-         * Returns Sequence number of flow entry . Incremented with each flow sampling.
+         * Returns Sequence number of flow entry . Incremented with each flow sample.
          * generated by this source_id
          *
          * @param sequence number.
@@ -334,9 +416,9 @@
         }
 
         /**
-         * Setter for list of samplings.
+         * Setter for list of SampleDataHeaders.
          *
-         * @param sflowSample list of samplings.
+         * @param sampleDataHeaders list of sampleDataHeaders.
          * @return this class builder.
          */
         public Builder sflowSample(SflowSample sflowSample) {
@@ -350,7 +432,7 @@
          */
         private void checkArguments() {
             checkState(version != 0, "Invalid Version.");
-            checkState(agentIpVersion != 0, "Invalid ipVersionAgent.");
+            checkState(agentIpVersion != null, "Invalid ipVersionAgent.");
             checkState(subAgentID != 0, "Invalid subAgentID.");
             checkState(seqNumber != 0, "Invalid SeqNumber.");
             checkState(sysUptime != 0, "Invalid sysUptime.");