Source code for prefgraph.contrib.additive

"""Additive separability test for preferences.

Tests whether utility has the additive form U(x) = Σ u_i(x_i),
which is stronger than quasilinearity. Based on Chapter 9.3 of
Chambers & Echenique (2016) "Revealed Preference Theory".

Additive preferences imply:
1. No cross-effects: ∂x_i/∂p_j = 0 for i≠j (holding income constant)
2. Each good can be priced independently

This module provides theoretically rigorous estimation methods:
- OLS/2SLS regression for cross-price elasticity estimation
- LP-based cyclic monotonicity test for additive utility
- Bootstrap confidence intervals for cross-effects

Tech-Friendly Names (Primary):
    - test_additive_separability(): Test for additive utility
    - identify_additive_groups(): Find additively separable groups
    - check_no_cross_effects(): Test for zero cross-price effects
    - test_additivity_lp(): LP-based cyclic monotonicity test
    - compute_cross_effects_regression(): Regression-based estimation

Economics Names (Legacy Aliases):
    - check_additivity() -> test_additive_separability()

References:
    Chambers & Echenique (2016), Chapter 9.3
    Gorman, W.M. (1968), "The Structure of Utility Functions"
    Rockafellar (1970), "Convex Analysis" (cyclic monotonicity)
"""

from __future__ import annotations

import time
import warnings
from typing import TYPE_CHECKING

import numpy as np
from numpy.typing import NDArray
from scipy.optimize import linprog
from scipy import stats

from prefgraph.core.result import AdditivityResult
from prefgraph.core.exceptions import RegressionError, SolverError

if TYPE_CHECKING:
    from prefgraph.core.session import BehaviorLog


[docs] def test_additive_separability( log: "BehaviorLog", cross_effect_threshold: float = 0.1, price_change_threshold: float = 0.05, ) -> AdditivityResult: """ Test if preferences are additively separable. Additive separability means U(x) = Σ u_i(x_i), implying: - Each good's marginal utility depends only on its own quantity - Cross-price effects (holding income constant) are zero - Goods can be priced independently This is a stronger condition than weak separability and quasilinearity. Args: log: BehaviorLog with prices and quantities cross_effect_threshold: Threshold for cross-effect significance price_change_threshold: Minimum price change to consider Returns: AdditivityResult with separability analysis Example: >>> from prefgraph import BehaviorLog, test_additive_separability >>> result = test_additive_separability(user_log) >>> if result.is_additive: ... print("Goods can be priced independently") >>> else: ... print(f"Cross-effects found: {result.violations}") References: Chambers & Echenique (2016), Chapter 9.3 Gorman, W.M. (1968). "The Structure of Utility Functions" """ start_time = time.perf_counter() N = log.num_features # Compute cross-price effects matrix cross_effects_matrix = _compute_cross_effects_matrix( log, price_change_threshold ) # Find violations (significant off-diagonal effects) violations = [] max_cross_effect = 0.0 for i in range(N): for j in range(N): if i != j: effect = abs(cross_effects_matrix[i, j]) if effect > max_cross_effect: max_cross_effect = effect if effect > cross_effect_threshold: violations.append((i, j)) # Determine additive groups using connected components additive_groups = identify_additive_groups( cross_effects_matrix, cross_effect_threshold ) # Is fully additive if no significant cross-effects is_additive = len(violations) == 0 computation_time = (time.perf_counter() - start_time) * 1000 return AdditivityResult( is_additive=is_additive, additive_groups=additive_groups, cross_effects_matrix=cross_effects_matrix, max_cross_effect=max_cross_effect, violations=violations, num_violations=len(violations), computation_time_ms=computation_time, )
[docs] def identify_additive_groups( cross_effects_matrix: NDArray[np.float64], threshold: float = 0.1, ) -> list[set[int]]: """ Identify groups of goods that are additively separable from each other. Uses connected components: goods i and j are in the same group if there's a significant cross-effect between them (directly or transitively). Args: cross_effects_matrix: N x N matrix of cross-price effects threshold: Threshold for significant cross-effect Returns: List of sets, each containing good indices in a separable group Example: >>> groups = identify_additive_groups(cross_effects) >>> # If groups = [{0, 1}, {2, 3, 4}], then goods 0-1 and 2-4 >>> # can be priced independently from each other """ N = cross_effects_matrix.shape[0] # Build adjacency based on cross-effects adjacency = np.abs(cross_effects_matrix) > threshold np.fill_diagonal(adjacency, False) # Find connected components using union-find parent = list(range(N)) def find(x: int) -> int: if parent[x] != x: parent[x] = find(parent[x]) return parent[x] def union(x: int, y: int) -> None: px, py = find(x), find(y) if px != py: parent[px] = py # Union goods with cross-effects for i in range(N): for j in range(i + 1, N): if adjacency[i, j] or adjacency[j, i]: union(i, j) # Collect groups groups_dict: dict[int, set[int]] = {} for i in range(N): root = find(i) if root not in groups_dict: groups_dict[root] = set() groups_dict[root].add(i) return list(groups_dict.values())
[docs] def check_no_cross_effects( log: "BehaviorLog", good_i: int, good_j: int, price_change_threshold: float = 0.05, ) -> dict: """ Test if there are cross-price effects between two goods. For additive preferences, when p_j changes (other prices constant), x_i should not change (holding income constant). Args: log: BehaviorLog with prices and quantities good_i: Index of quantity good good_j: Index of price good price_change_threshold: Minimum price change to consider Returns: Dictionary with cross-effect analysis """ T = log.num_records N = log.num_features P = log.cost_vectors Q = log.action_vectors cross_effects = [] supporting_pairs = [] violating_pairs = [] for t1 in range(T): for t2 in range(t1 + 1, T): # Check if price of good j changed significantly dp_j = P[t2, good_j] - P[t1, good_j] rel_change_j = abs(dp_j) / max(P[t1, good_j], 1e-10) if rel_change_j < price_change_threshold: continue # Check if other prices (except j) are stable other_stable = True for k in range(N): if k != good_j: rel_change_k = abs(P[t2, k] - P[t1, k]) / max(P[t1, k], 1e-10) if rel_change_k > price_change_threshold * 0.5: other_stable = False break if not other_stable: continue # Compute cross-effect dq_i = Q[t2, good_i] - Q[t1, good_i] cross_effect = dq_i / dp_j if abs(dp_j) > 1e-10 else 0.0 cross_effects.append(cross_effect) # Income-adjusted cross-effect # Under additive utility, should be zero if abs(cross_effect) < 0.01: supporting_pairs.append((t1, t2)) else: violating_pairs.append((t1, t2)) return { "good_i": good_i, "good_j": good_j, "mean_cross_effect": np.mean(cross_effects) if cross_effects else 0.0, "std_cross_effect": np.std(cross_effects) if cross_effects else 0.0, "no_cross_effects": len(violating_pairs) == 0 and len(supporting_pairs) > 0, "supporting_pairs": supporting_pairs, "violating_pairs": violating_pairs, "num_observations": len(cross_effects), }
def _compute_cross_effects_matrix( log: "BehaviorLog", price_change_threshold: float = 0.05, method: str = "regression", ) -> NDArray[np.float64]: """ Compute N x N matrix of cross-price effects. Entry [i,j] is the effect of p_j change on x_i quantity (cross-price elasticity). This function provides multiple estimation methods: - "regression": OLS regression with log-linear demand (recommended) - "finite_diff": Legacy pairwise finite differences method Args: log: BehaviorLog with prices and quantities price_change_threshold: Minimum price change for finite_diff method method: Estimation method Returns: N x N matrix of cross-price effects """ if method == "regression": return compute_cross_effects_regression(log) elif method == "finite_diff": return _compute_cross_effects_finite_diff(log, price_change_threshold) else: raise ValueError(f"Unknown method: {method}. Use 'regression' or 'finite_diff'.") def compute_cross_effects_regression( log: "BehaviorLog", include_standard_errors: bool = False, ) -> NDArray[np.float64] | tuple[NDArray[np.float64], NDArray[np.float64]]: """ Compute cross-price effects using OLS regression. Estimates log-linear demand functions: log(x_i) = α_i + Σ_j β_ij * log(p_j) + γ_i * log(m) + ε The cross-price elasticity matrix is β (off-diagonal elements). For additive utility, β_ij should be zero for i ≠ j. Args: log: BehaviorLog with prices and quantities include_standard_errors: Whether to return standard errors Returns: N x N matrix of cross-price elasticities (off-diagonal = cross effects) If include_standard_errors=True, also returns N x N standard error matrix """ T = log.num_records N = log.num_features P = log.cost_vectors Q = log.action_vectors # Compute expenditures expenditures = log.total_spend # Take logs log_P = np.log(P + 1e-10) log_Q = np.log(Q + 1e-10) log_m = np.log(expenditures + 1e-10) # Build design matrix: [1, log(p_1), ..., log(p_N), log(m)] X = np.column_stack([np.ones(T), log_P, log_m]) # Estimate for each good beta_matrix = np.zeros((N, N)) # Cross-price elasticities se_matrix = np.zeros((N, N)) # Standard errors for i in range(N): y = log_Q[:, i] # OLS estimation try: XtX = X.T @ X XtX_inv = np.linalg.pinv(XtX) coeffs = XtX_inv @ (X.T @ y) # Extract price elasticities (indices 1 to N) beta_matrix[i, :] = coeffs[1 : N + 1] # Compute standard errors residuals = y - X @ coeffs sigma2 = np.sum(residuals**2) / (T - X.shape[1]) var_coeffs = sigma2 * np.diag(XtX_inv) se_matrix[i, :] = np.sqrt(var_coeffs[1 : N + 1]) except np.linalg.LinAlgError as e: raise RegressionError( f"OLS regression failed for good {i} in cross-effects estimation. " f"Design matrix may be singular. Original error: {e}" ) from e if include_standard_errors: return beta_matrix, se_matrix return beta_matrix def compute_cross_effects_2sls( log: "BehaviorLog", ) -> tuple[NDArray[np.float64], NDArray[np.float64], NDArray[np.float64]]: """ Compute cross-price effects using Two-Stage Least Squares (2SLS). Uses lagged prices as instruments to address potential price endogeneity. This is more robust when prices are set in response to demand. Stage 1: Predict log(p_j) using lagged prices Stage 2: Estimate demand using predicted prices Args: log: BehaviorLog with prices and quantities Returns: Tuple of (cross_effects_matrix, standard_errors, first_stage_f_stats) """ T = log.num_records N = log.num_features P = log.cost_vectors Q = log.action_vectors if T < 4: warnings.warn("Insufficient observations for 2SLS. Falling back to OLS.") beta, se = compute_cross_effects_regression(log, include_standard_errors=True) return beta, se, np.zeros(N) # Take logs log_P = np.log(P + 1e-10) log_Q = np.log(Q + 1e-10) expenditures = log.total_spend log_m = np.log(expenditures + 1e-10) # Create lagged prices as instruments (use t-1 prices for t observations) # Skip first observation log_P_current = log_P[1:] # t = 1, ..., T-1 log_P_lagged = log_P[:-1] # t = 0, ..., T-2 log_Q_current = log_Q[1:] log_m_current = log_m[1:] T_eff = T - 1 beta_matrix = np.zeros((N, N)) se_matrix = np.zeros((N, N)) first_stage_f = np.zeros(N) # Stage 1: Predict prices using lagged prices Z = np.column_stack([np.ones(T_eff), log_P_lagged, log_m_current]) # Instruments predicted_log_P = np.zeros_like(log_P_current) for j in range(N): y_price = log_P_current[:, j] try: ZtZ_inv = np.linalg.pinv(Z.T @ Z) gamma = ZtZ_inv @ (Z.T @ y_price) predicted_log_P[:, j] = Z @ gamma # First stage F-statistic residuals = y_price - Z @ gamma tss = np.sum((y_price - np.mean(y_price)) ** 2) rss = np.sum(residuals**2) r2 = 1 - rss / tss if tss > 0 else 0 k = Z.shape[1] first_stage_f[j] = (r2 / (1 - r2)) * (T_eff - k) / (k - 1) if r2 < 1 else 0 except np.linalg.LinAlgError as e: raise RegressionError( f"First stage 2SLS regression failed for price {j}. " f"Instrument matrix may be singular. Original error: {e}" ) from e # Stage 2: Estimate demand using predicted prices X_2sls = np.column_stack([np.ones(T_eff), predicted_log_P, log_m_current]) for i in range(N): y = log_Q_current[:, i] try: XtX_inv = np.linalg.pinv(X_2sls.T @ X_2sls) coeffs = XtX_inv @ (X_2sls.T @ y) beta_matrix[i, :] = coeffs[1 : N + 1] # 2SLS standard errors (using original X for variance) X_original = np.column_stack([np.ones(T_eff), log_P_current, log_m_current]) residuals = y - X_original @ coeffs sigma2 = np.sum(residuals**2) / (T_eff - X_2sls.shape[1]) var_coeffs = sigma2 * np.diag(XtX_inv) se_matrix[i, :] = np.sqrt(np.abs(var_coeffs[1 : N + 1])) except np.linalg.LinAlgError as e: raise RegressionError( f"Second stage 2SLS regression failed for good {i}. " f"Design matrix may be singular. Original error: {e}" ) from e return beta_matrix, se_matrix, first_stage_f def _compute_cross_effects_finite_diff( log: "BehaviorLog", price_change_threshold: float = 0.05, ) -> NDArray[np.float64]: """ Legacy finite differences method for cross-effects estimation. """ N = log.num_features T = log.num_records P = log.cost_vectors Q = log.action_vectors cross_effects = np.zeros((N, N), dtype=np.float64) for i in range(N): for j in range(N): effects = [] for t1 in range(T): for t2 in range(t1 + 1, T): dp_j = P[t2, j] - P[t1, j] if abs(dp_j) / max(P[t1, j], 1e-10) < price_change_threshold: continue mask = np.ones(N, dtype=bool) mask[j] = False other_change = np.sum( np.abs(P[t2, mask] - P[t1, mask]) / np.maximum(P[t1, mask], 1e-10) ) if other_change > price_change_threshold * N * 0.3: continue dq_i = Q[t2, i] - Q[t1, i] rel_dq_i = dq_i / Q[t1, i] if Q[t1, i] > 1e-10 else dq_i rel_dp_j = dp_j / P[t1, j] if abs(rel_dp_j) > 1e-10: effect = rel_dq_i / rel_dp_j effects.append(effect) if effects: cross_effects[i, j] = np.median(effects) return cross_effects def test_additive_consistency( log: "BehaviorLog", tolerance: float = 1e-6, ) -> dict: """ Test consistency with additive utility using cycle conditions. For additive utility, certain cycle conditions must hold. This is based on the cyclic monotonicity generalization. Args: log: BehaviorLog with prices and quantities tolerance: Numerical tolerance Returns: Dictionary with consistency analysis """ T = log.num_records N = log.num_features P = log.cost_vectors Q = log.action_vectors # Precompute expenditures (vectorized) expenditures = np.sum(P * Q, axis=1) # Shape: (T,) # For additive utility, we test using first-order conditions # The demand for each good depends only on its own price and income violations = [] for t1 in range(T): for t2 in range(T): if t1 == t2: continue # Check if any good violates additivity for i in range(N): # Under additivity: x_i(p_i, m) should be independent of p_j for j != i # Compare quantity of i across observations with same p_i if abs(P[t1, i] - P[t2, i]) < tolerance: # Same price for good i # Check if quantities differ significantly if abs(Q[t1, i] - Q[t2, i]) > tolerance: # Could be due to income effect or cross-price effect # Check if income is similar (use precomputed expenditures) m1 = expenditures[t1] m2 = expenditures[t2] if abs(m1 - m2) / max(m1, m2, 1e-10) < 0.1: # Income similar, but quantities differ # Must be cross-price effect violations.append((t1, t2, i)) return { "is_consistent": len(violations) == 0, "violations": violations, "num_violations": len(violations), } def test_additivity_lp( log: "BehaviorLog", tolerance: float = 1e-6, ) -> dict: """ Test for additive utility using LP-based cyclic monotonicity. For additive separability U(x) = Σ u_i(x_i), the subdifferential of each u_i must satisfy cyclic monotonicity (Rockafellar 1970): For any cycle of observations (k_0, k_1, ..., k_m, k_0), the condition: Σ p_{k_j}[i] * (x_{k_{j+1}}[i] - x_{k_j}[i]) ≤ 0 must hold for each good i separately. This formulates the test as an LP to check if additive utility functions exist that rationalize the data. Args: log: BehaviorLog with prices and quantities tolerance: Numerical tolerance for LP Returns: Dictionary with: - 'is_additive': Whether data is consistent with additive utility - 'violation_cycles': List of cycles that violate cyclic monotonicity - 'utility_values': Recovered u_i(x_i) values if consistent References: Rockafellar (1970), "Convex Analysis", Chapter 24 Chambers & Echenique (2016), Chapter 9.3 """ T = log.num_records N = log.num_features P = log.cost_vectors Q = log.action_vectors # For additive utility, we need to find u_i: R -> R for each good i # such that the first-order conditions hold. # # If x_t maximizes U(x) = Σ u_i(x_i) s.t. p_t @ x = m_t, # then u'_i(x_{ti}) = λ_t * p_{ti} for all i. # # For cyclic monotonicity of each u_i: # For any permutation π of {0, ..., T-1}: # Σ_t p_t[i] * (x_{π(t)}[i] - x_t[i]) ≤ 0 # # We use LP to check if this is satisfied for all 2-cycles. violation_cycles = [] is_additive_per_good = [] for i in range(N): # For good i, check 2-cycle monotonicity: # For all t1 < t2: p_t1[i] * (x_t2[i] - x_t1[i]) + p_t2[i] * (x_t1[i] - x_t2[i]) ≤ 0 # This simplifies to: (p_t1[i] - p_t2[i]) * (x_t2[i] - x_t1[i]) ≤ 0 # I.e., price and quantity move in opposite directions (law of demand for each good) violations_for_good = [] for t1 in range(T): for t2 in range(t1 + 1, T): dp = P[t1, i] - P[t2, i] dq = Q[t2, i] - Q[t1, i] # Check cyclic monotonicity condition if dp * dq > tolerance: violations_for_good.append((t1, t2, dp * dq)) is_additive_per_good.append(len(violations_for_good) == 0) if violations_for_good: violation_cycles.extend([(i, v) for v in violations_for_good]) # Overall additivity requires all goods to satisfy cyclic monotonicity is_additive = all(is_additive_per_good) # If consistent, try to recover utility values using LP utility_values = None if is_additive: utility_values = _recover_additive_utility_values(log, tolerance) return { "is_additive": is_additive, "is_additive_per_good": is_additive_per_good, "violation_cycles": violation_cycles, "num_violations": len(violation_cycles), "utility_values": utility_values, } def _recover_additive_utility_values( log: "BehaviorLog", tolerance: float = 1e-6, ) -> dict | None: """ Recover additive utility values u_i(x_{ti}) for each observation. Uses LP to find utility values consistent with the observed demands. For each good i, we need: u_i(x_{ti}) - u_i(x_{si}) ≤ p_{ti} * (x_{ti} - x_{si}) for all observations t, s where x_{ti} is revealed preferred to x_{si}. Args: log: BehaviorLog tolerance: Numerical tolerance Returns: Dictionary mapping good index to array of utility values, or None if infeasible """ T = log.num_records N = log.num_features P = log.cost_vectors Q = log.action_vectors utility_values = {} for i in range(N): # For good i, solve LP to find u_i(x_t[i]) for t = 0, ..., T-1 # Variables: U_0, U_1, ..., U_{T-1} # Constraints: U_t - U_s ≤ p_t[i] * (x_t[i] - x_s[i]) for all t, s # Number of variables n_vars = T # Build constraints A_ub = [] b_ub = [] for t in range(T): for s in range(T): if t == s: continue # Constraint: U_t - U_s ≤ p_t[i] * (x_t[i] - x_s[i]) row = np.zeros(n_vars) row[t] = 1.0 row[s] = -1.0 rhs = P[t, i] * (Q[t, i] - Q[s, i]) A_ub.append(row) b_ub.append(rhs) if not A_ub: utility_values[i] = np.zeros(T) continue A_ub = np.array(A_ub) b_ub = np.array(b_ub) # Objective: minimize sum of U_t (arbitrary, just need feasibility) c = np.ones(n_vars) # Bounds: U_t >= 0 bounds = [(0, None)] * n_vars try: result = linprog( c, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method="highs", ) if result.success: utility_values[i] = result.x else: raise SolverError( f"LP solver failed to recover additive utility for component {i}. " f"Status: {result.status}, Message: {result.message}" ) except SolverError: raise except Exception as e: raise SolverError( f"LP solver failed during additive utility recovery for component {i}. " f"Original error: {e}" ) from e return utility_values # ============================================================================= # LEGACY ALIASES # ============================================================================= check_additivity = test_additive_separability """Legacy alias: use test_additive_separability instead.""" find_additive_groups = identify_additive_groups """Legacy alias: use identify_additive_groups instead."""