11  The Complete Pipeline Code (R)

This appendix reproduces the runnable R counterpart to the worked example. It is generated directly from the pipeline-r/ directory in the companion repository. The R pipeline reads the same intermediate data the Python pipeline produces and performs the redistribution, combination, and validation in R, built on the sdc.redistribute package:

# install the released package from CRAN:
# install.packages("sdc.redistribute")
# or the development version from the repository:
# install.packages("pak"); pak::pak("dads2busy/sdc.redistribute")
# from the repository root:
# Rscript pipeline-r/run.R

Data acquisition is shown as a recipe (acquire-recipe.R) rather than re-run. The compare_to_python.R module confirms the R results match the Python results.

11.1 Configuration

Shared paths, the projected CRS used for area weighting, and the count constants. Mirrors the Python configuration.

pipeline-r/config.R

# Shared configuration for the R pipeline. Mirrors pipeline/config.py.
# Run all modules from the repository root (e.g. `Rscript pipeline-r/run.R`).

PROJECT_CRS <- 3857   # match the CRS sdc-redistribute uses internally in Python
ACS_YEAR    <- 2021
OOKLA_YEAR  <- 2023
MIN_TESTS   <- 5      # statistical-reliability filter on source tiles

DATA <- "data"
path_data <- function(...) file.path(DATA, ...)

# Inputs (shared with the Python pipeline)
CIVIC_ASSOC <- path_data("va013_geo_arl_2021_civic_associations.geojson")
ACS_COUNTS  <- path_data("va013_acs_income_counts.geojson")
OOKLA_TILES <- path_data("ookla_tiles_arlington.geojson")
PARCELS     <- path_data("va013_parcels.geojson")

# R outputs (suffixed _r so they never clobber the Python outputs)
CIVIC_INCOME_R            <- path_data("civic_income_r.csv")
CIVIC_BROADBAND_R         <- path_data("civic_broadband_r.csv")
CIVIC_COMBINED_R          <- path_data("civic_combined_r.geojson")
CIVIC_INCOME_PARCELS_R    <- path_data("civic_income_parcels_r.csv")
CIVIC_INCOME_COMPARISON_R <- path_data("civic_income_comparison_r.geojson")

# Python outputs (for parity comparison)
CIVIC_COMBINED_PY <- path_data("civic_combined.geojson")

11.2 Redistribute income

Area-weight aggregate income and households onto civic associations with sdc.redistribute::redistribute_direct(), then derive mean income.

pipeline-r/redistribute_income.R

# Area-weight ACS income counts onto civic associations; derive mean income.
# Parallels pipeline/redistribute_income.py.
source("pipeline-r/config.R")
suppressPackageStartupMessages({
  library(sf)
  library(sdc.redistribute)
})

redistribute_income <- function(acs_path = ACS_COUNTS, civic_path = CIVIC_ASSOC) {
  bg <- sf::st_read(acs_path, quiet = TRUE)
  names(bg)[names(bg) == "GEOID"] <- "geoid"
  civ <- sf::st_read(civic_path, quiet = TRUE)

  bg  <- sf::st_transform(bg,  PROJECT_CRS)
  civ <- sf::st_transform(civ, PROJECT_CRS)

  # agg_income and households are extensive counts; redistribute both, then
  # derive mean income (an intensive quantity) as their ratio.
  out <- redistribute_direct(bg, civ, extensive = c("agg_income", "households"))
  out$mean_income <- ifelse(out$households > 0, out$agg_income / out$households, NA_real_)

  d <- sf::st_drop_geometry(out)
  d[, c("geoid", "agg_income", "households", "mean_income")]
}

run_income <- function() {
  out <- redistribute_income()
  write.csv(out, CIVIC_INCOME_R, row.names = FALSE)
  cat(sprintf("Wrote %d civic-association income rows -> %s\n", nrow(out), CIVIC_INCOME_R))
}

if (sys.nframe() == 0) run_income()

11.3 Redistribute broadband

Rebuild the speed×tests product, redistribute it with the test counts, then derive the count-weighted mean download and upload speeds.

pipeline-r/redistribute_broadband.R

# Area-weight Ookla broadband onto civic associations; derive count-weighted
# mean speeds. Parallels pipeline/redistribute_broadband.py.
source("pipeline-r/config.R")
suppressPackageStartupMessages({
  library(sf)
  library(sdc.redistribute)
})

redistribute_broadband <- function(tiles_path = OOKLA_TILES, civic_path = CIVIC_ASSOC) {
  tiles <- sf::st_read(tiles_path, quiet = TRUE)
  tiles <- tiles[tiles$tests >= MIN_TESTS, ]
  # Speed is intensive: rebuild the extensive speed x tests product so the
  # count-weighted mean can be recovered after redistribution.
  tiles$d_product <- tiles$avg_d_kbps * tiles$tests
  tiles$u_product <- tiles$avg_u_kbps * tiles$tests

  civ <- sf::st_read(civic_path, quiet = TRUE)
  tiles <- sf::st_transform(tiles, PROJECT_CRS)
  civ   <- sf::st_transform(civ,   PROJECT_CRS)

  out <- redistribute_direct(tiles, civ, extensive = c("tests", "d_product", "u_product"))
  valid <- out$tests > 0
  out$download_mbps <- ifelse(valid, (out$d_product / out$tests) / 1000, NA_real_)
  out$upload_mbps   <- ifelse(valid, (out$u_product / out$tests) / 1000, NA_real_)

  d <- sf::st_drop_geometry(out)
  d[, c("geoid", "tests", "download_mbps", "upload_mbps")]
}

run_broadband <- function() {
  out <- redistribute_broadband()
  write.csv(out, CIVIC_BROADBAND_R, row.names = FALSE)
  cat(sprintf("Wrote %d civic-association broadband rows -> %s\n", nrow(out), CIVIC_BROADBAND_R))
}

if (sys.nframe() == 0) run_broadband()

11.4 Combine and derive metrics

Join redistributed income and broadband onto the civic polygons and compute the income-to-speed ratio and bivariate income×speed classes.

pipeline-r/combine.R

# Join redistributed income + broadband onto civic polygons; derive metrics.
# Parallels pipeline/combine.py.
source("pipeline-r/config.R")
suppressPackageStartupMessages(library(sf))

tercile <- function(x) {
  # 1/2/3 labels by rank (robust to ties and NA), mirroring pandas qcut on ranks.
  r <- rank(x, ties.method = "first", na.last = "keep")
  br <- stats::quantile(r, probs = c(0, 1/3, 2/3, 1), na.rm = TRUE)
  cut(r, breaks = br, labels = c("1", "2", "3"), include.lowest = TRUE)
}

add_derived_metrics <- function(df) {
  df$income_speed_ratio <- ifelse(df$download_mbps > 0,
                                  df$mean_income / df$download_mbps, NA_real_)
  inc <- tercile(df$mean_income)
  spd <- tercile(df$download_mbps)
  df$bivariate_class <- paste0(as.character(inc), "-", as.character(spd))
  df
}

run_combine <- function() {
  civ <- sf::st_read(CIVIC_ASSOC, quiet = TRUE)[, c("geoid", "region_name")]
  income    <- read.csv(CIVIC_INCOME_R,    colClasses = c(geoid = "character"))
  broadband <- read.csv(CIVIC_BROADBAND_R, colClasses = c(geoid = "character"))
  civ$geoid <- as.character(civ$geoid)

  merged <- merge(civ, income, by = "geoid", all.x = TRUE)
  merged <- merge(merged, broadband, by = "geoid", all.x = TRUE)
  merged <- add_derived_metrics(merged)

  sf::st_write(merged, CIVIC_COMBINED_R, delete_dsn = TRUE, quiet = TRUE)
  cat(sprintf("Wrote %d combined civic-association rows -> %s\n",
              nrow(merged), CIVIC_COMBINED_R))
}

if (sys.nframe() == 0) run_combine()

11.5 Redistribute income through parcels

Dasymetric income via redistribute_parcels(), weighting parcels by their unit count (the second approach).

pipeline-r/redistribute_income_parcels.R

# Dasymetric income redistribution through unit-weighted parcels.
# Parallels pipeline/redistribute_income_parcels.py: Python replicates each
# parcel into Total_Units identical points; here we pass Total_Units as the
# per-point weight, which splits each source value in the same proportion.
source("pipeline-r/config.R")
suppressPackageStartupMessages({
  library(sf)
  library(sdc.redistribute)
})

redistribute_income_parcels <- function(acs_path = ACS_COUNTS,
                                        parcels_path = PARCELS,
                                        civic_path = CIVIC_ASSOC) {
  bg <- sf::st_read(acs_path, quiet = TRUE)
  names(bg)[names(bg) == "GEOID"] <- "geoid"
  parcels <- sf::st_read(parcels_path, quiet = TRUE)
  parcels <- parcels[parcels$Total_Units > 0, ]
  civ <- sf::st_read(civic_path, quiet = TRUE)

  bg      <- sf::st_transform(bg,      PROJECT_CRS)
  parcels <- sf::st_transform(parcels, PROJECT_CRS)
  civ     <- sf::st_transform(civ,     PROJECT_CRS)

  out <- redistribute_parcels(bg, civ, parcels,
                              extensive = c("agg_income", "households"),
                              weights = "Total_Units")
  out$mean_income <- ifelse(out$households > 0, out$agg_income / out$households, NA_real_)

  d <- sf::st_drop_geometry(out)
  d[, c("geoid", "agg_income", "households", "mean_income")]
}

run_income_parcels <- function() {
  out <- redistribute_income_parcels()
  write.csv(out, CIVIC_INCOME_PARCELS_R, row.names = FALSE)
  cat(sprintf("Wrote %d parcel-weighted income rows -> %s\n",
              nrow(out), CIVIC_INCOME_PARCELS_R))
}

if (sys.nframe() == 0) run_income_parcels()

11.6 Compare methods

Join the area-weighted and parcel-weighted income and quantify the difference per civic association.

pipeline-r/compare_methods.R

# Compare area-weighted vs parcel-weighted income per civic association.
# Parallels pipeline/compare_methods.py.
source("pipeline-r/config.R")
suppressPackageStartupMessages(library(sf))

compare_income <- function(area, parcel) {
  a <- data.frame(geoid = as.character(area$geoid),
                  mean_income_area = area$mean_income, stringsAsFactors = FALSE)
  p <- data.frame(geoid = as.character(parcel$geoid),
                  mean_income_parcel = parcel$mean_income, stringsAsFactors = FALSE)
  out <- merge(a, p, by = "geoid", all = TRUE)
  out$diff <- out$mean_income_parcel - out$mean_income_area
  out$pct_diff <- ifelse(out$mean_income_area > 0,
                         100 * out$diff / out$mean_income_area, NA_real_)
  out
}

run_compare <- function() {
  civ    <- sf::st_read(CIVIC_ASSOC, quiet = TRUE)[, c("geoid", "region_name")]
  civ$geoid <- as.character(civ$geoid)
  area   <- read.csv(CIVIC_INCOME_R,         colClasses = c(geoid = "character"))
  parcel <- read.csv(CIVIC_INCOME_PARCELS_R, colClasses = c(geoid = "character"))
  cmp <- compare_income(area, parcel)
  out <- merge(civ, cmp, by = "geoid", all.x = TRUE)
  sf::st_write(out, CIVIC_INCOME_COMPARISON_R, delete_dsn = TRUE, quiet = TRUE)
  cat(sprintf("Wrote %d comparison rows -> %s\n", nrow(out), CIVIC_INCOME_COMPARISON_R))
}

if (sys.nframe() == 0) run_compare()

11.7 Validate

Confirm household totals are preserved and that no speeds or incomes are negative.

pipeline-r/validate.R

# Integrity checks on redistributed outputs. Parallels pipeline/validate.py.
source("pipeline-r/config.R")
suppressPackageStartupMessages(library(sf))

check_total_preserved <- function(source_total, target_total, tol = 0.01) {
  if (source_total == 0) return(invisible(TRUE))
  rel <- abs(target_total - source_total) / abs(source_total)
  if (rel > tol) {
    stop(sprintf("total drift %.3f%% exceeds %.3f%%", 100 * rel, 100 * tol), call. = FALSE)
  }
  invisible(TRUE)
}

check_ranges <- function(df) {
  for (col in c("download_mbps", "upload_mbps", "mean_income", "households")) {
    if (col %in% names(df)) {
      vals <- df[[col]][!is.na(df[[col]])]
      if (any(vals < 0)) stop(sprintf("%s has negative values", col), call. = FALSE)
    }
  }
  invisible(TRUE)
}

run_validate <- function() {
  combined <- sf::st_drop_geometry(sf::st_read(CIVIC_COMBINED_R, quiet = TRUE))
  acs <- sf::st_drop_geometry(sf::st_read(ACS_COUNTS, quiet = TRUE))
  check_total_preserved(sum(acs$households), sum(combined$households, na.rm = TRUE), tol = 0.02)
  check_ranges(combined)
  cat("Validation passed.\n")
}

if (sys.nframe() == 0) run_validate()

11.8 Run the R pipeline

Run the redistribute/combine/validate stages end to end: Rscript pipeline-r/run.R.

pipeline-r/run.R

# Run the runnable R stages end to end (acquisition is a recipe; see
# pipeline-r/acquire-recipe.R). Parallels pipeline/run.py.
# Usage from the repository root: Rscript pipeline-r/run.R
source("pipeline-r/redistribute_income.R")
source("pipeline-r/redistribute_broadband.R")
source("pipeline-r/combine.R")
source("pipeline-r/redistribute_income_parcels.R")
source("pipeline-r/compare_methods.R")
source("pipeline-r/validate.R")

run_income()
run_broadband()
run_combine()
run_income_parcels()
run_compare()
run_validate()
cat("R pipeline complete.\n")

11.9 Acquisition recipe

How the same inputs would be produced in R (tidycensus/tigris/ooklaOpenDataR). Shown for reference; not executed by the pipeline.

pipeline-r/acquire-recipe.R

# Data-acquisition recipe (NOT executed by run.R). Shows how the same
# intermediate inputs the R pipeline reads would be produced in R, paralleling
# the Python acquire_*.py modules. Requires a Census API key and network access.
#
# install.packages(c("tidycensus", "tigris", "sf"))
# # ooklaOpenDataR is on GitHub: remotes::install_github("teamookla/ooklaOpenDataR")

if (FALSE) {  # recipe guard: never runs

  library(tidycensus)
  library(tigris)
  library(sf)

  # 1. Census block-group geometries for Arlington County, VA (FIPS 51013),
  #    2021 vintage. Parallels acquire_geographies.py.
  bg_geo <- tigris::block_groups(state = "51", county = "013", year = 2021, cb = FALSE)

  # 2. ACS extensive counts: aggregate household income (B19025_001) and
  #    households (B11001_001). Parallels acquire_acs.py. NEVER fetch the median.
  acs <- tidycensus::get_acs(
    geography = "block group", state = "51", county = "013",
    year = 2021, survey = "acs5", geometry = TRUE,
    variables = c(agg_income = "B19025_001", households = "B11001_001"),
    output = "wide"
  )
  # acs has agg_incomeE / householdsE columns; rename to agg_income / households.

  # 3. Ookla fixed-broadband tiles, clipped to Arlington. Parallels acquire_ookla.py.
  #    ooklaOpenDataR::get_performance_tiles(service = "fixed", year = 2023,
  #      quarter = 2) returns the global tile set; clip to the county bbox.

  # 4. Parcels: county MHUD parcel points carrying Total_Units. Parallels
  #    acquire_parcels.py (an ArcGIS FeatureServer; read via arcgislayers or sf).
}

11.10 Parity with the Python pipeline

Confirm the R outputs match the Python pipeline’s mean income and download speed within tolerance.

pipeline-r/compare_to_python.R

# Parity check: confirm the R combined output matches the Python combined output
# (mean_income and download_mbps) within tolerance. Small differences are
# expected from GEOS/sf vs shapely/geopandas geometry handling.
source("pipeline-r/config.R")
suppressPackageStartupMessages(library(sf))

compare_to_python <- function(tol = 0.02) {
  r  <- sf::st_drop_geometry(sf::st_read(CIVIC_COMBINED_R, quiet = TRUE))
  py <- sf::st_drop_geometry(sf::st_read(CIVIC_COMBINED_PY, quiet = TRUE))
  r$geoid  <- as.character(r$geoid)
  py$geoid <- as.character(py$geoid)
  m <- merge(r, py, by = "geoid", suffixes = c("_r", "_py"))

  rel <- function(a, b) {
    ok <- is.finite(a) & is.finite(b) & b != 0
    max(abs(a[ok] - b[ok]) / abs(b[ok]))
  }
  d_income <- rel(m$mean_income_r,   m$mean_income_py)
  d_speed  <- rel(m$download_mbps_r, m$download_mbps_py)

  cat(sprintf("Parity vs Python (n=%d): max rel diff  mean_income=%.4f%%  download_mbps=%.4f%%\n",
              nrow(m), 100 * d_income, 100 * d_speed))
  if (d_income > tol || d_speed > tol) {
    stop(sprintf("parity exceeds tolerance %.2f%%", 100 * tol), call. = FALSE)
  }
  cat(sprintf("Parity OK (within %.2f%%).\n", 100 * tol))
  invisible(TRUE)
}

if (sys.nframe() == 0) compare_to_python()