"""Limited attention models for choice analysis.
Implements consideration set models where consumers don't see all options.
Based on Chapter 14 of Chambers & Echenique (2016) "Revealed Preference Theory"
and Masatlioglu, Nakajima & Ozbay (2012) "Revealed Attention" (AER).
Key insight: apparent irrationality may be due to limited attention rather
than inconsistent preferences. A choice is "attention-rational" if it's
optimal among the items actually considered.
Tech-Friendly Names (Primary):
- test_warp_la(): Test WARP with Limited Attention (Masatlioglu et al. 2012)
- estimate_consideration_sets(): Estimate which items are considered
- test_attention_rationality(): Test rationalizability with limited attention
- compute_salience_weights(): Estimate feature-based attention weights
- fit_random_attention_model(): Fit Random Attention Model (Cattaneo et al. 2020)
Economics Names (Legacy Aliases):
- identify_attention() -> estimate_consideration_sets()
- check_attention_rationality() -> test_attention_rationality()
- check_warp_la() -> test_warp_la()
"""
from __future__ import annotations
import time
from itertools import permutations
from typing import TYPE_CHECKING
import numpy as np
from numpy.typing import NDArray
from scipy.optimize import linprog
from prefgraph.core.result import (
AttentionResult,
WARPLAResult,
RandomAttentionResult,
AttentionOverloadResult,
StatusQuoBiasResult,
)
from prefgraph.core.exceptions import (
ComputationalLimitError,
StatisticalError,
DataValidationError,
)
if TYPE_CHECKING:
from prefgraph.core.session import MenuChoiceLog, StochasticChoiceLog
[docs]
def test_attention_rationality(
    log: "MenuChoiceLog",
    max_consideration_size: int | None = None,
) -> AttentionResult:
    """
    Test if choices are rationalizable with limited attention.

    A choice is attention-rational if there exists:
        1. A preference ordering over items
        2. A consideration set function (what items are noticed)
    such that each choice is optimal among considered items.

    This is a weaker notion than standard rationality - it allows
    apparent violations due to limited attention.

    Args:
        log: MenuChoiceLog with menus and choices
        max_consideration_size: Maximum consideration set size (None = no limit)

    Returns:
        AttentionResult with consideration sets and attention analysis

    Example:
        >>> from prefgraph import MenuChoiceLog, test_attention_rationality
        >>> result = test_attention_rationality(choice_log)
        >>> if result.is_attention_rational:
        ...     print("Choices are rationalizable with limited attention")
        ...     print(f"Avg consideration set size: {result.mean_consideration_size:.1f}")

    Note:
        validate_menu_sarp is called first; when it reports consistency the
        full menus are returned as consideration sets and no search is run.
        The original note for this function states that validate_menu_sarp
        uses Floyd-Warshall with O(I^3) cost in the number of unique items.

    References:
        Chambers & Echenique (2016), Chapter 14
        Manzini, P. & Mariotti, M. (2014). "Stochastic Choice and Consideration Sets"
    """
    start_time = time.perf_counter()
    n_obs = log.num_observations

    # Function-scope import kept from the original (presumably to avoid a
    # circular import between the algorithm modules - confirm before moving).
    from prefgraph.algorithms.abstract_choice import validate_menu_sarp

    sarp_result = validate_menu_sarp(log)
    if sarp_result.is_consistent:
        # Already rational without limited attention: every menu is fully
        # considered.
        consideration_sets = [set(menu) for menu in log.menus]
        computation_time = (time.perf_counter() - start_time) * 1000
        return AttentionResult(
            consideration_sets=consideration_sets,
            attention_parameter=1.0,
            is_attention_rational=True,
            # default=-1 yields an empty weight vector for a log without
            # items instead of raising ValueError from max().
            salience_weights=np.ones(max(log.all_items, default=-1) + 1),
            default_option=None,
            inattention_rate=0.0,
            rationalizable_observations=list(range(n_obs)),
            computation_time_ms=computation_time,
        )

    # Try to find consideration sets that rationalize the data.
    consideration_sets, is_rational, rationalizable_obs = _find_consideration_sets(
        log, max_consideration_size
    )

    # Attention parameter: share of offered items that were considered.
    total_considered = sum(len(cs) for cs in consideration_sets)
    total_available = sum(len(menu) for menu in log.menus)
    attention_parameter = total_considered / max(total_available, 1)

    salience_weights = compute_salience_weights(log, consideration_sets)
    default_option = _identify_default_option(log, consideration_sets)

    # Inattention rate: share of observations whose consideration set is a
    # strict subset of the menu.
    n_inattentive = sum(
        1 for t in range(n_obs) if len(consideration_sets[t]) < len(log.menus[t])
    )
    inattention_rate = n_inattentive / n_obs if n_obs > 0 else 0.0

    computation_time = (time.perf_counter() - start_time) * 1000
    return AttentionResult(
        consideration_sets=consideration_sets,
        attention_parameter=attention_parameter,
        is_attention_rational=is_rational,
        salience_weights=salience_weights,
        default_option=default_option,
        inattention_rate=inattention_rate,
        rationalizable_observations=rationalizable_obs,
        computation_time_ms=computation_time,
    )
[docs]
def estimate_consideration_sets(
    log: "MenuChoiceLog",
    method: str = "greedy",
) -> list[set[int]]:
    """
    Estimate the consideration set behind each observation.

    A consideration set is the part of the menu the consumer actually
    notices before choosing.

    Args:
        log: MenuChoiceLog with menus and choices
        method: "salience" selects the choice-frequency heuristic. Every
            other value - including "greedy" and "optimal" - runs the greedy
            search without a size limit; there is no separate optimal solver
            behind "optimal".

    Returns:
        List of consideration sets, one per observation

    Note:
        The chosen item is always in the consideration set.
    """
    if method == "salience":
        return _estimate_salience_based_consideration(log)

    # Only the sets are needed; the rationality flag and index list are dropped.
    sets_per_observation, _flag, _indices = _find_consideration_sets(log, None)
    return sets_per_observation
[docs]
def compute_salience_weights(
    log: "MenuChoiceLog",
    consideration_sets: list[set[int]] | None = None,
) -> NDArray[np.float64]:
    """
    Compute salience weights for each item.

    The weight of an item is the fraction of the menus containing it in
    which it also appears in the matching consideration set. Higher weight
    = more likely to be noticed/considered.

    Args:
        log: MenuChoiceLog with menus and choices. Items are used directly
            as array indices, so they must be non-negative integers.
        consideration_sets: Optional pre-computed consideration sets, aligned
            with log.menus. Estimated with estimate_consideration_sets when
            omitted.

    Returns:
        Array of length max(item) + 1 with one weight per item index. Indices
        that never appear in a menu keep weight 0. An empty array is returned
        when the log has no items.
    """
    if consideration_sets is None:
        consideration_sets = estimate_consideration_sets(log)

    # default=-1 turns an empty item set into a zero-length result instead of
    # a ValueError from max().
    n_slots = max(log.all_items, default=-1) + 1
    considered = np.zeros(n_slots)
    offered = np.zeros(n_slots)

    for menu, cs in zip(log.menus, consideration_sets):
        for item in menu:
            offered[item] += 1
            if item in cs:
                considered[item] += 1

    # Divide only where the item was offered at least once; other slots stay 0.
    return np.divide(considered, offered, out=np.zeros(n_slots), where=offered > 0)
def _find_consideration_sets(
    log: "MenuChoiceLog",
    max_size: int | None,
) -> tuple[list[set[int]], bool, list[int]]:
    """
    Find consideration sets that rationalize the data (greedy heuristic).

    Steps:
        1. Build a revealed relation: the choice at each observation is
           recorded as preferred to every other item in that menu.
        2. For each observation start from {chosen item} and add, in menu
           iteration order, every item that is NOT recorded as preferred to
           the chosen item, until max_size is reached.

    **Algorithmic Limitation**: This is a greedy heuristic, not an optimal
    algorithm. It does not search for minimal consideration sets and may
    return larger sets than necessary. For optimal results on small
    instances, consider integer programming formulations.

    NOTE(review): the per-observation rationality check below looks for an
    item in the consideration set that is recorded as preferred to the
    choice, but step 2 never admits such an item. As written, every
    observation is therefore reported as rationalizable. The check is kept so
    the function stays correct if the construction changes.

    Args:
        log: MenuChoiceLog with menus and choices
        max_size: Maximum consideration set size, or None for no limit

    Returns:
        Tuple of (consideration_sets, is_rational, rationalizable_observations)
    """
    n_obs = log.num_observations

    # revealed_pref[x] = items that x was chosen over in some menu.
    revealed_pref: dict[int, set[int]] = {}
    for menu, choice in zip(log.menus, log.choices):
        beaten = revealed_pref.setdefault(choice, set())
        beaten.update(item for item in menu if item != choice)

    def is_preferred_to(item: int, choice: int) -> bool:
        """True if `item` was chosen over `choice` in some observation."""
        return choice in revealed_pref.get(item, ())

    consideration_sets = []
    rationalizable = []
    for t, (menu, choice) in enumerate(zip(log.menus, log.choices)):
        # The chosen item is always considered.
        consideration = {choice}
        for item in menu:
            # Considering an item revealed preferred to the choice would make
            # the choice non-maximal, so such items are left out.
            if item == choice or is_preferred_to(item, choice):
                continue
            if max_size is None or len(consideration) < max_size:
                consideration.add(item)
        consideration_sets.append(consideration)

        # Choice must be maximal in the consideration set.
        choice_is_maximal = not any(
            is_preferred_to(item, choice)
            for item in consideration
            if item != choice
        )
        if choice_is_maximal:
            rationalizable.append(t)

    is_fully_rational = len(rationalizable) == n_obs
    return consideration_sets, is_fully_rational, rationalizable
def _estimate_salience_based_consideration(
    log: "MenuChoiceLog",
    salience_threshold: float = 0.1,
) -> list[set[int]]:
    """
    Estimate consideration sets from how often each item is chosen.

    An item counts as salient when its share of all recorded choices is
    strictly above the threshold. Each observation's consideration set is the
    chosen item plus every salient item in that menu.

    Args:
        log: MenuChoiceLog with menus and choices
        salience_threshold: Minimum choice frequency to be considered salient (default 0.1)

    Returns:
        List of consideration sets, one per observation
    """
    # How many times each item was picked across the whole log.
    tallies: dict[int, int] = {}
    for picked in log.choices:
        tallies[picked] = tallies.get(picked, 0) + 1
    n_choices = len(log.choices)

    estimated = []
    for menu, picked in zip(log.menus, log.choices):
        salient = {
            item
            for item in menu
            if item != picked
            and tallies.get(item, 0) / n_choices > salience_threshold
        }
        # The chosen item is included regardless of its frequency.
        estimated.append({picked} | salient)
    return estimated
def _identify_default_option(
    log: "MenuChoiceLog",
    consideration_sets: list[set[int]],
) -> int | None:
    """
    Return the single item present in every consideration set, if any.

    Args:
        log: MenuChoiceLog (not read by this function)
        consideration_sets: One consideration set per observation

    Returns:
        The item when exactly one item is common to all sets; None when the
        list is empty or when zero or several items are common.
    """
    if not consideration_sets:
        return None

    # Intersect all sets, starting from a copy of the first one.
    shared = set(consideration_sets[0])
    for later in consideration_sets[1:]:
        shared &= later

    if len(shared) != 1:
        return None
    (only_item,) = shared
    return only_item
[docs]
def test_attention_filter(
    log: "MenuChoiceLog",
    filter_function: callable,
) -> dict:
    """
    Test if choices are rational given a specific attention filter.

    The filter is called once per observation to obtain the consideration
    set. Observations whose choice lies outside that set are recorded as
    violations; for the others the choice is recorded as preferred to every
    other considered item. The resulting relation is then checked for cycles.

    Args:
        log: MenuChoiceLog with menus and choices
        filter_function: Function(menu, t) -> consideration_set

    Returns:
        Dictionary with keys "is_rational", "violations" (observation
        indices whose choice was filtered out) and "has_preference_cycle"
    """
    filtered_out = []
    beats: dict[int, set[int]] = {}

    for t, (menu, choice) in enumerate(zip(log.menus, log.choices)):
        considered = filter_function(menu, t)
        if choice in considered:
            beats.setdefault(choice, set()).update(
                item for item in considered if item != choice
            )
        else:
            # A choice the filter hides cannot be explained by this filter.
            filtered_out.append(t)

    cyclic = _has_preference_cycle(beats)
    return {
        "is_rational": not filtered_out and not cyclic,
        "violations": filtered_out,
        "has_preference_cycle": cyclic,
    }
def _has_preference_cycle(revealed_pref: dict[int, set[int]]) -> bool:
    """
    Check if the revealed preference relation has a cycle.

    Runs an iterative depth-first search with an explicit stack, so long
    preference chains cannot exhaust Python's recursion limit.

    Args:
        revealed_pref: Mapping item -> items it is revealed preferred to.
            Items that appear only as values are treated as having no
            outgoing edges.

    Returns:
        True if some item can reach itself (self-loops included), else False
    """
    finished: set[int] = set()  # fully explored, known not to lead to a cycle
    on_path: set[int] = set()   # nodes on the current DFS path

    for root in revealed_pref:
        if root in finished:
            continue
        on_path.add(root)
        # Each stack entry pairs a node with an iterator over its successors,
        # so the walk resumes where it stopped after exploring a child.
        stack = [(root, iter(revealed_pref.get(root, ())))]
        while stack:
            node, successors = stack[-1]
            for nxt in successors:
                if nxt in on_path:
                    # Back edge to the current path: cycle found.
                    return True
                if nxt not in finished:
                    on_path.add(nxt)
                    stack.append((nxt, iter(revealed_pref.get(nxt, ()))))
                    break
            else:
                # All successors handled: retire the node.
                stack.pop()
                on_path.discard(node)
                finished.add(node)
    return False
# =============================================================================
# LEGACY ALIASES
# =============================================================================
# Economics-style names kept for backwards compatibility; each one is the
# same function object as its tech-friendly counterpart.
# NOTE(review): the module docstring also lists check_warp_la -> test_warp_la;
# that alias is not defined in this span - confirm it exists further down.
identify_attention = estimate_consideration_sets
"""Legacy alias: use estimate_consideration_sets instead."""
check_attention_rationality = test_attention_rationality
"""Legacy alias: use test_attention_rationality instead."""
# =============================================================================
# WARP(LA): WARP WITH LIMITED ATTENTION (Masatlioglu et al. 2012)
# =============================================================================
def test_warp_la(
    log: "MenuChoiceLog",
) -> WARPLAResult:
    """
    Test WARP with Limited Attention (Masatlioglu et al. 2012).

    WARP(LA) weakens WARP and characterizes Choice with Limited Attention
    (CLA). A choice function satisfies WARP(LA) if and only if the revealed
    preference relation P is acyclic, where:

        xPy iff there exists T such that c(T) = x and c(T \\ y) != x

    In other words, x is revealed preferred to y if removing y from some menu
    changes the choice away from x. Only pairs for which both T and T \\ y
    were observed in the log can be revealed.

    If WARP(LA) is satisfied, behavior can be rationalized by preference
    maximization within an attention filter (a consideration set that is
    stable when unattended items are removed).

    Args:
        log: MenuChoiceLog with menus and choices

    Returns:
        WARPLAResult with consistency test, revealed preference, and
        recovered ordering. The ordering and attention filter are None when
        the test fails.

    Example:
        >>> from prefgraph import MenuChoiceLog, test_warp_la
        >>> log = MenuChoiceLog(
        ...     menus=[{0, 1, 2}, {0, 1}, {1, 2}, {0, 2}],
        ...     choices=[0, 0, 1, 2]  # Cyclical choice: c(xyz)=x, c(xy)=x, c(yz)=y, c(xz)=z
        ... )
        >>> result = test_warp_la(log)
        >>> print(result.satisfies_warp_la)  # True - rationalizable with limited attention
        >>> print(result.recovered_preference)  # Recovered preference ordering

    References:
        Masatlioglu, Y., Nakajima, D., & Ozbay, E. Y. (2012). Revealed Attention.
        American Economic Review, 102(5), 2183-2205.
    """
    started = time.perf_counter()
    observation_count = log.num_observations
    universe = sorted(log.all_items)

    # c(.) as a lookup table. If a menu occurs more than once, the last
    # recorded choice for it wins.
    choice_of: dict[frozenset[int], int] = {
        frozenset(menu): choice for menu, choice in zip(log.menus, log.choices)
    }

    # Collect P in first-seen order; the set is used for de-duplication.
    ordered_pairs: list[tuple[int, int]] = []
    seen_pairs: set[tuple[int, int]] = set()
    for menu, chosen in zip(log.menus, log.choices):
        full_menu = frozenset(menu)
        for dropped in menu:
            if dropped == chosen:
                continue
            reduced = full_menu - {dropped}
            # No evidence unless the reduced menu was observed and its choice
            # differs from the original one.
            if reduced not in choice_of or choice_of[reduced] == chosen:
                continue
            pair = (chosen, dropped)
            if pair not in seen_pairs:
                seen_pairs.add(pair)
                ordered_pairs.append(pair)

    closure = _compute_transitive_closure(seen_pairs, universe)
    cycles = _find_preference_cycles(seen_pairs, universe)
    is_consistent = len(cycles) == 0

    ordering = None
    gamma = None
    if is_consistent:
        # Any linear extension of P is a valid preference; then build a
        # matching attention filter.
        ordering = _topological_sort(closure, universe)
        gamma = _construct_attention_filter(log, choice_of, closure, ordering)

    elapsed_ms = (time.perf_counter() - started) * 1000
    return WARPLAResult(
        satisfies_warp_la=is_consistent,
        revealed_preference=ordered_pairs,
        transitive_closure=list(closure),
        attention_filter=gamma,
        recovered_preference=ordering,
        violations=cycles,
        num_observations=observation_count,
        computation_time_ms=elapsed_ms,
    )
def _compute_transitive_closure(
    relation: set[tuple[int, int]],
    items: list[int],
) -> set[tuple[int, int]]:
    """
    Compute the transitive closure of a binary relation (Warshall).

    Args:
        relation: Pairs (x, y) meaning x is related to y. Pairs that mention
            an item outside `items` are ignored.
        items: Items defining the universe and the matrix ordering

    Returns:
        Set of all pairs (x, y) such that y is reachable from x through one
        or more pairs of the relation
    """
    index_of = {item: i for i, item in enumerate(items)}
    n = len(items)

    reachable = np.zeros((n, n), dtype=bool)
    for x, y in relation:
        if x in index_of and y in index_of:
            reachable[index_of[x], index_of[y]] = True

    # Warshall step for pivot k: i reaches j if i reaches k and k reaches j.
    # Column k and row k are not changed by their own step, so updating the
    # whole matrix at once gives the same result as the scalar triple loop.
    for k in range(n):
        reachable |= reachable[:, [k]] & reachable[[k], :]

    rows, cols = np.nonzero(reachable)
    return {(items[i], items[j]) for i, j in zip(rows.tolist(), cols.tolist())}
def _find_preference_cycles(
    relation: set[tuple[int, int]],
    items: list[int],
) -> list[tuple[int, ...]]:
    """
    Find cycles in the revealed preference relation using depth-first search.

    Every back edge met during the search yields one cycle: the slice of the
    current DFS path from the edge's target to the current node. This is not
    an enumeration of all simple cycles, but the result is empty exactly when
    the relation restricted to `items` has no cycle of length two or more.
    Self-loops (x, x) are not reported.

    The search is iterative and always unwinds the path when it leaves a
    node. Returning early without unwinding would leave stale nodes marked as
    "on the path" and cause spurious cycles, or a ValueError when such a node
    is looked up in the path.

    Args:
        relation: Pairs (x, y) meaning x is preferred to y. Pairs whose first
            element is not in `items` are ignored.
        items: Items to use as search roots, in order

    Returns:
        List of cycles, each a tuple of items along the DFS path
    """
    adj: dict[int, list[int]] = {item: [] for item in items}
    for x, y in relation:
        if x in adj:
            adj[x].append(y)

    cycles: list[tuple[int, ...]] = []
    visited: set[int] = set()
    on_path: set[int] = set()
    path: list[int] = []

    for root in items:
        if root in visited:
            continue
        visited.add(root)
        on_path.add(root)
        path.append(root)
        # One successor iterator per node on the path, so a node's remaining
        # neighbours are resumed after a child has been explored.
        pending = [iter(adj[root])]
        while pending:
            descended = False
            for neighbor in pending[-1]:
                if neighbor not in visited:
                    visited.add(neighbor)
                    on_path.add(neighbor)
                    path.append(neighbor)
                    pending.append(iter(adj.get(neighbor, ())))
                    descended = True
                    break
                if neighbor in on_path:
                    # Back edge: the cycle is the path from neighbor onwards.
                    cycle = tuple(path[path.index(neighbor):])
                    if len(cycle) > 1:
                        cycles.append(cycle)
            if not descended:
                # Node exhausted: always unwind, cycle or not.
                pending.pop()
                on_path.discard(path.pop())
    return cycles
def _topological_sort(
    transitive_closure: set[tuple[int, int]],
    items: list[int],
) -> tuple[int, ...]:
    """
    Order items so that every (x, y) pair places x before y (Kahn's algorithm).

    Among the items that are currently unconstrained, the smallest one is
    emitted first, which makes the output deterministic.

    Args:
        transitive_closure: Pairs (x, y) meaning x must precede y. Pairs
            mentioning items outside `items` are ignored.
        items: Items to order

    Returns:
        Tuple of items, most preferred first. Items that lie on a cycle are
        never released and are missing from the result.
    """
    successors: dict[int, list[int]] = {item: [] for item in items}
    blockers = dict.fromkeys(items, 0)  # number of not-yet-emitted predecessors
    for better, worse in transitive_closure:
        if better in successors and worse in blockers:
            successors[better].append(worse)
            blockers[worse] += 1

    ready = [item for item in items if blockers[item] == 0]
    ordering = []
    while ready:
        # Smallest available item first, for a reproducible ordering.
        head = min(ready)
        ready.remove(head)
        ordering.append(head)
        for worse in successors[head]:
            blockers[worse] -= 1
            if blockers[worse] == 0:
                ready.append(worse)
    return tuple(ordering)
def _construct_attention_filter(
    log: "MenuChoiceLog",
    menu_to_choice: dict[frozenset[int], int],
    transitive_closure: set[tuple[int, int]],
    preference: tuple[int, ...],
) -> dict[frozenset[int], set[int]]:
    """
    Construct an attention filter that rationalizes the choice data.

    For each observed menu S the consideration set is

        Gamma(S) = {c(S)} union {x in S : (c(S), x) in transitive_closure}

    i.e. the chosen item together with the menu items it is revealed
    preferred to. The source attributes this construction to Theorem 3 in
    Masatlioglu et al. (2012).

    Args:
        log: MenuChoiceLog; only log.menus is read
        menu_to_choice: Choice recorded for each menu (keyed by frozenset)
        transitive_closure: Closed revealed preference pairs (x, y), x over y
        preference: Recovered ordering (accepted but not used here)

    Returns:
        Mapping from menu (frozenset) to its consideration set. Menus missing
        from menu_to_choice are skipped.
    """
    dominated_pairs = set(transitive_closure)
    gamma: dict[frozenset[int], set[int]] = {}

    for menu in log.menus:
        key = frozenset(menu)
        if key in menu_to_choice:
            chosen = menu_to_choice[key]
            gamma[key] = {chosen} | {
                item for item in menu if (chosen, item) in dominated_pairs
            }
    return gamma
def recover_preference_with_attention(
    log: "MenuChoiceLog",
) -> tuple[tuple[int, ...] | None, dict[frozenset[int], set[int]] | None]:
    """
    Recover a preference ordering and attention filter from choice data.

    Runs test_warp_la and returns the two recovery fields of its result. Both
    are None when the data violates WARP(LA).

    Args:
        log: MenuChoiceLog with menus and choices

    Returns:
        Tuple of (preference_ordering, attention_filter) or (None, None)

    Example:
        >>> from prefgraph import MenuChoiceLog, recover_preference_with_attention
        >>> log = MenuChoiceLog(menus=[...], choices=[...])
        >>> pref, attn = recover_preference_with_attention(log)
        >>> if pref is not None:
        ...     print(f"Preference: {' > '.join(str(x) for x in pref)}")
    """
    outcome = test_warp_la(log)
    ordering = outcome.recovered_preference
    gamma = outcome.attention_filter
    return ordering, gamma
def validate_attention_filter_consistency(
    log: "MenuChoiceLog",
    attention_filter: dict[frozenset[int], set[int]],
) -> dict:
    """
    Validate that an attention filter is consistent with some preference ordering.

    Each choice must lie in its menu's consideration set, and treating the
    choice as preferred to every other considered item must not create a
    preference cycle. Menus missing from the filter are treated as fully
    considered.

    Args:
        log: MenuChoiceLog with menus and choices
        attention_filter: Dict mapping menu (frozenset) to consideration set

    Returns:
        Dictionary with keys "is_valid", "violations" (messages for choices
        outside their consideration set), "preference_cycles" and
        "revealed_preference" (list of (chosen, other) pairs)

    Example:
        >>> log = MenuChoiceLog(menus=[...], choices=[...])
        >>> attn = {frozenset({0,1,2}): {0,1}, frozenset({1,2}): {1}}
        >>> result = validate_attention_filter_consistency(log, attn)
        >>> print(result['is_valid'])
    """
    implied: set[tuple[int, int]] = set()
    problems = []

    for menu, choice in zip(log.menus, log.choices):
        considered = attention_filter.get(frozenset(menu), set(menu))
        if choice not in considered:
            problems.append(f"Choice {choice} not in consideration set for {menu}")
            continue
        # The choice beats everything else that was considered.
        implied.update((choice, other) for other in considered if other != choice)

    cycles = _find_preference_cycles(implied, sorted(log.all_items))
    return {
        "is_valid": not problems and not cycles,
        "violations": problems,
        "preference_cycles": cycles,
        "revealed_preference": list(implied),
    }
# =============================================================================
# RANDOM ATTENTION MODEL (Cattaneo et al. 2020)
# =============================================================================
def fit_random_attention_model(
    log: "StochasticChoiceLog",
    assumption: str = "monotonic",
    alpha: float = 0.05,
    n_bootstrap: int = 1000,
) -> RandomAttentionResult:
    """
    Fit Random Attention Model (RAM) to stochastic choice data.

    RAM assumes a fixed preference ordering and random attention:
        P(choose x from S) = P(x most preferred among considered items)

    Follows Cattaneo et al. (2020) "The Random Attention Model" in spirit.
    The steps performed here are: enumerate compatible orderings, take the
    first one, estimate attention scores and bounds for it, compute a
    violation statistic and a resampling p-value.

    Args:
        log: StochasticChoiceLog with choice frequency data
        assumption: RAM variant ("monotonic", "independent", "general")
            - "monotonic": attention probability weakly increases with preference rank
            - "independent": attention probabilities independent across items
            - "general": minimal restrictions on attention
        alpha: Significance level for consistency test
        n_bootstrap: Number of resampling draws requested for the p-value
            (the p-value helper in this module evaluates at most 100)

    Returns:
        RandomAttentionResult with consistency test, preference, and
        attention scores. When no ordering is compatible the result is marked
        inconsistent with test_statistic=1.0 and p_value=0.0.

    Raises:
        ComputationalLimitError: propagated from the ordering search when the
            log has more than 6 items.

    Example:
        >>> from prefgraph import StochasticChoiceLog, fit_random_attention_model
        >>> log = StochasticChoiceLog(
        ...     menus=[{0,1,2}, {0,1}],
        ...     choice_frequencies=[{0: 50, 1: 30, 2: 20}, {0: 60, 1: 40}],
        ...     total_observations_per_menu=[100, 100]
        ... )
        >>> result = fit_random_attention_model(log)
        >>> print(f"RAM consistent: {result.is_ram_consistent}")
        >>> print(f"Estimated preference: {result.preference_ranking}")

    References:
        Cattaneo, M. D., Ma, X., Masatlioglu, Y., & Suleymanov, E. (2020).
        A Random Attention Model. Journal of Political Economy, 128(7).
    """
    started = time.perf_counter()
    total_obs = sum(log.total_observations_per_menu)
    item_count = len(sorted(log.all_items))

    candidates = _find_ram_compatible_preferences(log, assumption)

    if not candidates:
        # No ordering survives the compatibility check: report inconsistency.
        elapsed_ms = (time.perf_counter() - started) * 1000
        return RandomAttentionResult(
            is_ram_consistent=False,
            preference_ranking=None,
            attention_bounds={},
            item_attention_scores=np.zeros(item_count),
            test_statistic=1.0,
            p_value=0.0,
            compatible_preferences=[],
            assumption=assumption,
            num_observations=total_obs,
            computation_time_ms=elapsed_ms,
        )

    # The first compatible ordering is used for all estimates.
    chosen_pref = candidates[0]
    bounds, scores = _estimate_ram_attention(log, chosen_pref, assumption)
    statistic = _compute_ram_test_statistic(log, chosen_pref, assumption)
    p_val = _bootstrap_ram_pvalue(log, statistic, assumption, n_bootstrap)

    # Consistent when there is essentially no violation, or when the
    # violation is not significant at level alpha.
    consistent = statistic < 1e-6 or p_val > alpha

    elapsed_ms = (time.perf_counter() - started) * 1000
    return RandomAttentionResult(
        is_ram_consistent=consistent,
        preference_ranking=chosen_pref,
        attention_bounds=bounds,
        item_attention_scores=scores,
        test_statistic=statistic,
        p_value=p_val,
        compatible_preferences=candidates,
        assumption=assumption,
        num_observations=total_obs,
        computation_time_ms=elapsed_ms,
    )
def _find_ram_compatible_preferences(
    log: "StochasticChoiceLog",
    assumption: str,
) -> list[tuple[int, ...]]:
    """
    Enumerate the preference orderings that pass the RAM compatibility check.

    Args:
        log: StochasticChoiceLog with choice frequency data
        assumption: RAM variant forwarded to the compatibility check

    Returns:
        Compatible orderings, in itertools.permutations order over the
        sorted items

    Raises:
        ComputationalLimitError: if the log has more than 6 items, because
            the search enumerates every permutation.
    """
    all_items = sorted(log.all_items)
    n_items = len(all_items)

    # Guard first: beyond 6 items the factorial enumeration is refused.
    if n_items > 6:
        raise ComputationalLimitError(
            f"RAM search requires factorial enumeration which is infeasible for n={n_items} items. "
            f"Maximum supported is 6 items (6! = 720 orderings)."
        )

    return [
        ordering
        for ordering in permutations(all_items)
        if _is_ram_compatible(log, ordering, assumption)
    ]
def _is_ram_compatible(
    log: "StochasticChoiceLog",
    preference: tuple[int, ...],
    assumption: str,
) -> bool:
    """
    Check if a preference ordering is compatible with the RAM constraints.

    First applies a necessary (not sufficient) screen: under the "monotonic"
    assumption, no menu with observations may contain an item with zero
    choice probability ranked above an item with positive probability.
    Orderings that pass are handed to the LP feasibility check.

    Args:
        log: StochasticChoiceLog with choice frequency data
        preference: Candidate ordering, most preferred first
        assumption: RAM variant; the screen only rejects for "monotonic"

    Returns:
        False if the screen rejects the ordering, otherwise the result of
        the LP feasibility check
    """
    position = {item: idx for idx, item in enumerate(preference)}
    unranked = len(preference)  # items missing from the ordering sort last

    for m_idx in range(log.num_menus):
        menu = log.menus[m_idx]
        if log.total_observations_per_menu[m_idx] == 0:
            continue

        share = {item: log.get_choice_probability(m_idx, item) for item in menu}
        by_rank = sorted(menu, key=lambda x: position.get(x, unranked))

        # Single pass, best to worst: a positive probability after an
        # earlier zero means a better item is never chosen while a worse one
        # is.
        zero_seen = False
        for item in by_rank:
            p = share.get(item, 0)
            if zero_seen and p > 0 and assumption == "monotonic":
                return False
            if p == 0:
                zero_seen = True

    # More rigorous check using LP.
    return _check_ram_feasibility_lp(log, preference, assumption)
def _check_ram_feasibility_lp(
    log: "StochasticChoiceLog",
    preference: tuple[int, ...],
    assumption: str,
) -> bool:
    """
    Check RAM feasibility using linear programming.

    Decision variables are one attention probability mu_i in [0, 1] per item.
    The exact RAM choice probabilities are non-linear in mu, so only linear
    necessary conditions are imposed: for "monotonic", mu of each item is at
    least mu of the next item in the ordering.

    NOTE(review): the only field of `log` read here is `all_items`; the
    observed choice frequencies do not enter the constraints.

    Args:
        log: StochasticChoiceLog
        preference: Candidate ordering, most preferred first
        assumption: RAM variant; only "monotonic" adds constraints

    Returns:
        True when no constraints apply, otherwise whether the LP solver
        reports success

    Raises:
        SolverError: if the LP solver itself raises.
    """
    universe = sorted(log.all_items)
    n_items = len(universe)
    column = {item: i for i, item in enumerate(universe)}

    rows = []
    if assumption == "monotonic":
        for pos in range(n_items - 1):
            # mu[better] >= mu[worse]  <=>  -mu[better] + mu[worse] <= 0
            row = np.zeros(n_items)
            row[column[preference[pos]]] = -1
            row[column[preference[pos + 1]]] = 1
            rows.append(row)

    if not rows:
        return True  # No constraints to violate

    objective = np.zeros(n_items)  # Dummy objective: feasibility only
    lhs = np.array(rows)
    rhs = np.array([0.0] * len(rows))
    unit_box = [(0.0, 1.0) for _ in range(n_items)]

    try:
        outcome = linprog(objective, A_ub=lhs, b_ub=rhs, bounds=unit_box, method='highs')
        return outcome.success
    except Exception as e:
        from prefgraph.core.exceptions import SolverError
        raise SolverError(
            f"LP solver failed checking RAM feasibility. Original error: {e}"
        ) from e
def _estimate_ram_attention(
    log: "StochasticChoiceLog",
    preference: tuple[int, ...],
    assumption: str,
) -> tuple[dict[tuple[frozenset[int], int], tuple[float, float]], NDArray[np.float64]]:
    """
    Estimate attention probabilities under RAM (frequency-based proxy).

    The score of an item is twice its pooled choice share - total times
    chosen divided by total observations of the menus containing it - capped
    at 1. Items that never appear in an observed menu keep score 0. The
    bounds for each (menu, item) pair are the item's score +/- 0.2, clipped
    to [0, 1].

    NOTE(review): `preference` and `assumption` are accepted for interface
    symmetry but do not influence the estimate.

    Args:
        log: StochasticChoiceLog with menus, choice_frequencies and
            total_observations_per_menu
        preference: Preference ordering (unused)
        assumption: RAM variant (unused)

    Returns:
        Tuple of (attention_bounds, item_scores): bounds keyed by
        (frozenset(menu), item), and scores indexed by position in the
        sorted item list

    Raises:
        KeyError: if a menu contains an item missing from log.all_items.
    """
    all_items = sorted(log.all_items)
    n_items = len(all_items)
    # One dict lookup per item instead of a linear list.index() in each loop.
    slot = {item: i for i, item in enumerate(all_items)}

    chosen = np.zeros(n_items)
    offered = np.zeros(n_items)
    for m_idx in range(log.num_menus):
        menu = log.menus[m_idx]
        freqs = log.choice_frequencies[m_idx]
        total = log.total_observations_per_menu[m_idx]
        for item in menu:
            i = slot[item]
            offered[i] += total
            chosen[i] += freqs.get(item, 0)

    # Score = 2 * pooled choice share, capped at 1; unseen items stay at 0.
    item_scores = np.zeros(n_items)
    seen = offered > 0
    item_scores[seen] = np.minimum(1.0, chosen[seen] / offered[seen] * 2)

    # Bounds: score +/- 0.2, clipped to the unit interval.
    attention_bounds: dict[tuple[frozenset[int], int], tuple[float, float]] = {}
    for m_idx in range(log.num_menus):
        menu = log.menus[m_idx]
        menu_key = frozenset(menu)
        for item in menu:
            score = item_scores[slot[item]]
            lower = max(0.0, score - 0.2)
            upper = min(1.0, score + 0.2)
            attention_bounds[(menu_key, item)] = (lower, upper)

    return attention_bounds, item_scores
def _compute_ram_test_statistic(
    log: "StochasticChoiceLog",
    preference: tuple[int, ...],
    assumption: str,
) -> float:
    """
    Compute the violation statistic for RAM consistency.

    For every menu with observations, items are sorted by preference rank and
    each adjacent pair is compared. A pair contributes the squared gap in
    choice probability when the lower-ranked item is chosen more often than
    the higher-ranked one. The statistic is the sum of these contributions
    divided by the number of adjacent pairs compared.

    Args:
        log: StochasticChoiceLog with choice frequency data
        preference: Preference ordering, most preferred first; items missing
            from it sort last
        assumption: RAM variant (not used by this simple statistic)

    Returns:
        Mean squared violation, or 0.0 when no pair was compared
    """
    position = {item: idx for idx, item in enumerate(preference)}
    unranked = len(preference)

    squared_gap_sum = 0.0
    pair_count = 0
    for m_idx in range(log.num_menus):
        menu = list(log.menus[m_idx])
        if log.total_observations_per_menu[m_idx] == 0:
            continue

        ranked = sorted(menu, key=lambda x: position.get(x, unranked))
        # Walk adjacent (better, worse) pairs in preference order.
        for better, worse in zip(ranked, ranked[1:]):
            p_better = log.get_choice_probability(m_idx, better)
            p_worse = log.get_choice_probability(m_idx, worse)
            if p_better < p_worse:
                squared_gap_sum += (p_worse - p_better) ** 2
            pair_count += 1

    return squared_gap_sum / pair_count if pair_count else 0.0
def _bootstrap_ram_pvalue(
    log: "StochasticChoiceLog",
    observed_stat: float,
    assumption: str,
    n_bootstrap: int,
) -> float:
    """
    Compute a resampling p-value for the RAM test statistic.

    Draws random preference orderings with NumPy's global random state,
    evaluates the test statistic for each, and returns the share of draws
    whose statistic is at least the observed one.

    NOTE(review): the data are not resampled - only the ordering is
    randomised - and at most 100 draws are evaluated regardless of
    n_bootstrap.

    Args:
        log: StochasticChoiceLog with choice frequency data
        observed_stat: Statistic of the ordering under test
        assumption: RAM variant forwarded to the statistic
        n_bootstrap: Requested number of draws (capped at 100)

    Returns:
        P-value in [0, 1]; the neutral value 0.5 when no draw is made
    """
    if n_bootstrap == 0:
        return 0.5  # No bootstrap, return neutral p-value

    universe = sorted(log.all_items)
    n_draws = min(n_bootstrap, 100)  # Limit for speed

    simulated = [
        _compute_ram_test_statistic(
            log,
            tuple(np.random.permutation(universe).tolist()),
            assumption,
        )
        for _ in range(n_draws)
    ]
    if not simulated:
        return 0.5

    # Share of random orderings that do at least as badly as the observed one.
    return float(np.mean([stat >= observed_stat for stat in simulated]))
def test_ram_consistency(
    log: "StochasticChoiceLog",
    preference: tuple[int, ...] | None = None,
    alpha: float = 0.05,
) -> RandomAttentionResult:
    """
    Check stochastic choice data against the Random Attention Model.

    Convenience entry point that fits the model under the "monotonic"
    attention assumption and returns the fit result unchanged.

    Args:
        log: StochasticChoiceLog with choice frequency data
        preference: Optional fixed preference ordering to test.
            NOTE: this argument is accepted but is not forwarded to the
            underlying fit in the current implementation.
        alpha: Significance level passed on to the fit

    Returns:
        RandomAttentionResult with test results
    """
    fit_options = {"assumption": "monotonic", "alpha": alpha}
    return fit_random_attention_model(log, **fit_options)
def estimate_attention_probabilities(
    log: "StochasticChoiceLog",
    preference: tuple[int, ...],
) -> NDArray[np.float64]:
    """
    Return per-item attention estimates for a fixed preference ordering.

    Runs the RAM attention estimator under the "monotonic" assumption and
    keeps only its per-item scores; the (menu, item) bounds it also
    produces are discarded.

    Args:
        log: StochasticChoiceLog with choice data
        preference: Fixed preference ordering (best to worst)

    Returns:
        Array of attention probabilities per item
    """
    estimate = _estimate_ram_attention(log, preference, "monotonic")
    _unused_bounds, per_item_scores = estimate
    return per_item_scores
def compute_attention_bounds(
    log: "StochasticChoiceLog",
    preference: tuple[int, ...],
    item: int,
    menu: frozenset[int],
) -> tuple[float, float]:
    """
    Look up the attention-probability interval for one item in one menu.

    Runs the RAM attention estimator under the "monotonic" assumption and
    returns the interval stored for the ``(menu, item)`` pair.

    Args:
        log: StochasticChoiceLog with choice data
        preference: Fixed preference ordering
        item: Item to compute bounds for
        menu: Menu containing the item

    Returns:
        Tuple of (lower_bound, upper_bound) for attention probability;
        the uninformative interval (0.0, 1.0) when the estimator holds no
        entry for the requested pair
    """
    uninformative = (0.0, 1.0)
    bounds_by_pair, _unused_scores = _estimate_ram_attention(
        log, preference, "monotonic"
    )
    # NOTE(review): the lookup assumes the estimator keys its entries by the
    # same frozenset object type passed here - confirm against the estimator.
    return bounds_by_pair.get((menu, item), uninformative)
# =============================================================================
# ATTENTION OVERLOAD (Lleras et al. 2017)
# =============================================================================
def test_attention_overload(
    log: "MenuChoiceLog",
    quality_metric: str = "consistency",
) -> AttentionOverloadResult:
    """
    Test for attention overload in menu choices (Lleras et al. 2017).

    Attention overload occurs when choice quality degrades as menu size
    increases. This is the "paradox of choice" - too many options can
    harm decision quality.

    Observations are grouped by menu size, a quality score in [0, 1] is
    computed for each size, and quality is regressed (unweighted least
    squares, one point per distinct size) on log(menu_size + 1). Overload is
    reported when the slope is below -0.05 and the heuristic p-value is
    below 0.1.

    Args:
        log: MenuChoiceLog with menus and choices
        quality_metric: How to measure quality per menu size:
            - "consistency": share of observations whose index does not
              appear in any violation reported by the menu SARP check
              (default)
            - "frequency": average choice count of the chosen item,
              normalised by the count of the most frequently chosen item
            Any value other than "consistency" selects the frequency metric.

    Returns:
        AttentionOverloadResult with overload detection and analysis.
        ``critical_menu_size`` is the smallest menu size whose quality is
        below the mean quality across sizes (None without overload).
        ``p_value`` is a heuristic derived from R^2 and the number of
        distinct menu sizes, not a calibrated significance level. All scalar
        fields are plain Python ``bool``/``float`` values.

    Example:
        >>> from prefgraph import MenuChoiceLog, test_attention_overload
        >>> log = MenuChoiceLog(menus=[...], choices=[...])
        >>> result = test_attention_overload(log)
        >>> if result.has_overload:
        ...     print(f"Reduce menu size below {result.critical_menu_size}")

    References:
        Lleras, J. S., Masatlioglu, Y., Nakajima, D., & Ozbay, E. Y. (2017).
        When More is Less: Limited Consideration.
        Journal of Economic Theory, 170, 70-85.
    """
    start_time = time.perf_counter()
    n_obs = log.num_observations

    # Group observation indices by the size of the menu they were drawn from.
    size_groups: dict[int, list[int]] = {}
    for t, menu in enumerate(log.menus):
        size_groups.setdefault(len(menu), []).append(t)

    # Quality score per menu size. Every group holds at least one index
    # because groups are only created when an index is appended.
    menu_size_quality: dict[int, float] = {}
    if quality_metric == "consistency":
        from prefgraph.algorithms.abstract_choice import validate_menu_sarp

        sarp_result = validate_menu_sarp(log)

        # With an overall-consistent log (or a result that exposes no
        # violation list) nothing is flagged, so every observation counts.
        # NOTE(review): membership `t in cycle` assumes violation cycles are
        # expressed in observation indices - confirm against validate_menu_sarp.
        if sarp_result.is_consistent:
            violations = []
        else:
            violations = list(getattr(sarp_result, "violations", ()))

        for size, obs_indices in size_groups.items():
            consistent_count = sum(
                1
                for t in obs_indices
                if not any(t in cycle for cycle in violations)
            )
            menu_size_quality[size] = consistent_count / len(obs_indices)
    else:
        # Quality = how often high-frequency items are chosen.
        choice_counts: dict[int, int] = {}
        for choice in log.choices:
            choice_counts[choice] = choice_counts.get(choice, 0) + 1
        max_count = max(choice_counts.values()) if choice_counts else 1

        for size, obs_indices in size_groups.items():
            total_quality = sum(
                (choice_counts.get(log.choices[t], 0) / max_count for t in obs_indices),
                0.0,
            )
            menu_size_quality[size] = total_quality / len(obs_indices)

    # Regress quality ~ log(menu_size + 1); defaults describe "no trend".
    slope = 0.0
    p_value = 1.0
    if len(menu_size_quality) >= 2:
        ordered_sizes = sorted(menu_size_quality)
        qualities = np.array([menu_size_quality[s] for s in ordered_sizes])
        log_sizes = np.log(np.array(ordered_sizes) + 1)

        n = len(ordered_sizes)
        mean_x = np.mean(log_sizes)
        mean_y = np.mean(qualities)
        numerator = np.sum((log_sizes - mean_x) * (qualities - mean_y))
        denominator = np.sum((log_sizes - mean_x) ** 2)

        if denominator > 1e-10:
            # Cast to builtin float so no NumPy scalar reaches the result.
            slope = float(numerator / denominator)
            intercept = mean_y - slope * mean_x

            y_pred = slope * log_sizes + intercept
            ss_res = np.sum((qualities - y_pred) ** 2)
            ss_tot = np.sum((qualities - mean_y) ** 2)
            r_squared = float(1 - ss_res / ss_tot) if ss_tot > 1e-10 else 0.0

            # Heuristic: a better fit and more distinct sizes give a smaller
            # value; floored at 0.001. Needs at least three sizes.
            if n >= 3 and r_squared > 0.1:
                p_value = max(0.001, (1 - r_squared) / (n - 1))

    # Detect overload: clearly negative slope with a small heuristic p-value.
    has_overload = bool(slope < -0.05 and p_value < 0.1)

    # Critical menu size: first (smallest) size whose quality is below the mean.
    critical_menu_size = None
    if has_overload and menu_size_quality:
        mean_quality = float(np.mean(list(menu_size_quality.values())))
        critical_menu_size = next(
            (
                size
                for size in sorted(menu_size_quality)
                if menu_size_quality[size] < mean_quality
            ),
            None,
        )

    # Overload severity (0-1): slope magnitude doubled and capped at 1.
    overload_severity = min(1.0, abs(slope) * 2) if has_overload else 0.0

    computation_time = (time.perf_counter() - start_time) * 1000
    return AttentionOverloadResult(
        has_overload=has_overload,
        critical_menu_size=critical_menu_size,
        overload_severity=overload_severity,
        menu_size_quality=menu_size_quality,
        regression_slope=slope,
        p_value=p_value,
        num_observations=n_obs,
        computation_time_ms=computation_time,
    )
# =============================================================================
# STATUS QUO BIAS (Masatlioglu & Ok 2005)
# =============================================================================
def test_status_quo_bias(
    log: "MenuChoiceLog",
    defaults: list[int] | None = None,
) -> StatusQuoBiasResult:
    """
    Test for status quo bias in menu choices (Masatlioglu & Ok 2005).

    Status quo bias occurs when default options are chosen at higher rates
    than preference alone would predict.

    The null model is uniform random choice: in a menu of size k the default
    is picked with probability 1/k. The expected default-choice rate is the
    mean of 1/k over the observations that have a valid default, and the
    standard error uses the matching sum of k-specific Bernoulli variances.
    Bias is reported when the observed rate exceeds the expected rate by
    more than 0.05 and the one-sided normal-approximation p-value is below
    0.1. The p-value is only computed with at least 10 valid defaults;
    otherwise it is 1.0.

    Args:
        log: MenuChoiceLog with menus and choices
        defaults: Default item per observation (optional). Negative entries
            and entries not contained in the corresponding menu mean "no
            default" and are skipped. A list that is too short is padded
            with -1; a list that is too long is truncated. If None, the
            smallest item id of each menu is used as its default (-1 for an
            empty menu).

    Returns:
        StatusQuoBiasResult with bias detection and analysis.
        ``default_advantage`` is the excess of the observed over the expected
        default-choice rate, floored at 0. ``bias_by_item`` maps each default
        item to its own choice rate minus the overall expected rate.

    Raises:
        StatisticalError: If the p-value computation fails.

    Example:
        >>> from prefgraph import MenuChoiceLog, test_status_quo_bias
        >>> log = MenuChoiceLog(
        ...     menus=[{0, 1, 2}, {0, 1, 2}, {0, 1, 2}],
        ...     choices=[0, 0, 1]
        ... )
        >>> result = test_status_quo_bias(log, defaults=[0, 0, 0])
        >>> if result.has_status_quo_bias:
        ...     print(f"Default advantage: {result.default_advantage:.2%}")

    References:
        Masatlioglu, Y., & Ok, E. A. (2005). Rational choice with status quo bias.
        Journal of Economic Theory, 121(1), 1-29.
    """
    start_time = time.perf_counter()
    n_obs = log.num_observations

    if defaults is None:
        # Fallback convention: the smallest item id in each menu is its default.
        defaults = [min(menu) if menu else -1 for menu in log.menus]
    else:
        # Work on a list copy so tuples/arrays are accepted and padding is safe.
        defaults = list(defaults)

    # Ensure exactly one default entry per observation (-1 = no default).
    if len(defaults) != n_obs:
        defaults = defaults[:n_obs] + [-1] * (n_obs - len(defaults))

    num_defaults = 0
    default_chosen = 0
    null_expected_hits = 0.0  # sum of 1/|menu| over valid-default observations
    null_variance = 0.0  # sum of p * (1 - p) with p = 1/|menu|
    item_default_counts: dict[int, int] = {}  # times an item was the default
    item_chosen_as_default: dict[int, int] = {}  # times chosen while default

    # Single pass so the counts and the null model use the same observations.
    for t, (menu, choice) in enumerate(zip(log.menus, log.choices)):
        default = defaults[t]
        if default < 0 or default not in menu:
            continue

        num_defaults += 1
        uniform_prob = 1.0 / len(menu)  # menu contains `default`, so it is non-empty
        null_expected_hits += uniform_prob
        null_variance += uniform_prob * (1.0 - uniform_prob)

        item_default_counts[default] = item_default_counts.get(default, 0) + 1
        item_chosen_as_default.setdefault(default, 0)
        if choice == default:
            default_chosen += 1
            item_chosen_as_default[default] += 1

    # Expected and observed default-choice rates (0.0 without valid defaults).
    expected_rate = null_expected_hits / num_defaults if num_defaults > 0 else 0.0
    actual_rate = default_chosen / num_defaults if num_defaults > 0 else 0.0

    # Only a positive excess counts as an advantage.
    default_advantage = max(0.0, actual_rate - expected_rate)

    # Per-item excess over the overall expected rate.
    bias_by_item: dict[int, float] = {
        item: item_chosen_as_default[item] / times_default - expected_rate
        for item, times_default in item_default_counts.items()
    }

    # One-sided z-test (normal approximation) for a positive bias.
    p_value = 1.0
    if num_defaults >= 10 and expected_rate > 0:
        se = float(np.sqrt(null_variance)) / num_defaults
        if se > 0:
            z_stat = (actual_rate - expected_rate) / se
            from scipy import stats

            try:
                # Survival function keeps precision in the upper tail where
                # 1 - cdf(z) would round to exactly 0.
                p_value = float(stats.norm.sf(z_stat))
            except Exception as e:
                raise StatisticalError(
                    f"Failed to compute p-value for status quo bias test. Original error: {e}"
                ) from e

    has_status_quo_bias = bool(default_advantage > 0.05 and p_value < 0.1)

    computation_time = (time.perf_counter() - start_time) * 1000
    return StatusQuoBiasResult(
        has_status_quo_bias=has_status_quo_bias,
        default_advantage=default_advantage,
        bias_by_item=bias_by_item,
        p_value=p_value,
        num_defaults=num_defaults,
        num_observations=n_obs,
        computation_time_ms=computation_time,
    )
# =============================================================================
# LEGACY ALIASES FOR WARP(LA)
# =============================================================================
# Each legacy name below is bound to the same object as its primary name, so
# either spelling gives identical behaviour. The bare string after each
# assignment is a no-op expression statement, presumably kept as an
# attribute docstring for documentation tooling.
check_warp_la = test_warp_la
"""Legacy alias: use test_warp_la instead."""
validate_attention_filter = validate_attention_filter_consistency
"""Legacy alias: use validate_attention_filter_consistency instead."""