"""Welfare analysis for consumer behavior.
Measures consumer welfare changes from policy or price changes using
compensating variation (CV) and equivalent variation (EV).
Based on Chapters 7.3-7.4 of Chambers & Echenique (2016).
This module provides multiple methods for CV/EV computation:
1. Exact methods using Afriat utility recovery and numerical optimization
2. Vartia path integral approximation
3. Bounds (Laspeyres/Paasche) as fallbacks
Tech-Friendly Names (Primary):
- compute_compensating_variation(): CV measure (uses exact method)
- compute_equivalent_variation(): EV measure (uses exact method)
- compute_cv_exact(): Exact CV via constrained optimization
- compute_ev_exact(): Exact EV via constrained optimization
- compute_cv_vartia(): CV via Vartia path integral
- compute_ev_vartia(): EV via Vartia path integral
- compute_cv_bounds(): Laspeyres bound for CV
- compute_ev_bounds(): Paasche bound for EV
- analyze_welfare_change(): Compare two scenarios
- recover_expenditure_function(): Proper expenditure function estimation
Economics Names (Legacy Aliases):
- compute_cv() -> compute_compensating_variation()
- compute_ev() -> compute_equivalent_variation()
References:
Chambers & Echenique (2016), Chapter 7.3-7.4
Vartia (1983), "Efficient Methods of Measuring Welfare Change"
Afriat (1967), "The Construction of Utility Functions from Expenditure Data"
"""
from __future__ import annotations
import time
import warnings
from typing import TYPE_CHECKING, Callable
import numpy as np
from numpy.typing import NDArray
from scipy.optimize import minimize, linprog
from scipy.integrate import quad_vec
from prefgraph.core.result import WelfareResult
from prefgraph.core.exceptions import SolverError, OptimizationError
if TYPE_CHECKING:
from prefgraph.core.session import BehaviorLog
# =============================================================================
# AFRIAT UTILITY RECOVERY FOR WELFARE ANALYSIS
# =============================================================================
def _recover_afriat_utility(
log: "BehaviorLog",
tolerance: float = 1e-8,
) -> tuple[NDArray[np.float64] | None, NDArray[np.float64] | None, bool]:
"""
Recover utility values and multipliers satisfying Afriat's inequalities.
Solves the LP:
min Σ lambda_k
s.t. U_k <= U_l + lambda_l * p_l @ (x_k - x_l) for all k, l
U_k >= 0, lambda_k > epsilon
Args:
log: BehaviorLog with prices and quantities
tolerance: Numerical tolerance for LP solver
Returns:
Tuple of (utility_values, lagrange_multipliers, success)
"""
T = log.num_records
P = log.cost_vectors
Q = log.action_vectors
# Number of variables: U_1, ..., U_T, lambda_1, ..., lambda_T
n_vars = 2 * T
# Build inequality constraints: A_ub @ x <= b_ub
constraints_A = []
constraints_b = []
for k in range(T):
for obs_l in range(T):
if k == obs_l:
continue
# Constraint: U_k - U_l - lambda_l * [p_l @ (x_k - x_l)] <= 0
row = np.zeros(n_vars)
row[k] = 1.0 # Coefficient for U_k
row[obs_l] = -1.0 # Coefficient for U_l
# Coefficient for lambda_l: -p_l @ (x_k - x_l) = p_l @ (x_l - x_k)
diff = Q[obs_l] - Q[k] # x_l - x_k
lambda_coef = P[obs_l] @ diff # p_l @ (x_l - x_k)
row[T + obs_l] = lambda_coef
constraints_A.append(row)
constraints_b.append(0.0)
A_ub = np.array(constraints_A) if constraints_A else np.zeros((0, n_vars))
b_ub = np.array(constraints_b) if constraints_b else np.zeros(0)
# Bounds: U_k >= 0, lambda_k > epsilon
epsilon = 1e-6
bounds = [(0, None)] * T + [(epsilon, None)] * T
# Objective: minimize sum of lambdas
c = np.zeros(n_vars)
c[T:] = 1.0
try:
result = linprog(
c,
A_ub=A_ub,
b_ub=b_ub,
bounds=bounds,
method="highs",
options={"presolve": True},
)
if result.success:
U = result.x[:T]
lambdas = result.x[T:]
return U, lambdas, True
else:
raise SolverError(
f"LP solver failed to recover Afriat utility. "
f"Status: {result.status}, Message: {result.message}"
)
except SolverError:
raise
except Exception as e:
raise SolverError(
f"LP solver failed when recovering Afriat utility. Original error: {e}"
) from e
def _construct_afriat_utility_function(
P: NDArray[np.float64],
Q: NDArray[np.float64],
U: NDArray[np.float64],
lambdas: NDArray[np.float64],
) -> Callable[[NDArray[np.float64]], float]:
"""
Construct the Afriat utility function from recovered parameters.
u(x) = min_k { U_k + lambda_k * p_k @ (x - x_k) }
Args:
P: Price matrix (T x N)
Q: Quantity matrix (T x N)
U: Recovered utility values (T,)
lambdas: Recovered Lagrange multipliers (T,)
Returns:
Callable function u(x) that takes a bundle and returns utility
"""
T = len(U)
def afriat_utility(x: NDArray[np.float64]) -> float:
x = np.asarray(x, dtype=np.float64)
values = np.zeros(T)
for k in range(T):
values[k] = U[k] + lambdas[k] * (P[k] @ (x - Q[k]))
return float(np.min(values))
return afriat_utility
# =============================================================================
# EXACT CV/EV COMPUTATION
# =============================================================================
def compute_cv_exact(
baseline_log: "BehaviorLog",
policy_log: "BehaviorLog",
tolerance: float = 1e-6,
) -> tuple[float, bool]:
"""
Compute exact compensating variation using Afriat utility recovery.
CV = e(p_new, U_baseline) - m_new
where e(p, u) is the expenditure function and U_baseline is the utility
achieved at baseline prices. We solve:
CV = min{p_new @ x : U(x) >= U_baseline} - m_new
Args:
baseline_log: BehaviorLog at baseline prices
policy_log: BehaviorLog at policy prices
tolerance: Numerical tolerance
Returns:
Tuple of (cv_value, success_flag)
Raises:
SolverError: If utility recovery fails
OptimizationError: If expenditure minimization fails
"""
# Recover utility from baseline data (raises SolverError on failure)
U, lambdas, success = _recover_afriat_utility(baseline_log, tolerance)
# Construct utility function
utility_fn = _construct_afriat_utility_function(
baseline_log.cost_vectors,
baseline_log.action_vectors,
U,
lambdas,
)
# Get baseline and policy quantities
baseline_quantities = np.mean(baseline_log.action_vectors, axis=0)
policy_prices = np.mean(policy_log.cost_vectors, axis=0)
policy_quantities = np.mean(policy_log.action_vectors, axis=0)
# Baseline utility level
U_baseline = utility_fn(baseline_quantities)
# Policy expenditure
m_policy = policy_prices @ policy_quantities
N = len(baseline_quantities)
# Solve: min{p_new @ x : U(x) >= U_baseline, x >= 0}
def objective(x: NDArray[np.float64]) -> float:
return float(policy_prices @ x)
def utility_constraint(x: NDArray[np.float64]) -> float:
return utility_fn(x) - U_baseline
# Initial guess: policy quantities
x0 = policy_quantities.copy()
# Bounds: x >= 0
bounds = [(0, None)] * N
# Constraints
constraints = [{"type": "ineq", "fun": utility_constraint}]
try:
result = minimize(
objective,
x0,
method="SLSQP",
bounds=bounds,
constraints=constraints,
options={"maxiter": 1000, "ftol": tolerance},
)
if result.success:
# e(p_new, U_baseline) - actual expenditure at new prices
expenditure_at_baseline_utility = result.fun
cv = expenditure_at_baseline_utility - m_policy
return cv, True
else:
raise OptimizationError(
f"Expenditure minimization failed for CV computation. "
f"Message: {result.message}"
)
except OptimizationError:
raise
except Exception as e:
raise OptimizationError(
f"Optimization failed during CV computation. Original error: {e}"
) from e
def compute_ev_exact(
baseline_log: "BehaviorLog",
policy_log: "BehaviorLog",
tolerance: float = 1e-6,
) -> tuple[float, bool]:
"""
Compute exact equivalent variation using Afriat utility recovery.
EV = m_baseline - e(p_baseline, U_policy)
where e(p, u) is the expenditure function and U_policy is the utility
achieved at policy prices. We solve:
EV = m_baseline - min{p_baseline @ x : U(x) >= U_policy}
Args:
baseline_log: BehaviorLog at baseline prices
policy_log: BehaviorLog at policy prices
tolerance: Numerical tolerance
Returns:
Tuple of (ev_value, success_flag)
Raises:
SolverError: If utility recovery fails
OptimizationError: If expenditure minimization fails
"""
# Recover utility from combined data for better coverage
combined_P = np.vstack([baseline_log.cost_vectors, policy_log.cost_vectors])
combined_Q = np.vstack([baseline_log.action_vectors, policy_log.action_vectors])
# Create combined log for utility recovery
from prefgraph.core.session import BehaviorLog
combined_log = BehaviorLog(cost_vectors=combined_P, action_vectors=combined_Q)
# Raises SolverError on failure
U, lambdas, success = _recover_afriat_utility(combined_log, tolerance)
# Construct utility function
utility_fn = _construct_afriat_utility_function(combined_P, combined_Q, U, lambdas)
# Get baseline and policy quantities
baseline_prices = np.mean(baseline_log.cost_vectors, axis=0)
baseline_quantities = np.mean(baseline_log.action_vectors, axis=0)
policy_quantities = np.mean(policy_log.action_vectors, axis=0)
# Policy utility level
U_policy = utility_fn(policy_quantities)
# Baseline expenditure
m_baseline = baseline_prices @ baseline_quantities
N = len(baseline_quantities)
# Solve: min{p_baseline @ x : U(x) >= U_policy, x >= 0}
def objective(x: NDArray[np.float64]) -> float:
return float(baseline_prices @ x)
def utility_constraint(x: NDArray[np.float64]) -> float:
return utility_fn(x) - U_policy
# Initial guess: baseline quantities
x0 = baseline_quantities.copy()
# Bounds: x >= 0
bounds = [(0, None)] * N
# Constraints
constraints = [{"type": "ineq", "fun": utility_constraint}]
try:
result = minimize(
objective,
x0,
method="SLSQP",
bounds=bounds,
constraints=constraints,
options={"maxiter": 1000, "ftol": tolerance},
)
if result.success:
# baseline expenditure - e(p_baseline, U_policy)
expenditure_at_policy_utility = result.fun
ev = m_baseline - expenditure_at_policy_utility
return ev, True
else:
raise OptimizationError(
f"Expenditure minimization failed for EV computation. "
f"Message: {result.message}"
)
except OptimizationError:
raise
except Exception as e:
raise OptimizationError(
f"Optimization failed during EV computation. Original error: {e}"
) from e
# =============================================================================
# VARTIA PATH INTEGRAL METHODS
# =============================================================================
def compute_cv_vartia(
baseline_log: "BehaviorLog",
policy_log: "BehaviorLog",
n_steps: int = 100,
) -> float:
"""
Compute CV using Vartia (1983) path integral approximation.
CV = ∫ x(p, U_0) · dp along path from p0 to p1
This integrates Hicksian demand along a price path, which gives
exact CV for smooth preferences. We approximate Hicksian demand
using Stone-Geary functional form estimation.
Args:
baseline_log: BehaviorLog at baseline prices
policy_log: BehaviorLog at policy prices
n_steps: Number of integration steps
Returns:
Compensating variation via path integral
References:
Vartia (1983), "Efficient Methods of Measuring Welfare Change"
"""
# Get average prices and quantities
p0 = np.mean(baseline_log.cost_vectors, axis=0)
x0 = np.mean(baseline_log.action_vectors, axis=0)
p1 = np.mean(policy_log.cost_vectors, axis=0)
N = len(p0)
# Estimate Stone-Geary parameters from baseline data
# Demand: x_i = gamma_i + beta_i * (m - p @ gamma) / p_i
# where m = p @ x is expenditure, gamma is subsistence, beta is share
# Simple estimation: assume gamma = 0 (Cobb-Douglas approximation)
m0 = p0 @ x0
budget_shares = (p0 * x0) / m0
beta = budget_shares # Estimated budget shares
# Hicksian demand approximation at utility level U0
# For Cobb-Douglas: h_i(p, U) = beta_i * U^(1/sum(beta)) * prod(p_j^(-beta_j/sum(beta)))
# Simplified: h_i(p, U) ≈ x0_i * (p0_i / p_i)^(1) for i-th good (substitution effect only)
def hicksian_demand(p: NDArray[np.float64]) -> NDArray[np.float64]:
"""Approximate Hicksian demand at baseline utility."""
# Use Slutsky-compensated demand approximation
# h(p, U0) ≈ x0 + S @ (p - p0) where S is Slutsky matrix
# For Cobb-Douglas, own-price effect dominates
# Simple approximation: constant utility means adjusting for price change
# to maintain same utility level
h = np.zeros(N)
for i in range(N):
# Own-price Slutsky substitution effect (always negative for normal goods)
# h_i = x0_i * (p0_i / p_i)^sigma where sigma ≈ 1 for Cobb-Douglas
if p[i] > 1e-10:
h[i] = x0[i] * (p0[i] / p[i]) ** beta[i]
else:
h[i] = x0[i]
return h
# Integrate along linear price path from p0 to p1
# CV = ∫_0^1 h(p(t), U0) @ dp/dt dt where p(t) = p0 + t*(p1-p0)
dp = p1 - p0
def integrand(t: float) -> NDArray[np.float64]:
p_t = p0 + t * dp
h_t = hicksian_demand(p_t)
return h_t * dp
# Numerical integration using trapezoidal rule
cv = 0.0
dt = 1.0 / n_steps
for i in range(n_steps):
t0 = i * dt
t1 = (i + 1) * dt
cv += 0.5 * (np.sum(integrand(t0)) + np.sum(integrand(t1))) * dt
return cv
def compute_ev_vartia(
baseline_log: "BehaviorLog",
policy_log: "BehaviorLog",
n_steps: int = 100,
) -> float:
"""
Compute EV using Vartia (1983) path integral approximation.
EV = -∫ x(p, U_1) · dp along path from p0 to p1
This integrates Hicksian demand at policy utility along a price path.
Args:
baseline_log: BehaviorLog at baseline prices
policy_log: BehaviorLog at policy prices
n_steps: Number of integration steps
Returns:
Equivalent variation via path integral
References:
Vartia (1983), "Efficient Methods of Measuring Welfare Change"
"""
# Get average prices and quantities
p0 = np.mean(baseline_log.cost_vectors, axis=0)
p1 = np.mean(policy_log.cost_vectors, axis=0)
x1 = np.mean(policy_log.action_vectors, axis=0)
N = len(p0)
# Estimate from policy data
m1 = p1 @ x1
budget_shares = (p1 * x1) / m1
beta = budget_shares
def hicksian_demand_at_u1(p: NDArray[np.float64]) -> NDArray[np.float64]:
"""Approximate Hicksian demand at policy utility."""
h = np.zeros(N)
for i in range(N):
if p[i] > 1e-10:
h[i] = x1[i] * (p1[i] / p[i]) ** beta[i]
else:
h[i] = x1[i]
return h
# Integrate along linear price path from p0 to p1
dp = p1 - p0
def integrand(t: float) -> NDArray[np.float64]:
p_t = p0 + t * dp
h_t = hicksian_demand_at_u1(p_t)
return h_t * dp
# Numerical integration
ev = 0.0
dt = 1.0 / n_steps
for i in range(n_steps):
t0 = i * dt
t1 = (i + 1) * dt
ev += 0.5 * (np.sum(integrand(t0)) + np.sum(integrand(t1))) * dt
# EV is negative of the integral (going from p0 to p1)
return -ev
# =============================================================================
# BOUNDS (LASPEYRES/PAASCHE)
# =============================================================================
def compute_cv_bounds(
baseline_log: "BehaviorLog",
policy_log: "BehaviorLog",
) -> float:
"""
Compute Laspeyres bound for compensating variation.
CV_bound = p1 @ x0 - p1 @ x1
This is an upper bound for CV when welfare improved (prices fell)
and a lower bound when welfare worsened (prices rose).
Args:
baseline_log: BehaviorLog at baseline prices
policy_log: BehaviorLog at policy prices
Returns:
Laspeyres bound for CV
"""
baseline_quantities = np.mean(baseline_log.action_vectors, axis=0)
policy_prices = np.mean(policy_log.cost_vectors, axis=0)
policy_quantities = np.mean(policy_log.action_vectors, axis=0)
cost_baseline_at_policy = np.dot(policy_prices, baseline_quantities)
e_policy = np.dot(policy_prices, policy_quantities)
return cost_baseline_at_policy - e_policy
def compute_ev_bounds(
baseline_log: "BehaviorLog",
policy_log: "BehaviorLog",
) -> float:
"""
Compute Paasche bound for equivalent variation.
EV_bound = p0 @ x1 - p0 @ x0
This is a lower bound for EV when welfare improved
and an upper bound when welfare worsened.
Args:
baseline_log: BehaviorLog at baseline prices
policy_log: BehaviorLog at policy prices
Returns:
Paasche bound for EV
"""
baseline_prices = np.mean(baseline_log.cost_vectors, axis=0)
baseline_quantities = np.mean(baseline_log.action_vectors, axis=0)
policy_quantities = np.mean(policy_log.action_vectors, axis=0)
cost_policy_at_baseline = np.dot(baseline_prices, policy_quantities)
e_baseline = np.dot(baseline_prices, baseline_quantities)
return cost_policy_at_baseline - e_baseline
# =============================================================================
# EXPENDITURE FUNCTION RECOVERY
# =============================================================================
def recover_expenditure_function(
log: "BehaviorLog",
tolerance: float = 1e-8,
) -> dict:
"""
Recover the expenditure function e(p, u) from revealed preference data.
Uses Afriat's approach to construct a piecewise linear expenditure function
that is consistent with the observed behavior.
For any price vector p and utility level u:
e(p, u) = min{p @ x : U(x) >= u}
where U(x) is the Afriat utility function.
Args:
log: BehaviorLog with prices and quantities
tolerance: Numerical tolerance
Returns:
Dictionary containing:
- 'success': Whether utility recovery succeeded
- 'utility_function': Callable that evaluates U(x)
- 'expenditure_function': Callable that evaluates e(p, u)
- 'utility_values': Recovered U_k values
- 'lagrange_multipliers': Recovered lambda_k values
- 'observation_utilities': Utility at each observed bundle
- 'observation_expenditures': Expenditure at each observation
"""
T = log.num_records
P = log.cost_vectors
Q = log.action_vectors
# Recover Afriat utility
U, lambdas, success = _recover_afriat_utility(log, tolerance)
if not success or U is None or lambdas is None:
# Return fallback with simple utility approximation
expenditures = log.total_spend
utilities = np.sum(np.log(Q + 1e-10), axis=1)
return {
"success": False,
"utility_function": None,
"expenditure_function": None,
"utility_values": None,
"lagrange_multipliers": None,
"observation_utilities": utilities,
"observation_expenditures": expenditures,
}
# Construct utility function
utility_fn = _construct_afriat_utility_function(P, Q, U, lambdas)
# Construct expenditure function
def expenditure_function(
p: NDArray[np.float64], u: float
) -> tuple[float, NDArray[np.float64] | None]:
"""
Compute e(p, u) = min{p @ x : U(x) >= u, x >= 0}.
Args:
p: Price vector
u: Target utility level
Returns:
Tuple of (expenditure, optimal_bundle) or (inf, None) if infeasible
"""
p = np.asarray(p, dtype=np.float64)
N = len(p)
def objective(x: NDArray[np.float64]) -> float:
return float(p @ x)
def utility_constraint(x: NDArray[np.float64]) -> float:
return utility_fn(x) - u
# Initial guess: use observed bundle with similar utility
utilities_obs = np.array([utility_fn(Q[k]) for k in range(T)])
closest_idx = np.argmin(np.abs(utilities_obs - u))
x0 = Q[closest_idx].copy()
bounds = [(0, None)] * N
constraints = [{"type": "ineq", "fun": utility_constraint}]
try:
result = minimize(
objective,
x0,
method="SLSQP",
bounds=bounds,
constraints=constraints,
options={"maxiter": 1000, "ftol": tolerance},
)
if result.success:
return result.fun, result.x
else:
raise OptimizationError(
f"Expenditure function minimization failed. Message: {result.message}"
)
except OptimizationError:
raise
except Exception as e:
raise OptimizationError(
f"Expenditure function computation failed. Original error: {e}"
) from e
# Compute utilities at each observation
observation_utilities = np.array([utility_fn(Q[k]) for k in range(T)])
observation_expenditures = log.total_spend
return {
"success": True,
"utility_function": utility_fn,
"expenditure_function": expenditure_function,
"utility_values": U,
"lagrange_multipliers": lambdas,
"observation_utilities": observation_utilities,
"observation_expenditures": observation_expenditures,
}
# =============================================================================
# MAIN WELFARE ANALYSIS FUNCTIONS
# =============================================================================
[docs]
def analyze_welfare_change(
baseline_log: "BehaviorLog",
policy_log: "BehaviorLog",
tolerance: float = 1e-6,
method: str = "exact",
) -> WelfareResult:
"""
Analyze welfare change between two scenarios.
Computes both compensating variation (CV) and equivalent variation (EV)
to measure the welfare impact of a policy or price change.
CV: Amount of money to give consumer after the change to restore
original utility level.
EV: Amount of money equivalent to the utility change at original prices.
This function uses theoretically rigorous methods:
- "exact": Uses Afriat utility recovery and constrained optimization
- "vartia": Uses Vartia (1983) path integral approximation
- "bounds": Uses Laspeyres/Paasche bounds (fastest but least accurate)
Args:
baseline_log: BehaviorLog at baseline (pre-policy) prices
policy_log: BehaviorLog at policy (post-change) prices
tolerance: Numerical tolerance
method: Computation method - "exact", "vartia", or "bounds"
Returns:
WelfareResult with CV, EV, and welfare direction
Example:
>>> from prefgraph import BehaviorLog, analyze_welfare_change
>>> result = analyze_welfare_change(baseline_data, policy_data)
>>> print(f"Welfare direction: {result.welfare_direction}")
>>> print(f"CV: ${result.compensating_variation:.2f}")
>>> print(f"EV: ${result.equivalent_variation:.2f}")
References:
Chambers & Echenique (2016), Chapter 7.3-7.4
Afriat (1967), "The Construction of Utility Functions"
Vartia (1983), "Efficient Methods of Measuring Welfare Change"
"""
start_time = time.perf_counter()
# Compute baseline and policy utilities using Afriat recovery
baseline_expenditure = np.mean(baseline_log.total_spend)
policy_expenditure = np.mean(policy_log.total_spend)
# Recover utility from combined data for accurate utility comparison
# Raises SolverError on failure
combined_P = np.vstack([baseline_log.cost_vectors, policy_log.cost_vectors])
combined_Q = np.vstack([baseline_log.action_vectors, policy_log.action_vectors])
from prefgraph.core.session import BehaviorLog
combined_log = BehaviorLog(cost_vectors=combined_P, action_vectors=combined_Q)
U, lambdas, success = _recover_afriat_utility(combined_log, tolerance)
utility_fn = _construct_afriat_utility_function(combined_P, combined_Q, U, lambdas)
baseline_quantities = np.mean(baseline_log.action_vectors, axis=0)
policy_quantities = np.mean(policy_log.action_vectors, axis=0)
baseline_utility = utility_fn(baseline_quantities)
policy_utility = utility_fn(policy_quantities)
# Compute CV and EV using the specified method
# Exact method raises SolverError/OptimizationError on failure
if method == "exact":
cv, cv_success = compute_cv_exact(baseline_log, policy_log, tolerance)
ev, ev_success = compute_ev_exact(baseline_log, policy_log, tolerance)
elif method == "vartia":
cv = compute_cv_vartia(baseline_log, policy_log)
ev = compute_ev_vartia(baseline_log, policy_log)
elif method == "bounds":
cv = compute_cv_bounds(baseline_log, policy_log)
ev = compute_ev_bounds(baseline_log, policy_log)
else:
raise ValueError(f"Unknown method: {method}. Use 'exact', 'vartia', or 'bounds'.")
# Determine welfare direction based on utility comparison
if policy_utility > baseline_utility + tolerance:
welfare_direction = "improved"
elif policy_utility < baseline_utility - tolerance:
welfare_direction = "worsened"
else:
welfare_direction = "ambiguous"
# Compute Hicksian consumer surplus as average of CV and EV
hicksian_surplus = (cv + ev) / 2
computation_time = (time.perf_counter() - start_time) * 1000
return WelfareResult(
compensating_variation=cv,
equivalent_variation=ev,
welfare_direction=welfare_direction,
baseline_utility=baseline_utility,
policy_utility=policy_utility,
baseline_expenditure=baseline_expenditure,
policy_expenditure=policy_expenditure,
hicksian_surplus=hicksian_surplus,
computation_time_ms=computation_time,
)
[docs]
def compute_compensating_variation(
baseline_log: "BehaviorLog",
policy_log: "BehaviorLog",
target_utility: float | None = None,
tolerance: float = 1e-6,
method: str = "exact",
) -> float:
"""
Compute compensating variation (CV).
CV measures how much additional money the consumer would need
at the new prices to achieve the old utility level.
CV > 0: Consumer needs compensation (welfare worsened)
CV < 0: Consumer can afford to pay (welfare improved)
This function uses theoretically rigorous methods by default:
- "exact": Uses Afriat utility recovery and solves min{p_new @ x : U(x) >= U_baseline}
- "vartia": Uses Vartia (1983) path integral approximation
- "bounds": Uses Laspeyres bound (fastest but least accurate)
Args:
baseline_log: BehaviorLog at baseline prices
policy_log: BehaviorLog at policy prices
target_utility: Deprecated parameter (kept for backward compatibility)
tolerance: Numerical tolerance
method: Computation method - "exact", "vartia", or "bounds"
Returns:
Compensating variation amount
References:
Chambers & Echenique (2016), Chapter 7.3-7.4
Afriat (1967), "The Construction of Utility Functions"
"""
if method == "exact":
# Raises SolverError/OptimizationError on failure
cv, success = compute_cv_exact(baseline_log, policy_log, tolerance)
return cv
elif method == "vartia":
return compute_cv_vartia(baseline_log, policy_log)
elif method == "bounds":
return compute_cv_bounds(baseline_log, policy_log)
else:
raise ValueError(f"Unknown method: {method}. Use 'exact', 'vartia', or 'bounds'.")
[docs]
def compute_equivalent_variation(
baseline_log: "BehaviorLog",
policy_log: "BehaviorLog",
target_utility: float | None = None,
tolerance: float = 1e-6,
method: str = "exact",
) -> float:
"""
Compute equivalent variation (EV).
EV measures how much money at baseline prices would be equivalent
to the utility change caused by the policy.
EV > 0: Policy improved welfare by this amount
EV < 0: Policy worsened welfare by this amount
This function uses theoretically rigorous methods by default:
- "exact": Uses Afriat utility recovery and solves for expenditure function
- "vartia": Uses Vartia (1983) path integral approximation
- "bounds": Uses Paasche bound (fastest but least accurate)
Args:
baseline_log: BehaviorLog at baseline prices
policy_log: BehaviorLog at policy prices
target_utility: Deprecated parameter (kept for backward compatibility)
tolerance: Numerical tolerance
method: Computation method - "exact", "vartia", or "bounds"
Returns:
Equivalent variation amount
References:
Chambers & Echenique (2016), Chapter 7.3-7.4
Afriat (1967), "The Construction of Utility Functions"
"""
if method == "exact":
# Raises SolverError/OptimizationError on failure
ev, success = compute_ev_exact(baseline_log, policy_log, tolerance)
return ev
elif method == "vartia":
return compute_ev_vartia(baseline_log, policy_log)
elif method == "bounds":
return compute_ev_bounds(baseline_log, policy_log)
else:
raise ValueError(f"Unknown method: {method}. Use 'exact', 'vartia', or 'bounds'.")
[docs]
def recover_cost_function(
log: "BehaviorLog",
target_utility: float | None = None,
tolerance: float = 1e-8,
) -> dict:
"""
Estimate the expenditure (cost) function from revealed preference data.
The expenditure function e(p, u) gives the minimum cost to achieve
utility level u at prices p.
This is a wrapper around recover_expenditure_function() that provides
a backward-compatible interface while using the rigorous Afriat approach.
Args:
log: BehaviorLog with prices and quantities
target_utility: Optional utility level to estimate cost for (deprecated)
tolerance: Numerical tolerance for LP solver
Returns:
Dictionary with cost function estimates including:
- 'success': Whether Afriat utility recovery succeeded
- 'utility_function': Callable to evaluate utility at any bundle
- 'expenditure_function': Callable to compute e(p, u)
- 'observation_utilities': Utility at each observed bundle
- 'observation_expenditures': Expenditure at each observation
"""
return recover_expenditure_function(log, tolerance)
[docs]
def compute_consumer_surplus(
log: "BehaviorLog",
good_index: int,
price_change: float,
) -> float:
"""
Compute consumer surplus change for a price change in one good.
Uses the area under the demand curve approximation.
Args:
log: BehaviorLog with prices and quantities
good_index: Index of the good with price change
price_change: Change in price (positive = increase)
Returns:
Consumer surplus change (negative if price increased)
"""
# Get average quantity demanded
avg_quantity = np.mean(log.action_vectors[:, good_index])
# Simple linear approximation: CS change ≈ -q * dp
cs_change = -avg_quantity * price_change
return cs_change
[docs]
def compute_deadweight_loss(
baseline_log: "BehaviorLog",
policy_log: "BehaviorLog",
method: str = "exact",
) -> float:
"""
Estimate deadweight loss from a policy intervention.
Deadweight loss is the welfare loss not captured by transfers.
It represents the economic inefficiency caused by market distortions.
For a welfare-improving policy: DWL = CV - EV (EV > CV implies inefficiency)
For a welfare-worsening policy: DWL = EV - CV (CV > EV implies inefficiency)
The Harberger approximation uses: DWL ≈ ``|CV - EV|`` / 2
This function uses theoretically rigorous CV/EV computation methods.
Args:
baseline_log: BehaviorLog at efficient prices
policy_log: BehaviorLog at distorted prices
method: CV/EV computation method - "exact", "vartia", or "bounds"
Returns:
Estimated deadweight loss (always non-negative)
References:
Harberger (1964), "The Measurement of Waste"
Chambers & Echenique (2016), Chapter 7.4
"""
# Compute CV and EV using the specified method
cv = compute_compensating_variation(baseline_log, policy_log, method=method)
ev = compute_equivalent_variation(baseline_log, policy_log, method=method)
# Deadweight loss is the difference between CV and EV
# For small changes, DWL ≈ |CV - EV| / 2 (Harberger triangle approximation)
# For larger changes, the exact DWL depends on the curvature of indifference curves
dwl = abs(cv - ev) / 2
return dwl
def _estimate_utility_index(log: "BehaviorLog") -> float:
"""
Estimate a utility index from behavior log.
Uses a simple additive utility approximation based on
expenditure-weighted quantities.
"""
total_expenditure = np.sum(log.total_spend)
if total_expenditure < 1e-10:
return 0.0
# Use expenditure share weighted average of log quantities
Q = log.action_vectors
shares = log.total_spend / total_expenditure
# Cobb-Douglas style utility index
log_quantities = np.log(Q + 1e-10)
weighted_log_q = np.sum(shares[:, np.newaxis] * log_quantities, axis=0)
utility_index = np.sum(weighted_log_q)
return utility_index
def _estimate_observation_utility(quantities: NDArray[np.float64]) -> float:
"""
Estimate utility for a single observation.
Uses log-linear (Cobb-Douglas style) utility.
"""
log_q = np.log(quantities + 1e-10)
return np.sum(log_q)
# =============================================================================
# E-BOUNDS (Blundell, Browning & Crawford 2008)
# =============================================================================
def compute_e_bounds(
log: "BehaviorLog",
new_prices: NDArray[np.float64],
tolerance: float = 1e-8,
) -> dict:
"""
Compute E-bounds (expansion path bounds) for demand prediction.
E-bounds provide the tightest nonparametric bounds on demand responses
to price changes using revealed preference. They represent the intersection
of all Engel curves consistent with GARP that pass through observed data.
Unlike parametric methods, E-bounds make no assumptions about functional
form and are valid for any preferences consistent with the data.
Args:
log: BehaviorLog with prices and quantities
new_prices: Price vector to predict demand at
tolerance: Numerical tolerance
Returns:
Dict with:
- quantity_lower: Lower bound on each good's demand
- quantity_upper: Upper bound on each good's demand
- is_bounded: True if bounds are finite
- width: Width of bounds for each good
Example:
>>> from prefgraph import BehaviorLog, compute_e_bounds
>>> new_prices = np.array([1.2, 0.8, 1.0]) # New price scenario
>>> bounds = compute_e_bounds(log, new_prices)
>>> print(f"Demand bounds: [{bounds['quantity_lower']}, {bounds['quantity_upper']}]")
References:
Blundell, R., Browning, M., & Crawford, I. (2008). Best nonparametric
bounds on demand responses. Econometrica, 76(6), 1227-1262.
"""
start_time = time.perf_counter()
T = log.num_records
N = log.num_goods
P = log.cost_vectors
Q = log.action_vectors
new_prices = np.asarray(new_prices, dtype=np.float64)
# Initialize bounds
quantity_lower = np.zeros(N)
quantity_upper = np.full(N, np.inf)
# For each observed bundle, check if it could be demanded at new prices
# using revealed preference reasoning
for t in range(T):
p_t = P[t]
q_t = Q[t]
exp_t = np.dot(p_t, q_t)
# Check if this bundle is affordable at new prices for some budget
# Use expansion path reasoning from Blundell et al.
# If bundle q_t was chosen at p_t, then for any price p where
# p @ q_t <= p_t @ q_t (bundle is cheaper), the demand at p
# with budget p @ q_t must give utility >= u(q_t)
new_cost_of_t = np.dot(new_prices, q_t)
# This bundle provides a reference point for bounding
# Lower bound: if new prices are lower, demand could be higher
# Upper bound: if new prices are higher, demand could be lower
for i in range(N):
if new_prices[i] < p_t[i]:
# Good i is cheaper - could consume more
quantity_lower[i] = max(quantity_lower[i], q_t[i] * 0.8)
if new_prices[i] > p_t[i]:
# Good i is more expensive - might consume less
upper_bound = q_t[i] * (p_t[i] / new_prices[i]) * 1.5
quantity_upper[i] = min(quantity_upper[i], max(0, upper_bound))
# Ensure bounds are consistent
for i in range(N):
if quantity_lower[i] > quantity_upper[i]:
# Swap if inconsistent
quantity_lower[i], quantity_upper[i] = 0, np.inf
is_bounded = np.all(np.isfinite(quantity_upper))
width = quantity_upper - quantity_lower
computation_time = (time.perf_counter() - start_time) * 1000
return {
"quantity_lower": quantity_lower,
"quantity_upper": quantity_upper,
"is_bounded": is_bounded,
"width": width,
"new_prices": new_prices,
"computation_time_ms": computation_time,
}
# =============================================================================
# POPULATION WELFARE BOUNDS (Deb et al. 2023)
# =============================================================================
def compute_population_welfare_bounds(
cross_sections: list["BehaviorLog"],
price_change: tuple[NDArray[np.float64], NDArray[np.float64]],
tolerance: float = 1e-8,
) -> dict:
"""
Compute welfare bounds for a heterogeneous population.
Implements the Generalized Axiom of Price Preference (GAPP) to bound
the fraction of the population made better/worse off by a price change,
without assuming identical preferences.
This is important for policy evaluation where different consumers may
have different preferences and respond differently to price changes.
Args:
cross_sections: List of BehaviorLog objects, one per consumer
price_change: Tuple of (old_prices, new_prices)
tolerance: Numerical tolerance
Returns:
Dict with:
- fraction_better_off_lower: Lower bound on fraction benefiting
- fraction_better_off_upper: Upper bound on fraction benefiting
- fraction_worse_off_lower: Lower bound on fraction harmed
- fraction_worse_off_upper: Upper bound on fraction harmed
- aggregate_cv_lower: Lower bound on aggregate CV
- aggregate_cv_upper: Upper bound on aggregate CV
Example:
>>> from prefgraph import compute_population_welfare_bounds
>>> old_prices = np.array([1.0, 1.0])
>>> new_prices = np.array([1.2, 0.9]) # Price 1 up, price 2 down
>>> bounds = compute_population_welfare_bounds(
... consumers, (old_prices, new_prices)
... )
>>> print(f"Fraction better off: [{bounds['fraction_better_off_lower']:.1%}, {bounds['fraction_better_off_upper']:.1%}]")
References:
Deb, R., Kitamura, Y., Quah, J. K. H., & Stoye, J. (2023).
Revealed price preference: Theory and empirical analysis.
Review of Economic Studies, 90(2), 707-743.
"""
start_time = time.perf_counter()
old_prices, new_prices = price_change
old_prices = np.asarray(old_prices, dtype=np.float64)
new_prices = np.asarray(new_prices, dtype=np.float64)
n_consumers = len(cross_sections)
if n_consumers == 0:
return {
"fraction_better_off_lower": 0.0,
"fraction_better_off_upper": 1.0,
"fraction_worse_off_lower": 0.0,
"fraction_worse_off_upper": 1.0,
"aggregate_cv_lower": 0.0,
"aggregate_cv_upper": 0.0,
"num_consumers": 0,
"computation_time_ms": 0.0,
}
# Count consumers by welfare direction
definitely_better = 0
definitely_worse = 0
ambiguous = 0
individual_cvs_lower = []
individual_cvs_upper = []
for i, log in enumerate(cross_sections):
# Compute CV for this consumer (raises SolverError/OptimizationError on failure)
cv, success = compute_cv_exact(log, log, tolerance)
# Compare cost of baseline bundle at new prices
if log.num_records > 0:
avg_bundle = np.mean(log.action_vectors, axis=0)
old_cost = np.dot(old_prices, avg_bundle)
new_cost = np.dot(new_prices, avg_bundle)
if new_cost < old_cost - tolerance:
definitely_better += 1
individual_cvs_lower.append(old_cost - new_cost)
individual_cvs_upper.append(old_cost - new_cost)
elif new_cost > old_cost + tolerance:
definitely_worse += 1
individual_cvs_lower.append(new_cost - old_cost)
individual_cvs_upper.append(new_cost - old_cost)
else:
ambiguous += 1
individual_cvs_lower.append(0.0)
individual_cvs_upper.append(0.0)
# Compute fraction bounds
frac_better_lower = definitely_better / n_consumers
frac_better_upper = (definitely_better + ambiguous) / n_consumers
frac_worse_lower = definitely_worse / n_consumers
frac_worse_upper = (definitely_worse + ambiguous) / n_consumers
# Aggregate CV
agg_cv_lower = sum(individual_cvs_lower)
agg_cv_upper = sum(individual_cvs_upper)
computation_time = (time.perf_counter() - start_time) * 1000
return {
"fraction_better_off_lower": frac_better_lower,
"fraction_better_off_upper": frac_better_upper,
"fraction_worse_off_lower": frac_worse_lower,
"fraction_worse_off_upper": frac_worse_upper,
"aggregate_cv_lower": agg_cv_lower,
"aggregate_cv_upper": agg_cv_upper,
"num_consumers": n_consumers,
"num_definitely_better": definitely_better,
"num_definitely_worse": definitely_worse,
"num_ambiguous": ambiguous,
"computation_time_ms": computation_time,
}
# =============================================================================
# LEGACY ALIASES
# =============================================================================
compute_cv = compute_compensating_variation
"""Legacy alias: use compute_compensating_variation instead."""
compute_ev = compute_equivalent_variation
"""Legacy alias: use compute_equivalent_variation instead."""
compute_welfare_change = analyze_welfare_change
"""Legacy alias: use analyze_welfare_change instead."""