Source code for prefgraph.contrib.gross_substitutes

"""Gross substitutes test and compensated demand analysis.

Tests for gross substitutes/complements relationships and implements
Slutsky decomposition of price effects. Based on Chapter 10.3 of
Chambers & Echenique (2016) "Revealed Preference Theory".

Tech-Friendly Names (Primary):
    - test_cross_price_effect(): Test substitute/complement relationship
    - decompose_price_effects(): Slutsky decomposition
    - compute_hicksian_demand(): Compensated demand estimation

Economics Names (Legacy Aliases):
    - check_gross_substitutes() -> test_cross_price_effect()
"""

from __future__ import annotations

import time

import numpy as np
from numpy.typing import NDArray

from prefgraph.core.session import ConsumerSession
from prefgraph.core.result import (
    GrossSubstitutesResult,
    SubstitutionMatrixResult,
    CompensatedDemandResult,
)
from prefgraph.core.exceptions import ValueRangeError, DataValidationError


def check_gross_substitutes(
    session: ConsumerSession,
    good_g: int,
    good_h: int,
    price_change_threshold: float = 0.05,
    tolerance: float = 1e-10,
) -> GrossSubstitutesResult:
    """
    Test if two goods are gross substitutes based on revealed preference data.

    Gross substitutes: when the price of good g increases (other prices constant)
    and quantity of g decreases, we should see quantity of h increase.

    Gross complements: when p_g increases and x_g decreases, x_h also decreases.

    The algorithm:
    1. Finds observation pairs where price of g changed significantly
    2. Checks if other prices stayed relatively constant
    3. Analyzes the direction of quantity changes
    4. Classifies the relationship based on majority of informative pairs

    Args:
        session: ConsumerSession with prices and quantities
        good_g: Index of first good
        good_h: Index of second good (potential substitute)
        price_change_threshold: Minimum relative price change to consider (default 5%)
        tolerance: Numerical tolerance

    Returns:
        GrossSubstitutesResult with relationship classification and confidence

    Example:
        >>> import numpy as np
        >>> from prefgraph import ConsumerSession, check_gross_substitutes
        >>> # Prices for goods 0 and 1 over 3 observations
        >>> prices = np.array([[1.0, 2.0], [2.0, 2.0], [1.0, 1.0]])
        >>> quantities = np.array([[4.0, 1.0], [1.0, 3.0], [2.0, 2.0]])
        >>> session = ConsumerSession(prices=prices, quantities=quantities)
        >>> result = check_gross_substitutes(session, good_g=0, good_h=1)
        >>> print(f"Relationship: {result.relationship}")

    References:
        Hicks, J. R. (1939). Value and Capital. Oxford University Press.
    """
    start_time = time.perf_counter()

    T = session.num_observations
    N = session.num_goods

    if good_g < 0 or good_g >= N or good_h < 0 or good_h >= N:
        raise ValueRangeError(
            f"Good indices must be in [0, {N}). "
            f"Got good_g={good_g}, good_h={good_h}. "
            f"Hint: Use valid indices from 0 to {N - 1}."
        )
    if good_g == good_h:
        raise DataValidationError(
            f"good_g and good_h must be different. Got {good_g} for both. "
            f"Hint: Gross substitutes analysis requires two distinct goods."
        )

    P = session.prices  # T x N
    Q = session.quantities  # T x N

    substitutes_pairs: list[tuple[int, int]] = []
    complements_pairs: list[tuple[int, int]] = []

    # Compare all pairs of observations
    for i in range(T):
        for j in range(i + 1, T):
            # Check if price of g changed significantly while others ~constant
            pg_i, pg_j = P[i, good_g], P[j, good_g]
            ph_i, ph_j = P[i, good_h], P[j, good_h]

            # Skip if prices are near zero
            if pg_i < tolerance or pg_j < tolerance:
                continue

            # Relative price change for good g
            rel_change_g = abs(pg_j - pg_i) / pg_i

            # Check if price of h stayed relatively constant
            rel_change_h = abs(ph_j - ph_i) / max(ph_i, tolerance)

            # We want: significant change in p_g, small change in p_h
            if rel_change_g < price_change_threshold:
                continue  # Not enough price movement in g
            if rel_change_h > rel_change_g * 0.5:
                continue  # Too much change in h relative to g

            # Check other prices didn't change too much
            other_goods = [k for k in range(N) if k != good_g and k != good_h]
            if other_goods:
                other_changes = [
                    abs(P[j, k] - P[i, k]) / max(P[i, k], tolerance)
                    for k in other_goods
                ]
                if max(other_changes) > rel_change_g * 0.5:
                    continue  # Other prices changed too much

            # Get quantity changes
            xg_i, xg_j = Q[i, good_g], Q[j, good_g]
            xh_i, xh_j = Q[i, good_h], Q[j, good_h]

            # Direction of price change for g
            pg_increased = pg_j > pg_i + tolerance
            pg_decreased = pg_j < pg_i - tolerance

            # Direction of quantity changes
            xg_increased = xg_j > xg_i + tolerance
            xg_decreased = xg_j < xg_i - tolerance
            xh_increased = xh_j > xh_i + tolerance
            xh_decreased = xh_j < xh_i - tolerance

            # Gross substitutes pattern:
            # p_g up, x_g down => x_h up (or p_g down, x_g up => x_h down)
            if pg_increased and xg_decreased:
                if xh_increased:
                    substitutes_pairs.append((i, j))
                elif xh_decreased:
                    complements_pairs.append((i, j))
            elif pg_decreased and xg_increased:
                if xh_decreased:
                    substitutes_pairs.append((i, j))
                elif xh_increased:
                    complements_pairs.append((i, j))

    # Determine relationship
    n_subs = len(substitutes_pairs)
    n_comp = len(complements_pairs)
    informative_pairs = n_subs + n_comp

    if informative_pairs == 0:
        relationship = "inconclusive"
        are_substitutes = False
        are_complements = False
        confidence = 0.0
        supporting = []
        violating = []
    elif n_subs > n_comp:
        relationship = "substitutes"
        are_substitutes = True
        are_complements = False
        confidence = n_subs / informative_pairs
        supporting = substitutes_pairs
        violating = complements_pairs
    elif n_comp > n_subs:
        relationship = "complements"
        are_substitutes = False
        are_complements = True
        confidence = n_comp / informative_pairs
        supporting = complements_pairs
        violating = substitutes_pairs
    else:
        relationship = "independent"
        are_substitutes = False
        are_complements = False
        confidence = 0.5
        supporting = []
        violating = []

    computation_time = (time.perf_counter() - start_time) * 1000

    return GrossSubstitutesResult(
        are_substitutes=are_substitutes,
        are_complements=are_complements,
        relationship=relationship,
        supporting_pairs=supporting,
        violating_pairs=violating,
        confidence_score=confidence,
        good_g_index=good_g,
        good_h_index=good_h,
        computation_time_ms=computation_time,
    )


# Relationship codes for efficient storage
_RELATIONSHIP_CODES = {
    "self": 0,
    "substitutes": 1,
    "complements": 2,
    "independent": 3,
    "inconclusive": 4,
}
_CODE_TO_RELATIONSHIP = {v: k for k, v in _RELATIONSHIP_CODES.items()}


def compute_substitution_matrix(
    session: ConsumerSession,
    price_change_threshold: float = 0.05,
) -> SubstitutionMatrixResult:
    """
    Compute pairwise substitution relationships for all goods.

    Returns an N x N matrix where entry [g, h] indicates the relationship
    between goods g and h.

    Args:
        session: ConsumerSession
        price_change_threshold: Minimum price change to consider

    Returns:
        SubstitutionMatrixResult with relationship matrix

    Example:
        >>> from prefgraph import ConsumerSession, compute_substitution_matrix
        >>> result = compute_substitution_matrix(session)
        >>> print(f"Substitute pairs: {result.substitute_pairs}")
        >>> print(f"Complement pairs: {result.complement_pairs}")
    """
    start_time = time.perf_counter()

    N = session.num_goods
    # Use int8 codes for memory efficiency instead of object dtype strings
    relationship_codes = np.zeros((N, N), dtype=np.int8)
    confidence_matrix = np.zeros((N, N))

    for g in range(N):
        for h in range(N):
            if g == h:
                relationship_codes[g, h] = _RELATIONSHIP_CODES["self"]
                confidence_matrix[g, h] = 1.0
            elif g < h:
                result = check_gross_substitutes(session, g, h, price_change_threshold)
                code = _RELATIONSHIP_CODES.get(result.relationship, _RELATIONSHIP_CODES["inconclusive"])
                relationship_codes[g, h] = code
                relationship_codes[h, g] = code
                confidence_matrix[g, h] = result.confidence_score
                confidence_matrix[h, g] = result.confidence_score

    # Convert codes back to strings for the result object (for API compatibility)
    relationship_matrix = np.empty((N, N), dtype=object)
    for g in range(N):
        for h in range(N):
            relationship_matrix[g, h] = _CODE_TO_RELATIONSHIP[relationship_codes[g, h]]

    computation_time = (time.perf_counter() - start_time) * 1000

    return SubstitutionMatrixResult(
        relationship_matrix=relationship_matrix,
        confidence_matrix=confidence_matrix,
        num_goods=N,
        computation_time_ms=computation_time,
    )


def check_law_of_demand(
    session: ConsumerSession,
    good: int,
    price_change_threshold: float = 0.05,
    tolerance: float = 1e-10,
) -> dict:
    """
    Check if a good satisfies the law of demand (own-price effect is negative).

    The law of demand states that when price increases, quantity demanded
    decreases (holding other factors constant).

    Args:
        session: ConsumerSession
        good: Index of the good to test
        price_change_threshold: Minimum price change to consider
        tolerance: Numerical tolerance

    Returns:
        Dictionary with:
        - satisfies_law: True if law of demand holds
        - supporting_pairs: Pairs where law holds
        - violating_pairs: Pairs where law is violated (Giffen good behavior)
        - confidence: Fraction of pairs supporting the law
    """
    T = session.num_observations
    N = session.num_goods
    P = session.prices
    Q = session.quantities

    supporting_pairs: list[tuple[int, int]] = []
    violating_pairs: list[tuple[int, int]] = []

    for i in range(T):
        for j in range(i + 1, T):
            pg_i, pg_j = P[i, good], P[j, good]

            if pg_i < tolerance or pg_j < tolerance:
                continue

            rel_change = abs(pg_j - pg_i) / pg_i
            if rel_change < price_change_threshold:
                continue

            # Check other prices (vectorized)
            mask = np.ones(N, dtype=bool)
            mask[good] = False
            if np.any(mask):
                other_changes = np.abs(P[j, mask] - P[i, mask]) / np.maximum(P[i, mask], tolerance)
                if np.max(other_changes) > rel_change * 0.5:
                    continue

            xg_i, xg_j = Q[i, good], Q[j, good]

            # Law of demand: price up => quantity down
            price_up = pg_j > pg_i + tolerance
            price_down = pg_j < pg_i - tolerance
            qty_up = xg_j > xg_i + tolerance
            qty_down = xg_j < xg_i - tolerance

            if (price_up and qty_down) or (price_down and qty_up):
                supporting_pairs.append((i, j))
            elif (price_up and qty_up) or (price_down and qty_down):
                violating_pairs.append((i, j))

    total = len(supporting_pairs) + len(violating_pairs)
    confidence = len(supporting_pairs) / total if total > 0 else 0.5

    return {
        "satisfies_law": len(violating_pairs) == 0 and len(supporting_pairs) > 0,
        "supporting_pairs": supporting_pairs,
        "violating_pairs": violating_pairs,
        "confidence": confidence,
        "num_informative_pairs": total,
    }


# =============================================================================
# TECH-FRIENDLY ALIASES
# =============================================================================

# test_cross_price_effect: Tech-friendly name for check_gross_substitutes
test_cross_price_effect = check_gross_substitutes
"""
Test how changes in one item's price affect demand for another item.

This is the tech-friendly alias for check_gross_substitutes.

Use this to understand cross-price relationships between products:
- Substitutes: Price of A up → Demand for B up (users switch)
- Complements: Price of A up → Demand for B down (bought together)
- Independent: No clear relationship

Example:
    >>> from prefgraph import BehaviorLog, test_cross_price_effect
    >>> result = test_cross_price_effect(user_log, good_g=0, good_h=1)
    >>> if result.are_substitutes:
    ...     print("Users treat these as substitutes")
"""

compute_cross_price_matrix = compute_substitution_matrix
"""
Compute all pairwise cross-price relationships.

Returns an N x N matrix of relationships between all goods.
"""


# =============================================================================
# COMPENSATED DEMAND (Chapter 10.3)
# =============================================================================


[docs] def decompose_price_effects( session: ConsumerSession, price_change_threshold: float = 0.05, tolerance: float = 1e-10, ) -> CompensatedDemandResult: """ Decompose price effects into substitution and income effects. The Slutsky equation states: Total effect = Substitution effect + Income effect dx_i/dp_j = (∂x_i/∂p_j)|_u + x_j * (∂x_i/∂m) The substitution effect measures how demand changes when price changes but utility is held constant (compensated demand). The income effect measures how the change in purchasing power affects demand. Args: session: ConsumerSession with prices and quantities price_change_threshold: Minimum price change to consider tolerance: Numerical tolerance Returns: CompensatedDemandResult with Slutsky decomposition Example: >>> from prefgraph import ConsumerSession, decompose_price_effects >>> result = decompose_price_effects(user_session) >>> print(f"Substitution effect for good 0 from price 1: {result.substitution_effects[0,1]:.3f}") >>> print(f"Income effect: {result.income_effects[0,1]:.3f}") >>> print(f"Satisfies compensated law: {result.satisfies_compensated_law}") References: Chambers & Echenique (2016), Chapter 10.3 Slutsky, E. (1915). "On the Theory of the Budget of the Consumer" """ start_time = time.perf_counter() T = session.num_observations N = session.num_goods P = session.prices Q = session.quantities # Compute substitution and income effects matrices substitution_effects = np.zeros((N, N), dtype=np.float64) income_effects = np.zeros((N, N), dtype=np.float64) total_effects = np.zeros((N, N), dtype=np.float64) for i in range(N): for j in range(N): s_ij, ie_ij, te_ij = _estimate_slutsky_components( P, Q, i, j, price_change_threshold, tolerance ) substitution_effects[i, j] = s_ij income_effects[i, j] = ie_ij total_effects[i, j] = te_ij # Compute own-price elasticities own_price_elasticities = {} for i in range(N): avg_price = np.mean(P[:, i]) avg_qty = np.mean(Q[:, i]) if avg_qty > tolerance: own_price_elasticities[i] = total_effects[i, i] * avg_price / avg_qty else: own_price_elasticities[i] = 0.0 # Compute cross-price elasticities cross_price_elasticities = np.zeros((N, N), dtype=np.float64) for i in range(N): for j in range(N): avg_price_j = np.mean(P[:, j]) avg_qty_i = np.mean(Q[:, i]) if avg_qty_i > tolerance: cross_price_elasticities[i, j] = total_effects[i, j] * avg_price_j / avg_qty_i # Check compensated law of demand # Substitution effects should be negative semi-definite # Own-price substitution effects should be negative (or zero) violations = [] for i in range(N): if substitution_effects[i, i] > tolerance: violations.append((i, i)) satisfies_compensated_law = len(violations) == 0 computation_time = (time.perf_counter() - start_time) * 1000 return CompensatedDemandResult( substitution_effects=substitution_effects, income_effects=income_effects, satisfies_compensated_law=satisfies_compensated_law, own_price_elasticities=own_price_elasticities, cross_price_elasticities=cross_price_elasticities, violations=violations, computation_time_ms=computation_time, )
[docs] def compute_hicksian_demand( session: ConsumerSession, target_utility: float | None = None, method: str = "exact", ) -> dict: """ Compute Hicksian (compensated) demand via expenditure minimization. Hicksian demand h(p, u) solves:: min_x p @ x s.t. U(x) >= u x >= 0 This implementation recovers the Afriat utility function U(x) from the data and uses constrained optimization to solve for Hicksian demand at any price vector and utility level. Args: session: ConsumerSession with prices and quantities target_utility: Utility level for computing derivatives (default: median) method: Computation method: - "exact": Full Afriat recovery + constrained optimization - "approximation": Legacy finite differences (faster but approximate) Returns: Dictionary containing: - 'success': Whether Afriat utility recovery succeeded - 'hicksian_demand_fn': Callable (prices, utility) -> quantities - 'hicksian_derivatives': N x N matrix of ∂h_i/∂p_j at target utility - 'target_utility': Utility level used for derivatives - 'utility_function': Callable to evaluate utility at any bundle - 'observation_utilities': Utility at each observed bundle - 'observations_used': Number of observations (for backward compatibility) Example: >>> from prefgraph import ConsumerSession, compute_hicksian_demand >>> import numpy as np >>> P = np.array([[1.0, 2.0], [1.5, 1.5], [2.0, 1.0]]) >>> Q = np.array([[2.0, 1.0], [1.5, 1.5], [1.0, 2.0]]) >>> session = ConsumerSession(prices=P, quantities=Q) >>> result = compute_hicksian_demand(session, method='exact') >>> if result['success']: ... h = result['hicksian_demand_fn'] ... print(f"h([1,1], 0.5) = {h([1, 1], 0.5)}") References: Chambers & Echenique (2016), Chapter 10.3 Afriat (1967), "The Construction of Utility Functions" """ if method == "approximation": return _compute_hicksian_demand_approximation(session, target_utility) T = session.num_observations N = session.num_goods P = session.prices Q = session.quantities # Recover Afriat utility function utility_fn, U, lambdas, success = _recover_afriat_utility_for_hicksian(P, Q) if not success or utility_fn is None: # Fall back to approximation method result = _compute_hicksian_demand_approximation(session, target_utility) result['success'] = False result['utility_function'] = None result['hicksian_demand_fn'] = None return result # Compute utility at each observation observation_utilities = np.array([utility_fn(Q[t]) for t in range(T)]) if target_utility is None: target_utility = np.median(observation_utilities) # Create Hicksian demand function def hicksian_demand_fn( prices: NDArray[np.float64], utility_level: float, ) -> NDArray[np.float64] | None: """ Compute Hicksian demand h(p, u). Args: prices: N-dimensional price vector utility_level: Target utility level Returns: N-dimensional quantity vector or None if optimization fails """ return _solve_hicksian_at_point( utility_fn, np.asarray(prices), utility_level, Q, tolerance=1e-8 ) # Compute Hicksian derivatives at target utility level # ∂h_i/∂p_j ≈ [h_i(p + ε*e_j, u) - h_i(p - ε*e_j, u)] / (2ε) hicksian_derivatives = np.zeros((N, N), dtype=np.float64) # Use mean prices as evaluation point p_mean = np.mean(P, axis=0) epsilon = 0.01 * np.mean(p_mean) # 1% perturbation # Compute baseline Hicksian demand h_base = hicksian_demand_fn(p_mean, target_utility) if h_base is not None: for j in range(N): p_plus = p_mean.copy() p_plus[j] += epsilon p_minus = p_mean.copy() p_minus[j] -= epsilon h_plus = hicksian_demand_fn(p_plus, target_utility) h_minus = hicksian_demand_fn(p_minus, target_utility) if h_plus is not None and h_minus is not None: for i in range(N): hicksian_derivatives[i, j] = (h_plus[i] - h_minus[i]) / (2 * epsilon) return { "success": True, "hicksian_demand_fn": hicksian_demand_fn, "hicksian_derivatives": hicksian_derivatives, "target_utility": target_utility, "utility_function": utility_fn, "observation_utilities": observation_utilities, "observations_used": T, }
def _recover_afriat_utility_for_hicksian( P: NDArray[np.float64], Q: NDArray[np.float64], tolerance: float = 1e-8, ) -> tuple[callable | None, NDArray | None, NDArray | None, bool]: """ Recover Afriat utility function from price-quantity data. Solves the LP: min Σ λ_k s.t. U_k <= U_l + λ_l * p_l @ (x_k - x_l) for all k, l U_k >= 0, λ_k > ε Then constructs: u(x) = min_k { U_k + λ_k * p_k @ (x - x_k) } Returns: Tuple of (utility_function, U_values, lambda_values, success) """ from scipy.optimize import linprog T = P.shape[0] n_vars = 2 * T constraints_A = [] constraints_b = [] for k in range(T): for obs_l in range(T): if k == obs_l: continue row = np.zeros(n_vars) row[k] = 1.0 row[obs_l] = -1.0 diff = Q[obs_l] - Q[k] lambda_coef = P[obs_l] @ diff row[T + obs_l] = lambda_coef constraints_A.append(row) constraints_b.append(0.0) A_ub = np.array(constraints_A) if constraints_A else np.zeros((0, n_vars)) b_ub = np.array(constraints_b) if constraints_b else np.zeros(0) epsilon = 1e-6 bounds = [(0, None)] * T + [(epsilon, None)] * T c = np.zeros(n_vars) c[T:] = 1.0 try: result = linprog( c, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method="highs", options={"presolve": True}, ) if result.success: U = result.x[:T] lambdas = result.x[T:] # Construct the Afriat utility function def utility_fn(x: NDArray[np.float64]) -> float: x = np.asarray(x, dtype=np.float64) values = np.zeros(T) for k in range(T): values[k] = U[k] + lambdas[k] * (P[k] @ (x - Q[k])) return float(np.min(values)) return utility_fn, U, lambdas, True except Exception as e: from prefgraph.core.exceptions import OptimizationError raise OptimizationError( f"Afriat utility recovery failed for Hicksian demand. Original error: {e}" ) from e return None, None, None, False def _solve_hicksian_at_point( utility_fn: callable, prices: NDArray[np.float64], target_utility: float, observed_Q: NDArray[np.float64], tolerance: float = 1e-8, ) -> NDArray[np.float64] | None: """ Solve the Hicksian demand problem at a specific price-utility point. min_x p @ x s.t. U(x) >= u x >= 0 Args: utility_fn: Afriat utility function prices: N-dimensional price vector target_utility: Target utility level u observed_Q: Observed quantity matrix (for initial guess) tolerance: Numerical tolerance Returns: Optimal quantity vector or None if optimization fails """ from scipy.optimize import minimize N = len(prices) def objective(x: NDArray[np.float64]) -> float: return float(prices @ x) def utility_constraint(x: NDArray[np.float64]) -> float: return utility_fn(x) - target_utility # Find initial guess: observed bundle closest to target utility T = observed_Q.shape[0] obs_utilities = np.array([utility_fn(observed_Q[t]) for t in range(T)]) closest_idx = np.argmin(np.abs(obs_utilities - target_utility)) x0 = observed_Q[closest_idx].copy() # Ensure x0 is positive x0 = np.maximum(x0, 1e-6) bounds = [(1e-10, None)] * N constraints = [{"type": "ineq", "fun": utility_constraint}] try: result = minimize( objective, x0, method="SLSQP", bounds=bounds, constraints=constraints, options={"maxiter": 1000, "ftol": tolerance}, ) if result.success: return result.x except Exception as e: from prefgraph.core.exceptions import OptimizationError raise OptimizationError( f"Hicksian demand optimization failed. Original error: {e}" ) from e return None def _compute_hicksian_demand_approximation( session: ConsumerSession, target_utility: float | None = None, ) -> dict: """ Legacy approximation of Hicksian demand using finite differences. This is the original implementation kept for backward compatibility and as a fallback when Afriat recovery fails. """ T = session.num_observations N = session.num_goods P = session.prices Q = session.quantities # Estimate utility for each observation utilities = np.zeros(T) for t in range(T): utilities[t] = np.sum(np.log(Q[t] + 1e-10)) if target_utility is None: target_utility = np.median(utilities) # Find observations near target utility utility_diffs = np.abs(utilities - target_utility) near_target = utility_diffs < np.std(utilities) * 0.5 if np.sum(near_target) < 2: near_target = np.ones(T, dtype=bool) # Estimate Hicksian demand derivatives hicksian_derivatives = np.zeros((N, N), dtype=np.float64) for i in range(N): for j in range(N): effects = [] for t1 in range(T): if not near_target[t1]: continue for t2 in range(t1 + 1, T): if not near_target[t2]: continue dp_j = P[t2, j] - P[t1, j] if abs(dp_j) < 1e-10: continue dq_i = Q[t2, i] - Q[t1, i] effects.append(dq_i / dp_j) if effects: hicksian_derivatives[i, j] = np.median(effects) return { "success": False, "hicksian_demand_fn": None, "hicksian_derivatives": hicksian_derivatives, "target_utility": target_utility, "utility_function": None, "observation_utilities": utilities, "observations_used": int(np.sum(near_target)), }
[docs] def check_compensated_law_of_demand( session: ConsumerSession, tolerance: float = 1e-6, ) -> dict: """ Check if data satisfies the compensated law of demand. The compensated law states that substitution effects are negative for own-price changes: when price increases (holding utility constant), quantity demanded decreases. Args: session: ConsumerSession with prices and quantities tolerance: Numerical tolerance Returns: Dictionary with test results """ result = decompose_price_effects(session) # Check own-price substitution effects N = session.num_goods violations = [] for i in range(N): if result.substitution_effects[i, i] > tolerance: violations.append(i) return { "satisfies_law": len(violations) == 0, "violations": violations, "substitution_effects_diagonal": np.diag(result.substitution_effects), }
def _estimate_slutsky_components( P: NDArray[np.float64], Q: NDArray[np.float64], good_i: int, good_j: int, price_threshold: float, tolerance: float, ) -> tuple[float, float, float]: """ Estimate Slutsky decomposition components for a pair of goods. Returns: Tuple of (substitution_effect, income_effect, total_effect) """ T = P.shape[0] N = P.shape[1] # Precompute expenditures (vectorized) expenditures = np.sum(P * Q, axis=1) total_effects = [] income_effects_raw = [] for t1 in range(T): for t2 in range(t1 + 1, T): # Price change in good j dp_j = P[t2, good_j] - P[t1, good_j] if abs(dp_j) / max(P[t1, good_j], tolerance) < price_threshold: continue # Check other prices are relatively stable (vectorized) mask = np.ones(N, dtype=bool) mask[good_j] = False other_change = np.sum( np.abs(P[t2, mask] - P[t1, mask]) / np.maximum(P[t1, mask], tolerance) ) if other_change > price_threshold * (N - 1) * 0.5: continue # Quantity change in good i dq_i = Q[t2, good_i] - Q[t1, good_i] # Total effect (Marshallian) total_effect = dq_i / dp_j total_effects.append(total_effect) # Estimate income effect # Income effect = x_j * (dq_i/dm) dm = expenditures[t2] - expenditures[t1] if abs(dm) > tolerance: dq_dm = dq_i / dm x_j_avg = (Q[t1, good_j] + Q[t2, good_j]) / 2 income_effect = x_j_avg * dq_dm income_effects_raw.append(income_effect) if not total_effects: return 0.0, 0.0, 0.0 total_effect = np.median(total_effects) income_effect = np.median(income_effects_raw) if income_effects_raw else 0.0 substitution_effect = total_effect - income_effect return substitution_effect, income_effect, total_effect # ============================================================================= # ADDITIONAL TECH-FRIENDLY ALIASES # ============================================================================= compute_slutsky_decomposition = decompose_price_effects """ Compute Slutsky decomposition of price effects. Alias for decompose_price_effects. """ estimate_compensated_demand = compute_hicksian_demand """ Estimate compensated (Hicksian) demand. Alias for compute_hicksian_demand. """