Files
billai/analyzer/cleaners/base.py
cheliangzhao 087ae027cc feat: 完善项目架构并增强分析页面功能
- 新增项目文档和 Docker 配置
  - 添加 README.md 和 TODO.md 项目文档
  - 为各服务添加 Dockerfile 和 docker-compose 配置

- 重构后端架构
  - 新增 adapter 层(HTTP/Python 适配器)
  - 新增 repository 层(数据访问抽象)
  - 新增 router 模块统一管理路由
  - 新增账单处理 handler

- 扩展前端 UI 组件库
  - 新增 Calendar、DateRangePicker、Drawer、Popover 等组件
  - 集成 shadcn-svelte 组件库

- 增强分析页面功能
  - 添加时间范围筛选器(支持本月默认值)
  - 修复 DateRangePicker 默认值显示问题
  - 优化数据获取和展示逻辑

- 完善分析器服务
  - 新增 FastAPI 服务接口
  - 改进账单清理器实现
2026-01-10 01:23:36 +08:00

293 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
账单清理基类和公共工具函数
"""
import csv
import json
import argparse
from abc import ABC, abstractmethod
from datetime import datetime, date, timedelta
from decimal import Decimal, ROUND_HALF_UP
from pathlib import Path
# =============================================================================
# 公共工具函数
# =============================================================================
def parse_date(date_str: str) -> date:
"""解析日期字符串,支持 YYYY-MM-DD 或 YYYY/MM/DD 格式"""
for fmt in ("%Y-%m-%d", "%Y/%m/%d"):
try:
return datetime.strptime(date_str, fmt).date()
except ValueError:
continue
raise ValueError(f"无法解析日期: {date_str},请使用 YYYY-MM-DD 格式")
def parse_amount(amount_str: str) -> Decimal:
"""解析金额字符串为Decimal去掉¥符号"""
try:
clean = amount_str.replace("", "").replace(" ", "").strip()
return Decimal(clean)
except:
return Decimal("0")
def format_amount(amount: Decimal) -> str:
"""格式化金额为字符串(保留两位小数)"""
return str(amount.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))
def compute_date_range(args) -> tuple[date | None, date | None]:
"""
根据参数计算最终的日期范围
多重指定时取交集(最小范围)
Returns:
(start_date, end_date) 或 (None, None) 表示不筛选
"""
start_date = None
end_date = None
# 1. 根据年份设置范围
if args.year:
year = int(args.year)
start_date = date(year, 1, 1)
end_date = date(year, 12, 31)
# 2. 根据月份进一步收窄
if args.month:
month = int(args.month)
year = int(args.year) if args.year else datetime.now().year
if not start_date:
start_date = date(year, 1, 1)
end_date = date(year, 12, 31)
month_start = date(year, month, 1)
if month == 12:
month_end = date(year, 12, 31)
else:
month_end = date(year, month + 1, 1) - timedelta(days=1)
start_date = max(start_date, month_start) if start_date else month_start
end_date = min(end_date, month_end) if end_date else month_end
# 3. 根据 start/end 参数进一步收窄
if args.start:
custom_start = parse_date(args.start)
start_date = max(start_date, custom_start) if start_date else custom_start
if args.end:
custom_end = parse_date(args.end)
end_date = min(end_date, custom_end) if end_date else custom_end
return start_date, end_date
def compute_date_range_from_values(
year: str = None,
month: str = None,
start: str = None,
end: str = None
) -> tuple[date | None, date | None]:
"""
根据参数值计算日期范围(不依赖 argparse
供 HTTP API 调用使用
Returns:
(start_date, end_date) 或 (None, None) 表示不筛选
"""
start_date = None
end_date = None
# 1. 根据年份设置范围
if year:
y = int(year)
start_date = date(y, 1, 1)
end_date = date(y, 12, 31)
# 2. 根据月份进一步收窄
if month:
m = int(month)
y = int(year) if year else datetime.now().year
if not start_date:
start_date = date(y, 1, 1)
end_date = date(y, 12, 31)
month_start = date(y, m, 1)
if m == 12:
month_end = date(y, 12, 31)
else:
month_end = date(y, m + 1, 1) - timedelta(days=1)
start_date = max(start_date, month_start) if start_date else month_start
end_date = min(end_date, month_end) if end_date else month_end
# 3. 根据 start/end 参数进一步收窄
if start:
custom_start = parse_date(start)
start_date = max(start_date, custom_start) if start_date else custom_start
if end:
custom_end = parse_date(end)
end_date = min(end_date, custom_end) if end_date else custom_end
return start_date, end_date
def is_in_date_range(date_str: str, start_date: date | None, end_date: date | None) -> bool:
"""检查日期字符串是否在指定范围内"""
if start_date is None and end_date is None:
return True
try:
row_date = datetime.strptime(date_str[:10], "%Y-%m-%d").date()
except ValueError:
return False
if start_date and row_date < start_date:
return False
if end_date and row_date > end_date:
return False
return True
def create_arg_parser(description: str) -> argparse.ArgumentParser:
"""创建通用的命令行参数解析器"""
parser = argparse.ArgumentParser(
description=description,
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
日期筛选说明:
--year 指定年份(如 2026
--month 指定月份1-12
--start 起始日期YYYY-MM-DD
--end 结束日期YYYY-MM-DD
多个条件同时指定时,取交集(最小日期范围)
输出格式:
--format 输出格式csv默认或 json
"""
)
parser.add_argument("input_file", help="输入的账单CSV文件")
parser.add_argument("output_file", nargs="?", default=None,
help="输出文件(默认为 输入文件名_cleaned.csv/json")
parser.add_argument("--year", "-y", type=str, default=None,
help="保留的年份(如 2026")
parser.add_argument("--month", "-m", type=int, choices=range(1, 13),
metavar="1-12", help="保留的月份1-12")
parser.add_argument("--start", "-s", type=str, help="起始日期YYYY-MM-DD")
parser.add_argument("--end", "-e", type=str, help="结束日期YYYY-MM-DD")
parser.add_argument("--format", "-f", choices=["csv", "json"], default="csv",
help="输出格式csv默认或 json")
return parser
def get_output_file(input_file: str, output_file: str | None, output_format: str = "csv") -> str:
"""获取输出文件路径"""
if output_file:
return output_file
import os
base_name = os.path.splitext(input_file)[0]
ext = "json" if output_format == "json" else "csv"
return f"{base_name}_cleaned.{ext}"
# =============================================================================
# 账单清理基类
# =============================================================================
class BaseCleaner(ABC):
"""账单清理基类"""
def __init__(self, input_file: str, output_file: str | None = None, output_format: str = "csv"):
self.input_file = input_file
self.output_format = output_format
self.output_file = get_output_file(input_file, output_file, output_format)
self.start_date: date | None = None
self.end_date: date | None = None
# 统计信息
self.stats = {
"original_count": 0,
"filtered_count": 0,
"fully_refunded": 0,
"partially_refunded": 0,
"category_adjusted": 0,
"final_count": 0,
}
def set_date_range(self, start_date: date | None, end_date: date | None):
"""设置日期筛选范围"""
self.start_date = start_date
self.end_date = end_date
def print_header(self):
"""打印处理头信息"""
print(f"输入文件: {self.input_file}")
print(f"输出文件: {self.output_file}")
print(f"输出格式: {self.output_format.upper()}")
if self.start_date or self.end_date:
print(f"日期范围: {self.start_date or '不限'} ~ {self.end_date or '不限'}")
else:
print("日期范围: 全部")
print()
def write_output(self, header: list, rows: list):
"""
写入输出文件(支持 CSV 和 JSON 格式)
Args:
header: 表头列表
rows: 数据行列表
"""
if self.output_format == "json":
self._write_json(header, rows)
else:
self._write_csv(header, rows)
def _write_csv(self, header: list, rows: list):
"""写入 CSV 格式"""
with open(self.output_file, "w", encoding="utf-8", newline="") as f:
writer = csv.writer(f)
writer.writerow(header)
writer.writerows(rows)
def _write_json(self, header: list, rows: list):
"""写入 JSON 格式"""
# 将每行转换为字典
data = []
for row in rows:
record = {}
for i, col in enumerate(header):
if i < len(row):
record[col] = row[i]
else:
record[col] = ""
data.append(record)
with open(self.output_file, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
@abstractmethod
def clean(self) -> None:
"""执行清理,子类实现"""
pass
@abstractmethod
def reclassify(self, rows: list) -> list:
"""
重新分类(子类实现)
Args:
rows: 待处理的数据行
Returns:
处理后的数据行
"""
pass