"""Production theory and firm behavior analysis.
Tests profit maximization and cost minimization consistency for firm data.
Based on Chapter 15 of Chambers & Echenique (2016) "Revealed Preference Theory".
The production analogue of GARP tests whether observed input-output choices
are consistent with profit maximization.
Tech-Friendly Names (Primary):
- test_profit_maximization(): Test production GARP
- check_cost_minimization(): Test cost minimization consistency
- estimate_returns_to_scale(): Estimate returns to scale
Economics Names (Legacy Aliases):
- check_production_garp() -> test_profit_maximization()
"""
from __future__ import annotations
import time
from typing import TYPE_CHECKING
import numpy as np
from numpy.typing import NDArray
from prefgraph.core.result import ProductionGARPResult
from prefgraph.core.types import Cycle
from prefgraph.graph.transitive_closure import floyd_warshall_transitive_closure
if TYPE_CHECKING:
from prefgraph.core.session import ProductionLog
[docs]
def test_profit_maximization(
log: "ProductionLog",
tolerance: float = 1e-6,
) -> ProductionGARPResult:
"""
Test if firm behavior is consistent with profit maximization.
Production GARP: For observations i and j, if firm at i could have
achieved the output of j at the prices of i, and the profit would
have been at least as high, then the reverse should not also hold.
The test constructs a revealed preferred-at-least-as-profitable
relation and checks for cycles.
Args:
log: ProductionLog with input/output prices and quantities
tolerance: Numerical tolerance
Returns:
ProductionGARPResult with profit maximization analysis
Example:
>>> from prefgraph import ProductionLog, test_profit_maximization
>>> result = test_profit_maximization(firm_data)
>>> if result.is_profit_maximizing:
... print("Firm behavior is profit-maximizing consistent")
>>> else:
... print(f"Found {result.num_violations} violations")
References:
Chambers & Echenique (2016), Chapter 15
Varian, H.R. (1984). "The Nonparametric Approach to Production Analysis"
"""
start_time = time.perf_counter()
T = log.num_observations
# --------------------------------------------------------------------------
# Production GARP - Varian (1984) "The Nonparametric Approach to Production
# Analysis", Econometrica 52(3), 579-597.
# Chambers & Echenique (2016) "Revealed Preference Theory", Chapter 15.
#
# For a profit-maximizing firm observed at T periods with input prices w_t,
# output prices p_t, input quantities x_t, and output quantities y_t:
#
# Actual profit at t: pi_t = p_t · y_t - w_t · x_t
# Counterfactual profit: cf_t(s) = p_t · y_s - w_t · x_s
# (profit firm t would earn using firm s's input-output bundle at t's prices)
#
# Revealed profitability relations:
# R[t,s]: pi_t >= cf_t(s) (weak: t's bundle at least as profitable as s's)
# P[t,s]: pi_t > cf_t(s) (strict: t's bundle strictly more profitable)
#
# Production GARP violation (Def 2.1 in Smeulders et al. 2014, applied to
# production data per Varian 1984):
# R_star[i,j] AND P[j,i]
# i.e., i transitively reveals j's bundle to be at most as profitable,
# yet at j's prices, j's own bundle is strictly more profitable than i's.
#
# The old code checked R_star[i,j] AND R_star[j,i] (cycle check only).
# That misses violations where i R* j and j P i without a reverse R* arc.
# The Rust implementation (rpt-core/src/garp.rs) uses the correct condition.
# --------------------------------------------------------------------------
# Vectorized computation of actual profits: pi_t = p_t · y_t - w_t · x_t
actual_profits = log.total_revenue - log.total_cost # Shape: (T,)
# Counterfactual profit matrix: cf[i,j] = p_i · y_j - w_i · x_j
counterfactual_revenue = log.output_prices @ log.output_quantities.T # (T, T)
counterfactual_cost = log.input_prices @ log.input_quantities.T # (T, T)
counterfactual_profit = counterfactual_revenue - counterfactual_cost
# Weak revealed profitability: R[i,j] = True iff pi_i >= cf[i,j]
# "At i's prices, i's bundle is at least as profitable as j's bundle."
R = actual_profits[:, np.newaxis] >= counterfactual_profit - tolerance
np.fill_diagonal(R, False)
# Strict revealed profitability: P[i,j] = True iff pi_i > cf[i,j]
# "At i's prices, i's bundle is strictly more profitable than j's."
P = actual_profits[:, np.newaxis] > counterfactual_profit + tolerance
np.fill_diagonal(P, False)
# Transitive closure via Floyd-Warshall: R_star[i,j] = True iff there
# exists a chain i R i_1 R i_2 R ... R j.
R_star = floyd_warshall_transitive_closure(R)
# Production GARP violation: R_star[i,j] AND P[j,i].
# "i transitively reveals j to be at most as profitable, yet j strictly
# prefers its own bundle over i's at j's prices."
# This matches the Rust implementation (rpt-core/src/garp.rs lines 58-77).
violations: list[Cycle] = []
seen: set[tuple[int, int]] = set()
for i in range(T):
for j in range(T):
if i != j and R_star[i, j] and P[j, i]:
pair = (i, j)
if pair not in seen:
seen.add(pair)
violations.append(pair)
is_profit_maximizing = len(violations) == 0
# Compute cost efficiency score
cost_efficiency = _compute_cost_efficiency(log, tolerance)
# Estimate returns to scale
returns_to_scale = estimate_returns_to_scale(log)
# Compute overall profit efficiency
profit_efficiency = _compute_profit_efficiency(log, tolerance)
# Per-input and per-output efficiency
input_efficiency = _compute_input_efficiency(log, tolerance)
output_efficiency = _compute_output_efficiency(log, tolerance)
# Technical efficiency
technical_efficiency = _compute_technical_efficiency(log)
computation_time = (time.perf_counter() - start_time) * 1000
return ProductionGARPResult(
is_profit_maximizing=is_profit_maximizing,
violations=violations,
cost_efficiency_score=cost_efficiency,
returns_to_scale=returns_to_scale,
profit_efficiency=profit_efficiency,
input_efficiency_vector=input_efficiency,
output_efficiency_vector=output_efficiency,
technical_efficiency=technical_efficiency,
computation_time_ms=computation_time,
)
[docs]
def check_cost_minimization(
log: "ProductionLog",
tolerance: float = 1e-6,
) -> dict:
"""
Test if firm behavior is consistent with cost minimization.
Cost minimization is the dual of profit maximization. The firm
should choose inputs that minimize cost for a given output level.
Args:
log: ProductionLog with input/output prices and quantities
tolerance: Numerical tolerance
Returns:
Dictionary with cost minimization analysis
"""
T = log.num_observations
# Construct revealed cost relation
# R[i,j] = True if using j's inputs at i's input prices costs at least
# as much as i's actual cost, AND outputs are comparable
violations = []
for i in range(T):
for j in range(T):
if i == j:
continue
# Actual cost at i
cost_i = log.total_cost[i]
# Counterfactual: using j's inputs at i's prices
counterfactual_cost = np.dot(log.input_prices[i], log.input_quantities[j])
# Check if outputs are comparable (j produces at least as much for ALL outputs)
# Critical fix: use element-wise comparison, not sum-based
# For multi-output firms, we need j to produce at least as much of EACH output
output_comparable = np.all(
log.output_quantities[j] >= log.output_quantities[i] - tolerance
)
if output_comparable:
# j's inputs could produce at least as much output
# If counterfactual cost < actual cost, violation
if counterfactual_cost < cost_i - tolerance:
violations.append((i, j))
return {
"is_cost_minimizing": len(violations) == 0,
"violations": violations,
"num_violations": len(violations),
}
[docs]
def estimate_returns_to_scale(
log: "ProductionLog",
) -> str:
"""
Estimate returns to scale from production data.
Returns to scale indicates how output changes when all inputs
are scaled proportionally:
- Increasing: doubling inputs more than doubles output
- Constant: doubling inputs exactly doubles output
- Decreasing: doubling inputs less than doubles output
Args:
log: ProductionLog with input/output quantities
Returns:
One of "increasing", "constant", "decreasing", or "variable"
"""
T = log.num_observations
if T < 3:
return "variable"
# Compute input and output indices
input_indices = np.sum(log.input_quantities, axis=1)
output_indices = np.sum(log.output_quantities, axis=1)
if np.std(input_indices) < 1e-10:
return "variable"
# Estimate output elasticity with respect to inputs
# Using log-log regression
log_inputs = np.log(input_indices + 1e-10)
log_outputs = np.log(output_indices + 1e-10)
# Simple linear regression
cov = np.cov(log_inputs, log_outputs)
if cov[0, 0] > 1e-10:
elasticity = cov[0, 1] / cov[0, 0]
else:
elasticity = 1.0
# Classify returns to scale
if elasticity > 1.1:
return "increasing"
elif elasticity < 0.9:
return "decreasing"
else:
return "constant"
[docs]
def compute_technical_efficiency(
log: "ProductionLog",
method: str = "output_oriented",
) -> NDArray[np.float64]:
"""
Compute technical efficiency for each observation.
Technical efficiency measures how close the firm operates to the
production frontier. A score of 1.0 means fully efficient.
Args:
log: ProductionLog with input/output data
method: "output_oriented" or "input_oriented"
Returns:
Array of efficiency scores (one per observation)
"""
T = log.num_observations
efficiencies = np.ones(T)
for i in range(T):
max_efficiency = 1.0
for j in range(T):
if i == j:
continue
if method == "output_oriented":
# Can j produce more output with same or fewer inputs?
input_ratio = np.sum(log.input_quantities[j]) / max(np.sum(log.input_quantities[i]), 1e-10)
output_ratio = np.sum(log.output_quantities[j]) / max(np.sum(log.output_quantities[i]), 1e-10)
if input_ratio <= 1.0 and output_ratio > 1.0:
# j is more efficient
efficiency = 1.0 / output_ratio
max_efficiency = min(max_efficiency, efficiency)
else: # input_oriented
# Can j produce same output with fewer inputs?
input_ratio = np.sum(log.input_quantities[j]) / max(np.sum(log.input_quantities[i]), 1e-10)
output_ratio = np.sum(log.output_quantities[j]) / max(np.sum(log.output_quantities[i]), 1e-10)
if output_ratio >= 1.0 and input_ratio < 1.0:
# j uses fewer inputs for same output
max_efficiency = min(max_efficiency, input_ratio)
efficiencies[i] = max_efficiency
return efficiencies
def _compute_cost_efficiency(
log: "ProductionLog",
tolerance: float,
) -> float:
"""Compute overall cost efficiency score."""
T = log.num_observations
efficient_count = 0
for i in range(T):
is_efficient = True
for j in range(T):
if i == j:
continue
# Can j produce same output at lower cost?
output_comparable = np.all(log.output_quantities[j] >= log.output_quantities[i] - tolerance)
counterfactual_cost = np.dot(log.input_prices[i], log.input_quantities[j])
if output_comparable and counterfactual_cost < log.total_cost[i] - tolerance:
is_efficient = False
break
if is_efficient:
efficient_count += 1
return efficient_count / T if T > 0 else 1.0
def _compute_profit_efficiency(
log: "ProductionLog",
tolerance: float,
) -> float:
"""Compute overall profit efficiency score."""
profits = log.profit
if len(profits) == 0:
return 1.0
max_profit = np.max(profits)
if max_profit <= 0:
return 1.0
mean_profit = np.mean(profits)
return max(0.0, min(1.0, mean_profit / max_profit))
def _compute_input_efficiency(
log: "ProductionLog",
tolerance: float,
) -> NDArray[np.float64]:
"""Compute per-input efficiency scores."""
N_inputs = log.num_inputs
efficiencies = np.ones(N_inputs)
for k in range(N_inputs):
# Compare input k usage relative to output
input_k = log.input_quantities[:, k]
total_output = np.sum(log.output_quantities, axis=1)
if np.std(input_k) < tolerance:
continue
# Efficiency = how well input usage correlates with output
corr = np.corrcoef(input_k, total_output)[0, 1]
efficiencies[k] = max(0.0, corr) if not np.isnan(corr) else 1.0
return efficiencies
def _compute_output_efficiency(
log: "ProductionLog",
tolerance: float,
) -> NDArray[np.float64]:
"""Compute per-output efficiency scores."""
N_outputs = log.num_outputs
efficiencies = np.ones(N_outputs)
for k in range(N_outputs):
# Compare output k relative to input usage
output_k = log.output_quantities[:, k]
total_input = np.sum(log.input_quantities, axis=1)
if np.std(total_input) < tolerance:
continue
# Efficiency = how well output correlates with input
corr = np.corrcoef(output_k, total_input)[0, 1]
efficiencies[k] = max(0.0, corr) if not np.isnan(corr) else 1.0
return efficiencies
def _compute_technical_efficiency(
log: "ProductionLog",
) -> float:
"""Compute overall technical efficiency (mean of observations)."""
efficiencies = compute_technical_efficiency(log)
return float(np.mean(efficiencies))
# =============================================================================
# LEGACY ALIASES
# =============================================================================
check_production_garp = test_profit_maximization
"""Legacy alias: use test_profit_maximization instead."""
test_cost_minimization = check_cost_minimization
"""Legacy alias: use check_cost_minimization instead."""