Source code for prefgraph.algorithms.attention

"""Limited attention models for choice analysis.

Implements consideration set models where consumers don't see all options.
Based on Chapter 14 of Chambers & Echenique (2016) "Revealed Preference Theory"
and Masatlioglu, Nakajima & Ozbay (2012) "Revealed Attention" (AER).

Key insight: apparent irrationality may be due to limited attention rather
than inconsistent preferences. A choice is "attention-rational" if it's
optimal among the items actually considered.

Tech-Friendly Names (Primary):
    - test_warp_la(): Test WARP with Limited Attention (Masatlioglu et al. 2012)
    - estimate_consideration_sets(): Estimate which items are considered
    - test_attention_rationality(): Test rationalizability with limited attention
    - compute_salience_weights(): Estimate feature-based attention weights
    - fit_random_attention_model(): Fit Random Attention Model (Cattaneo et al. 2020)

Economics Names (Legacy Aliases):
    - identify_attention() -> estimate_consideration_sets()
    - check_attention_rationality() -> test_attention_rationality()
    - check_warp_la() -> test_warp_la()
"""

from __future__ import annotations

import time
from itertools import permutations
from typing import TYPE_CHECKING

import numpy as np
from numpy.typing import NDArray
from scipy.optimize import linprog

from prefgraph.core.result import (
    AttentionResult,
    WARPLAResult,
    RandomAttentionResult,
    AttentionOverloadResult,
    StatusQuoBiasResult,
)
from prefgraph.core.exceptions import (
    ComputationalLimitError,
    StatisticalError,
    DataValidationError,
)

if TYPE_CHECKING:
    from prefgraph.core.session import MenuChoiceLog, StochasticChoiceLog


[docs] def test_attention_rationality( log: "MenuChoiceLog", max_consideration_size: int | None = None, ) -> AttentionResult: """ Test if choices are rationalizable with limited attention. A choice is attention-rational if there exists: 1. A preference ordering over items 2. A consideration set function (what items are noticed) Such that each choice is optimal among considered items. This is a weaker notion than standard rationality - it allows apparent violations due to limited attention. Args: log: MenuChoiceLog with menus and choices max_consideration_size: Maximum consideration set size (None = no limit) Returns: AttentionResult with consideration sets and attention analysis Example: >>> from prefgraph import MenuChoiceLog, test_attention_rationality >>> result = test_attention_rationality(choice_log) >>> if result.is_attention_rational: ... print("Choices are rationalizable with limited attention") ... print(f"Avg consideration set size: {result.mean_consideration_size:.1f}") Note: **Complexity**: This function calls validate_menu_sarp internally, which uses Floyd-Warshall with O(I³) complexity where I is the number of unique items. For large item sets, this can be slow. References: Chambers & Echenique (2016), Chapter 14 Manzini, P. & Mariotti, M. (2014). "Stochastic Choice and Consideration Sets" """ start_time = time.perf_counter() n_obs = log.num_observations # First check standard SARP consistency from prefgraph.algorithms.abstract_choice import validate_menu_sarp sarp_result = validate_menu_sarp(log) if sarp_result.is_consistent: # Already rational without limited attention # Consideration sets are full menus consideration_sets = [set(menu) for menu in log.menus] computation_time = (time.perf_counter() - start_time) * 1000 return AttentionResult( consideration_sets=consideration_sets, attention_parameter=1.0, is_attention_rational=True, salience_weights=np.ones(max(log.all_items) + 1), default_option=None, inattention_rate=0.0, rationalizable_observations=list(range(n_obs)), computation_time_ms=computation_time, ) # Try to find consideration sets that rationalize the data consideration_sets, is_rational, rationalizable_obs = _find_consideration_sets( log, max_consideration_size ) # Compute attention parameter (average consideration set size / menu size) total_considered = sum(len(cs) for cs in consideration_sets) total_available = sum(len(menu) for menu in log.menus) attention_parameter = total_considered / max(total_available, 1) # Estimate salience weights salience_weights = compute_salience_weights(log, consideration_sets) # Identify default option (if any) default_option = _identify_default_option(log, consideration_sets) # Inattention rate inattention_obs = [ t for t in range(n_obs) if len(consideration_sets[t]) < len(log.menus[t]) ] inattention_rate = len(inattention_obs) / n_obs if n_obs > 0 else 0.0 computation_time = (time.perf_counter() - start_time) * 1000 return AttentionResult( consideration_sets=consideration_sets, attention_parameter=attention_parameter, is_attention_rational=is_rational, salience_weights=salience_weights, default_option=default_option, inattention_rate=inattention_rate, rationalizable_observations=rationalizable_obs, computation_time_ms=computation_time, )
[docs] def estimate_consideration_sets( log: "MenuChoiceLog", method: str = "greedy", ) -> list[set[int]]: """ Estimate consideration sets for each observation. The consideration set is the subset of menu items that the consumer actually notices/considers before making a choice. Args: log: MenuChoiceLog with menus and choices method: Estimation method ("greedy", "optimal", "salience") Returns: List of consideration sets, one per observation Note: The chosen item is always in the consideration set. """ if method == "greedy": consideration_sets, _, _ = _find_consideration_sets(log, None) elif method == "salience": consideration_sets = _estimate_salience_based_consideration(log) else: consideration_sets, _, _ = _find_consideration_sets(log, None) return consideration_sets
[docs] def compute_salience_weights( log: "MenuChoiceLog", consideration_sets: list[set[int]] | None = None, ) -> NDArray[np.float64]: """ Compute salience weights for each item. Higher weight = more likely to be noticed/considered. Estimated from frequency of appearing in consideration sets. Args: log: MenuChoiceLog with menus and choices consideration_sets: Optional pre-computed consideration sets Returns: Array of salience weights (one per item) """ if consideration_sets is None: consideration_sets = estimate_consideration_sets(log) max_item = max(log.all_items) weights = np.zeros(max_item + 1) counts = np.zeros(max_item + 1) for t, (menu, cs) in enumerate(zip(log.menus, consideration_sets)): for item in menu: counts[item] += 1 if item in cs: weights[item] += 1 # Normalize to get probability of consideration for i in range(len(weights)): if counts[i] > 0: weights[i] = weights[i] / counts[i] return weights
def _find_consideration_sets( log: "MenuChoiceLog", max_size: int | None, ) -> tuple[list[set[int]], bool, list[int]]: """ Find consideration sets that rationalize the data. Uses a greedy heuristic algorithm: 1. Start with consideration = {chosen item} 2. Add items needed to maintain preference consistency **Algorithmic Limitation**: This is a greedy heuristic, not an optimal algorithm. The problem of finding minimal consideration sets is NP-hard in general. This implementation may: - Produce larger-than-necessary consideration sets - Fail to find a rationalizing set even when one exists - Not guarantee the globally optimal solution For optimal results on small instances, consider integer programming formulations. Returns: Tuple of (consideration_sets, is_rational, rationalizable_observations) """ n_obs = log.num_observations # Build revealed preference from choices # choice[t] is preferred to all unchosen items in menu[t] revealed_pref: dict[int, set[int]] = {} # item -> items it's preferred to for t, (menu, choice) in enumerate(zip(log.menus, log.choices)): if choice not in revealed_pref: revealed_pref[choice] = set() for item in menu: if item != choice: revealed_pref[choice].add(item) # For each observation, find minimal consideration set consideration_sets = [] rationalizable = [] for t, (menu, choice) in enumerate(zip(log.menus, log.choices)): # Consideration set must contain chosen item consideration = {choice} # Add items that must be considered for consistency # An item x must be considered if: # 1. x is in menu # 2. x is revealed preferred to the choice (would cause violation) for item in menu: if item != choice: # Check if item is revealed preferred to choice if item in revealed_pref and choice in revealed_pref[item]: # item > choice in revealed preference # Must not consider item, or choice would be irrational pass else: # Safe to not consider this item pass # Simple approach: consideration = items not strictly preferred to choice for item in menu: if item == choice: continue # Check if choosing 'choice' over 'item' is consistent # with some preference ordering # If 'item' is strictly revealed preferred to 'choice' elsewhere, # we should not consider 'item' (to avoid violation) item_preferred = item in revealed_pref and choice in revealed_pref[item] if not item_preferred: # Can safely consider this item if max_size is None or len(consideration) < max_size: consideration.add(item) consideration_sets.append(consideration) # Check if this observation is rationalizable # Choice must be maximal in consideration set is_rational_obs = True for item in consideration: if item != choice: if item in revealed_pref and choice in revealed_pref[item]: is_rational_obs = False break if is_rational_obs: rationalizable.append(t) is_fully_rational = len(rationalizable) == n_obs return consideration_sets, is_fully_rational, rationalizable def _estimate_salience_based_consideration( log: "MenuChoiceLog", salience_threshold: float = 0.1, ) -> list[set[int]]: """ Estimate consideration sets based on item salience. Assumes more frequently chosen items are more salient. Args: log: MenuChoiceLog with menus and choices salience_threshold: Minimum choice frequency to be considered salient (default 0.1) """ # Compute choice frequencies choice_counts: dict[int, int] = {} for choice in log.choices: choice_counts[choice] = choice_counts.get(choice, 0) + 1 total_choices = len(log.choices) consideration_sets = [] for t, (menu, choice) in enumerate(zip(log.menus, log.choices)): # Always include chosen item consideration = {choice} # Include items with high choice frequency for item in menu: if item != choice: freq = choice_counts.get(item, 0) / total_choices if freq > salience_threshold: consideration.add(item) consideration_sets.append(consideration) return consideration_sets def _identify_default_option( log: "MenuChoiceLog", consideration_sets: list[set[int]], ) -> int | None: """ Identify if there's a default option (always considered). """ if not consideration_sets: return None # Find items that appear in all consideration sets common_items = set(consideration_sets[0]) for cs in consideration_sets[1:]: common_items &= cs if len(common_items) == 1: return list(common_items)[0] return None
[docs] def test_attention_filter( log: "MenuChoiceLog", filter_function: callable, ) -> dict: """ Test if choices are rational given a specific attention filter. An attention filter specifies which items are considered at each observation. This tests if choices are optimal within filtered menus. Args: log: MenuChoiceLog with menus and choices filter_function: Function(menu, t) -> consideration_set Returns: Dictionary with test results """ violations = [] # Build revealed preference revealed_pref: dict[int, set[int]] = {} for t, (menu, choice) in enumerate(zip(log.menus, log.choices)): consideration = filter_function(menu, t) if choice not in consideration: violations.append(t) continue if choice not in revealed_pref: revealed_pref[choice] = set() for item in consideration: if item != choice: revealed_pref[choice].add(item) # Check for cycles in revealed preference has_cycle = _has_preference_cycle(revealed_pref) return { "is_rational": len(violations) == 0 and not has_cycle, "violations": violations, "has_preference_cycle": has_cycle, }
def _has_preference_cycle(revealed_pref: dict[int, set[int]]) -> bool: """ Check if revealed preference relation has a cycle using DFS. """ visited = set() rec_stack = set() def dfs(node: int) -> bool: visited.add(node) rec_stack.add(node) for neighbor in revealed_pref.get(node, set()): if neighbor not in visited: if dfs(neighbor): return True elif neighbor in rec_stack: return True rec_stack.remove(node) return False for node in revealed_pref: if node not in visited: if dfs(node): return True return False # ============================================================================= # LEGACY ALIASES # ============================================================================= identify_attention = estimate_consideration_sets """Legacy alias: use estimate_consideration_sets instead.""" check_attention_rationality = test_attention_rationality """Legacy alias: use test_attention_rationality instead.""" # ============================================================================= # WARP(LA): WARP WITH LIMITED ATTENTION (Masatlioglu et al. 2012) # ============================================================================= def test_warp_la( log: "MenuChoiceLog", ) -> WARPLAResult: """ Test WARP with Limited Attention (Masatlioglu et al. 2012). WARP(LA) is a weakening of WARP that characterizes Choice with Limited Attention (CLA). A choice function satisfies WARP(LA) if and only if the revealed preference relation P is acyclic, where: xPy iff there exists T such that c(T) = x and c(T \\ y) != x In other words, x is revealed preferred to y if removing y from some menu changes the choice away from x. If WARP(LA) is satisfied, behavior can be rationalized by preference maximization within an attention filter (consideration set that is stable when removing unattended items). Args: log: MenuChoiceLog with menus and choices Returns: WARPLAResult with consistency test, revealed preference, and recovered ordering Example: >>> from prefgraph import MenuChoiceLog, test_warp_la >>> log = MenuChoiceLog( ... menus=[{0, 1, 2}, {0, 1}, {1, 2}, {0, 2}], ... choices=[0, 0, 1, 2] # Cyclical choice: c(xyz)=x, c(xy)=x, c(yz)=y, c(xz)=z ... ) >>> result = test_warp_la(log) >>> print(result.satisfies_warp_la) # True - rationalizable with limited attention >>> print(result.recovered_preference) # Recovered preference ordering References: Masatlioglu, Y., Nakajima, D., & Ozbay, E. Y. (2012). Revealed Attention. American Economic Review, 102(5), 2183-2205. """ start_time = time.perf_counter() n_obs = log.num_observations all_items = sorted(log.all_items) # Build menu-to-choice mapping menu_to_choice: dict[frozenset[int], int] = {} for menu, choice in zip(log.menus, log.choices): menu_key = frozenset(menu) menu_to_choice[menu_key] = choice # Build revealed preference relation P # xPy iff exists T such that c(T) = x != c(T \ y) revealed_preference: list[tuple[int, int]] = [] revealed_pref_set: set[tuple[int, int]] = set() for menu, choice in zip(log.menus, log.choices): menu_set = frozenset(menu) for y in menu: if y == choice: continue # Check if c(menu \ y) != choice submenu = menu_set - {y} if submenu in menu_to_choice: if menu_to_choice[submenu] != choice: # x is revealed preferred to y if (choice, y) not in revealed_pref_set: revealed_preference.append((choice, y)) revealed_pref_set.add((choice, y)) # Compute transitive closure of P using Floyd-Warshall transitive_closure = _compute_transitive_closure(revealed_pref_set, all_items) # Check for cycles in P (acyclicity test) violations = _find_preference_cycles(revealed_pref_set, all_items) satisfies_warp_la = len(violations) == 0 # Recover preference ordering if consistent recovered_preference = None attention_filter = None if satisfies_warp_la: # Find a linear extension of the revealed preference recovered_preference = _topological_sort(transitive_closure, all_items) # Construct attention filter attention_filter = _construct_attention_filter( log, menu_to_choice, transitive_closure, recovered_preference ) computation_time = (time.perf_counter() - start_time) * 1000 return WARPLAResult( satisfies_warp_la=satisfies_warp_la, revealed_preference=revealed_preference, transitive_closure=list(transitive_closure), attention_filter=attention_filter, recovered_preference=recovered_preference, violations=violations, num_observations=n_obs, computation_time_ms=computation_time, ) def _compute_transitive_closure( relation: set[tuple[int, int]], items: list[int], ) -> set[tuple[int, int]]: """Compute transitive closure using Floyd-Warshall algorithm.""" # Build adjacency matrix item_to_idx = {item: i for i, item in enumerate(items)} n = len(items) reachable = np.zeros((n, n), dtype=bool) for x, y in relation: if x in item_to_idx and y in item_to_idx: reachable[item_to_idx[x], item_to_idx[y]] = True # Floyd-Warshall for k in range(n): for i in range(n): for j in range(n): if reachable[i, k] and reachable[k, j]: reachable[i, j] = True # Extract transitive closure closure = set() for i in range(n): for j in range(n): if reachable[i, j]: closure.add((items[i], items[j])) return closure def _find_preference_cycles( relation: set[tuple[int, int]], items: list[int], ) -> list[tuple[int, ...]]: """Find cycles in the revealed preference relation using DFS.""" # Build adjacency list adj: dict[int, list[int]] = {item: [] for item in items} for x, y in relation: if x in adj: adj[x].append(y) cycles = [] visited = set() rec_stack = set() path = [] def dfs(node: int) -> bool: visited.add(node) rec_stack.add(node) path.append(node) for neighbor in adj.get(node, []): if neighbor not in visited: if dfs(neighbor): return True elif neighbor in rec_stack: # Found a cycle - extract it cycle_start = path.index(neighbor) cycle = tuple(path[cycle_start:]) if len(cycle) > 1: cycles.append(cycle) return False # Continue searching for more cycles path.pop() rec_stack.remove(node) return False for item in items: if item not in visited: dfs(item) return cycles def _topological_sort( transitive_closure: set[tuple[int, int]], items: list[int], ) -> tuple[int, ...]: """Perform topological sort to find a compatible preference ordering.""" # Build in-degree count in_degree = {item: 0 for item in items} adj: dict[int, list[int]] = {item: [] for item in items} for x, y in transitive_closure: if x in adj and y in in_degree: adj[x].append(y) in_degree[y] += 1 # Kahn's algorithm result = [] queue = [item for item in items if in_degree[item] == 0] while queue: # Sort for deterministic output queue.sort(reverse=True) node = queue.pop() result.append(node) for neighbor in adj[node]: in_degree[neighbor] -= 1 if in_degree[neighbor] == 0: queue.append(neighbor) return tuple(result) def _construct_attention_filter( log: "MenuChoiceLog", menu_to_choice: dict[frozenset[int], int], transitive_closure: set[tuple[int, int]], preference: tuple[int, ...], ) -> dict[frozenset[int], set[int]]: """ Construct an attention filter that rationalizes the choice data. Uses the construction from Theorem 3 in Masatlioglu et al. (2012): Gamma(S) = {x in S : c(S) is preferred to x} union {c(S)} """ pref_set = set(transitive_closure) attention_filter: dict[frozenset[int], set[int]] = {} for menu in log.menus: menu_key = frozenset(menu) if menu_key not in menu_to_choice: continue choice = menu_to_choice[menu_key] # Gamma(S) = {c(S)} union {x in S : c(S) > x} consideration = {choice} for item in menu: if item == choice: continue # Check if choice is preferred to item if (choice, item) in pref_set: consideration.add(item) attention_filter[menu_key] = consideration return attention_filter def recover_preference_with_attention( log: "MenuChoiceLog", ) -> tuple[tuple[int, ...] | None, dict[frozenset[int], set[int]] | None]: """ Recover preference ordering and attention filter from choice data. If the data satisfies WARP(LA), returns a preference ordering and attention filter that rationalize the choices. Otherwise returns None. Args: log: MenuChoiceLog with menus and choices Returns: Tuple of (preference_ordering, attention_filter) or (None, None) Example: >>> from prefgraph import MenuChoiceLog, recover_preference_with_attention >>> log = MenuChoiceLog(menus=[...], choices=[...]) >>> pref, attn = recover_preference_with_attention(log) >>> if pref is not None: ... print(f"Preference: {' > '.join(str(x) for x in pref)}") """ result = test_warp_la(log) return result.recovered_preference, result.attention_filter def validate_attention_filter_consistency( log: "MenuChoiceLog", attention_filter: dict[frozenset[int], set[int]], ) -> dict: """ Validate if given attention filter is consistent with a preference ordering. Tests if there exists a preference ordering such that choices are optimal within the attention filter. Args: log: MenuChoiceLog with menus and choices attention_filter: Dict mapping menu (frozenset) to consideration set Returns: Dictionary with validation results Example: >>> log = MenuChoiceLog(menus=[...], choices=[...]) >>> filter = {frozenset({0,1,2}): {0,1}, frozenset({1,2}): {1}} >>> result = validate_attention_filter_consistency(log, filter) >>> print(result['is_valid']) """ # Build revealed preference from filtered choices revealed_pref: set[tuple[int, int]] = set() violations = [] for menu, choice in zip(log.menus, log.choices): menu_key = frozenset(menu) consideration = attention_filter.get(menu_key, set(menu)) # Choice must be in consideration set if choice not in consideration: violations.append(f"Choice {choice} not in consideration set for {menu}") continue # Choice must be preferred to all other considered items for item in consideration: if item != choice: revealed_pref.add((choice, item)) # Check for cycles all_items = sorted(log.all_items) cycles = _find_preference_cycles(revealed_pref, all_items) is_valid = len(violations) == 0 and len(cycles) == 0 return { "is_valid": is_valid, "violations": violations, "preference_cycles": cycles, "revealed_preference": list(revealed_pref), } # ============================================================================= # RANDOM ATTENTION MODEL (Cattaneo et al. 2020) # ============================================================================= def fit_random_attention_model( log: "StochasticChoiceLog", assumption: str = "monotonic", alpha: float = 0.05, n_bootstrap: int = 1000, ) -> RandomAttentionResult: """ Fit Random Attention Model (RAM) to stochastic choice data. RAM assumes a fixed preference ordering and random attention: P(choose x from S) = P(x most preferred among considered items) Implements the method from Cattaneo et al. (2020) "The Random Attention Model". Args: log: StochasticChoiceLog with choice frequency data assumption: RAM variant ("monotonic", "independent", "general") - "monotonic": attention probability weakly increases with preference rank - "independent": attention probabilities independent across items - "general": minimal restrictions on attention alpha: Significance level for consistency test n_bootstrap: Number of bootstrap samples for p-value computation Returns: RandomAttentionResult with consistency test, preference, and attention scores Example: >>> from prefgraph import StochasticChoiceLog, fit_random_attention_model >>> log = StochasticChoiceLog( ... menus=[{0,1,2}, {0,1}], ... choice_frequencies=[{0: 50, 1: 30, 2: 20}, {0: 60, 1: 40}], ... total_observations_per_menu=[100, 100] ... ) >>> result = fit_random_attention_model(log) >>> print(f"RAM consistent: {result.is_ram_consistent}") >>> print(f"Estimated preference: {result.preference_ranking}") References: Cattaneo, M. D., Ma, X., Masatlioglu, Y., & Suleymanov, E. (2020). A Random Attention Model. Journal of Political Economy, 128(7). """ start_time = time.perf_counter() n_obs = sum(log.total_observations_per_menu) all_items = sorted(log.all_items) n_items = len(all_items) # Find compatible preference orderings compatible_prefs = _find_ram_compatible_preferences(log, assumption) # If no compatible preferences, RAM is not consistent if not compatible_prefs: computation_time = (time.perf_counter() - start_time) * 1000 return RandomAttentionResult( is_ram_consistent=False, preference_ranking=None, attention_bounds={}, item_attention_scores=np.zeros(n_items), test_statistic=1.0, p_value=0.0, compatible_preferences=[], assumption=assumption, num_observations=n_obs, computation_time_ms=computation_time, ) # Use first compatible preference for attention estimation best_pref = compatible_prefs[0] # Estimate attention probabilities attention_bounds, item_scores = _estimate_ram_attention(log, best_pref, assumption) # Compute test statistic (distance to RAM) test_stat = _compute_ram_test_statistic(log, best_pref, assumption) # Bootstrap p-value computation p_value = _bootstrap_ram_pvalue(log, test_stat, assumption, n_bootstrap) is_consistent = test_stat < 1e-6 or p_value > alpha computation_time = (time.perf_counter() - start_time) * 1000 return RandomAttentionResult( is_ram_consistent=is_consistent, preference_ranking=best_pref, attention_bounds=attention_bounds, item_attention_scores=item_scores, test_statistic=test_stat, p_value=p_value, compatible_preferences=compatible_prefs, assumption=assumption, num_observations=n_obs, computation_time_ms=computation_time, ) def _find_ram_compatible_preferences( log: "StochasticChoiceLog", assumption: str, ) -> list[tuple[int, ...]]: """Find preference orderings compatible with RAM constraints.""" all_items = sorted(log.all_items) n_items = len(all_items) # For small n, enumerate all orderings if n_items <= 6: compatible = [] for perm in permutations(all_items): if _is_ram_compatible(log, perm, assumption): compatible.append(perm) return compatible # For larger n, exact enumeration is computationally infeasible raise ComputationalLimitError( f"RAM search requires factorial enumeration which is infeasible for n={n_items} items. " f"Maximum supported is 6 items (6! = 720 orderings)." ) def _is_ram_compatible( log: "StochasticChoiceLog", preference: tuple[int, ...], assumption: str, ) -> bool: """Check if preference is compatible with RAM constraints.""" # Build rank mapping rank = {item: i for i, item in enumerate(preference)} # For each menu, check RAM constraints for m_idx in range(log.num_menus): menu = log.menus[m_idx] total = log.total_observations_per_menu[m_idx] if total == 0: continue # Get choice probabilities probs = {} for item in menu: probs[item] = log.get_choice_probability(m_idx, item) # Check RAM constraint: for items in menu, higher ranked items # should have higher choice probability conditional on being considered menu_list = sorted(menu, key=lambda x: rank.get(x, len(preference))) for i, item_i in enumerate(menu_list): for j, item_j in enumerate(menu_list): if i >= j: continue # item_i is ranked higher than item_j # Under RAM, P(choose item_i | S) >= P(choose item_j | S) * something # This is a necessary but not sufficient condition if probs.get(item_i, 0) == 0 and probs.get(item_j, 0) > 0: # Higher ranked item never chosen, lower ranked chosen # This violates RAM under monotonic attention if assumption == "monotonic": return False # More rigorous check using LP return _check_ram_feasibility_lp(log, preference, assumption) def _check_ram_feasibility_lp( log: "StochasticChoiceLog", preference: tuple[int, ...], assumption: str, ) -> bool: """Check RAM feasibility using linear programming.""" all_items = sorted(log.all_items) n_items = len(all_items) rank = {item: i for i, item in enumerate(preference)} item_to_idx = {item: i for i, item in enumerate(all_items)} # Decision variables: attention probabilities mu_i for each item # Variables: mu_0, mu_1, ..., mu_{n-1} n_vars = n_items # Build constraints from RAM model # P(choose x | S) = mu_x * prod_{y in S, y > x}(1 - mu_y) # These are non-linear, so we use bounds approach # Use simple feasibility check based on necessary conditions c = np.zeros(n_vars) # Dummy objective A_ub = [] b_ub = [] # Attention probabilities must be in [0, 1] bounds = [(0.0, 1.0) for _ in range(n_vars)] # For monotonic assumption: higher ranked items have higher attention if assumption == "monotonic": for i in range(n_items - 1): # mu_i >= mu_{i+1} for preference order row = np.zeros(n_vars) pref_item_i = preference[i] pref_item_j = preference[i + 1] row[item_to_idx[pref_item_i]] = -1 row[item_to_idx[pref_item_j]] = 1 A_ub.append(row) b_ub.append(0.0) if len(A_ub) == 0: return True # No constraints to violate A_ub = np.array(A_ub) b_ub = np.array(b_ub) try: result = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=bounds, method='highs') return result.success except Exception as e: from prefgraph.core.exceptions import SolverError raise SolverError( f"LP solver failed checking RAM feasibility. Original error: {e}" ) from e def _estimate_ram_attention( log: "StochasticChoiceLog", preference: tuple[int, ...], assumption: str, ) -> tuple[dict[tuple[frozenset[int], int], tuple[float, float]], NDArray[np.float64]]: """Estimate attention probabilities under RAM.""" all_items = sorted(log.all_items) n_items = len(all_items) rank = {item: i for i, item in enumerate(preference)} # Simple estimation: use choice frequencies as attention proxy choice_counts = np.zeros(n_items) appearance_counts = np.zeros(n_items) for m_idx in range(log.num_menus): menu = log.menus[m_idx] freqs = log.choice_frequencies[m_idx] total = log.total_observations_per_menu[m_idx] for item in menu: idx = all_items.index(item) appearance_counts[idx] += total choice_counts[idx] += freqs.get(item, 0) # Estimate attention as choice probability adjusted for preference item_scores = np.zeros(n_items) for i, item in enumerate(all_items): if appearance_counts[i] > 0: item_scores[i] = min(1.0, choice_counts[i] / appearance_counts[i] * 2) # Compute bounds for each (menu, item) pair attention_bounds: dict[tuple[frozenset[int], int], tuple[float, float]] = {} for m_idx in range(log.num_menus): menu = log.menus[m_idx] menu_key = frozenset(menu) for item in menu: idx = all_items.index(item) # Simple bounds: [0, 1] lower = max(0.0, item_scores[idx] - 0.2) upper = min(1.0, item_scores[idx] + 0.2) attention_bounds[(menu_key, item)] = (lower, upper) return attention_bounds, item_scores def _compute_ram_test_statistic( log: "StochasticChoiceLog", preference: tuple[int, ...], assumption: str, ) -> float: """Compute test statistic for RAM consistency.""" # Simple test: sum of squared deviations from RAM predictions rank = {item: i for i, item in enumerate(preference)} total_deviation = 0.0 n_comparisons = 0 for m_idx in range(log.num_menus): menu = list(log.menus[m_idx]) total = log.total_observations_per_menu[m_idx] if total == 0: continue # Sort menu by preference menu_sorted = sorted(menu, key=lambda x: rank.get(x, len(preference))) # Check monotonicity: higher ranked items should be chosen more often for i in range(len(menu_sorted) - 1): item_i = menu_sorted[i] item_j = menu_sorted[i + 1] p_i = log.get_choice_probability(m_idx, item_i) p_j = log.get_choice_probability(m_idx, item_j) # Under RAM with monotonic attention, P(i) >= P(j) * some_factor # Use simple violation measure if p_i < p_j: total_deviation += (p_j - p_i) ** 2 n_comparisons += 1 if n_comparisons == 0: return 0.0 return total_deviation / n_comparisons def _bootstrap_ram_pvalue( log: "StochasticChoiceLog", observed_stat: float, assumption: str, n_bootstrap: int, ) -> float: """Compute p-value using bootstrap.""" if n_bootstrap == 0: return 0.5 # No bootstrap, return neutral p-value all_items = sorted(log.all_items) n_items = len(all_items) # Generate bootstrap samples and compute test statistics bootstrap_stats = [] for _ in range(min(n_bootstrap, 100)): # Limit for speed # Generate random preference perm = tuple(np.random.permutation(all_items).tolist()) stat = _compute_ram_test_statistic(log, perm, assumption) bootstrap_stats.append(stat) if not bootstrap_stats: return 0.5 # P-value: proportion of bootstrap stats >= observed p_value = np.mean([s >= observed_stat for s in bootstrap_stats]) return float(p_value) def test_ram_consistency( log: "StochasticChoiceLog", preference: tuple[int, ...] | None = None, alpha: float = 0.05, ) -> RandomAttentionResult: """ Test if stochastic choice data is consistent with Random Attention Model. Args: log: StochasticChoiceLog with choice frequency data preference: Optional fixed preference ordering to test alpha: Significance level Returns: RandomAttentionResult with test results """ return fit_random_attention_model(log, assumption="monotonic", alpha=alpha) def estimate_attention_probabilities( log: "StochasticChoiceLog", preference: tuple[int, ...], ) -> NDArray[np.float64]: """ Estimate item attention probabilities given a preference ordering. Args: log: StochasticChoiceLog with choice data preference: Fixed preference ordering (best to worst) Returns: Array of attention probabilities per item """ _, item_scores = _estimate_ram_attention(log, preference, "monotonic") return item_scores def compute_attention_bounds( log: "StochasticChoiceLog", preference: tuple[int, ...], item: int, menu: frozenset[int], ) -> tuple[float, float]: """ Compute bounds on attention probability for item in menu. Args: log: StochasticChoiceLog with choice data preference: Fixed preference ordering item: Item to compute bounds for menu: Menu containing the item Returns: Tuple of (lower_bound, upper_bound) for attention probability """ bounds, _ = _estimate_ram_attention(log, preference, "monotonic") return bounds.get((menu, item), (0.0, 1.0)) # ============================================================================= # ATTENTION OVERLOAD (Lleras et al. 2017) # ============================================================================= def test_attention_overload( log: "MenuChoiceLog", quality_metric: str = "consistency", ) -> AttentionOverloadResult: """ Test for attention overload in menu choices (Lleras et al. 2017). Attention overload occurs when choice quality degrades as menu size increases. This is the "paradox of choice" - too many options can harm decision quality. Args: log: MenuChoiceLog with menus and choices quality_metric: How to measure quality per menu size: - "consistency": SARP consistency rate (default) - "frequency": Probability of choosing high-frequency items Returns: AttentionOverloadResult with overload detection and analysis Example: >>> from prefgraph import MenuChoiceLog, test_attention_overload >>> log = MenuChoiceLog(menus=[...], choices=[...]) >>> result = test_attention_overload(log) >>> if result.has_overload: ... print(f"Reduce menu size below {result.critical_menu_size}") References: Lleras, J. S., Masatlioglu, Y., Nakajima, D., & Ozbay, E. Y. (2017). When More is Less: Limited Consideration. Working Paper. """ start_time = time.perf_counter() n_obs = log.num_observations # Group observations by menu size size_groups: dict[int, list[int]] = {} for t, menu in enumerate(log.menus): size = len(menu) if size not in size_groups: size_groups[size] = [] size_groups[size].append(t) # Compute quality for each menu size menu_size_quality: dict[int, float] = {} if quality_metric == "consistency": # Quality = rate of choices consistent with SARP from prefgraph.algorithms.abstract_choice import validate_menu_sarp sarp_result = validate_menu_sarp(log) # For each menu size, compute fraction of consistent observations for size, obs_indices in size_groups.items(): if not obs_indices: continue # Simple heuristic: use the revealed preference structure # Count observations where the choice is optimal consistent_count = 0 for t in obs_indices: # If SARP is satisfied overall, all are consistent if sarp_result.is_consistent: consistent_count += 1 else: # Check if this observation participates in violations is_violated = any( t in cycle for cycle in sarp_result.violations ) if hasattr(sarp_result, 'violations') else False if not is_violated: consistent_count += 1 menu_size_quality[size] = consistent_count / len(obs_indices) else: # Quality = how often high-frequency items are chosen choice_counts: dict[int, int] = {} for choice in log.choices: choice_counts[choice] = choice_counts.get(choice, 0) + 1 for size, obs_indices in size_groups.items(): if not obs_indices: continue # Quality = avg normalized choice frequency total_quality = 0.0 max_count = max(choice_counts.values()) if choice_counts else 1 for t in obs_indices: choice = log.choices[t] total_quality += choice_counts.get(choice, 0) / max_count menu_size_quality[size] = total_quality / len(obs_indices) # Regress quality ~ log(menu_size) if len(menu_size_quality) >= 2: sizes = np.array(sorted(menu_size_quality.keys())) qualities = np.array([menu_size_quality[s] for s in sizes]) # Log transform of sizes log_sizes = np.log(sizes + 1) # Simple linear regression n = len(sizes) mean_x = np.mean(log_sizes) mean_y = np.mean(qualities) numerator = np.sum((log_sizes - mean_x) * (qualities - mean_y)) denominator = np.sum((log_sizes - mean_x) ** 2) if denominator > 1e-10: slope = numerator / denominator intercept = mean_y - slope * mean_x # Compute residuals and p-value (simplified) y_pred = slope * log_sizes + intercept ss_res = np.sum((qualities - y_pred) ** 2) ss_tot = np.sum((qualities - mean_y) ** 2) if ss_tot > 1e-10: r_squared = 1 - ss_res / ss_tot else: r_squared = 0.0 # Simplified p-value estimate based on sample size and r_squared # Using approximation: higher r_squared and more data = lower p-value if n >= 3 and r_squared > 0.1: p_value = max(0.001, (1 - r_squared) / (n - 1)) else: p_value = 1.0 else: slope = 0.0 p_value = 1.0 else: slope = 0.0 p_value = 1.0 # Detect overload has_overload = slope < -0.05 and p_value < 0.1 # Find critical menu size (where quality drops below mean) critical_menu_size = None if has_overload and menu_size_quality: mean_quality = np.mean(list(menu_size_quality.values())) for size in sorted(menu_size_quality.keys()): if menu_size_quality[size] < mean_quality: critical_menu_size = size break # Overload severity (0-1) if has_overload: overload_severity = min(1.0, abs(slope) * 2) # Scale slope to 0-1 else: overload_severity = 0.0 computation_time = (time.perf_counter() - start_time) * 1000 return AttentionOverloadResult( has_overload=has_overload, critical_menu_size=critical_menu_size, overload_severity=overload_severity, menu_size_quality=menu_size_quality, regression_slope=slope, p_value=p_value, num_observations=n_obs, computation_time_ms=computation_time, ) # ============================================================================= # STATUS QUO BIAS (Masatlioglu & Ok 2005) # ============================================================================= def test_status_quo_bias( log: "MenuChoiceLog", defaults: list[int] | None = None, ) -> StatusQuoBiasResult: """ Test for status quo bias in menu choices (Masatlioglu & Ok 2005). Status quo bias occurs when default options are chosen at higher rates than preference alone would predict. Args: log: MenuChoiceLog with menus and choices defaults: Default item per menu (optional). If None, auto-detected as the most common choice in similar menus. Returns: StatusQuoBiasResult with bias detection and analysis Example: >>> from prefgraph import MenuChoiceLog, test_status_quo_bias >>> log = MenuChoiceLog( ... menus=[{0, 1, 2}, {0, 1, 2}, {0, 1, 2}], ... choices=[0, 0, 1] ... ) >>> result = test_status_quo_bias(log, defaults=[0, 0, 0]) >>> if result.has_status_quo_bias: ... print(f"Default advantage: {result.default_advantage:.2%}") References: Masatlioglu, Y., & Ok, E. A. (2005). Rational choice with status quo bias. Journal of Economic Theory, 121(1), 1-29. """ start_time = time.perf_counter() n_obs = log.num_observations # Auto-detect defaults if not provided if defaults is None: # Use first item in each menu as default (common UI pattern) defaults = [] for menu in log.menus: if menu: defaults.append(min(menu)) # First item by index else: defaults.append(-1) # No default # Ensure defaults list matches observations if len(defaults) != n_obs: defaults = defaults[:n_obs] + [-1] * (n_obs - len(defaults)) # Count default choices num_defaults = 0 default_chosen = 0 non_default_chosen = 0 # Track bias by item item_default_counts: dict[int, int] = {} # Times as default item_chosen_as_default: dict[int, int] = {} # Times chosen when default for t, (menu, choice) in enumerate(zip(log.menus, log.choices)): default = defaults[t] if default < 0 or default not in menu: continue num_defaults += 1 if default not in item_default_counts: item_default_counts[default] = 0 item_chosen_as_default[default] = 0 item_default_counts[default] += 1 if choice == default: default_chosen += 1 item_chosen_as_default[default] += 1 else: non_default_chosen += 1 # Compute expected choice rate without bias (uniform assumption) expected_rate = 0.0 if num_defaults > 0: total_menu_sizes = 0 for t, menu in enumerate(log.menus): if defaults[t] in menu: total_menu_sizes += len(menu) if total_menu_sizes > 0: expected_rate = num_defaults / total_menu_sizes # Actual default choice rate actual_rate = default_chosen / num_defaults if num_defaults > 0 else 0.0 # Default advantage default_advantage = max(0.0, actual_rate - expected_rate) # Bias by item bias_by_item: dict[int, float] = {} for item in item_default_counts: if item_default_counts[item] > 0: item_rate = item_chosen_as_default[item] / item_default_counts[item] bias_by_item[item] = item_rate - expected_rate # Simple significance test # Under null hypothesis of no bias, default choice follows binomial # p-value approximation if num_defaults >= 10 and expected_rate > 0: # Z-test approximation se = np.sqrt(expected_rate * (1 - expected_rate) / num_defaults) if se > 0: z_stat = (actual_rate - expected_rate) / se # One-sided p-value (testing for positive bias) from scipy import stats try: p_value = 1 - stats.norm.cdf(z_stat) except Exception as e: raise StatisticalError( f"Failed to compute p-value for status quo bias test. Original error: {e}" ) from e else: p_value = 1.0 else: p_value = 1.0 has_status_quo_bias = default_advantage > 0.05 and p_value < 0.1 computation_time = (time.perf_counter() - start_time) * 1000 return StatusQuoBiasResult( has_status_quo_bias=has_status_quo_bias, default_advantage=default_advantage, bias_by_item=bias_by_item, p_value=p_value, num_defaults=num_defaults, num_observations=n_obs, computation_time_ms=computation_time, ) # ============================================================================= # LEGACY ALIASES FOR WARP(LA) # ============================================================================= check_warp_la = test_warp_la """Legacy alias: use test_warp_la instead.""" validate_attention_filter = validate_attention_filter_consistency """Legacy alias: use validate_attention_filter_consistency instead."""