Python

How to fix Python unittest __init__() takes 1 positional argument but 2 were given

Problem:

You are trying to run your Python unit tests using the unittest package, but you see this unspecific stack trace:

Traceback (most recent call last):
  File "/usr/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/lib/python3.7/unittest/__main__.py", line 18, in <module>
    main(module=None)
  File "/usr/lib/python3.7/unittest/main.py", line 100, in __init__
    self.parseArgs(argv)
  File "/usr/lib/python3.7/unittest/main.py", line 124, in parseArgs
    self._do_discovery(argv[2:])
  File "/usr/lib/python3.7/unittest/main.py", line 244, in _do_discovery
    self.createTests(from_discovery=True, Loader=Loader)
  File "/usr/lib/python3.7/unittest/main.py", line 154, in createTests
    self.test = loader.discover(self.start, self.pattern, self.top)
  File "/usr/lib/python3.7/unittest/loader.py", line 349, in discover
    tests = list(self._find_tests(start_dir, pattern))
  File "/usr/lib/python3.7/unittest/loader.py", line 414, in _find_tests
    yield from self._find_tests(full_path, pattern, namespace)
  File "/usr/lib/python3.7/unittest/loader.py", line 406, in _find_tests
    full_path, pattern, namespace)
  File "/usr/lib/python3.7/unittest/loader.py", line 460, in _find_test_path
    return self.loadTestsFromModule(module, pattern=pattern), False
  File "/usr/lib/python3.7/unittest/loader.py", line 124, in loadTestsFromModule
    tests.append(self.loadTestsFromTestCase(obj))
  File "/usr/lib/python3.7/unittest/loader.py", line 93, in loadTestsFromTestCase
    loaded_suite = self.suiteClass(map(testCaseClass, testCaseNames))
  File "/usr/lib/python3.7/unittest/suite.py", line 24, in __init__
    self.addTests(tests)
  File "/usr/lib/python3.7/unittest/suite.py", line 57, in addTests
    for test in tests:
TypeError: __init__() takes 1 positional argument but 2 were given

Solution:

You have at least one test case like this one:

class MyTest(unittest.TestCase):
    def __init__(self):
        self.x = 1.0

    def test_stuff(self):
        assert(self.x == 1.0)

Overriding __init__(...) in this way does not work with unittest, because unittest passes the test method name to the constructor of every test case. Use setUp() instead.

Usually, just replacing def __init__(self): with def setUp(self): will do the trick. unittest calls setUp() automatically before each test method.

Our example will look like this:

class MyTest(unittest.TestCase):
    def setUp(self):
        self.x = 1.0

    def test_stuff(self):
        assert(self.x == 1.0)

If the error still persists, check if you have more testcases overriding the __init__() method.
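If you really need a custom constructor, a minimal sketch (assuming you only want to add attributes) is to accept whatever arguments unittest passes and forward them to the base class. The attribute x and the helper run_example() are just for illustration:

```python
import unittest


class MyTest(unittest.TestCase):
    def __init__(self, *args, **kwargs):
        # unittest passes the test method name to the constructor,
        # so forward all arguments to TestCase.__init__()
        super().__init__(*args, **kwargs)
        self.x = 1.0

    def test_stuff(self):
        self.assertEqual(self.x, 1.0)


def run_example():
    # Load and run only MyTest, returning the TestResult
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(MyTest)
    return unittest.TextTestRunner(verbosity=0).run(suite)
```

In most cases setUp() remains the simpler choice, since it runs again before every single test method.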

Posted by Uli Köhler in Python

Two easy ways to download a file using Python requests

requests does not provide a built-in function for downloading a URL directly to a file.

Option 1: Use requests_download:

First, install requests_download using

sudo pip3 install requests_download

or equivalent.

Now you can use it like this:

from requests_download import download

download(url, filename)

It also has built-in progress bar support:

from requests_download import download, HashTracker, ProgressTracker
from progressbar import DataTransferBar # sudo pip3 install progressbar2

progress = ProgressTracker(DataTransferBar())

download(url, filename, trackers=(progress,))

Option 2: Do it yourself:

Use this snippet in your code:

import requests
import shutil

def requests_download_file(url, filename):
    with requests.get(url, stream=True) as response:
        with open(filename, 'wb') as fout:
            shutil.copyfileobj(response.raw, fout)
Posted by Uli Köhler in Python

How to fix landscape-package-reporter: UnicodeDecodeError: ‘utf-8’ codec can’t decode byte

On some servers attached to a landscape instance, I encountered this stacktrace when trying to run sudo landscape-package-reporter:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/twisted/internet/defer.py", line 653, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
  File "/usr/lib/python3/dist-packages/landscape/client/package/reporter.py", line 92, in <lambda>
    result.addCallback(lambda x: self.request_unknown_hashes())
  File "/usr/lib/python3/dist-packages/landscape/client/package/reporter.py", line 485, in request_unknown_hashes
    self._facade.ensure_channels_reloaded()
  File "/usr/lib/python3/dist-packages/landscape/lib/apt/package/facade.py", line 265, in ensure_channels_reloaded
    self.reload_channels()
  File "/usr/lib/python3/dist-packages/landscape/lib/apt/package/facade.py", line 253, in reload_channels
    version, with_info=False).get_hash()
  File "/usr/lib/python3/dist-packages/landscape/lib/apt/package/facade.py", line 402, in get_package_skeleton
    return build_skeleton_apt(pkg, with_info=with_info, with_unicode=True)
  File "/usr/lib/python3/dist-packages/landscape/lib/apt/package/skeleton.py", line 131, in build_skeleton_apt
    version.record, "Provides", DEB_PROVIDES))
  File "/usr/lib/python3/dist-packages/apt/package.py", line 690, in record
    return Record(self._records.record)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x96 in position 724: invalid start byte

Tracing the issue down, it turned out to be caused by a misplaced set of Unicode bytes (EF BF BD) in version 1.0.0.944 of the veeamsnap package in /var/lib/apt/lists/repository.veeam.com_backup_linux_agent_dpkg_debian_public_dists_stable_veeam_binary-amd64_Packages. The Description field contains this text:

[...] Linux � simple [...]

The strange character is the U+FFFD � REPLACEMENT CHARACTER.
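To illustrate how a single stray byte leads to both the exception and the replacement character, here is a small sketch. The byte string is made up from the excerpt above, and the guess that 0x96 was originally a Windows-1252 dash is only an assumption:

```python
# Made-up sample modelled on the excerpt; 0x96 is not a valid UTF-8 start byte
raw = b"Linux \x96 simple"

try:
    raw.decode("utf-8")
    strict_error = None
except UnicodeDecodeError as exc:
    # Strict decoding fails just like in the stack trace
    strict_error = exc

# Lenient decoding substitutes U+FFFD for the undecodable byte
replaced = raw.decode("utf-8", errors="replace")

# Assumption: the byte was meant as a Windows-1252 en dash
as_cp1252 = raw.decode("cp1252")

print(strict_error)
print(repr(replaced))
print(repr(as_cp1252))
```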

You can fix it by deleting this character. It’s just at the end of /var/lib/apt/lists/repository.veeam.com_backup_linux_agent_dpkg_debian_public_dists_stable_veeam_binary-amd64_Packages. However, if there’s an update for that repository, your change will be overwritten.

In order to fix it (my fix is for landscape-client version 18.01-0ubuntu3.5), I added a try: ... except: ... clause to skeleton.py, which will ignore some properties of the package where the issue occurs:

try:
    relations.update(parse_record_field(
        version.record, "Provides", DEB_PROVIDES))
    relations.add((
        DEB_NAME_PROVIDES,
        "%s = %s" % (version.package.name, version.version)))
    relations.update(parse_record_field(
        version.record, "Pre-Depends", DEB_REQUIRES, DEB_OR_REQUIRES))
    relations.update(parse_record_field(
        version.record, "Depends", DEB_REQUIRES, DEB_OR_REQUIRES))

    relations.add((
        DEB_UPGRADES, "%s < %s" % (version.package.name, version.version)))

    relations.update(parse_record_field(
        version.record, "Conflicts", DEB_CONFLICTS))
    relations.update(parse_record_field(
        version.record, "Breaks", DEB_CONFLICTS))
    skeleton.relations = sorted(relations)

    if with_info:
        skeleton.section = version.section
        skeleton.summary = version.summary
        skeleton.description = version.description
        skeleton.size = version.size
        if version.installed_size > 0:
            skeleton.installed_size = version.installed_size
        if with_unicode and not _PY3:
            skeleton.section = skeleton.section.decode("utf-8")
            skeleton.summary = skeleton.summary.decode("utf-8")
            # Avoid double-decoding package descriptions in build_skeleton_apt,
            # which causes an error with newer python-apt (Xenial onwards)
            if not isinstance(skeleton.description, unicode):
                skeleton.description = skeleton.description.decode("utf-8")
    return skeleton
except UnicodeError:
    return skeleton

Replace /usr/lib/python3/dist-packages/landscape/lib/apt/package/skeleton.py with this:

from landscape.lib.hashlib import sha1

import apt_pkg

from twisted.python.compat import unicode, _PY3


PACKAGE   = 1 << 0
PROVIDES  = 1 << 1
REQUIRES  = 1 << 2
UPGRADES  = 1 << 3
CONFLICTS = 1 << 4

DEB_PACKAGE       = 1 << 16 | PACKAGE
DEB_PROVIDES      = 2 << 16 | PROVIDES
DEB_NAME_PROVIDES = 3 << 16 | PROVIDES
DEB_REQUIRES      = 4 << 16 | REQUIRES
DEB_OR_REQUIRES   = 5 << 16 | REQUIRES
DEB_UPGRADES      = 6 << 16 | UPGRADES
DEB_CONFLICTS     = 7 << 16 | CONFLICTS


class PackageTypeError(Exception):
    """Raised when an unsupported package type is passed to build_skeleton."""


class PackageSkeleton(object):

    section = None
    summary = None
    description = None
    size = None
    installed_size = None
    _hash = None

    def __init__(self, type, name, version):
        self.type = type
        self.name = name
        self.version = version
        self.relations = []

    def add_relation(self, type, info):
        self.relations.append((type, info))

    def get_hash(self):
        """Calculate the package hash.

        If C{set_hash} has been used, that hash will be returned and the
        hash won't be the calculated value.
        """
        if self._hash is not None:
            return self._hash
        # We use ascii here as encoding  for backwards compatibility as it was
        # default encoding for conversion from unicode to bytes in Python 2.7.
        package_info = ("[%d %s %s]" % (self.type, self.name, self.version)
                        ).encode("ascii")
        digest = sha1(package_info)
        self.relations.sort()
        for pair in self.relations:
            digest.update(("[%d %s]" % (pair[0], pair[1])
                           ).encode("ascii"))
        return digest.digest()

    def set_hash(self, package_hash):
        """Set the hash to an explicit value.

        This should be used when the hash is previously known and can't
        be calculated from the relations anymore.

        The only use case for this is package resurrection. We're
        planning on getting rid of package resurrection, and this code
        can be removed when that is done.
        """
        self._hash = package_hash


def relation_to_string(relation_tuple):
    """Convert an apt relation to a string representation.

    @param relation_tuple: A tuple, (name, version, relation). version
        and relation can be the empty string, if the relation is on a
        name only.

    Returns something like "name > 1.0"
    """
    name, version, relation_type = relation_tuple
    relation_string = name
    if relation_type:
        relation_string += " %s %s" % (relation_type, version)
    return relation_string


def parse_record_field(record, record_field, relation_type,
                       or_relation_type=None):
    """Parse an apt C{Record} field and return skeleton relations

    @param record: An C{apt.package.Record} instance with package information.
    @param record_field: The name of the record field to parse.
    @param relation_type: The deb relation that can be passed to
        C{skeleton.add_relation()}
    @param or_relation_type: The deb relation that should be used if
        there is more than one value in a relation.
    """
    relations = set()
    values = apt_pkg.parse_depends(record.get(record_field, ""))
    for value in values:
        value_strings = [relation_to_string(relation) for relation in value]
        value_relation_type = relation_type
        if len(value_strings) > 1:
            value_relation_type = or_relation_type
        relation_string = " | ".join(value_strings)
        relations.add((value_relation_type, relation_string))
    return relations


def build_skeleton_apt(version, with_info=False, with_unicode=False):
    """Build a package skeleton from an apt package.

    @param version: An instance of C{apt.package.Version}
    @param with_info: Whether to extract extra information about the
        package, like description, summary, size.
    @param with_unicode: Whether the C{name} and C{version} of the
        skeleton should be unicode strings.
    """
    name, version_string = version.package.name, version.version
    if with_unicode:
        name, version_string = unicode(name), unicode(version_string)
    skeleton = PackageSkeleton(DEB_PACKAGE, name, version_string)
    relations = set()
    try:
        relations.update(parse_record_field(
            version.record, "Provides", DEB_PROVIDES))
        relations.add((
            DEB_NAME_PROVIDES,
            "%s = %s" % (version.package.name, version.version)))
        relations.update(parse_record_field(
            version.record, "Pre-Depends", DEB_REQUIRES, DEB_OR_REQUIRES))
        relations.update(parse_record_field(
            version.record, "Depends", DEB_REQUIRES, DEB_OR_REQUIRES))

        relations.add((
            DEB_UPGRADES, "%s < %s" % (version.package.name, version.version)))

        relations.update(parse_record_field(
            version.record, "Conflicts", DEB_CONFLICTS))
        relations.update(parse_record_field(
            version.record, "Breaks", DEB_CONFLICTS))
        skeleton.relations = sorted(relations)

        if with_info:
            skeleton.section = version.section
            skeleton.summary = version.summary
            skeleton.description = version.description
            skeleton.size = version.size
            if version.installed_size > 0:
                skeleton.installed_size = version.installed_size
            if with_unicode and not _PY3:
                skeleton.section = skeleton.section.decode("utf-8")
                skeleton.summary = skeleton.summary.decode("utf-8")
                # Avoid double-decoding package descriptions in build_skeleton_apt,
                # which causes an error with newer python-apt (Xenial onwards)
                if not isinstance(skeleton.description, unicode):
                    skeleton.description = skeleton.description.decode("utf-8")
        return skeleton
    except UnicodeError:
        return skeleton

After that, you can run sudo landscape-package-reporter again.

Posted by Uli Köhler in Linux, Python

How to auto-set Windows audio balance to a specific L-R difference using Python

When you can’t place your speakers equally far from your ears, you need to adjust the audio balance in order to compensate for the perceived difference in volume.

Windows allows you to compensate the audio volume natively using the system settings – however it has one critical issue: If you ever set your audio volume to zero, your balance settings get lost and you need to click through plenty of dialogs in order to re-configure it.

In our previous post How to set Windows audio balance using Python we showed how to use the pycaw library to set the audio balance (see that post for installation instructions etc).

The following Python script sets the audio balance to a specific left-right difference. It has been designed to keep the mean (i.e. (L+R)/2) audio level in dB when adjusting the volume (i.e. it will not change the overall volume and hence avoids blowing out your eardrums) and will not do any adjustment if the balance is already within 0.1 dB of the desired difference.

Set desiredDelta to your desired left-right difference in dB (positive values mean that the left speaker will be louder than the right speaker)!

from ctypes import cast, POINTER
from comtypes import CLSCTX_ALL
from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume
import math

# Get default audio device using PyCAW
devices = AudioUtilities.GetSpeakers()
interface = devices.Activate(
    IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
volume = cast(interface, POINTER(IAudioEndpointVolume))

# Get current volume levels (in dB) of the left (0) and right (1) channels
volumeL = volume.GetChannelVolumeLevel(0)
volumeR = volume.GetChannelVolumeLevel(1)
print(f"Before adjustment: L={volumeL:.2f} dB, R={volumeR:.2f} dB")

desiredDelta = 6.0 # Desired delta between L and R. Positive means L is louder!

# Signed difference: positive means L is currently louder than R
delta = volumeL - volumeR
mean = (volumeL + volumeR) / 2.

# Re-configure balance if delta is not within 0.1 dB of the desired delta
if abs(delta - desiredDelta) > 0.1:
    # Adjust volume
    volume.SetChannelVolumeLevel(0, mean + desiredDelta/2., None) # Left
    volume.SetChannelVolumeLevel(1, mean - desiredDelta/2., None) # Right
    # Get & print new volume
    volumeL = volume.GetChannelVolumeLevel(0)
    volumeR = volume.GetChannelVolumeLevel(1)
    print(f"After adjustment: L={volumeL:.2f} dB, R={volumeR:.2f} dB")
else:
    print("No adjustment necessary")
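The balance arithmetic can also be tried without any audio hardware. The following sketch isolates it in a pure function; the name compute_balanced_levels and the tolerance_db parameter are made up for this illustration and are not part of pycaw:

```python
def compute_balanced_levels(level_left_db, level_right_db,
                            desired_delta_db, tolerance_db=0.1):
    """Return new (left, right) levels in dB, keeping the mean level.

    A positive desired_delta_db means the left channel is louder.
    """
    delta = level_left_db - level_right_db
    if abs(delta - desired_delta_db) <= tolerance_db:
        # Already within tolerance: leave the levels untouched
        return level_left_db, level_right_db
    mean = (level_left_db + level_right_db) / 2.0
    return mean + desired_delta_db / 2.0, mean - desired_delta_db / 2.0


# Both channels at -20 dB, left should end up 6 dB louder than right
print(compute_balanced_levels(-20.0, -20.0, 6.0))
```

Since both channels are moved by the same amount in opposite directions, the mean level in dB stays the same.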

 

Posted by Uli Köhler in Audio, Python, Windows

How to set Windows audio balance using Python

In our previous post we showed how to set the Windows audio volume using pycaw.

First, we install the library using

pip install pycaw

Note: pycaw does not work with WSL (Windows Subsystem for Linux)! You actually need to install it using a Python environment running on Windows. I recommend Anaconda.

In order to set the audio balance, we can use volume.SetChannelVolumeLevel(...):

from ctypes import cast, POINTER
from comtypes import CLSCTX_ALL
from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume
import math

# Get default audio device using PyCAW
devices = AudioUtilities.GetSpeakers()
interface = devices.Activate(
    IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
volume = cast(interface, POINTER(IAudioEndpointVolume))

# Get current volume of the left channel
currentVolumeLeft = volume.GetChannelVolumeLevel(0)
# Set the right channel to 6 dB below the level of the left channel
volume.SetChannelVolumeLevel(1, currentVolumeLeft - 6.0, None)
# NOTE: -6.0 dB corresponds to roughly half the amplitude!

Note that by convention, the left channel is channel 0 and the right channel is channel 1. Depending on the type of sound card, there might be as few as 1 channel (e.g. a mono headset) or many channels like in a multichannel USB audio interface. Use volume.GetChannelCount() to get the number of channels.
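To see what a change of -6 dB means numerically, here is a short sketch using the standard decibel definitions. The function names are made up for this example:

```python
def db_to_amplitude_ratio(db):
    # Amplitude (voltage-like) ratio: 20 dB per factor of 10
    return 10.0 ** (db / 20.0)


def db_to_power_ratio(db):
    # Power ratio: 10 dB per factor of 10
    return 10.0 ** (db / 10.0)


print(f"-6 dB amplitude ratio: {db_to_amplitude_ratio(-6.0):.3f}")
print(f"-6 dB power ratio:     {db_to_power_ratio(-6.0):.3f}")
```

So -6 dB is about half the amplitude but only about a quarter of the power.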

Posted by Uli Köhler in Audio, Python, Windows

How to set Windows audio volume using Python

We can use the pycaw library to set the Windows Audio volume using Python.

First, we install the library using

pip install pycaw

Note: pycaw does not work with WSL (Windows Subsystem for Linux)! You actually need to install it using a Python environment running on Windows. I recommend Anaconda.

Now we can reduce the volume by 6 dB (roughly half the amplitude) using this script:

from ctypes import cast, POINTER
from comtypes import CLSCTX_ALL
from pycaw.pycaw import AudioUtilities, IAudioEndpointVolume
import math

# Get default audio device using PyCAW
devices = AudioUtilities.GetSpeakers()
interface = devices.Activate(
    IAudioEndpointVolume._iid_, CLSCTX_ALL, None)
volume = cast(interface, POINTER(IAudioEndpointVolume))

# Get current volume (in dB) and reduce it by 6 dB
currentVolumeDb = volume.GetMasterVolumeLevel()
volume.SetMasterVolumeLevel(currentVolumeDb - 6.0, None)
# NOTE: -6.0 dB corresponds to roughly half the amplitude!

 

Posted by Uli Köhler in Audio, Python, Windows

How to fix Python ‘ValueError: Namespace GnomeDesktop not available’ on Ubuntu

Problem:

On Ubuntu, you are trying to run a Python script using the gi package and GnomeDesktop but you are seeing this stacktrace:

Traceback (most recent call last):
  File "myscript.py", line 48, in <module>
    gi.require_version('GnomeDesktop', '3.0')
  File "/usr/lib/python3/dist-packages/gi/__init__.py", line 130, in require_version
    raise ValueError('Namespace %s not available' % namespace)
ValueError: Namespace GnomeDesktop not available

Solution:

Install gir1.2-gnomedesktop-3.0:

sudo apt -y install gir1.2-gnomedesktop-3.0

and retry running your script.

Posted by Uli Köhler in Linux, Python

Computing the CRC8-ATM CRC in Python

The 8-bit CRC8-ATM polynomial is used in many embedded applications, including Trinamic UART-controlled stepper motor drivers like the TMC2209:

\text{CRC} = x^8 + x^2 + x^1 + x^0

The following code provides an example on how to compute this type of CRC in Python:

def compute_crc8_atm(datagram, initial_value=0):
    crc = initial_value
    # Iterate bytes in data
    for byte in datagram:
        # Iterate bits in byte
        for _ in range(0, 8):
            if (crc >> 7) ^ (byte & 0x01):
                crc = ((crc << 1) ^ 0x07) & 0xFF
            else:
                crc = (crc << 1) & 0xFF
            # Shift to next bit
            byte = byte >> 1
    return crc

This code has been field-verified for the TMC2209.
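As a usage sketch, the CRC can be appended to a datagram before sending it. The datagram layout used here (sync byte 0x05, node address, register address, CRC) is my reading of the TMC2209 read request format and should be checked against the datasheet; build_read_request is a made-up helper name:

```python
def compute_crc8_atm(datagram, initial_value=0):
    crc = initial_value
    for byte in datagram:
        for _ in range(8):
            # Compare CRC MSB with the current data bit (LSB first)
            if (crc >> 7) ^ (byte & 0x01):
                crc = ((crc << 1) ^ 0x07) & 0xFF
            else:
                crc = (crc << 1) & 0xFF
            byte >>= 1
    return crc


def build_read_request(node_address, register):
    # Assumed layout: sync byte, node address, register address, CRC
    payload = bytes([0x05, node_address, register])
    return payload + bytes([compute_crc8_atm(payload)])


request = build_read_request(0x00, 0x00)
print(request.hex())  # prints 05000048
```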

Posted by Uli Köhler in Algorithms, Embedded, MicroPython, Python

MicroPython ESP32 minimal UART example

This example shows how to use UART on the ESP32 using MicroPython. In this example, we use UART1 which is mapped to pins GPIO9 (RX) and GPIO10 (TX).

from machine import UART
uart = UART(1, 115200) # 1st argument: UART number: Hardware UART #1

# Write
uart.write("test")

# Read
print(uart.read()) # Read as much as is currently available (None if nothing was received)

Don’t know how to upload the file to MicroPython so it is automatically run on boot? See How to upload files to MicroPython over USB/serial.

Posted by Uli Köhler in Embedded, MicroPython, Python

How to fix ESP32 MicroPython ‘ValueError: pin can only be input’

Problem:

You are trying to initialize an ESP32 pin in MicroPython using

import machine
machine.Pin(34, machine.Pin.OUT)

but you see an error message like

>>> machine.Pin(34, machine.Pin.OUT)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: pin can only be input

Solution:

On the ESP32, pins with numbers >= 34 are input-only pins!

You need to use other pins < 34 if you need output capability!

For reference, see the relevant MicroPython source code section:

// configure mode
if (args[ARG_mode].u_obj != mp_const_none) {
    mp_int_t pin_io_mode = mp_obj_get_int(args[ARG_mode].u_obj);
    if (self->id >= 34 && (pin_io_mode & GPIO_MODE_DEF_OUTPUT)) {
        mp_raise_ValueError("pin can only be input");
    } else {
        gpio_set_direction(self->id, pin_io_mode);
    }
}
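If you want to catch this mistake before the code reaches the board, a tiny check mirroring the condition above could look like this sketch. The helper is_output_capable is hypothetical and only encodes the >= 34 rule from this post, not any other pin restrictions of your specific board:

```python
def is_output_capable(pin_number):
    # Mirrors the check in the MicroPython source: pins >= 34 are input-only
    return pin_number < 34


for pin in (2, 33, 34, 39):
    print(pin, "output OK" if is_output_capable(pin) else "input only")
```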
Posted by Uli Köhler in Embedded, MicroPython, Python

How to upload files to MicroPython over USB/serial

In this post we will show how to upload files to a MicroPython board over a USB/serial connection.

First, install ampy – a tool to modify the MicroPython filesystem over a serial connection.

sudo pip3 install adafruit-ampy

Now prepare your script – we’ll use main.py in this example.

Upload the file to the board:

ampy -p /dev/ttyUSB* put main.py

This only takes about 2-4 seconds. In case ampy is still running after 10 seconds, you might need to

  • Stop ampy (Ctrl+C), reset the board using the RESET button and retry the command
  • Stop ampy (Ctrl+C). Detach USB and ensure the board is powered off (and not powered externally). Re-Attach USB and retry the command.
  • In case that doesn’t help, try re-flashing your board with the most recent version of MicroPython. See How to flash MicroPython to your ESP32 board in 30 seconds. This will also clear the internal filesystem and hence remove any file that might cause failure to boot properly.
Posted by Uli Köhler in Embedded, MicroPython, Python

MicroPython ESP32 blink example

This MicroPython code blinks GPIO2 which is connected to the LED on most ESP32 boards.

import machine
import time
led = machine.Pin(2, machine.Pin.OUT)
while True:
    led.value(1)
    time.sleep(1)
    led.value(0)
    time.sleep(1)

Don’t know how to upload the file to MicroPython so it is automatically run on boot? See How to upload files to MicroPython over USB/serial.

Posted by Uli Köhler in Embedded, MicroPython, Python

How to run WebREPL without webrepl_setup in MicroPython

Problem:

You want to enable WebREPL on your MicroPython board using

import webrepl
webrepl.start()

but it is only showing this error message:

WebREPL is not configured, run 'import webrepl_setup'

However, you want to configure WebREPL programmatically instead of manually running it on every single board.

Solution:

Use

import webrepl
webrepl.start(password="Rua8ohje")

This will circumvent webrepl_setup completely and is compatible with an automated setup process.

Note: At the time of writing this you can only use passwords with 8 characters max! (see How to fix MicroPython WebREPL ValueError in File “webrepl.py”, line 72, in start)

Posted by Uli Köhler in Embedded, MicroPython, Python

How to fix MicroPython WebREPL ValueError in File “webrepl.py”, line 72, in start

Problem:

You want to configure your MicroPython WebREPL programmatically using webrepl.start(password="...") but you see a stacktrace like

>>> webrepl.start(password="Rua8ohjedo")
Traceback (most recent call last):
 File "<stdin>", line 1, in <module>
 File "webrepl.py", line 72, in start
ValueError:

Solution:

Use a shorter password with 8 characters max:

webrepl.start(password="Rua8ohje")

 

Posted by Uli Köhler in Embedded, MicroPython, Python

How to autoconnect to Wifi using MicroPython on your ESP32 board

In this post we will investigate how to connect to a wireless network on boot

First, install ampy – a tool to modify the MicroPython filesystem over a serial connection.

sudo pip3 install adafruit-ampy

Now download main.py and save it in your current working directory and insert your wifi credentials:

import network
station = network.WLAN(network.STA_IF)
station.active(True)
station.connect("YourWifiName", "EnterYourWifiPasswordHere")

Upload the file to the board:

ampy -p /dev/ttyUSB* put main.py

In case ampy shows no output within 5 seconds, try resetting the board, waiting for 5-10 seconds and retrying the upload using ampy.

Note: You can list the files on the board’s filesystem using

ampy -p /dev/ttyUSB0 ls

You can verify the content of main.py using

ampy -p /dev/ttyUSB0 get main.py
Posted by Uli Köhler in Embedded, MicroPython, Python

How to read IDF diabetes statistics in Python using Pandas

The International Diabetes Federation provides a Data portal with various statistics related to diabetes.

In this post we’ll show how to read the Diabetes estimates (20-79 y) / People with diabetes, in 1,000s data export in CSV format using pandas.

First download IDF (people-with-diabetes--in-1-000s).csv from the data page.

Now we can parse the CSV file:

import pandas as pd

# Download at https://www.diabetesatlas.org/data/en/indicators/1/
df = pd.read_csv("IDF (people-with-diabetes--in-1-000s).csv")
# Parse year columns to obtain floats and multiply by thousands factor. Pandas fails to parse values like "12,345.67"
for column in df.columns:
    try:
        int(column)
        df[column] = df[column].apply(lambda s: None if s == "-" else float(s.replace(",", "")) * 1000)
    except:
        pass

As you can see in the postprocessing step, the number of diabetes patients are given in 1000s in the CSV, so we multiply them by 1000 to obtain the actual numbers.

If you want to modify the data columns (i.e. the columns referring to year), you can use this simple template:

for column in df.columns:
    try:
        int(column) # Will raise ValueError() if column is not a year number
        # Whatever you do here will only be applied to year columns
        df[column] = df[column] * 0.75 # Example on how to modify a column
        # But note that if your code raises an Exception, it will be ignored!
    except:
        pass
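As a possible alternative to the manual post-processing, read_csv() can be told about the thousands separator and the "-" placeholder directly, and the year columns can be selected by name instead of relying on try/except. This is only a sketch: the in-memory CSV below contains made-up sample rows, not real IDF data:

```python
import io

import pandas as pd

# Made-up sample rows in the same layout as the export (not real data)
SAMPLE_CSV = '''Country/Territory,Type,2019
Exampleland,Country,"12,345.67"
Samplestan,Country,-
'''

# thousands="," parses "12,345.67" as a float, "-" becomes NaN
df = pd.read_csv(io.StringIO(SAMPLE_CSV), thousands=",", na_values=["-"])

# Year columns are the ones whose name consists only of digits
year_columns = [column for column in df.columns if column.isdigit()]

# Values are given in 1000s, so scale them to actual numbers
df[year_columns] = df[year_columns] * 1000
print(df)
```

With this approach, errors in your own column code are no longer silently swallowed by a bare except clause.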

Let’s plot some data:

regions = df[df["Type"] == "Region"] # Only regions, not individual countries

from matplotlib import pyplot as plt
plt.style.use("ggplot")
plt.gcf().set_size_inches(20,4)
plt.ylabel("Diabetes patients [millions]")
plt.xlabel("Region")
plt.title("Diabetes patients in 2019 by region")
plt.bar(regions["Country/Territory"], regions["2019"] / 1e6)

Note that if you use a more recent dataset than the version I’m using the 2019 column might not exist in your CSV file. Choose an appropriate column in that case.

Posted by Uli Köhler in Bioinformatics, pandas, Python

Parsing World Population Prospects (WPP) XLSX data in Python

The United Nations provides the World Population Prospects (WPP) dataset on geographic and age distribution of mankind as downloadable XLSX files.

Reading these files in Python is rather easy. First we have to find out how many rows to skip. For the 2019 WPP dataset this value is 16 since row 17 contains all the column headers. The number of rows to skip might be different depending on the dataset. We’re using WPP2019_POP_F07_1_POPULATION_BY_AGE_BOTH_SEXES.xlsx in this example.

We can use Pandas read_excel() function to import the dataset in Python:

import pandas as pd

df = pd.read_excel("WPP2019_POP_F07_1_POPULATION_BY_AGE_BOTH_SEXES.xlsx", skiprows=16, na_values=["..."])

This will take a few seconds until the large dataset has been processed. Now we can check if skiprows=16 is the correct value. It is correct if pandas did recognize the column names correctly:

>>> df.columns
Index(['Index', 'Variant', 'Region, subregion, country or area *', 'Notes',
       'Country code', 'Type', 'Parent code', 'Reference date (as of 1 July)',
       '0-4', '5-9', '10-14', '15-19', '20-24', '25-29', '30-34', '35-39',
       '40-44', '45-49', '50-54', '55-59', '60-64', '65-69', '70-74', '75-79',
       '80-84', '85-89', '90-94', '95-99', '100+'],
      dtype='object')

Now let’s filter for a country:

russia = df[df["Region, subregion, country or area *"] == 'Russian Federation']

This will show us the population data for multiple years in 5-year intervals from 1950 to 2020. Now let’s filter for the most recent year:

russia.loc[russia["Reference date (as of 1 July)"].idxmax()]

This will show us a single dataset:

Index                                                 3255
Variant                                          Estimates
Region, subregion, country or area *    Russian Federation
Notes                                                  NaN
Country code                                           643
Type                                          Country/Area
Parent code                                            923
Reference date (as of 1 July)                         2020
0-4                                                9271.69
5-9                                                9350.92
10-14                                              8174.26
15-19                                              7081.77
20-24                                               6614.7
25-29                                              8993.09
30-34                                              12543.8
35-39                                              11924.7
40-44                                              10604.6
45-49                                              9770.68
50-54                                              8479.65
55-59                                                10418
60-64                                              10073.6
65-69                                              8427.75
70-74                                              5390.38
75-79                                              3159.34
80-84                                              3485.78
85-89                                              1389.64
90-94                                              668.338
95-99                                              102.243
100+                                                 9.407
Name: 3254, dtype: object
​

How can we plot that data? First, we need to select all the columns that contain age data. We’ll do this by manually inserting the name of the first such column (0-4) into the following code and assuming that there are no columns after the last age column:

>>> df.columns[df.columns.get_loc("0-4"):]
Index(['0-4', '5-9', '10-14', '15-19', '20-24', '25-29', '30-34', '35-39',
       '40-44', '45-49', '50-54', '55-59', '60-64', '65-69', '70-74', '75-79',
       '80-84', '85-89', '90-94', '95-99', '100+'],
      dtype='object')

Now let’s select those columns from the russia dataset:

most_recent_russia = russia.loc[russia["Reference date (as of 1 July)"].idxmax()]
age_columns = df.columns[df.columns.get_loc("0-4"):]

russian_age_data = most_recent_russia[age_columns]

Let’s have a look at the dataset:

>>> russian_age_data
0-4      9271.69
5-9      9350.92
10-14    8174.26
15-19    7081.77
20-24     6614.7
25-29    8993.09
30-34    12543.8
35-39    11924.7
40-44    10604.6
45-49    9770.68
50-54    8479.65
55-59      10418
60-64    10073.6
65-69    8427.75
70-74    5390.38
75-79    3159.34
80-84    3485.78
85-89    1389.64
90-94    668.338
95-99    102.243
100+       9.407

That looks usable. Note, however, that the values are in thousands, i.e. we have to multiply the values by 1000 to obtain the actual population estimates. Let’s plot it:

from matplotlib import pyplot as plt
plt.style.use("ggplot")

plt.title("Age composition of the Russian population (2020)")
plt.ylabel("People in age group [Millions]")
plt.xlabel("Age group")
plt.gcf().set_size_inches(15,5)
# Data is given in thousands => divide by 1000 to obtain millions
# The Series has dtype object, so convert it to a float array first
plt.plot(russian_age_data.index, russian_age_data.to_numpy(dtype=float) / 1000., lw=3)

The finished plot will look like this:

Here’s our finished script:

#!/usr/bin/env python3
import pandas as pd
df = pd.read_excel("WPP2019_POP_F07_1_POPULATION_BY_AGE_BOTH_SEXES.xlsx", skiprows=16)
# Filter only russia
russia = df[df["Region, subregion, country or area *"] == 'Russian Federation']

# Filter only most recent estimate (1 row)
most_recent_russia = russia.loc[russia["Reference date (as of 1 July)"].idxmax()]
# Retain only value columns
age_columns = df.columns[df.columns.get_loc("0-4"):]
russian_age_data = most_recent_russia[age_columns]

# Plot!
from matplotlib import pyplot as plt
plt.style.use("ggplot")

plt.title("Age composition of the Russian population (2020)")
plt.ylabel("People in age group [Millions]")
plt.xlabel("Age group")
plt.gcf().set_size_inches(15,5)
# Data is given in thousands => divide by 1000 to obtain millions
# The Series has dtype object, so convert it to a float array first
plt.plot(russian_age_data.index, russian_age_data.to_numpy(dtype=float) / 1000., lw=3)

# Export as SVG
plt.savefig("russian-demographics.svg")

Posted by Uli Köhler in Bioinformatics, Data science, pandas, Python

How to save Matplotlib plot to string as SVG

You can use StringIO to save a Matplotlib plot to a string without saving it to an intermediary file:

from matplotlib import pyplot as plt
plt.plot([0, 1], [2, 3]) # Just a minimal showcase

# Save plot to StringIO
from io import StringIO
i = StringIO()
plt.savefig(i, format="svg")

# How to access the string
print(i.getvalue())

Note that when you save to a file-like object instead of a filename, you need to set the format=... parameter when calling plt.savefig(). When saving to a filename, Matplotlib will try to derive the format from the filename extension (like .svg).
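
For binary formats such as PNG, the same approach should work with io.BytesIO instead of StringIO, since the output is bytes and not text. This is a minimal sketch; the Agg backend is selected here only as an assumption, so that the example also runs without a display.

```python
import matplotlib
matplotlib.use("Agg")  # Assumption: headless backend, no window is opened
from matplotlib import pyplot as plt
from io import BytesIO

fig, ax = plt.subplots()
ax.plot([0, 1], [2, 3])  # Just a minimal showcase

# Save plot to BytesIO. format= is required since there is no filename
png_buffer = BytesIO()
fig.savefig(png_buffer, format="png")
plt.close(fig)

# How to access the raw PNG bytes
png_bytes = png_buffer.getvalue()
print(len(png_bytes), "bytes")
```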

Posted by Uli Köhler in Python

How to map MeSH ID to MeSH term using Python

Our MeSH-JSON project provides a pre-compiled MeSH-ID-to-term map as JSON:

Download here or use this command to download:

wget "http://techoverflow.net/downloads/mesh.json.gz"

How to use in Python:

#!/usr/bin/env python3
import json
import gzip

# How to load
with gzip.open("mesh.json.gz", "rb") as infile:
    mesh_id_to_term = json.load(infile)

# Usage example
print(mesh_id_to_term["D059630"]) # Prints 'Mesenchymal Stem Cells'
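
If you also need the opposite direction (term to ID), the map can be inverted. The following sketch assumes that the JSON file is a flat dictionary of ID to term, as in the usage example above. The helper names and the tiny sample file are made up for illustration, so that the example runs without downloading mesh.json.gz.

```python
import gzip
import json
import os
import tempfile

def load_mesh_map(path):
    """Load a gzipped JSON file containing a MeSH-ID-to-term dictionary."""
    with gzip.open(path, "rt", encoding="utf-8") as infile:
        return json.load(infile)

def invert_map(id_to_term):
    """Build a term-to-MeSH-ID dictionary from an ID-to-term dictionary."""
    return {term: mesh_id for mesh_id, term in id_to_term.items()}

# Hypothetical one-entry sample file standing in for the real mesh.json.gz
sample = {"D059630": "Mesenchymal Stem Cells"}
with tempfile.TemporaryDirectory() as tmpdir:
    sample_path = os.path.join(tmpdir, "mesh-sample.json.gz")
    with gzip.open(sample_path, "wt", encoding="utf-8") as outfile:
        json.dump(sample, outfile)
    sample_id_to_term = load_mesh_map(sample_path)

term_to_mesh_id = invert_map(sample_id_to_term)
print(term_to_mesh_id["Mesenchymal Stem Cells"])

# Use .get() for IDs that might not be in the map
print(sample_id_to_term.get("D000000", "(unknown MeSH ID)"))
```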

 

Posted by Uli Köhler in Bioinformatics, Python

How to parse all PubMed baseline files in parallel using Python

In our previous post How to parse PubMed baseline data using Python we investigated how to use the pubmed_parser library to parse PubMed MEDLINE data using Python.

In this follow-up we’ll show how to use glob to select all PubMed baseline files in a directory and how to combine concurrent.futures with tqdm to get easy-to-use process parallelism via ProcessPoolExecutor, plus a progress bar for the command line.

First, install the requirements using

pip install git+https://github.com/titipata/pubmed_parser.git six numpy tqdm

Now download this script, make sure that some files like pubmed20n0002.xml.gz or pubmed20n0004.xml.gz are in the same directory, and run it:

#!/usr/bin/env python3
import pubmed_parser as pp
import glob
import os
from collections import Counter
import concurrent.futures
from tqdm import tqdm

# Source: https://techoverflow.net/2017/05/18/how-to-use-concurrent-futures-map-with-a-tqdm-progress-bar/
def tqdm_parallel_map(executor, fn, *iterables, **kwargs):
    """
    Like executor.map(fn, *iterables), but displays a tqdm-based progress bar
    and yields results in order of completion, not in input order.
    
    Does not support timeout or chunksize as executor.submit is used internally
    
    **kwargs is passed to tqdm.
    """
    # Like map(), call fn once per tuple of arguments taken from the iterables
    futures_list = [executor.submit(fn, *args) for args in zip(*iterables)]
    for f in tqdm(concurrent.futures.as_completed(futures_list), total=len(futures_list), **kwargs):
        yield f.result()

def parse_and_process_file(filename):
    """
    This function contains our parsing code. Usually, you would only modify this function.
    """
    # Don't parse authors and references for this example, since we don't need it
    dat = pp.parse_medline_xml(filename, author_list=False, reference_list=False)

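    # For this example, we'll count how often each MeSH ID occurs in this file
    ctr = Counter()
    for entry in dat:
        # mesh_terms is a ";"-separated string of "ID:term" items, keep only the ID
        terms = [term.partition(":")[0].strip() for term in entry["mesh_terms"].split(";")]
        # Entries without MeSH terms yield an empty string, which we skip
        ctr.update(term for term in terms if term)
    return filename, ctr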


if __name__ == "__main__":
    # Find all pubmed files in the current directory
    all_filenames = glob.glob("pubmed*.xml.gz")
    # For some workloads you might want to use a ThreadPoolExecutor,
    # but a ProcessPoolExecutor is a good default.
    # The with block shuts down the worker processes once we are done.
    with concurrent.futures.ProcessPoolExecutor(os.cpu_count()) as executor:
        # Iterate results as they come in (the order is not the same as in the input!)
        for filename, ctr in tqdm_parallel_map(executor, parse_and_process_file, all_filenames):
            # NOTE: If you print() here, this might interfere with the progress bar,
            # but we accept that here since it's just an example
            print(filename, ctr)

Now you can start modifying the example, most notably the parse_and_process_file() function, to do whatever processing you intend to do.
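
One possible modification is to sum the per-file counters into a single overall count instead of printing them. This is only a sketch: merge_counters() is a hypothetical helper, and the example results with made-up counts stand in for the (filename, Counter) pairs that the script produces.

```python
from collections import Counter

def merge_counters(results):
    """Sum the Counters from an iterable of (filename, Counter) pairs."""
    total = Counter()
    for _filename, ctr in results:
        total.update(ctr)
    return total

# Hypothetical results standing in for the output of tqdm_parallel_map()
example_results = [
    ("pubmed20n0002.xml.gz", Counter({"D000001": 2, "D000002": 1})),
    ("pubmed20n0004.xml.gz", Counter({"D000001": 1, "D000003": 4})),
]

total = merge_counters(example_results)
# Show the two most frequent MeSH IDs over all files
print(total.most_common(2))
```

Since merge_counters() accepts any iterable of pairs, the generator returned by tqdm_parallel_map() could be passed to it directly.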

Posted by Uli Köhler in Bioinformatics, Python