Source code for prefgraph.datasets._pakistan

"""Pakistan Largest E-Commerce dataset loader.

Loads the Pakistan e-commerce dataset (~1M+ transactions across 16 product
categories) with real prices and quantities, returning a BehaviorPanel.

Budget-based: each observation is a customer-month with price and quantity
vectors across 16 product categories.

Data must be downloaded separately from Kaggle:
https://www.kaggle.com/datasets/zusmani/pakistans-largest-ecommerce-dataset
"""

from __future__ import annotations

import os
from pathlib import Path

import numpy as np

from prefgraph.core.panel import BehaviorPanel
from prefgraph.core.session import BehaviorLog

# --- Constants ---

CATEGORIES = [
    "Mobiles & Tablets", "Entertainment", "Computing", "Appliances",
    "Men's Fashion", "Women's Fashion", "Kids & Baby",
    "Superstore", "Beauty & Grooming", "Health & Sports",
    "Home & Living", "Books", "School & Education",
    "Soghaat", "Others",
]

CSV_FILENAME = "Pakistan Largest Ecommerce Dataset.csv"

NUM_CATEGORIES = len(CATEGORIES)
MIN_UNIT_PRICE = 0.01
MAX_UNIT_PRICE = 500_000.0  # PKR - electronics can be expensive


def _find_data_dir(data_dir: str | Path | None) -> Path:
    """Find Pakistan e-commerce data directory via cascade."""
    candidates = []
    if data_dir is not None:
        candidates.append(Path(data_dir))

    env = os.environ.get("PYREVEALED_DATA_DIR")
    if env:
        candidates.append(Path(env) / "pakistan")

    candidates.extend([
        Path.home() / ".prefgraph" / "data" / "pakistan",
        Path(__file__).resolve().parents[3] / "pakistan" / "data",
    ])

    for d in candidates:
        if d.is_dir() and (d / CSV_FILENAME).exists():
            return d

    searched = "\n  ".join(str(c) for c in candidates)
    raise FileNotFoundError(
        f"Pakistan e-commerce data not found. Searched:\n  {searched}\n\n"
        "Download from Kaggle: "
        "https://www.kaggle.com/datasets/zusmani/pakistans-largest-ecommerce-dataset\n"
        "Place the CSV in ~/.prefgraph/data/pakistan/ or pass data_dir=."
    )


[docs] def load_pakistan( data_dir: str | Path | None = None, max_users: int = 50_000, min_months: int = 5, ) -> BehaviorPanel: """Load Pakistan E-Commerce dataset as a BehaviorPanel. Filters to completed orders with positive price/quantity and non-null Customer ID. Aggregates to monthly periods: for each customer-month, quantity = total units ordered per category, price = median unit price per category that month (market-wide oracle). All 16 product categories are used as goods. Args: data_dir: Path to directory containing the CSV. If None, searches standard locations. max_users: Maximum number of customers to include (default 50,000). min_months: Minimum active months per customer (default 5). Returns: BehaviorPanel with one BehaviorLog per customer (rows = months, cols = product categories). Raises: FileNotFoundError: If data file cannot be found. ImportError: If pandas is not installed. """ try: import pandas as pd except ImportError: raise ImportError( "pandas is required for dataset loaders. " "Install with: pip install 'prefgraph[datasets]'" ) from None data_path = _find_data_dir(data_dir) # --- Load CSV --- df = pd.read_csv( data_path / CSV_FILENAME, usecols=[ "item_id", "status", "created_at", "price", "qty_ordered", "grand_total", "category_name_1", "payment_method", "Customer ID", "Year", "Month", ], dtype={"Customer ID": str}, ) # --- Filter --- # Only completed orders df = df[df["status"].str.strip().str.lower() == "complete"].copy() # Non-null Customer ID df = df.dropna(subset=["Customer ID"]) df = df[df["Customer ID"].str.strip() != ""] # Positive price and quantity df["price"] = pd.to_numeric(df["price"], errors="coerce") df["qty_ordered"] = pd.to_numeric(df["qty_ordered"], errors="coerce") df = df.dropna(subset=["price", "qty_ordered"]) df = df[(df["price"] > 0) & (df["qty_ordered"] > 0)] # Price sanity bounds df = df[ (df["price"] >= MIN_UNIT_PRICE) & (df["price"] <= MAX_UNIT_PRICE) ] # Non-null category df = df.dropna(subset=["category_name_1"]) df["category"] = df["category_name_1"].str.strip() # --- Discover categories from data --- # Use hardcoded list as preference order, but accept whatever exists data_categories = df["category"].value_counts().index.tolist() categories = [c for c in CATEGORIES if c in data_categories] # Add any categories present in data but not in our hardcoded list for c in data_categories: if c not in categories: categories.append(c) df = df[df["category"].isin(categories)] # --- Build year-month period key --- df["Year"] = pd.to_numeric(df["Year"], errors="coerce") df["Month"] = pd.to_numeric(df["Month"], errors="coerce") df = df.dropna(subset=["Year", "Month"]) df["year_month"] = ( df["Year"].astype(int).astype(str) + "-" + df["Month"].astype(int).astype(str).str.zfill(2) ) # --- Build price oracle: median price per category per month --- price_oracle = df.pivot_table( values="price", index="year_month", columns="category", aggfunc="median", ).reindex(columns=categories) price_oracle = price_oracle.ffill().bfill() # Fill remaining NaN with global median per category global_medians = df.groupby("category")["price"].median() for cat in categories: if cat in global_medians.index: price_oracle[cat] = price_oracle[cat].fillna( global_medians[cat] ) price_oracle = price_oracle.fillna(1.0) # absolute fallback all_months = sorted(price_oracle.index) month_to_idx = {m: i for i, m in enumerate(all_months)} price_grid = price_oracle.values # (n_months, n_categories) # --- Aggregate quantity per customer-month-category --- agg = df.groupby( ["Customer ID", "year_month", "category"], observed=True, ).agg( total_qty=("qty_ordered", "sum"), ).reset_index() # --- Build per-customer BehaviorLogs --- logs: dict[str, BehaviorLog] = {} grouped = agg.groupby("Customer ID") customer_ids = list(grouped.groups.keys()) # Cap at max_users if max_users is not None and len(customer_ids) > max_users: customer_ids = customer_ids[:max_users] for cust_id in customer_ids: cust_data = grouped.get_group(cust_id) # Pivot to quantity matrix (months x categories) qty_pivot = cust_data.pivot_table( values="total_qty", index="year_month", columns="category", aggfunc="sum", ).reindex(columns=categories).fillna(0) # Only keep months with at least one purchase active_months = ( qty_pivot[qty_pivot.sum(axis=1) > 0].index.tolist() ) if len(active_months) < min_months: continue qty_matrix = qty_pivot.loc[active_months].values # (T, K) # Price matrix from oracle month_indices = [month_to_idx[m] for m in active_months] price_matrix = price_grid[month_indices] # (T, K) uid = f"customer_{cust_id}" logs[uid] = BehaviorLog( cost_vectors=price_matrix.astype(np.float64), action_vectors=qty_matrix.astype(np.float64), user_id=uid, metadata={ "active_months": list(active_months), }, ) # Early exit if we hit max_users worth of valid logs if max_users is not None and len(logs) >= max_users: break return BehaviorPanel( _logs=logs, metadata={ "dataset": "pakistan_ecommerce", "goods": categories, "n_categories": len(categories), "min_months": min_months, "max_users": max_users, "n_customers": len(logs), "total_months": len(all_months), }, )