Python script to re-encode all audio files to OPUS recursively
This Python script recursively converts all FLAC, WAV, MP3 and M4A files in the given input directory to OPUS. The default bitrate is 96k, which is roughly equivalent to 192k to 256k MP3. You can select a different bitrate using --bitrate.
After encoding, the lengths of the source file and the converted file are compared automatically. If they differ by 0.2 seconds or more, the encoding is considered failed.
There is an option --delete
to delete the source file (only if the OPUS file has equivalent length to the source file of course). Use with caution and make backups before using, no warranty is expressed or implied for this script - it might lose some of your music!
The script automatically performs parallel encoding, hence it’s pretty fast even if transcoding many files.
No files are overwritten unless the -o/--overwrite flag is given on the command line.
How to use
First, install ffmpeg and ffprobe. Now install the Python binding for ffmpeg using:
pip install ffmpeg-python
Here are the command-line options the script provides:
usage: ConvertToOPUS.py [-h] [--bitrate BITRATE] [--delete] [-o] [-j THREADS] directory
Transcode MP3, FLAC & WAV files to OPUS.
positional arguments:
directory Directory containing the files to transcode
options:
-h, --help show this help message and exit
--bitrate BITRATE Bitrate for the OPUS files
--delete Delete original files after successful transcoding
-o, --overwrite Overwrite existing OPUS files
-j THREADS, --threads THREADS
Number of parallel transcodes
Source code
#!/usr/bin/env python3
import os
import sys
import argparse
import ffmpeg
from concurrent.futures import ProcessPoolExecutor
import logging
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
def get_file_duration(file_path):
    """Return the duration in seconds of the first audio stream in a file.

    The file is inspected with ``ffmpeg.probe``. The duration reported for
    the first audio stream is preferred; if the stream entry carries no
    ``duration`` field, the container-level ``format.duration`` is used
    instead.

    Args:
        file_path: Path of the media file to inspect.

    Returns:
        The duration in seconds as a ``float``, or ``None`` if the file could
        not be probed, contains no audio stream, or reports no usable
        duration. Every failure is logged and never raised, so callers only
        need to check for ``None``.
    """
    try:
        probe = ffmpeg.probe(file_path)
        audio_stream = next(
            (
                stream
                for stream in probe.get('streams', [])
                if stream.get('codec_type') == 'audio'
            ),
            None,
        )
        if audio_stream is None:
            logging.error(f"Failed to get duration for {file_path}: no audio stream found")
            return None

        # Prefer the stream's own duration, but fall back to the container
        # duration when the stream entry does not carry one.
        duration = audio_stream.get('duration')
        if duration is None:
            duration = probe.get('format', {}).get('duration')
        if duration is None:
            logging.error(f"Failed to get duration for {file_path}: no duration reported")
            return None

        return float(duration)
    except Exception as e:
        # Probing is best-effort: any failure (missing file, ffprobe error,
        # unparsable value) is reported to the caller as "unknown duration".
        logging.error(f"Failed to get duration for {file_path}: {e}")
        return None
def transcode_file(file_path, bitrate, delete_after_transcode, overwrite):
    """Transcode one audio file to OPUS next to the source and verify it.

    The output path is the source path with its extension replaced by
    ``.opus``. After encoding, the durations of source and output are
    compared; the output is discarded if either duration is unknown or if
    they differ by 0.2 seconds or more.

    Args:
        file_path: Path of the source audio file.
        bitrate: Audio bitrate passed to ffmpeg (e.g. ``"96k"``).
        delete_after_transcode: If true, remove the source file once the
            output has been verified successfully.
        overwrite: If true, replace an already existing output file;
            otherwise an existing output causes the file to be skipped.

    Returns:
        None. All errors are logged rather than raised so that one bad file
        does not abort a batch run.
    """
    # splitext only looks at the last path component, so a dot in a directory
    # name can never be mistaken for the start of the file extension.
    output_file = os.path.splitext(file_path)[0] + '.opus'
    try:
        output_preexisted = os.path.exists(output_file)
        if output_preexisted:
            # Never transcode a file onto itself (e.g. an ".OPUS" source on a
            # case-insensitive filesystem); with delete_after_transcode that
            # could destroy the only copy.
            if os.path.samefile(file_path, output_file):
                logging.info(f"{file_path} is already an OPUS file. Skipping.")
                return
            if not overwrite:
                logging.info(f"{output_file} exists. Skipping due to no overwrite flag.")
                return

        stream = ffmpeg.input(file_path).output(output_file, ab=bitrate, loglevel="error")
        if overwrite:
            stream = stream.overwrite_output()

        try:
            stream.run()
        except Exception:
            # Do not leave a truncated output behind: a later run without
            # --overwrite would skip it as if it were a finished file. Only
            # remove files this call created, never a pre-existing output.
            if not output_preexisted and os.path.exists(output_file):
                os.remove(output_file)
            raise

        original_duration = get_file_duration(file_path)
        transcoded_duration = get_file_duration(output_file)
        if original_duration is None or transcoded_duration is None:
            logging.error(f"Failed to transcode {file_path}. Could not retrieve file duration.")
            os.remove(output_file)
            return

        duration_diff = abs(original_duration - transcoded_duration)
        if duration_diff >= 0.2:
            logging.error(f"Transcoding failed for {file_path}. Duration mismatch.")
            os.remove(output_file)
            return

        logging.info(f"Transcoded {file_path} to {output_file} successfully.")
        # The source is only removed after the output passed verification.
        if delete_after_transcode:
            os.remove(file_path)
    except Exception as e:
        # Worker boundary: log and carry on with the remaining files.
        logging.error(f"Failed to transcode {file_path}: {e}")
def main():
    """Command-line entry point: transcode a directory tree to OPUS in parallel.

    Parses the command line, recursively collects every file below the given
    directory whose extension is one of the supported audio formats, and
    hands each file to ``transcode_file`` in a process pool.

    Exits through ``parser.error`` (status 2) if the directory does not exist
    or ``--threads`` is smaller than 1.
    """
    parser = argparse.ArgumentParser(description="Transcode MP3, FLAC & WAV files to OPUS.")
    parser.add_argument("directory", type=str, help="Directory containing the files to transcode")
    parser.add_argument("--bitrate", type=str, default="96k", help="Bitrate for the OPUS files")
    parser.add_argument("--delete", action="store_true", help="Delete original files after successful transcoding")
    parser.add_argument("-o", "--overwrite", action="store_true", help="Overwrite existing OPUS files")
    parser.add_argument("-j", "--threads", type=int, default=os.cpu_count(), help="Number of parallel transcodes")
    args = parser.parse_args()

    if not os.path.isdir(args.directory):
        parser.error(f"{args.directory} is not a directory")
    # os.cpu_count() may return None, which the executor accepts as "default".
    if args.threads is not None and args.threads < 1:
        parser.error("--threads must be at least 1")

    supported_extensions = {'.mp3', '.flac', '.wav', '.m4a'}

    # Collect all files with a supported extension, recursively. splitext is
    # used so that an extension-less file literally named "mp3" is not picked up.
    files_to_transcode = []
    claimed_outputs = {}
    for root, _dirs, files in os.walk(args.directory):
        for file in sorted(files):
            stem, extension = os.path.splitext(file)
            if extension.lower() not in supported_extensions:
                continue
            source = os.path.join(root, file)
            # Two sources such as a.mp3 and a.flac would be written to the same
            # a.opus concurrently, and with --delete both originals could be
            # removed. Only the first source per output is transcoded.
            output_key = os.path.normcase(os.path.join(root, stem + '.opus'))
            if output_key in claimed_outputs:
                logging.warning(
                    f"Skipping {source}: it would be written to the same OPUS file as {claimed_outputs[output_key]}."
                )
                continue
            claimed_outputs[output_key] = source
            files_to_transcode.append(source)

    logging.info(f"Found {len(files_to_transcode)} files to transcode.")
    if not files_to_transcode:
        return

    with ProcessPoolExecutor(max_workers=args.threads) as executor:
        futures = {
            executor.submit(transcode_file, file_path, args.bitrate, args.delete, args.overwrite): file_path
            for file_path in files_to_transcode
        }
        # Inspect every result so that a crashed worker is reported instead of
        # disappearing silently.
        for future, file_path in futures.items():
            try:
                future.result()
            except Exception as e:
                logging.error(f"Worker failed while transcoding {file_path}: {e}")
# Start the command-line interface only when this file is executed directly.
if __name__ == "__main__":
    main()