# Sales Transaction Aggregator
```python
"""Aggregate sales transactions by product category."""
from __future__ import annotations
import argparse
import csv
import sys
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
@dataclass
class CategoryStats:
    """Accumulated revenue and order count for one product category."""

    revenue: float = 0.0
    order_count: int = 0

    @property
    def average_order_value(self) -> float:
        """Mean revenue per order, or 0.0 when no orders have been seen."""
        if not self.order_count:
            return 0.0
        return self.revenue / self.order_count
# Columns every input row must provide; validated against the CSV header.
REQUIRED_COLUMNS = {"date", "product", "category", "quantity", "unit_price"}


def aggregate_sales(input_path: Path) -> dict[str, CategoryStats]:
    """Read a sales CSV and return per-category aggregated statistics.

    Args:
        input_path: Path to the input CSV file.

    Returns:
        Mapping of category name -> CategoryStats.

    Raises:
        FileNotFoundError: If the input file does not exist.
        ValueError: If required columns are missing.
    """
    if not input_path.is_file():
        raise FileNotFoundError(f"Input file not found: {input_path}")
    stats: dict[str, CategoryStats] = defaultdict(CategoryStats)
    with input_path.open(newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        # Validate headers once up-front to fail fast on malformed input.
        if reader.fieldnames is None or not REQUIRED_COLUMNS.issubset(reader.fieldnames):
            missing = REQUIRED_COLUMNS - set(reader.fieldnames or [])
            raise ValueError(f"Missing required columns: {sorted(missing)}")
        for line_no, row in enumerate(reader, start=2):  # start=2 accounts for header
            try:
                quantity = int(row["quantity"])
                unit_price = float(row["unit_price"])
                category = row["category"].strip()
            # TypeError is included because DictReader fills cells of a
            # too-short row with None (its restval default); int(None) /
            # float(None) raise TypeError, which previously crashed the
            # whole run instead of skipping the malformed row.
            except (ValueError, TypeError, AttributeError) as exc:
                print(f"Skipping malformed row {line_no}: {exc}", file=sys.stderr)
                continue
            if not category:
                print(f"Skipping row {line_no}: empty category", file=sys.stderr)
                continue
            # Each row is one order line; accumulate revenue and count.
            stats[category].revenue += quantity * unit_price
            stats[category].order_count += 1
    return stats
def write_summary(stats: dict[str, CategoryStats], output_path: Path) -> None:
    """Write per-category statistics to ``output_path``, highest revenue first."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    # Pre-sort once so the write loop stays trivial.
    ordered = sorted(stats.items(), key=lambda item: item[1].revenue, reverse=True)
    with output_path.open("w", newline="", encoding="utf-8") as f:
        writer = csv.writer(f)
        writer.writerow(["category", "total_revenue", "average_order_value"])
        for name, entry in ordered:
            writer.writerow([name, f"{entry.revenue:.2f}", f"{entry.average_order_value:.2f}"])
def main() -> int:
    """Parse CLI arguments, build the summary, and report the outcome."""
    arg_parser = argparse.ArgumentParser(description="Summarize sales by category.")
    arg_parser.add_argument("input", type=Path, help="Input CSV of transactions")
    arg_parser.add_argument("output", type=Path, help="Output CSV for summary")
    opts = arg_parser.parse_args()
    try:
        summary = aggregate_sales(opts.input)
        write_summary(summary, opts.output)
    except (FileNotFoundError, ValueError) as err:
        # Expected failure modes surface as a one-line message, exit code 1.
        print(f"Error: {err}", file=sys.stderr)
        return 1
    print(f"Wrote summary for {len(summary)} categories to {opts.output}")
    return 0
# Run the CLI only when executed as a script (not when imported as a module);
# sys.exit propagates main()'s integer status as the process exit code.
if __name__ == "__main__":
    sys.exit(main())
```
## Usage
```bash
python sales_summary.py sales.csv summary.csv
```
**Sample input (`sales.csv`):**
```
date,product,category,quantity,unit_price
2024-01-03,Widget,Hardware,2,9.99
2024-01-04,Book,Media,1,24.50
```
The script validates headers, skips malformed rows with warnings to stderr, uses a `dataclass` for clean stat tracking, and emits categories sorted by revenue. Revenue is computed as `quantity * unit_price`, and average order value divides total revenue by the number of transaction lines per category.
Effective ChatGPT Prompts for Writing Python Code
Tested ChatGPT-style prompts for Python coding, compared across four leading AI models.
BEST BY JUDGE SCORE
Claude Opus 4.7
9/10
The prompt we tested
You are an expert Python developer and coding mentor. Write clean, production-quality Python code that solves the user's request below. Follow these instructions carefully: Use Python 3.10+ with type hints, docstrings, and PEP 8 formatting. Include error handling for edge cases, prefer standard library when possible, and keep the explanation under 150 words so the code remains the focus. User's coding request: I need a Python script that reads a CSV file of sales transactions, groups them by product category, and outputs the total revenue and average order value per category to a new CSV. The input file has columns: date, product, category, quantity, unit_price. Respond with the complete Python solution, including code, inline comments explaining key logic, and a brief usage example.
Model comparison
| Model | Quality | Cost | Latency | Tokens |
|---|---|---|---|---|
| Claude Opus 4.7 | 9 | $0.1199 | 14.6s | 249 / 1549 |
| Claude Haiku 4.5 | 7 | $0.0050 | 5.8s | 179 / 966 |
| GPT-5.4 | 8 | $0.0203 | 16.8s | 164 / 1326 |
| Grok 4.1 Fast Reasoning | 8 | $0.0005 | 16.5s | 313 / 801 |
Outputs
```python
"""
Sales transaction analyzer: Groups transactions by category and calculates metrics.
"""
import csv
from collections import defaultdict
from pathlib import Path
from dataclasses import dataclass
from typing import Dict
@dataclass
class CategoryMetrics:
    """Stores aggregated metrics per category."""
    # Running sum of quantity * unit_price across this category's rows.
    total_revenue: float = 0.0
    # NOTE(review): total_orders is never read or written anywhere in this
    # script — order_count below is the field actually used. Kept only for
    # constructor compatibility; consider removing it.
    total_orders: int = 0
    # Number of transaction rows folded into total_revenue.
    order_count: int = 0
    @property
    def average_order_value(self) -> float:
        """Calculate average revenue per order."""
        return self.total_revenue / self.order_count if self.order_count > 0 else 0.0
def analyze_sales(input_file: str, output_file: str) -> None:
    """
    Read sales CSV, aggregate by category, and write results to output CSV.

    Args:
        input_file: Path to input CSV with columns: date, product, category,
            quantity, unit_price
        output_file: Path to output CSV with aggregated metrics

    Raises:
        FileNotFoundError: If input file doesn't exist
        ValueError: If required columns are missing or row data is malformed
    """
    input_path = Path(input_file)
    if not input_path.exists():
        raise FileNotFoundError(f"Input file not found: {input_file}")

    # Dictionary to store metrics per category
    categories: Dict[str, CategoryMetrics] = defaultdict(CategoryMetrics)

    with open(input_path, 'r', encoding='utf-8') as f:
        reader = csv.DictReader(f)
        # Validate required columns *before* entering the row-parsing try
        # block. Previously this ValueError was raised inside that block and
        # re-caught by `except (ValueError, KeyError)`, so a missing-column
        # error was misreported as "Error parsing CSV data: ...".
        if not reader.fieldnames or not all(
            col in reader.fieldnames
            for col in ['category', 'quantity', 'unit_price']
        ):
            raise ValueError("Missing required columns: category, quantity, unit_price")
        try:
            for row in reader:
                category = row['category'].strip()
                quantity = float(row['quantity'])
                unit_price = float(row['unit_price'])
                # Calculate revenue for this transaction
                revenue = quantity * unit_price
                # Update metrics
                categories[category].total_revenue += revenue
                categories[category].order_count += 1
        except (ValueError, KeyError) as e:
            # Chain the original exception so the offending value is visible.
            raise ValueError(f"Error parsing CSV data: {e}") from e

    # Write aggregated results
    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['Category', 'Total Revenue', 'Average Order Value'])
        # Sort by category for consistent output
        for category in sorted(categories.keys()):
            metrics = categories[category]
            writer.writerow([
                category,
                f"{metrics.total_revenue:.2f}",
                f"{metrics.average_order_value:.2f}"
            ])
    print(f"✓ Analysis complete. Results written to: {output_file}")
# Script entry point: runs a demo with hard-coded file names; this sample
# has no CLI argument parsing.
if __name__ == "__main__":
    # Example usage
    analyze_sales('sales_transactions.csv', 'sales_summary.csv')
```
**Usage Example:**
Create `sales_transactions.csv`:
```csv
date,product,category,quantity,unit_price
2024-01-01,Widget A,Electronics,5,29.99
2024-01-02,Widget B,Electronics,3,49.99
2024-01-03,Gadget X,Home,2,79.99
```
Run:
```bash
python sales_analyzer.py
```
Output `sales_summary.csv`:
```csv
Category,Total Revenue,Average Order Value
Electronics,299.92,149.96
Home,159.98,159.98
```
**Key Features:** Type hints, dataclass for metrics, error handling, clean CSV I/O, and sorted output for reproducibility.
```python
#!/usr/bin/env python3
"""
Aggregate sales transactions by product category.
Reads an input CSV with columns:
date, product, category, quantity, unit_price
Outputs a CSV with columns:
category, total_revenue, average_order_value
Definitions:
- revenue per row = quantity * unit_price
- average_order_value per category = total_revenue / number_of_rows_in_category
"""
from __future__ import annotations
import csv
import sys
from collections import defaultdict
from dataclasses import dataclass
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from pathlib import Path
from typing import DefaultDict
# Columns the input CSV must contain (checked against the header row).
REQUIRED_COLUMNS = {"date", "product", "category", "quantity", "unit_price"}
# Column order of the generated summary CSV.
OUTPUT_COLUMNS = ["category", "total_revenue", "average_order_value"]
# Quantization target for money values: two decimal places (cents).
TWOPLACES = Decimal("0.01")
@dataclass
class CategoryStats:
    """Running Decimal totals for one product category."""

    total_revenue: Decimal = Decimal("0")
    order_count: int = 0

    def add_order(self, revenue: Decimal) -> None:
        """Fold a single transaction's revenue into the running totals."""
        self.total_revenue += revenue
        self.order_count += 1

    @property
    def average_order_value(self) -> Decimal:
        """Average revenue per order, quantized to cents with ROUND_HALF_UP."""
        if self.order_count == 0:
            return Decimal("0.00")
        mean = self.total_revenue / self.order_count
        return mean.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
def parse_decimal(value: str, field_name: str, row_number: int) -> Decimal:
    """
    Convert one CSV cell to Decimal, rejecting blank or malformed values.

    Raises:
        ValueError: If the value is missing or invalid.
    """
    # A None cell (short row) or whitespace-only cell counts as missing.
    if value is None or value.strip() == "":
        raise ValueError(f"Row {row_number}: missing value for '{field_name}'")
    try:
        parsed = Decimal(value.strip())
    except (InvalidOperation, AttributeError) as exc:
        raise ValueError(
            f"Row {row_number}: invalid decimal for '{field_name}': {value!r}"
        ) from exc
    return parsed
def aggregate_sales_by_category(input_csv: Path) -> dict[str, CategoryStats]:
    """
    Aggregate revenue and order counts per category from the input CSV.

    Args:
        input_csv: Path to the source CSV file.

    Returns:
        Mapping of category name to aggregated statistics.

    Raises:
        FileNotFoundError: If the input file does not exist.
        ValueError: If required columns are missing or row data is invalid.
    """
    if not input_csv.exists():
        raise FileNotFoundError(f"Input file not found: {input_csv}")

    totals: DefaultDict[str, CategoryStats] = defaultdict(CategoryStats)
    with input_csv.open("r", newline="", encoding="utf-8") as handle:
        reader = csv.DictReader(handle)
        header = reader.fieldnames
        if header is None:
            raise ValueError("Input CSV is empty or missing a header row")
        missing = REQUIRED_COLUMNS - set(header)
        if missing:
            raise ValueError(
                f"Input CSV is missing required columns: {', '.join(sorted(missing))}"
            )
        # Data rows start at 2 because the header occupies row 1.
        for row_number, row in enumerate(reader, start=2):
            category = (row.get("category") or "").strip()
            if not category:
                raise ValueError(f"Row {row_number}: missing category")
            quantity = parse_decimal(row.get("quantity", ""), "quantity", row_number)
            unit_price = parse_decimal(row.get("unit_price", ""), "unit_price", row_number)
            if quantity < 0:
                raise ValueError(f"Row {row_number}: quantity cannot be negative")
            if unit_price < 0:
                raise ValueError(f"Row {row_number}: unit_price cannot be negative")
            # One CSV row == one transaction line.
            totals[category].add_order(quantity * unit_price)
    return dict(totals)
def write_summary_csv(output_csv: Path, stats: dict[str, CategoryStats]) -> None:
    """
    Write one CSV row per category with total revenue and average order value.

    Args:
        output_csv: Destination CSV path.
        stats: Aggregated category statistics.
    """
    cents = Decimal("0.01")
    with output_csv.open("w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(
            handle, fieldnames=["category", "total_revenue", "average_order_value"]
        )
        writer.writeheader()
        # Alphabetical order keeps the output deterministic across runs.
        for name in sorted(stats):
            entry = stats[name]
            total = entry.total_revenue.quantize(cents, rounding=ROUND_HALF_UP)
            writer.writerow(
                {
                    "category": name,
                    "total_revenue": str(total),
                    "average_order_value": str(entry.average_order_value),
                }
            )
def main(argv: list[str]) -> int:
    """
    CLI entry point.

    Usage:
        python sales_summary.py input.csv output.csv
    """
    # Expect exactly: program name + input path + output path.
    if len(argv) != 3:
        print(
            "Usage: python sales_summary.py <input_csv> <output_csv>",
            file=sys.stderr,
        )
        return 1
    source = Path(argv[1])
    destination = Path(argv[2])
    try:
        summary = aggregate_sales_by_category(source)
        write_summary_csv(destination, summary)
    except (FileNotFoundError, ValueError, OSError) as err:
        print(f"Error: {err}", file=sys.stderr)
        return 1
    print(f"Summary written to: {destination}")
    return 0
# Propagate main()'s integer status to the shell as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv))
```
### Usage example
```bash
python sales_summary.py sales_transactions.csv category_summary.csv
```
Example input:
```csv
date,product,category,quantity,unit_price
2024-01-01,Keyboard,Electronics,2,49.99
2024-01-02,Mouse,Electronics,1,19.99
2024-01-03,Mug,Home,3,8.50
```
This writes `category_summary.csv` with total revenue and average order value per category.
```python
import argparse
import csv
from collections import defaultdict
from pathlib import Path
from typing import Dict, Any
def process_sales_data(input_path: str, output_path: str) -> None:
    """
    Reads a sales CSV file, groups transactions by 'category',
    computes total revenue (sum(quantity * unit_price)) and average order value
    (total_revenue / transaction_count) per category, and writes to output CSV.
    Handles missing files, invalid columns, non-numeric data gracefully.

    Args:
        input_path: Path to the input transactions CSV.
        output_path: Path where the per-category summary CSV is written.

    Raises:
        FileNotFoundError: If the input file does not exist.
        ValueError: If required columns are missing or the CSV is unreadable.
        IOError: If the output file cannot be written.
    """
    import sys  # local import keeps this sample's top-level imports unchanged

    required_columns = {'date', 'product', 'category', 'quantity', 'unit_price'}
    # Use defaultdict for efficient grouping by category
    stats: Dict[str, Dict[str, float]] = defaultdict(lambda: {'total_revenue': 0.0, 'count': 0.0})

    # Read and validate input CSV
    try:
        with open(input_path, 'r', newline='', encoding='utf-8') as infile:
            reader = csv.DictReader(infile)
            # Ensure all required columns exist
            if not required_columns.issubset(set(reader.fieldnames or [])):
                missing = required_columns - set(reader.fieldnames or [])
                raise ValueError(f"Missing columns: {missing}")
            for row_num, row in enumerate(reader, start=2):  # Line numbers for errors
                try:
                    # Compute revenue per transaction (row). TypeError is caught
                    # too: DictReader fills cells of a too-short row with None,
                    # and float(None) raises TypeError, which previously crashed
                    # the run instead of skipping the row.
                    qty = float(row['quantity'])
                    unit_price = float(row['unit_price'])
                    revenue = qty * unit_price
                    category = row['category'].strip()
                    # Accumulate stats
                    stats[category]['total_revenue'] += revenue
                    stats[category]['count'] += 1.0
                except (ValueError, KeyError, TypeError) as e:
                    # Warnings go to stderr so stdout stays clean for pipelines.
                    print(f"Warning: Skipping row {row_num} (invalid data): {e}", file=sys.stderr)
                    continue
    except FileNotFoundError:
        # Re-raise with a clearer message; the original adds no information.
        raise FileNotFoundError(f"Input file not found: {input_path}") from None
    except csv.Error as e:
        raise ValueError(f"CSV read error in {input_path}: {e}") from e

    # Write grouped stats to output CSV, sorted by category
    fieldnames = ['category', 'total_revenue', 'average_order_value']
    try:
        with open(output_path, 'w', newline='', encoding='utf-8') as outfile:
            writer = csv.DictWriter(outfile, fieldnames=fieldnames)
            writer.writeheader()
            for category in sorted(stats.keys()):
                total_rev = stats[category]['total_revenue']
                count = stats[category]['count']
                avg_order_value = total_rev / count if count > 0 else 0.0
                writer.writerow({
                    'category': category,
                    'total_revenue': round(total_rev, 2),
                    'average_order_value': round(avg_order_value, 2)
                })
        print(f"Output written to {output_path} ({len(stats)} categories)")
    except IOError as e:
        raise IOError(f"Failed to write {output_path}: {e}") from e
# Script entry point: parse the two positional paths and run the pipeline.
# Errors raised by process_sales_data propagate as unhandled tracebacks here.
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Analyze sales by category.')
    parser.add_argument('input', help='Path to input CSV (e.g., sales.csv)')
    parser.add_argument('output', help='Path to output CSV (e.g., summary.csv)')
    args = parser.parse_args()
    process_sales_data(args.input, args.output)
```
**Usage example:**
```
$ python sales_analyzer.py sales.csv category_summary.csv
```
Assumes `sales.csv` has header and data like: `date,product,category,quantity,unit_price\n2023-01-01,Widget A,Electronics,2,10.50`. Outputs `category_summary.csv` with `category,total_revenue,average_order_value`. (98 words)
Related queries
Try it with a real tool
Run this prompt in one of these tools. Affiliate links help keep Gridlyx free.