Algorithm to find a minimal set of postal codes to cover a country (almost) completely
This post shows how to select a minimal set of postal codes (PLZ) to cover a country almost completely, using Python and some common libraries. The example uses Germany, but the approach can be adapted to other countries with similar postal code systems.
The model we’re using here is that a constant-radius circle is drawn around each selected postal code (= centre), and the goal is to cover as much of the country’s area as possible with as few circles as possible. This is a classic optimization problem, often referred to as the “set cover problem”, which is NP-hard. Therefore, we use a greedy approximation algorithm with some enhancements to find a good solution in a reasonable time.
This algorithm can easily be adapted for other countries and also for other sets of POIs.
This is primarily a stochastic algorithm, so it might not produce the absolutely optimal result, but it should be very close to optimal in most cases. Additionally, it runs in seconds even on a standard laptop (with the parameters like in the example below).
Since this code reads in the PLZ table as parquet
file, see our previous post Postleitzahlen und Koordinaten von GeoNames parsen
for the code and run it with python parse_de.py -p postleitzahlen.parquet
to create the required file.
Results
Using the example usage below, we end up selecting 429 centres
(postal codes) to cover 99.5%
of Germany with a radius of 35 km
around each selected postal code.
Full source code
__author__ = "Uli Köhler"
__copyright__ = "Copyright 2025, Uli Köhler"
__license__ = "CC0 1.0 Universal (CC0 1.0) Public Domain Dedication"
import geopandas as gpd
import pandas as pd
import numpy as np
from shapely.geometry import Point
from shapely.ops import unary_union
import matplotlib.pyplot as plt
from tqdm.auto import tqdm
import cartopy.crs as ccrs
import cartopy.feature as cfeature
from cartopy.feature import ShapelyFeature
from sklearn.neighbors import KDTree
import cartopy.io.shapereader as shpreader
# Inlined PLZ and country loaders (self-contained)
def load_plz_data(plz_file_path: str) -> pd.DataFrame:
"""Load postal-code tabular data from a file path (parquet or CSV)."""
if plz_file_path.endswith('.parquet') or plz_file_path.endswith('.parq'):
return pd.read_parquet(plz_file_path)
if plz_file_path.endswith('.csv') or plz_file_path.endswith('.txt'):
return pd.read_csv(plz_file_path)
# Try common readers as a fallback
try:
return pd.read_parquet(plz_file_path)
except Exception:
try:
return pd.read_csv(plz_file_path)
except Exception as exc:
raise RuntimeError(f'Could not read PLZ data from {plz_file_path}: {exc}') from exc
def create_plz_points(plz_df: pd.DataFrame) -> gpd.GeoDataFrame:
"""Create a GeoDataFrame of PLZ points from a DataFrame.
Expects longitude/latitude in columns named one of ('lon','lng','longitude') and ('lat','latitude').
Also ensures a 'PLZ' column exists by copying common postal-code columns or falling back to the index.
"""
df = plz_df.copy()
# tolerant column lookup for coordinates
lon_cols = [c for c in df.columns if c.lower() in ('lon', 'lng', 'longitude')]
lat_cols = [c for c in df.columns if c.lower() in ('lat', 'latitude')]
if not lon_cols or not lat_cols:
raise ValueError('PLZ dataframe must contain lon and lat columns (e.g. lon, lat)')
lon_col = lon_cols[0]
lat_col = lat_cols[0]
df = df.dropna(subset=[lon_col, lat_col]).copy()
geometry = [Point(xy) for xy in zip(df[lon_col].astype(float), df[lat_col].astype(float))]
gdf = gpd.GeoDataFrame(df, geometry=geometry, crs="EPSG:4326")
# Keep conventional column names for downstream code
if 'lon' not in gdf.columns:
gdf['lon'] = gdf[lon_col].astype(float)
if 'lat' not in gdf.columns:
gdf['lat'] = gdf[lat_col].astype(float)
# Ensure a PLZ column exists (postal code). Try common names first, else fallback to index as string
plz_cols = [c for c in gdf.columns if c.lower() in ('plz', 'postal_code', 'postal', 'postleitzahl')]
if plz_cols:
gdf['PLZ'] = gdf[plz_cols[0]].astype(str)
else:
# If there's a numeric column named like 'zip' or 'postcode', pick it
zip_cols = [c for c in gdf.columns if c.lower() in ('zip', 'postcode')]
if zip_cols:
gdf['PLZ'] = gdf[zip_cols[0]].astype(str)
else:
# fallback: create PLZ from index to have a stable identifier
gdf = gdf.reset_index(drop=False)
gdf['PLZ'] = gdf['index'].astype(str)
gdf.drop(columns=['index'], inplace=True)
return gdf
def load_germany_from_natural_earth() -> object:
"""Read 10m Natural Earth countries and return Germany geometry (shapely)"""
# Use cartopy's shapereader path and geopandas to load the 10m admin_0_countries shapefile
shp_path = shpreader.natural_earth(resolution='10m', category='cultural', name='admin_0_countries')
countries = gpd.read_file(shp_path)
# try common ISO column names for matching Germany
iso_cols = [c for c in ('ISO_A3', 'ISO3', 'adm0_a3', 'ADM0_A3', 'iso_a3') if c in countries.columns]
if iso_cols:
germany = countries[countries[iso_cols[0]] == 'DEU']
else:
# fallback: try matching by NAME or SOVEREIGNT
name_cols = [c for c in ('NAME_EN', 'NAME', 'SOVEREIGNT', 'ADMIN') if c in countries.columns]
if name_cols:
germany = countries[countries[name_cols[0]].str.contains('Germany', case=False, na=False)]
else:
raise RuntimeError('Could not determine ISO or name column in Natural Earth countries')
if germany.empty:
raise RuntimeError('Germany geometry not found in Natural Earth countries')
return germany.iloc[0].geometry
def filter_plz_in_germany(plz_gdf, germany_geometry):
"""Filter PLZ points that are within Germany"""
plz_gdf = plz_gdf.to_crs("EPSG:3857") # Web Mercator for distance calculations
germany_geometry = gpd.GeoSeries([germany_geometry], crs="EPSG:4326").to_crs("EPSG:3857")[0]
# Create a buffer around Germany to include PLZ near the border
buffered_germany = germany_geometry.buffer(50000) # 50km buffer
# Filter PLZ within buffered Germany
plz_in_germany = plz_gdf[plz_gdf.intersects(buffered_germany)]
return plz_in_germany
def calculate_coverage(selected_plz, germany_geometry, radius_km=30):
"""Calculate what percentage of Germany is covered by the selected PLZ"""
radius_m = radius_km * 1000
coverage = unary_union(selected_plz.geometry.buffer(radius_m))
germany_geometry = gpd.GeoSeries([germany_geometry], crs="EPSG:4326").to_crs("EPSG:3857")[0]
covered_area = germany_geometry.intersection(coverage).area
total_area = germany_geometry.area
return covered_area / total_area * 100
def _generate_grid_points_in_polygon(polygon_3857, step_m: float) -> np.ndarray:
"""Generate a regular grid of points (x, y) within the given polygon (EPSG:3857)."""
xmin, ymin, xmax, ymax = polygon_3857.bounds
xs = np.arange(xmin, xmax + step_m, step_m)
ys = np.arange(ymin, ymax + step_m, step_m)
points = []
# Simple loop is OK for ~20k-60k points
for x in xs:
for y in ys:
p = Point(x, y)
if polygon_3857.contains(p):
points.append((x, y))
if not points:
return np.empty((0, 2), dtype=float)
return np.array(points, dtype=float)
def select_plz_max_coverage(
plz_gdf: gpd.GeoDataFrame,
germany_geometry,
radius_km: float = 30,
coverage_target: float = 0.99,
sample_size: int = 300,
grid_step_km: float = 5.0,
exclusion_factor: float = 0.75,
max_steps: int = 10000,
patience: int = 500,
validate_every: int = 50,
random_state: int = 42,
verbose: bool = True
) -> gpd.GeoDataFrame:
"""
Single algorithm to select PLZ points maximizing covered area (approx).
Uses a regular grid of points inside Germany (EPSG:3857) as an area proxy.
Greedy randomized: at each step sample a subset of remaining candidates and pick
the one that covers the most currently-uncovered grid points within radius.
Enhancements:
- Periodic exact coverage checks (unary_union) to avoid premature stopping.
- Exclusion radius to suppress clustered selections: after picking a center,
drop any remaining PLZ within (exclusion_factor * radius_km).
Returns: GeoDataFrame (EPSG:3857) of selected PLZ points.
"""
rng = np.random.default_rng(random_state)
# Ensure projected CRS for distance/area and prepare geometry
work_gdf = plz_gdf.to_crs("EPSG:3857").copy()
germany_3857 = gpd.GeoSeries([germany_geometry], crs="EPSG:4326").to_crs("EPSG:3857")[0]
radius_m = float(radius_km) * 1000.0
exclusion_m = float(exclusion_factor) * radius_m
grid_step_m = float(grid_step_km) * 1000.0
# 1) Build grid points inside Germany as coverage proxy
if verbose:
print(f"Building {grid_step_km:.1f} km grid over Germany for coverage approximation...")
grid_xy = _generate_grid_points_in_polygon(germany_3857, step_m=grid_step_m)
if grid_xy.shape[0] == 0:
raise RuntimeError("Failed to generate grid points inside Germany polygon.")
if verbose:
print(f"Grid points: {grid_xy.shape[0]:,}")
# 2) KDTree on grid for fast radius queries
tree_grid = KDTree(grid_xy, leaf_size=40, metric='euclidean')
# 3) Precompute neighbor grid indices for each PLZ candidate
plz_xy = np.column_stack([work_gdf.geometry.x.values, work_gdf.geometry.y.values])
if verbose:
print("Precomputing candidate-to-grid coverage neighborhoods...")
neighbor_lists = tree_grid.query_radius(plz_xy, r=radius_m, return_distance=False)
# 3b) KDTree on PLZ candidates for exclusion filtering
tree_plz = KDTree(plz_xy, leaf_size=40, metric='euclidean')
# 4) Greedy randomized selection loop over uncovered grid mask
uncovered = np.ones(grid_xy.shape[0], dtype=bool)
selected_mask = np.zeros(len(work_gdf), dtype=bool)
active_mask = np.ones(len(work_gdf), dtype=bool) # candidates still available
no_improve_steps = 0
pbar = tqdm(total=max_steps, disable=not verbose, desc="Selecting centers (max-coverage)")
for step in range(max_steps):
pbar.update(1)
# Periodic exact coverage check
if step % validate_every == 0 and selected_mask.any():
selected_tmp = work_gdf.loc[selected_mask]
exact_cov = calculate_coverage(selected_tmp, germany_geometry, radius_km) / 100.0
if step % (validate_every * 2) == 0:
pbar.set_postfix_str(f"exact~{exact_cov*100:.1f}%")
if exact_cov >= coverage_target:
break
# Approximate coverage fraction on grid
covered_frac = 1.0 - (uncovered.sum() / uncovered.size)
if covered_frac >= coverage_target and selected_mask.any():
selected_tmp = work_gdf.loc[selected_mask]
exact_cov = calculate_coverage(selected_tmp, germany_geometry, radius_km) / 100.0
if exact_cov >= coverage_target:
break
remaining_idx = np.flatnonzero(active_mask)
if remaining_idx.size == 0:
break
# Sample candidates from active set
k = int(min(sample_size, remaining_idx.size))
sample = rng.choice(remaining_idx, size=k, replace=False) if k > 0 else []
# Evaluate marginal gain on grid
best_gain = 0
best_idx = None
for idx in sample:
neigh = neighbor_lists[idx]
if neigh.size == 0:
continue
gain = int(uncovered[neigh].sum())
if gain > best_gain:
best_gain = gain
best_idx = idx
if best_idx is None or best_gain == 0:
no_improve_steps += 1
if no_improve_steps >= patience:
break
else:
continue
# Accept best candidate
selected_mask[best_idx] = True
# Mark grid points as covered
uncovered[neighbor_lists[best_idx]] = False
# Exclude nearby PLZ to reduce clustering
if exclusion_m > 0:
near = tree_plz.query_radius(plz_xy[[best_idx]], r=exclusion_m, return_distance=False)[0]
active_mask[near] = False
# Also remove the chosen index
active_mask[best_idx] = False
no_improve_steps = 0
pbar.close()
selected = work_gdf.loc[selected_mask].copy()
selected = selected.set_crs("EPSG:3857")
return selected
def visualize_coverage(selected_plz, germany_geometry, radius_km=30):
"""Visualize the coverage using Cartopy"""
# Convert to WGS84 for visualization
selected_plz = selected_plz.to_crs("EPSG:4326")
germany_geometry = gpd.GeoSeries([germany_geometry], crs="EPSG:4326").to_crs("EPSG:4326")[0]
# Create figure with Cartopy projection
fig = plt.figure(figsize=(12, 12))
ax = fig.add_subplot(1, 1, 1, projection=ccrs.EqualEarth())
# Add map features
ax.add_feature(cfeature.LAND, facecolor='lightgray')
ax.add_feature(cfeature.OCEAN, facecolor='lightblue')
ax.add_feature(cfeature.COASTLINE, linewidth=0.5)
ax.add_feature(cfeature.BORDERS, linestyle=':', linewidth=0.5)
# Create ShapelyFeature for Germany
germany_feature = ShapelyFeature([germany_geometry], ccrs.PlateCarree(), facecolor='none', edgecolor='red', linewidth=1)
ax.add_feature(germany_feature)
# Create and plot circles (ALL PLZ now)
radius_m = radius_km * 1000
buffers_3857 = gpd.GeoSeries(selected_plz.geometry).to_crs("EPSG:3857").buffer(radius_m)
buffers_4326 = gpd.GeoSeries(buffers_3857, crs="EPSG:3857").to_crs("EPSG:4326")
# Draw each buffer individually so overlaps render as layered transparencies
for geom in buffers_4326:
ax.add_geometries([geom], crs=ccrs.PlateCarree(), facecolor='blue', edgecolor='none', alpha=0.2, zorder=2)
# Plot selected PLZ points on top
ax.scatter(selected_plz.geometry.x, selected_plz.geometry.y, color='red', s=5, transform=ccrs.PlateCarree(), label='Selected PLZ', zorder=3)
# Set extent to Germany with some padding
ax.set_extent([4, 16, 47, 56], crs=ccrs.PlateCarree())
# Add gridlines
gl = ax.gridlines(draw_labels=True, linestyle='--')
gl.top_labels = False
gl.right_labels = False
plt.title(f"PLZ Coverage of Germany with {len(selected_plz)} centers ({radius_km}km radius)")
plt.legend(loc='upper right')
plt.show()
def run_analysis(plz_file_path, radius_km=30, coverage_target=0.99):
"""Main function for the analysis using a single max-coverage algorithm"""
# Load Germany geometry from Natural Earth
print("Loading Germany geometry from Natural Earth...")
germany_geometry = load_germany_from_natural_earth()
# Load and process PLZ data
print("Loading PLZ data...")
plz_df = load_plz_data(plz_file_path)
print(f"Loaded {len(plz_df)} German postal codes")
plz_gdf = create_plz_points(plz_df)
print("Filtering PLZ within Germany...")
plz_in_germany = filter_plz_in_germany(plz_gdf, germany_geometry)
print(f"Found {len(plz_in_germany)} PLZ within or near Germany")
# Select PLZ centers using single algorithm
print("Selecting PLZ centers with max-coverage algorithm...")
selected_plz = select_plz_max_coverage(
plz_in_germany, germany_geometry, radius_km=radius_km, coverage_target=coverage_target, sample_size=300, grid_step_km=2.0, exclusion_factor=0.75, max_steps=10000, patience=500, validate_every=50, random_state=42, verbose=True
)
# Calculate coverage (exact, single union)
coverage_percent = calculate_coverage(selected_plz, germany_geometry, radius_km)
print(f"\nSelected {len(selected_plz)} PLZ centers covering {coverage_percent:.1f}% of Germany")
# Visualize coverage
visualize_coverage(selected_plz, germany_geometry=germany_geometry, radius_km=radius_km)
# Return results defensively: only include columns that exist
sel = selected_plz.to_crs("EPSG:4326")
desired = ['PLZ', 'lat', 'lon', 'geometry']
available = [c for c in desired if c in sel.columns or c == 'geometry']
# Ensure geometry is present
if 'geometry' not in available:
available.append('geometry')
return {
'selected_plz': sel[available],
'coverage_percent': coverage_percent,
'total_plz_used': len(selected_plz)
}
This code is suitable for Jupyter Notebooks.
Example usage
results = run_analysis(
plz_file_path="postleitzahlen.parquet",
radius_km=35,
coverage_target=0.995
)
How to find the nearest resulting PLZ codes to a given code
This optional addition takes a reference PLZ code of some place and generates a DataFrame of the next closest 50 selected PLZs. This assumes you’ve used a Jupyter Notebook to run the previous code.
# Cell: find the 50 selected PLZ nearest to a given PLZ (example: 63110)
plz_code = '10176'
# Ensure we have the selected results available; if not, run the analysis once (lightweight guard)
if 'results' not in globals():
print('`results` not found in the notebook namespace — running `run_analysis` (this may take time)')
results = run_analysis(plz_file_path="postleitzahlen.parquet", radius_km=35, coverage_target=0.995)
selected = results['selected_plz']
# Load full PLZ table to locate the reference PLZ coordinates
plz_df = load_plz_data("postleitzahlen.parquet")
plz_gdf = create_plz_points(plz_df)
# Try exact match on PLZ, else prefix match
ref = plz_gdf[plz_gdf['PLZ'].astype(str) == str(plz_code)]
if ref.empty:
ref = plz_gdf[plz_gdf['PLZ'].astype(str).str.startswith(str(plz_code))]
if ref.empty:
raise ValueError(f"Reference PLZ {plz_code} not found in PLZ dataset")
ref_point = ref.iloc[0].geometry
# Reproject both to a projected CRS for metric distances (EPSG:3857)
selected_3857 = selected.to_crs("EPSG:3857").copy()
ref_3857 = gpd.GeoSeries([ref_point], crs="EPSG:4326").to_crs("EPSG:3857")[0]
# Compute distances (meters)
selected_3857['distance_m'] = selected_3857.geometry.distance(ref_3857)
# If the reference PLZ itself is in the selected set, exclude it so we get the nearest other centers
if 'PLZ' in selected_3857.columns:
selected_for_sort = selected_3857[selected_3857['PLZ'].astype(str) != str(plz_code)]
else:
selected_for_sort = selected_3857
# Take 50 nearest
nearest = selected_for_sort.nsmallest(50, 'distance_m').copy()
# Convert back to geographic coords and prepare a friendly DataFrame
nearest_geo = nearest.to_crs("EPSG:4326")
nearest_geo['lat'] = nearest_geo.geometry.y
nearest_geo['lon'] = nearest_geo.geometry.x
nearest_geo['distance_km'] = (nearest_geo['distance_m'] / 1000.0).round(3)
# Choose columns to show: prefer human fields if present
prefer_columns = ['PLZ', 'name', 'place', 'Ortsteil', 'stadtteil', 'place_name', 'lat', 'lon', 'distance_km']
available = [c for c in prefer_columns if c in nearest_geo.columns]
# if none of the preferred name columns exist, show all non-geometry columns
if not available:
available = [c for c in nearest_geo.columns if c != 'geometry']
nearest_50_df = nearest_geo[available].reset_index(drop=True)
print(f"Nearest {len(nearest_50_df)} selected PLZs to {plz_code} (top {min(50, len(nearest_50_df))}):")
nearest_50_df.head(50) # display the DataFrame