"""BehavioralSummary: Unified summary of all behavioral tests.
This module provides a statsmodels-style unified summary for behavioral
analysis results, combining consistency tests and goodness-of-fit metrics.
"""
from __future__ import annotations
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
import numpy as np
from prefgraph.core.display import ResultDisplayMixin
from prefgraph.core.mixins import ResultSummaryMixin
if TYPE_CHECKING:
from prefgraph.core.session import (
BehaviorLog,
MenuChoiceLog,
RiskChoiceLog,
StochasticChoiceLog,
ProductionLog,
)
from prefgraph.core.result import (
GARPResult,
AEIResult,
MPIResult,
WARPResult,
SARPResult,
HoutmanMaksResult,
RiskProfileResult,
RUMConsistencyResult,
RegularityResult,
StochasticTransitivityResult,
StochasticChoiceResult,
ProductionGARPResult,
OptimalEfficiencyResult,
)
[docs]
@dataclass
class BehavioralSummary(ResultDisplayMixin):
"""Unified summary of all behavioral tests (statsmodels-style).
Provides a comprehensive overview of behavioral consistency analysis,
combining multiple tests and metrics in a single, professional output.
Attributes:
garp_result: GARP consistency test result
warp_result: WARP consistency test result (optional)
sarp_result: SARP consistency test result (optional)
aei_result: Afriat Efficiency Index result
mpi_result: Money Pump Index result
houtman_maks_result: Houtman-Maks efficiency result (optional)
optimal_efficiency_result: Power-analysis result (optional); rendered
in the "Power Analysis" section of summary() when present
num_observations: Number of observations in the dataset
num_goods: Number of goods/dimensions
computation_time_ms: Total computation time in milliseconds
price_stats: mean/std/min/max of the price data (optional)
quantity_stats: mean/std/min/max of the quantity data (optional)
expenditure_stats: mean/std/min/max of total spend (optional)
r_density: Sum of the direct revealed-preference matrix R divided by
the squared number of observations (optional)
p_density: Same ratio for the strict relation P (optional)
r_star_density: Same ratio for the transitive closure R* (optional)
violation_pair_count: Number of pairs counted in ``R* & P'`` (optional)
user_id: Identifier shown in the summary header; "N/A" when unset
Example:
>>> from prefgraph import BehaviorLog, BehavioralSummary
>>> log = BehaviorLog(prices, quantities)
>>> summary = BehavioralSummary.from_log(log)
>>> print(summary.summary())
"""
# --- Test results (no defaults: must be passed to the constructor) ---
garp_result: GARPResult
warp_result: WARPResult | None
sarp_result: SARPResult | None
aei_result: AEIResult
mpi_result: MPIResult
houtman_maks_result: HoutmanMaksResult | None
# Quoted annotation; equivalent to the unquoted ones above because the
# module enables `from __future__ import annotations`.
optimal_efficiency_result: "OptimalEfficiencyResult | None"
# --- Dataset dimensions and timing ---
num_observations: int
num_goods: int
computation_time_ms: float
# Rich stats fields (added for statsmodels-style output)
# All default to None and are excluded from the dataclass repr; summary()
# skips the corresponding section when the driving value is missing.
price_stats: dict[str, float] | None = field(default=None, repr=False)
quantity_stats: dict[str, float] | None = field(default=None, repr=False)
expenditure_stats: dict[str, float] | None = field(default=None, repr=False)
r_density: float | None = field(default=None, repr=False)
p_density: float | None = field(default=None, repr=False)
r_star_density: float | None = field(default=None, repr=False)
violation_pair_count: int | None = field(default=None, repr=False)
user_id: str | None = field(default=None, repr=False)
@property
def is_consistent(self) -> bool:
    """Report whether the GARP test judged the data consistent.

    Returns:
        The ``is_consistent`` flag of ``garp_result``, unchanged.
    """
    garp = self.garp_result
    return garp.is_consistent
@property
def efficiency_index(self) -> float:
    """Expose the Afriat Efficiency Index (AEI) score.

    Returns:
        The ``efficiency_index`` value stored on ``aei_result``.
    """
    aei = self.aei_result
    return aei.efficiency_index
@property
def mpi_value(self) -> float:
    """Expose the Money Pump Index value.

    Returns:
        The ``mpi_value`` stored on ``mpi_result``.
    """
    mpi = self.mpi_result
    return mpi.mpi_value
[docs]
def score(self) -> float:
    """Return aggregate scikit-learn style score in [0, 1].

    Combines AEI and (1 - MPI) with equal weighting. Both components are
    clamped to [0, 1] before averaging so the documented range holds even
    if an upstream result is slightly out of range (for example an AEI a
    hair above 1.0, or a negative MPI).

    Returns:
        The mean of the clamped AEI and one minus the clamped MPI.
    """
    aei = min(1.0, max(0.0, self.aei_result.efficiency_index))
    mpi = min(1.0, max(0.0, self.mpi_result.mpi_value))
    return (aei + (1.0 - mpi)) / 2.0
[docs]
def summary(self) -> str:
"""Return formatted summary table (statsmodels-style).
Returns a professional text summary including:
- Two-column header with key results
- Input data statistics (prices, quantities, expenditure)
- Revealed preference graph density
- Consistency test results with [+]/[-] indicators
- Goodness-of-fit metrics with sub-details
- Power analysis (only when optimal_efficiency_result is set)
- Interpretation guidance
Returns:
Multi-line formatted string suitable for printing.
"""
# NOTE(review): indentation was lost in this copy of the file, so the
# nesting of some statements below cannot be recovered from the text
# alone (flagged inline). Confirm against the upstream source before
# re-indenting.
# Formatting helpers are called on the mixin class itself, not on self.
m = ResultSummaryMixin
# W is the total line width used for rules and two-column rows.
W = 70
sep = "-" * W
def _indicator(passed: bool) -> str:
# The inner 'FAIL' alternative can never be selected: the outer
# conditional already guarantees `passed` is truthy on that arm.
return f"[+] {'PASS' if passed else 'FAIL'}" if passed else f"[-] FAIL"
def _time_str(ms: float) -> str:
# Milliseconds below 1000, seconds from 1000 ms upward.
return f"{ms:.2f} ms" if ms < 1000 else f"{ms / 1000:.2f} s"
lines: list[str] = []
# === Two-column header ===
lines.append("=" * W)
# 18 is the length of the title string; the padding centers it in W.
lines.append(" " * ((W - 18) // 2) + "BEHAVIORAL SUMMARY")
lines.append("=" * W)
uid = self.user_id or "N/A"
garp_str = "[+] PASS" if self.garp_result.is_consistent else "[-] FAIL"
# WARP/SARP are optional: "N/A" when the result is falsy.
# NOTE(review): this relies on the truthiness of the result object rather
# than an `is not None` check as used further down — confirm the result
# types never evaluate as falsy.
warp_str = "[+] PASS" if (self.warp_result and self.warp_result.is_consistent) else (
"[-] FAIL" if self.warp_result else "N/A"
)
sarp_str = "[+] PASS" if (self.sarp_result and self.sarp_result.is_consistent) else (
"[-] FAIL" if self.sarp_result else "N/A"
)
lines.append(m._format_two_column_row("User ID", uid, "GARP", garp_str, W))
lines.append(m._format_two_column_row("No. Observations", self.num_observations, "WARP", warp_str, W))
lines.append(m._format_two_column_row("No. Goods", self.num_goods, "SARP", sarp_str, W))
# The method label is a fixed string; it is not read from any result.
lines.append(m._format_two_column_row(
"Method", "Floyd-Warshall",
"AEI", f"{self.aei_result.efficiency_index:.4f}", W,
))
lines.append(m._format_two_column_row(
"Computation Time", _time_str(self.computation_time_ms),
"MPI", f"{self.mpi_result.mpi_value:.4f}", W,
))
lines.append("=" * W)
# === Input Data Statistics ===
# Shown only when all three stat dicts are present and non-empty.
if self.price_stats and self.quantity_stats and self.expenditure_stats:
lines.append("")
lines.append("Input Data:")
lines.append(sep)
lines.append(m._format_descriptive_table({
"Prices": self.price_stats,
"Quantities": self.quantity_stats,
"Expenditure": self.expenditure_stats,
}, W))
# === Revealed Preference Graph ===
# Edge counts are reconstructed from the stored densities times T*T;
# a missing P or R* density is displayed as 0 edges.
if self.r_density is not None:
T = self.num_observations
T2 = T * T
lines.append("")
lines.append("Revealed Preference Graph:")
lines.append(sep)
r_edges = int(round(self.r_density * T2))
p_edges = int(round(self.p_density * T2)) if self.p_density is not None else 0
rs_edges = int(round(self.r_star_density * T2)) if self.r_star_density is not None else 0
lines.append(m._format_matrix_density("R (direct, p'x >= p'y)", r_edges, T2, W))
lines.append(m._format_matrix_density("P (strict, p'x > p'y)", p_edges, T2, W))
lines.append(m._format_matrix_density("R* (transitive closure)", rs_edges, T2, W))
# A missing violation-pair count is displayed as 0.
vp = self.violation_pair_count if self.violation_pair_count is not None else 0
lines.append(m._format_metric("Violation pairs (R* & P')", vp, W - 4))
# === Consistency Tests ===
lines.append("")
lines.append("Consistency Tests:")
lines.append(sep)
# A parenthesised count is appended only when there is at least one
# violation; the noun is pluralised unless the count is exactly 1.
n_garp = self.garp_result.num_violations
garp_detail = f" ({n_garp} cycle{'s' if n_garp != 1 else ''})" if n_garp > 0 else ""
lines.append(m._format_metric("GARP", f"{_indicator(self.garp_result.is_consistent)}{garp_detail}", W - 4))
if self.warp_result is not None:
n_warp = self.warp_result.num_violations
warp_detail = f" ({n_warp} violation{'s' if n_warp != 1 else ''})" if n_warp > 0 else ""
lines.append(m._format_metric("WARP", f"{_indicator(self.warp_result.is_consistent)}{warp_detail}", W - 4))
if self.sarp_result is not None:
# Unlike GARP/WARP, the SARP result is not assumed to expose
# num_violations; the count falls back to 0 when it does not.
n_sarp = self.sarp_result.num_violations if hasattr(self.sarp_result, 'num_violations') else 0
sarp_detail = f" ({n_sarp} cycle{'s' if n_sarp != 1 else ''})" if n_sarp > 0 else ""
lines.append(m._format_metric("SARP", f"{_indicator(self.sarp_result.is_consistent)}{sarp_detail}", W - 4))
# === Goodness-of-Fit ===
lines.append("")
lines.append("Goodness-of-Fit:")
lines.append(sep)
# AEI with sub-metrics
aei = self.aei_result
lines.append(m._format_metric("Afriat Efficiency (AEI)", aei.efficiency_index, W - 4))
lines.append(m._format_metric(" Binary search iterations", aei.binary_search_iterations, W - 4))
# Budget waste is the AEI shortfall from 1, expressed as a percentage.
waste = (1.0 - aei.efficiency_index) * 100
lines.append(m._format_metric(" Budget waste", f"{waste:.2f}%", W - 4))
# MPI with sub-metrics
mpi = self.mpi_result
lines.append(m._format_metric("Money Pump Index (MPI)", mpi.mpi_value, W - 4))
lines.append(m._format_metric(" Violation cycles", mpi.num_cycles, W - 4))
if mpi.worst_cycle is not None:
# Largest cost among the (cycle, cost) pairs; falls back to the MPI
# value itself when cycle_costs is empty.
worst_cost = max(c for _, c in mpi.cycle_costs) if mpi.cycle_costs else mpi.mpi_value
lines.append(m._format_metric(" Worst cycle cost", f"{worst_cost:.4f}", W - 4))
# NOTE(review): cannot tell from this copy whether the "Total expenditure"
# row sits inside the `worst_cycle is not None` branch — confirm.
lines.append(m._format_metric(" Total expenditure", f"${mpi.total_expenditure:,.2f}", W - 4))
# Houtman-Maks
if self.houtman_maks_result is not None:
hm = self.houtman_maks_result
# The displayed index is one minus the result's `fraction`.
hm_score = 1.0 - hm.fraction
lines.append(m._format_metric("Houtman-Maks Index", hm_score, W - 4))
lines.append(m._format_metric(
" Observations removed",
f"{hm.num_removed} / {self.num_observations}", W - 4,
))
# === Power Analysis ===
if self.optimal_efficiency_result is not None:
lines.append("")
lines.append("Power Analysis:")
lines.append(sep)
pr = self.optimal_efficiency_result
# Bronars power is one minus the last relative area, or 0.0 when
# relative_areas is empty/None.
# NOTE(review): the truthiness test would raise ValueError if
# relative_areas is a multi-element NumPy array — confirm its type.
bronars = 1.0 - pr.relative_areas[-1] if pr.relative_areas else 0.0
lines.append(m._format_metric("Bronars Power", bronars, W - 4))
lines.append(m._format_metric("Optimal Efficiency (e*)", pr.optimal_efficiency, W - 4))
lines.append(m._format_metric("Optimal Measure (m*)", pr.optimal_measure, W - 4))
# === Interpretation ===
lines.append("")
lines.append("Interpretation:")
lines.append(sep)
lines.append(f" {m._format_interpretation(aei.efficiency_index, 'efficiency')}")
if not self.garp_result.is_consistent:
lines.append(f" ~{waste:.1f}% budget waste; an arbitrager could extract ~{mpi.mpi_value * 100:.1f}%.")
# NOTE(review): presumably the Houtman-Maks sentence below is nested inside
# the GARP-inconsistent branch above (its wording only makes sense for
# inconsistent data), but the lost indentation makes this unverifiable.
if self.houtman_maks_result is not None:
hm = self.houtman_maks_result
# Guard against division by zero when there are no observations.
pct = 100.0 * hm.num_removed / self.num_observations if self.num_observations > 0 else 0
lines.append(f" {hm.num_removed} observations ({pct:.1f}%) must be removed for full consistency.")
# === Footer ===
lines.append("=" * W)
return "\n".join(lines)
def _repr_html_(self) -> str:
    """Build the rich HTML rendering used by Jupyter notebooks.

    GARP is always listed among the consistency tests; WARP and SARP are
    added only when their results are present. The goodness-of-fit list
    holds AEI and MPI, plus the Houtman-Maks index (one minus the result's
    ``fraction``) when that result is available.

    Returns:
        The HTML string produced by ``render_behavioral_summary_html``.
    """
    # Imported lazily, inside the method, as in the rest of this module.
    from prefgraph.viz.html_templates import render_behavioral_summary_html

    # Consistency tests: mandatory GARP first, then the optional ones.
    consistency_tests = [("GARP", self.garp_result.is_consistent)]
    for label, outcome in (("WARP", self.warp_result), ("SARP", self.sarp_result)):
        if outcome is not None:
            consistency_tests.append((label, outcome.is_consistent))

    # Goodness-of-fit metrics.
    goodness_metrics = [
        ("Afriat Efficiency (AEI)", self.aei_result.efficiency_index),
        ("Money Pump Index (MPI)", self.mpi_result.mpi_value),
    ]
    houtman_maks = self.houtman_maks_result
    if houtman_maks is not None:
        goodness_metrics.append(("Houtman-Maks Index", 1.0 - houtman_maks.fraction))

    return render_behavioral_summary_html(
        num_observations=self.num_observations,
        num_goods=self.num_goods,
        consistency_tests=consistency_tests,
        goodness_metrics=goodness_metrics,
        computation_time_ms=self.computation_time_ms,
    )
[docs]
def to_dict(self) -> dict[str, Any]:
    """Return dictionary representation for serialization.

    The dictionary always contains the headline values (consistency flag,
    AEI, MPI, dimensions, score, timing) and the nested ``garp``, ``aei``
    and ``mpi`` result dictionaries. ``warp``, ``sarp``, ``houtman_maks``
    and ``power_analysis`` are added only when the matching result is set.

    Returns:
        A dict keyed by metric/result name.
    """
    result: dict[str, Any] = {
        "is_consistent": self.is_consistent,
        "efficiency_index": self.efficiency_index,
        "mpi_value": self.mpi_value,
        "num_observations": self.num_observations,
        "num_goods": self.num_goods,
        "score": self.score(),
        "computation_time_ms": self.computation_time_ms,
        "garp": self.garp_result.to_dict(),
        "aei": self.aei_result.to_dict(),
        "mpi": self.mpi_result.to_dict(),
    }

    # Optional test results, emitted in a fixed key order.
    optional_results = (
        ("warp", self.warp_result),
        ("sarp", self.sarp_result),
        ("houtman_maks", self.houtman_maks_result),
    )
    for key, optional in optional_results:
        if optional is not None:
            result[key] = optional.to_dict()

    power_result = self.optimal_efficiency_result
    if power_result is not None:
        areas = power_result.relative_areas
        # Use an explicit length check instead of truthiness: `if areas`
        # raises ValueError for a NumPy array with more than one element.
        has_areas = areas is not None and len(areas) > 0
        # Cast to a builtin float so the value serializes cleanly even
        # when the areas are NumPy scalars.
        bronars_power = float(1.0 - areas[-1]) if has_areas else 0.0
        result["power_analysis"] = {
            "bronars_power": bronars_power,
            "optimal_efficiency": power_result.optimal_efficiency,
            "optimal_measure": power_result.optimal_measure,
        }
    return result
[docs]
def short_summary(self) -> str:
    """Return a single-line digest: pass/fail marker, AEI and MPI.

    Returns:
        A string starting with ``BehavioralSummary:``, a ``[+]``/``[-]``
        marker for GARP consistency, and AEI/MPI to four decimals.
    """
    if self.is_consistent:
        marker = "[+]"
    else:
        marker = "[-]"
    aei_text = f"AEI={self.efficiency_index:.4f}"
    mpi_text = f"MPI={self.mpi_value:.4f}"
    return f"BehavioralSummary: {marker} {aei_text}, {mpi_text}"
def __repr__(self) -> str:
    """Return a compact one-line representation.

    Returns:
        ``BehavioralSummary:`` followed by a ``[+]``/``[-]`` consistency
        marker, the observation count, and AEI/MPI to four decimals.
    """
    marker = "[+]" if self.is_consistent else "[-]"
    details = ", ".join(
        (
            f"n={self.num_observations}",
            f"AEI={self.efficiency_index:.4f}",
            f"MPI={self.mpi_value:.4f}",
        )
    )
    return f"BehavioralSummary: {marker} {details}"
def __str__(self) -> str:
    """Delegate ``str()`` / ``print()`` to the full ``summary()`` table.

    Returns:
        Whatever ``summary()`` returns.
    """
    table = self.summary()
    return table
[docs]
@classmethod
def from_log(
    cls,
    log: BehaviorLog,
    include_warp: bool = True,
    include_sarp: bool = True,
    include_power: bool = False,
) -> "BehavioralSummary":
    """Run the behavioral test suite on a BehaviorLog and bundle the results.

    GARP, AEI, MPI and Houtman-Maks are always computed. WARP, SARP and the
    power analysis are controlled by the flags below. Descriptive statistics
    of the input and densities of the revealed-preference matrices are
    computed as well, and the whole run is timed.

    Args:
        log: BehaviorLog containing the behavioral data
        include_warp: Whether to include WARP test (default: True)
        include_sarp: Whether to include SARP test (default: True)
        include_power: Whether to include power analysis (default: False)

    Returns:
        BehavioralSummary instance with all test results

    Example:
        >>> summary = BehavioralSummary.from_log(behavior_log)
        >>> print(summary)
        >>> # With power analysis
        >>> summary = BehavioralSummary.from_log(behavior_log, include_power=True)
        >>> print(summary)
    """
    started = time.perf_counter()

    # Algorithms are imported inside the method to avoid circular imports.
    from prefgraph.algorithms.garp import validate_consistency, check_warp
    from prefgraph.algorithms.differentiable import validate_sarp
    from prefgraph.algorithms.aei import compute_integrity_score
    from prefgraph.algorithms.mpi import compute_confusion_metric, compute_houtman_maks_index

    # Mandatory tests.
    garp_result = validate_consistency(log)
    aei_result = compute_integrity_score(log)
    mpi_result = compute_confusion_metric(log)

    # Optional tests stay None when switched off.
    warp_result = check_warp(log) if include_warp else None
    sarp_result = validate_sarp(log) if include_sarp else None

    # Houtman-Maks is computed unconditionally, even for consistent data.
    # Heufer & Hjertstrand (2015), Section 2 (references/papers/md/
    # HeuferHjertstrand2015_ConsistentSubsets.md):
    #   "The HM-index is the maximal fraction of non-zero elements
    #    in the binary vector v such that GARP(v) holds."
    # For consistent data compute_houtman_maks_index fast-exits with
    # fraction=0.0, removed=[] - trivially correct. The field is never left
    # as None because callers (summary display, user code) expect it to
    # always be populated.
    houtman_maks_result = compute_houtman_maks_index(log)

    # Power analysis is opt-in because it is computationally expensive.
    optimal_efficiency_result = None
    if include_power:
        from prefgraph.algorithms.power_analysis import compute_optimal_efficiency

        optimal_efficiency_result = compute_optimal_efficiency(
            log, n_simulations=200, n_efficiency_levels=10
        )

    def _describe(values: "np.ndarray") -> dict[str, float]:
        # mean/std/min/max over the whole array, as builtin floats.
        reducers = (("mean", np.mean), ("std", np.std), ("min", np.min), ("max", np.max))
        return {label: float(reducer(values)) for label, reducer in reducers}

    price_stats = _describe(log.cost_vectors)
    quantity_stats = _describe(log.action_vectors)
    expenditure_stats = _describe(log.total_spend)

    # Matrix densities: matrix sum over T*T, with the divisor forced to 1
    # when there are no observations.
    n_obs = log.num_observations
    denominator = n_obs * n_obs if n_obs > 0 else 1
    direct = garp_result.direct_revealed_preference
    strict = garp_result.strict_revealed_preference
    closure = garp_result.transitive_closure
    r_density = float(np.sum(direct)) / denominator
    p_density = float(np.sum(strict)) / denominator
    r_star_density = float(np.sum(closure)) / denominator
    violation_pair_count = int(np.sum(closure & strict.T))

    elapsed_ms = (time.perf_counter() - started) * 1000

    return cls(
        garp_result=garp_result,
        warp_result=warp_result,
        sarp_result=sarp_result,
        aei_result=aei_result,
        mpi_result=mpi_result,
        houtman_maks_result=houtman_maks_result,
        optimal_efficiency_result=optimal_efficiency_result,
        num_observations=log.num_observations,
        num_goods=log.num_goods,
        computation_time_ms=elapsed_ms,
        price_stats=price_stats,
        quantity_stats=quantity_stats,
        expenditure_stats=expenditure_stats,
        r_density=r_density,
        p_density=p_density,
        r_star_density=r_star_density,
        violation_pair_count=violation_pair_count,
        user_id=log.user_id,
    )
@dataclass
class MenuChoiceSummary(ResultDisplayMixin):
"""Unified summary of menu-based choice analysis.
Attributes:
warp_result: WARP consistency result
sarp_result: SARP consistency result
congruence_result: Congruence (full rationalizability) result
efficiency_result: Houtman-Maks efficiency result
utility_result: Ordinal utility recovery result (optional)
num_observations: Number of choice observations
num_alternatives: Number of unique alternatives
computation_time_ms: Total computation time in milliseconds
menu_size_stats: mean/std/min/max of menu sizes (optional)
choice_diversity: Number of distinct chosen items divided by the
number of alternatives (optional)
"""
# Result fields are typed Any; the trailing comment on each line names the
# intended result class.
warp_result: Any # AbstractWARPResult
sarp_result: Any # AbstractSARPResult
congruence_result: Any # CongruenceResult
efficiency_result: Any # HoutmanMaksAbstractResult
utility_result: Any | None # OrdinalUtilityResult
num_observations: int
num_alternatives: int
computation_time_ms: float
# Optional descriptive statistics; default to None and are excluded from
# the dataclass repr. summary() skips them when they are None.
menu_size_stats: dict[str, float] | None = field(default=None, repr=False)
choice_diversity: float | None = field(default=None, repr=False)
@property
def is_rationalizable(self) -> bool:
    """Report whether the congruence test found the choices rationalizable.

    Returns:
        ``congruence_result.is_rationalizable`` coerced to a builtin bool.
    """
    flag = self.congruence_result.is_rationalizable
    return bool(flag)
@property
def efficiency_score(self) -> float:
    """Expose the Houtman-Maks efficiency score.

    Returns:
        ``efficiency_result.efficiency_index`` coerced to a builtin float.
    """
    raw_index = self.efficiency_result.efficiency_index
    return float(raw_index)
def score(self) -> float:
    """Return scikit-learn style score in [0, 1].

    Returns:
        The value of ``efficiency_score``, unchanged.
    """
    value = self.efficiency_score
    return value
def summary(self) -> str:
"""Return formatted summary table (statsmodels-style).
Sections, in order: two-column header, input data statistics (when
available), consistency tests, goodness-of-fit, recovered preference
order (when available), and interpretation.
Returns:
Multi-line formatted string suitable for printing.
"""
# NOTE(review): indentation was lost in this copy of the file, so the
# nesting of some statements below cannot be recovered from the text
# alone (flagged inline). Confirm against the upstream source before
# re-indenting.
# Formatting helpers are called on the mixin class itself, not on self.
m = ResultSummaryMixin
# W is the total line width used for rules and two-column rows.
W = 70
sep = "-" * W
def _ind(passed: bool) -> str:
return "[+] PASS" if passed else "[-] FAIL"
def _time_str(ms: float) -> str:
# Milliseconds below 1000, seconds from 1000 ms upward.
return f"{ms:.2f} ms" if ms < 1000 else f"{ms / 1000:.2f} s"
lines: list[str] = []
# Two-column header
lines.append("=" * W)
# 19 is the length of the title string; the padding centers it in W.
lines.append(" " * ((W - 19) // 2) + "MENU CHOICE SUMMARY")
lines.append("=" * W)
lines.append(m._format_two_column_row(
"No. Observations", self.num_observations,
"WARP", _ind(self.warp_result.is_consistent), W,
))
lines.append(m._format_two_column_row(
"No. Alternatives", self.num_alternatives,
"SARP", _ind(self.sarp_result.is_consistent), W,
))
lines.append(m._format_two_column_row(
"Computation Time", _time_str(self.computation_time_ms),
"Congruence", _ind(self.congruence_result.is_rationalizable), W,
))
lines.append("=" * W)
# Input Data
# NOTE(review): it is unclear from this copy whether the blank line and
# the choice-diversity rows below are nested inside the menu_size_stats
# check or stand on their own — confirm.
if self.menu_size_stats is not None:
lines.append("")
lines.append("Input Data:")
lines.append(sep)
lines.append(m._format_descriptive_table({"Menu Size": self.menu_size_stats}, W))
lines.append("")
if self.choice_diversity is not None:
# The count of distinct chosen items is reconstructed from the stored
# ratio times the number of alternatives.
items_chosen = int(round(self.choice_diversity * self.num_alternatives))
lines.append(m._format_metric(
"Unique Items Chosen", f"{items_chosen} / {self.num_alternatives}", W - 4,
))
lines.append(m._format_metric("Choice Diversity", self.choice_diversity, W - 4))
# Consistency Tests with violation counts
lines.append("")
lines.append("Consistency Tests:")
lines.append(sep)
# Violation counts fall back to 0 when the result object does not expose
# num_violations; a count is shown only when it is positive.
n_warp = self.warp_result.num_violations if hasattr(self.warp_result, 'num_violations') else 0
warp_detail = f" ({n_warp} violation{'s' if n_warp != 1 else ''})" if n_warp > 0 else ""
lines.append(m._format_metric("WARP", f"{_ind(self.warp_result.is_consistent)}{warp_detail}", W - 4))
n_sarp = self.sarp_result.num_violations if hasattr(self.sarp_result, 'num_violations') else 0
sarp_detail = f" ({n_sarp} cycle{'s' if n_sarp != 1 else ''})" if n_sarp > 0 else ""
lines.append(m._format_metric("SARP", f"{_ind(self.sarp_result.is_consistent)}{sarp_detail}", W - 4))
lines.append(m._format_metric("Congruence", _ind(self.congruence_result.is_rationalizable), W - 4))
# Goodness-of-Fit
lines.append("")
lines.append("Goodness-of-Fit:")
lines.append(sep)
lines.append(m._format_metric("Houtman-Maks Efficiency", self.efficiency_score, W - 4))
# The removed-observation row is shown only when the efficiency result
# carries a removed_observations collection.
if hasattr(self.efficiency_result, 'removed_observations'):
n_removed = len(self.efficiency_result.removed_observations)
lines.append(m._format_metric(
" Observations removed", f"{n_removed} / {self.num_observations}", W - 4,
))
# Preference Order
if self.utility_result is not None and self.utility_result.success:
lines.append("")
lines.append("Recovered Preference Order:")
lines.append(sep)
if self.utility_result.preference_order:
# Only the first 10 entries are printed; the remainder is summarised
# as a "... (N more)" line.
order_str = " > ".join(str(i) for i in self.utility_result.preference_order[:10])
lines.append(f" {order_str}")
if len(self.utility_result.preference_order) > 10:
lines.append(f" ... ({len(self.utility_result.preference_order) - 10} more)")
# Interpretation
lines.append("")
lines.append("Interpretation:")
lines.append(sep)
# The first satisfied condition in the chain (Congruence, then SARP, then
# WARP) selects the message; the final branch reports a WARP violation.
if self.congruence_result.is_rationalizable:
lines.append(" Choices are fully rationalizable by a complete preference ordering.")
elif self.sarp_result.is_consistent:
lines.append(" Choices satisfy SARP but not Congruence (violates maximality).")
elif self.warp_result.is_consistent:
lines.append(" Choices satisfy WARP but not SARP (long preference cycles exist).")
else:
lines.append(" Choices violate WARP - direct preference reversals found.")
# NOTE(review): presumably the efficiency line is emitted for every branch
# rather than only in the `else` arm — confirm against upstream.
lines.append(f" Efficiency: {self.efficiency_score * 100:.1f}% of observations are consistent.")
lines.append("=" * W)
return "\n".join(lines)
def __repr__(self) -> str:
    """Return a compact one-line representation.

    Returns:
        ``MenuChoiceSummary:`` followed by a ``[+]``/``[-]`` marker for
        rationalizability, the observation count, and the efficiency score
        to four decimals.
    """
    marker = "[+]" if self.is_rationalizable else "[-]"
    details = ", ".join(
        (
            f"n={self.num_observations}",
            f"efficiency={self.efficiency_score:.4f}",
        )
    )
    return f"MenuChoiceSummary: {marker} {details}"
def __str__(self) -> str:
    """Delegate ``str()`` / ``print()`` to the full ``summary()`` table.

    Returns:
        Whatever ``summary()`` returns.
    """
    table = self.summary()
    return table
@classmethod
def from_log(cls, log: MenuChoiceLog) -> "MenuChoiceSummary":
    """Run the menu-choice test suite on a MenuChoiceLog and bundle the results.

    WARP, SARP, congruence and efficiency are always computed. Preference
    recovery is attempted only when SARP holds. Menu-size statistics and the
    choice-diversity ratio are derived from the log, and the run is timed.

    Args:
        log: MenuChoiceLog containing the choice data

    Returns:
        MenuChoiceSummary instance with all test results
    """
    started = time.perf_counter()

    from prefgraph.algorithms.abstract_choice import (
        validate_menu_warp,
        validate_menu_sarp,
        validate_menu_consistency,
        compute_menu_efficiency,
        fit_menu_preferences,
    )

    warp_result = validate_menu_warp(log)
    sarp_result = validate_menu_sarp(log)
    congruence_result = validate_menu_consistency(log)
    efficiency_result = compute_menu_efficiency(log)

    # Preferences are only fitted for SARP-consistent data.
    utility_result = fit_menu_preferences(log) if sarp_result.is_consistent else None

    # Descriptive statistics of the menu sizes.
    sizes = np.array(list(map(len, log.menus)), dtype=np.float64)
    reducers = (("mean", np.mean), ("std", np.std), ("min", np.min), ("max", np.max))
    menu_size_stats = {label: float(reducer(sizes)) for label, reducer in reducers}

    # Share of alternatives chosen at least once; the divisor is forced to 1
    # when the log reports no alternatives.
    distinct_choices = len(set(log.choices))
    divisor = log.num_alternatives if log.num_alternatives > 0 else 1
    choice_diversity = distinct_choices / divisor

    elapsed_ms = (time.perf_counter() - started) * 1000

    return cls(
        warp_result=warp_result,
        sarp_result=sarp_result,
        congruence_result=congruence_result,
        efficiency_result=efficiency_result,
        utility_result=utility_result,
        num_observations=log.num_observations,
        num_alternatives=log.num_alternatives,
        computation_time_ms=elapsed_ms,
        menu_size_stats=menu_size_stats,
        choice_diversity=choice_diversity,
    )
@dataclass
class RiskChoiceSummary(ResultDisplayMixin):
"""Unified summary of risk choice analysis.
Provides a comprehensive overview of risk preferences analysis,
combining risk profile estimation with Expected Utility axiom tests.
Attributes:
risk_profile_result: Result of CRRA risk profile estimation
eu_axioms_satisfied: Whether Expected Utility axioms hold
eu_violations: List of EU axiom violations
num_observations: Number of choice observations
num_risk_seeking_choices: Choices where risky option with lower EV was chosen
num_risk_averse_choices: Choices where safe option with lower EV was chosen
computation_time_ms: Total computation time in milliseconds
safe_value_stats: mean/std/min/max of the safe values (optional)
risky_ev_stats: mean/std/min/max of the risky options' expected
values (optional)
Example:
>>> from prefgraph import RiskChoiceLog, RiskChoiceSummary
>>> log = RiskChoiceLog(safe_values, risky_outcomes, risky_probs, choices)
>>> summary = RiskChoiceSummary.from_log(log)
>>> print(summary.summary())
"""
# Quoted annotation; equivalent to an unquoted one because the module
# enables `from __future__ import annotations`.
risk_profile_result: "RiskProfileResult"
eu_axioms_satisfied: bool
eu_violations: list[str]
num_observations: int
num_risk_seeking_choices: int
num_risk_averse_choices: int
computation_time_ms: float
# Optional descriptive statistics; default to None and are excluded from
# the dataclass repr. summary() shows them only when both are set.
safe_value_stats: dict[str, float] | None = field(default=None, repr=False)
risky_ev_stats: dict[str, float] | None = field(default=None, repr=False)
@property
def risk_category(self) -> str:
    """Risk category: 'risk_seeking', 'risk_neutral', or 'risk_averse'.

    Returns:
        ``risk_profile_result.risk_category`` converted with ``str()``.
    """
    category = self.risk_profile_result.risk_category
    return str(category)
@property
def risk_aversion_coefficient(self) -> float:
    """Arrow-Pratt coefficient of relative risk aversion (rho).

    Returns:
        ``risk_profile_result.risk_aversion_coefficient`` as a builtin float.
    """
    rho = self.risk_profile_result.risk_aversion_coefficient
    return float(rho)
@property
def consistency_score(self) -> float:
    """Fraction of choices consistent with the estimated risk profile.

    Returns:
        ``risk_profile_result.consistency_score`` as a builtin float.
    """
    fraction = self.risk_profile_result.consistency_score
    return float(fraction)
def score(self) -> float:
    """Return scikit-learn style score in [0, 1].

    Returns:
        The value of ``consistency_score``, unchanged.
    """
    value = self.consistency_score
    return value
def summary(self) -> str:
"""Return formatted summary table (statsmodels-style).
Sections, in order: two-column header, input data statistics (when
available), choice distribution, CRRA risk profile, Expected Utility
axioms, and interpretation.
Returns:
Multi-line formatted string suitable for printing.
"""
# NOTE(review): indentation was lost in this copy of the file, so the
# nesting of some statements below cannot be recovered from the text
# alone (flagged inline). Confirm against the upstream source before
# re-indenting.
# Formatting helpers are called on the mixin class itself, not on self.
m = ResultSummaryMixin
# W is the total line width used for rules and two-column rows.
W = 70
sep = "-" * W
def _ind(passed: bool) -> str:
return "[+] PASS" if passed else "[-] FAIL"
def _time_str(ms: float) -> str:
# Milliseconds below 1000, seconds from 1000 ms upward.
return f"{ms:.2f} ms" if ms < 1000 else f"{ms / 1000:.2f} s"
lines: list[str] = []
# Two-column header
lines.append("=" * W)
# 19 is the length of the title string; the padding centers it in W.
lines.append(" " * ((W - 19) // 2) + "RISK CHOICE SUMMARY")
lines.append("=" * W)
# Display form of the category, e.g. "risk_averse" -> "Risk Averse".
cat = self.risk_category.replace("_", " ").title()
lines.append(m._format_two_column_row(
"No. Observations", self.num_observations,
"Risk Category", cat, W,
))
lines.append(m._format_two_column_row(
"Risk-Seeking Choices", self.num_risk_seeking_choices,
"Risk Aversion (rho)", f"{self.risk_aversion_coefficient:.4f}", W,
))
lines.append(m._format_two_column_row(
"Risk-Averse Choices", self.num_risk_averse_choices,
"Consistency", f"{self.consistency_score:.4f}", W,
))
lines.append(m._format_two_column_row(
"Computation Time", _time_str(self.computation_time_ms),
"EU Axioms", _ind(self.eu_axioms_satisfied), W,
))
lines.append("=" * W)
# Input Data
# Shown only when both stat dicts are set.
if self.safe_value_stats is not None and self.risky_ev_stats is not None:
lines.append("")
lines.append("Input Data:")
lines.append(sep)
lines.append(m._format_descriptive_table({
"Safe Values": self.safe_value_stats,
"Risky EV": self.risky_ev_stats,
}, W))
# Choice Distribution
lines.append("")
lines.append("Choice Distribution:")
lines.append(sep)
total = self.num_observations
# Percentages are computed only for a non-empty dataset, which avoids a
# division by zero. "Risk-Neutral" is whatever remains after subtracting
# the risk-seeking and risk-averse counts from the total.
if total > 0:
seek_pct = 100.0 * self.num_risk_seeking_choices / total
averse_pct = 100.0 * self.num_risk_averse_choices / total
neutral = total - self.num_risk_seeking_choices - self.num_risk_averse_choices
neutral_pct = 100.0 * neutral / total
lines.append(m._format_metric("Risk-Seeking", f"{self.num_risk_seeking_choices} ({seek_pct:.1f}%)", W - 4))
lines.append(m._format_metric("Risk-Averse", f"{self.num_risk_averse_choices} ({averse_pct:.1f}%)", W - 4))
lines.append(m._format_metric("Risk-Neutral", f"{neutral} ({neutral_pct:.1f}%)", W - 4))
# Risk Profile
lines.append("")
lines.append("Risk Profile (CRRA):")
lines.append(sep)
lines.append(m._format_metric("Risk Category", cat, W - 4))
lines.append(m._format_metric("Risk Aversion (rho)", self.risk_aversion_coefficient, W - 4))
lines.append(m._format_metric("Consistency Score", self.consistency_score, W - 4))
# EU Axioms
lines.append("")
lines.append("Expected Utility Axioms:")
lines.append(sep)
eu_str = "[+] SATISFIED" if self.eu_axioms_satisfied else "[-] VIOLATED"
lines.append(m._format_metric("Status", eu_str, W - 4))
# Violation details: a count, then at most the first three entries, then
# a "... and N more" line for the rest.
# NOTE(review): presumably the loop and the "more" line are nested inside
# the check below — the lost indentation makes this unverifiable.
if not self.eu_axioms_satisfied and self.eu_violations:
lines.append(m._format_metric(" Num. violations", len(self.eu_violations), W - 4))
for v in self.eu_violations[:3]:
lines.append(f" - {v}")
if len(self.eu_violations) > 3:
lines.append(f" ... and {len(self.eu_violations) - 3} more")
# Interpretation
lines.append("")
lines.append("Interpretation:")
lines.append(sep)
if self.risk_category == "risk_averse":
lines.append(" Decision-maker prefers certainty over gambles.")
# rho is floored at 0.1 before being used as a divisor in 1 / rho.
rho = max(self.risk_aversion_coefficient, 0.1)
lines.append(f" Certainty premium: ~{(1 - 0.5 ** (1 / rho)) * 100:.0f}% less for certainty.")
elif self.risk_category == "risk_seeking":
lines.append(" Decision-maker prefers gambles over certainty.")
else:
lines.append(" Decision-maker approximately maximizes expected value.")
# NOTE(review): presumably the model-fit line is emitted for every category
# rather than only in the `else` arm — confirm against upstream.
lines.append(f" Model fit: {self.consistency_score * 100:.1f}% of choices consistent with CRRA profile.")
lines.append("=" * W)
return "\n".join(lines)
def __repr__(self) -> str:
    """Return a compact one-line representation.

    Returns:
        ``RiskChoiceSummary:`` followed by a ``[+]``/``[-]`` marker for the
        EU axioms, the risk category, and rho and the consistency score to
        two decimals.
    """
    marker = "[+]" if self.eu_axioms_satisfied else "[-]"
    details = ", ".join(
        (
            f"{self.risk_category}",
            f"rho={self.risk_aversion_coefficient:.2f}",
            f"consistency={self.consistency_score:.2f}",
        )
    )
    return f"RiskChoiceSummary: {marker} {details}"
def __str__(self) -> str:
    """Delegate ``str()`` / ``print()`` to the full ``summary()`` table.

    Returns:
        Whatever ``summary()`` returns.
    """
    table = self.summary()
    return table
def to_dict(self) -> dict[str, Any]:
    """Return dictionary representation for serialization.

    EU violations are reported as a count (``num_eu_violations``), not as
    the list itself.

    Returns:
        A flat dict with the risk profile values, the EU-axiom outcome, the
        choice counts, the timing and the score.
    """
    # Risk profile and EU-axiom outcome first ...
    payload: dict[str, Any] = {
        "risk_category": self.risk_category,
        "risk_aversion_coefficient": self.risk_aversion_coefficient,
        "consistency_score": self.consistency_score,
        "eu_axioms_satisfied": self.eu_axioms_satisfied,
        "num_eu_violations": len(self.eu_violations),
    }
    # ... then dataset counts, timing and the aggregate score.
    payload["num_observations"] = self.num_observations
    payload["num_risk_seeking_choices"] = self.num_risk_seeking_choices
    payload["num_risk_averse_choices"] = self.num_risk_averse_choices
    payload["computation_time_ms"] = self.computation_time_ms
    payload["score"] = self.score()
    return payload
@classmethod
def from_log(cls, log: "RiskChoiceLog") -> "RiskChoiceSummary":
    """Run the risk-choice analyses on a RiskChoiceLog and bundle the results.

    Estimates the risk profile, checks the Expected Utility axioms, and
    derives descriptive statistics for the safe values and for the expected
    values of the risky options. The run is timed.

    Args:
        log: RiskChoiceLog containing the risk choice data

    Returns:
        RiskChoiceSummary instance with all test results
    """
    started = time.perf_counter()

    from prefgraph.algorithms.risk import (
        compute_risk_profile,
        check_expected_utility_axioms,
    )

    # Risk profile first, then the EU axiom check.
    risk_profile_result = compute_risk_profile(log)
    eu_satisfied, eu_violations = check_expected_utility_axioms(log)

    def _describe(values: np.ndarray) -> dict[str, float]:
        # mean/std/min/max over the whole array, as builtin floats.
        reducers = (("mean", np.mean), ("std", np.std), ("min", np.min), ("max", np.max))
        return {label: float(reducer(values)) for label, reducer in reducers}

    safe_value_stats = _describe(log.safe_values)
    # Expected value per row: outcomes weighted by their probabilities and
    # summed along axis 1.
    weighted_outcomes = log.risky_outcomes * log.risky_probabilities
    risky_ev_stats = _describe(np.sum(weighted_outcomes, axis=1))

    elapsed_ms = (time.perf_counter() - started) * 1000

    return cls(
        risk_profile_result=risk_profile_result,
        eu_axioms_satisfied=eu_satisfied,
        eu_violations=eu_violations,
        num_observations=log.num_observations,
        num_risk_seeking_choices=log.num_risk_seeking_choices,
        num_risk_averse_choices=log.num_risk_averse_choices,
        computation_time_ms=elapsed_ms,
        safe_value_stats=safe_value_stats,
        risky_ev_stats=risky_ev_stats,
    )
@dataclass
class StochasticChoiceSummary(ResultDisplayMixin):
"""Unified summary of stochastic choice analysis.
Provides a comprehensive overview of probabilistic choice analysis,
combining RUM consistency tests, regularity tests, and model fitting.
Attributes:
rum_result: RUM consistency test result
regularity_result: Regularity (Luce axiom) test result
transitivity_result: Stochastic transitivity test result
iia_satisfied: Whether Independence of Irrelevant Alternatives holds
model_result: Fitted stochastic choice model (if consistent)
num_menus: Number of distinct menus
num_items: Number of unique items
total_observations: Total number of choice observations
computation_time_ms: Total computation time in milliseconds
menu_size_stats: Descriptive statistics shown as "Menu Size" (optional)
obs_per_menu_stats: Descriptive statistics shown as "Obs per Menu"
(optional)
mean_choice_entropy: Value shown as "Mean Choice Entropy" (optional)
Example:
>>> from prefgraph import StochasticChoiceLog, StochasticChoiceSummary
>>> log = StochasticChoiceLog(menus, choice_frequencies)
>>> summary = StochasticChoiceSummary.from_log(log)
>>> print(summary.summary())
"""
# Quoted annotations; equivalent to unquoted ones because the module
# enables `from __future__ import annotations`.
rum_result: "RUMConsistencyResult"
regularity_result: "RegularityResult"
transitivity_result: "StochasticTransitivityResult"
iia_satisfied: bool
model_result: "StochasticChoiceResult | None"
num_menus: int
num_items: int
total_observations: int
computation_time_ms: float
# Optional descriptive statistics; default to None and are excluded from
# the dataclass repr. summary() renders each one only when it is set.
# NOTE(review): how these are computed is not visible in this part of the
# file — presumably by from_log; confirm there.
menu_size_stats: dict[str, float] | None = field(default=None, repr=False)
obs_per_menu_stats: dict[str, float] | None = field(default=None, repr=False)
mean_choice_entropy: float | None = field(default=None, repr=False)
@property
def is_rum_consistent(self) -> bool:
    """Report whether the data is consistent with a Random Utility Model.

    Returns:
        ``rum_result.is_rum_consistent`` coerced to a builtin bool.
    """
    flag = self.rum_result.is_rum_consistent
    return bool(flag)
@property
def satisfies_regularity(self) -> bool:
"""True if regularity (Luce axiom) is satisfied."""
return bool(self.regularity_result.satisfies_regularity)
@property
def strongest_transitivity(self) -> str:
"""Strongest stochastic transitivity level satisfied (WST/MST/SST/None)."""
return str(self.transitivity_result.strongest_satisfied)
def score(self) -> float:
"""Return scikit-learn style score in [0, 1]."""
return float(self.rum_result.score())
def summary(self) -> str:
"""Return formatted summary table (statsmodels-style)."""
m = ResultSummaryMixin
W = 70
sep = "-" * W
def _ind(passed: bool) -> str:
return "[+] PASS" if passed else "[-] FAIL"
def _time_str(ms: float) -> str:
return f"{ms:.2f} ms" if ms < 1000 else f"{ms / 1000:.2f} s"
lines: list[str] = []
# Two-column header
lines.append("=" * W)
lines.append(" " * ((W - 25) // 2) + "STOCHASTIC CHOICE SUMMARY")
lines.append("=" * W)
lines.append(m._format_two_column_row(
"No. Menus", self.num_menus,
"RUM Consistency", _ind(self.is_rum_consistent), W,
))
lines.append(m._format_two_column_row(
"Unique Items", self.num_items,
"Regularity", _ind(self.satisfies_regularity), W,
))
lines.append(m._format_two_column_row(
"Total Observations", self.total_observations,
"IIA", _ind(self.iia_satisfied), W,
))
lines.append(m._format_two_column_row(
"Computation Time", _time_str(self.computation_time_ms),
"Transitivity", self.strongest_transitivity, W,
))
lines.append("=" * W)
# Input Data
if self.menu_size_stats is not None:
lines.append("")
lines.append("Input Data:")
lines.append(sep)
stats_rows: dict[str, dict[str, float]] = {"Menu Size": self.menu_size_stats}
if self.obs_per_menu_stats is not None:
stats_rows["Obs per Menu"] = self.obs_per_menu_stats
lines.append(m._format_descriptive_table(stats_rows, W))
if self.mean_choice_entropy is not None:
lines.append("")
lines.append(m._format_metric("Mean Choice Entropy", f"{self.mean_choice_entropy:.4f}", W - 4))
# Consistency Tests
lines.append("")
lines.append("Consistency Tests:")
lines.append(sep)
lines.append(m._format_metric("RUM Consistency", _ind(self.is_rum_consistent), W - 4))
if hasattr(self.rum_result, 'distance_to_rum'):
lines.append(m._format_metric(" Distance to nearest RUM", self.rum_result.distance_to_rum, W - 4))
lines.append(m._format_metric("Regularity (Luce)", _ind(self.satisfies_regularity), W - 4))
if hasattr(self.regularity_result, 'num_violations'):
n_reg = self.regularity_result.num_violations
if n_reg > 0:
lines.append(m._format_metric(" Regularity violations", n_reg, W - 4))
lines.append(m._format_metric("IIA", _ind(self.iia_satisfied), W - 4))
# Stochastic Transitivity
lines.append("")
lines.append("Stochastic Transitivity:")
lines.append(sep)
tr = self.transitivity_result
lines.append(m._format_metric("Weak (WST)", _ind(tr.satisfies_wst), W - 4))
lines.append(m._format_metric("Moderate (MST)", _ind(tr.satisfies_mst), W - 4))
lines.append(m._format_metric("Strong (SST)", _ind(tr.satisfies_sst), W - 4))
if hasattr(tr, 'num_triples_tested') and tr.num_triples_tested:
lines.append(m._format_metric(" Triples tested", tr.num_triples_tested, W - 4))
# Model Fit
if self.model_result is not None:
lines.append("")
lines.append("Model Fit:")
lines.append(sep)
lines.append(m._format_metric("Model Type", self.model_result.model_type, W - 4))
lines.append(m._format_metric("Log-Likelihood", self.model_result.log_likelihood, W - 4))
lines.append(m._format_metric("AIC", self.model_result.aic, W - 4))
lines.append(m._format_metric("BIC", self.model_result.bic, W - 4))
# Interpretation
lines.append("")
lines.append("Interpretation:")
lines.append(sep)
if self.is_rum_consistent:
lines.append(" Choices can be rationalized by a random utility model.")
lines.append(f" Strongest transitivity satisfied: {self.strongest_transitivity}")
else:
lines.append(" Choices cannot be explained by any random utility model.")
if hasattr(self.rum_result, 'distance_to_rum'):
lines.append(f" Distance to nearest RUM: {self.rum_result.distance_to_rum:.4f}")
lines.append("=" * W)
return "\n".join(lines)
def __repr__(self) -> str:
"""Compact string representation."""
indicator = "[+]" if self.is_rum_consistent else "[-]"
return (
f"StochasticChoiceSummary: {indicator} "
f"menus={self.num_menus}, "
f"RUM={'consistent' if self.is_rum_consistent else 'inconsistent'}, "
f"transitivity={self.strongest_transitivity}"
)
def __str__(self) -> str:
"""Return formatted summary table when printed."""
return self.summary()
def to_dict(self) -> dict[str, Any]:
"""Return dictionary representation for serialization."""
result = {
"is_rum_consistent": self.is_rum_consistent,
"satisfies_regularity": self.satisfies_regularity,
"iia_satisfied": self.iia_satisfied,
"strongest_transitivity": self.strongest_transitivity,
"distance_to_rum": self.rum_result.distance_to_rum,
"num_menus": self.num_menus,
"num_items": self.num_items,
"total_observations": self.total_observations,
"computation_time_ms": self.computation_time_ms,
"score": self.score(),
}
if self.model_result is not None:
result["model_type"] = self.model_result.model_type
result["log_likelihood"] = self.model_result.log_likelihood
result["aic"] = self.model_result.aic
result["bic"] = self.model_result.bic
return result
@classmethod
def from_log(cls, log: "StochasticChoiceLog") -> "StochasticChoiceSummary":
"""Create StochasticChoiceSummary by running all tests on a StochasticChoiceLog.
Args:
log: StochasticChoiceLog containing the stochastic choice data
Returns:
StochasticChoiceSummary instance with all test results
"""
start_time = time.perf_counter()
from prefgraph.algorithms.stochastic import (
test_rum_consistency,
test_regularity,
test_stochastic_transitivity,
check_independence_irrelevant_alternatives,
fit_random_utility_model,
)
# Run all tests
rum_result = test_rum_consistency(log)
regularity_result = test_regularity(log)
transitivity_result = test_stochastic_transitivity(log)
iia_satisfied = check_independence_irrelevant_alternatives(log)
# Fit model if consistent
model_result = None
if rum_result.is_rum_consistent:
model_result = fit_random_utility_model(log)
# Calculate total observations
obs_per_menu = log.total_observations_per_menu or []
total_observations = sum(obs_per_menu)
# Input data stats
menu_sizes = np.array([len(m) for m in log.menus], dtype=np.float64)
menu_size_stats = {
"mean": float(np.mean(menu_sizes)), "std": float(np.std(menu_sizes)),
"min": float(np.min(menu_sizes)), "max": float(np.max(menu_sizes)),
}
obs_arr = np.array(obs_per_menu, dtype=np.float64) if obs_per_menu else np.array([0.0])
obs_per_menu_stats = {
"mean": float(np.mean(obs_arr)), "std": float(np.std(obs_arr)),
"min": float(np.min(obs_arr)), "max": float(np.max(obs_arr)),
}
entropies = []
for freq in log.choice_frequencies:
total = sum(freq.values())
if total > 0:
probs = [c / total for c in freq.values() if c > 0]
entropies.append(-sum(p * np.log2(p) for p in probs))
mean_choice_entropy = float(np.mean(entropies)) if entropies else 0.0
end_time = time.perf_counter()
total_time_ms = (end_time - start_time) * 1000
return cls(
rum_result=rum_result,
regularity_result=regularity_result,
transitivity_result=transitivity_result,
iia_satisfied=iia_satisfied,
model_result=model_result,
num_menus=log.num_menus,
num_items=log.num_items,
total_observations=total_observations,
computation_time_ms=total_time_ms,
menu_size_stats=menu_size_stats,
obs_per_menu_stats=obs_per_menu_stats,
mean_choice_entropy=mean_choice_entropy,
)
@dataclass
class ProductionSummary(ResultDisplayMixin):
    """Summary report for a production (firm behavior) analysis.

    Bundles the profit-maximization test result, the cost-minimization
    check and the efficiency scores of one production log into a single
    object that can be printed, scored or serialized.

    Attributes:
        profit_max_result: Profit maximization test result
        cost_min_result: Cost minimization check result (a dict)
        returns_to_scale: Returns to scale classification
        technical_efficiency: Overall technical efficiency score
        cost_efficiency: Cost efficiency score
        profit_efficiency: Profit efficiency score
        num_observations: Number of production observations
        num_inputs: Number of inputs
        num_outputs: Number of outputs
        computation_time_ms: Total computation time in milliseconds
        input_price_stats: mean/std/min/max of the input prices (optional)
        output_price_stats: mean/std/min/max of the output prices (optional)
        profit_stats: mean/std/min/max of the profits (optional)

    Example:
        >>> from prefgraph import ProductionLog, ProductionSummary
        >>> log = ProductionLog(input_prices, input_quantities, output_prices, output_quantities)
        >>> summary = ProductionSummary.from_log(log)
        >>> print(summary.summary())
    """

    profit_max_result: "ProductionGARPResult"
    cost_min_result: dict[str, Any]
    returns_to_scale: str
    technical_efficiency: float
    cost_efficiency: float
    profit_efficiency: float
    num_observations: int
    num_inputs: int
    num_outputs: int
    computation_time_ms: float
    # Descriptive statistics of the input data; filled in by from_log().
    input_price_stats: dict[str, float] | None = field(default=None, repr=False)
    output_price_stats: dict[str, float] | None = field(default=None, repr=False)
    profit_stats: dict[str, float] | None = field(default=None, repr=False)

    @property
    def is_profit_maximizing(self) -> bool:
        """Whether the profit maximization test passed."""
        passed = self.profit_max_result.is_profit_maximizing
        return bool(passed)

    @property
    def is_cost_minimizing(self) -> bool:
        """Whether the cost minimization check passed (False when the key is absent)."""
        flag = self.cost_min_result.get("is_cost_minimizing", False)
        return bool(flag)

    def score(self) -> float:
        """Return the profit efficiency as a scikit-learn style score."""
        return self.profit_efficiency

    def summary(self) -> str:
        """Build the statsmodels-style text report.

        Returns:
            A multi-line string made of a two-column header, an optional
            "Input Data" table, the consistency tests, the efficiency
            metrics, an optional per-input efficiency listing (first five
            inputs) and a short interpretation.
        """
        fmt = ResultSummaryMixin
        width = 70
        inner = width - 4
        rule = "-" * width
        banner = "=" * width

        def verdict(ok: bool) -> str:
            if ok:
                return "[+] PASS"
            return "[-] FAIL"

        def elapsed(ms: float) -> str:
            if ms < 1000:
                return f"{ms:.2f} ms"
            return f"{ms / 1000:.2f} s"

        profit_ok = self.is_profit_maximizing
        cost_ok = self.is_cost_minimizing

        # Header: dataset dimensions on the left, headline results on the right.
        out: list[str] = [
            banner,
            " " * ((width - 18) // 2) + "PRODUCTION SUMMARY",
            banner,
            fmt._format_two_column_row(
                "No. Observations", self.num_observations,
                "Profit Max", verdict(profit_ok), width,
            ),
            fmt._format_two_column_row(
                "No. Inputs", self.num_inputs,
                "Cost Min", verdict(cost_ok), width,
            ),
            fmt._format_two_column_row(
                "No. Outputs", self.num_outputs,
                "Returns to Scale", self.returns_to_scale.title(), width,
            ),
            fmt._format_two_column_row(
                "Computation Time", elapsed(self.computation_time_ms),
                "Profit Efficiency", f"{self.profit_efficiency:.4f}", width,
            ),
            banner,
        ]

        # Descriptive statistics, shown only when input price stats exist.
        if self.input_price_stats is not None:
            table_rows: dict[str, dict[str, float]] = {
                "Input Prices": self.input_price_stats,
            }
            optional_rows = (
                ("Output Prices", self.output_price_stats),
                ("Profit", self.profit_stats),
            )
            for label, stats in optional_rows:
                if stats is not None:
                    table_rows[label] = stats
            out += [
                "",
                "Input Data:",
                rule,
                fmt._format_descriptive_table(table_rows, width),
            ]

        # Test outcomes; the violation count is appended only on failure.
        if profit_ok:
            violation_note = ""
        else:
            violation_note = f" ({self.profit_max_result.num_violations} violations)"
        out += [
            "",
            "Consistency Tests:",
            rule,
            fmt._format_metric("Profit Maximization", f"{verdict(profit_ok)}{violation_note}", inner),
            fmt._format_metric("Cost Minimization", verdict(cost_ok), inner),
            fmt._format_metric("Returns to Scale", self.returns_to_scale.title(), inner),
        ]

        out += [
            "",
            "Efficiency Metrics:",
            rule,
            fmt._format_metric("Technical Efficiency", self.technical_efficiency, inner),
            fmt._format_metric("Cost Efficiency", self.cost_efficiency, inner),
            fmt._format_metric("Profit Efficiency", self.profit_efficiency, inner),
        ]

        # Per-input breakdown, truncated to the first five entries.
        if hasattr(self.profit_max_result, 'input_efficiency_vector'):
            per_input = self.profit_max_result.input_efficiency_vector
            total_inputs = len(per_input)
            if total_inputs > 0:
                out += ["", "Per-Input Efficiency:", rule]
                for idx, value in enumerate(per_input[:5]):
                    out.append(fmt._format_metric(f"Input {idx}", value, inner))
                if total_inputs > 5:
                    out.append(f" ... ({total_inputs - 5} more inputs)")

        out += ["", "Interpretation:", rule]
        if profit_ok:
            out.append(" Firm behavior is consistent with profit maximization.")
        else:
            out.append(f" Found {self.profit_max_result.num_violations} profit maximization violation(s).")
        out.append(f" Returns to scale: {self.returns_to_scale}.")
        out.append(f" Operating at {self.profit_efficiency * 100:.1f}% of optimal profit efficiency.")
        out.append(banner)

        return "\n".join(out)

    def __repr__(self) -> str:
        """One-line representation with the headline figures."""
        if self.is_profit_maximizing:
            marker = "[+]"
        else:
            marker = "[-]"
        head = f"ProductionSummary: {marker} "
        body = (
            f"n={self.num_observations}, "
            f"RTS={self.returns_to_scale}, "
            f"profit_eff={self.profit_efficiency:.2f}"
        )
        return head + body

    def __str__(self) -> str:
        """Printing the object shows the full report."""
        return self.summary()

    def to_dict(self) -> dict[str, Any]:
        """Return the headline results as a flat dict for serialization."""
        payload: dict[str, Any] = {
            "is_profit_maximizing": self.is_profit_maximizing,
            "is_cost_minimizing": self.is_cost_minimizing,
            "returns_to_scale": self.returns_to_scale,
            "technical_efficiency": self.technical_efficiency,
            "cost_efficiency": self.cost_efficiency,
            "profit_efficiency": self.profit_efficiency,
            "num_violations": self.profit_max_result.num_violations,
            "num_observations": self.num_observations,
            "num_inputs": self.num_inputs,
            "num_outputs": self.num_outputs,
            "computation_time_ms": self.computation_time_ms,
            "score": self.score(),
        }
        return payload

    @classmethod
    def from_log(cls, log: "ProductionLog") -> "ProductionSummary":
        """Run the production tests on ``log`` and collect the results.

        Args:
            log: ProductionLog containing the production data

        Returns:
            ProductionSummary instance with all test results
        """
        started = time.perf_counter()

        # Imported lazily, inside the timed region.
        from prefgraph.algorithms.production import (
            test_profit_maximization,
            check_cost_minimization,
        )

        pm_result = test_profit_maximization(log)
        cm_result = check_cost_minimization(log)

        reducers = (("mean", np.mean), ("std", np.std), ("min", np.min), ("max", np.max))

        def describe(values: np.ndarray) -> dict[str, float]:
            # Each statistic is taken over every element of the array.
            return {label: float(reduce(values)) for label, reduce in reducers}

        in_price_stats = describe(log.input_prices)
        out_price_stats = describe(log.output_prices)
        profit_summary = describe(log.profit)

        elapsed_ms = (time.perf_counter() - started) * 1000

        return cls(
            profit_max_result=pm_result,
            cost_min_result=cm_result,
            returns_to_scale=pm_result.returns_to_scale,
            technical_efficiency=pm_result.technical_efficiency,
            cost_efficiency=pm_result.cost_efficiency_score,
            profit_efficiency=pm_result.profit_efficiency,
            num_observations=log.num_observations,
            num_inputs=log.num_inputs,
            num_outputs=log.num_outputs,
            computation_time_ms=elapsed_ms,
            input_price_stats=in_price_stats,
            output_price_stats=out_price_stats,
            profit_stats=profit_summary,
        )
[docs]
@dataclass
class PanelSummary(ResultDisplayMixin):
    """Aggregate summary for multi-user panel analysis.

    Combines per-user BehavioralSummary results into aggregate statistics:
    consistency rates, efficiency distributions, and identification of
    the most inconsistent users.

    Attributes:
        user_summaries: Per-entry BehavioralSummary objects keyed by entry id
        num_users: Number of users (unique users when a period map is given)
        total_observations: Sum of observations over all entries
        num_goods: Number of goods, taken from the first entry
        obs_per_user_stats: Distribution of observations per entry
        garp_pass_rate: Fraction of entries that are GARP-consistent
        warp_pass_rate: Fraction of entries passing WARP, or None when the
            first entry has no WARP result
        sarp_pass_rate: Fraction of entries passing SARP, or None when the
            first entry has no SARP result
        aei_distribution: Distribution of the efficiency index over entries
        mpi_distribution: Distribution of the MPI value over entries
        hm_distribution: Distribution of ``1 - fraction`` of the Houtman-Maks
            results (entries without one count as 1.0)
        top_inconsistent: Up to five entries with the lowest efficiency index
        computation_time_ms: Sum of the per-entry computation times
        period_stats: Per-period aggregates, or None without a period map
        num_periods: Number of distinct periods (0 without a period map)

    Example:
        >>> from prefgraph import BehaviorPanel
        >>> panel = BehaviorPanel.from_logs(logs)
        >>> ps = panel.summary()
        >>> print(ps)
    """

    user_summaries: dict[str, "BehavioralSummary"] = field(repr=False)
    num_users: int
    total_observations: int
    num_goods: int
    obs_per_user_stats: dict[str, float]
    garp_pass_rate: float
    warp_pass_rate: float | None
    sarp_pass_rate: float | None
    aei_distribution: dict[str, float]
    mpi_distribution: dict[str, float]
    hm_distribution: dict[str, float] | None
    top_inconsistent: list[tuple[str, float, float, int]]  # (uid, aei, mpi, T)
    computation_time_ms: float
    # Period-level breakdown (None if no period structure)
    period_stats: list[dict[str, Any]] | None = field(default=None, repr=False)
    num_periods: int = field(default=0, repr=False)

    @classmethod
    def from_summaries(
        cls,
        user_summaries: dict[str, "BehavioralSummary"],
        period_map: dict[str, tuple[str, str]] | None = None,
    ) -> "PanelSummary":
        """Build PanelSummary from per-user BehavioralSummary results.

        Args:
            user_summaries: Mapping from entry id to its BehavioralSummary.
            period_map: Optional mapping from entry id to a
                ``(user, period)`` pair. When given, unique users are
                counted from it and a per-period breakdown is computed.

        Returns:
            PanelSummary with the aggregate statistics.

        Raises:
            ValueError: If ``user_summaries`` is empty.
        """
        if not user_summaries:
            raise ValueError("Cannot create PanelSummary from empty dict")

        n = len(user_summaries)
        summaries = list(user_summaries.values())

        # Obs per entry
        obs_counts = np.array([s.num_observations for s in summaries], dtype=np.float64)
        aei_vals = np.array([s.efficiency_index for s in summaries], dtype=np.float64)
        mpi_vals = np.array([s.mpi_value for s in summaries], dtype=np.float64)

        def _dist(arr: np.ndarray) -> dict[str, float]:
            return {
                "mean": float(np.mean(arr)),
                "std": float(np.std(arr)),
                "min": float(np.min(arr)),
                "25%": float(np.percentile(arr, 25)),
                "50%": float(np.percentile(arr, 50)),
                "75%": float(np.percentile(arr, 75)),
                "max": float(np.max(arr)),
            }

        # Count unique users (period_map may have multiple entries per user)
        if period_map is not None:
            num_users = len({user for user, _ in period_map.values()})
        else:
            num_users = n

        # GARP pass rate
        garp_pass = sum(1 for s in summaries if s.is_consistent)

        # WARP/SARP pass rates (if computed); the first entry decides
        # whether the test was run at all.
        warp_pass_rate = None
        if summaries[0].warp_result is not None:
            warp_pass = sum(1 for s in summaries if s.warp_result and s.warp_result.is_consistent)
            warp_pass_rate = warp_pass / n

        sarp_pass_rate = None
        if summaries[0].sarp_result is not None:
            sarp_pass = sum(1 for s in summaries if s.sarp_result and s.sarp_result.is_consistent)
            sarp_pass_rate = sarp_pass / n

        # Houtman-Maks distribution; entries without a result count as 1.0
        hm_vals = [
            1.0 - s.houtman_maks_result.fraction if s.houtman_maks_result is not None else 1.0
            for s in summaries
        ]
        hm_distribution = _dist(np.array(hm_vals, dtype=np.float64))

        # Top inconsistent entries (sorted by AEI ascending)
        ranked = sorted(user_summaries.items(), key=lambda item: item[1].efficiency_index)
        top_inconsistent = [
            (uid, s.efficiency_index, s.mpi_value, s.num_observations)
            for uid, s in ranked[:5]
        ]

        # Total computation time
        total_time = sum(s.computation_time_ms for s in summaries)

        # Num goods (from first entry)
        num_goods = summaries[0].num_goods

        # Period-level breakdown
        period_stats = None
        num_periods = 0
        if period_map is not None:
            num_periods = len({period for _, period in period_map.values()})

            # Group the summaries by period in one pass; keys of period_map
            # that have no summary are ignored.
            by_period: dict[str, list[Any]] = {}
            for key, (_, period) in period_map.items():
                if key in user_summaries:
                    by_period.setdefault(period, []).append(user_summaries[key])

            period_stats = []
            for period in sorted(by_period):
                period_summaries = by_period[period]
                n_p = len(period_summaries)
                p_garp = sum(1 for s in period_summaries if s.is_consistent)
                p_aei = np.mean([s.efficiency_index for s in period_summaries])
                p_mpi = np.mean([s.mpi_value for s in period_summaries])
                period_stats.append({
                    "period": period,
                    "users": n_p,
                    "garp_pass_rate": p_garp / n_p,
                    "mean_aei": float(p_aei),
                    "mean_mpi": float(p_mpi),
                })

        return cls(
            user_summaries=user_summaries,
            num_users=num_users,
            total_observations=int(np.sum(obs_counts)),
            num_goods=num_goods,
            obs_per_user_stats=_dist(obs_counts),
            garp_pass_rate=garp_pass / n,
            warp_pass_rate=warp_pass_rate,
            sarp_pass_rate=sarp_pass_rate,
            aei_distribution=_dist(aei_vals),
            mpi_distribution=_dist(mpi_vals),
            hm_distribution=hm_distribution,
            top_inconsistent=top_inconsistent,
            computation_time_ms=total_time,
            period_stats=period_stats,
            num_periods=num_periods,
        )

    def summary(self) -> str:
        """Return formatted panel summary (statsmodels-style).

        Returns:
            A multi-line string with a two-column header, the consistency
            rates, the efficiency distributions, the most inconsistent
            entries and, when period data is available, a temporal
            breakdown.
        """
        m = ResultSummaryMixin
        W = 70
        sep = "-" * W

        # The pass rates are fractions of the entries (one per key of
        # user_summaries), which outnumber the unique users when a user has
        # several periods. Counts must use the same denominator as the rates.
        # Falls back to num_users for instances built without summaries.
        n_entries = len(self.user_summaries) or self.num_users

        def _time_str(ms: float) -> str:
            return f"{ms:.2f} ms" if ms < 1000 else f"{ms / 1000:.2f} s"

        def _rate_str(rate: float) -> str:
            passed = int(round(rate * n_entries))
            return f"{rate * 100:.1f}% ({passed:,} / {n_entries:,})"

        lines: list[str] = []

        # Two-column header
        lines.append("=" * W)
        lines.append(" " * ((W - 13) // 2) + "PANEL SUMMARY")
        lines.append("=" * W)
        lines.append(m._format_two_column_row(
            "No. Users", f"{self.num_users:,}",
            "GARP Pass Rate", f"{self.garp_pass_rate * 100:.1f}%", W,
        ))
        lines.append(m._format_two_column_row(
            "Total Observations", f"{self.total_observations:,}",
            "Mean AEI", f"{self.aei_distribution['mean']:.4f}", W,
        ))
        lines.append(m._format_two_column_row(
            "No. Goods", self.num_goods,
            "Mean MPI", f"{self.mpi_distribution['mean']:.4f}", W,
        ))
        lines.append(m._format_two_column_row(
            "Obs/User (mean)", f"{self.obs_per_user_stats['mean']:.1f}",
            "Computation Time", _time_str(self.computation_time_ms), W,
        ))
        if self.num_periods > 0:
            lines.append(m._format_two_column_row(
                "No. Periods", self.num_periods,
                "Entries (Users x Periods)", len(self.user_summaries), W,
            ))
        lines.append("=" * W)

        # Consistency Rates
        lines.append("")
        lines.append("Consistency Rates:")
        lines.append(sep)
        lines.append(m._format_metric("GARP", _rate_str(self.garp_pass_rate), W - 4))
        if self.warp_pass_rate is not None:
            lines.append(m._format_metric("WARP", _rate_str(self.warp_pass_rate), W - 4))
        if self.sarp_pass_rate is not None:
            lines.append(m._format_metric("SARP", _rate_str(self.sarp_pass_rate), W - 4))

        # Efficiency Distribution
        lines.append("")
        lines.append("Efficiency Distribution:")
        lines.append(sep)
        dist_rows: dict[str, dict[str, float]] = {
            "AEI": self.aei_distribution,
            "MPI": self.mpi_distribution,
        }
        if self.hm_distribution is not None:
            dist_rows["HM Index"] = self.hm_distribution
        lines.append(m._format_distribution_table(dist_rows, W))

        # Most Inconsistent Users
        if self.top_inconsistent:
            lines.append("")
            lines.append("Most Inconsistent (Bottom 5):")
            lines.append(sep)
            for i, (uid, aei, mpi, t) in enumerate(self.top_inconsistent):
                lines.append(m._format_metric(
                    f" {i+1}. {uid}", f"AEI={aei:.3f}, MPI={mpi:.3f}, T={t}", W - 4,
                ))

        # Temporal Breakdown (if period data available)
        if self.period_stats:
            lines.append("")
            lines.append("Temporal Breakdown:")
            lines.append(sep)
            hdr = f" {'Period':<12s} {'Users':>6s} {'GARP Pass':>10s} {'Mean AEI':>10s} {'Mean MPI':>10s}"
            lines.append(hdr)
            for ps in self.period_stats:
                garp_pct = f"{ps['garp_pass_rate'] * 100:.1f}%"
                lines.append(
                    f" {ps['period']:<12s} {ps['users']:>6d} {garp_pct:>10s}"
                    f" {ps['mean_aei']:>10.4f} {ps['mean_mpi']:>10.4f}"
                )

        lines.append("=" * W)
        return "\n".join(lines)

    def score(self) -> float:
        """Aggregate score: mean AEI across all users."""
        return self.aei_distribution["mean"]

    def __repr__(self) -> str:
        """Compact one-line representation with the headline figures."""
        return (
            f"PanelSummary(users={self.num_users}, "
            f"garp_pass={self.garp_pass_rate * 100:.1f}%, "
            f"mean_aei={self.aei_distribution['mean']:.4f})"
        )

    def __str__(self) -> str:
        """Return formatted panel summary when printed."""
        return self.summary()

    def to_dict(self) -> dict[str, Any]:
        """Return dictionary representation for serialization.

        The per-entry summaries, the most-inconsistent list and the period
        breakdown are not included.
        """
        return {
            "num_users": self.num_users,
            "total_observations": self.total_observations,
            "num_goods": self.num_goods,
            "garp_pass_rate": self.garp_pass_rate,
            "warp_pass_rate": self.warp_pass_rate,
            "sarp_pass_rate": self.sarp_pass_rate,
            "aei_distribution": self.aei_distribution,
            "mpi_distribution": self.mpi_distribution,
            "hm_distribution": self.hm_distribution,
            "computation_time_ms": self.computation_time_ms,
            "score": self.score(),
        }