A Python SLIP decoder using serial_asyncio
The following Python script receives SLIP-encoded data from a serial port (/dev/ttyACM0
in this example) and decodes the SLIP messages using the fully asynchronous (asyncio
-based) serial_asyncio
library which you can install using
pip install -U pyserial-asyncio
You also need to install ansicolors
for colored printing on the console:
pip install -U ansicolors
Full source code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "Uli Köhler"
__license__ = "CC0 1.0 Universal"
import asyncio
from colors import red
import serial_asyncio
SLIP_END = 0o300
SLIP_ESC = 0o333
SLIP_ESCEND = 0o334
SLIP_ESCESC = 0o335
def handle_slip_message(msg):
print(f"Received message of length", len(msg))
class SLIPProtocol(asyncio.Protocol):
def connection_made(self, transport):
self.msg = bytes() # Message buffer
self.transport = transport
print('port opened', transport)
transport.serial.rts = False # You can manipulate Serial object via transport
# Send "enter" to prompt output
self.buf = b''
def check_for_slip_message(self):
# Identify end of message in data
decoded = []
last_char_is_esc = False
for i in range(len(self.buf)):
c = self.buf[i]
if last_char_is_esc:
# This character must be either
# SLIP_ESCEND or SLIP_ESCESC
if c == SLIP_ESCEND: # Literal END character
decoded.append(SLIP_END)
elif c == SLIP_ESCESC: # Literal ESC character
decoded.append(SLIP_ESC)
else:
print(red("Encountered invalid SLIP escape sequence. Ignoring..."))
# Ignore bad part of message
self.buf = self.buf[i+1:]
break
last_char_is_esc = False # Reset state
else: # last char was NOT ESC
if c == 192: # END of message
# Remove current message from buffer
self.buf = self.buf[i+1:]
# Emit message
return bytes(decoded)
elif c == SLIP_ESC:
# Handle escaped character next
last_char_is_esc = True
else: # Any other character
decoded.append(c)
# No more bytes in buffer => no more message
return None
def data_received(self, data):
# Append new data to buffer
self.buf += data
while True:
msg = self.check_for_slip_message()
if msg is None:
break # Need to wait for more data
else: # msg is not None
handle_slip_message(msg)
def connection_lost(self, exc):
print('port closed')
self.transport.loop.stop()
def pause_writing(self):
print('pause writing')
print(self.transport.get_write_buffer_size())
def resume_writing(self):
print(self.transport.get_write_buffer_size())
print('resume writing')
loop = asyncio.get_event_loop()
coro = serial_asyncio.create_serial_connection(loop, SLIPProtocol, '/dev/ttyACM0', baudrate=115200)
transport, protocol = loop.run_until_complete(coro)
loop.run_forever()
loop.close()