Transmit datagrams (packets) via TCP
Berkeley Sockets give possibility to:
1. Transmit datagrams with no guarantee of delivery (UDP
)
2. or a byte stream with a guarantee (TCP
).
When we transmit certain amount of bytes ovar TCP
using send
function there is no guarantee that we will get exactly the same amount after recv
call on the receiving side.
For example, you can send
one time 1000
bytes, and recv
400
after first call and then 600
after second. Similarly, the flow can have reverse process - gluing - you can send 200
bytes 3
times and receive 600
immediately. The reason for this behavior is, Nagle algorithm: by allowing the accumulation of bytes and the subsequent sending it significantly improve data transmission efficiency. But what if you need to transfer the package as a whole, i.e. the entire datagram. One option is to use TLV
approach (Tag-length-value).
Here is a simple packets extractor from the stream based on the primitive TLV
. To add the data we transferred one byte marker (just a number) and size of the two-byte packet. This will allow us to know when the flow accumulated enough bytes for the entire package.
class StreamGramsExtractor:
"""
Allows receive datagrams via reliable stream using simple TLV (Type-length-value)
"""
# STReam States
STRS_WAIT_MARKER = 0
STRS_WAIT_HI_SIZE = 1
STRS_WAIT_LO_SIZE = 2
STRS_WAIT_DATA = 3
PACK_MARKER = 0x72
def __init__(self, log=None):
self._log = log
self._rcv_size = 0
self._rcv_state = self.STRS_WAIT_MARKER
self._rcv_buf = bytearray([])
def put_chunk(self, chunk):
"""
Put stream chunk and receive list of packets
:param chunk:
:return:
"""
i = 0
packets = []
while i < len(chunk):
if self._rcv_state == self.STRS_WAIT_MARKER:
if chunk[i] == self.PACK_MARKER:
self._rcv_state = self.STRS_WAIT_HI_SIZE
else:
if self._log:
self._log.warning(
"Received init byte {} not equal marker {}".format(chunk[i], self.PACK_MARKER))
return [] # drop this chunk
i += 1
elif self._rcv_state == self.STRS_WAIT_HI_SIZE:
self._rcv_state = self.STRS_WAIT_LO_SIZE
self._rcv_size = chunk[i] << 8
i += 1
elif self._rcv_state == self.STRS_WAIT_LO_SIZE:
self._rcv_state = self.STRS_WAIT_DATA
self._rcv_size |= chunk[i]
i += 1
else:
if len(self._rcv_buf) + len(chunk) - i >= self._rcv_size:
packets.append(self._rcv_buf + chunk[i:i + self._rcv_size])
self._rcv_buf = bytearray([])
i += self._rcv_size
self._rcv_state = self.STRS_WAIT_MARKER
else:
self._rcv_buf += chunk[i:]
i = len(chunk)
return packets
def wrap_packet(self, packet):
"""
Call this method before send to stream protocol,
It will add metaheader for extract full packet from stream
:param packet: packet
:return: meta header + packet
"""
l = len(packet)
return bytearray([
self.PACK_MARKER,
(l >> 8) & 0xFF, l & 0xFF]) + packet
How to use:
1. Create instanse of a class on transmitter and receiver
2. Before call send
wrap bytearray
using wrap_packet
. It will return bytearray
that you can pass to send
3. After receive new portion of stream on receiver side call put_chank
. put_chunk
will return list of compleate packets (or empty list if there is still no compleate packets)