Postleitzahlen und Koordinaten von GeoNames parsen
Daten herunterladen
example.sh
wget https://download.geonames.org/export/dump/DE.zip
unzip DE.zip
Daten parsen
parse_de.py
#!/usr/bin/env python3
"""Simple parser for DE.txt (tab-delimited geonames-like file).
Fields (tab-separated, UTF-8):
0 country code
1 postal code
2 place name
3 admin name1
4 admin code1
5 admin name2
6 admin code2
7 admin name3
8 admin code3
9 latitude
10 longitude
11 accuracy
This script exposes functions to iterate parsed rows as dataclasses or dicts
and a small CLI to print counts or write JSON/CSV output.
"""
from __future__ import annotations
import csv
import json
from dataclasses import dataclass, asdict
from typing import Optional, Iterator
import argparse
from typing import List
@dataclass
class DERecord:
    """One parsed line of the tab-delimited DE.txt postal-code file.

    The attributes are declared in the same order as the file's columns,
    so a record can be constructed positionally as well as by keyword.
    """

    # Columns 0-2: country, postal code and locality.
    country_code: str
    postal_code: str
    place_name: str

    # Columns 3-8: three administrative levels, each a name/code pair.
    admin_name1: str
    admin_code1: str
    admin_name2: str
    admin_code2: str
    admin_name3: str
    admin_code3: str

    # Columns 9-11: numeric values; declared Optional so that ``None`` can
    # stand in for a value that is not available.
    latitude: Optional[float]
    longitude: Optional[float]
    accuracy: Optional[int]
def _to_float(val: str) -> Optional[float]:
    """Parse *val* as a float, returning ``None`` when there is no usable value.

    ``None``, an empty string, a whitespace-only string and text that
    ``float()`` rejects all yield ``None``. Surrounding whitespace is ignored.
    """
    # Treat a missing value like an empty one so both take the same exit.
    text = val.strip() if val is not None else ""
    if not text:
        return None
    try:
        number = float(text)
    except ValueError:
        return None
    return number
def _to_int(val: str) -> Optional[int]:
    """Parse *val* as an integer, returning ``None`` when there is no usable value.

    ``None``, empty or whitespace-only text, and text that ``int()`` rejects
    (for example ``"6.0"``) all yield ``None``.
    """
    if val is None:
        return None
    try:
        # int("") raises ValueError as well, so blank input needs no
        # separate check before the conversion.
        return int(val.strip())
    except ValueError:
        return None
def parse_row(row: list[str]) -> Optional[DERecord]:
    """Convert one tab-split line of DE.txt into a ``DERecord``.

    The columns that follow the place name (admin_name1 .. accuracy) are
    located by counting back from the end of the row. If the place name was
    split by stray tabs, the surplus middle columns are stripped and
    re-joined with tab characters.

    A row of exactly 11 columns is read as the standard layout with the
    trailing accuracy column absent. Such rows used to be rejected, because
    the end-relative index arithmetic always assumed accuracy was present.

    Args:
        row: The fields of one line, e.g. as produced by ``csv.reader``.

    Returns:
        The parsed record, or ``None`` when the row has fewer than 11
        columns (including an empty row) or a text column is not a string.
    """
    column_count = len(row)
    # country, postal, place, 3 x (admin name, admin code), lat, lon = 11
    if column_count < 11:
        return None

    if column_count == 11:
        # Standard layout without the optional accuracy column.
        admin_name1_idx = 3
        accuracy_field = None
    else:
        # Nine fixed columns close the row; anything between index 2 and
        # admin_name1 belongs to the place name.
        admin_name1_idx = column_count - 9
        accuracy_field = row[-1]

    try:
        place_name = "\t".join(part.strip() for part in row[2:admin_name1_idx])
        admin = [field.strip() for field in row[admin_name1_idx:admin_name1_idx + 6]]
        return DERecord(
            country_code=row[0].strip(),
            postal_code=row[1].strip(),
            place_name=place_name,
            admin_name1=admin[0],
            admin_code1=admin[1],
            admin_name2=admin[2],
            admin_code2=admin[3],
            admin_name3=admin[4],
            admin_code3=admin[5],
            latitude=_to_float(row[admin_name1_idx + 6]),
            longitude=_to_float(row[admin_name1_idx + 7]),
            # _to_int maps None (column absent) and "" (column empty) to None.
            accuracy=_to_int(accuracy_field),
        )
    except (AttributeError, TypeError):
        # A non-string field (no usable .strip()) marks the row as malformed.
        return None
def iter_de_file(path: str) -> Iterator[DERecord]:
    """Yield a ``DERecord`` for every parseable line of a tab-delimited file.

    The file is opened as UTF-8 and split on tab characters. Lines that
    ``parse_row`` cannot interpret (it returns ``None``) are skipped silently.

    Args:
        path: Location of the tab-delimited text file.

    Yields:
        One ``DERecord`` per successfully parsed line, in file order.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(path, newline="", encoding="utf-8") as fh:
        # The input is plain tab-separated text, not quoted CSV. With the
        # default dialect a field that starts with a double quote would be
        # treated as a quoted field and could swallow the following tabs and
        # newlines. QUOTE_NONE keeps every character literal.
        reader = csv.reader(fh, delimiter="\t", quoting=csv.QUOTE_NONE)
        for fields_of_line in reader:
            record = parse_row(fields_of_line)
            if record is not None:
                yield record
def parse_de_to_dataframe(path: str = "DE.txt", limit: int = 0):
    """Parse a DE.txt-style file into a pandas DataFrame.

    Records come from ``iter_de_file``; each becomes one DataFrame row with
    one column per ``DERecord`` field. The columns are set explicitly, so a
    file with no parseable lines still yields a frame with the full schema.

    Args:
        path: File to read. Defaults to ``"DE.txt"``.
        limit: Maximum number of records to load. Zero, ``None`` or a
            negative value means "no limit". A negative value used to stop
            after the first record.

    Returns:
        A ``pandas.DataFrame`` with the ``DERecord`` fields as columns.

    Raises:
        RuntimeError: If pandas is not installed.
        OSError: If the file cannot be opened.
    """
    try:
        import pandas as pd
    except ImportError as e:
        raise RuntimeError("pandas is required to convert to DataFrame. Install with: pip install pandas") from e

    # Local imports keep this function self-contained.
    from dataclasses import fields
    from itertools import islice

    records = iter_de_file(path)
    if limit and limit > 0:
        records = islice(records, limit)

    rows = [asdict(rec) for rec in records]
    column_names = [f.name for f in fields(DERecord)]
    return pd.DataFrame(rows, columns=column_names)
def main() -> None:
    """Command-line entry point: load ``DE.txt`` and report on its contents.

    The input file name is fixed to ``DE.txt`` in the current directory.

    Options:
        --output-json / -j FILE  also write every row as one JSON object per line
        --limit / -n N           parse at most N rows (0 = all)
        --count-only / -c        print only the number of parsed rows

    Without ``--count-only`` the first 20 rows are printed as a table.
    If pandas is missing or the input file cannot be read, the error is
    written to stderr and the process exits with status 1.
    """
    import math
    import sys

    p = argparse.ArgumentParser(description="Parse DE.txt (tab-delimited) into a pandas DataFrame. Always uses file 'DE.txt'.")
    p.add_argument("--output-json", "-j", help="Write JSON lines to this file")
    p.add_argument("--limit", "-n", type=int, default=0, help="Limit number of rows to parse (0 = all)")
    p.add_argument("--count-only", "-c", action="store_true", help="Only print the number of parsed rows")
    args = p.parse_args()
    path = "DE.txt"

    # Build the DataFrame. Failures go to stderr with a non-zero exit status
    # so that shell pipelines can detect them.
    try:
        df = parse_de_to_dataframe(path=path, limit=args.limit)
    except (RuntimeError, OSError) as err:
        print(str(err), file=sys.stderr)
        sys.exit(1)

    # If requested, write JSON lines using DataFrame rows.
    if args.output_json:
        with open(args.output_json, "w", encoding="utf-8") as out_fh:
            for row in df.to_dict(orient="records"):
                # Missing numeric values surface as NaN in the DataFrame.
                # json.dumps would emit the bare token NaN, which is not
                # valid JSON, so they are written as null instead.
                clean = {
                    key: None if isinstance(value, float) and math.isnan(value) else value
                    for key, value in row.items()
                }
                out_fh.write(json.dumps(clean, ensure_ascii=False) + "\n")

    if args.count_only:
        print(len(df))
        return

    # Usage example: print the first 20 rows of the DataFrame.
    try:
        import pandas as pd
        # Make the display readable in terminals: show all columns, limit width.
        pd.set_option('display.max_columns', None)
        pd.set_option('display.width', 160)
        pd.set_option('display.max_colwidth', 60)
        print(df.head(20).to_string(index=False))
    except Exception:
        # Deliberate best-effort: fall back to the default representation
        # if the formatted rendering fails for any reason.
        print(df.head(20))
if __name__ == "__main__":
    main()

You can run the script as follows:
example.sh
python parse_de.py
Check out similar posts by category:
Geoinformatics, Python
If this post helped you, please consider buying me a coffee or donating via PayPal to support research & publishing of new posts on TechOverflow