Python

How to fix Python bottle Unsupported response type: <class ‘dict’>

Problem:

You are running a Python HTTP server using bottle. When you access your HTTP server endpoint, you see a HTTP 500 error message like

Unsupported response type: <class 'dict'>

Solution:

This occurs because you are trying to return a Python list of dictionaries, for example in

from bottle import route, run, template, response

@route('/')
def index():
    # We expect bottle to return a JSON here
    # but that doesn't happen!
    return [{"a": "b"}]

run(host='localhost', port=8080)

In order to work around this behaviour, you need to set response.content_type and explicitly use json.dumps() to convert your JSON into a string:

from bottle import route, run, template, response
import json

@route('/')
def index():
    response.content_type = 'application/json'
    return json.dumps([{"a": "b"}])

run(host='localhost', port=8080)

 

Posted by Uli Köhler in Python

How to fix ElasticSearch ‘Root mapping definition has unsupported parameters’

Problem:

You want to create an ElasticSearch index with a custom mapping or update the mapping of an existing ElasticSearch index but you see an error message like

elasticsearch.exceptions.RequestError: RequestError(400, 'mapper_parsing_exception', 'Root mapping definition has unsupported parameters:  [mappings : {properties={num_total={type=integer}, approved={type=integer}, num_translated={type=integer}, pattern_length={type=integer}, num_unapproved={type=integer}, pattern={type=keyword}, num_approved={type=integer}, translated={type=integer}, untranslated={type=integer}, num_untranslated={type=integer}, group={type=keyword}}}]')

Solution:

This can point to multiple issues. Essentially, ElasticSearch is trying to tell you that the structure of your JSON is not correct.

Often this error is misinterpreted as individual field definitions being wrong, but this is rarely the issue (and only if an individual field definition is completely malformed).

If your message is structured like

... unsupported parameters:  [mappings : ...

then the most likely root cause is that you have mappings nested inside mappings in your JSON. This also applies if you update a mapping (put_mapping) – in this case the outer mapping is implicit!

Example: Your code looks like this:

es.indices.put_mapping(index='my_index, doc_type='_doc', body={
    "mappings": {
        "properties": {
            "pattern": {
                "type":  "keyword"
            }
        }
    }
})

ElasticSearch will internally create a JSON like this internally:

{
    "mappings": {
        "mappings": {
            "properties": {
                "pattern": {
                    "type":  "keyword"
                }
            }
        }
    }
}

See that there are two mappings inside each other? ElasticSearch does not view this as a correctly structured JSON, therefore you need to remove the "mapping": {...} from your code, resulting in

es.indices.put_mapping(index='my_index, doc_type='_doc', body={
    "properties": {
        "pattern": {
            "type":  "keyword"
        }
    }
})
Posted by Uli Köhler in Databases, ElasticSearch, Python

How to create draft email on IMAP server using Python

Use this Python script to create a draft email on your IMAP server. The email is not sent automatically but only stored in your draft folder.

#!/usr/bin/env python3
import imaplib
import ssl
import email.message
import email.charset
import time

tls_context = ssl.create_default_context()

server = imaplib.IMAP4('imap.mydomain.com')
server.starttls(ssl_context=tls_context)
server.login('email@mydomain.com', 'password')
# Select mailbox
server.select("INBOX.Drafts")
# Create message
new_message = email.message.Message()
new_message["From"] = "Your name <sender@mydomain.com>"
new_message["To"] = "Name of Recipient <recpient@mydomain.com>"
new_message["Subject"] = "Your subject"
new_message.set_payload("""
This is your message.
It can have multiple lines and
contain special characters: äöü.
""")
# Fix special characters by setting the same encoding we'll use later to encode the message
new_message.set_charset(email.charset.Charset("utf-8"))
encoded_message = str(new_message).encode("utf-8")
server.append('INBOX.Drafts', '', imaplib.Time2Internaldate(time.time()), encoded_message)
# Cleanup
server.close()

Also see Minimal Python IMAP over TLS example

Posted by Uli Köhler in Python

Minimal Python IMAP over TLS example

Note: Under some circumstances you might want to consider using IMAP over SSL instead. See Minimal Python IMAP over SSL example

This example code will login to the server, start a TLS session, list the mailboxes and logout immediately.

#!/usr/bin/env python3
import imaplib
import ssl

# Load system's trusted SSL certificates
tls_context = ssl.create_default_context()

# Connect (unencrypted at first)
server = imaplib.IMAP4('imap.mydomain.com')
# Start TLS encryption. Will fail if TLS session can't be established
server.starttls(ssl_context=tls_context)
# Login. ONLY DO THIS AFTER server.starttls() !!
server.login('email@mydomain.com', 'password')
# Print list of mailboxes on server
code, mailboxes = server.list()
for mailbox in mailboxes:
    print(mailbox.decode("utf-8"))
# Select mailbox
server.select("INBOX")
# Cleanup
server.close()

Remember to replace:

  • imap.mydomain.com with the domain name or IP address of your IMAP server
  • email@mydomain.com by the email address you want to login with
  • password by the password you want to login with

You need to absolutely ensure that you run server.starttls(...) first and only afterwards do server.login(...). If you fail to do so, eavesdroppers might be able to read your username and password which is not encrypted!

When running this script, a successful output might look like this:

(\HasChildren) "." INBOX
(\HasNoChildren) "." INBOX.Spam
(\HasNoChildren) "." INBOX.Drafts
(\HasNoChildren) "." INBOX.Sent
(\HasNoChildren) "." INBOX.Trash

If your credentials don’t work you’ll see an error message like this:

Traceback (most recent call last):
  File "./imaptest.py", line 5, in <module>
    server.login('email@domain.com', 'mypassword')
  File "/usr/lib/python3.6/imaplib.py", line 598, in login
    raise self.error(dat[-1])
imaplib.error: b'[AUTHENTICATIONFAILED] Authentication failed.'

Note that in order to be able to server.close() the connection, it’s required that you server.select() a mailbox first ; this is why we can’t just omit the server.select("INBOX") line even though we don’t actually do anything with the mailbox. See this post for a more concise example on this behaviour.

Posted by Uli Köhler in Python

How to fix Python IMAP ‘command CLOSE illegal in state AUTH, only allowed in states SELECTED

Problem:

You have IMAP code in Python similar to

server = imaplib.IMAP4_SSL('imap.mydomain.com')
server.login('email@mydomain.com', 'password')
# ...
# Cleanup
server.close()

but when you run it, server.close() fails with an error message like

Traceback (most recent call last):
  File "./imaptest.py", line 13, in <module>
    server.close()
  File "/usr/lib/python3.6/imaplib.py", line 461, in close
    typ, dat = self._simple_command('CLOSE')
  File "/usr/lib/python3.6/imaplib.py", line 1196, in _simple_command
    return self._command_complete(name, self._command(name, *args))
  File "/usr/lib/python3.6/imaplib.py", line 944, in _command
    ', '.join(Commands[name])))
imaplib.error: command CLOSE illegal in state AUTH, only allowed in states SELECTED

Solution:

Prior to server.close(), you must run server.select() at least once. If in doubt, just server.select("INBOX")because this will always work.

Insert this line before server.close():

server.select("INBOX")

It should look like this:

server = imaplib.IMAP4_SSL('imap.mydomain.com')
server.login('email@mydomain.com', 'password')
# ...
# Cleanup
server.select("INBOX")
server.close()

For a complete example see Minimal Python IMAP over SSL example

 

Posted by Uli Köhler in Python

Minimal Python IMAP over SSL example

Note: Consider using IMAP with TLS instead of IMAP over SSL. See Minimal Python IMAP over TLS example.

This example code will login to the server using port 993 (IMAP over SSL), list the mailboxes and logout immediately.

#!/usr/bin/env python3
import imaplib

server = imaplib.IMAP4_SSL('imap.mydomain.com')
server.login('email@mydomain.com', 'password')
# Print list of mailboxes on server
code, mailboxes = server.list()
for mailbox in mailboxes:
    print(mailbox.decode("utf-8"))
# Select mailbox
server.select("INBOX")
# Cleanup
server.close()

Remember to replace:

  • imap.mydomain.com with the domain name or IP address of your IMAP server
  • email@mydomain.com by the email address you want to login with
  • password by the password you want to login with

When running this script, a successful output might look like this:

(\HasChildren) "." INBOX
(\HasNoChildren) "." INBOX.Spam
(\HasNoChildren) "." INBOX.Drafts
(\HasNoChildren) "." INBOX.Sent
(\HasNoChildren) "." INBOX.Trash

If your credentials don’t work you’ll see an error message like this:

Traceback (most recent call last):
  File "./imaptest.py", line 5, in <module>
    server.login('email@domain.com', 'mypassword')
  File "/usr/lib/python3.6/imaplib.py", line 598, in login
    raise self.error(dat[-1])
imaplib.error: b'[AUTHENTICATIONFAILED] Authentication failed.'

Note that in order to be able to server.close() the connection, it’s required that you server.select() a mailbox first ; this is why we can’t just omit the server.select("INBOX") line even though we don’t actually do anything with the mailbox. See this post for a more concise example on this behaviour.

Posted by Uli Köhler in Python

How to upload your Python package to PyPI in 30 seconds

Prerequisite: Install twine:

sudo pip3 install twine

Before the next step, ensure you have no uncommitted files, because those will be deleted!

Also, ensure that your package is ready for release. Ensure that you have the correct version listed in setup.py

sudo chown -R $USER: .
git clean -xdf
python3 setup.py sdist
twine upload dist/*

Variant if you don’t have python3:

sudo chown -R $USER: .
git clean -xdf
python setup.py sdist
twine upload dist/*

For more detailed instructions see this post.

Detailed explanation of the commands:

  • sudo chown -R $USER: . Fix permission issues possibly introduced by sudo python3 setup.py install
  • git clean -xdf Remove uncommitted files and other fuzz
  • python3 setup.py sdist Build source package
  • twine upload dist/* This will ask you for user PyPI username and password and then upload the package.
Posted by Uli Köhler in Linux, Python

ElasticSearch equivalent to MongoDB .distinct(…)

Let’s say we have an ElasticSearch index called strings with a field pattern of {"type": "keyword"}.

Now we want to do the equivalent of MongoDB db.getCollection('...').distinct('pattern'):

Solution:

In Python you can use the iterate_distinct_field() helper from this previous post on ElasticSearch distinct. Full example:

from elasticsearch import Elasticsearch

es = Elasticsearch()

def iterate_distinct_field(es, fieldname, pagesize=250, **kwargs):
    """
    Helper to get all distinct values from ElasticSearch
    (ordered by number of occurrences)
    """
    compositeQuery = {
        "size": pagesize,
        "sources": [{
                fieldname: {
                    "terms": {
                        "field": fieldname
                    }
                }
            }
        ]
    }
    # Iterate over pages
    while True:
        result = es.search(**kwargs, body={
            "aggs": {
                "values": {
                    "composite": compositeQuery
                }
            }
        })
        # Yield each bucket
        for aggregation in result["aggregations"]["values"]["buckets"]:
            yield aggregation
        # Set "after" field
        if "after_key" in result["aggregations"]["values"]:
            compositeQuery["after"] = \
                result["aggregations"]["values"]["after_key"]
        else: # Finished!
            break

# Usage example
for result in iterate_distinct_field(es, fieldname="pattern.keyword", index="strings"):
    print(result) # e.g. {'key': {'pattern': 'mypattern'}, 'doc_count': 315}
Posted by Uli Köhler in Databases, ElasticSearch, Python

How to query distinct field values in ElasticSearch

Let’s say we have an ElasticSearch index called strings with a field pattern of {"type": "keyword"}.

Get the top N values of the column

If we want to get the top N ( 12 in our example) entries, i.e. the patterns that are present in the most documents, we can use this query:

{
    "aggs" : {
        "patterns" : {
            "terms" : {
                "field" : "pattern.keyword",
                "size": 12
            }
        }
    }
}

Full example in Python:

from elasticsearch import Elasticsearch

es = Elasticsearch()

result = es.search(index="strings", body={
    "aggs" : {
        "patterns" : {
            "terms" : {
                "field" : "pattern.keyword",
                "size": 12
            }
        }
    }
})
for aggregation in result["aggregations"]["patterns"]["buckets"]:
    print(aggregation) # e.g. {'key': 'mypattern, 'doc_count': 2802}

See the terms aggregation documentation for more infos.

Get all the distinct values of the column

Getting all the values is slightly more complicated since we need to use a composite aggregation that returns an after_key to paginate the query.

This Python helper function will automatically paginate the query with configurable page size:

from elasticsearch import Elasticsearch

es = Elasticsearch()

def iterate_distinct_field(es, fieldname, pagesize=250, **kwargs):
    """
    Helper to get all distinct values from ElasticSearch
    (ordered by number of occurrences)
    """
    compositeQuery = {
        "size": pagesize,
        "sources": [{
                fieldname: {
                    "terms": {
                        "field": fieldname
                    }
                }
            }
        ]
    }
    # Iterate over pages
    while True:
        result = es.search(**kwargs, body={
            "aggs": {
                "values": {
                    "composite": compositeQuery
                }
            }
        })
        # Yield each bucket
        for aggregation in result["aggregations"]["values"]["buckets"]:
            yield aggregation
        # Set "after" field
        if "after_key" in result["aggregations"]["values"]:
            compositeQuery["after"] = \
                result["aggregations"]["values"]["after_key"]
        else: # Finished!
            break

# Usage example
for result in iterate_distinct_field(es, fieldname="pattern.keyword", index="strings"):
    print(result) # e.g. {'key': {'pattern': 'mypattern'}, 'doc_count': 315}
Posted by Uli Köhler in Databases, ElasticSearch, Python

How to fix ModuleNotFoundError: No module named ‘grpc’ in Python

Problem:

You want to run a Python script that is using some Google Cloud services. However you see an error message similar to this:

[...]
  File "/usr/local/lib/python3.6/dist-packages/google/api_core/gapic_v1/__init__.py", line 16, in <module>
    from google.api_core.gapic_v1 import config
  File "/usr/local/lib/python3.6/dist-packages/google/api_core/gapic_v1/config.py", line 23, in <module>
    import grpc
ModuleNotFoundError: No module named 'grpc'

Solution:

Install the grpcio Python module:

sudo pip3 install grpcio

or, for Python 2.x

sudo pip install grpcio
Posted by Uli Köhler in Cloud, Linux, Python

How to fix MicroPython I2C no data

Problem:

You’ve configured MicroPython’s I2C similar to this (in my case on the ESP8266 but this applies to many MCUs):

i2c = machine.I2C(-1, machine.Pin(5), machine.Pin(4))

but you can’t find any devices on the bus:

>>> i2c.scan()
[]

Solution:

Likely you forgot to configure the pins as pullups. I2C needs pullups to work, and many MCUs (like the ESP8266) provide support for integrated (weak) pull-ups.

p4 = machine.Pin(4, mode=machine.Pin.OUT, pull=machine.Pin.PULL_UP)
p5 = machine.Pin(5, mode=machine.Pin.OUT, pull=machine.Pin.PULL_UP)
i2c = machine.I2C(-1, p5, p4)

i2c.scan() # [47]

You can also verify this by checking with a multimeter or an oscilloscope: When no communication is going on on the I2C bus, the voltage should be equivalent to the supply voltage of your MCU (usually 3.3V or 5V – 0V indicates a missing pullup or some other error).

Posted by Uli Köhler in Electronics, MicroPython, Python

How to fix MicroPython ‘ValueError: invalid I2C peripheral’

If you see the error message

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
ValueError: invalid I2C peripheral

you are likely running code like this:

import machine

i2c = machine.I2C(machine.Pin(5), machine.Pin(4))

Solution

The MicroPython API has changed (source: forum). You need to use this syntax instead:

import machine

i2c = machine.I2C(-1, machine.Pin(5), machine.Pin(4))

-1 is the I2C ID that selects a specific peripheral. -1 selects a software I2C implementation which can work on most pins. See the MicroPython I2C class documentation for more details.

Posted by Uli Köhler in Electronics, MicroPython, Python

Get all suffixes for a string in Python (Suffix list)

Problem:

Given a string, e.g. foobar, you want to get the list of all suffixes of said string, e.g. ["r", "ar", "bar", "obar", "oobar", "foobar"]

Solution:

Use this snippet:

def all_suffixes(s):
    return [s[-i:] for i in range(1, len(s) + 1)]

s = "foobar"
print(all_suffixes(s)) # ['r', 'ar', 'bar', 'obar', 'oobar', 'foobar']

 

Posted by Uli Köhler in Algorithms, Python

How to use Z0 (characteristic impedance of vacuum) constant in SciPy/NumPy

If you want to use the Z0 constant (characteristic impedance of free space) in Python, use this snippet:

import scipy.constants
Z0 = scipy.constants.physical_constants['characteristic impedance of vacuum'][0]

print(Z0) # 376.73031346177066

In contrast to other constants, Z0 is not available directly like scipy.constants.pi but you need to use the scipy.constants.physical_constants dict in order to access it.

Posted by Uli Köhler in Mathematics, Python

Fixing requests session TypeError: __init__() got an unexpected keyword argument ‘headers’

Problem:

You are trying to initialize a Python requests session using a custom set of HTTP headers like this:

s = requests.Session(headers={
    "User-Agent": "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0"
})

but you only see this stacktrace:

File "RequestsSession.py", line 30, in translate
    "User-Agent": "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0"
TypeError: __init__() got an unexpected keyword argument 'headers'

Solution:

You can’t use the headers=... argument in the requests.Session(...) constructor.

Use s.headers.update({...}) instead:

s = requests.Session()
s.headers.update({
    "User-Agent": "Mozilla/5.0 (Windows NT 6.2; WOW64; rv:34.0) Gecko/20100101 Firefox/34.0"
})

 

Posted by Uli Köhler in Python

How to fix ModuleNotFoundError: No module named ‘google.cloud.iam’

Problem:

You want to run a Python script that uses one of the Google Cloud Python APIs but you get this error message:

ModuleNotFoundError: No module named 'google.cloud.iam'

Solution:

Reinstall any google cloud package using pip:

sudo pip install --upgrade google-cloud-storage

or

sudo pip3 install --upgrade google-cloud-storage

That will also reinstall the relevant google.cloud.iam module.

After that, re-run your script. If that didn’t work, try to install --upgrade some other google-cloud-* module, especially the modules you actually use in your script.

 

Posted by Uli Köhler in Cloud, Python

Fixing numpy.distutils.system_info.NotFoundError: No lapack/blas resources found on Ubuntu or Travis

Note: If you are on Windows, you can not install scipy using pip! Follow this guide instead: https://www.scipy.org/install.html. This blog post is only for Linux-based systems!

When building some of my libraries on Travis, I encountered this error during

sudo pip3 install numpy scipy --upgrade
numpy.distutils.system_info.NotFoundError: No lapack/blas resources

Solution

Install lapack and blas:

sudo apt-get -y install liblapack-dev libblas-dev

In most cases you will then get this error message:

error: library dfftpack has Fortran sources but no Fortran compiler found

Fix that by

sudo apt-get install -y gfortran

In Travis, you can do it like this in .travis.yml:

before_install:
    - sudo apt-get -y install liblapack-dev libblas-dev gfortran
Posted by Uli Köhler in Build systems, Linux, Python

Easily compute & visualize FFTs in Python using UliEngineering

UliEngineering is a mixed data analytics library in Python – one of the utilities it provides is an easy-to-use package to compute FFTs. In contrast to other package, this library is oriented towards practical usecases and allows you to do the FFT in only one line of code! No knowledge of Math required.

How to install UliEngineering

UliEngineering is a Python 3 only library. Install using pip:

sudo pip3 install -U UliEngineering

Getting started

First, we’ll generate some test data. See this previous post for more details on how to generate sinusoid test data:

from UliEngineering.SignalProcessing.Simulation import *

# Generate test data: 100 Hz + 400 Hz tone
data = sine_wave(frequency=100.0, samplerate=1000, amplitude=1.) \
       + sine_wave(frequency=400.0, samplerate=1000, amplitude=0.5)

The test data consists of a 100 Hz sine wave plus a 400 Hz sine wave (with half the amplitude). That signal is sampled with a sample rate of 1000 Hz.

Now we can compute & visualize the FFT using matplotlib:

# Compute FFT. Use the same samplerate 
# NOTE: Window defaults to "blackman"!
from UliEngineering.SignalProcessing.FFT import compute_fft
fft = compute_fft(data, samplerate=1e3)

# Plot
from matplotlib import pyplot as plt
plt.style.use("ggplot")

plt.gcf().set_size_inches(10, 5) # Use (20, 10) to get a larger plot
plt.plot(fft.frequencies, fft.amplitudes)
plt.xlabel("Frequency")
plt.ylabel("Amplitude")


compute_fft(data, samplerate=1e3) returns a tuple (fftx, ffty). It performs an FFT the size of the input (i.e. since data is a length-1000 array, the FFT will be of size 1000).

fft.frequencies is an array of frequencies (in Hz), corresponding to the values in fft.amplitudes. You can also use fft.angles to get relative angles in degrees, but that is not being covered in this blogpost.

As you can see in the plot shown above, the maximum frequency that can be detected is always half the samplerate, i.e. for our samplerate of f_s = 1000\,\text{Hz} it is 500\,\text{Hz}. See this more math-centric FFT explanation if you want to know more details.

Internally, compute_fft() performs this computation:

2 \cdot \frac{\text{abs}\left(\text{FFT}(\text{data} \cdot \text{Window})\right)}{\text{len(data)}}
  • 2 is a correction factor that takes into account that we throw away the latter half of the raw FFT result (since we’re doing a real FFT)
  • \frac{1}{\text{len(data)}} normalizes the FFT results so they are indepedent of the data length (i.e. if you pass a longer sample of the same sine wave you will still get the same result
  • \text{abs}\left(\cdots\right) Converts the complex phase-aware result of the FFT to a spectrum which is easier to read & visualize.
  • Window (which defaults to blackman) is the window which is applied to the data to alleviate some mathematical effects at the beginning and the end of the dataset. See wikipedia on window functions for more details. UliEngineering currently offers this list of window functions:
    • blackman
    • bartlett
    • hamming
    • hanning
    • kaiser (Parameter is fixed to 2.0)
    • none

Selecting frequency ranges

Using the UliEngineering API, selecting a frequency range of the FFT is trivially easy: Just use fft[lowfreq:highfreq]. You can use fft[lowfreq:] to select everything starting from lowfreq or use fft[:highfreq] to select everything up to highfreq.

from UliEngineering.SignalProcessing.FFT import compute_fft
fft = compute_fft(data, samplerate=1e3)

# Select frequency range: 50 to 200 Hz
fft = fft[50.0:200.0]

# Plot
from matplotlib import pyplot as plt
plt.style.use("ggplot")

plt.gcf().set_size_inches(10, 5) # Use (20, 10) to get a larger plot
plt.plot(fft.frequencies, fft.amplitudes)
plt.xlabel("Frequency")
plt.ylabel("Amplitude")
plt.savefig("/ram/fft-frequency-range.svg")

Extract amplitude & angle at a certain frequency

By using [frequency], i.e. getitem operator with a single value, you get an FFTPoint() object containing the frequency, amplitude and relative angle for a given frequency. The library automatically selects the closest FFT bucket, so even if your FFT does not have a bucket for that specific frequency, you will get sensible results.

from UliEngineering.SignalProcessing.FFT import compute_fft
fft = compute_fft(data, samplerate=1e3)

# Show value at a certain frequency
print(fft[30]) # FFTPoint(frequency=30.0, value=2.91e-08, angle=0.0)
print(fft[100]) # FFTPoint(frequency=100.0, value=0.419, angle=0.0)

Short FFTs for long data

Using compute_fft(), if we have an extremely long data array, this means we’ll compute an extremely long FFT. In many cases, this is not desirable and you want to compute a fixed-size FFT (most often a power-of-two FFT, e.g. 1024, 2048, 4096 etc).

Let’s generate some long test data and assume we want to compute a size-1024 FFT on it

from UliEngineering.SignalProcessing.FFT import *
fftx, ffty = simple_serial_fft_reduce(data, samplerate=1e3, fftsize=1024)

Plotting the data like above yields

which looks almost exactly like our compute_fft() plot before – just as we expected.

simple_serial_fft_reduce() takes care of all the magic and normalization for us, including partitioning the data into overlapping chunks, adding the FFTs and properly normalizing the result.

The naming convention is significant here:

  • simple_...._reduce means this is a variant with sensible defaults (simple) for using a reduction function (default: sum) on multiple FFTs.
  • serial means the individual FFTs

Parallelizing the FFTs

If you have a huge dataset, you can use simple_parallel_fft_reduce() identically tosimple_serial_fft_reduce():

from UliEngineering.SignalProcessing.FFT import *
fftx, ffty = simple_parallel_fft_reduce(data, samplerate=1e3, fftsize=1024)

However in most cases you want to initialize the executor manually so you can re-use it later:

from UliEngineering.SignalProcessing.FFT import *
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor() # No argument => use num_cpus threads
fftx, ffty = simple_parallel_fft_reduce(data, samplerate=1e3, fftsize=1024, executor=executor)

We can use a ThreadPoolExecutor() since scipy.fftpack (which UliEngineering uses to do the hard math) unlocks the Python GIL.

Note that due to the need to do a lot of housekeeping tasks, simple_parallel_fft_reduce() is much slower than simple_serial_fft_reduce() if you have a dataset so small that parallelization is not effective. My initial recommendation is to consider using the parallel variant if the total execution time of the serial variant is larger than 0.5\,s

Posted by Uli Köhler in Data science, Mathematics, Python

Easily generate square/triangle/sawtooth/inverse sawtooth waveform data in Python using UliEngineering

In a previous post, I’ve detailed how to generate sine/cosine wave data with frequency, amplitude, offset, phase shift and time offset using only a single line of code.

This post extends this approach by showing how to generate square wave, triangle wave, sawtooth wave and inverse sawtooth wave data – still in only one line of code.

All of the parameters, including frequency, amplitude, offset, phase shift and time offset apply for those functions as well – see the previous post for details and examples for those parameters.

We are using the UliEngineering library, more specifically the UliEngineering.SignalProcessing.Simulation package:

How to install UliEngineering

UliEngineering is a Python 3 only library. Install using pip:

sudo pip3 install -U UliEngineering

Square wave

from UliEngineering.SignalProcessing.Simulation import square_wave

data = square_wave(frequency=10.0, samplerate=10e3)

Triangle wave

from UliEngineering.SignalProcessing.Simulation import triangle_wave

data = triangle_wave(frequency=10.0, samplerate=10e3)

Sawtooth wave

from UliEngineering.SignalProcessing.Simulation import sawtooth

data = sawtooth(frequency=10.0, samplerate=10e3)

Inverse sawtooth wave

from UliEngineering.SignalProcessing.Simulation import inverse_sawtooth

data = inverse_sawtooth(frequency=10.0, samplerate=10e3)

Plotting code

This code was used to generate the plots for this post in Jupyter:

%matplotlib inline
from matplotlib import pyplot as plt
plt.style.use("ggplot")

from UliEngineering.SignalProcessing.Simulation import square_wave

data = square_wave(frequency=10.0, samplerate=10e3)

# set_size_inches(20, 10) to make it even larger!
plt.gcf().set_size_inches(10, 5)
plt.plot(data, label="original")
plt.savefig("/dev/shm/square-wave.svg")
Posted by Uli Köhler in Data science, Mathematics, Python