Programming languages

How to skip first element of a Generator/Iterator in Python

Use the skip_first() utility function from UliEngineering:

First, install UliEngineering using

pip install --user UliEngineering

Note that UliEngineering requires Python 3.3+.

Now you can use skip_first() like this:

from UliEngineering.Utils.Iterable import skip_first

for v in skip_first(v for v in [1,2,3,4,5]):
    print(v) # Prints 2,3,4,5

skip_first() will work for any Iterable or Iterator.

Don’t want to install UliEngineering?

Copy the skip_first() utility function into your own code:

import collections

def skip_first(it):
    """
    Skip the first element of an Iterator or Iterable,
    like a Generator or a list.
    This will always return a generator or raise TypeError()
    in case the argument's type is not compatible
    """
    if isinstance(it, collections.Iterator):
        try:
            next(it)
            yield from it
        except StopIteration:
            return
    elif isinstance(it, collections.Iterable):
        yield from skip_first(it.__iter__())
    else:
        raise TypeError(f"You must pass an Iterator or an Iterable to skip_first(), but you passed {it}")

 

Posted by Uli Köhler in Python

How to fix NumPy timedelta64 TypeError: Invalid datetime unit “min” in metadata

Problem:

You want to construct a NumPy timedelta64 from a value in minutes using

np.timedelta64(1, 'min')

but you see an error message like

Traceback (most recent call last):
  File "test.py", line 3, in <module>
    delta = np.timedelta64(1, 'min')
TypeError: Invalid datetime unit "min" in metadata

Solution:

numpy uses m as specifier for minutes, not min! Change your code to

np.timedelta64(1, 'm')

 

Posted by Uli Köhler in Python

Split pandas DataFrame every time a Series is True

In our previous post we explored how to Split pandas DataFrame every time a column is True.

This slightly modified function also works if the given Series is not a column in the DataFrame:

def split_dataframe_by_series(df, series):
    """
    Split a DataFrame where the given series is True. Yields a number of dataframes
    """
    previous_index = df.index[0]

    for split_point in df[series].index:
        yield df[previous_index:split_point]
        previous_index = split_point
    # Yield remainder of dataset
    try:
        yield df[split_point:]
    except UnboundLocalError:
        pass # There is no split point => Ignore

Full example

We’ll use the ZeroCrossing column we built in our previous post on How to detect value change in pandas string column/series which itself builds on our post on How to create pandas time series DataFrame example dataset. Based on that example we add the modified utility function shown above:

import pandas as pd

# Load pre-built time series example dataset
df = pd.read_csv("https://techoverflow.net/datasets/timeseries-example.csv", parse_dates=["Timestamp"])
df.set_index("Timestamp", inplace=True)

# Create a new column containing "Positive" or "Negative"
df["SinePositive"] = (df["Sine"] >= 0).map({True: "Positive", False: "Negative"})
# Create "change" column (boolean)
df["ZeroCrossing"] = df["SinePositive"].shift() != df["SinePositive"]
# Set first entry to False
df["ZeroCrossing"].iloc[0] = False

def split_dataframe_by_series(df, series):
    """
    Split a DataFrame where the given series is True. Yields a number of dataframes
    """
    previous_index = df.index[0]

    for split_point in df[series].index:
        yield df[previous_index:split_point]
        previous_index = split_point
    # Yield remainder of dataset
    try:
        yield df[split_point:]
    except UnboundLocalError:
        pass # There is no split point => Ignore

# Print result
split_frames = list(split_dataframe_by_series(df, df["ZeroCrossing"]))
print(f"Split DataFrame into {len(split_frames)} separate frames by zero-crossing")

Note that converting the result of split_dataframe_to_series() into a list might not be neccessary depending on your application. If possible, I recommend directly iterating the data frames using a for loop, e.g.:

for df_section in split_dataframe_by_series(df, df["ZeroCrossing"]):
    pass # TODO: Your code goes here!

 

Posted by Uli Köhler in pandas, Python

Split pandas DataFrame every time a column is True

TL;DR

If the Series you want to use to split is a column in the DataFrame, continue reading this post. Else, read Split pandas DataFrame every time a Series is True.

Use this utility function:

def split_dataframe_by_column(df, column):
    """
    Split a DataFrame where a column is True. Yields a number of dataframes
    """
    previous_index = df.index[0]

    for split_point in df[df[column]].index:
        yield df[previous_index:split_point]
        previous_index = split_point
    # Yield remainder of dataset
    try:
        yield df[split_point:]
    except UnboundLocalError:
        pass # There is no split point => Ignore

# Usage example:
list(split_dataframe_by_column(df, "ZeroCrossing"))

Note that one or more of those dataframes might be empty.

Full example:

We’ll use the ZeroCrossing column we built in our previous post on How to detect value change in pandas string column/series which itself builds on our post on How to create pandas time series DataFrame example dataset. Based on that example we add the utility function shown above:

import pandas as pd

# Load pre-built time series example dataset
df = pd.read_csv("https://techoverflow.net/datasets/timeseries-example.csv", parse_dates=["Timestamp"])
df.set_index("Timestamp", inplace=True)

# Create a new column containing "Positive" or "Negative"
df["SinePositive"] = (df["Sine"] >= 0).map({True: "Positive", False: "Negative"})
# Create "change" column (boolean)
df["ZeroCrossing"] = df["SinePositive"].shift() != df["SinePositive"]
# Set first entry to False
df["ZeroCrossing"].iloc[0] = False

def split_dataframe_by_column(df, column):
    """Split a DataFrame where a column is True. Yields a number of dataframes"""
    previous_index = df.index[0]

    for split_point in df[df[column]].index:
        yield df[previous_index:split_point]
        previous_index = split_point
    # Yield remainder of dataset
    try:
        yield df[split_point:]
    except UnboundLocalError:
        pass # There is no split point => Ignore

# Print result
split_frames = list(split_dataframe_by_column(df, "ZeroCrossing"))
print(f"Split DataFrame into {len(split_frames)} separate frames by zero-crossing")
# This prints "Split DataFrame into 20 separate frames by zero-crossing"

 

Posted by Uli Köhler in pandas, Python

Get index where column is True in pandas

TL;DR

Simply use

df[df["ZeroCrossing"]].index

Full example:

We’ll use the ZeroCrossing column we built in our previous post on How to detect value change in pandas string column/series which itself builds on our post on How to create pandas time series DataFrame example dataset. Based on that example, we only modify the last line:

import pandas as pd

# Load pre-built time series example dataset
df = pd.read_csv("https://techoverflow.net/datasets/timeseries-example.csv", parse_dates=["Timestamp"])
df.set_index("Timestamp", inplace=True)

# Create a new column containing "Positive" or "Negative"
df["SinePositive"] = (df["Sine"] >= 0).map({True: "Positive", False: "Negative"})
# Create "change" column (boolean)
df["ZeroCrossing"] = df["SinePositive"].shift() != df["SinePositive"]
# Set first entry to False
df["ZeroCrossing"].iloc[0] = False

# Print result
print(df[df["ZeroCrossing"]].index)

This prints

DatetimeIndex(['2020-05-25 20:05:10.040874', '2020-05-25 20:05:10.090874',
               '2020-05-25 20:05:10.140874', '2020-05-25 20:05:10.190874',
               '2020-05-25 20:05:10.240874', '2020-05-25 20:05:10.290874',
               '2020-05-25 20:05:10.340874', '2020-05-25 20:05:10.390874',
               '2020-05-25 20:05:10.440774', '2020-05-25 20:05:10.490874',
               '2020-05-25 20:05:10.540874', '2020-05-25 20:05:10.590874',
               '2020-05-25 20:05:10.640774', '2020-05-25 20:05:10.690874',
               '2020-05-25 20:05:10.740874', '2020-05-25 20:05:10.790874',
               '2020-05-25 20:05:10.840874', '2020-05-25 20:05:10.890774',
               '2020-05-25 20:05:10.940874'],
              dtype='datetime64[ns]', name='Timestamp', freq=None)
Posted by Uli Köhler in pandas, Python

How to convert pandas Timedelta to seconds

TL;DR

Use

my_timedelta / np.timedelta64(1, 's')

Full example

import pandas as pd
import numpy as np
import time

# Create timedelta
t1 = pd.Timestamp("now")
time.sleep(3)
t2 = pd.Timestamp("now")
my_timedelta = t2 - t1

# Convert timedelta to seconds
my_timedelta_in_seconds = my_timedelta / np.timedelta64(1, 's')
print(my_timedelta_in_seconds) # prints 3.00154

 

Posted by Uli Köhler in pandas, Python

How to detect value change in pandas string column/series

TL;DR

In order to get a series that is True every time the input string column changes, use

my_column_changes = df["MyStringColumn"].shift() != df["MyStringColumn"]

The first value of this Series will always be True since the value is considered to be NaN before the start of the series (due to the behaviour of shift()). In order to force the first value to be False, use

my_column_changes.iloc[0] = False

In order to get the rows in the dataframe where the column changes, use

df[my_column_changes]

or use this one-liner:

df[df["MyStringColumn"].shift() != df["MyStringColumn"]]

In order to assign this value to a new column in the DataFrame, use e.g.

df["MyStringColumnChanges"] = df["MyStringColumn"].shift() != df["MyStringColumn"]

Full example:

First we load our example from our previous post on How to create pandas time series DataFrame example dataset:

import pandas as pd

# Load pre-built time series example dataset
df = pd.read_csv("https://techoverflow.net/datasets/timeseries-example.csv", parse_dates=["Timestamp"])
df.set_index("Timestamp", inplace=True)

Now we create a new column that contains Positive if the sine wave value in the "Sine" column is positive or "Negative" if that value is negative:

df["SinePositive"] = (df["Sine"] >= 0).map({True: "Positive", False: "Negative"})

Now we create the ZeroCrossing column using the method shown above:

# Create "change" column (boolean)
df["ZeroCrossing"] = df["SinePositive"].shift() != df["SinePositive"]

… and set the first entry to False since we don’t consider the start of the series to be a zero crossing:

df["ZeroCrossing"].iloc[0] = False

Now we can use

df[df["ZeroCrossing"]]

to show the rows in the DataFrame where the zero crossing happened:

                                    Sine   Cosine SinePositive  ZeroCrossing
Timestamp                                                                   
2020-05-25 20:05:10.040874 -6.283144e-03 -0.99998     Negative          True
2020-05-25 20:05:10.090874  6.283144e-03  0.99998     Positive          True
2020-05-25 20:05:10.140874 -6.283144e-03 -0.99998     Negative          True
2020-05-25 20:05:10.190874  6.283144e-03  0.99998     Positive          True
2020-05-25 20:05:10.240874 -6.283144e-03 -0.99998     Negative          True
2020-05-25 20:05:10.290874  6.283144e-03  0.99998     Positive          True
2020-05-25 20:05:10.340874 -6.283144e-03 -0.99998     Negative          True
2020-05-25 20:05:10.390874  6.283144e-03  0.99998     Positive          True
2020-05-25 20:05:10.440774 -2.450532e-15 -1.00000     Negative          True
2020-05-25 20:05:10.490874  6.283144e-03  0.99998     Positive          True
2020-05-25 20:05:10.540874 -6.283144e-03 -0.99998     Negative          True
2020-05-25 20:05:10.590874  6.283144e-03  0.99998     Positive          True
2020-05-25 20:05:10.640774 -1.960673e-15 -1.00000     Negative          True
2020-05-25 20:05:10.690874  6.283144e-03  0.99998     Positive          True
2020-05-25 20:05:10.740874 -6.283144e-03 -0.99998     Negative          True
2020-05-25 20:05:10.790874  6.283144e-03  0.99998     Positive          True
2020-05-25 20:05:10.840874 -6.283144e-03 -0.99998     Negative          True
2020-05-25 20:05:10.890774  4.901063e-15  1.00000     Positive          True
2020-05-25 20:05:10.940874 -6.283144e-03 -0.99998     Negative          True

 

Full example code:

import pandas as pd

# Load pre-built time series example dataset
df = pd.read_csv("https://techoverflow.net/datasets/timeseries-example.csv", parse_dates=["Timestamp"])
df.set_index("Timestamp", inplace=True)

# Create a new column containing "Positive" or "Negative"
df["SinePositive"] = (df["Sine"] >= 0).map({True: "Positive", False: "Negative"})
# Create "change" column (boolean)
df["ZeroCrossing"] = df["SinePositive"].shift() != df["SinePositive"]
# Set first entry to False
df["ZeroCrossing"].iloc[0] = False

# Print result
print(df[df["ZeroCrossing"]])

 

Posted by Uli Köhler in pandas, Python

Matplotlib custom SI-prefix unit tick formatter

You can use UliEngineering’s format_value() to easily make a custom matplotlib tick formatter that formats values with SI-prefixes like k, M, G, T, …

Formatting the Y axis ticks

The following example formats the Y axis in the Unit J (Joule). For example, 100000 would be formatted as 100 kJ.

The formatter function we use is

def format_joules(value, pos=None):
    return format_value(value, 'J')

In order to set the formatter, function, use

# Set our formatter as Y axis formatter
plt.gca().yaxis.set_major_formatter(mtick.FuncFormatter(format_joules))

Example:

import matplotlib.ticker as mtick
from UliEngineering.EngineerIO import format_value
from matplotlib import pyplot as plt

def format_joules(value, pos=None):
    return format_value(value, 'J')

# Set our formatter as Y axis formatter
plt.gca().yaxis.set_major_formatter(mtick.FuncFormatter(format_joules))

Formatting the X axis ticks

In order to format the X axis ticks instead, use the same formatter function but activate it using

# Set our formatter as Y axis formatter
plt.gca().xaxis.set_major_formatter(mtick.FuncFormatter(format_joules))

How to set the number of decimal places

UliEngineering’s format_value() allows you set the decimal places using e.g. significant_digits=4

def format_joules(value, pos=None):
    return format_value(value, 'J', significant_digits=4)

Full example

This example generates the Y axis plot shown above

import matplotlib.ticker as mtick
from UliEngineering.EngineerIO import format_value
from matplotlib import pyplot as plt
plt.style.use("ggplot")
import numpy as np

def format_joules(value, pos=None):
    return format_value(value, 'J')

# Set our formatter as Y axis formatter
plt.gca().yaxis.set_major_formatter(mtick.FuncFormatter(format_joules))

# Generate test data
test_data = np.arange(1, 1.2e6)
plt.plot(test_data)
plt.gcf().set_size_inches(10,5)
plt.savefig("/ram/mpl-si-formatter.svg")
Posted by Uli Köhler in Python

How to create pandas time series DataFrame example dataset

TL;DR: Use our pre-built example dataset like this:

# Load pre-built time series example dataset
df = pd.read_csv("https://datasets.techoverflow.net/timeseries-example.csv", parse_dates=["Timestamp"])
df.set_index("Timestamp", inplace=True)

How to build your own time series example dataset

In our previous post Easily generate sine/cosine waveform data in Python using UliEngineering we showed how to generate sine and cosine waves using UliEngineering.

In this post, we show how to create a pandas DataFrame containing sine and cosine data to be used as a sample time series dataset.

First, we generate the sine and cosine wave data:

import pandas as pd
import numpy as np
from UliEngineering.SignalProcessing.Simulation import sine_wave, cosine_wave

# Configure the properties of the sine wave here
frequency = 10.0 # 10 Hz sine / cosine wave
samplerate = 10000 # 10 kHz
nseconds = 1 # Generate 1 second of data

sine = sine_wave(frequency=frequency, samplerate=samplerate, length=nseconds)
cosine = cosine_wave(frequency=frequency, samplerate=samplerate, length=nseconds)
nsamples = len(sine) # How many values we have in the data arrays

After that, we define the timestamp where the dataset starts:

start_timestamp = pd.Timestamp('now')

Now we can create a list of Timestamp objects representing the points in time where the signal has been sampled:

# Create timestamps by offsetting
timedelta = pd.Timedelta(1/samplerate, 'seconds')
timestamps =  [start_timestamp + i * timedelta for i in range(nsamples)]

Now we’re reading to create the DataFrame object:

df = pd.DataFrame(index=timestamps, data={
    "Sine": sine,
    "Cosine": cosine
})
df.index.name = 'Timestamp'

Now we can use df.plot() to plot the dataset:

# Use nice plotting style
from matplotlib import pyplot as plt
plt.style.use("ggplot")
# Plot dataset
df.plot()
# Make figure larger
plt.gcf().set_size_inches(10, 5)

Additionally we can export the dataset as CSV using

df.to_csv("/ram/timeseries-example.csv")

This example file is also available online at https://techoverflow.net/datasets/timeseries-example.csv

Full example:

#!/usr/bin/env python3
import pandas as pd
import numpy as np
from UliEngineering.SignalProcessing.Simulation import sine_wave, cosine_wave
# Configure the properties of the sine wave here
frequency = 10.0 # 10 Hz sine / cosine wave
samplerate = 10000 # 10 kHz
nseconds = 1 # Generate 1 second of data
sine = sine_wave(frequency=frequency, samplerate=samplerate, length=nseconds)
cosine = cosine_wave(frequency=frequency, samplerate=samplerate, length=nseconds)
nsamples = len(sine) # How many values we have in the data arrays

start_timestamp = pd.Timestamp('now')

# Create timestamps by offsetting
timedelta = pd.Timedelta(1/samplerate, 'seconds')
timestamps =  [start_timestamp + i * timedelta for i in range(nsamples)]

df = pd.DataFrame(index=timestamps, data={
    "Sine": sine,
    "Cosine": cosine
})
df.index.name = 'Timestamp'

df.to_csv("timeseries-example.csv")

 

Posted by Uli Köhler in pandas, Python

How to get last 10 minutes of a pandas DataFrame

In our previous post we showed how to subtract 5 minutes from a pandas DataFrame:

pd.Timestamp('now') - pd.Timedelta(10, 'minutes')

We can also use this knowledge in order to get the last 10 minutes of a pandas DataFrame. In our example, we assume that df[“Timestamp”] contains the timestamp. First, we get the last timestamp in the dataset using

# Use this if the timestamp is the index of the DataFrame
last_ts = df.index.iloc[-1]

or

# ... or use this if the timestamp is in a colum
last_ts = df["Timestamp"].iloc[-1]

Next, we define the first timestamp that shall be considered by subtracting 10 minutes from last_ts:

first_ts = last_ts - pd.Timedelta(10, 'minutes')

Now we can filter the DataFrame using

# Use this if the Timestamp is in a column
filtered_df = df[df["Timestamp"] >= first_ts]

or

# Use this if the Timestamp is the index of the DataFrame
filtered_df = df[df.index >= first_ts]

By filtering, we don’t need the DataFrame to be sorted and the original order will be maintained.

Full example:

This example loads our pre-built time series example dataset from our previous post How to create pandas time series DataFrame example dataset. The code loads that dataset (which is 1 second long) and takes the last 0.5 seconds from it.

import pandas as pd

# Load example dataset
df = pd.read_csv("https://techoverflow.net/datasets/timeseries-example.csv", parse_dates=["Timestamp"])
df.set_index("Timestamp", inplace=True)

# Use this if the timestamp is the index of the DataFrame
last_ts = df.index[-1]

first_ts = last_ts - pd.Timedelta(0.5, 'seconds')

filtered_df = df[df.index >= first_ts]

# Plot the result
filtered_df.plot()

 

Posted by Uli Köhler in pandas, Python

How to subtract 5 minutes from pandas Timestamp

In our previous post we showed how to create a pandas Timestamp representing the current point in time:

pd.Timestamp('now')

You can subtract 5 minutes from that timestamp by using Timedelta(5, 'minutes'):

pd.Timestamp('now') - pd.Timedelta(5, 'minutes')
Posted by Uli Köhler in pandas, Python

How to create pandas ‘now’ Timestamp

In order to create a pandas Timestamp representing the current point in time, use

pd.Timestamp('now')

This will create a Timestamp in the current timezone.

Full example:

import pandas as pd

now = pd.Timestamp('now')

print(now) # Prints e.g. Timestamp('2020-05-25 19:02:31.051836')

 

Posted by Uli Köhler in pandas, Python

How to get last row of Pandas DataFrame

Use .iloc[-1] to get the last row (all columns) of a pandas DataFrame, for example:

my_dataframe.iloc[-1]

 

Posted by Uli Köhler in pandas, Python

How to get last element of Pandas Series

Use .iloc[-1] to get the last element of a pandas Series, for example:

my_dataframe['MyColumn'].iloc[-1]

 

Posted by Uli Köhler in pandas, Python

Python threading minimal example

This minimal example shows how to create a thread that prints Hello world every second:

import threading
import time

def my_thread_func():
    while True:
        print("Hello world")
        time.sleep(1)

my_thread = threading.Thread(target=my_thread_func)
my_thread.start()

 

Posted by Uli Köhler in Python

How to fix RCurl ‘Cannot find curl-config’ or ‘checking for curl-config… no’

Problem:

You want to install RCurl using

Rscript -e "install.packages('RCurl')"

but you see an error message like

Installing package into ‘/usr/local/lib/R/site-library’
(as ‘lib’ is unspecified)
trying URL 'https://cloud.r-project.org/src/contrib/RCurl_1.98-1.2.tar.gz'
Content type 'application/x-gzip' length 699583 bytes (683 KB)
==================================================
downloaded 683 KB

* installing *source* package ‘RCurl’ ...
** package ‘RCurl’ successfully unpacked and MD5 sums checked
** using staged installation
checking for curl-config... no
Cannot find curl-config
ERROR: configuration failed for package ‘RCurl’
* removing ‘/usr/local/lib/R/site-library/RCurl’

The downloaded source packages are in
 ‘/tmp/RtmpSVQKd1/downloaded_packages’
Warning message:
In install.packages("RCurl") :
  installation of package ‘RCurl’ had non-zero exit status

Solution:

You need to install the curl development headers on your machine. RCurl complains that it can’t find those development headers.

On Ubuntu/debian, use

sudo apt -y install libcurl4-openssl-dev

 

Posted by Uli Köhler in R

How to fix Python unittest __init__() takes 1 positional argument but 2 were given

Problem:

You are trying to run your Python unit tests using the unittest package, but you see this unspecific stack trace:

Traceback (most recent call last):
  File "/usr/lib/python3.7/runpy.py", line 193, in _run_module_as_main
    "__main__", mod_spec)
  File "/usr/lib/python3.7/runpy.py", line 85, in _run_code
    exec(code, run_globals)
  File "/usr/lib/python3.7/unittest/__main__.py", line 18, in <module>
    main(module=None)
  File "/usr/lib/python3.7/unittest/main.py", line 100, in __init__
    self.parseArgs(argv)
  File "/usr/lib/python3.7/unittest/main.py", line 124, in parseArgs
    self._do_discovery(argv[2:])
  File "/usr/lib/python3.7/unittest/main.py", line 244, in _do_discovery
    self.createTests(from_discovery=True, Loader=Loader)
  File "/usr/lib/python3.7/unittest/main.py", line 154, in createTests
    self.test = loader.discover(self.start, self.pattern, self.top)
  File "/usr/lib/python3.7/unittest/loader.py", line 349, in discover
    tests = list(self._find_tests(start_dir, pattern))
  File "/usr/lib/python3.7/unittest/loader.py", line 414, in _find_tests
    yield from self._find_tests(full_path, pattern, namespace)
  File "/usr/lib/python3.7/unittest/loader.py", line 406, in _find_tests
    full_path, pattern, namespace)
  File "/usr/lib/python3.7/unittest/loader.py", line 460, in _find_test_path
    return self.loadTestsFromModule(module, pattern=pattern), False
  File "/usr/lib/python3.7/unittest/loader.py", line 124, in loadTestsFromModule
    tests.append(self.loadTestsFromTestCase(obj))
  File "/usr/lib/python3.7/unittest/loader.py", line 93, in loadTestsFromTestCase
    loaded_suite = self.suiteClass(map(testCaseClass, testCaseNames))
  File "/usr/lib/python3.7/unittest/suite.py", line 24, in __init__
    self.addTests(tests)
  File "/usr/lib/python3.7/unittest/suite.py", line 57, in addTests
    for test in tests:
TypeError: __init__() takes 1 positional argument but 2 were given

Solution:

You have at least one test case like this one:

class MyTest(unittest.TestCase):
    def __init__(self):
        self.x = 1.0

    def test_stuff(self):
        assert(self.x == 1.0)

Overriding __init__(...) is not possible in this way when using unittest. You need to use setUp() instead.

Usually, just replacing def __init__(self): by def setUp(self): will do the trick. unittests will call setUp() automatically.

Our example will look like this:

class MyTest(unittest.TestCase):
    def setUp(self):
        self.x = 1.0

    def test_stuff(self):
        assert(self.x == 1.0)

If the error still persists, check if you have more testcases overriding the __init__() method.

Posted by Uli Köhler in Python

Two easy ways to download a file using Python requests

requests does not provide a

Option 1: Use requests_download:

First, install requests_download using

sudo pip3 install requests_download

or equivalent.

Now you can use it like this:

from requests_download import download

download(url, filename)

It also has built-in progress bar support:

from requests_download import download, HashTracker, ProgressTracker
from progressbar import DataTransferBar # sudo pip3 install progressbar2

progress = ProgressTracker(DataTransferBar())

download(pdfUrl, filename, trackers=(progress,))

Option 2: Do it yourself:

Use this snippet in your code:

import requests
import shutil

def requests_download_file(url, filename):
    with requests.get(url, stream=True) as response:
        with open(filename, 'wb') as fout:
            shutil.copyfileobj(response.raw, fout)
Posted by Uli Köhler in Python

How to fix landscape-package-reporter: UnicodeDecodeError: ‘utf-8’ codec can’t decode byte

On some servers attached to a landscape instance, I encountered this stacktrace when trying to run sudo landscape-package-reporter:

Traceback (most recent call last):
  File "/usr/lib/python3/dist-packages/twisted/internet/defer.py", line 653, in _runCallbacks
    current.result = callback(current.result, *args, **kw)
  File "/usr/lib/python3/dist-packages/landscape/client/package/reporter.py", line 92, in <lambda>
    result.addCallback(lambda x: self.request_unknown_hashes())
  File "/usr/lib/python3/dist-packages/landscape/client/package/reporter.py", line 485, in request_unknown_hashes
    self._facade.ensure_channels_reloaded()
  File "/usr/lib/python3/dist-packages/landscape/lib/apt/package/facade.py", line 265, in ensure_channels_reloaded
    self.reload_channels()
  File "/usr/lib/python3/dist-packages/landscape/lib/apt/package/facade.py", line 253, in reload_channels
    version, with_info=False).get_hash()
  File "/usr/lib/python3/dist-packages/landscape/lib/apt/package/facade.py", line 402, in get_package_skeleton
    return build_skeleton_apt(pkg, with_info=with_info, with_unicode=True)
  File "/usr/lib/python3/dist-packages/landscape/lib/apt/package/skeleton.py", line 131, in build_skeleton_apt
    version.record, "Provides", DEB_PROVIDES))
  File "/usr/lib/python3/dist-packages/apt/package.py", line 690, in record
    return Record(self._records.record)
UnicodeDecodeError: 'utf-8' codec can't decode byte 0x96 in position 724: invalid start byte

Tracing down the issue, it was related with a misplaced set of Unicode bytes (EB BF BD) in an old veeam version in version 1.0.0.944 of the veeamsnap package in /var/lib/apt/lists/repository.veeam.com_backup_linux_agent_dpkg_debian_public_dists_stable_veeam_binary-amd64_Packages: The Description field contains this text:

[...] Linux � simple [...]

The strange character is the U+FFFD � REPLACEMENT CHARACTER.

You can fix it by deleting this character. It’s just at the end of /var/lib/apt/lists/repository.veeam.com_backup_linux_agent_dpkg_debian_public_dists_stable_veeam_binary-amd64_Packages. However, if there’s an update for that repository, your change will be overwritten.

In order to fix it (my fix is for landscape-client version 18.01-0ubuntu3.5), I added a try: ... except: ... clause to skeleton.py, which will ignore some properties of the package where the issue occurs:

try:
    relations.update(parse_record_field(
        version.record, "Provides", DEB_PROVIDES))
    relations.add((
        DEB_NAME_PROVIDES,
        "%s = %s" % (version.package.name, version.version)))
    relations.update(parse_record_field(
        version.record, "Pre-Depends", DEB_REQUIRES, DEB_OR_REQUIRES))
    relations.update(parse_record_field(
        version.record, "Depends", DEB_REQUIRES, DEB_OR_REQUIRES))

    relations.add((
        DEB_UPGRADES, "%s < %s" % (version.package.name, version.version)))

    relations.update(parse_record_field(
        version.record, "Conflicts", DEB_CONFLICTS))
    relations.update(parse_record_field(
        version.record, "Breaks", DEB_CONFLICTS))
    skeleton.relations = sorted(relations)

    if with_info:
        skeleton.section = version.section
        skeleton.summary = version.summary
        skeleton.description = version.description
        skeleton.size = version.size
        if version.installed_size > 0:
            skeleton.installed_size = version.installed_size
        if with_unicode and not _PY3:
            skeleton.section = skeleton.section.decode("utf-8")
            skeleton.summary = skeleton.summary.decode("utf-8")
            # Avoid double-decoding package descriptions in build_skeleton_apt,
            # which causes an error with newer python-apt (Xenial onwards)
            if not isinstance(skeleton.description, unicode):
                skeleton.description = skeleton.description.decode("utf-8")
    return skeleton
except UnicodeError:
    return skeleton

Replace /usr/lib/python3/dist-packages/landscape/lib/apt/package/skeleton.py by this:

from landscape.lib.hashlib import sha1

import apt_pkg

from twisted.python.compat import unicode, _PY3


PACKAGE   = 1 << 0
PROVIDES  = 1 << 1
REQUIRES  = 1 << 2
UPGRADES  = 1 << 3
CONFLICTS = 1 << 4

DEB_PACKAGE       = 1 << 16 | PACKAGE
DEB_PROVIDES      = 2 << 16 | PROVIDES
DEB_NAME_PROVIDES = 3 << 16 | PROVIDES
DEB_REQUIRES      = 4 << 16 | REQUIRES
DEB_OR_REQUIRES   = 5 << 16 | REQUIRES
DEB_UPGRADES      = 6 << 16 | UPGRADES
DEB_CONFLICTS     = 7 << 16 | CONFLICTS


class PackageTypeError(Exception):
    """Raised when an unsupported package type is passed to build_skeleton."""


class PackageSkeleton(object):

    section = None
    summary = None
    description = None
    size = None
    installed_size = None
    _hash = None

    def __init__(self, type, name, version):
        self.type = type
        self.name = name
        self.version = version
        self.relations = []

    def add_relation(self, type, info):
        self.relations.append((type, info))

    def get_hash(self):
        """Calculate the package hash.

        If C{set_hash} has been used, that hash will be returned and the
        hash won't be the calculated value.
        """
        if self._hash is not None:
            return self._hash
        # We use ascii here as encoding  for backwards compatibility as it was
        # default encoding for conversion from unicode to bytes in Python 2.7.
        package_info = ("[%d %s %s]" % (self.type, self.name, self.version)
                        ).encode("ascii")
        digest = sha1(package_info)
        self.relations.sort()
        for pair in self.relations:
            digest.update(("[%d %s]" % (pair[0], pair[1])
                           ).encode("ascii"))
        return digest.digest()

    def set_hash(self, package_hash):
        """Set the hash to an explicit value.

        This should be used when the hash is previously known and can't
        be calculated from the relations anymore.

        The only use case for this is package resurrection. We're
        planning on getting rid of package resurrection, and this code
        can be removed when that is done.
        """
        self._hash = package_hash


def relation_to_string(relation_tuple):
    """Convert an apt relation to a string representation.

    @param relation_tuple: A tuple, (name, version, relation). version
        and relation can be the empty string, if the relation is on a
        name only.

    Returns something like "name > 1.0"
    """
    name, version, relation_type = relation_tuple
    relation_string = name
    if relation_type:
        relation_string += " %s %s" % (relation_type, version)
    return relation_string


def parse_record_field(record, record_field, relation_type,
                       or_relation_type=None):
    """Parse an apt C{Record} field and return skeleton relations

    @param record: An C{apt.package.Record} instance with package information.
    @param record_field: The name of the record field to parse.
    @param relation_type: The deb relation that can be passed to
        C{skeleton.add_relation()}
    @param or_relation_type: The deb relation that should be used if
        there is more than one value in a relation.
    """
    relations = set()
    values = apt_pkg.parse_depends(record.get(record_field, ""))
    for value in values:
        value_strings = [relation_to_string(relation) for relation in value]
        value_relation_type = relation_type
        if len(value_strings) > 1:
            value_relation_type = or_relation_type
        relation_string = " | ".join(value_strings)
        relations.add((value_relation_type, relation_string))
    return relations


def build_skeleton_apt(version, with_info=False, with_unicode=False):
    """Build a package skeleton from an apt package.

    @param version: An instance of C{apt.package.Version}
    @param with_info: Whether to extract extra information about the
        package, like description, summary, size.
    @param with_unicode: Whether the C{name} and C{version} of the
        skeleton should be unicode strings.
    """
    name, version_string = version.package.name, version.version
    if with_unicode:
        name, version_string = unicode(name), unicode(version_string)
    skeleton = PackageSkeleton(DEB_PACKAGE, name, version_string)
    relations = set()
    try:
        relations.update(parse_record_field(
            version.record, "Provides", DEB_PROVIDES))
        relations.add((
            DEB_NAME_PROVIDES,
            "%s = %s" % (version.package.name, version.version)))
        relations.update(parse_record_field(
            version.record, "Pre-Depends", DEB_REQUIRES, DEB_OR_REQUIRES))
        relations.update(parse_record_field(
            version.record, "Depends", DEB_REQUIRES, DEB_OR_REQUIRES))

        relations.add((
            DEB_UPGRADES, "%s < %s" % (version.package.name, version.version)))

        relations.update(parse_record_field(
            version.record, "Conflicts", DEB_CONFLICTS))
        relations.update(parse_record_field(
            version.record, "Breaks", DEB_CONFLICTS))
        skeleton.relations = sorted(relations)

        if with_info:
            skeleton.section = version.section
            skeleton.summary = version.summary
            skeleton.description = version.description
            skeleton.size = version.size
            if version.installed_size > 0:
                skeleton.installed_size = version.installed_size
            if with_unicode and not _PY3:
                skeleton.section = skeleton.section.decode("utf-8")
                skeleton.summary = skeleton.summary.decode("utf-8")
                # Avoid double-decoding package descriptions in build_skeleton_apt,
                # which causes an error with newer python-apt (Xenial onwards)
                if not isinstance(skeleton.description, unicode):
                    skeleton.description = skeleton.description.decode("utf-8")
        return skeleton
    except UnicodeError:
        return skeleton

After that, you can run sudo landscape-package-reporter again.

Posted by Uli Köhler in Linux, Python

How to fix Angular 9 @ViewChild Expected 2 arguments, but got 1: An argument for ‘opts’ was not provided.

Problem:

You are trying to compile your Angular 9.x application, but you see an error message like

app/my-component/my-component.component.ts:24:4 - error TS2554: Expected 2 arguments, but got 1.

24   @ViewChild(MyOtherComponent) myOtherComponent: MyOtherComponent;
      ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

  ../node_modules/@angular/core/core.d.ts:7888:47
    7888     (selector: Type<any> | Function | string, opts: {
                                                       ~~~~~~~
    7889         read?: any;
         ~~~~~~~~~~~~~~~~~~~
    7890         static: boolean;
         ~~~~~~~~~~~~~~~~~~~~~~~~
    7891     }): any;
         ~~~~~
    An argument for 'opts' was not provided.

Solution:

Find this line in your code at the location specified in the error message:

@ViewChild(MyOtherComponent) myOtherComponent: MyOtherComponent;

and add

{static: false}

as second argument to the @ViewChild() declaration:

@ViewChild(MyOtherComponent, {static: false}) myOtherComponent: MyOtherComponent;

In most cases, you want to use static: false. See this post on StackOverflow for details on when to use static: true as opposed to static: false.

Posted by Uli Köhler in Angular, Javascript, Typescript