How to parse all PubMed baseline files in parallel using Python

In our previous post, How to parse PubMed baseline data using Python, we investigated how to use the pubmed_parser library to parse PubMed MEDLINE data.

In this follow-up, we’ll show how to use glob to select all PubMed baseline files in a directory, and how to combine concurrent.futures with tqdm to get easy-to-use process parallelism via ProcessPoolExecutor together with a progress bar on the command line.
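Before the full script, here is a minimal sketch of the underlying submit/as_completed pattern using only the standard library. The slow_square and run_all functions are hypothetical stand-ins for the real per-file work, and a ThreadPoolExecutor is used purely so the snippet runs anywhere without a __main__ guard; ProcessPoolExecutor offers the same submit() interface.

```python
import concurrent.futures
import time

def slow_square(n):
    """Hypothetical stand-in for an expensive per-file task."""
    # Smaller inputs sleep longer, so results tend to arrive out of input order
    time.sleep(0.01 * (5 - n))
    return n, n * n

def run_all(inputs):
    """Submit one task per input and collect results as they complete."""
    results = {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(slow_square, n) for n in inputs]
        # as_completed yields each future as soon as it finishes,
        # which is not necessarily the order of submission
        for future in concurrent.futures.as_completed(futures):
            n, squared = future.result()
            results[n] = squared
    return results

results = run_all([1, 2, 3, 4])
```

Because each task returns its input alongside the result, the caller can still tell which result belongs to which input even though the completion order is arbitrary. The script below uses the same trick by returning the filename.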

First, install the requirements using

pip install git+git:// six numpy tqdm

Now download this script, ensure some files like pubmed20n0002.xml.gz or pubmed20n0004.xml.gz are in the same directory and run it:

#!/usr/bin/env python3
import pubmed_parser as pp
import glob
import os
from collections import Counter
import concurrent.futures
from tqdm import tqdm

# Source:
def tqdm_parallel_map(executor, fn, *iterables, **kwargs):
    """
    Equivalent to executor.map(fn, *iterables),
    but displays a tqdm-based progress bar.

    Does not support timeout or chunksize as executor.submit is used internally.

    **kwargs is passed to tqdm.
    """
    futures_list = []
    for iterable in iterables:
        futures_list += [executor.submit(fn, i) for i in iterable]
    for f in tqdm(concurrent.futures.as_completed(futures_list), total=len(futures_list), **kwargs):
        yield f.result()

def parse_and_process_file(filename):
    """
    This function contains our parsing code. Usually, you would only modify this function.
    """
    # Don't parse authors and references for this example, since we don't need them
    dat = pp.parse_medline_xml(filename, author_list=False, reference_list=False)

    # For this example, we'll count how often each MeSH ID occurs in this file
    ctr = Counter()
    for entry in dat:
        terms = [term.partition(":")[0].strip() for term in entry["mesh_terms"].split(";")]
        for term in terms:
            # Skip empty strings (e.g. from an empty mesh_terms field)
            if term:
                ctr[term] += 1
    return filename, ctr

if __name__ == "__main__":
    # Find all pubmed files in the current directory
    all_filenames = glob.glob("pubmed*.xml.gz")
    # For some workloads you might want to use a ThreadPoolExecutor,
    # but a ProcessPoolExecutor is a good default
    # The with block shuts the worker processes down once all results are in
    with concurrent.futures.ProcessPoolExecutor(os.cpu_count()) as executor:
        # Iterate results as they come in (the order is not the same as in the input!)
        for filename, ctr in tqdm_parallel_map(executor, parse_and_process_file, all_filenames):
            # NOTE: If you print() here, this might interfere with the progress bar,
            # but we accept that here since it's just an example
            print(filename, ctr)

Now you can start modifying the example, most notably the parse_and_process_file() function, to do whatever processing you need.
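One common next step is to merge the per-file counters into a single total instead of printing each one. The following is only a sketch under assumptions: count_mesh_ids is a hypothetical helper that mirrors the parsing in the script, the filenames and MeSH strings are made-up sample data in the "ID:Name; ID:Name" shape that the script's parsing implies, and the list per_file stands in for the (filename, ctr) pairs that the script's result loop yields.

```python
from collections import Counter

def count_mesh_ids(mesh_terms):
    """Count MeSH IDs in one 'ID:Name; ID:Name' string, skipping empty parts."""
    ids = (term.partition(":")[0].strip() for term in mesh_terms.split(";"))
    return Counter(mesh_id for mesh_id in ids if mesh_id)

# Made-up sample data standing in for the (filename, ctr) results
per_file = [
    ("file_a.xml.gz", count_mesh_ids("D000818:Animals; D006801:Humans")),
    ("file_b.xml.gz", count_mesh_ids("D006801:Humans")),
    ("file_c.xml.gz", count_mesh_ids("")),  # entry without MeSH terms
]

# Merge all per-file counters into one overall counter
total = Counter()
for filename, ctr in per_file:
    total.update(ctr)

print(total.most_common(1))  # prints [('D006801', 2)]
```

Since Counter.update() adds counts rather than replacing them, the order in which results arrive from the worker processes does not affect the final total.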