"""Stochastic choice and random utility models.
Implements probabilistic choice models including logit, Luce model,
and random utility maximization (RUM). Based on Chapter 13 of
Chambers & Echenique (2016) "Revealed Preference Theory".
Tech-Friendly Names (Primary):
- fit_random_utility_model(): Fit RUM to stochastic choice data
- test_mcfadden_axioms(): Test IIA and regularity conditions
- estimate_choice_probabilities(): Predict choice probabilities
- check_independence_irrelevant_alternatives(): Test IIA
Economics Names (Legacy Aliases):
- fit_rum() -> fit_random_utility_model()
- check_iia() -> check_independence_irrelevant_alternatives()
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import numpy as np
from numpy.typing import NDArray
from scipy.optimize import minimize
from prefgraph.core.result import (
StochasticChoiceResult,
RUMConsistencyResult,
RegularityResult,
RegularityViolation,
)
from prefgraph.core.exceptions import SolverError
if TYPE_CHECKING:
from prefgraph.core.session import StochasticChoiceLog, MenuChoiceLog
[docs]
def fit_random_utility_model(
log: "StochasticChoiceLog",
model_type: str = "logit",
max_iterations: int = 1000,
) -> StochasticChoiceResult:
"""
Fit a random utility model to stochastic choice data.
Random utility models assume the consumer has utility U_i = V_i + epsilon_i
where V_i is deterministic and epsilon_i is random. Different assumptions
about epsilon distribution lead to different models:
- Logit: epsilon ~ Gumbel (IIA holds)
- Probit: epsilon ~ Normal
- Luce: probability proportional to utility
Args:
log: StochasticChoiceLog with choice frequency data
model_type: Type of model ("logit", "probit", "luce")
max_iterations: Maximum optimization iterations
Returns:
StochasticChoiceResult with model parameters and fit statistics
Example:
>>> from prefgraph import StochasticChoiceLog, fit_random_utility_model
>>> result = fit_random_utility_model(choice_data, model_type="logit")
>>> print(f"Model: {result.model_type}")
>>> print(f"Satisfies IIA: {result.satisfies_iia}")
>>> print(f"Log-likelihood: {result.log_likelihood:.2f}")
References:
Chambers & Echenique (2016), Chapter 13
McFadden, D. (1974). "Conditional Logit Analysis of Qualitative Choice Behavior"
"""
start_time = time.perf_counter()
n_menus = log.num_menus
# Estimate item utilities
if model_type == "logit":
utilities, parameters = _fit_logit_model(log, max_iterations)
elif model_type == "luce":
utilities, parameters = _fit_luce_model(log)
else:
# Default to logit
utilities, parameters = _fit_logit_model(log, max_iterations)
# Compute predicted choice probabilities
choice_probabilities = _compute_choice_probabilities(
log, utilities, model_type
)
# Compute log-likelihood
log_likelihood = _compute_log_likelihood(log, choice_probabilities)
# Compute AIC and BIC
n_params = len(utilities)
n_obs = sum(log.total_observations_per_menu)
aic = 2 * n_params - 2 * log_likelihood
bic = np.log(n_obs) * n_params - 2 * log_likelihood
# Test IIA (Independence of Irrelevant Alternatives)
satisfies_iia = check_independence_irrelevant_alternatives(log)
# Test regularity (monotonicity)
regularity_violations = _find_regularity_violations(log)
computation_time = (time.perf_counter() - start_time) * 1000
return StochasticChoiceResult(
model_type=model_type,
parameters=parameters,
satisfies_iia=satisfies_iia,
choice_probabilities=choice_probabilities,
log_likelihood=log_likelihood,
aic=aic,
bic=bic,
regularity_violations=regularity_violations,
computation_time_ms=computation_time,
)
[docs]
def test_mcfadden_axioms(
log: "StochasticChoiceLog",
) -> dict:
"""
Test McFadden's axioms for random utility maximization.
The axioms include:
1. Regularity: P(x|A) >= P(x|B) when A ⊆ B (removing options doesn't decrease choice probability)
2. IIA: P(x|A)/P(y|A) = P(x|B)/P(y|B) for all A,B containing x,y
Args:
log: StochasticChoiceLog with choice frequency data
Returns:
Dictionary with axiom test results
"""
satisfies_iia = check_independence_irrelevant_alternatives(log)
regularity_violations = _find_regularity_violations(log)
satisfies_regularity = len(regularity_violations) == 0
return {
"satisfies_iia": satisfies_iia,
"satisfies_regularity": satisfies_regularity,
"regularity_violations": regularity_violations,
"is_rum_consistent": satisfies_iia and satisfies_regularity,
}
[docs]
def check_independence_irrelevant_alternatives(
log: "StochasticChoiceLog",
tolerance: float = 0.1,
) -> bool:
"""
Test Independence of Irrelevant Alternatives (IIA).
IIA states that the relative odds of choosing x over y should not
depend on what other alternatives are available:
P(x|A) / P(y|A) = P(x|B) / P(y|B) for all menus A, B containing both x and y.
Args:
log: StochasticChoiceLog with choice frequency data
tolerance: Tolerance for ratio comparison
Returns:
True if IIA approximately holds
Note:
IIA is a strong condition that often fails in practice
(e.g., red bus/blue bus paradox).
"""
n_menus = log.num_menus
# For each pair of items, check if odds ratio is consistent across menus
items = sorted(log.all_items)
for x in items:
for y in items:
if x >= y:
continue
odds_ratios = []
for m_idx in range(n_menus):
menu = log.menus[m_idx]
if x in menu and y in menu:
p_x = log.get_choice_probability(m_idx, x)
p_y = log.get_choice_probability(m_idx, y)
if p_y > 1e-10:
ratio = p_x / p_y
odds_ratios.append(ratio)
# Check if odds ratios are consistent
if len(odds_ratios) >= 2:
cv = np.std(odds_ratios) / max(np.mean(odds_ratios), 1e-10)
if cv > tolerance:
return False
return True
[docs]
def estimate_choice_probabilities(
log: "StochasticChoiceLog",
utilities: NDArray[np.float64],
model_type: str = "logit",
) -> NDArray[np.float64]:
"""
Estimate choice probabilities given utilities.
Args:
log: StochasticChoiceLog with menu structure
utilities: Array of item utilities
model_type: Type of model
Returns:
Array of choice probabilities (flattened)
"""
return _compute_choice_probabilities(log, utilities, model_type)
[docs]
def fit_luce_model(
log: "StochasticChoiceLog",
) -> tuple[NDArray[np.float64], dict[str, float]]:
"""
Fit Luce choice model to stochastic choice data.
The Luce model (also called Bradley-Terry) assumes:
P(x|A) = v(x) / Σ_{y ∈ A} v(y)
where v(x) is the "choice value" of item x.
Args:
log: StochasticChoiceLog with choice frequency data
Returns:
Tuple of (utilities, parameters)
"""
return _fit_luce_model(log)
def _fit_logit_model(
log: "StochasticChoiceLog",
max_iterations: int = 1000,
) -> tuple[NDArray[np.float64], dict[str, float]]:
"""
Fit multinomial logit model using MLE.
"""
n_items = max(log.all_items) + 1
# Initial utilities
utilities = np.zeros(n_items)
# Objective: negative log-likelihood
def neg_log_likelihood(u: NDArray[np.float64]) -> float:
ll = 0.0
for m_idx in range(log.num_menus):
menu = log.menus[m_idx]
freqs = log.choice_frequencies[m_idx]
total = log.total_observations_per_menu[m_idx]
if total == 0:
continue
# Validate non-empty menu
if len(menu) == 0:
continue
# Compute choice probabilities using log-sum-exp trick for numerical stability
# This prevents overflow when utilities are large
menu_arr = np.array(list(menu))
u_menu = u[menu_arr]
max_u = np.max(u_menu) # Subtract max for numerical stability
exp_u = np.exp(u_menu - max_u)
log_sum_exp = max_u + np.log(np.sum(exp_u))
for item, count in freqs.items():
if count > 0:
# log(p) = u[item] - log_sum_exp
log_p = u[item] - log_sum_exp
ll += count * log_p
return -ll
# Optimize
result = minimize(
neg_log_likelihood,
utilities,
method="BFGS",
options={"maxiter": max_iterations},
)
utilities = result.x
# Normalize so minimum utility is 0
utilities = utilities - np.min(utilities)
parameters = {
"scale": 1.0, # Logit scale parameter
"convergence": float(result.success),
}
return utilities, parameters
def _fit_luce_model(
log: "StochasticChoiceLog",
) -> tuple[NDArray[np.float64], dict[str, float]]:
"""
Fit Luce choice model using simple frequency-based estimation.
"""
n_items = max(log.all_items) + 1
# Estimate v(x) from choice frequencies
# Use: v(x) ∝ average choice probability across menus containing x
choice_counts = np.zeros(n_items)
appearance_counts = np.zeros(n_items)
for m_idx in range(log.num_menus):
menu = log.menus[m_idx]
freqs = log.choice_frequencies[m_idx]
total = log.total_observations_per_menu[m_idx]
for item in menu:
appearance_counts[item] += total
choice_counts[item] += freqs.get(item, 0)
# Estimate utilities as log of choice values
utilities = np.zeros(n_items)
for i in range(n_items):
if appearance_counts[i] > 0:
v_i = choice_counts[i] / appearance_counts[i]
utilities[i] = np.log(max(v_i, 1e-10))
else:
utilities[i] = -10.0 # Very low utility for unseen items
# Normalize
utilities = utilities - np.min(utilities)
parameters = {
"method": "frequency_based",
}
return utilities, parameters
def _compute_choice_probabilities(
log: "StochasticChoiceLog",
utilities: NDArray[np.float64],
model_type: str,
) -> NDArray[np.float64]:
"""
Compute choice probabilities for all menus.
Uses log-sum-exp trick for numerical stability.
"""
all_probs = []
for m_idx in range(log.num_menus):
menu = log.menus[m_idx]
# Handle empty menus
if len(menu) == 0:
continue
menu_arr = np.array(list(menu))
u_menu = utilities[menu_arr]
if model_type == "logit" or model_type == "luce":
# Use log-sum-exp trick for numerical stability
max_u = np.max(u_menu)
exp_u = np.exp(u_menu - max_u)
sum_exp_u = np.sum(exp_u)
probs = exp_u / sum_exp_u
else:
# Default to logit with log-sum-exp
max_u = np.max(u_menu)
exp_u = np.exp(u_menu - max_u)
sum_exp_u = np.sum(exp_u)
probs = exp_u / sum_exp_u
all_probs.extend(probs)
return np.array(all_probs)
def _compute_log_likelihood(
log: "StochasticChoiceLog",
choice_probabilities: NDArray[np.float64],
) -> float:
"""
Compute log-likelihood of the model.
"""
ll = 0.0
prob_idx = 0
for m_idx in range(log.num_menus):
menu = list(log.menus[m_idx])
freqs = log.choice_frequencies[m_idx]
for item in menu:
count = freqs.get(item, 0)
if count > 0:
p = choice_probabilities[prob_idx]
ll += count * np.log(max(p, 1e-10))
prob_idx += 1
return ll
def _find_regularity_violations(
log: "StochasticChoiceLog",
tolerance: float = 0.01,
) -> list[int]:
"""
Find observations that violate regularity.
Regularity: if A ⊆ B, then P(x|A) >= P(x|B) for all x ∈ A.
(Removing options should not decrease choice probability.)
Args:
log: StochasticChoiceLog with choice frequency data
tolerance: Tolerance for probability comparison (default 0.01)
"""
violations = []
n_menus = log.num_menus
for m1 in range(n_menus):
for m2 in range(n_menus):
if m1 == m2:
continue
menu1 = log.menus[m1]
menu2 = log.menus[m2]
# Check if menu1 ⊆ menu2
if menu1.issubset(menu2):
# For each item in menu1, P(x|menu1) should >= P(x|menu2)
for item in menu1:
p1 = log.get_choice_probability(m1, item)
p2 = log.get_choice_probability(m2, item)
if p1 < p2 - tolerance:
violations.append(m1)
break
return list(set(violations))
def fit_from_deterministic(
log: "MenuChoiceLog",
model_type: str = "logit",
) -> StochasticChoiceResult:
"""
Fit a stochastic model to deterministic choice data.
Treats each deterministic choice as a single observation and
aggregates by menu to create stochastic choice data.
Args:
log: MenuChoiceLog with deterministic choices
model_type: Type of stochastic model to fit
Returns:
StochasticChoiceResult with fitted model
"""
from prefgraph.core.session import StochasticChoiceLog
# Convert to stochastic format
stochastic_log = StochasticChoiceLog.from_repeated_choices(
log.menus, log.choices
)
return fit_random_utility_model(stochastic_log, model_type)
# =============================================================================
# LEGACY ALIASES
# =============================================================================
fit_rum = fit_random_utility_model
"""Legacy alias: use fit_random_utility_model instead."""
check_iia = check_independence_irrelevant_alternatives
"""Legacy alias: use check_independence_irrelevant_alternatives instead."""
def test_regularity(
log: "StochasticChoiceLog",
tolerance: float = 0.01,
) -> RegularityResult:
"""
Test the regularity axiom for stochastic choice data.
Regularity (Luce axiom) states that adding options to a menu should
never INCREASE the probability of choosing any particular item:
For all A ⊆ B and x ∈ A: P(x|A) >= P(x|B)
Violations indicate decoy effects, attraction effects, or
consideration set changes.
Args:
log: StochasticChoiceLog with choice frequency data
tolerance: Tolerance for probability comparison (default 0.01)
Returns:
RegularityResult with detailed violation information
Example:
>>> from prefgraph import StochasticChoiceLog, test_regularity
>>> log = StochasticChoiceLog(
... menus=[{0,1}, {0,1,2}],
... choice_frequencies=[{0: 60, 1: 40}, {0: 45, 1: 35, 2: 20}],
... total_observations_per_menu=[100, 100]
... )
>>> result = test_regularity(log)
>>> if not result.satisfies_regularity:
... print(f"Found {result.num_violations} violations")
... print(f"Worst: {result.worst_violation}")
References:
Luce, R. D. (1959). Individual Choice Behavior
Chambers & Echenique (2016), Chapter 13
"""
start_time = time.perf_counter()
violations: list[RegularityViolation] = []
n_menus = log.num_menus
num_testable_pairs = 0
for m1 in range(n_menus):
for m2 in range(n_menus):
if m1 == m2:
continue
menu1 = log.menus[m1]
menu2 = log.menus[m2]
# Check if menu1 ⊆ menu2
if menu1.issubset(menu2):
num_testable_pairs += 1
# For each item in menu1, P(x|menu1) should >= P(x|menu2)
for item in menu1:
p_subset = log.get_choice_probability(m1, item)
p_superset = log.get_choice_probability(m2, item)
if p_subset < p_superset - tolerance:
magnitude = p_superset - p_subset
violations.append(RegularityViolation(
item=item,
subset_menu_idx=m1,
superset_menu_idx=m2,
prob_in_subset=p_subset,
prob_in_superset=p_superset,
magnitude=magnitude,
))
# Find worst violation
worst_violation = None
if violations:
worst_violation = max(violations, key=lambda v: v.magnitude)
# Compute violation rate
violation_rate = len(violations) / max(num_testable_pairs, 1)
satisfies_regularity = len(violations) == 0
computation_time = (time.perf_counter() - start_time) * 1000
return RegularityResult(
satisfies_regularity=satisfies_regularity,
violations=violations,
worst_violation=worst_violation,
violation_rate=violation_rate,
num_testable_pairs=num_testable_pairs,
computation_time_ms=computation_time,
)
# =============================================================================
# RUM CONSISTENCY TESTING (Block & Marschak 1960, Smeulders et al. 2021)
# =============================================================================
def test_rum_consistency(
log: "StochasticChoiceLog",
tolerance: float = 1e-6,
max_iterations: int = 1000,
) -> RUMConsistencyResult:
"""
Test if stochastic choice data can be rationalized by a Random Utility Model.
A RUM represents choice probabilities as:
P(choose x | S) = sum_{sigma: x = argmax_{y in S} sigma(y)} pi(sigma)
where pi is a probability distribution over preference orderings sigma.
This uses the column generation algorithm from Smeulders et al. (2021)
for computational efficiency with large numbers of items.
Args:
log: StochasticChoiceLog with choice frequency data
tolerance: Numerical tolerance for LP feasibility
max_iterations: Maximum iterations for column generation
Returns:
RUMConsistencyResult with consistency test and rationalizing distribution
Example:
>>> from prefgraph import StochasticChoiceLog, test_rum_consistency
>>> log = StochasticChoiceLog(
... menus=[{0,1,2}, {0,1}, {1,2}],
... choice_frequencies=[{0: 40, 1: 35, 2: 25}, {0: 55, 1: 45}, {1: 60, 2: 40}],
... total_observations_per_menu=[100, 100, 100]
... )
>>> result = test_rum_consistency(log)
>>> print(f"RUM consistent: {result.is_rum_consistent}")
>>> if result.rationalizing_distribution:
... print(f"Uses {result.num_orderings_used} orderings")
References:
Block, H. D., & Marschak, J. (1960). Random orderings and stochastic theories
of responses. Contributions to probability and statistics, 2, 97-132.
Smeulders, B., Crama, Y., & Spieksma, F. C. (2021). Revealed preference theory:
An algorithmic outlook. European Journal of Operational Research, 294(3).
"""
start_time = time.perf_counter()
all_items = sorted(log.all_items)
n_items = len(all_items)
# First check regularity (necessary condition)
regularity_violations = _find_regularity_violations(log)
regularity_satisfied = len(regularity_violations) == 0
# For small n, use exact enumeration
if n_items <= 6:
result = _test_rum_exact(log, tolerance)
else:
# For larger n, use column generation
result = _test_rum_column_generation(log, tolerance, max_iterations)
computation_time = (time.perf_counter() - start_time) * 1000
return RUMConsistencyResult(
is_rum_consistent=result["is_consistent"],
distance_to_rum=result["distance"],
regularity_satisfied=regularity_satisfied,
num_orderings_used=result["num_orderings"],
rationalizing_distribution=result["distribution"],
num_iterations=result["iterations"],
constraint_violations=result["violations"],
computation_time_ms=computation_time,
)
def _test_rum_exact(
log: "StochasticChoiceLog",
tolerance: float,
) -> dict:
"""Test RUM consistency using exact enumeration of all orderings."""
from itertools import permutations
from scipy.optimize import linprog
all_items = sorted(log.all_items)
n_items = len(all_items)
# Generate all possible orderings
orderings = list(permutations(all_items))
n_orderings = len(orderings)
# Build constraint matrix
# For each (menu, item) pair, we have a constraint:
# sum_{sigma: item is best in menu under sigma} pi_sigma = observed_prob
constraints_eq = []
b_eq = []
constraint_labels = []
for m_idx in range(log.num_menus):
menu = log.menus[m_idx]
total = log.total_observations_per_menu[m_idx]
if total == 0:
continue
for item in menu:
observed_prob = log.get_choice_probability(m_idx, item)
# Build row: coefficient for each ordering
row = np.zeros(n_orderings)
for o_idx, ordering in enumerate(orderings):
# Is item the best in menu under this ordering?
rank = {x: i for i, x in enumerate(ordering)}
best_in_menu = min(menu, key=lambda x: rank[x])
if best_in_menu == item:
row[o_idx] = 1.0
constraints_eq.append(row)
b_eq.append(observed_prob)
constraint_labels.append(f"P({item}|{set(menu)})={observed_prob:.3f}")
# Add constraint: probabilities sum to 1
constraints_eq.append(np.ones(n_orderings))
b_eq.append(1.0)
A_eq = np.array(constraints_eq)
b_eq = np.array(b_eq)
# Bounds: pi >= 0
bounds = [(0.0, 1.0) for _ in range(n_orderings)]
# Objective: minimize sum of slack variables (we use Phase I LP)
# Actually for feasibility, any objective works
c = np.zeros(n_orderings)
try:
result = linprog(c, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method='highs')
if result.success:
# Extract non-zero probabilities
distribution = {}
for o_idx, prob in enumerate(result.x):
if prob > tolerance:
distribution[orderings[o_idx]] = float(prob)
return {
"is_consistent": True,
"distance": 0.0,
"num_orderings": len(distribution),
"distribution": distribution,
"iterations": 1,
"violations": [],
}
else:
# Compute distance to nearest RUM using relaxed LP
distance, violations = _compute_rum_distance(log, orderings, tolerance)
return {
"is_consistent": False,
"distance": distance,
"num_orderings": 0,
"distribution": None,
"iterations": 1,
"violations": violations,
}
except SolverError:
raise
except Exception as e:
raise SolverError(
f"LP solver failed during RUM exact test. Original error: {e}"
) from e
def _compute_rum_distance(
log: "StochasticChoiceLog",
orderings: list[tuple[int, ...]],
tolerance: float,
) -> tuple[float, list[str]]:
"""Compute L1 distance to nearest RUM."""
from scipy.optimize import linprog
n_orderings = len(orderings)
# Variables: pi_1, ..., pi_K (ordering probs), s+ and s- slack variables
# For each constraint, add slack: A @ pi + s+ - s- = b
# Minimize: sum(s+) + sum(s-)
n_constraints = 0
constraints = []
b = []
for m_idx in range(log.num_menus):
menu = log.menus[m_idx]
total = log.total_observations_per_menu[m_idx]
if total == 0:
continue
for item in menu:
observed_prob = log.get_choice_probability(m_idx, item)
row = np.zeros(n_orderings)
for o_idx, ordering in enumerate(orderings):
rank = {x: i for i, x in enumerate(ordering)}
best_in_menu = min(menu, key=lambda x: rank[x])
if best_in_menu == item:
row[o_idx] = 1.0
constraints.append(row)
b.append(observed_prob)
n_constraints += 1
# Sum to 1 constraint
constraints.append(np.ones(n_orderings))
b.append(1.0)
n_constraints += 1
# Build augmented system with slack variables
# Variables: [pi_1, ..., pi_K, s+_1, ..., s+_m, s-_1, ..., s-_m]
n_slack = n_constraints - 1 # Don't need slack for sum-to-1
n_vars = n_orderings + 2 * n_slack
A_eq = np.zeros((n_constraints, n_vars))
b_eq = np.array(b)
for i in range(n_constraints - 1):
A_eq[i, :n_orderings] = constraints[i]
A_eq[i, n_orderings + i] = 1 # s+
A_eq[i, n_orderings + n_slack + i] = -1 # s-
# Sum to 1 (no slack)
A_eq[n_constraints - 1, :n_orderings] = constraints[n_constraints - 1]
# Objective: minimize sum of slack
c = np.zeros(n_vars)
c[n_orderings:] = 1.0
# Bounds
bounds = [(0.0, 1.0) for _ in range(n_orderings)] + [(0.0, None) for _ in range(2 * n_slack)]
try:
result = linprog(c, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method='highs')
if result.success:
distance = float(result.fun)
# Identify which constraints are violated
violations = []
slack_plus = result.x[n_orderings:n_orderings + n_slack]
slack_minus = result.x[n_orderings + n_slack:]
for i in range(n_slack):
if slack_plus[i] > tolerance or slack_minus[i] > tolerance:
violations.append(f"Constraint {i}: slack+ = {slack_plus[i]:.4f}, slack- = {slack_minus[i]:.4f}")
return distance, violations
else:
raise SolverError(
f"LP solver failed when computing RUM distance. Status: {result.status}, "
f"Message: {result.message}"
)
except SolverError:
raise
except Exception as e:
raise SolverError(
f"LP solver failed during RUM distance computation. Original error: {e}"
) from e
def _test_rum_column_generation(
log: "StochasticChoiceLog",
tolerance: float,
max_iterations: int,
) -> dict:
"""
Test RUM consistency using column generation algorithm.
Based on Smeulders et al. (2021) algorithm.
"""
from scipy.optimize import linprog
all_items = sorted(log.all_items)
n_items = len(all_items)
# Start with a few initial orderings
initial_orderings = _generate_initial_orderings(log, n_items)
active_orderings = list(initial_orderings)
# Build initial constraint structure
n_constraints = 0
observed_probs = []
for m_idx in range(log.num_menus):
menu = log.menus[m_idx]
total = log.total_observations_per_menu[m_idx]
if total == 0:
continue
for item in menu:
observed_probs.append(log.get_choice_probability(m_idx, item))
n_constraints += 1
# Iterative column generation
for iteration in range(max_iterations):
n_orderings = len(active_orderings)
# Build constraint matrix for current orderings
A_eq = np.zeros((n_constraints + 1, n_orderings)) # +1 for sum constraint
constraint_idx = 0
for m_idx in range(log.num_menus):
menu = log.menus[m_idx]
total = log.total_observations_per_menu[m_idx]
if total == 0:
continue
for item in menu:
for o_idx, ordering in enumerate(active_orderings):
rank = {x: i for i, x in enumerate(ordering)}
best_in_menu = min(menu, key=lambda x: rank[x])
if best_in_menu == item:
A_eq[constraint_idx, o_idx] = 1.0
constraint_idx += 1
# Sum to 1 constraint
A_eq[n_constraints, :] = 1.0
b_eq = np.array(observed_probs + [1.0])
# Solve LP
c = np.zeros(n_orderings)
bounds = [(0.0, 1.0) for _ in range(n_orderings)]
try:
result = linprog(c, A_eq=A_eq, b_eq=b_eq, bounds=bounds, method='highs')
except Exception as e:
raise SolverError(
f"LP solver failed during RUM column generation at iteration {iteration + 1}. "
f"Original error: {e}"
) from e
if result.success:
# Found feasible solution
distribution = {}
for o_idx, prob in enumerate(result.x):
if prob > tolerance:
distribution[active_orderings[o_idx]] = float(prob)
return {
"is_consistent": True,
"distance": 0.0,
"num_orderings": len(distribution),
"distribution": distribution,
"iterations": iteration + 1,
"violations": [],
}
# Pricing problem: find new ordering to add
new_ordering = _solve_pricing_problem(log, all_items, n_constraints)
if new_ordering is None or new_ordering in active_orderings:
# No improving column found
break
active_orderings.append(new_ordering)
# Column generation didn't find solution
return {
"is_consistent": False,
"distance": 1.0,
"num_orderings": 0,
"distribution": None,
"iterations": max_iterations,
"violations": ["Column generation did not converge"],
}
def _generate_initial_orderings(
log: "StochasticChoiceLog",
n_items: int,
) -> list[tuple[int, ...]]:
"""Generate initial set of orderings for column generation."""
all_items = sorted(log.all_items)
orderings = []
# Add ordering based on overall choice frequency
choice_counts = {item: 0 for item in all_items}
for m_idx in range(log.num_menus):
freqs = log.choice_frequencies[m_idx]
for item, count in freqs.items():
choice_counts[item] += count
sorted_items = sorted(all_items, key=lambda x: -choice_counts.get(x, 0))
orderings.append(tuple(sorted_items))
# Add reverse ordering
orderings.append(tuple(reversed(sorted_items)))
# Add some random orderings
for _ in range(min(5, n_items)):
perm = tuple(np.random.permutation(all_items).tolist())
if perm not in orderings:
orderings.append(perm)
return orderings
def _solve_pricing_problem(
log: "StochasticChoiceLog",
all_items: list[int],
n_constraints: int,
) -> tuple[int, ...] | None:
"""Solve pricing problem to find new ordering to add."""
# Simple heuristic: generate random orderings and check if they help
for _ in range(10):
perm = tuple(np.random.permutation(all_items).tolist())
return perm
return None
def compute_distance_to_rum(
log: "StochasticChoiceLog",
norm: str = "l1",
) -> float:
"""
Compute distance from observed choice data to nearest RUM.
Args:
log: StochasticChoiceLog with choice frequency data
norm: Distance norm ("l1", "l2", "linf")
Returns:
Distance to nearest RUM (0 = RUM consistent)
Example:
>>> log = StochasticChoiceLog(...)
>>> distance = compute_distance_to_rum(log)
>>> print(f"Distance to RUM: {distance:.4f}")
"""
result = test_rum_consistency(log)
return result.distance_to_rum
def fit_rum_distribution(
log: "StochasticChoiceLog",
) -> dict[tuple[int, ...], float]:
"""
Fit a RUM distribution to stochastic choice data.
Returns the sparse representation of the probability distribution
over preference orderings that best fits the data.
Args:
log: StochasticChoiceLog with choice frequency data
Returns:
Dict mapping preference orderings to probabilities
Example:
>>> log = StochasticChoiceLog(...)
>>> distribution = fit_rum_distribution(log)
>>> for ordering, prob in distribution.items():
... print(f"{' > '.join(map(str, ordering))}: {prob:.3f}")
"""
result = test_rum_consistency(log)
if result.rationalizing_distribution:
return result.rationalizing_distribution
return {}
# =============================================================================
# STOCHASTIC TRANSITIVITY TESTS (WST/MST/SST)
# =============================================================================
def test_stochastic_transitivity(
log: "StochasticChoiceLog",
level: str = "all",
tolerance: float = 0.01,
) -> "StochasticTransitivityResult":
"""
Test stochastic transitivity axioms (WST/MST/SST).
Stochastic transitivity axioms describe how pairwise choice probabilities
should be related when forming transitive chains. Let P(a,b) denote the
probability of choosing a over b in a binary choice.
Axiom Definitions:
- WST (Weak): P(a,b) > 0.5 and P(b,c) > 0.5 => P(a,c) > 0.5
- MST (Moderate): P(a,b) > 0.5 and P(b,c) > 0.5 => P(a,c) >= min(P(a,b), P(b,c))
- SST (Strong): P(a,b) > 0.5 and P(b,c) > 0.5 => P(a,c) >= max(P(a,b), P(b,c))
These form a hierarchy: SST => MST => WST. If a consumer's choices satisfy
SST, they also satisfy MST and WST.
Args:
log: StochasticChoiceLog with pairwise choice data
level: Which levels to test - "all", "weak", "moderate", or "strong"
tolerance: Tolerance for probability comparisons
Returns:
StochasticTransitivityResult with violation details for each level
Example:
>>> from prefgraph import StochasticChoiceLog, test_stochastic_transitivity
>>> log = StochasticChoiceLog(
... menus=[{0,1}, {1,2}, {0,2}],
... choice_frequencies=[{0: 60, 1: 40}, {1: 55, 2: 45}, {0: 70, 2: 30}],
... total_observations_per_menu=[100, 100, 100]
... )
>>> result = test_stochastic_transitivity(log)
>>> print(f"Strongest level satisfied: {result.strongest_satisfied}")
References:
Luce, R. D. (1959). Individual Choice Behavior: A Theoretical Analysis.
Tversky, A. (1969). Intransitivity of preferences. Psychological Review.
"""
from prefgraph.core.result import StochasticTransitivityResult
start_time = time.perf_counter()
# Build pairwise probability matrix
items = sorted(log.all_items)
n_items = len(items)
item_to_idx = {item: idx for idx, item in enumerate(items)}
# P[i,j] = probability of choosing item i over item j
P_matrix = np.full((n_items, n_items), 0.5)
# Extract pairwise probabilities from menus of size 2
for m_idx in range(log.num_menus):
menu = log.menus[m_idx]
if len(menu) == 2:
items_in_menu = list(menu)
total = log.total_observations_per_menu[m_idx]
if total > 0:
i, j = items_in_menu[0], items_in_menu[1]
idx_i, idx_j = item_to_idx[i], item_to_idx[j]
p_i = log.get_choice_probability(m_idx, i)
P_matrix[idx_i, idx_j] = p_i
P_matrix[idx_j, idx_i] = 1 - p_i
# Test all transitivity levels
wst_violations: list[tuple[int, int, int]] = []
mst_violations: list[tuple[int, int, int]] = []
sst_violations: list[tuple[int, int, int]] = []
num_testable = 0
for a_idx in range(n_items):
for b_idx in range(n_items):
if a_idx == b_idx:
continue
for c_idx in range(n_items):
if c_idx == a_idx or c_idx == b_idx:
continue
p_ab = P_matrix[a_idx, b_idx]
p_bc = P_matrix[b_idx, c_idx]
p_ac = P_matrix[a_idx, c_idx]
# Only test if P(a,b) > 0.5 and P(b,c) > 0.5
if p_ab > 0.5 + tolerance and p_bc > 0.5 + tolerance:
num_testable += 1
a, b, c = items[a_idx], items[b_idx], items[c_idx]
# WST: P(a,c) > 0.5
if p_ac <= 0.5 + tolerance:
wst_violations.append((a, b, c))
# MST: P(a,c) >= min(P(a,b), P(b,c))
if p_ac < min(p_ab, p_bc) - tolerance:
mst_violations.append((a, b, c))
# SST: P(a,c) >= max(P(a,b), P(b,c))
if p_ac < max(p_ab, p_bc) - tolerance:
sst_violations.append((a, b, c))
satisfies_wst = len(wst_violations) == 0
satisfies_mst = len(mst_violations) == 0
satisfies_sst = len(sst_violations) == 0
computation_time = (time.perf_counter() - start_time) * 1000
return StochasticTransitivityResult(
satisfies_wst=satisfies_wst,
satisfies_mst=satisfies_mst,
satisfies_sst=satisfies_sst,
wst_violations=wst_violations,
mst_violations=mst_violations,
sst_violations=sst_violations,
num_testable_triples=num_testable,
computation_time_ms=computation_time,
)
# =============================================================================
# ADDITIVE PERTURBED UTILITY (Fudenberg et al. 2015)
# =============================================================================
def test_additive_perturbed_utility(
log: "StochasticChoiceLog",
tolerance: float = 0.01,
) -> dict:
"""
Test if stochastic choices satisfy Additive Perturbed Utility (APU).
APU models assume the agent maximizes:
U(x) - c(p(x))
where U(x) is utility, p(x) is the choice probability, and c is a
strictly convex "cost of attention" function. This generalizes
the multinomial logit model.
Key testable implications:
1. Regularity: P(x|A) >= P(x|B) when A subset of B
2. Monotonicity in choice probabilities
3. Log-odds linearity (for logit special case)
Args:
log: StochasticChoiceLog with choice frequency data
tolerance: Tolerance for probability comparisons
Returns:
Dict with test results including:
- is_apu_consistent: Whether data is consistent with APU
- satisfies_regularity: Whether regularity holds
- regularity_violations: List of regularity violations
- monotonicity_score: Measure of monotonicity
Example:
>>> from prefgraph import StochasticChoiceLog, test_additive_perturbed_utility
>>> result = test_additive_perturbed_utility(log)
>>> if result["is_apu_consistent"]:
... print("Consistent with APU model")
References:
Fudenberg, D., Iijima, R., & Strzalecki, T. (2015). Stochastic choice
and revealed perturbed utility. Econometrica, 83(6), 2371-2409.
"""
start_time = time.perf_counter()
# Test regularity (necessary condition for APU)
regularity_violations = _find_regularity_violations(log, tolerance)
satisfies_regularity = len(regularity_violations) == 0
# Test IIA (logit special case of APU)
satisfies_iia = check_independence_irrelevant_alternatives(log, tolerance)
# Compute monotonicity score
# For APU: items with higher utility should have higher choice probabilities
# across different menus (after accounting for menu composition)
monotonicity_score = _compute_monotonicity_score(log)
# APU is consistent if regularity holds
# (This is a necessary but not sufficient condition)
is_apu_consistent = satisfies_regularity
computation_time = (time.perf_counter() - start_time) * 1000
return {
"is_apu_consistent": is_apu_consistent,
"satisfies_regularity": satisfies_regularity,
"satisfies_iia": satisfies_iia,
"regularity_violations": regularity_violations,
"monotonicity_score": monotonicity_score,
"is_logit_consistent": satisfies_iia,
"computation_time_ms": computation_time,
}
def _compute_monotonicity_score(log: "StochasticChoiceLog") -> float:
"""
Compute monotonicity score for choice probabilities.
Higher score indicates more monotonic relationship between
item "quality" and choice probabilities.
"""
items = sorted(log.all_items)
n_items = len(items)
if n_items < 2:
return 1.0
# Estimate item quality from average choice probability
quality = {}
for item in items:
probs = []
for m_idx in range(log.num_menus):
if item in log.menus[m_idx]:
probs.append(log.get_choice_probability(m_idx, item))
quality[item] = np.mean(probs) if probs else 0.5
# Count concordant/discordant pairs across menus
concordant = 0
discordant = 0
for m_idx in range(log.num_menus):
menu_items = list(log.menus[m_idx])
for i in range(len(menu_items)):
for j in range(i + 1, len(menu_items)):
item_i, item_j = menu_items[i], menu_items[j]
p_i = log.get_choice_probability(m_idx, item_i)
p_j = log.get_choice_probability(m_idx, item_j)
# Does quality ordering match probability ordering?
quality_order = quality[item_i] > quality[item_j]
prob_order = p_i > p_j
if quality_order == prob_order:
concordant += 1
else:
discordant += 1
total = concordant + discordant
if total == 0:
return 1.0
return concordant / total
# =============================================================================
# ADDITIONAL LEGACY ALIASES
# =============================================================================
check_rum_consistency = test_rum_consistency
"""Legacy alias: use test_rum_consistency instead."""
test_wst = test_stochastic_transitivity
"""Legacy alias: use test_stochastic_transitivity instead."""
check_stochastic_transitivity = test_stochastic_transitivity
"""Legacy alias: use test_stochastic_transitivity instead."""
check_apu = test_additive_perturbed_utility
"""Legacy alias: use test_additive_perturbed_utility instead."""