10 The Complete Pipeline Code (Python)
This appendix reproduces the full, runnable Python pipeline behind the worked example: the actual source, not a simplified sketch. It is generated directly from the pipeline/ package in the companion repository, so it always matches the code that produced every figure and number in this guide.
The pipeline is built on the sdc-redistribute package and runs end to end with a single command:
uv run python -m pipeline.runThe modules below appear in execution order. Figure generation (pipeline/style.py, pipeline/maps.py, pipeline/figures.py) lives in the same repository and is omitted here to keep the focus on the data method.
10.1 Configuration
Central settings: the Arlington FIPS codes, the ACS count variables (aggregate income and households), and the dataset paths every other module reads.
pipeline/config.py
"""Central configuration: geographies, CRS, source identifiers, paths."""
from pathlib import Path
from dotenv import load_dotenv
# Load secrets (e.g. CENSUS_API_KEY) from a local .env if present. The file is
# gitignored; loading it here makes the key available to any pipeline module.
load_dotenv()
# Arlington County, Virginia
STATE_FIPS = "51"
COUNTY_FIPS = "013"
ACS_YEAR = 2021 # 5-year ACS vintage used throughout
OOKLA_YEAR = 2023
OOKLA_QUARTER = 2
# Area weighting is handled inside sdc-redistribute, which reprojects geographic
# inputs to EPSG:3857 before computing intersection areas. At county scale the
# choice of projected CRS is immaterial (<0.3% area-ratio spread vs State Plane),
# so the pipeline does not impose its own projected CRS.
# ACS variables are EXTENSIVE COUNTS (never redistribute the median directly).
# Mean household income is derived later as agg_income / households.
ACS_VARS = {
"agg_income": "B19025_001", # Aggregate household income (dollars)
"households": "B11001_001", # Total households (count)
}
# Ookla open data S3 parquet path template
OOKLA_S3 = (
"s3://ookla-open-data/parquet/performance/type=fixed/"
"year={year}/quarter={quarter}/"
"{year}-{month:02d}-01_performance_fixed_tiles.parquet"
)
# Paths
ROOT = Path(__file__).resolve().parent.parent
DATA = ROOT / "data"
# Source / output dataset paths
CIVIC_ASSOC = DATA / "va013_geo_arl_2021_civic_associations.geojson"
BLOCK_GROUPS = DATA / "va013_block_groups.geojson" # produced by acquire_geographies
BLOCKS = DATA / "va013_geo_blocks.geojson" # existing
ACS_COUNTS = DATA / "va013_acs_income_counts.geojson" # produced by acquire_acs
OOKLA_TILES = DATA / "ookla_tiles_arlington.geojson" # existing/refreshable
CIVIC_INCOME = DATA / "civic_income.csv" # produced by redistribute_income
CIVIC_BROADBAND = DATA / "civic_broadband.csv" # produced by redistribute_broadband
CIVIC_COMBINED = DATA / "civic_combined.geojson" # produced by combine
PARCELS = DATA / "va013_parcels.geojson" # produced by acquire_parcels
CIVIC_INCOME_PARCELS = DATA / "civic_income_parcels.csv" # produced by redistribute_income_parcels
CIVIC_INCOME_COMPARISON = DATA / "civic_income_comparison.geojson" # produced by compare_methods10.2 Acquire census geographies
Download Arlington’s census block groups, the source geography for the income redistribution.
pipeline/acquire_geographies.py
"""Acquire Arlington census block-group geometries via pygris."""
from __future__ import annotations
import geopandas as gpd
from pygris import block_groups
from pipeline import config
def fetch_block_groups() -> gpd.GeoDataFrame:
"""Download Arlington County (VA/013) block groups for the ACS vintage."""
bg = block_groups(
state=config.STATE_FIPS,
county=config.COUNTY_FIPS,
year=config.ACS_YEAR,
cb=True,
)
return bg
def main() -> None:
bg = fetch_block_groups()
bg.to_file(config.BLOCK_GROUPS, driver="GeoJSON")
print(f"Wrote {len(bg)} block groups → {config.BLOCK_GROUPS}")
if __name__ == "__main__":
main()10.3 Acquire ACS income counts
Pull aggregate household income and household counts from the Census API. These are counts (extensive measures); mean income is derived later.
pipeline/acquire_acs.py
"""Acquire Arlington ACS aggregate income + household counts (block group)."""
from __future__ import annotations
import os
import geopandas as gpd
import pandas as pd
from census import Census
from pipeline import config
from pipeline.acquire_geographies import fetch_block_groups
def _sanitize_counts(df: pd.DataFrame) -> pd.DataFrame:
"""Replace Census negative sentinels (e.g. -666666666) with 0.
Aggregate income and household counts are non-negative by definition; the
Census API returns large negative sentinel codes for suppressed or
not-applicable cells (typically zero-household block groups). Left in place,
these sentinels would propagate through area-weighted redistribution and
produce negative incomes. Treating them as 0 is correct for the
zero-household areas they mark.
"""
out = df.copy()
for col in ("agg_income", "households"):
out[col] = out[col].mask(out[col] < 0, 0)
return out
def fetch_acs_counts(api_key: str | None = None) -> pd.DataFrame:
"""Return a DataFrame: GEOID, agg_income, households for Arlington block groups."""
key = api_key or os.environ.get("CENSUS_API_KEY")
c = Census(key, year=config.ACS_YEAR)
# The census library needs the API estimate suffix "E" on each variable
# (e.g. B19025_001E), unlike tidycensus which strips it.
agg_var = config.ACS_VARS["agg_income"] + "E"
hh_var = config.ACS_VARS["households"] + "E"
rows = c.acs5.state_county_blockgroup(
fields=("NAME", agg_var, hh_var),
state_fips=config.STATE_FIPS,
county_fips=config.COUNTY_FIPS,
blockgroup=Census.ALL,
tract=Census.ALL,
)
df = pd.DataFrame(rows)
df["GEOID"] = df["state"] + df["county"] + df["tract"] + df["block group"]
df = df.rename(columns={agg_var: "agg_income", hh_var: "households"})
df = _sanitize_counts(df)
return df[["GEOID", "agg_income", "households"]]
def main() -> None:
counts = fetch_acs_counts()
bg = fetch_block_groups()[["GEOID", "geometry"]]
merged = bg.merge(counts, on="GEOID", how="left")
gdf = gpd.GeoDataFrame(merged, geometry="geometry", crs=bg.crs)
gdf.to_file(config.ACS_COUNTS, driver="GeoJSON")
print(f"Wrote {len(gdf)} block groups with income counts → {config.ACS_COUNTS}")
if __name__ == "__main__":
main()10.4 Acquire Ookla broadband tiles
Read the Ookla open-data broadband tiles from S3 and clip them to Arlington.
pipeline/acquire_ookla.py
"""Acquire Ookla fixed-broadband tiles for Arlington from S3 open data."""
from __future__ import annotations
import geopandas as gpd
import pandas as pd
from shapely import wkt
from pipeline import config
from pipeline.acquire_geographies import fetch_block_groups
def _s3_path() -> str:
month = config.OOKLA_QUARTER * 3 - 2 # Q1->01, Q2->04, Q3->07, Q4->10
return config.OOKLA_S3.format(
year=config.OOKLA_YEAR, quarter=config.OOKLA_QUARTER, month=month
)
def fetch_ookla_tiles() -> gpd.GeoDataFrame:
"""Download and bbox-filter Ookla tiles to Arlington County."""
df = pd.read_parquet(_s3_path(), storage_options={"anon": True})
df["geometry"] = df["tile"].apply(wkt.loads)
tiles = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")
bbox = fetch_block_groups().to_crs("EPSG:4326").total_bounds # minx,miny,maxx,maxy
minx, miny, maxx, maxy = bbox
return tiles.cx[minx:maxx, miny:maxy].copy()
def main() -> None:
tiles = fetch_ookla_tiles()
cols = ["quadkey", "avg_d_kbps", "avg_u_kbps", "avg_lat_ms", "tests", "devices", "geometry"]
tiles[cols].to_file(config.OOKLA_TILES, driver="GeoJSON")
print(f"Wrote {len(tiles)} Ookla tiles → {config.OOKLA_TILES}")
if __name__ == "__main__":
main()10.5 Redistribute income
Area-weight the income and household counts onto civic associations with sdc-redistribute, then derive mean household income.
pipeline/redistribute_income.py
"""Redistribute ACS income counts to civic associations and derive mean income."""
from __future__ import annotations
from pathlib import Path
import geopandas as gpd
import pandas as pd
from sdc_redistribute import redistribute_direct
from pipeline import config
def redistribute_income(
counts_long: pd.DataFrame, source_geo: Path, target_geo: Path
) -> pd.DataFrame:
"""Return per-civic-association agg_income, households, and derived mean_income.
``counts_long`` is long format (geoid, year, measure, value) with measures
``agg_income`` and ``households``.
"""
result = redistribute_direct(
source_df=counts_long,
source_geo=source_geo,
target_geos={"civic_association": target_geo},
count_cols=["agg_income", "households"],
source_id="geoid",
)
# result is long with measures agg_income_direct / households_direct
wide = result.pivot_table(
index="geoid", columns="measure", values="value", aggfunc="first"
).reset_index()
wide.columns.name = None
wide = wide.rename(
columns={"agg_income_direct": "agg_income", "households_direct": "households"}
)
# Mean household income is an INTENSIVE quantity derived from two counts.
wide["mean_income"] = wide["agg_income"] / wide["households"].where(
wide["households"] > 0
)
return wide[["geoid", "agg_income", "households", "mean_income"]]
def main() -> None:
# ACS counts GeoJSON uses uppercase GEOID; rename to the lowercase `geoid`
# that redistribute_direct expects, for both the long-format counts and the
# source geometry. Write a geoid-keyed source file for redistribute_direct.
bg = gpd.read_file(config.ACS_COUNTS).rename(columns={"GEOID": "geoid"})
long = bg[["geoid", "agg_income", "households"]].melt(
id_vars="geoid", var_name="measure", value_name="value"
)
long["year"] = config.ACS_YEAR
src_path = config.DATA / "_acs_src.geojson"
bg[["geoid", "geometry"]].to_file(src_path, driver="GeoJSON")
out = redistribute_income(long, src_path, config.CIVIC_ASSOC)
src_path.unlink(missing_ok=True)
out.to_csv(config.CIVIC_INCOME, index=False)
print(f"Wrote {len(out)} civic-association income rows → {config.CIVIC_INCOME}")
if __name__ == "__main__":
main()10.6 Redistribute broadband
Redistribute test counts and the speed×tests product, then derive the count-weighted mean download and upload speeds.
pipeline/redistribute_broadband.py
"""Redistribute Ookla broadband to civic associations; derive mean speeds."""
from __future__ import annotations
from pathlib import Path
import geopandas as gpd
import pandas as pd
from sdc_redistribute import redistribute_direct
from pipeline import config
MIN_TESTS = 5 # statistical-reliability filter on source tiles
def redistribute_broadband(
counts_long: pd.DataFrame, source_geo: Path, target_geo: Path
) -> pd.DataFrame:
"""Return per-civic-association tests and derived download/upload Mbps."""
result = redistribute_direct(
source_df=counts_long,
source_geo=source_geo,
target_geos={"civic_association": target_geo},
count_cols=["tests", "d_product", "u_product"],
source_id="geoid",
)
wide = result.pivot_table(
index="geoid", columns="measure", values="value", aggfunc="first"
).reset_index()
wide.columns.name = None
wide = wide.rename(
columns={
"tests_direct": "tests",
"d_product_direct": "d_product",
"u_product_direct": "u_product",
}
)
valid = wide["tests"] > 0
wide["download_mbps"] = (wide["d_product"] / wide["tests"].where(valid)) / 1000
wide["upload_mbps"] = (wide["u_product"] / wide["tests"].where(valid)) / 1000
return wide[["geoid", "tests", "download_mbps", "upload_mbps"]]
def build_counts_long(tiles: gpd.GeoDataFrame) -> pd.DataFrame:
"""Build the long-format count frame from raw Ookla tiles."""
t = tiles[tiles["tests"] >= MIN_TESTS].copy()
t["d_product"] = t["avg_d_kbps"] * t["tests"]
t["u_product"] = t["avg_u_kbps"] * t["tests"]
long = t[["quadkey", "tests", "d_product", "u_product"]].melt(
id_vars="quadkey", var_name="measure", value_name="value"
)
long = long.rename(columns={"quadkey": "geoid"})
long["year"] = config.OOKLA_YEAR
return long
def main() -> None:
tiles = gpd.read_file(config.OOKLA_TILES)
tiles["geoid"] = tiles["quadkey"].astype(str)
long = build_counts_long(tiles)
# Write a geoid-keyed source file for redistribute_direct
src = tiles[["geoid", "geometry"]].copy()
src_path = config.DATA / "_ookla_src.geojson"
src.to_file(src_path, driver="GeoJSON")
out = redistribute_broadband(long, src_path, config.CIVIC_ASSOC)
out.to_csv(config.CIVIC_BROADBAND, index=False)
src_path.unlink(missing_ok=True)
print(f"Wrote {len(out)} civic-association broadband rows → {config.CIVIC_BROADBAND}")
if __name__ == "__main__":
main()10.7 Combine and derive metrics
Join the redistributed income and broadband measures onto the civic association polygons and compute the income-to-speed ratio and the bivariate income×speed classes.
pipeline/combine.py
"""Combine income + broadband and compute derived policy metrics."""
from __future__ import annotations
import geopandas as gpd
import pandas as pd
from pipeline import config
def _tercile(series: pd.Series) -> pd.Series:
"""Return 1/2/3 tercile labels by rank (robust to ties and NaN)."""
ranks = series.rank(method="first")
return pd.qcut(ranks, 3, labels=[1, 2, 3]).astype("Int64")
def add_derived_metrics(df: pd.DataFrame) -> pd.DataFrame:
"""Add income_speed_ratio and bivariate_class (income-speed terciles)."""
out = df.copy()
out["income_speed_ratio"] = out["mean_income"] / out["download_mbps"].where(
out["download_mbps"] > 0
)
inc = _tercile(out["mean_income"])
spd = _tercile(out["download_mbps"])
out["bivariate_class"] = inc.astype(str) + "-" + spd.astype(str)
return out
def main() -> None:
civ = gpd.read_file(config.CIVIC_ASSOC)[["geoid", "region_name", "geometry"]]
income = pd.read_csv(config.CIVIC_INCOME, dtype={"geoid": str})
broadband = pd.read_csv(config.CIVIC_BROADBAND, dtype={"geoid": str})
merged = civ.merge(income, on="geoid", how="left").merge(
broadband, on="geoid", how="left"
)
merged = add_derived_metrics(merged)
gdf = gpd.GeoDataFrame(merged, geometry="geometry", crs=civ.crs)
gdf.to_file(config.CIVIC_COMBINED, driver="GeoJSON")
print(f"Wrote {len(gdf)} combined civic-association rows → {config.CIVIC_COMBINED}")
if __name__ == "__main__":
main()10.8 Acquire parcels
Page through the Arlington MHUD FeatureServer and reduce parcel polygons to unit-bearing centroids.
pipeline/acquire_parcels.py
"""Acquire Arlington MHUD parcel polygons and reduce them to unit-bearing centroids."""
from __future__ import annotations
import json
import urllib.parse
import urllib.request
import geopandas as gpd
import pandas as pd
from pipeline import config
LAYER = (
"https://arlgis.arlingtonva.us/arcgis/rest/services/Open_Data/"
"od_MHUD_Polygons/FeatureServer/0/query"
)
PAGE = 2000
def _fetch_page(offset: int) -> gpd.GeoDataFrame | None:
params = {
"where": "1=1",
"outFields": "Total_Units,Unit_Type",
"outSR": "4326",
"resultOffset": str(offset),
"resultRecordCount": str(PAGE),
"f": "geojson",
}
url = LAYER + "?" + urllib.parse.urlencode(params)
with urllib.request.urlopen(url, timeout=60) as resp: # noqa: S310
data = json.load(resp)
feats = data.get("features", [])
if not feats:
return None
return gpd.GeoDataFrame.from_features(feats, crs="EPSG:4326")
def fetch_parcels() -> gpd.GeoDataFrame:
"""Page through the MHUD FeatureServer and return all parcel polygons."""
parts: list[gpd.GeoDataFrame] = []
offset = 0
while True:
page = _fetch_page(offset)
if page is None or len(page) == 0:
break
parts.append(page)
if len(page) < PAGE:
break
offset += PAGE
gdf = gpd.GeoDataFrame(pd.concat(parts, ignore_index=True), crs="EPSG:4326")
return gdf
def to_centroids(polys: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Reduce parcel polygons to centroids, keeping Total_Units and Unit_Type."""
cent = polys.copy()
# representative_point() is guaranteed inside the polygon (unlike centroid)
cent["geometry"] = cent.geometry.representative_point()
return cent[["Total_Units", "Unit_Type", "geometry"]]
def main() -> None:
polys = fetch_parcels()
pts = to_centroids(polys)
pts.to_file(config.PARCELS, driver="GeoJSON")
print(f"Wrote {len(pts)} parcel centroids → {config.PARCELS}")
if __name__ == "__main__":
main()10.9 Redistribute income through parcels
Explode parcels into per-unit points and redistribute block-group income through them (the dasymetric second approach).
pipeline/redistribute_income_parcels.py
"""Redistribute income through unit-weighted parcels (dasymetric), derive mean income."""
from __future__ import annotations
from pathlib import Path
import geopandas as gpd
import pandas as pd
from sdc_redistribute import redistribute_parcels
from pipeline import config
def explode_by_units(parcels: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Replicate each parcel into Total_Units identical points; drop units <= 0."""
p = parcels[parcels["Total_Units"] > 0].copy()
reps = p["Total_Units"].astype(int).to_numpy()
out = p.loc[p.index.repeat(reps)].reset_index(drop=True)
return out
def redistribute_income_parcels(
counts_long: pd.DataFrame,
parcels: gpd.GeoDataFrame,
source_geo: Path,
target_geo: Path,
) -> pd.DataFrame:
"""Return per-civic-association agg_income, households, derived mean_income."""
points = explode_by_units(parcels)
result = redistribute_parcels(
source_df=counts_long,
parcel_centroids=points,
source_geo=source_geo,
target_geos={"civic_association": target_geo},
count_cols=["agg_income", "households"],
source_id="geoid",
)
wide = result.pivot_table(
index="geoid", columns="measure", values="value", aggfunc="first"
).reset_index()
wide.columns.name = None
wide = wide.rename(
columns={"agg_income_parcels": "agg_income", "households_parcels": "households"}
)
wide["mean_income"] = wide["agg_income"] / wide["households"].where(
wide["households"] > 0
)
return wide[["geoid", "agg_income", "households", "mean_income"]]
def main() -> None:
parcels = gpd.read_file(config.PARCELS)
bg = gpd.read_file(config.ACS_COUNTS).rename(columns={"GEOID": "geoid"})
long = bg[["geoid", "agg_income", "households"]].melt(
id_vars="geoid", var_name="measure", value_name="value"
)
long["year"] = config.ACS_YEAR
src_path = config.DATA / "_acs_src_parcels.geojson"
bg[["geoid", "geometry"]].to_file(src_path, driver="GeoJSON")
out = redistribute_income_parcels(long, parcels, src_path, config.CIVIC_ASSOC)
src_path.unlink(missing_ok=True)
out.to_csv(config.CIVIC_INCOME_PARCELS, index=False)
print(f"Wrote {len(out)} parcel-weighted income rows → {config.CIVIC_INCOME_PARCELS}")
if __name__ == "__main__":
main()10.10 Compare methods
Join the area-weighted and parcel-weighted income and quantify the difference per civic association.
pipeline/compare_methods.py
"""Compare area-weighted vs parcel-weighted income per civic association."""
from __future__ import annotations
import geopandas as gpd
import pandas as pd
from pipeline import config
def compare_income(area: pd.DataFrame, parcel: pd.DataFrame) -> pd.DataFrame:
"""Join the two methods' mean income; add diff (parcel - area) and pct_diff."""
a = area[["geoid", "mean_income"]].rename(columns={"mean_income": "mean_income_area"})
p = parcel[["geoid", "mean_income"]].rename(columns={"mean_income": "mean_income_parcel"})
out = a.merge(p, on="geoid", how="outer")
out["diff"] = out["mean_income_parcel"] - out["mean_income_area"]
out["pct_diff"] = 100 * out["diff"] / out["mean_income_area"].where(
out["mean_income_area"] > 0
)
return out
def main() -> None:
civ = gpd.read_file(config.CIVIC_ASSOC)[["geoid", "region_name", "geometry"]]
area = pd.read_csv(config.CIVIC_INCOME, dtype={"geoid": str})
parcel = pd.read_csv(config.CIVIC_INCOME_PARCELS, dtype={"geoid": str})
cmp = compare_income(area, parcel)
gdf = civ.merge(cmp, on="geoid", how="left")
gdf = gpd.GeoDataFrame(gdf, geometry="geometry", crs=civ.crs)
# Honest accounting: report parcel-method household total vs ACS source.
acs = gpd.read_file(config.ACS_COUNTS)
src_hh = float(acs["households"].sum())
parcel_hh = float(pd.read_csv(config.CIVIC_INCOME_PARCELS)["households"].sum())
pct = 100 * (parcel_hh - src_hh) / src_hh if src_hh else 0.0
print(f"Household totals, ACS source: {src_hh:,.0f}; parcel method: {parcel_hh:,.0f} ({pct:+.1f}%)")
gdf.to_file(config.CIVIC_INCOME_COMPARISON, driver="GeoJSON")
print(f"Wrote {len(gdf)} comparison rows → {config.CIVIC_INCOME_COMPARISON}")
if __name__ == "__main__":
main()10.11 Validate
Confirm household totals are preserved by the redistribution and that no speeds or incomes are negative.
pipeline/validate.py
"""Integrity checks on redistributed outputs."""
from __future__ import annotations
import pandas as pd
def check_total_preserved(source_total: float, target_total: float, tol: float = 0.01) -> None:
"""Assert target total matches source total within relative tolerance ``tol``."""
if source_total == 0:
return
rel = abs(target_total - source_total) / abs(source_total)
assert rel <= tol, f"total drift {rel:.3%} exceeds {tol:.3%}"
def check_ranges(df: pd.DataFrame) -> None:
"""Assert speed/income columns are non-negative where present."""
for col in ("download_mbps", "upload_mbps", "mean_income", "households"):
if col in df.columns:
vals = df[col].dropna()
assert (vals >= 0).all(), f"{col} has negative values"
def main() -> None:
import geopandas as gpd
from pipeline import config
combined = gpd.read_file(config.CIVIC_COMBINED)
acs = gpd.read_file(config.ACS_COUNTS)
check_total_preserved(
acs["households"].sum(), combined["households"].sum(), tol=0.02
)
check_ranges(combined)
print("Validation passed.")
if __name__ == "__main__":
main()10.12 Run the full pipeline
Wire every stage into one command: uv run python -m pipeline.run.
pipeline/run.py
"""Run the full pipeline end to end (acquisition → outputs → validation)."""
from __future__ import annotations
from pipeline import (
acquire_acs,
acquire_geographies,
acquire_ookla,
acquire_parcels,
combine,
compare_methods,
redistribute_broadband,
redistribute_income,
redistribute_income_parcels,
validate,
)
def main() -> None:
acquire_geographies.main()
acquire_acs.main()
acquire_ookla.main()
acquire_parcels.main()
redistribute_income.main()
redistribute_broadband.main()
combine.main()
redistribute_income_parcels.main()
compare_methods.main()
validate.main()
print("Pipeline complete.")
if __name__ == "__main__":
main()