Python

How to fix poetry publish HTTP Error 403: Invalid or non-existent authentication information

Problem:

You intend to publish your package on PyPI using poetry using a command such as

poetry publish

However, you see the following error message:

Publishing mypackage (0.1.0) to PyPI
 - Uploading mypackage-0.1.0-py3-none-any.whl FAILED

HTTP Error 403: Invalid or non-existent authentication information. See https://pypi.org/help/#invalid-auth for more information. | b'<html>\n <head>\n  <title>403 Invalid or non-existent authentication information. See https://pypi.org/help/#invalid-auth for more information.\n \n <body>\n  <h1>403 Invalid or non-existent authentication information. See https://pypi.org/help/#invalid-auth for more information.\n  Access was denied to this resource.<br/><br/>\nInvalid or non-existent authentication information. See https://pypi.org/help/#invalid-auth for more information.\n\n\n \n'

Solution:

First, you need to create a PyPI API token if you don’t have one already. The username/password authentication is deprecated.

Now run the following command to configure poetry:

poetry config http-basic.pypi __token__ APITOKEN

where APITOKEN is the API token you want to use on this computer. Leave __token__ as-is since it’s a special username that tells PyPI to use an API token instead of the old & deprecated username/password authentication

It should look like this:

poetry config http-basic.pypi __token__ pypi-AgEIc[...]

Now retry publishing:

poetry publish

Example output:

Publishing mypackage (0.1.0) to PyPI
 - Uploading mypackage-0.1.0-py3-none-any.whl 100%
 - Uploading mypackage-0.1.0.tar.gz 100%

 

Posted by Uli Köhler in Python

Recommended python project initialization & build tool

I recommend Poetry for initializing, managing & publishing python projects.

Posted by Uli Köhler in Python

Robust Linux Python serial port filtering by name and manufacturer using udev

Important note: This code is deprecated as it has been superseded by the UliSerial library on GitHub. UIiSerial implements the serial port filtering in a platform-independent manner using pyserial built-in features.

 

Typically, when one intends to select a serial port in Python, you use a sequence such as

glob.glob('/dev/ttyACM*')[0]

or just go to straight hard-coding the port such as /dev/ttyACM0.

It should be quite obvious why this isn’t robust:

  • There might be multiple serial ports and the order is unknown
  • You code might need to work on different computers, which lead to different ports being assigned
  • When you re-plug a device, it might get assigned a differnent serial port number (e.g. /dev/ttyACM1)

The following code uses only the linux integrated tool udevadm to query information and built-in libraries. Currently it does not work on anything but Linux.

import glob
import subprocess
import re

def get_serial_ports():
    # Get a list of /dev/ttyACM* and /dev/ttyUSB* devices
    ports = glob.glob('/dev/ttyACM*') + glob.glob('/dev/ttyUSB*')
    return ports

def get_udev_info(port):
    # Run the udevadm command and get the output
    result = subprocess.run(['udevadm', 'info', '-q', 'all', '-r', '-n', port], capture_output=True, text=True)
    return result.stdout

def find_serial_ports(vendor=None, model=None, usb_vendor=None, usb_model=None, usb_model_id=None, vendor_id=None):
    # Mapping of human-readable names to udevadm keys
    attribute_mapping = {
        "vendor": "ID_VENDOR_FROM_DATABASE",
        "model": "ID_MODEL_FROM_DATABASE",
        "usb_vendor": "ID_USB_VENDOR",
        "usb_model": "ID_USB_MODEL_ENC",
        "usb_model_id": "ID_USB_MODEL_ID",
        "vendor_id": "ID_VENDOR_ID",
        "usb_path": "ID_PATH",
        "serial": "ID_SERIAL_SHORT"
    }

    # Filters based on provided arguments
    filters = {}
    for key, value in locals().items():
        if value and key in attribute_mapping:
            filters[attribute_mapping[key]] = value

    # Find and filter ports
    ports = get_serial_ports()
    filtered_ports = []

    for port in ports:
        udev_info = get_udev_info(port)
        match = True

        for key, value in filters.items():
            if not re.search(f"{key}={re.escape(value)}", udev_info):
                match = False
                break

        if match:
            filtered_ports.append(port)

    return filtered_ports

def get_serial_port_info(port):
    # Mapping of udevadm keys to human-readable names
    attribute_mapping = {
        "ID_VENDOR_FROM_DATABASE": "vendor",
        "ID_MODEL_FROM_DATABASE": "model",
        "ID_USB_VENDOR": "usb_vendor",
        "ID_USB_MODEL_ENC": "usb_model",
        "ID_USB_MODEL_ID": "usb_model_id",
        "ID_VENDOR_ID": "vendor_id",
        "ID_SERIAL_SHORT": "serial",
    }

    # Run the udevadm command and get the output
    udev_info = get_udev_info(port)
    port_info = {}

    for line in udev_info.splitlines():
        if line.startswith('E: '):
            key, value = line[3:].split('=', 1)
            if key in attribute_mapping:
                # Decode escape sequences like \x20 to a space.
                # NOTE: Since only \x20 is common, we currently only replace that one
                port_info[attribute_mapping[key]] = value.replace('\\x20', ' ')

    return port_info

def find_serial_port(**kwargs):
    """
    Find a single serial port matching the provided filters.
    """
    ports = find_serial_ports(**kwargs)
    if len(ports) > 1:
        raise ValueError("Multiple matching ports found for filters: " + str(kwargs))
    elif len(ports) == 0:
        raise ValueError("No matching ports found for filters: " + str(kwargs))
    else:
        return ports[0]

 

Example: Find a serial port

# Example usage: Find a serial port
matching_ports = find_serial_ports(vendor="OpenMoko, Inc.")
print(matching_ports) # Prints e.g. ['/dev/ttyACM0']

Example: Print information about a given port

This is typically used so you know what filters to add for find_serial_ports().

# Example usage: Print info about a serial port
serial_port_info = get_serial_port_info('/dev/ttyACM0')
print(serial_port_info)

This prints, for example:

{
    'serial': '01010A23535223934CF29A1EF5000007',
    'vendor_id': '1d50',
    'usb_model': 'Marlin USB Device',
    'usb_model_id': '6029',
    'usb_vendor':
    'marlinfw.org',
    'vendor': 'OpenMoko, Inc.',
    'model': 'Marlin 2.0 (Serial)'
}

 

Posted by Uli Köhler in Electronics, Linux, Python

Jupyter widget notebook with two sliders, making a HTTP POST request on change

This extended version of Jupyter Widget notebook with interactive IntSlider making a HTTP POST request features two sliders instead of one.

import ipywidgets as widgets
import httpx
from IPython.display import display
# Define the slider widget
delaySlider = widgets.IntSlider(
    value=450,  # Initial value
    min=0,    # Minimum value
    max=2000,    # Maximum value
    step=1,  # Step size
    description='Delay:'
)
lengthSlider = widgets.IntSlider(
    value=20*10,  # Initial value
    min=0,    # Minimum value
    max=40*10,    # Maximum value
    step=1,  # Step size
    description='Length:'
)
# Define a function to handle slider changes
def on_slider_change(change):
    # Define the API URL with the slider value
    httpx.post("http://10.1.2.3/api/configure", json={"channels":[{
        "channel": 0,
        "delay": delaySlider.value,
        "length": lengthSlider.value,
    }]})
# Attach the slider change handler to the slider widget
delaySlider.observe(on_slider_change, names='value')
lengthSlider.observe(on_slider_change, names='value')
# Display the slider widget in the notebook
display(widgets.Box(children=[delaySlider, lengthSlider]))

 

Posted by Uli Köhler in Jupyter, Python

Jupyter Widget notebook with interactive IntSlider making a HTTP POST request

This Jupyter notebook displays an IntSlider and makes a HTTP POST request with JSON body on every change using the httpx library.

import ipywidgets as widgets
import httpx
from IPython.display import display
# Define the slider widget
slider = widgets.IntSlider(
    value=450,  # Initial value
    min=0,    # Minimum value
    max=2000,    # Maximum value
    step=1,  # Step size
    description='Value:'
)
# Define a function to handle slider changes
def on_slider_change(change):
    slider_value = change['new']
    # Define the API URL with the slider value
    httpx.post("http://10.1.2.3/api/configure", json={"delay": slider_value})
# Attach the slider change handler to the slider widget
slider.observe(on_slider_change, names='value')
# Display the slider widget in the notebook
display(slider)

 

Posted by Uli Köhler in Jupyter, Python

Trimming Down Long Directory and File Names using Python

This Python script takes a directory path and then proceeds to scan every file and sub-directory within, looking for any filenames or directory names that exceed a specified length. Once found, the script truncates these names to fit within the desired length and appends an ellipsis (e.g., ...) to indicate the truncation. The beauty of this script lies in its configurability and the safety features embedded within it.

Note: Using this script will lose information (because the new filenames will be shorter and any important information in the rest of the filename will be lost forever). Additionally, it might lose information in other ways. Even though we have tested it carefully, it might still destroy your data in unexpected ways. Use a

How to use

The script provides the following command line options:

usage: FixLongFilenames.py [-h] [-n LENGTH] [--ellipsis ELLIPSIS] [--dry] directory

Shorten long filenames and directory names.

positional arguments:
  directory             The directory to process.

options:
  -h, --help            show this help message and exit
  -n LENGTH, --length LENGTH
                        The maximum allowable length for directory or file names.
  --ellipsis ELLIPSIS   The ellipsis to use when shortening.
  --dry                 Dry run mode, only log what would be renamed without actual renaming.

Source code

#!/usr/bin/env python3
import os
import argparse

def shorten_path(path, max_length, ellipsis, dry_run):
    if os.path.isdir(path):
        base_name = os.path.basename(path)
        if len(base_name) > max_length:
            new_name = base_name[:max_length] + ellipsis
            new_path = os.path.join(os.path.dirname(path), new_name)
            if not os.path.exists(new_path):
                if dry_run:
                    print(f"[DRY RUN] Directory would be renamed: {path} -> {new_name}")
                else:
                    os.rename(path, new_path)
                    print(f"Renamed directory: {path} -> {new_name}")
                return new_path
    else:
        base_name, ext = os.path.splitext(os.path.basename(path))
        if len(base_name) > max_length:
            new_name = base_name[:max_length] + ellipsis + ext
            new_path = os.path.join(os.path.dirname(path), new_name)
            if not os.path.exists(new_path):
                if dry_run:
                    print(f"[DRY RUN] File would be renamed: {path} -> {new_name}")
                else:
                    os.rename(path, new_path)
                    print(f"Renamed file: {path} -> {new_name}")
                return new_path
    return path

def iterate_and_shorten(directory, max_length, ellipsis, dry_run):
    for root, dirs, files in os.walk(directory, topdown=False):
        for dname in dirs:
            dpath = os.path.join(root, dname)
            shortened_path = shorten_path(dpath, max_length, ellipsis, dry_run)
            dirs[dirs.index(dname)] = os.path.basename(shortened_path)

        for fname in files:
            fpath = os.path.join(root, fname)
            shorten_path(fpath, max_length, ellipsis, dry_run)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Shorten long filenames and directory names.")
    parser.add_argument('directory', help="The directory to process.")
    parser.add_argument('-n', '--length', type=int, default=100, help="The maximum allowable length for directory or file names.")
    parser.add_argument('--ellipsis', default="...", help="The ellipsis to use when shortening.")
    parser.add_argument('--dry', action="store_true", help="Dry run mode, only log what would be renamed without actual renaming.")
    args = parser.parse_args()

    iterate_and_shorten(args.directory, args.length, args.ellipsis, args.dry)

 

Posted by Uli Köhler in Python

Python script to re-encode all audio files to OPUS recursively

This Python script recursively converts all FLAC, WAV and MP3 etc files recursively in the given input directory to OPUS. The default bitrate is 96k which is roughly equivalent to 192k to 256k MP3. You can select a different

After encoding, the source file and converted file are checked for their length automatically. If the length differs more than 0.2 seconds, the encoding is considered failed.

There is an option --delete to delete the source file (only if the OPUS file has equivalent length to the source file of course). Use with caution and make backups before using, no warranty is expressed or implied for this script – it might lose some of your music!

The script automatically performs parallel encoding, hence it’s pretty fast even if transcoding many files.

No files are overwritten unless the -o/--overwrite is given on the command line.

How to use

First, install ffmpeg and ffprobe. Now install the python binding for ffmpeg using

pip install ffmpeg-python

Here’s the command line options the script provides:

usage: ConvertToOPUS.py [-h] [--bitrate BITRATE] [--delete] [-o] [-j THREADS] directory

Transcode MP3, FLAC & WAV files to OPUS.

positional arguments:
  directory             Directory containing the files to transcode

options:
  -h, --help            show this help message and exit
  --bitrate BITRATE     Bitrate for the OPUS files
  --delete              Delete original files after successful transcoding
  -o, --overwrite       Overwrite existing OPUS files
  -j THREADS, --threads THREADS
                        Number of parallel transcodes

Source code

#!/usr/bin/env python3
import os
import sys
import argparse
import ffmpeg
from concurrent.futures import ProcessPoolExecutor
import logging

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

def get_file_duration(file_path):
    try:
        probe = ffmpeg.probe(file_path)
        audio_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'audio'), None)
        return float(audio_stream['duration'])
    except Exception as e:
        logging.error(f"Failed to get duration for {file_path}: {e}")
        return None

def transcode_file(file_path, bitrate, delete_after_transcode, overwrite):
    try:
        output_file = file_path.rsplit('.', 1)[0] + '.opus'
        if not overwrite and os.path.exists(output_file):
            logging.info(f"{output_file} exists. Skipping due to no overwrite flag.")
            return

        stream = (
            ffmpeg
            .input(file_path)
            .output(output_file, ab=bitrate, loglevel="error")
        )

        if overwrite:
            stream = stream.overwrite_output()

        stream.run()

        original_duration = get_file_duration(file_path)
        transcoded_duration = get_file_duration(output_file)

        if original_duration is None or transcoded_duration is None:
            logging.error(f"Failed to transcode {file_path}. Could not retrieve file duration.")
            os.remove(output_file)
            return

        duration_diff = abs(original_duration - transcoded_duration)
        if duration_diff >= 0.2:
            logging.error(f"Transcoding failed for {file_path}. Duration mismatch.")
            os.remove(output_file)
        else:
            logging.info(f"Transcoded {file_path} to {output_file} successfully.")
            if delete_after_transcode:
                os.remove(file_path)
    except Exception as e:
        logging.error(f"Failed to transcode {file_path}: {e}")

def main():
    parser = argparse.ArgumentParser(description="Transcode MP3, FLAC & WAV files to OPUS.")
    parser.add_argument("directory", type=str, help="Directory containing the files to transcode")
    parser.add_argument("--bitrate", type=str, default="96k", help="Bitrate for the OPUS files")
    parser.add_argument("--delete", action="store_true", help="Delete original files after successful transcoding")
    parser.add_argument("-o", "--overwrite", action="store_true", help="Overwrite existing OPUS files")
    parser.add_argument("-j", "--threads", type=int, default=os.cpu_count(), help="Number of parallel transcodes")

    args = parser.parse_args()

    supported_extensions = ['mp3', 'flac', 'wav', 'm4a']

    # List all files with the supported extensions in the directory recursively
    files_to_transcode = []
    for root, dirs, files in os.walk(args.directory):
        for file in files:
            if file.split('.')[-1].lower() in supported_extensions:
                files_to_transcode.append(os.path.join(root, file))

    logging.info(f"Found {len(files_to_transcode)} files to transcode.")

    with ProcessPoolExecutor(max_workers=args.threads) as executor:
        for file_path in files_to_transcode:
            executor.submit(transcode_file, file_path, args.bitrate, args.delete, args.overwrite)

if __name__ == '__main__':
    main()

 

Posted by Uli Köhler in Audio, Audio/Video, Python

Python script to remove strings from filenames and directory names recursively

This Python script renames files and directories in a given folder recursively, removing a given string from filenames and directory names. It does not require any libraries to be installed.

Note that this script has been tested, however we can’t guarantee it won’t destroy some data under some conditions. Backup your data and use with caution (best to try using --dry --verbose!)

These are the command line options it provides:

usage: RemoveFromNames.py [-h] [-c] [--dry] [-D] [-F] [-i] [-v] dir remove_string

Process and rename files and directories by removing a specific string.

positional arguments:
  dir                   Input directory to scan recursively
  remove_string         String to remove from filenames and folder names

options:
  -h, --help            show this help message and exit
  -c, --compress_spaces
                        Compress multiple spaces into one
  --dry                 Dry run. Do not actually rename anything.
  -D, --no_dirs         Don't rename directories.
  -F, --no_files        Don't rename files.
  -i, --case_insensitive
                        Perform case-insensitive compare and replace.
  -v, --verbose         Print what is being renamed.

Source code:

#!/usr/bin/env python3
import os
import argparse

def rename_files_in_directory(base_dir, remove_string, compress_spaces=False, dry_run=False, verbose=False, no_dirs=False, no_files=False, case_insensitive=False):
    if case_insensitive:
        def contains(sub, main):
            return sub.lower() in main.lower()
        
        def replace(source, target, string):
            pattern = re.compile(re.escape(source), re.IGNORECASE)
            return pattern.sub(target, string)
    else:
        contains = lambda sub, main: sub in main
        replace = lambda source, target, string: string.replace(source, target)

    # First, rename directories if not skipped by --no_dirs
    if not no_dirs:
        for dirpath, dirnames, _ in os.walk(base_dir, topdown=False):
            for dirname in dirnames:
                if contains(remove_string, dirname):
                    new_name = replace(remove_string, "", dirname)
                    if compress_spaces:
                        new_name = ' '.join(new_name.split())
                    new_name = new_name.strip()

                    if os.path.exists(os.path.join(dirpath, new_name)):
                        print(f"Cannot rename directory '{dirname}' to '{new_name}' in '{dirpath}' because a directory with the new name already exists.")
                        continue

                    if verbose:
                        print(f"Renaming directory '{dirname}' to '{new_name}' in '{dirpath}'")
                    
                    if not dry_run:
                        os.rename(os.path.join(dirpath, dirname), os.path.join(dirpath, new_name))

    # Then, rename files if not skipped by --no_files
    if not no_files:
        for dirpath, _, filenames in os.walk(base_dir):
            for filename in filenames:
                if contains(remove_string, filename):
                    new_name = replace(remove_string, "", filename)
                    if compress_spaces:
                        new_name = ' '.join(new_name.split())
                    new_name = new_name.strip()
                    
                    if os.path.exists(os.path.join(dirpath, new_name)):
                        print(f"Cannot rename '{filename}' to '{new_name}' in '{dirpath}' because a file with the new name already exists.")
                        continue
                    
                    if verbose:
                        print(f"Renaming file '{filename}' to '{new_name}' in '{dirpath}'")
                    
                    if not dry_run:
                        os.rename(os.path.join(dirpath, filename), os.path.join(dirpath, new_name))

def main():
    parser = argparse.ArgumentParser(description="Process and rename files and directories by removing a specific string.")
    
    parser.add_argument("dir", type=str, help="Input directory to scan recursively")
    parser.add_argument("remove_string", type=str, help="String to remove from filenames and folder names")
    parser.add_argument("-c", "--compress_spaces", action="store_true", help="Compress multiple spaces into one")
    parser.add_argument("--dry", action="store_true", help="Dry run. Do not actually rename anything.")
    parser.add_argument("-D", "--no_dirs", action="store_true", help="Don't rename directories.")
    parser.add_argument("-F", "--no_files", action="store_true", help="Don't rename files.")
    parser.add_argument("-i", "--case_insensitive", action="store_true", help="Perform case-insensitive compare and replace.")
    parser.add_argument("-v", "--verbose", action="store_true", help="Print what is being renamed.")

    args = parser.parse_args()

    rename_files_in_directory(args.dir, args.remove_string, args.compress_spaces, args.dry, args.verbose, args.no_dirs, args.no_files, args.case_insensitive)

if __name__ == "__main__":
    main()

 

Posted by Uli Köhler in Python

How to read multimeter and paste into Excel spreadsheet cell on hotkey press using Python

The following Python script uses PyVISA / LabInstruments to read a Rigol DM3058(E) multimeter and automatically paste the voltage reading into an open spreadsheet on Shift+Alt+Q press.

First, clone LabInstruments to the current directory using

git clone https://github.com/ulikoehler/LabInstruments.git

Now install the requirements from the following requirements.txt

system_hotkey310
pyperclip
pyvisa
pyvisa-py

and run the script:

#!/usr/bin/env python3
"""
This script allows quick reading of multimeter, automatically pasting the reading
into the current e.g. excel spreadsheet and pressing "Enter" to advance to the next cell.

Press Shift+Alt+Q to read the multimeter and paste the reading into the current cell.
"""
import subprocess
import pyautogui
import time
import re
import pyperclip
import sexpdata
import pyvisa
from LabInstruments.DM3058 import DM3058

import locale
locale.setlocale(locale.LC_NUMERIC, f"{locale.getdefaultlocale()[0]}.utf-8")

rm = pyvisa.ResourceManager()
rm.list_resources()
inst = rm.open_resource('USB0::6833::2500::DM3R245003970::0::INSTR')
multimeter = DM3058(inst)
multimeter.set_speed("S")
multimeter.mode_dc_voltage()


def run():
    voltage = multimeter.read_voltage()
    
    locale_formatted_voltage = locale.format_string("%.5f", voltage, grouping=False)
    print("Voltage: ", locale_formatted_voltage)

    # Copy locale-formatted voltage to clipboard
    pyperclip.copy(locale_formatted_voltage)
    # Paste into cell
    pyautogui.hotkey('ctrl', 'v')
    # Wait for excel
    time.sleep(0.1)
    # Press Enter key to save
    pyautogui.press('enter')

# Register hotkeys
from system_hotkey import SystemHotkey
hk = SystemHotkey()
hk.register(('shift', 'alt', 'q'), callback=lambda x: run())

# Wait for exit
while True:
    time.sleep(10)

 

Posted by Uli Köhler in Electronics, Python

Jupyter ipywidgets slider making a HTTP request on change

The following Jupyter cell uses ipywidgets to display a slider from 0.0 … 1.0 and, on change, will submit a HTTP request to http://10.1.2.3.4/api/set-power?power=[slider value].

import ipywidgets as widgets
import requests
from IPython.display import display

# Define the slider widget
slider = widgets.FloatSlider(
    value=0.5,  # Initial value
    min=0.0,    # Minimum value
    max=1.0,    # Maximum value
    step=0.01,  # Step size
    description='Slider:'
)

# Define a function to handle slider changes
def on_slider_change(change):
    slider_value = change['new']
    # Define the API URL with the slider value
    api_url = f'http://10.1.2.3.4/api/set-power?power={slider_value}'
    print(api_url)
    
    # Make the HTTP request
    try:
        response = requests.get(api_url)
        response.raise_for_status()  # Raise an exception for HTTP errors
        print(f'Successfully set power to {slider_value}')
    except requests.exceptions.RequestException as e:
        print(f'Error: {e}')

# Attach the slider change handler to the slider widget
slider.observe(on_slider_change, names='value')

# Display the slider widget in the notebook
display(slider)

 

Posted by Uli Köhler in Jupyter, Python

How to prevent pyinvoke “UnexpectedExit: Encountered a bad command exit code!”

Problem:

When you try running your command using pyinvoke’s invoke.run(), the native command returns a non-zero exit code making your script fail instead of just continuing (e.g. to evaluate the output):

Traceback (most recent call last):
File "/home/user/myscript.py", line 7, in <module>
result = invoke.run("mycommand", out_stream=outStream)
# ...
File "/usr/local/lib/python3.10/dist-packages/invoke/runners.py", line 518, in _finish
raise UnexpectedExit(result)
invoke.exceptions.UnexpectedExit: Encountered a bad command exit code!

Solution:

Use warn=True as an argument to invoke.run()

This will tell invoke to just warn you about the command exiting with non-zero exit status instead of throwing an UnexpectedExit

Posted by Uli Köhler in Python

How to compute non-inverting OpAmp amplifier gain using UliEngineering in Python

In this example, we’ll use the UliEngineering library to compute the gain of a non-inverting OpAmp amplifier, given the two feedback resistor values R1 and R2

In order to install UliEngineering (a Python 3 library) run:

sudo pip3 install -U UliEngineering

We can now use noninverting_amplifier_gain() from the UliEngineering.Electronics.OpAmp package to convert between the units of temperature:

Tip: You can pass both numbers (like 100e3) or strings (such as 100 kΩ) to most UliEngineering functions. SI prefixes like k and m are automatically decoded.

Example:

from UliEngineering.Electronics.OpAmp import noninverting_amplifier_gain

# Gain of a non-inverting amplifier with 100k & 10k feedback resistor
gain = noninverting_amplifier_gain(100e3, 10e3)
# gain = 11.0

# ... or you can use strings
gain = noninverting_amplifier_gain("100k", "10k")
 
# ... or strings with units
gain = noninverting_amplifier_gain("100kΩ", "10kΩ")

# You can also automatically format the result
from UliEngineering.EngineerIO import auto_format
print(auto_format(noninverting_amplifier_gain, "100k", "10k"))
# prints 11.0 V/V
Posted by Uli Köhler in Electronics, Python

How to access RouterOS API in Python using the ‘routeros’ library (minimal example)

This example uses the routeros library from PyPI (GitHub) to access the MikroTik API and extract the system identity.

#!/usr/bin/env python3
from routeros import login

routeros = login('admin', 'abc123abc', '192.168.88.1')

output = routeros('/system/identity/print')
# Extract the one identity string from the list of dictionaries
print(output[0]['name'])

 

Posted by Uli Köhler in MikroTik, Python

Netmiko MikroTik RouterOS SSH key login example

In our previous example Netmiko MikroTik RouterOS minimal example we showed how to login to a RouterOS device using netmikoand password-based login.

#!/usr/bin/env python3  
from netmiko import ConnectHandler
import os.path
mikrotik = {
    'device_type': 'mikrotik_routeros',
    'host':   '192.168.88.1',
    'username': 'admin',
    'key_file': os.path.expanduser("~/.ssh/id_mikrotik"),
}
with ConnectHandler(**mikrotik) as mikrotik_connection:
    print(mikrotik_connection.send_command(f'/system/identity/print', cmd_verify=False))

 

Example output:

name: MySwitch01

 

Posted by Uli Köhler in MikroTik, Networking, Python

How to pass string to executable stdin using invoke.run() in Python

In our previous example How to capture stdout to string using invoke.run() in Python we showcased how to use invoke.run(..., out_stream=...) to capture stdout to a StringIO which can then be converted to a string.

Similarly, we can pass a string to the sub-process stdin by first converting it into a StringIO and then using invoke.run(..., in_stream=...).

stdin_str = "abc123" # what we want to pass to stdint

stdin_io = StringIO(stdin_str)
result = invoke.run("myexecutable", in_stream=stdin_in)
# ...
Posted by Uli Köhler in Python

How to capture stdout to string using invoke.run() in Python

This example showcases how to capture stdout to a string instead of printing it to system stdout:

The basic trick is to initialize a StringIO (a file-like object where pyinvoke can write the stdout from the sub-process to) and pass it to the out_stream argument of invoke.run()

import invoke
from io import StringIO

outStream = StringIO()
result = invoke.run("wg genkey", out_stream=outStream)
stdout_str = outStream.getvalue()
print("Result: ", stdout_str)

 

Posted by Uli Köhler in Python

How to list only sub-folders (not files) using Python

Whereas Python’s os.listdir("mydirectory") will list both sub-files and sub-directories, you can use this simple list comprehension:

parent_folder = "myfolder"
sub_directories = [
    filename for filename in os.listdir(parent_folder)
    if os.path.isdir(os.path.join(parent_folder, filename))
]

which will only list sub-directories of the parent_fodler

 

Posted by Uli Köhler in Python

How to matplotlib plt.savefig() to a io.BytesIO buffer

You can just pass a io.BytesIO() to plt.savefig() as first parameter. Note that format defaults to "png", but best practice is to explicitly specify it.

# Save plot to BytesIO
bio = io.BytesIO()
plt.savefig(bio, dpi=250, format="png")

Full example:

#!/usr/bin/env python3
import numpy as np
import io
import matplotlib.pyplot as plt

# Generate data
x = np.linspace(0, 2*np.pi, 100)
y = np.sin(x)

# Plot data
plt.plot(x, y)

# Save plot to BytesIO
bio = io.BytesIO()
plt.savefig(bio, dpi=250, format="png")

# Cleanup plot
plt.close(plt.gcf())
plt.clf()

# Write BytesIO to file
with open("plot.png", "wb") as f:
    f.write(bio.getvalue())

Output plot.png:

 

Posted by Uli Köhler in Python

Customizable Python INI file parser

I recommend using the configparser library which comes with Python for parsing INI files. However, if you need a highly customizable parser, this code is a good place to start:

def read_ini_config(filename):
    config = {}
    current_section = None

    with open(filename, 'r', encoding="utf-8") as file:
        for line in file:
            line = line.strip()

            # Skip empty lines and comments
            if not line or line.startswith(';') or line.startswith('#'):
                continue

            # Check if it's a section header
            if line.startswith('[') and line.endswith(']'):
                current_section = line[1:-1]
                config[current_section] = {}
            else:
                # Parse key-value pairs
                key, value = line.split('=', 1)
                key = key.strip()
                value = value.strip()
                config[current_section][key] = value

    return config

 

Posted by Uli Köhler in Python

How to list all tags of remote git repository using Python

import subprocess

def list_all_tags_for_remote_git_repo(url):
    """
    Given a repository URL, list all tags for that repository
    without cloning it.

    This function use "git ls-remote", so the
    "git" command line program must be available.
    """
    # Run the 'git' command to fetch and list remote tags
    result = subprocess.run([
        "git", "ls-remote", "--tags", repo_url
    ], stdout=subprocess.PIPE, text=True)

    # Process the output to extract tag names
    output_lines = result.stdout.splitlines()
    tags = [
        line.split("refs/tags/")[-1] for line in output_lines
        if "refs/tags/" in line and "^{}" not in line
    ]

    return tags

Usage example:

list_all_tags_for_remote_git_repo("https://github.com/EmbeddedRPC/erpc.git")

Result:

['1.10.0',
 '1.11.0',
 '1.4.0',
 '1.4.1',
 '1.5.0',
 '1.6.0',
 '1.7.0',
 '1.7.1',
 '1.7.2',
 '1.7.3',
 '1.7.4',
 '1.8.0',
 '1.8.1',
 '1.9.0',
 '1.9.1']

 

Posted by Uli Köhler in git, Python