10  The Complete Pipeline Code (Python)

This appendix reproduces the full, runnable Python pipeline behind the worked example: the actual source, not a simplified sketch. It is generated directly from the pipeline/ package in the companion repository, so it always matches the code that produced every figure and number in this guide.

The pipeline is built on the sdc-redistribute package and runs end to end with a single command:

uv run python -m pipeline.run

The modules below appear in execution order. Figure generation (pipeline/style.py, pipeline/maps.py, pipeline/figures.py) lives in the same repository and is omitted here to keep the focus on the data method.

10.1 Configuration

Central settings: the Arlington FIPS codes, the ACS count variables (aggregate income and households), and the dataset paths every other module reads.

pipeline/config.py

"""Central configuration: geographies, CRS, source identifiers, paths."""
from pathlib import Path

from dotenv import load_dotenv

# Load secrets (e.g. CENSUS_API_KEY) from a local .env if present. The file is
# gitignored; loading it here makes the key available to any pipeline module.
load_dotenv()

# Arlington County, Virginia
STATE_FIPS = "51"
COUNTY_FIPS = "013"
ACS_YEAR = 2021          # 5-year ACS vintage used throughout
OOKLA_YEAR = 2023
OOKLA_QUARTER = 2

# Area weighting is handled inside sdc-redistribute, which reprojects geographic
# inputs to EPSG:3857 before computing intersection areas. At county scale the
# choice of projected CRS is immaterial (<0.3% area-ratio spread vs State Plane),
# so the pipeline does not impose its own projected CRS.

# ACS variables are EXTENSIVE COUNTS (never redistribute the median directly).
# Mean household income is derived later as agg_income / households.
ACS_VARS = {
    "agg_income": "B19025_001",   # Aggregate household income (dollars)
    "households": "B11001_001",   # Total households (count)
}

# Ookla open data S3 parquet path template
OOKLA_S3 = (
    "s3://ookla-open-data/parquet/performance/type=fixed/"
    "year={year}/quarter={quarter}/"
    "{year}-{month:02d}-01_performance_fixed_tiles.parquet"
)

# Paths
ROOT = Path(__file__).resolve().parent.parent
DATA = ROOT / "data"

# Source / output dataset paths
CIVIC_ASSOC = DATA / "va013_geo_arl_2021_civic_associations.geojson"
BLOCK_GROUPS = DATA / "va013_block_groups.geojson"          # produced by acquire_geographies
BLOCKS = DATA / "va013_geo_blocks.geojson"                  # existing
ACS_COUNTS = DATA / "va013_acs_income_counts.geojson"       # produced by acquire_acs
OOKLA_TILES = DATA / "ookla_tiles_arlington.geojson"        # existing/refreshable
CIVIC_INCOME = DATA / "civic_income.csv"                    # produced by redistribute_income
CIVIC_BROADBAND = DATA / "civic_broadband.csv"             # produced by redistribute_broadband
CIVIC_COMBINED = DATA / "civic_combined.geojson"           # produced by combine
PARCELS = DATA / "va013_parcels.geojson"                   # produced by acquire_parcels
CIVIC_INCOME_PARCELS = DATA / "civic_income_parcels.csv"   # produced by redistribute_income_parcels
CIVIC_INCOME_COMPARISON = DATA / "civic_income_comparison.geojson"  # produced by compare_methods

10.2 Acquire census geographies

Download Arlington’s census block groups, the source geography for the income redistribution.

pipeline/acquire_geographies.py

"""Acquire Arlington census block-group geometries via pygris."""
from __future__ import annotations

import geopandas as gpd
from pygris import block_groups

from pipeline import config


def fetch_block_groups() -> gpd.GeoDataFrame:
    """Download Arlington County (VA/013) block groups for the ACS vintage."""
    bg = block_groups(
        state=config.STATE_FIPS,
        county=config.COUNTY_FIPS,
        year=config.ACS_YEAR,
        cb=True,
    )
    return bg


def main() -> None:
    bg = fetch_block_groups()
    bg.to_file(config.BLOCK_GROUPS, driver="GeoJSON")
    print(f"Wrote {len(bg)} block groups → {config.BLOCK_GROUPS}")


if __name__ == "__main__":
    main()

10.3 Acquire ACS income counts

Pull aggregate household income and household counts from the Census API. These are counts (extensive measures); mean income is derived later.

pipeline/acquire_acs.py

"""Acquire Arlington ACS aggregate income + household counts (block group)."""
from __future__ import annotations

import os

import geopandas as gpd
import pandas as pd
from census import Census

from pipeline import config
from pipeline.acquire_geographies import fetch_block_groups


def _sanitize_counts(df: pd.DataFrame) -> pd.DataFrame:
    """Replace Census negative sentinels (e.g. -666666666) with 0.

    Aggregate income and household counts are non-negative by definition; the
    Census API returns large negative sentinel codes for suppressed or
    not-applicable cells (typically zero-household block groups). Left in place,
    these sentinels would propagate through area-weighted redistribution and
    produce negative incomes. Treating them as 0 is correct for the
    zero-household areas they mark.
    """
    out = df.copy()
    for col in ("agg_income", "households"):
        out[col] = out[col].mask(out[col] < 0, 0)
    return out


def fetch_acs_counts(api_key: str | None = None) -> pd.DataFrame:
    """Return a DataFrame: GEOID, agg_income, households for Arlington block groups."""
    key = api_key or os.environ.get("CENSUS_API_KEY")
    c = Census(key, year=config.ACS_YEAR)
    # The census library needs the API estimate suffix "E" on each variable
    # (e.g. B19025_001E), unlike tidycensus which strips it.
    agg_var = config.ACS_VARS["agg_income"] + "E"
    hh_var = config.ACS_VARS["households"] + "E"
    rows = c.acs5.state_county_blockgroup(
        fields=("NAME", agg_var, hh_var),
        state_fips=config.STATE_FIPS,
        county_fips=config.COUNTY_FIPS,
        blockgroup=Census.ALL,
        tract=Census.ALL,
    )
    df = pd.DataFrame(rows)
    df["GEOID"] = df["state"] + df["county"] + df["tract"] + df["block group"]
    df = df.rename(columns={agg_var: "agg_income", hh_var: "households"})
    df = _sanitize_counts(df)
    return df[["GEOID", "agg_income", "households"]]


def main() -> None:
    counts = fetch_acs_counts()
    bg = fetch_block_groups()[["GEOID", "geometry"]]
    merged = bg.merge(counts, on="GEOID", how="left")
    gdf = gpd.GeoDataFrame(merged, geometry="geometry", crs=bg.crs)
    gdf.to_file(config.ACS_COUNTS, driver="GeoJSON")
    print(f"Wrote {len(gdf)} block groups with income counts → {config.ACS_COUNTS}")


if __name__ == "__main__":
    main()

10.4 Acquire Ookla broadband tiles

Read the Ookla open-data broadband tiles from S3 and clip them to Arlington.

pipeline/acquire_ookla.py

"""Acquire Ookla fixed-broadband tiles for Arlington from S3 open data."""
from __future__ import annotations

import geopandas as gpd
import pandas as pd
from shapely import wkt

from pipeline import config
from pipeline.acquire_geographies import fetch_block_groups


def _s3_path() -> str:
    month = config.OOKLA_QUARTER * 3 - 2  # Q1->01, Q2->04, Q3->07, Q4->10
    return config.OOKLA_S3.format(
        year=config.OOKLA_YEAR, quarter=config.OOKLA_QUARTER, month=month
    )


def fetch_ookla_tiles() -> gpd.GeoDataFrame:
    """Download and bbox-filter Ookla tiles to Arlington County."""
    df = pd.read_parquet(_s3_path(), storage_options={"anon": True})
    df["geometry"] = df["tile"].apply(wkt.loads)
    tiles = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")

    bbox = fetch_block_groups().to_crs("EPSG:4326").total_bounds  # minx,miny,maxx,maxy
    minx, miny, maxx, maxy = bbox
    return tiles.cx[minx:maxx, miny:maxy].copy()


def main() -> None:
    tiles = fetch_ookla_tiles()
    cols = ["quadkey", "avg_d_kbps", "avg_u_kbps", "avg_lat_ms", "tests", "devices", "geometry"]
    tiles[cols].to_file(config.OOKLA_TILES, driver="GeoJSON")
    print(f"Wrote {len(tiles)} Ookla tiles → {config.OOKLA_TILES}")


if __name__ == "__main__":
    main()

10.5 Redistribute income

Area-weight the income and household counts onto civic associations with sdc-redistribute, then derive mean household income.

pipeline/redistribute_income.py

"""Redistribute ACS income counts to civic associations and derive mean income."""
from __future__ import annotations

from pathlib import Path

import geopandas as gpd
import pandas as pd
from sdc_redistribute import redistribute_direct

from pipeline import config


def redistribute_income(
    counts_long: pd.DataFrame, source_geo: Path, target_geo: Path
) -> pd.DataFrame:
    """Return per-civic-association agg_income, households, and derived mean_income.

    ``counts_long`` is long format (geoid, year, measure, value) with measures
    ``agg_income`` and ``households``.
    """
    result = redistribute_direct(
        source_df=counts_long,
        source_geo=source_geo,
        target_geos={"civic_association": target_geo},
        count_cols=["agg_income", "households"],
        source_id="geoid",
    )
    # result is long with measures agg_income_direct / households_direct
    wide = result.pivot_table(
        index="geoid", columns="measure", values="value", aggfunc="first"
    ).reset_index()
    wide.columns.name = None
    wide = wide.rename(
        columns={"agg_income_direct": "agg_income", "households_direct": "households"}
    )
    # Mean household income is an INTENSIVE quantity derived from two counts.
    wide["mean_income"] = wide["agg_income"] / wide["households"].where(
        wide["households"] > 0
    )
    return wide[["geoid", "agg_income", "households", "mean_income"]]


def main() -> None:
    # ACS counts GeoJSON uses uppercase GEOID; rename to the lowercase `geoid`
    # that redistribute_direct expects, for both the long-format counts and the
    # source geometry. Write a geoid-keyed source file for redistribute_direct.
    bg = gpd.read_file(config.ACS_COUNTS).rename(columns={"GEOID": "geoid"})
    long = bg[["geoid", "agg_income", "households"]].melt(
        id_vars="geoid", var_name="measure", value_name="value"
    )
    long["year"] = config.ACS_YEAR

    src_path = config.DATA / "_acs_src.geojson"
    bg[["geoid", "geometry"]].to_file(src_path, driver="GeoJSON")
    out = redistribute_income(long, src_path, config.CIVIC_ASSOC)
    src_path.unlink(missing_ok=True)

    out.to_csv(config.CIVIC_INCOME, index=False)
    print(f"Wrote {len(out)} civic-association income rows → {config.CIVIC_INCOME}")


if __name__ == "__main__":
    main()

10.6 Redistribute broadband

Redistribute test counts and the speed×tests product, then derive the count-weighted mean download and upload speeds.

pipeline/redistribute_broadband.py

"""Redistribute Ookla broadband to civic associations; derive mean speeds."""
from __future__ import annotations

from pathlib import Path

import geopandas as gpd
import pandas as pd
from sdc_redistribute import redistribute_direct

from pipeline import config

MIN_TESTS = 5  # statistical-reliability filter on source tiles


def redistribute_broadband(
    counts_long: pd.DataFrame, source_geo: Path, target_geo: Path
) -> pd.DataFrame:
    """Return per-civic-association tests and derived download/upload Mbps."""
    result = redistribute_direct(
        source_df=counts_long,
        source_geo=source_geo,
        target_geos={"civic_association": target_geo},
        count_cols=["tests", "d_product", "u_product"],
        source_id="geoid",
    )
    wide = result.pivot_table(
        index="geoid", columns="measure", values="value", aggfunc="first"
    ).reset_index()
    wide.columns.name = None
    wide = wide.rename(
        columns={
            "tests_direct": "tests",
            "d_product_direct": "d_product",
            "u_product_direct": "u_product",
        }
    )
    valid = wide["tests"] > 0
    wide["download_mbps"] = (wide["d_product"] / wide["tests"].where(valid)) / 1000
    wide["upload_mbps"] = (wide["u_product"] / wide["tests"].where(valid)) / 1000
    return wide[["geoid", "tests", "download_mbps", "upload_mbps"]]


def build_counts_long(tiles: gpd.GeoDataFrame) -> pd.DataFrame:
    """Build the long-format count frame from raw Ookla tiles."""
    t = tiles[tiles["tests"] >= MIN_TESTS].copy()
    t["d_product"] = t["avg_d_kbps"] * t["tests"]
    t["u_product"] = t["avg_u_kbps"] * t["tests"]
    long = t[["quadkey", "tests", "d_product", "u_product"]].melt(
        id_vars="quadkey", var_name="measure", value_name="value"
    )
    long = long.rename(columns={"quadkey": "geoid"})
    long["year"] = config.OOKLA_YEAR
    return long


def main() -> None:
    tiles = gpd.read_file(config.OOKLA_TILES)
    tiles["geoid"] = tiles["quadkey"].astype(str)
    long = build_counts_long(tiles)
    # Write a geoid-keyed source file for redistribute_direct
    src = tiles[["geoid", "geometry"]].copy()
    src_path = config.DATA / "_ookla_src.geojson"
    src.to_file(src_path, driver="GeoJSON")
    out = redistribute_broadband(long, src_path, config.CIVIC_ASSOC)
    out.to_csv(config.CIVIC_BROADBAND, index=False)
    src_path.unlink(missing_ok=True)
    print(f"Wrote {len(out)} civic-association broadband rows → {config.CIVIC_BROADBAND}")


if __name__ == "__main__":
    main()

10.7 Combine and derive metrics

Join the redistributed income and broadband measures onto the civic association polygons and compute the income-to-speed ratio and the bivariate income×speed classes.

pipeline/combine.py

"""Combine income + broadband and compute derived policy metrics."""
from __future__ import annotations

import geopandas as gpd
import pandas as pd

from pipeline import config


def _tercile(series: pd.Series) -> pd.Series:
    """Return 1/2/3 tercile labels by rank (robust to ties and NaN)."""
    ranks = series.rank(method="first")
    return pd.qcut(ranks, 3, labels=[1, 2, 3]).astype("Int64")


def add_derived_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Add income_speed_ratio and bivariate_class (income-speed terciles)."""
    out = df.copy()
    out["income_speed_ratio"] = out["mean_income"] / out["download_mbps"].where(
        out["download_mbps"] > 0
    )
    inc = _tercile(out["mean_income"])
    spd = _tercile(out["download_mbps"])
    out["bivariate_class"] = inc.astype(str) + "-" + spd.astype(str)
    return out


def main() -> None:
    civ = gpd.read_file(config.CIVIC_ASSOC)[["geoid", "region_name", "geometry"]]
    income = pd.read_csv(config.CIVIC_INCOME, dtype={"geoid": str})
    broadband = pd.read_csv(config.CIVIC_BROADBAND, dtype={"geoid": str})
    merged = civ.merge(income, on="geoid", how="left").merge(
        broadband, on="geoid", how="left"
    )
    merged = add_derived_metrics(merged)
    gdf = gpd.GeoDataFrame(merged, geometry="geometry", crs=civ.crs)
    gdf.to_file(config.CIVIC_COMBINED, driver="GeoJSON")
    print(f"Wrote {len(gdf)} combined civic-association rows → {config.CIVIC_COMBINED}")


if __name__ == "__main__":
    main()

10.8 Acquire parcels

Page through the Arlington MHUD FeatureServer and reduce parcel polygons to unit-bearing centroids.

pipeline/acquire_parcels.py

"""Acquire Arlington MHUD parcel polygons and reduce them to unit-bearing centroids."""
from __future__ import annotations

import json
import urllib.parse
import urllib.request

import geopandas as gpd
import pandas as pd

from pipeline import config

LAYER = (
    "https://arlgis.arlingtonva.us/arcgis/rest/services/Open_Data/"
    "od_MHUD_Polygons/FeatureServer/0/query"
)
PAGE = 2000


def _fetch_page(offset: int) -> gpd.GeoDataFrame | None:
    params = {
        "where": "1=1",
        "outFields": "Total_Units,Unit_Type",
        "outSR": "4326",
        "resultOffset": str(offset),
        "resultRecordCount": str(PAGE),
        "f": "geojson",
    }
    url = LAYER + "?" + urllib.parse.urlencode(params)
    with urllib.request.urlopen(url, timeout=60) as resp:  # noqa: S310
        data = json.load(resp)
    feats = data.get("features", [])
    if not feats:
        return None
    return gpd.GeoDataFrame.from_features(feats, crs="EPSG:4326")


def fetch_parcels() -> gpd.GeoDataFrame:
    """Page through the MHUD FeatureServer and return all parcel polygons."""
    parts: list[gpd.GeoDataFrame] = []
    offset = 0
    while True:
        page = _fetch_page(offset)
        if page is None or len(page) == 0:
            break
        parts.append(page)
        if len(page) < PAGE:
            break
        offset += PAGE
    gdf = gpd.GeoDataFrame(pd.concat(parts, ignore_index=True), crs="EPSG:4326")
    return gdf


def to_centroids(polys: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Reduce parcel polygons to centroids, keeping Total_Units and Unit_Type."""
    cent = polys.copy()
    # representative_point() is guaranteed inside the polygon (unlike centroid)
    cent["geometry"] = cent.geometry.representative_point()
    return cent[["Total_Units", "Unit_Type", "geometry"]]


def main() -> None:
    polys = fetch_parcels()
    pts = to_centroids(polys)
    pts.to_file(config.PARCELS, driver="GeoJSON")
    print(f"Wrote {len(pts)} parcel centroids → {config.PARCELS}")


if __name__ == "__main__":
    main()

10.9 Redistribute income through parcels

Explode parcels into per-unit points and redistribute block-group income through them (the dasymetric second approach).

pipeline/redistribute_income_parcels.py

"""Redistribute income through unit-weighted parcels (dasymetric), derive mean income."""
from __future__ import annotations

from pathlib import Path

import geopandas as gpd
import pandas as pd
from sdc_redistribute import redistribute_parcels

from pipeline import config


def explode_by_units(parcels: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Replicate each parcel into Total_Units identical points; drop units <= 0."""
    p = parcels[parcels["Total_Units"] > 0].copy()
    reps = p["Total_Units"].astype(int).to_numpy()
    out = p.loc[p.index.repeat(reps)].reset_index(drop=True)
    return out


def redistribute_income_parcels(
    counts_long: pd.DataFrame,
    parcels: gpd.GeoDataFrame,
    source_geo: Path,
    target_geo: Path,
) -> pd.DataFrame:
    """Return per-civic-association agg_income, households, derived mean_income."""
    points = explode_by_units(parcels)
    result = redistribute_parcels(
        source_df=counts_long,
        parcel_centroids=points,
        source_geo=source_geo,
        target_geos={"civic_association": target_geo},
        count_cols=["agg_income", "households"],
        source_id="geoid",
    )
    wide = result.pivot_table(
        index="geoid", columns="measure", values="value", aggfunc="first"
    ).reset_index()
    wide.columns.name = None
    wide = wide.rename(
        columns={"agg_income_parcels": "agg_income", "households_parcels": "households"}
    )
    wide["mean_income"] = wide["agg_income"] / wide["households"].where(
        wide["households"] > 0
    )
    return wide[["geoid", "agg_income", "households", "mean_income"]]


def main() -> None:
    parcels = gpd.read_file(config.PARCELS)
    bg = gpd.read_file(config.ACS_COUNTS).rename(columns={"GEOID": "geoid"})
    long = bg[["geoid", "agg_income", "households"]].melt(
        id_vars="geoid", var_name="measure", value_name="value"
    )
    long["year"] = config.ACS_YEAR

    src_path = config.DATA / "_acs_src_parcels.geojson"
    bg[["geoid", "geometry"]].to_file(src_path, driver="GeoJSON")
    out = redistribute_income_parcels(long, parcels, src_path, config.CIVIC_ASSOC)
    src_path.unlink(missing_ok=True)

    out.to_csv(config.CIVIC_INCOME_PARCELS, index=False)
    print(f"Wrote {len(out)} parcel-weighted income rows → {config.CIVIC_INCOME_PARCELS}")


if __name__ == "__main__":
    main()

10.10 Compare methods

Join the area-weighted and parcel-weighted income and quantify the difference per civic association.

pipeline/compare_methods.py

"""Compare area-weighted vs parcel-weighted income per civic association."""
from __future__ import annotations

import geopandas as gpd
import pandas as pd

from pipeline import config


def compare_income(area: pd.DataFrame, parcel: pd.DataFrame) -> pd.DataFrame:
    """Join the two methods' mean income; add diff (parcel - area) and pct_diff."""
    a = area[["geoid", "mean_income"]].rename(columns={"mean_income": "mean_income_area"})
    p = parcel[["geoid", "mean_income"]].rename(columns={"mean_income": "mean_income_parcel"})
    out = a.merge(p, on="geoid", how="outer")
    out["diff"] = out["mean_income_parcel"] - out["mean_income_area"]
    out["pct_diff"] = 100 * out["diff"] / out["mean_income_area"].where(
        out["mean_income_area"] > 0
    )
    return out


def main() -> None:
    civ = gpd.read_file(config.CIVIC_ASSOC)[["geoid", "region_name", "geometry"]]
    area = pd.read_csv(config.CIVIC_INCOME, dtype={"geoid": str})
    parcel = pd.read_csv(config.CIVIC_INCOME_PARCELS, dtype={"geoid": str})
    cmp = compare_income(area, parcel)
    gdf = civ.merge(cmp, on="geoid", how="left")
    gdf = gpd.GeoDataFrame(gdf, geometry="geometry", crs=civ.crs)

    # Honest accounting: report parcel-method household total vs ACS source.
    acs = gpd.read_file(config.ACS_COUNTS)
    src_hh = float(acs["households"].sum())
    parcel_hh = float(pd.read_csv(config.CIVIC_INCOME_PARCELS)["households"].sum())
    pct = 100 * (parcel_hh - src_hh) / src_hh if src_hh else 0.0
    print(f"Household totals, ACS source: {src_hh:,.0f}; parcel method: {parcel_hh:,.0f} ({pct:+.1f}%)")

    gdf.to_file(config.CIVIC_INCOME_COMPARISON, driver="GeoJSON")
    print(f"Wrote {len(gdf)} comparison rows → {config.CIVIC_INCOME_COMPARISON}")


if __name__ == "__main__":
    main()

10.11 Validate

Confirm household totals are preserved by the redistribution and that no speeds or incomes are negative.

pipeline/validate.py

"""Integrity checks on redistributed outputs."""
from __future__ import annotations

import pandas as pd


def check_total_preserved(source_total: float, target_total: float, tol: float = 0.01) -> None:
    """Assert target total matches source total within relative tolerance ``tol``."""
    if source_total == 0:
        return
    rel = abs(target_total - source_total) / abs(source_total)
    assert rel <= tol, f"total drift {rel:.3%} exceeds {tol:.3%}"


def check_ranges(df: pd.DataFrame) -> None:
    """Assert speed/income columns are non-negative where present."""
    for col in ("download_mbps", "upload_mbps", "mean_income", "households"):
        if col in df.columns:
            vals = df[col].dropna()
            assert (vals >= 0).all(), f"{col} has negative values"


def main() -> None:
    import geopandas as gpd
    from pipeline import config

    combined = gpd.read_file(config.CIVIC_COMBINED)
    acs = gpd.read_file(config.ACS_COUNTS)
    check_total_preserved(
        acs["households"].sum(), combined["households"].sum(), tol=0.02
    )
    check_ranges(combined)
    print("Validation passed.")


if __name__ == "__main__":
    main()

10.12 Run the full pipeline

Wire every stage into one command: uv run python -m pipeline.run.

pipeline/run.py

"""Run the full pipeline end to end (acquisition → outputs → validation)."""
from __future__ import annotations

from pipeline import (
    acquire_acs,
    acquire_geographies,
    acquire_ookla,
    acquire_parcels,
    combine,
    compare_methods,
    redistribute_broadband,
    redistribute_income,
    redistribute_income_parcels,
    validate,
)


def main() -> None:
    acquire_geographies.main()
    acquire_acs.main()
    acquire_ookla.main()
    acquire_parcels.main()
    redistribute_income.main()
    redistribute_broadband.main()
    combine.main()
    redistribute_income_parcels.main()
    compare_methods.main()
    validate.main()
    print("Pipeline complete.")


if __name__ == "__main__":
    main()