The following Python script receives SLIP-encoded data from a serial port (/dev/ttyACM0
in this example) and decodes the SLIP messages using the fully asynchronous
(asyncio-based) serial_asyncio library, which you can install using
pip install -U pyserial-asyncio
You also need to install ansicolors
for colored printing on the console:
pip install -U ansicolors
Full source code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Receive SLIP-framed messages from a serial port using asyncio."""
__author__ = "Uli Köhler"
__license__ = "CC0 1.0 Universal"

import asyncio

try:
    from colors import red
except ImportError:
    # ansicolors only adds colour to the warning output; degrade to plain
    # text instead of refusing to start when it is not installed.
    def red(text):
        """Return *text* unchanged (fallback when ansicolors is missing)."""
        return text

# SLIP special byte values, written in octal as in RFC 1055
SLIP_END = 0o300
SLIP_ESC = 0o333
SLIP_ESCEND = 0o334
SLIP_ESCESC = 0o335

# Serial settings used when the script is run directly
SERIAL_PORT = '/dev/ttyACM0'
BAUDRATE = 115200


def handle_slip_message(msg):
    """Handle one decoded SLIP message (here: just report its length).

    Args:
        msg: The decoded payload as ``bytes``, without framing or escapes.
    """
    print("Received message of length", len(msg))


class SLIPProtocol(asyncio.Protocol):
    """asyncio protocol that reassembles SLIP frames from a byte stream.

    Incoming bytes are accumulated in ``self.buf``; every complete frame is
    decoded and passed to :func:`handle_slip_message`.
    """

    def __init__(self):
        # Initialise state here so the decoder works even before (or
        # without) a transport being attached.
        self.transport = None
        self.buf = b''  # Received bytes that are not yet part of a message

    def connection_made(self, transport):
        """Store the transport and reset the receive buffer."""
        self.transport = transport
        self.buf = b''
        print('port opened', transport)
        # You can manipulate the Serial object via the transport
        transport.serial.rts = False

    def check_for_slip_message(self):
        """Decode and remove the next complete SLIP message from the buffer.

        Returns:
            The decoded message as ``bytes`` (possibly empty, for two
            consecutive END bytes), or ``None`` if the buffer does not yet
            contain a complete message.

        An invalid escape sequence discards everything up to and including
        the offending byte. Scanning then continues with the remaining
        bytes, so frames already buffered behind the corruption are still
        delivered by this call instead of waiting for more serial data.
        """
        decoded = bytearray()
        escaped = False  # True if the previous byte was SLIP_ESC
        garbage_end = 0  # Number of leading bytes known to be corrupt
        for i, c in enumerate(self.buf):
            if escaped:
                escaped = False  # Reset state
                # This byte must be either SLIP_ESCEND or SLIP_ESCESC
                if c == SLIP_ESCEND:  # Literal END byte
                    decoded.append(SLIP_END)
                elif c == SLIP_ESCESC:  # Literal ESC byte
                    decoded.append(SLIP_ESC)
                else:
                    print(red("Encountered invalid SLIP escape sequence. Ignoring..."))
                    # Forget the bad part of the message and keep scanning
                    decoded.clear()
                    garbage_end = i + 1
            elif c == SLIP_END:  # END of message
                # Remove the current message from the buffer and emit it
                self.buf = self.buf[i + 1:]
                return bytes(decoded)
            elif c == SLIP_ESC:
                # The next byte is an escaped character
                escaped = True
            else:  # Any other byte is payload
                decoded.append(c)
        # No END found: keep the partial message (minus any corrupt prefix,
        # so the same error is not reported again) and wait for more data.
        self.buf = self.buf[garbage_end:]
        return None

    def data_received(self, data):
        """Append *data* to the buffer and dispatch all complete messages."""
        self.buf += data
        # iter(callable, sentinel) keeps calling until None is returned,
        # i.e. until we need to wait for more data.
        for msg in iter(self.check_for_slip_message, None):
            handle_slip_message(msg)

    def connection_lost(self, exc):
        """Stop the event loop once the port has been closed."""
        print('port closed')
        self.transport.loop.stop()

    def pause_writing(self):
        """Flow-control callback: report the current write buffer size."""
        print('pause writing')
        print(self.transport.get_write_buffer_size())

    def resume_writing(self):
        """Flow-control callback: report the current write buffer size."""
        print(self.transport.get_write_buffer_size())
        print('resume writing')


def main(port=SERIAL_PORT, baudrate=BAUDRATE):
    """Open the serial port and decode SLIP messages until it is closed."""
    # Imported here so the decoder above stays importable (and testable)
    # on machines without pyserial-asyncio or serial hardware.
    import serial_asyncio

    # asyncio.get_event_loop() without a running loop is deprecated;
    # create the loop explicitly instead.
    loop = asyncio.new_event_loop()
    try:
        coro = serial_asyncio.create_serial_connection(
            loop, SLIPProtocol, port, baudrate=baudrate)
        loop.run_until_complete(coro)
        loop.run_forever()
    finally:
        # Also reached on errors / Ctrl+C, unlike a trailing loop.close()
        loop.close()


if __name__ == "__main__":
    main()