Python script to re-encode all audio files to OPUS recursively

This Python script recursively converts all FLAC, WAV, MP3 and similar audio files in the given input directory to OPUS. The default bitrate is 96k, which is roughly equivalent to a 192k to 256k MP3. You can select a different bitrate using the --bitrate option.

After encoding, the durations of the source file and the converted file are compared automatically. If they differ by 0.2 seconds or more, the encoding is considered failed.

There is an option --delete to delete the source file (only if the OPUS file has equivalent length to the source file of course). Use with caution and make backups before using, no warranty is expressed or implied for this script – it might lose some of your music!

The script automatically performs parallel encoding, hence it’s pretty fast even if transcoding many files.

No files are overwritten unless the -o/--overwrite option is given on the command line.

How to use

First, install ffmpeg and ffprobe. Now install the python binding for ffmpeg using

pip install ffmpeg-python

Here are the command-line options the script provides:

usage: [-h] [--bitrate BITRATE] [--delete] [-o] [-j THREADS] directory

Transcode MP3, FLAC & WAV files to OPUS.

positional arguments:
  directory             Directory containing the files to transcode

  -h, --help            show this help message and exit
  --bitrate BITRATE     Bitrate for the OPUS files
  --delete              Delete original files after successful transcoding
  -o, --overwrite       Overwrite existing OPUS files
  -j THREADS, --threads THREADS
                        Number of parallel transcodes

Source code

#!/usr/bin/env python3
import os
import sys
import argparse
import ffmpeg
from concurrent.futures import ProcessPoolExecutor
import logging

# Configure the root logger once at import time: INFO and above, rendered as
# "LEVEL: message".
logging.basicConfig(
    format='%(levelname)s: %(message)s',
    level=logging.INFO,
)

def get_file_duration(file_path):
    """Return the duration in seconds of the first audio stream in a file.

    The file is inspected with ``ffmpeg.probe``. When the audio stream entry
    has no ``duration`` field, the container-level ``format.duration`` value
    from the same probe result is used instead.

    Args:
        file_path: Path of the media file to probe.

    Returns:
        The duration as a float, or ``None`` if the file could not be probed,
        contains no audio stream, or reports no duration. Every failure is
        logged at ERROR level.
    """
    try:
        probe = ffmpeg.probe(file_path)
        audio_stream = next(
            (stream for stream in probe['streams'] if stream.get('codec_type') == 'audio'),
            None,
        )
        if audio_stream is None:
            # Report this explicitly instead of failing with an opaque
            # "'NoneType' object is not subscriptable" message.
            logging.error(f"Failed to get duration for {file_path}: no audio stream found")
            return None

        duration = audio_stream.get('duration')
        if duration is None:
            # Fall back to the container-level duration when the stream entry
            # does not carry one.
            duration = probe.get('format', {}).get('duration')
        if duration is None:
            logging.error(f"Failed to get duration for {file_path}: no duration reported")
            return None

        return float(duration)
    except Exception as e:
        # Best-effort helper: callers treat None as "duration unknown".
        logging.error(f"Failed to get duration for {file_path}: {e}")
        return None

def transcode_file(file_path, bitrate, delete_after_transcode, overwrite):
    """Transcode a single audio file to an ``.opus`` file next to it.

    The output path is the input path with its extension replaced by
    ``.opus``. After transcoding, the durations of the source and the output
    are compared via ``get_file_duration``; a difference of 0.2 seconds or
    more is treated as a failed transcode.

    Args:
        file_path: Path of the source audio file.
        bitrate: Audio bitrate handed to ffmpeg (for example ``"96k"``).
        delete_after_transcode: If true, remove the source file, but only
            after the duration check has passed.
        overwrite: If true, replace an existing output file; otherwise an
            existing output file causes the source to be skipped.

    Returns:
        None. Outcomes are reported through logging only. No exception
        escapes: any error is logged and the source file is left untouched.
    """
    try:
        # splitext only strips a real file extension, so dots in directory
        # names cannot truncate the path.
        output_file = os.path.splitext(file_path)[0] + '.opus'
        if not overwrite and os.path.exists(output_file):
            logging.info(f"{output_file} exists. Skipping due to no overwrite flag.")
            return

        stream = (
            ffmpeg
            .input(file_path)
            .output(output_file, ab=bitrate, loglevel="error")
        )

        if overwrite:
            stream = stream.overwrite_output()

        stream.run()

        original_duration = get_file_duration(file_path)
        transcoded_duration = get_file_duration(output_file)

        if original_duration is None or transcoded_duration is None:
            logging.error(f"Failed to transcode {file_path}. Could not retrieve file duration.")
            # Without both durations the result cannot be verified, so the
            # source must never be deleted.
            return

        duration_diff = abs(original_duration - transcoded_duration)
        if duration_diff >= 0.2:
            logging.error(f"Transcoding failed for {file_path}. Duration mismatch.")
        else:
            logging.info(f"Transcoded {file_path} to {output_file} successfully.")
            if delete_after_transcode:
                os.remove(file_path)
    except Exception as e:
        # Runs inside a worker process: log instead of raising so one bad
        # file does not disturb the rest of the batch.
        logging.error(f"Failed to transcode {file_path}: {e}")

def main(argv=None):
    """Command-line entry point: transcode every supported file in a tree.

    Walks the given directory recursively, collects files whose extension is
    one of mp3, flac, wav or m4a (case-insensitive), and transcodes them in
    parallel worker processes via ``transcode_file``.

    Args:
        argv: Optional list of command-line arguments. ``None`` (the default)
            makes argparse read ``sys.argv[1:]``.

    Returns:
        None.

    Raises:
        SystemExit: Raised by argparse on invalid arguments, including a
            directory that does not exist or a thread count below 1.
    """
    parser = argparse.ArgumentParser(description="Transcode MP3, FLAC & WAV files to OPUS.")
    parser.add_argument("directory", type=str, help="Directory containing the files to transcode")
    parser.add_argument("--bitrate", type=str, default="96k", help="Bitrate for the OPUS files")
    parser.add_argument("--delete", action="store_true", help="Delete original files after successful transcoding")
    parser.add_argument("-o", "--overwrite", action="store_true", help="Overwrite existing OPUS files")
    parser.add_argument("-j", "--threads", type=int, default=os.cpu_count(), help="Number of parallel transcodes")

    args = parser.parse_args(argv)

    # os.walk silently yields nothing for a missing path, which would look
    # like "0 files found"; fail loudly instead.
    if not os.path.isdir(args.directory):
        parser.error(f"{args.directory} is not a directory")
    # args.threads may be None when os.cpu_count() cannot determine the count.
    if args.threads is not None and args.threads < 1:
        parser.error("--threads must be at least 1")

    supported_extensions = {'.mp3', '.flac', '.wav', '.m4a'}

    # List all files with the supported extensions in the directory recursively
    files_to_transcode = []
    for root, _dirs, files in os.walk(args.directory):
        for file in files:
            # splitext ignores extension-less names such as a file literally
            # called "mp3", which a plain split('.') would match.
            if os.path.splitext(file)[1].lower() in supported_extensions:
                files_to_transcode.append(os.path.join(root, file))

    logging.info(f"Found {len(files_to_transcode)} files to transcode.")

    if not files_to_transcode:
        # Nothing to do; avoid spinning up a process pool.
        return

    with ProcessPoolExecutor(max_workers=args.threads) as executor:
        futures = [
            executor.submit(transcode_file, file_path, args.bitrate, args.delete, args.overwrite)
            for file_path in files_to_transcode
        ]
        # transcode_file logs its own errors, but a crashed worker process
        # would otherwise go unnoticed.
        for file_path, future in zip(files_to_transcode, futures):
            error = future.exception()
            if error is not None:
                logging.error(f"Worker failed for {file_path}: {error}")

if __name__ == '__main__':