Python: SLIP-Decoder mit serial_asyncio

English Deutsch

Das folgende Python-Skript empfängt SLIP-kodierte Daten von einer seriellen Schnittstelle (in diesem Beispiel /dev/ttyACM0) und dekodiert die SLIP-Nachrichten mit der vollständig asynchronen (auf asyncio basierenden) Bibliothek serial_asyncio, die Sie mit

install_pyserial_asyncio.sh
pip install -U pyserial-asyncio

installieren können. Außerdem müssen Sie ansicolors für farbiges Ausgeben auf der Konsole installieren:

install_ansicolors.sh
pip install -U ansicolors
slip_decoder.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
__author__ = "Uli Köhler"
__license__ = "CC0 1.0 Universal"

import asyncio
from colors import red
import serial_asyncio

SLIP_END = 0o300
SLIP_ESC = 0o333
SLIP_ESCEND = 0o334
SLIP_ESCESC = 0o335

def handle_slip_message(msg):
    print(f"Nachricht empfangen mit Länge", len(msg))

class SLIPProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.msg = bytes() # Nachrichtenpuffer
        self.transport = transport
        print('Port geöffnet', transport)
        transport.serial.rts = False  # Sie können das Serial-Objekt über transport manipulieren
        # "Enter" senden, um Ausgabe anzufordern
        self.buf = b''

    def check_for_slip_message(self):
        # Identify end of message in data
        decoded = []
        last_char_is_esc = False
        for i in range(len(self.buf)):
            c = self.buf[i]
            if last_char_is_esc:
                # This character must be either
                # SLIP_ESCEND or SLIP_ESCESC
                if c == SLIP_ESCEND: # Literal END character
                    decoded.append(SLIP_END)
                elif c == SLIP_ESCESC: # Literal ESC character
                    decoded.append(SLIP_ESC)
                else:
                    print(red("Encountered invalid SLIP escape sequence. Ignoring..."))
                    # Ignore bad part of message
                    self.buf = self.buf[i+1:]
                    break
                last_char_is_esc = False # Reset state
            else: # last char was NOT ESC
                if c == 192: # END of message
                    # Remove current message from buffer
                    self.buf = self.buf[i+1:]
                    # Emit message
                    return bytes(decoded)
                elif c == SLIP_ESC:
                    # Handle escaped character next
                    last_char_is_esc = True
                else: # Any other character
                    decoded.append(c)
        # No more bytes in buffer => no more message
        return None

    def data_received(self, data):
        # Append new data to buffer
        self.buf += data
        while True:
            msg = self.check_for_slip_message()
            if msg is None:
                break # Need to wait for more data
            else: # msg is not None
                handle_slip_message(msg)

    def connection_lost(self, exc):
        print('port closed')
        self.transport.loop.stop()

    def pause_writing(self):
        print('pause writing')
        print(self.transport.get_write_buffer_size())

    def resume_writing(self):
        print(self.transport.get_write_buffer_size())
        print('resume writing')

loop = asyncio.get_event_loop()
coro = serial_asyncio.create_serial_connection(loop, SLIPProtocol, '/dev/ttyACM0', baudrate=115200)
transport, protocol = loop.run_until_complete(coro)
loop.run_forever()
loop.close()

Check out similar posts by category: Embedded, Python