Skip to content

Catchment

sdc_catchment

sdc-catchment: floating catchment area spatial accessibility.

catchment_connections

catchment_connections(cost, weight: WeightSpec = None, consumer_ids=None, provider_ids=None, **weight_kwargs) -> pd.DataFrame

Extract non-zero consumer-provider connections with weights and costs.

Source code in packages/sdc-catchment/src/sdc_catchment/catchment.py
def catchment_connections(
    cost,
    weight: WeightSpec = None,
    consumer_ids=None,
    provider_ids=None,
    **weight_kwargs,
) -> pd.DataFrame:
    """Extract non-zero consumer-provider connections with weights and costs."""
    W = catchment_weight(cost, weight, **weight_kwargs)
    cost_arr = _to_sparse(cost).toarray()
    n_rows, n_cols = W.shape
    if consumer_ids is None:
        consumer_ids = np.arange(n_rows)
    if provider_ids is None:
        provider_ids = np.arange(n_cols)
    W_coo = W.tocoo()
    rows = []
    for i, j, w in zip(W_coo.row, W_coo.col, W_coo.data):
        if w > 0:
            rows.append({"from_id": consumer_ids[i], "to_id": provider_ids[j], "weight": w, "cost": cost_arr[i, j]})
    return pd.DataFrame(rows, columns=["from_id", "to_id", "weight", "cost"])

catchment_network

catchment_network(connections: DataFrame, from_start=None, to_start=None) -> pd.DataFrame

Extract connected subgraph via breadth-first search.

Source code in packages/sdc-catchment/src/sdc_catchment/catchment.py
def catchment_network(
    connections: pd.DataFrame,
    from_start=None,
    to_start=None,
) -> pd.DataFrame:
    """Extract connected subgraph via breadth-first search."""
    froms: set = set()
    tos: set = set()
    if from_start is not None:
        froms.add(from_start)
    if to_start is not None:
        tos.add(to_start)
    if not froms and not tos:
        froms.add(connections["from_id"].iloc[0])

    while True:
        new_tos = set(connections[connections["from_id"].isin(froms)]["to_id"])
        new_froms = set(connections[connections["to_id"].isin(tos)]["from_id"])
        combined_froms = froms | new_froms
        combined_tos = tos | new_tos
        if combined_froms == froms and combined_tos == tos:
            break
        froms = combined_froms
        tos = combined_tos

    mask = connections["from_id"].isin(froms) & connections["to_id"].isin(tos)
    return connections[mask].reset_index(drop=True)

catchment_ratio

catchment_ratio(consumers: DataFrame, providers: DataFrame, cost, weight: WeightSpec = None, scale: float = 2.0, max_cost: float | None = None, normalize_weight: bool = False, adjust_consumers: Callable | None = None, adjust_providers: Callable | None = None, consumers_commutes=None, consumers_id: str = 'geoid', consumers_value: str = 'value', providers_id: str = 'geoid', providers_value: str = 'value', adjust_zeros: float | bool = 1e-06, return_type: str | int | float = 'original') -> pd.Series

Calculate provider-to-consumer ratios within floating catchment areas.

Source code in packages/sdc-catchment/src/sdc_catchment/catchment.py
def catchment_ratio(
    consumers: pd.DataFrame,
    providers: pd.DataFrame,
    cost,
    weight: WeightSpec = None,
    scale: float = 2.0,
    max_cost: float | None = None,
    normalize_weight: bool = False,
    adjust_consumers: Callable | None = None,
    adjust_providers: Callable | None = None,
    consumers_commutes=None,
    consumers_id: str = "geoid",
    consumers_value: str = "value",
    providers_id: str = "geoid",
    providers_value: str = "value",
    adjust_zeros: float | bool = 1e-6,
    return_type: str | int | float = "original",
) -> pd.Series:
    """Calculate provider-to-consumer ratios within floating catchment areas."""
    c_ids = consumers[consumers_id].values
    c_vals = consumers[consumers_value].values.astype(float)
    p_ids = providers[providers_id].values
    p_vals = providers[providers_value].values.astype(float)

    cost_sp = _to_sparse(cost)
    if cost_sp.shape[0] != len(consumers):
        raise ValueError(f"Cost matrix dimension mismatch: {cost_sp.shape[0]} rows but {len(consumers)} consumers")
    if cost_sp.shape[1] != len(providers):
        raise ValueError(f"Cost matrix dimension mismatch: {cost_sp.shape[1]} columns but {len(providers)} providers")

    W = catchment_weight(cost, weight, max_cost, scale, normalize_weight, adjust_zeros)

    if consumers_commutes is not None:
        W = _apply_commute_blending(W, consumers_commutes)

    # W_providers used in step 1 (provider demand), W_consumers used in step 2 (consumer access)
    W_consumers = W.toarray().copy()
    W_providers = W.toarray().copy()
    if adjust_consumers is not None:
        W_consumers = np.asarray(adjust_consumers(W_consumers))
    if adjust_providers is not None:
        W_providers = np.asarray(adjust_providers(W_providers))

    if return_type == "demand":
        demand = W_consumers.T @ c_vals
        return pd.Series(demand, index=p_ids)

    if return_type == "supply":
        supply = W_consumers @ p_vals
        return pd.Series(supply, index=c_ids)

    weighted_demand = W_providers.T @ c_vals
    weighted_demand = np.where(weighted_demand > 0, weighted_demand, np.inf)
    ratios = p_vals / weighted_demand
    access = W_consumers @ ratios

    if isinstance(return_type, (int, float)) and not isinstance(return_type, bool):
        access = access * float(return_type)
    elif return_type == "region":
        access = access * c_vals
    elif return_type == "normalized":
        a_min, a_max = access.min(), access.max()
        if a_max > a_min:
            access = (access - a_min) / (a_max - a_min)
        else:
            access = np.zeros_like(access)
    elif return_type != "original":
        raise ValueError(f"Unknown return_type: {return_type!r}")

    return pd.Series(access, index=c_ids)

catchment_weight

catchment_weight(cost, weight: WeightSpec = None, max_cost: float | None = None, scale: float = 2.0, normalize_weight: bool = False, adjust_zeros: float | bool = 1e-06) -> sparse.csc_matrix

Construct a weight matrix from a cost matrix using kernel decay functions.

Parameters:

Name Type Description Default
cost sparse matrix, ndarray, or DataFrame

Cost/distance matrix. Rows = consumers, columns = providers.

required
weight WeightSpec

None = use cost as weight. float = binary threshold (exclusive: cost < threshold). list of (distance, weight) tuples = stepped. str = kernel name. callable = custom function (cost_matrix) -> weight_matrix.

None
max_cost float or None

Zero out weights where cost exceeds this value.

None
scale float

Scale parameter for kernel functions.

2.0
normalize_weight bool

Apply 3SFCA selection probability: w * (w / rowsum). NOT simple row normalization.

False
adjust_zeros float or False

Replace zeros in cost with this value. Skipped when weight is None.

1e-06

Returns:

Type Description
csc_matrix
Source code in packages/sdc-catchment/src/sdc_catchment/catchment.py
def catchment_weight(
    cost,
    weight: WeightSpec = None,
    max_cost: float | None = None,
    scale: float = 2.0,
    normalize_weight: bool = False,
    adjust_zeros: float | bool = 1e-6,
) -> sparse.csc_matrix:
    """Construct a weight matrix from a cost matrix using kernel decay functions.

    Parameters
    ----------
    cost : sparse matrix, ndarray, or DataFrame
        Cost/distance matrix. Rows = consumers, columns = providers.
    weight : WeightSpec
        None = use cost as weight. float = binary threshold (exclusive: cost < threshold).
        list of (distance, weight) tuples = stepped. str = kernel name.
        callable = custom function (cost_matrix) -> weight_matrix.
    max_cost : float or None
        Zero out weights where cost exceeds this value.
    scale : float
        Scale parameter for kernel functions.
    normalize_weight : bool
        Apply 3SFCA selection probability: w * (w / rowsum). NOT simple row normalization.
    adjust_zeros : float or False
        Replace zeros in cost with this value. Skipped when weight is None.

    Returns
    -------
    scipy.sparse.csc_matrix
    """
    cost_sp = _to_sparse(cost)
    c = cost_sp.toarray().astype(float)

    if weight is None:
        w = c.copy()
    elif callable(weight) and not isinstance(weight, str):
        if adjust_zeros and isinstance(adjust_zeros, (int, float)):
            c = np.where((c == 0) & (c >= 0), adjust_zeros, c)
        w = np.asarray(weight(c), dtype=float)
    elif isinstance(weight, str):
        if weight not in KERNELS:
            raise ValueError(f"Unknown kernel '{weight}'. Choose from: {list(KERNELS)}")
        if adjust_zeros and isinstance(adjust_zeros, (int, float)):
            c = np.where((c == 0) & (c >= 0), adjust_zeros, c)
        w = KERNELS[weight](c, scale)
    elif isinstance(weight, (int, float)) and not isinstance(weight, bool):
        if adjust_zeros and isinstance(adjust_zeros, (int, float)):
            c = np.where((c == 0) & (c >= 0), adjust_zeros, c)
        w = np.where((c > 0) & (c < float(weight)), 1.0, 0.0)
    elif isinstance(weight, list):
        if adjust_zeros and isinstance(adjust_zeros, (int, float)):
            c = np.where((c == 0) & (c >= 0), adjust_zeros, c)
        steps = sorted(weight, key=lambda x: x[0])
        w = np.zeros_like(c)
        for dist, wt in steps:
            w = np.where((c > 0) & (c < dist) & (w == 0), wt, w)
    else:
        raise TypeError(f"Unsupported weight type: {type(weight)}")

    if max_cost is not None:
        cost_arr = cost_sp.toarray().astype(float)
        w[cost_arr > max_cost] = 0.0

    w[~np.isfinite(w)] = 0.0
    w[w < 0] = 0.0
    w[cost_sp.toarray() < 0] = 0.0

    if normalize_weight:
        row_sums = w.sum(axis=1, keepdims=True)
        row_sums[row_sums == 0] = 1.0
        w = w * (w / row_sums)

    return sparse.csc_matrix(w)

euclidean_cost

euclidean_cost(consumers_xy: ndarray, providers_xy: ndarray) -> np.ndarray

Compute Euclidean distance matrix between consumer and provider coordinates.

Parameters:

Name Type Description Default
consumers_xy ndarray of shape (n, 2)

Consumer coordinates (x, y).

required
providers_xy ndarray of shape (m, 2)

Provider coordinates (x, y).

required

Returns:

Type Description
ndarray of shape (n, m)

Pairwise Euclidean distances.

Source code in packages/sdc-catchment/src/sdc_catchment/catchment.py
def euclidean_cost(consumers_xy: np.ndarray, providers_xy: np.ndarray) -> np.ndarray:
    """Compute Euclidean distance matrix between consumer and provider coordinates.

    Parameters
    ----------
    consumers_xy : ndarray of shape (n, 2)
        Consumer coordinates (x, y).
    providers_xy : ndarray of shape (m, 2)
        Provider coordinates (x, y).

    Returns
    -------
    ndarray of shape (n, m)
        Pairwise Euclidean distances.
    """
    return cdist(np.asarray(consumers_xy), np.asarray(providers_xy), metric="euclidean")