11 The Complete Pipeline Code (R)
This appendix reproduces the runnable R counterpart to the worked example. It is generated directly from the pipeline-r/ directory in the companion repository. The R pipeline reads the same intermediate data the Python pipeline produces and performs the redistribution, combination, and validation in R, built on the sdc.redistribute package:
# install the released package from CRAN:
# install.packages("sdc.redistribute")
# or the development version from the repository:
# install.packages("pak"); pak::pak("dads2busy/sdc.redistribute")
# from the repository root:
# Rscript pipeline-r/run.RData acquisition is shown as a recipe (acquire-recipe.R) rather than re-run. The compare_to_python.R module confirms the R results match the Python results.
11.1 Configuration
Shared paths, the projected CRS used for area weighting, and the count constants. Mirrors the Python configuration.
pipeline-r/config.R
# Shared configuration for the R pipeline. Mirrors pipeline/config.py.
# Run all modules from the repository root (e.g. `Rscript pipeline-r/run.R`).
PROJECT_CRS <- 3857 # match the CRS sdc-redistribute uses internally in Python
ACS_YEAR <- 2021
OOKLA_YEAR <- 2023
MIN_TESTS <- 5 # statistical-reliability filter on source tiles
DATA <- "data"
path_data <- function(...) file.path(DATA, ...)
# Inputs (shared with the Python pipeline)
CIVIC_ASSOC <- path_data("va013_geo_arl_2021_civic_associations.geojson")
ACS_COUNTS <- path_data("va013_acs_income_counts.geojson")
OOKLA_TILES <- path_data("ookla_tiles_arlington.geojson")
PARCELS <- path_data("va013_parcels.geojson")
# R outputs (suffixed _r so they never clobber the Python outputs)
CIVIC_INCOME_R <- path_data("civic_income_r.csv")
CIVIC_BROADBAND_R <- path_data("civic_broadband_r.csv")
CIVIC_COMBINED_R <- path_data("civic_combined_r.geojson")
CIVIC_INCOME_PARCELS_R <- path_data("civic_income_parcels_r.csv")
CIVIC_INCOME_COMPARISON_R <- path_data("civic_income_comparison_r.geojson")
# Python outputs (for parity comparison)
CIVIC_COMBINED_PY <- path_data("civic_combined.geojson")11.2 Redistribute income
Area-weight aggregate income and households onto civic associations with sdc.redistribute::redistribute_direct(), then derive mean income.
pipeline-r/redistribute_income.R
# Area-weight ACS income counts onto civic associations; derive mean income.
# Parallels pipeline/redistribute_income.py.
source("pipeline-r/config.R")
suppressPackageStartupMessages({
library(sf)
library(sdc.redistribute)
})
redistribute_income <- function(acs_path = ACS_COUNTS, civic_path = CIVIC_ASSOC) {
bg <- sf::st_read(acs_path, quiet = TRUE)
names(bg)[names(bg) == "GEOID"] <- "geoid"
civ <- sf::st_read(civic_path, quiet = TRUE)
bg <- sf::st_transform(bg, PROJECT_CRS)
civ <- sf::st_transform(civ, PROJECT_CRS)
# agg_income and households are extensive counts; redistribute both, then
# derive mean income (an intensive quantity) as their ratio.
out <- redistribute_direct(bg, civ, extensive = c("agg_income", "households"))
out$mean_income <- ifelse(out$households > 0, out$agg_income / out$households, NA_real_)
d <- sf::st_drop_geometry(out)
d[, c("geoid", "agg_income", "households", "mean_income")]
}
run_income <- function() {
out <- redistribute_income()
write.csv(out, CIVIC_INCOME_R, row.names = FALSE)
cat(sprintf("Wrote %d civic-association income rows -> %s\n", nrow(out), CIVIC_INCOME_R))
}
if (sys.nframe() == 0) run_income()11.3 Redistribute broadband
Rebuild the speed×tests product, redistribute it with the test counts, then derive the count-weighted mean download and upload speeds.
pipeline-r/redistribute_broadband.R
# Area-weight Ookla broadband onto civic associations; derive count-weighted
# mean speeds. Parallels pipeline/redistribute_broadband.py.
source("pipeline-r/config.R")
suppressPackageStartupMessages({
library(sf)
library(sdc.redistribute)
})
redistribute_broadband <- function(tiles_path = OOKLA_TILES, civic_path = CIVIC_ASSOC) {
tiles <- sf::st_read(tiles_path, quiet = TRUE)
tiles <- tiles[tiles$tests >= MIN_TESTS, ]
# Speed is intensive: rebuild the extensive speed x tests product so the
# count-weighted mean can be recovered after redistribution.
tiles$d_product <- tiles$avg_d_kbps * tiles$tests
tiles$u_product <- tiles$avg_u_kbps * tiles$tests
civ <- sf::st_read(civic_path, quiet = TRUE)
tiles <- sf::st_transform(tiles, PROJECT_CRS)
civ <- sf::st_transform(civ, PROJECT_CRS)
out <- redistribute_direct(tiles, civ, extensive = c("tests", "d_product", "u_product"))
valid <- out$tests > 0
out$download_mbps <- ifelse(valid, (out$d_product / out$tests) / 1000, NA_real_)
out$upload_mbps <- ifelse(valid, (out$u_product / out$tests) / 1000, NA_real_)
d <- sf::st_drop_geometry(out)
d[, c("geoid", "tests", "download_mbps", "upload_mbps")]
}
run_broadband <- function() {
out <- redistribute_broadband()
write.csv(out, CIVIC_BROADBAND_R, row.names = FALSE)
cat(sprintf("Wrote %d civic-association broadband rows -> %s\n", nrow(out), CIVIC_BROADBAND_R))
}
if (sys.nframe() == 0) run_broadband()11.4 Combine and derive metrics
Join redistributed income and broadband onto the civic polygons and compute the income-to-speed ratio and bivariate income×speed classes.
pipeline-r/combine.R
# Join redistributed income + broadband onto civic polygons; derive metrics.
# Parallels pipeline/combine.py.
source("pipeline-r/config.R")
suppressPackageStartupMessages(library(sf))
tercile <- function(x) {
# 1/2/3 labels by rank (robust to ties and NA), mirroring pandas qcut on ranks.
r <- rank(x, ties.method = "first", na.last = "keep")
br <- stats::quantile(r, probs = c(0, 1/3, 2/3, 1), na.rm = TRUE)
cut(r, breaks = br, labels = c("1", "2", "3"), include.lowest = TRUE)
}
add_derived_metrics <- function(df) {
df$income_speed_ratio <- ifelse(df$download_mbps > 0,
df$mean_income / df$download_mbps, NA_real_)
inc <- tercile(df$mean_income)
spd <- tercile(df$download_mbps)
df$bivariate_class <- paste0(as.character(inc), "-", as.character(spd))
df
}
run_combine <- function() {
civ <- sf::st_read(CIVIC_ASSOC, quiet = TRUE)[, c("geoid", "region_name")]
income <- read.csv(CIVIC_INCOME_R, colClasses = c(geoid = "character"))
broadband <- read.csv(CIVIC_BROADBAND_R, colClasses = c(geoid = "character"))
civ$geoid <- as.character(civ$geoid)
merged <- merge(civ, income, by = "geoid", all.x = TRUE)
merged <- merge(merged, broadband, by = "geoid", all.x = TRUE)
merged <- add_derived_metrics(merged)
sf::st_write(merged, CIVIC_COMBINED_R, delete_dsn = TRUE, quiet = TRUE)
cat(sprintf("Wrote %d combined civic-association rows -> %s\n",
nrow(merged), CIVIC_COMBINED_R))
}
if (sys.nframe() == 0) run_combine()11.5 Redistribute income through parcels
Dasymetric income via redistribute_parcels(), weighting parcels by their unit count (the second approach).
pipeline-r/redistribute_income_parcels.R
# Dasymetric income redistribution through unit-weighted parcels.
# Parallels pipeline/redistribute_income_parcels.py: Python replicates each
# parcel into Total_Units identical points; here we pass Total_Units as the
# per-point weight, which splits each source value in the same proportion.
source("pipeline-r/config.R")
suppressPackageStartupMessages({
library(sf)
library(sdc.redistribute)
})
redistribute_income_parcels <- function(acs_path = ACS_COUNTS,
parcels_path = PARCELS,
civic_path = CIVIC_ASSOC) {
bg <- sf::st_read(acs_path, quiet = TRUE)
names(bg)[names(bg) == "GEOID"] <- "geoid"
parcels <- sf::st_read(parcels_path, quiet = TRUE)
parcels <- parcels[parcels$Total_Units > 0, ]
civ <- sf::st_read(civic_path, quiet = TRUE)
bg <- sf::st_transform(bg, PROJECT_CRS)
parcels <- sf::st_transform(parcels, PROJECT_CRS)
civ <- sf::st_transform(civ, PROJECT_CRS)
out <- redistribute_parcels(bg, civ, parcels,
extensive = c("agg_income", "households"),
weights = "Total_Units")
out$mean_income <- ifelse(out$households > 0, out$agg_income / out$households, NA_real_)
d <- sf::st_drop_geometry(out)
d[, c("geoid", "agg_income", "households", "mean_income")]
}
run_income_parcels <- function() {
out <- redistribute_income_parcels()
write.csv(out, CIVIC_INCOME_PARCELS_R, row.names = FALSE)
cat(sprintf("Wrote %d parcel-weighted income rows -> %s\n",
nrow(out), CIVIC_INCOME_PARCELS_R))
}
if (sys.nframe() == 0) run_income_parcels()11.6 Compare methods
Join the area-weighted and parcel-weighted income and quantify the difference per civic association.
pipeline-r/compare_methods.R
# Compare area-weighted vs parcel-weighted income per civic association.
# Parallels pipeline/compare_methods.py.
source("pipeline-r/config.R")
suppressPackageStartupMessages(library(sf))
compare_income <- function(area, parcel) {
a <- data.frame(geoid = as.character(area$geoid),
mean_income_area = area$mean_income, stringsAsFactors = FALSE)
p <- data.frame(geoid = as.character(parcel$geoid),
mean_income_parcel = parcel$mean_income, stringsAsFactors = FALSE)
out <- merge(a, p, by = "geoid", all = TRUE)
out$diff <- out$mean_income_parcel - out$mean_income_area
out$pct_diff <- ifelse(out$mean_income_area > 0,
100 * out$diff / out$mean_income_area, NA_real_)
out
}
run_compare <- function() {
civ <- sf::st_read(CIVIC_ASSOC, quiet = TRUE)[, c("geoid", "region_name")]
civ$geoid <- as.character(civ$geoid)
area <- read.csv(CIVIC_INCOME_R, colClasses = c(geoid = "character"))
parcel <- read.csv(CIVIC_INCOME_PARCELS_R, colClasses = c(geoid = "character"))
cmp <- compare_income(area, parcel)
out <- merge(civ, cmp, by = "geoid", all.x = TRUE)
sf::st_write(out, CIVIC_INCOME_COMPARISON_R, delete_dsn = TRUE, quiet = TRUE)
cat(sprintf("Wrote %d comparison rows -> %s\n", nrow(out), CIVIC_INCOME_COMPARISON_R))
}
if (sys.nframe() == 0) run_compare()11.7 Validate
Confirm household totals are preserved and that no speeds or incomes are negative.
pipeline-r/validate.R
# Integrity checks on redistributed outputs. Parallels pipeline/validate.py.
source("pipeline-r/config.R")
suppressPackageStartupMessages(library(sf))
check_total_preserved <- function(source_total, target_total, tol = 0.01) {
if (source_total == 0) return(invisible(TRUE))
rel <- abs(target_total - source_total) / abs(source_total)
if (rel > tol) {
stop(sprintf("total drift %.3f%% exceeds %.3f%%", 100 * rel, 100 * tol), call. = FALSE)
}
invisible(TRUE)
}
check_ranges <- function(df) {
for (col in c("download_mbps", "upload_mbps", "mean_income", "households")) {
if (col %in% names(df)) {
vals <- df[[col]][!is.na(df[[col]])]
if (any(vals < 0)) stop(sprintf("%s has negative values", col), call. = FALSE)
}
}
invisible(TRUE)
}
run_validate <- function() {
combined <- sf::st_drop_geometry(sf::st_read(CIVIC_COMBINED_R, quiet = TRUE))
acs <- sf::st_drop_geometry(sf::st_read(ACS_COUNTS, quiet = TRUE))
check_total_preserved(sum(acs$households), sum(combined$households, na.rm = TRUE), tol = 0.02)
check_ranges(combined)
cat("Validation passed.\n")
}
if (sys.nframe() == 0) run_validate()11.8 Run the R pipeline
Run the redistribute/combine/validate stages end to end: Rscript pipeline-r/run.R.
pipeline-r/run.R
# Run the runnable R stages end to end (acquisition is a recipe; see
# pipeline-r/acquire-recipe.R). Parallels pipeline/run.py.
# Usage from the repository root: Rscript pipeline-r/run.R
source("pipeline-r/redistribute_income.R")
source("pipeline-r/redistribute_broadband.R")
source("pipeline-r/combine.R")
source("pipeline-r/redistribute_income_parcels.R")
source("pipeline-r/compare_methods.R")
source("pipeline-r/validate.R")
run_income()
run_broadband()
run_combine()
run_income_parcels()
run_compare()
run_validate()
cat("R pipeline complete.\n")11.9 Acquisition recipe
How the same inputs would be produced in R (tidycensus/tigris/ooklaOpenDataR). Shown for reference; not executed by the pipeline.
pipeline-r/acquire-recipe.R
# Data-acquisition recipe (NOT executed by run.R). Shows how the same
# intermediate inputs the R pipeline reads would be produced in R, paralleling
# the Python acquire_*.py modules. Requires a Census API key and network access.
#
# install.packages(c("tidycensus", "tigris", "sf"))
# # ooklaOpenDataR is on GitHub: remotes::install_github("teamookla/ooklaOpenDataR")
if (FALSE) { # recipe guard: never runs
library(tidycensus)
library(tigris)
library(sf)
# 1. Census block-group geometries for Arlington County, VA (FIPS 51013),
# 2021 vintage. Parallels acquire_geographies.py.
bg_geo <- tigris::block_groups(state = "51", county = "013", year = 2021, cb = FALSE)
# 2. ACS extensive counts: aggregate household income (B19025_001) and
# households (B11001_001). Parallels acquire_acs.py. NEVER fetch the median.
acs <- tidycensus::get_acs(
geography = "block group", state = "51", county = "013",
year = 2021, survey = "acs5", geometry = TRUE,
variables = c(agg_income = "B19025_001", households = "B11001_001"),
output = "wide"
)
# acs has agg_incomeE / householdsE columns; rename to agg_income / households.
# 3. Ookla fixed-broadband tiles, clipped to Arlington. Parallels acquire_ookla.py.
# ooklaOpenDataR::get_performance_tiles(service = "fixed", year = 2023,
# quarter = 2) returns the global tile set; clip to the county bbox.
# 4. Parcels: county MHUD parcel points carrying Total_Units. Parallels
# acquire_parcels.py (an ArcGIS FeatureServer; read via arcgislayers or sf).
}11.10 Parity with the Python pipeline
Confirm the R outputs match the Python pipeline’s mean income and download speed within tolerance.
pipeline-r/compare_to_python.R
# Parity check: confirm the R combined output matches the Python combined output
# (mean_income and download_mbps) within tolerance. Small differences are
# expected from GEOS/sf vs shapely/geopandas geometry handling.
source("pipeline-r/config.R")
suppressPackageStartupMessages(library(sf))
compare_to_python <- function(tol = 0.02) {
r <- sf::st_drop_geometry(sf::st_read(CIVIC_COMBINED_R, quiet = TRUE))
py <- sf::st_drop_geometry(sf::st_read(CIVIC_COMBINED_PY, quiet = TRUE))
r$geoid <- as.character(r$geoid)
py$geoid <- as.character(py$geoid)
m <- merge(r, py, by = "geoid", suffixes = c("_r", "_py"))
rel <- function(a, b) {
ok <- is.finite(a) & is.finite(b) & b != 0
max(abs(a[ok] - b[ok]) / abs(b[ok]))
}
d_income <- rel(m$mean_income_r, m$mean_income_py)
d_speed <- rel(m$download_mbps_r, m$download_mbps_py)
cat(sprintf("Parity vs Python (n=%d): max rel diff mean_income=%.4f%% download_mbps=%.4f%%\n",
nrow(m), 100 * d_income, 100 * d_speed))
if (d_income > tol || d_speed > tol) {
stop(sprintf("parity exceeds tolerance %.2f%%", 100 * tol), call. = FALSE)
}
cat(sprintf("Parity OK (within %.2f%%).\n", 100 * tol))
invisible(TRUE)
}
if (sys.nframe() == 0) compare_to_python()