Berkeley sockets let you:

1. Transmit datagrams with no guarantee of delivery (UDP),

2. or transmit a byte stream with guaranteed delivery (TCP).

When we send a certain number of bytes over TCP with the send function, there is no guarantee that a single recv call on the receiving side will return exactly the same number of bytes.
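
A minimal sketch of this behavior, assuming a local pair of connected stream sockets from socket.socketpair() stands in for a real TCP connection. The helper name recv_sizes and the 400-byte read size are illustrative choices, not part of any API:

```python
import socket


def recv_sizes(sock, total, bufsize):
    """Read until `total` bytes have arrived; return the size of each recv."""
    sizes = []
    received = 0
    while received < total:
        data = sock.recv(min(bufsize, total - received))
        if not data:  # peer closed the connection early
            break
        sizes.append(len(data))
        received += len(data)
    return sizes


# socketpair() returns two connected stream sockets inside one process
a, b = socket.socketpair()
a.sendall(b"x" * 1000)            # one send of 1000 bytes
sizes = recv_sizes(b, 1000, 400)  # several smaller reads on the other side
a.close()
b.close()
print(sizes, sum(sizes))
```

The sizes always add up to 1000, but how the bytes are divided between recv calls depends on the network stack and the read size, not on how the sender called send.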

For example, you can send 1000 bytes in one call and receive 400 bytes from the first recv call and 600 from the second. The reverse, gluing, also happens: you can send 200 bytes three times and receive all 600 at once. This is because TCP is a byte stream that does not preserve message boundaries: data may be split into segments on the way and is buffered on both sides. Nagle's algorithm adds to the gluing, since it accumulates small writes and sends them together, which significantly improves transmission efficiency.

But what if you need to transfer a packet as a whole, like a datagram? One option is the TLV approach (type-length-value, also called tag-length-value).

Here is a simple extractor that recovers packets from the stream using a primitive TLV scheme. Each packet is prefixed with a one-byte marker (just a number) and a two-byte packet size. This lets us know when the stream has accumulated enough bytes for an entire packet.

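class StreamGramsExtractor:
    """
    Receives datagrams over a reliable stream using a simple TLV (type-length-value) scheme.
    """
    # STReam States: which part of a packet the parser expects next
    # (the values are arbitrary, they only need to be distinct)
    STRS_WAIT_MARKER = 0
    STRS_WAIT_HI_SIZE = 1
    STRS_WAIT_LO_SIZE = 2
    STRS_WAIT_DATA = 3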

    PACK_MARKER = 0x72

    def __init__(self, log=None):
        self._log = log
        self._rcv_size = 0
        self._rcv_state = self.STRS_WAIT_MARKER
        self._rcv_buf = bytearray([])

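    def put_chunk(self, chunk):
        """
        Put a stream chunk and get back the list of complete packets.
        :param chunk: bytes or bytearray just read from the stream
        :return: list of complete packets (empty if none is complete yet)
        """
        i = 0
        packets = []
        while i < len(chunk):
            if self._rcv_state == self.STRS_WAIT_MARKER:
                if chunk[i] == self.PACK_MARKER:
                    self._rcv_state = self.STRS_WAIT_HI_SIZE
                else:
                    if self._log:
                        # log is assumed to be a logging.Logger-like object
                        self._log.warning(
                            "Received init byte {} not equal marker {}".format(chunk[i], self.PACK_MARKER))
                    return packets  # drop the rest of this chunk, keep packets already extracted
                i += 1
            elif self._rcv_state == self.STRS_WAIT_HI_SIZE:
                self._rcv_state = self.STRS_WAIT_LO_SIZE
                self._rcv_size = chunk[i] << 8  # high byte of the size
                i += 1
            elif self._rcv_state == self.STRS_WAIT_LO_SIZE:
                self._rcv_state = self.STRS_WAIT_DATA
                self._rcv_size |= chunk[i]  # low byte of the size
                i += 1
            elif self._rcv_state == self.STRS_WAIT_DATA:
                # bytes still missing; part of the packet may already be buffered
                need = self._rcv_size - len(self._rcv_buf)
                if len(chunk) - i >= need:
                    packets.append(self._rcv_buf + chunk[i:i + need])
                    self._rcv_buf = bytearray([])
                    i += need
                    self._rcv_state = self.STRS_WAIT_MARKER
                else:
                    # packet is still incomplete: keep the tail for the next chunk
                    self._rcv_buf += chunk[i:]
                    i = len(chunk)
        return packets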

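    def wrap_packet(self, packet):
        """
        Call this method before sending to the stream protocol.
        It adds the meta header needed to extract the full packet from the stream.
        :param packet: packet (at most 65535 bytes, since the size field is two bytes)
        :return: meta header + packet
        """
        size = len(packet)
        if size > 0xFFFF:
            raise ValueError("packet is too long for the two-byte size field")
        return bytearray([
            self.PACK_MARKER,
            (size >> 8) & 0xFF, size & 0xFF]) + packet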
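
The three-byte header described above (marker, then the size as high byte and low byte) can also be expressed with the standard struct module. This is only a sketch of the layout, not a replacement for the class; the names MARKER, HEADER and frame are illustrative:

```python
import struct

MARKER = 0x72                  # same value as PACK_MARKER in the class
HEADER = struct.Struct(">BH")  # 1-byte marker + 2-byte big-endian size


def frame(payload):
    """Prefix the payload with the marker and its length."""
    return HEADER.pack(MARKER, len(payload)) + payload


framed = frame(b"hello")
marker, size = HEADER.unpack_from(framed)  # parse the header back
payload = framed[HEADER.size:HEADER.size + size]
print(marker, size, payload)
```

Big-endian order (">") matches the class, which sends the high byte of the size first.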

How to use:

1. Create an instance of the class on the transmitter and on the receiver.

2. Before calling send, wrap the bytearray with wrap_packet. It returns a bytearray that you can pass to send.

3. After receiving a new portion of the stream on the receiver side, call put_chunk. It returns a list of complete packets (or an empty list if no packet is complete yet).
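
These steps can be sketched end to end. So that the snippet runs on its own, it uses a compact stand-in (the hypothetical MiniExtractor and wrap below, not the class above) with the same put_chunk contract and the same marker plus two-byte size header. The 7-byte slices are an arbitrary choice that imitates the stream splitting and gluing packets:

```python
import struct

MARKER = 0x72  # same value as PACK_MARKER


def wrap(payload):
    """Step 2: prefix the payload with marker + 2-byte big-endian size."""
    return struct.pack(">BH", MARKER, len(payload)) + payload


class MiniExtractor:
    """Hypothetical stand-in: buffers the stream and cuts out whole packets."""

    def __init__(self):
        self._buf = bytearray()

    def put_chunk(self, chunk):
        self._buf += chunk
        packets = []
        while len(self._buf) >= 3:
            marker, size = struct.unpack_from(">BH", self._buf)
            if marker != MARKER:
                raise ValueError("unexpected marker byte {}".format(marker))
            if len(self._buf) < 3 + size:
                break  # packet not complete yet, wait for more data
            packets.append(bytes(self._buf[3:3 + size]))
            del self._buf[:3 + size]
        return packets


# Step 1: one extractor instance on the receiving side
extractor = MiniExtractor()

# Step 2: the sender wraps each packet; on the wire they form one byte stream
stream = wrap(b"first packet") + wrap(b"second") + wrap(b"")

# Step 3: the receiver feeds whatever recv returned; here, 7-byte slices
received = []
for start in range(0, len(stream), 7):
    received.extend(extractor.put_chunk(stream[start:start + 7]))

print(received)
```

With a real socket, the loop in step 3 would call recv and pass each returned chunk to put_chunk in the same way.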