pyloxi: keep track of offset/length ourselves in OFReader
For skip_align to work we need to know the offset from the beginning of the
message, not from the beginning of our OFReader (which might have already been
misaligned).
diff --git a/py_gen/templates/generic_util.py b/py_gen/templates/generic_util.py
index cd62057..89b158d 100644
--- a/py_gen/templates/generic_util.py
+++ b/py_gen/templates/generic_util.py
@@ -79,50 +79,60 @@
known field lengths. This class supports efficiently reading
fields sequentially and is intended to be used recursively by the
parsers of child objects which will implicitly update the offset.
+
+ buf: buffer object
+ start: initial position in the buffer
+ length: number of bytes after start
+ offset: distance from start
"""
- def __init__(self, buf):
+ def __init__(self, buf, start=0, length=None):
self.buf = buf
+ self.start = start
+ if length is None:
+ self.length = len(buf) - start
+ else:
+ self.length = length
self.offset = 0
def read(self, fmt):
st = struct.Struct(fmt)
- if self.offset + st.size > len(self.buf):
+ if self.offset + st.size > self.length:
raise loxi.ProtocolError("Buffer too short")
- result = st.unpack_from(self.buf, self.offset)
+ result = st.unpack_from(self.buf, self.start+self.offset)
self.offset += st.size
return result
def read_all(self):
- buf = buffer(self.buf, self.offset)
- self.offset += len(buf)
- return str(buf)
+ s = self.buf[(self.start+self.offset):(self.start+self.length)]
+ assert(len(s) == self.length - self.offset)
+ self.offset = self.length
+ return s
def peek(self, fmt, offset=0):
st = struct.Struct(fmt)
- if self.offset + offset + st.size > len(self.buf):
+ if self.offset + offset + st.size > self.length:
raise loxi.ProtocolError("Buffer too short")
- result = st.unpack_from(self.buf, self.offset + offset)
+ result = st.unpack_from(self.buf, self.start + self.offset + offset)
return result
def skip(self, length):
- if self.offset + length > len(self.buf):
+ if self.offset + length > self.length:
raise loxi.ProtocolError("Buffer too short")
self.offset += length
def skip_align(self):
- new_offset = (self.offset + 7) / 8 * 8
- if new_offset > len(self.buf):
+ new_offset = ((self.start + self.offset + 7) / 8 * 8) - self.start
+ if new_offset > self.length:
raise loxi.ProtocolError("Buffer too short")
self.offset = new_offset
def is_empty(self):
- return self.offset == len(self.buf)
+ return self.offset == self.length
- # Used when parsing variable length objects which have external length
- # fields (e.g. the actions list in an OF 1.0 packet-out message).
+ # Used when parsing objects that have their own length fields
def slice(self, length):
- if self.offset + length > len(self.buf):
+ if self.offset + length > self.length:
raise loxi.ProtocolError("Buffer too short")
- buf = OFReader(buffer(self.buf, self.offset, length))
+ reader = OFReader(self.buf, self.start + self.offset, length)
self.offset += length
- return buf
+ return reader
diff --git a/py_gen/tests/generic_util.py b/py_gen/tests/generic_util.py
index da045b9..a3b18c6 100644
--- a/py_gen/tests/generic_util.py
+++ b/py_gen/tests/generic_util.py
@@ -116,5 +116,18 @@
self.assertEquals(reader.slice(2).read_all(), "fg")
self.assertEquals(reader.is_empty(), True)
+ def test_skip_align(self):
+ reader = OFReader("abcd" + "efgh" + "ijkl" + "mnop" + "qr")
+ reader.skip_align()
+ self.assertEquals(reader.peek('2s')[0], 'ab')
+ self.assertEquals(reader.read('2s')[0], "ab")
+ reader.skip_align()
+ self.assertEquals(reader.peek('2s')[0], 'ij')
+ self.assertEquals(reader.read('2s')[0], 'ij')
+ child = reader.slice(8)
+ self.assertEquals(child.peek('2s')[0], 'kl')
+ child.skip_align()
+ self.assertEquals(child.peek('2s')[0], 'qr')
+
if __name__ == '__main__':
unittest.main()