Python

Python script to re-encode all audio files to OPUS recursively

This Python script recursively converts all FLAC, WAV and MP3 etc files recursively in the given input directory to OPUS. The default bitrate is 96k which is roughly equivalent to 192k to 256k MP3. You can select a different

After encoding, the source file and converted file are checked for their length automatically. If the length differs more than 0.2 seconds, the encoding is considered failed.

There is an option --delete to delete the source file (only if the OPUS file has equivalent length to the source file of course). Use with caution and make backups before using, no warranty is expressed or implied for this script – it might lose some of your music!

The script automatically performs parallel encoding, hence it’s pretty fast even if transcoding many files.

No files are overwritten unless the -o/--overwrite is given on the command line.

How to use

First, install ffmpeg and ffprobe. Now install the python binding for ffmpeg using

pip install ffmpeg-python

Here’s the command line options the script provides:

usage: ConvertToOPUS.py [-h] [--bitrate BITRATE] [--delete] [-o] [-j THREADS] directory

Transcode MP3, FLAC & WAV files to OPUS.

positional arguments:
  directory             Directory containing the files to transcode

options:
  -h, --help            show this help message and exit
  --bitrate BITRATE     Bitrate for the OPUS files
  --delete              Delete original files after successful transcoding
  -o, --overwrite       Overwrite existing OPUS files
  -j THREADS, --threads THREADS
                        Number of parallel transcodes

Source code

#!/usr/bin/env python3
import os
import sys
import argparse
import ffmpeg
from concurrent.futures import ProcessPoolExecutor
import logging

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

def get_file_duration(file_path):
    try:
        probe = ffmpeg.probe(file_path)
        audio_stream = next((stream for stream in probe['streams'] if stream['codec_type'] == 'audio'), None)
        return float(audio_stream['duration'])
    except Exception as e:
        logging.error(f"Failed to get duration for {file_path}: {e}")
        return None

def transcode_file(file_path, bitrate, delete_after_transcode, overwrite):
    try:
        output_file = file_path.rsplit('.', 1)[0] + '.opus'
        if not overwrite and os.path.exists(output_file):
            logging.info(f"{output_file} exists. Skipping due to no overwrite flag.")
            return

        stream = (
            ffmpeg
            .input(file_path)
            .output(output_file, ab=bitrate, loglevel="error")
        )

        if overwrite:
            stream = stream.overwrite_output()

        stream.run()

        original_duration = get_file_duration(file_path)
        transcoded_duration = get_file_duration(output_file)

        if original_duration is None or transcoded_duration is None:
            logging.error(f"Failed to transcode {file_path}. Could not retrieve file duration.")
            os.remove(output_file)
            return

        duration_diff = abs(original_duration - transcoded_duration)
        if duration_diff >= 0.2:
            logging.error(f"Transcoding failed for {file_path}. Duration mismatch.")
            os.remove(output_file)
        else:
            logging.info(f"Transcoded {file_path} to {output_file} successfully.")
            if delete_after_transcode:
                os.remove(file_path)
    except Exception as e:
        logging.error(f"Failed to transcode {file_path}: {e}")

def main():
    parser = argparse.ArgumentParser(description="Transcode MP3, FLAC & WAV files to OPUS.")
    parser.add_argument("directory", type=str, help="Directory containing the files to transcode")
    parser.add_argument("--bitrate", type=str, default="96k", help="Bitrate for the OPUS files")
    parser.add_argument("--delete", action="store_true", help="Delete original files after successful transcoding")
    parser.add_argument("-o", "--overwrite", action="store_true", help="Overwrite existing OPUS files")
    parser.add_argument("-j", "--threads", type=int, default=os.cpu_count(), help="Number of parallel transcodes")

    args = parser.parse_args()

    supported_extensions = ['mp3', 'flac', 'wav', 'm4a']

    # List all files with the supported extensions in the directory recursively
    files_to_transcode = []
    for root, dirs, files in os.walk(args.directory):
        for file in files:
            if file.split('.')[-1].lower() in supported_extensions:
                files_to_transcode.append(os.path.join(root, file))

    logging.info(f"Found {len(files_to_transcode)} files to transcode.")

    with ProcessPoolExecutor(max_workers=args.threads) as executor:
        for file_path in files_to_transcode:
            executor.submit(transcode_file, file_path, args.bitrate, args.delete, args.overwrite)

if __name__ == '__main__':
    main()

 

Posted by Uli Köhler in Audio, Audio/Video, Python

Python script to remove strings from filenames and directory names recursively

This Python script renames files and directories in a given folder recursively, removing a given string from filenames and directory names. It does not require any libraries to be installed.

Note that this script has been tested, however we can’t guarantee it won’t destroy some data under some conditions. Backup your data and use with caution (best to try using --dry --verbose!)

These are the command line options it provides:

usage: RemoveFromNames.py [-h] [-c] [--dry] [-D] [-F] [-i] [-v] dir remove_string

Process and rename files and directories by removing a specific string.

positional arguments:
  dir                   Input directory to scan recursively
  remove_string         String to remove from filenames and folder names

options:
  -h, --help            show this help message and exit
  -c, --compress_spaces
                        Compress multiple spaces into one
  --dry                 Dry run. Do not actually rename anything.
  -D, --no_dirs         Don't rename directories.
  -F, --no_files        Don't rename files.
  -i, --case_insensitive
                        Perform case-insensitive compare and replace.
  -v, --verbose         Print what is being renamed.

Source code:

#!/usr/bin/env python3
import os
import argparse

def rename_files_in_directory(base_dir, remove_string, compress_spaces=False, dry_run=False, verbose=False, no_dirs=False, no_files=False, case_insensitive=False):
    if case_insensitive:
        def contains(sub, main):
            return sub.lower() in main.lower()
        
        def replace(source, target, string):
            pattern = re.compile(re.escape(source), re.IGNORECASE)
            return pattern.sub(target, string)
    else:
        contains = lambda sub, main: sub in main
        replace = lambda source, target, string: string.replace(source, target)

    # First, rename directories if not skipped by --no_dirs
    if not no_dirs:
        for dirpath, dirnames, _ in os.walk(base_dir, topdown=False):
            for dirname in dirnames:
                if contains(remove_string, dirname):
                    new_name = replace(remove_string, "", dirname)
                    if compress_spaces:
                        new_name = ' '.join(new_name.split())
                    new_name = new_name.strip()

                    if os.path.exists(os.path.join(dirpath, new_name)):
                        print(f"Cannot rename directory '{dirname}' to '{new_name}' in '{dirpath}' because a directory with the new name already exists.")
                        continue

                    if verbose:
                        print(f"Renaming directory '{dirname}' to '{new_name}' in '{dirpath}'")
                    
                    if not dry_run:
                        os.rename(os.path.join(dirpath, dirname), os.path.join(dirpath, new_name))

    # Then, rename files if not skipped by --no_files
    if not no_files:
        for dirpath, _, filenames in os.walk(base_dir):
            for filename in filenames:
                if contains(remove_string, filename):
                    new_name = replace(remove_string, "", filename)
                    if compress_spaces:
                        new_name = ' '.join(new_name.split())
                    new_name = new_name.strip()
                    
                    if os.path.exists(os.path.join(dirpath, new_name)):
                        print(f"Cannot rename '{filename}' to '{new_name}' in '{dirpath}' because a file with the new name already exists.")
                        continue
                    
                    if verbose:
                        print(f"Renaming file '{filename}' to '{new_name}' in '{dirpath}'")
                    
                    if not dry_run:
                        os.rename(os.path.join(dirpath, filename), os.path.join(dirpath, new_name))

def main():
    parser = argparse.ArgumentParser(description="Process and rename files and directories by removing a specific string.")
    
    parser.add_argument("dir", type=str, help="Input directory to scan recursively")
    parser.add_argument("remove_string", type=str, help="String to remove from filenames and folder names")
    parser.add_argument("-c", "--compress_spaces", action="store_true", help="Compress multiple spaces into one")
    parser.add_argument("--dry", action="store_true", help="Dry run. Do not actually rename anything.")
    parser.add_argument("-D", "--no_dirs", action="store_true", help="Don't rename directories.")
    parser.add_argument("-F", "--no_files", action="store_true", help="Don't rename files.")
    parser.add_argument("-i", "--case_insensitive", action="store_true", help="Perform case-insensitive compare and replace.")
    parser.add_argument("-v", "--verbose", action="store_true", help="Print what is being renamed.")

    args = parser.parse_args()

    rename_files_in_directory(args.dir, args.remove_string, args.compress_spaces, args.dry, args.verbose, args.no_dirs, args.no_files, args.case_insensitive)

if __name__ == "__main__":
    main()

 

Posted by Uli Köhler in Python

How to read multimeter and paste into Excel spreadsheet cell on hotkey press using Python

The following Python script uses PyVISA / LabInstruments to read a Rigol DM3058(E) multimeter and automatically paste the voltage reading into an open spreadsheet on Shift+Alt+Q press.

First, clone LabInstruments to the current directory using

git clone https://github.com/ulikoehler/LabInstruments.git

Now install the requirements from the following requirements.txt

system_hotkey310
pyperclip
pyvisa
pyvisa-py

and run the script:

#!/usr/bin/env python3
"""
This script allows quick reading of multimeter, automatically pasting the reading
into the current e.g. excel spreadsheet and pressing "Enter" to advance to the next cell.

Press Shift+Alt+Q to read the multimeter and paste the reading into the current cell.
"""
import subprocess
import pyautogui
import time
import re
import pyperclip
import sexpdata
import pyvisa
from LabInstruments.DM3058 import DM3058

import locale
locale.setlocale(locale.LC_NUMERIC, f"{locale.getdefaultlocale()[0]}.utf-8")

rm = pyvisa.ResourceManager()
rm.list_resources()
inst = rm.open_resource('USB0::6833::2500::DM3R245003970::0::INSTR')
multimeter = DM3058(inst)
multimeter.set_speed("S")
multimeter.mode_dc_voltage()


def run():
    voltage = multimeter.read_voltage()
    
    locale_formatted_voltage = locale.format_string("%.5f", voltage, grouping=False)
    print("Voltage: ", locale_formatted_voltage)

    # Copy locale-formatted voltage to clipboard
    pyperclip.copy(locale_formatted_voltage)
    # Paste into cell
    pyautogui.hotkey('ctrl', 'v')
    # Wait for excel
    time.sleep(0.1)
    # Press Enter key to save
    pyautogui.press('enter')

# Register hotkeys
from system_hotkey import SystemHotkey
hk = SystemHotkey()
hk.register(('shift', 'alt', 'q'), callback=lambda x: run())

# Wait for exit
while True:
    time.sleep(10)

 

Posted by Uli Köhler in Electronics, Python

Jupyter ipywidgets slider making a HTTP request on change

The following Jupyter cell uses ipywidgets to display a slider from 0.0 … 1.0 and, on change, will submit a HTTP request to http://10.1.2.3.4/api/set-power?power=[slider value].

import ipywidgets as widgets
import requests
from IPython.display import display

# Define the slider widget
slider = widgets.FloatSlider(
    value=0.5,  # Initial value
    min=0.0,    # Minimum value
    max=1.0,    # Maximum value
    step=0.01,  # Step size
    description='Slider:'
)

# Define a function to handle slider changes
def on_slider_change(change):
    slider_value = change['new']
    # Define the API URL with the slider value
    api_url = f'http://10.1.2.3.4/api/set-power?power={slider_value}'
    print(api_url)
    
    # Make the HTTP request
    try:
        response = requests.get(api_url)
        response.raise_for_status()  # Raise an exception for HTTP errors
        print(f'Successfully set power to {slider_value}')
    except requests.exceptions.RequestException as e:
        print(f'Error: {e}')

# Attach the slider change handler to the slider widget
slider.observe(on_slider_change, names='value')

# Display the slider widget in the notebook
display(slider)

 

Posted by Uli Köhler in Jupyter, Python

How to prevent pyinvoke “UnexpectedExit: Encountered a bad command exit code!”

Problem:

When you try running your command using pyinvoke’s invoke.run(), the native command returns a non-zero exit code making your script fail instead of just continuing (e.g. to evaluate the output):

Traceback (most recent call last):
File "/home/user/myscript.py", line 7, in <module>
result = invoke.run("mycommand", out_stream=outStream)
# ...
File "/usr/local/lib/python3.10/dist-packages/invoke/runners.py", line 518, in _finish
raise UnexpectedExit(result)
invoke.exceptions.UnexpectedExit: Encountered a bad command exit code!

Solution:

Use warn=True as an argument to invoke.run()

This will tell invoke to just warn you about the command exiting with non-zero exit status instead of throwing an UnexpectedExit

Posted by Uli Köhler in Python

How to compute non-inverting OpAmp amplifier gain using UliEngineering in Python

In this example, we’ll use the UliEngineering library to compute the gain of a non-inverting OpAmp amplifier, given the two feedback resistor values R1 and R2

In order to install UliEngineering (a Python 3 library) run:

sudo pip3 install -U UliEngineering

We can now use noninverting_amplifier_gain() from the UliEngineering.Electronics.OpAmp package to convert between the units of temperature:

Tip: You can pass both numbers (like 100e3) or strings (such as 100 kΩ) to most UliEngineering functions. SI prefixes like k and m are automatically decoded.

Example:

from UliEngineering.Electronics.OpAmp import noninverting_amplifier_gain

# Gain of a non-inverting amplifier with 100k & 10k feedback resistor
gain = noninverting_amplifier_gain(100e3, 10e3)
# gain = 11.0

# ... or you can use strings
gain = noninverting_amplifier_gain("100k", "10k")
 
# ... or strings with units
gain = noninverting_amplifier_gain("100kΩ", "10kΩ")

# You can also automatically format the result
from UliEngineering.EngineerIO import auto_format
print(auto_format(noninverting_amplifier_gain, "100k", "10k"))
# prints 11.0 V/V
Posted by Uli Köhler in Electronics, Python

How to access RouterOS API in Python using the ‘routeros’ library (minimal example)

This example uses the routeros library from PyPI (GitHub) to access the MikroTik API and extract the system identity.

#!/usr/bin/env python3
from routeros import login

routeros = login('admin', 'abc123abc', '192.168.88.1')

output = routeros('/system/identity/print')
# Extract the one identity string from the list of dictionaries
print(output[0]['name'])

 

Posted by Uli Köhler in MikroTik, Python

Netmiko MikroTik RouterOS SSH key login example

In our previous example Netmiko MikroTik RouterOS minimal example we showed how to login to a RouterOS device using netmikoand password-based login.

#!/usr/bin/env python3  
from netmiko import ConnectHandler
import os.path
mikrotik = {
    'device_type': 'mikrotik_routeros',
    'host':   '192.168.88.1',
    'username': 'admin',
    'key_file': os.path.expanduser("~/.ssh/id_mikrotik"),
}
with ConnectHandler(**mikrotik) as mikrotik_connection:
    print(mikrotik_connection.send_command(f'/system/identity/print', cmd_verify=False))

 

Example output:

name: MySwitch01

 

Posted by Uli Köhler in MikroTik, Networking, Python

How to pass string to executable stdin using invoke.run() in Python

In our previous example How to capture stdout to string using invoke.run() in Python we showcased how to use invoke.run(..., out_stream=...) to capture stdout to a StringIO which can then be converted to a string.

Similarly, we can pass a string to the sub-process stdin by first converting it into a StringIO and then using invoke.run(..., in_stream=...).

stdin_str = "abc123" # what we want to pass to stdint

stdin_io = StringIO(stdin_str)
result = invoke.run("myexecutable", in_stream=stdin_in)
# ...
Posted by Uli Köhler in Python

How to capture stdout to string using invoke.run() in Python

This example showcases how to capture stdout to a string instead of printing it to system stdout:

The basic trick is to initialize a StringIO (a file-like object where pyinvoke can write the stdout from the sub-process to) and pass it to the out_stream argument of invoke.run()

import invoke
from io import StringIO

outStream = StringIO()
result = invoke.run("wg genkey", out_stream=outStream)
stdout_str = outStream.getvalue()
print("Result: ", stdout_str)

 

Posted by Uli Köhler in Python

How to list only sub-folders (not files) using Python

Whereas Python’s os.listdir("mydirectory") will list both sub-files and sub-directories, you can use this simple list comprehension:

parent_folder = "myfolder"
sub_directories = [
    filename for filename in os.listdir(parent_folder)
    if os.path.isdir(os.path.join(parent_folder, filename))
]

which will only list sub-directories of the parent_fodler

 

Posted by Uli Köhler in Python

How to matplotlib plt.savefig() to a io.BytesIO buffer

You can just pass a io.BytesIO() to plt.savefig() as first parameter. Note that format defaults to "png", but best practice is to explicitly specify it.

# Save plot to BytesIO
bio = io.BytesIO()
plt.savefig(bio, dpi=250, format="png")

Full example:

#!/usr/bin/env python3
import numpy as np
import io
import matplotlib.pyplot as plt

# Generate data
x = np.linspace(0, 2*np.pi, 100)
y = np.sin(x)

# Plot data
plt.plot(x, y)

# Save plot to BytesIO
bio = io.BytesIO()
plt.savefig(bio, dpi=250, format="png")

# Cleanup plot
plt.close(plt.gcf())
plt.clf()

# Write BytesIO to file
with open("plot.png", "wb") as f:
    f.write(bio.getvalue())

Output plot.png:

 

Posted by Uli Köhler in Python

Customizable Python INI file parser

I recommend using the configparser library which comes with Python for parsing INI files. However, if you need a highly customizable parser, this code is a good place to start:

def read_ini_config(filename):
    config = {}
    current_section = None

    with open(filename, 'r', encoding="utf-8") as file:
        for line in file:
            line = line.strip()

            # Skip empty lines and comments
            if not line or line.startswith(';') or line.startswith('#'):
                continue

            # Check if it's a section header
            if line.startswith('[') and line.endswith(']'):
                current_section = line[1:-1]
                config[current_section] = {}
            else:
                # Parse key-value pairs
                key, value = line.split('=', 1)
                key = key.strip()
                value = value.strip()
                config[current_section][key] = value

    return config

 

Posted by Uli Köhler in Python

How to list all tags of remote git repository using Python

import subprocess

def list_all_tags_for_remote_git_repo(url):
    """
    Given a repository URL, list all tags for that repository
    without cloning it.

    This function use "git ls-remote", so the
    "git" command line program must be available.
    """
    # Run the 'git' command to fetch and list remote tags
    result = subprocess.run([
        "git", "ls-remote", "--tags", repo_url
    ], stdout=subprocess.PIPE, text=True)

    # Process the output to extract tag names
    output_lines = result.stdout.splitlines()
    tags = [
        line.split("refs/tags/")[-1] for line in output_lines
        if "refs/tags/" in line and "^{}" not in line
    ]

    return tags

Usage example:

list_all_tags_for_remote_git_repo("https://github.com/EmbeddedRPC/erpc.git")

Result:

['1.10.0',
 '1.11.0',
 '1.4.0',
 '1.4.1',
 '1.5.0',
 '1.6.0',
 '1.7.0',
 '1.7.1',
 '1.7.2',
 '1.7.3',
 '1.7.4',
 '1.8.0',
 '1.8.1',
 '1.9.0',
 '1.9.1']

 

Posted by Uli Köhler in git, Python

How to fix PlatformIO: ‘Can not find working Python 3.6+ interpreter’ on Linux

Problem:

When trying to open PlatformIO in Visual Studio Code, you see the following error message:

PlatformIO: Can not find working Python 3.6+ Interpreter. Please install the latest Python 3 and restart VSCode

even though you have Python3 already installed.

Solution:

The issue here is not that PlatformIO can’t find Python 3, it is that you don’t have venv (virtual environments) installed for your Python version!

On Ubuntu/Debian, you can install it using

sudo apt -y install python3-venv

Alternatively you can install it using pip:

pip install virtualenv

 

Posted by Uli Köhler in PlatformIO, Python

How to fix pytest ERROR while parsing the following warning configuration: …

Problem:

You want to ignore a warning during a unit test using code such as

@pytest.mark.filterwarnings("divide by zero encountered in log10")
def test_auto_suffix_1d(self):
   # ...

but you see an error message such as

ERROR: while parsing the following warning configuration:

  divide by zero encountered in log10

This error occurred:

invalid action: 'divide by zero encountered in log10'

Solution:

In the argument to @pytest.mark.filterwarnings(...) you forgot the ignore: prefix:

Instead of

@pytest.mark.filterwarnings("divide by zero encountered in log10")

write

@pytest.mark.filterwarnings("ignore: divide by zero encountered in log10")

 

Posted by Uli Köhler in Python

How to ignore warnings in Python unit tests (pytest / tox)

Problem:

You have a unit test case such as this one from UliEngineering

def test_ratio_to_dB_infinite(self):
    self.assertEqual(-np.inf, ratio_to_dB(0))
    self.assertEqual(-np.inf, ratio_to_dB(0, factor=dBFactor.Power))
    self.assertEqual(-np.inf, ratio_to_dB(-5))
    self.assertEqual(-np.inf, ratio_to_dB(-5, factor=dBFactor.Power))

which outputs a warning:

tests/Math/TestDecibel.py::TestDecibel::test_ratio_to_dB_infinite
  /home/uli/UliEngineering/UliEngineering/Math/Decibel.py:29: RuntimeWarning: divide by zero encountered in log10
    return np.log10(v)

Solution:

You can ignore that specific warning for this specific test case by using a pytest annotation:

import pytest

@pytest.mark.filterwarnings("ignore:divide by zero encountered in log10")
def test_ratio_to_dB_infinite(self):
    self.assertEqual(-np.inf, ratio_to_dB(0))
    self.assertEqual(-np.inf, ratio_to_dB(0, factor=dBFactor.Power))
    self.assertEqual(-np.inf, ratio_to_dB(-5))
    self.assertEqual(-np.inf, ratio_to_dB(-5, factor=dBFactor.Power))

Other warnings will still be emitted.

Posted by Uli Köhler in Python

How to group files with the same prefix but different extensions into folders using Python

After, for example, re-encoding your videos, you often have a bunch of files in the same directory named e.g. MyMovie.avi and MyMovie.avi.mkv.

The following script will sort all files with the same prefix (MyMovie) into a directory called like the prefix. This allows easy sorting and comparing of those files.

Other files will not be touched. The operation is performed non-recursively and no files will be overwritten.

Note that the list of file extensions is hardcoded and double extensions such as .avi.mkv are also treated like standard extensions. The extension check is performed using .endswith()

#!/usr/bin/env python3
import argparse
import os
from collections import defaultdict

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("directory", help="path to the directory")
    args = parser.parse_args()

    directory = args.directory

    extensions = [
        ".avi",
        ".mkv",
        ".mp4"
    ]

    # Add extensions such as ".avi.mp4" at the front
    # so we check them first
    extensions = [e1 + e2 for e1 in extensions for e2 in extensions] + extensions

    files_by_prefix = defaultdict(list)
    for file in os.listdir(directory):
        # Split extension
        for potential_extension in extensions:
            if file.endswith(potential_extension):
                file_prefix = file[:-len(potential_extension)]
                files_by_prefix[file_prefix].append(file)

    # Ignore prefixes with only one file
    files_by_prefix = {k: v for k, v in files_by_prefix.items() if len(v) > 1}

    # Create directory for every prefix and move all the files into it
    for prefix, files in files_by_prefix.items():
        os.makedirs(os.path.join(directory, prefix), exist_ok=True)
        # Move file using if destination doesn't exist
        for file in files:
            print(f"{file} -> {prefix}/{file}")
            os.rename(os.path.join(directory, file), os.path.join(directory, prefix, file))


 

Posted by Uli Köhler in Python

How to set exposure time using picamera2

The following code will capture a 640×480 PNG image into Exposure10ms.png, using a fixed exposure of 10ms:

#!/usr/bin/env python3
import time
import picamera2
import numpy as np

with picamera2.Picamera2() as camera:
    camera.set_controls({"ExposureTime": 1000, "AnalogueGain": 1.0})
    camera.start()
    camera.capture_file("Exposure1ms.png")
    camera.stop()

Note that it is important to run camera.start() after camera.set_controls().

Posted by Uli Köhler in Python, Raspberry Pi

How to use picamera2 to capture Raspberry Pi HQ camera (IMX477) high resolution image

The following code will capture a single 4056x3056px image from a Raspberry Pi HQ camera using the IMX477 sensor into either a file or a numpy array. This is the maximum resolution supported by that camera.

Capturing to a file

This will capture a single frame to CameraTest.png. The PNG file will be quite large, around 15-25 Megabytes.

#!/usr/bin/env python3
import time
import picamera2
import numpy as np

with picamera2.Picamera2() as camera:
    # Create high resolution still capture config
    camera_config = camera.create_still_configuration({"size":(4056, 3040)})
    camera.configure(camera_config)
    # Start camera with config
    camera.start()
    # Capture a single frame to CameraTest.png
    camera.capture_file("CameraTest.png")

Capturing to a numpy array

#!/usr/bin/env python3
import time
import picamera2
import numpy as np

with picamera2.Picamera2() as camera:
    # Create high resolution still capture config
    camera_config = camera.create_still_configuration({"size":(4056, 3040)})
    camera.configure(camera_config)
    # Start capture
    camera.start()
    array = camera.capture_array("main")
    # TODO Do something with [array]
    print(array.shape)

Example output:

[0:55:52.878964095] [5768]  INFO Camera camera_manager.cpp:297 libcamera v0.0.5+83-bde9b04f
[0:55:52.913906171] [5769]  INFO RPI vc4.cpp:437 Registered camera /base/soc/i2c0mux/i2c@1/imx477@1a to Unicam device /dev/media3 and ISP device /dev/media0
[0:55:52.913998855] [5769]  INFO RPI pipeline_base.cpp:1101 Using configuration file '/usr/share/libcamera/pipeline/rpi/vc4/rpi_apps.yaml'
[0:55:52.921056014] [5768]  INFO Camera camera.cpp:1033 configuring streams: (0) 4056x3040-BGR888 (1) 4056x3040-SBGGR12_CSI2P
[0:55:52.922090144] [5769]  INFO RPI vc4.cpp:565 Sensor: /base/soc/i2c0mux/i2c@1/imx477@1a - Selected sensor format: 4056x3040-SBGGR12_1X12 - Selected unicam format: 4056x3040-pBCC
(3040, 4056, 3)

Note that the NumPy array is compatible with Python OpenCV functions.

Posted by Uli Köhler in Python, Raspberry Pi