Postleitzahlen und Koordinaten von GeoNames parsen

Daten herunterladen

example.sh
wget https://download.geonames.org/export/dump/DE.zip
unzip DE.zip

Daten parsen

parse_de.py
#!/usr/bin/env python3
"""Simple parser for DE.txt (tab-delimited geonames-like file).

Fields (tab-separated, UTF-8):
 0 country code
 1 postal code
 2 place name
 3 admin name1
 4 admin code1
 5 admin name2
 6 admin code2
 7 admin name3
 8 admin code3
 9 latitude
10 longitude
11 accuracy

This script exposes functions to iterate parsed rows as dataclasses or load
them into a pandas DataFrame, and a small CLI to print the row count, preview
the first rows, or write JSON Lines output.
"""

from __future__ import annotations
import csv
import json
from dataclasses import dataclass, asdict
from typing import Optional, Iterator
import argparse
from typing import List


@dataclass
class DERecord:
    """One parsed row of the tab-delimited DE.txt file.

    The attributes mirror the twelve columns listed in the module docstring,
    in the same order. The three numeric attributes are ``None`` when the
    corresponding cell is empty or cannot be converted to a number.
    """

    # Columns 0-2: identifying text fields, stored stripped of whitespace.
    country_code: str
    postal_code: str
    place_name: str
    # Columns 3-8: three (name, code) pairs of administrative divisions.
    admin_name1: str
    admin_code1: str
    admin_name2: str
    admin_code2: str
    admin_name3: str
    admin_code3: str
    # Columns 9-11: numeric fields; None if empty or unparseable.
    latitude: Optional[float]
    longitude: Optional[float]
    accuracy: Optional[int]


def _to_float(val: str) -> Optional[float]:
    if val is None:
        return None
    s = val.strip()
    if s == "":
        return None
    try:
        return float(s)
    except ValueError:
        return None


def _to_int(val: str) -> Optional[int]:
    if val is None:
        return None
    s = val.strip()
    if s == "":
        return None
    try:
        return int(s)
    except ValueError:
        return None


def parse_row(row: list[str]) -> Optional[DERecord]:
    """Parse one tab-split row (list of strings) into a DERecord.

    The fixed columns after the place name are located by counting from the
    end of the row, so stray tabs inside the place name do not shift them;
    the surplus middle cells are re-joined with a tab into ``place_name``.

    A row with exactly 11 cells is treated as a regular row whose trailing
    accuracy column is absent, and ``accuracy`` is then ``None``. Rows with
    12 or more cells are expected to carry accuracy as their last cell.

    Returns ``None`` for an empty row, a row with fewer than 11 cells, or a
    row containing cells that are not strings.
    """
    # Minimum layout: country, postal, place, 3 x (admin name, admin code),
    # latitude, longitude -> 11 cells. Accuracy is the optional 12th.
    if not row or len(row) < 11:
        return None

    has_accuracy = len(row) >= 12
    # Number of fixed cells that follow the place name.
    tail_len = 9 if has_accuracy else 8
    # Always >= 3 here, so the place name spans at least one cell.
    admin_name1_idx = len(row) - tail_len

    try:
        country_code = row[0].strip()
        postal_code = row[1].strip()
        place_name = "\t".join(part.strip() for part in row[2:admin_name1_idx])
        (
            admin_name1,
            admin_code1,
            admin_name2,
            admin_code2,
            admin_name3,
            admin_code3,
        ) = (cell.strip() for cell in row[admin_name1_idx:admin_name1_idx + 6])
        latitude = _to_float(row[admin_name1_idx + 6])
        longitude = _to_float(row[admin_name1_idx + 7])
        accuracy = _to_int(row[admin_name1_idx + 8]) if has_accuracy else None
    except (AttributeError, TypeError):
        # A cell was not a string (no .strip()); treat the row as malformed.
        return None

    return DERecord(
        country_code=country_code,
        postal_code=postal_code,
        place_name=place_name,
        admin_name1=admin_name1,
        admin_code1=admin_code1,
        admin_name2=admin_name2,
        admin_code2=admin_code2,
        admin_name3=admin_name3,
        admin_code3=admin_code3,
        latitude=latitude,
        longitude=longitude,
        accuracy=accuracy,
    )


def iter_de_file(path: str) -> Iterator[DERecord]:
    """Yield a DERecord for every parseable row of a tab-delimited file.

    The file is opened as UTF-8 and split on tabs with ``csv.reader``.
    Quote handling is disabled (``csv.QUOTE_NONE``) so that a literal ``"``
    inside a cell is kept as data; with the default quoting, an unbalanced
    quote would swallow the following tabs and newlines and merge rows.

    Rows that ``parse_row`` rejects (it returns ``None``) are skipped.
    """
    with open(path, newline="", encoding="utf-8") as fh:
        reader = csv.reader(fh, delimiter="\t", quoting=csv.QUOTE_NONE)
        for row in reader:
            rec = parse_row(row)
            if rec is not None:
                yield rec


def parse_de_to_dataframe(path: str = "DE.txt", limit: int = 0):
    """Parse a DE.txt-style file into a pandas DataFrame.

    Args:
        path: File to read; defaults to ``"DE.txt"``.
        limit: Maximum number of records to load. Zero or any negative
            value means "no limit".

    Returns:
        A ``pandas.DataFrame`` with one row per record yielded by
        ``iter_de_file`` and one column per ``DERecord`` field.

    Raises:
        RuntimeError: If pandas cannot be imported.
    """
    try:
        import pandas as pd
    except Exception as e:
        # Kept broad on purpose: callers rely on RuntimeError for any
        # failure to import pandas.
        raise RuntimeError("pandas is required to convert to DataFrame. Install with: pip install pandas") from e

    from itertools import islice

    records = iter_de_file(path)
    if limit > 0:
        # Only a positive limit truncates; a negative value previously
        # stopped after the first record.
        records = islice(records, limit)

    rows: List[dict] = [asdict(rec) for rec in records]
    return pd.DataFrame(rows)



def main() -> None:
    """Command-line entry point.

    Parses ``DE.txt`` from the current working directory into a DataFrame,
    optionally writes the rows as JSON Lines (``--output-json``), and then
    prints either the row count (``--count-only``) or the first 20 rows.

    Exits with status 1 and a message on stderr when the DataFrame cannot be
    built (pandas missing) or the input file cannot be read.
    """
    p = argparse.ArgumentParser(description="Parse DE.txt (tab-delimited) into a pandas DataFrame. Always uses file 'DE.txt'.")
    p.add_argument("--output-json", "-j", help="Write JSON lines to this file")
    p.add_argument("--limit", "-n", type=int, default=0, help="Limit number of rows to parse (0 = all)")
    p.add_argument("--count-only", "-c", action="store_true", help="Only print the number of parsed rows")
    args = p.parse_args()

    path = "DE.txt"

    # Build DataFrame. SystemExit with a string argument prints the message
    # to stderr and sets exit status 1, so failures are visible to callers.
    try:
        df = parse_de_to_dataframe(path=path, limit=args.limit)
    except RuntimeError as err:
        raise SystemExit(str(err))
    except OSError as err:
        raise SystemExit(f"Cannot read {path}: {err}")

    # Safe to import here: building the DataFrame already required pandas.
    import pandas as pd

    # If requested, write JSON lines using DataFrame rows.
    if args.output_json:
        with open(args.output_json, "w", encoding="utf-8") as out_fh:
            for row in df.to_dict(orient="records"):
                # Missing numeric cells are NaN in the DataFrame; json.dumps
                # would emit the non-standard token NaN, so map them to null.
                clean = {key: (None if pd.isna(value) else value) for key, value in row.items()}
                out_fh.write(json.dumps(clean, ensure_ascii=False) + "\n")

    if args.count_only:
        print(len(df))
        return

    # Show the first 20 rows.
    preview = df.head(20)
    try:
        # Make display readable in terminals: show all columns, limit width.
        pd.set_option('display.max_columns', None)
        pd.set_option('display.width', 160)
        pd.set_option('display.max_colwidth', 60)
        print(preview.to_string(index=False))
    except Exception:
        # Best-effort fallback to the default DataFrame representation.
        print(preview)


# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()

You can run the script as follows:

example.sh
python parse_de.py

Check out similar posts by category: Geoinformatics, Python