feat: 完善项目架构并增强分析页面功能

- 新增项目文档和 Docker 配置
  - 添加 README.md 和 TODO.md 项目文档
  - 为各服务添加 Dockerfile 和 docker-compose 配置

- 重构后端架构
  - 新增 adapter 层(HTTP/Python 适配器)
  - 新增 repository 层(数据访问抽象)
  - 新增 router 模块统一管理路由
  - 新增账单处理 handler

- 扩展前端 UI 组件库
  - 新增 Calendar、DateRangePicker、Drawer、Popover 等组件
  - 集成 shadcn-svelte 组件库

- 增强分析页面功能
  - 添加时间范围筛选器(支持本月默认值)
  - 修复 DateRangePicker 默认值显示问题
  - 优化数据获取和展示逻辑

- 完善分析器服务
  - 新增 FastAPI 服务接口
  - 改进账单清理器实现
This commit is contained in:
2026-01-10 01:15:52 +08:00
parent 94f8ea12e6
commit 087ae027cc
96 changed files with 4301 additions and 482 deletions

25
analyzer/Dockerfile Normal file
View File

@@ -0,0 +1,25 @@
# Dockerfile for the Python bill-analysis service
FROM python:3.12-slim

WORKDIR /app

# Point both apt and pip at the Aliyun mirrors
RUN sed -i 's|deb.debian.org|mirrors.aliyun.com|g' /etc/apt/sources.list.d/debian.sources && \
    pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/ && \
    pip config set global.trusted-host mirrors.aliyun.com

# curl is required by the health check (the check itself is not defined in this
# file - presumably in docker-compose; verify). It is installed before any COPY
# so that source or requirements changes do not invalidate this layer.
RUN apt-get update && \
    apt-get install -y --no-install-recommends curl && \
    rm -rf /var/lib/apt/lists/*

# Install Python dependencies first so this layer is reused while only code changes
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application source
COPY . .

# Port the service listens on by default
EXPOSE 8001

# Start the service
CMD ["python", "server.py"]

View File

@@ -60,6 +60,8 @@ class AlipayCleaner(BaseCleaner):
print(f"\n处理结果:")
print(f" 全额退款删除: {self.stats['fully_refunded']}")
print(f" 部分退款调整: {self.stats['partially_refunded']}")
if self.stats.get("zero_amount", 0) > 0:
print(f" 0元记录过滤: {self.stats['zero_amount']}")
print(f" 最终保留行数: {len(final_rows)}")
# 第五步:重新分类并添加"需复核"标注
@@ -134,7 +136,11 @@ class AlipayCleaner(BaseCleaner):
self.stats["partially_refunded"] += 1
print(f" 部分退款: {row[0]} | {row[2]} | 原{expense_amount}元 -> {format_amount(remaining)}")
else:
final_rows.append(row)
# 过滤掉金额为 0 的记录(预下单/加购物车等无效记录)
if expense_amount > 0:
final_rows.append(row)
else:
self.stats["zero_amount"] = self.stats.get("zero_amount", 0) + 1
else:
final_rows.append(row)

View File

@@ -85,6 +85,58 @@ def compute_date_range(args) -> tuple[date | None, date | None]:
return start_date, end_date
def compute_date_range_from_values(
year: str = None,
month: str = None,
start: str = None,
end: str = None
) -> tuple[date | None, date | None]:
"""
根据参数值计算日期范围(不依赖 argparse
供 HTTP API 调用使用
Returns:
(start_date, end_date) 或 (None, None) 表示不筛选
"""
start_date = None
end_date = None
# 1. 根据年份设置范围
if year:
y = int(year)
start_date = date(y, 1, 1)
end_date = date(y, 12, 31)
# 2. 根据月份进一步收窄
if month:
m = int(month)
y = int(year) if year else datetime.now().year
if not start_date:
start_date = date(y, 1, 1)
end_date = date(y, 12, 31)
month_start = date(y, m, 1)
if m == 12:
month_end = date(y, 12, 31)
else:
month_end = date(y, m + 1, 1) - timedelta(days=1)
start_date = max(start_date, month_start) if start_date else month_start
end_date = min(end_date, month_end) if end_date else month_end
# 3. 根据 start/end 参数进一步收窄
if start:
custom_start = parse_date(start)
start_date = max(start_date, custom_start) if start_date else custom_start
if end:
custom_end = parse_date(end)
end_date = min(end_date, custom_end) if end_date else custom_end
return start_date, end_date
def is_in_date_range(date_str: str, start_date: date | None, end_date: date | None) -> bool:
"""检查日期字符串是否在指定范围内"""
if start_date is None and end_date is None:

View File

@@ -58,6 +58,8 @@ class WechatCleaner(BaseCleaner):
print(f"\n处理结果:")
print(f" 全额退款删除: {self.stats['fully_refunded']}")
print(f" 部分退款调整: {self.stats['partially_refunded']}")
if self.stats.get("zero_amount", 0) > 0:
print(f" 0元记录过滤: {self.stats['zero_amount']}")
print(f" 保留支出条目: {len(final_expense_rows)}")
print(f" 保留收入条目: {len(income_rows)}")
@@ -177,7 +179,11 @@ class WechatCleaner(BaseCleaner):
if merchant in transfer_refunds:
del transfer_refunds[merchant]
else:
final_expense_rows.append((row, None))
# 过滤掉金额为 0 的记录(预下单/加购物车等无效记录)
if original_amount > 0:
final_expense_rows.append((row, None))
else:
self.stats["zero_amount"] = self.stats.get("zero_amount", 0) + 1
return final_expense_rows, income_rows

View File

@@ -1,2 +1,4 @@
pyyaml>=6.0
fastapi>=0.109.0
uvicorn[standard]>=0.27.0
python-multipart>=0.0.6

348
analyzer/server.py Normal file
View File

@@ -0,0 +1,348 @@
#!/usr/bin/env python3
"""
账单分析 FastAPI 服务
提供 HTTP API 供 Go 服务调用,替代子进程通信方式
"""
import os
import sys
import io
import tempfile
import shutil
from pathlib import Path
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException, UploadFile, File, Form
from fastapi.responses import FileResponse, JSONResponse
from pydantic import BaseModel
from typing import Optional
# Work around console encoding problems: make stdout emit UTF-8 and replace
# unencodable characters instead of raising.
# reconfigure() changes the existing stream in place, so its buffering settings
# are kept; the check is case-insensitive and tolerates a missing or replaced
# stdout that has no reconfigure() method.
_stdout_encoding = getattr(sys.stdout, "encoding", None) or ""
if (
    _stdout_encoding.lower().replace("-", "").replace("_", "") != "utf8"
    and hasattr(sys.stdout, "reconfigure")
):
    sys.stdout.reconfigure(encoding="utf-8", errors="replace")
from cleaners.base import compute_date_range_from_values
from cleaners import AlipayCleaner, WechatCleaner
from category import infer_category, get_all_categories, get_all_income_categories
# =============================================================================
# Pydantic 模型
# =============================================================================
class CleanRequest(BaseModel):
    """Request body for cleaning a bill file addressed by path."""

    # Path of the bill file to clean
    input_path: str
    # Path the cleaned result is written to
    output_path: str
    # Optional date filters, passed on as strings
    year: str | None = None
    month: str | None = None
    start: str | None = None
    end: str | None = None
    # Output format, "csv" unless specified
    format: str | None = "csv"
    # One of: auto, alipay, wechat
    bill_type: str | None = "auto"
class CleanResponse(BaseModel):
    """Response body returned by the cleaning endpoints."""

    # Outcome flag of the cleaning run
    success: bool
    # Bill type that was processed
    bill_type: str
    # Human-readable result message
    message: str
    # Location of the cleaned file, when there is one
    output_path: str | None = None
class CategoryRequest(BaseModel):
    """Request body for category inference."""
    # Merchant name of the transaction
    merchant: str
    # Product / item description of the transaction
    product: str
    income_expense: str  # "收入" (income) or "支出" (expense)
class CategoryResponse(BaseModel):
    """Response body for category inference."""
    # Inferred category name
    category: str
    # Certainty flag reported by the inference helper
    is_certain: bool
class HealthResponse(BaseModel):
    """Response body for the health check."""
    # Service status string
    status: str
    # Service version string
    version: str
# =============================================================================
# 辅助函数
# =============================================================================
def detect_bill_type(filepath: str) -> str | None:
"""
检测账单类型
Returns:
'alipay' | 'wechat' | None
"""
try:
with open(filepath, "r", encoding="utf-8") as f:
for _ in range(20):
line = f.readline()
if not line:
break
# 支付宝特征
if "交易分类" in line and "对方账号" in line:
return "alipay"
# 微信特征
if "交易类型" in line and "金额(元)" in line:
return "wechat"
# 数据行特征
if line.startswith("202"):
if "" in line:
return "wechat"
if "@" in line:
return "alipay"
except Exception as e:
print(f"读取文件失败: {e}", file=sys.stderr)
return None
return None
def do_clean(
input_path: str,
output_path: str,
bill_type: str = "auto",
year: str = None,
month: str = None,
start: str = None,
end: str = None,
output_format: str = "csv"
) -> tuple[bool, str, str]:
"""
执行清洗逻辑
Returns:
(success, bill_type, message)
"""
# 检查文件是否存在
if not Path(input_path).exists():
return False, "", f"文件不存在: {input_path}"
# 检测账单类型
if bill_type == "auto":
detected_type = detect_bill_type(input_path)
if detected_type is None:
return False, "", "无法识别账单类型"
bill_type = detected_type
# 计算日期范围
start_date, end_date = compute_date_range_from_values(year, month, start, end)
# 创建对应的清理器
try:
if bill_type == "alipay":
cleaner = AlipayCleaner(input_path, output_path, output_format)
else:
cleaner = WechatCleaner(input_path, output_path, output_format)
cleaner.set_date_range(start_date, end_date)
cleaner.clean()
type_names = {"alipay": "支付宝", "wechat": "微信"}
return True, bill_type, f"{type_names[bill_type]}账单清洗完成"
except Exception as e:
return False, bill_type, f"清洗失败: {str(e)}"
# =============================================================================
# FastAPI 应用
# =============================================================================
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: print a message before and after the app runs."""
    print("🚀 账单分析服务启动")
    # The application serves requests while this context manager is suspended here
    yield
    print("👋 账单分析服务关闭")
# Application object; the route handlers below register themselves on it and
# `lifespan` supplies the startup/shutdown messages.
app = FastAPI(
    title="BillAI Analyzer",
    description="账单分析与清洗服务",
    version="1.0.0",
    lifespan=lifespan
)
# =============================================================================
# API 路由
# =============================================================================
@app.get("/health", response_model=HealthResponse)
async def health_check():
    """Health check.

    Returns:
        A HealthResponse with status "ok" and the version declared on the
        FastAPI application object.
    """
    # Read the version from the application instead of repeating the literal,
    # so the two values cannot drift apart.
    return HealthResponse(status="ok", version=app.version)
@app.post("/clean", response_model=CleanResponse)
async def clean_bill(request: CleanRequest):
    """
    Clean a bill file addressed by path.

    Reads the file at the requested input path, runs the cleaning logic and
    writes the result to the requested output path. A failed run is answered
    with HTTP 400 carrying the failure message.
    """
    ok, resolved_type, detail = do_clean(
        input_path=request.input_path,
        output_path=request.output_path,
        bill_type=request.bill_type or "auto",
        year=request.year,
        month=request.month,
        start=request.start,
        end=request.end,
        output_format=request.format or "csv",
    )

    if ok:
        return CleanResponse(
            success=True,
            bill_type=resolved_type,
            message=detail,
            output_path=request.output_path,
        )

    raise HTTPException(status_code=400, detail=detail)
@app.post("/clean/upload", response_model=CleanResponse)
async def clean_bill_upload(
    file: UploadFile = File(...),
    year: Optional[str] = Form(None),
    month: Optional[str] = Form(None),
    start: Optional[str] = Form(None),
    end: Optional[str] = Form(None),
    format: Optional[str] = Form("csv"),
    bill_type: Optional[str] = Form("auto")
):
    """
    Upload a bill file and clean it.

    The upload (multipart/form-data) is stored in a temporary file, cleaned
    into a second temporary file, and the path of that result is returned.

    Raises:
        HTTPException: 400 with the failure message when cleaning fails.
    """
    # filename may be missing on an upload; fall back to ".csv" in that case.
    suffix = Path(file.filename or "").suffix or ".csv"
    output_suffix = ".json" if format == "json" else ".csv"

    input_path = None
    output_path = None
    cleaned = False
    try:
        # Both temporary files are created inside the try block so that a
        # failure while copying the upload cannot leave them behind.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp_input:
            input_path = tmp_input.name
            shutil.copyfileobj(file.file, tmp_input)

        with tempfile.NamedTemporaryFile(delete=False, suffix=output_suffix) as tmp_output:
            output_path = tmp_output.name

        success, detected_type, message = do_clean(
            input_path=input_path,
            output_path=output_path,
            bill_type=bill_type or "auto",
            year=year,
            month=month,
            start=start,
            end=end,
            output_format=format or "csv"
        )

        if not success:
            raise HTTPException(status_code=400, detail=message)

        cleaned = True
        return CleanResponse(
            success=True,
            bill_type=detected_type,
            message=message,
            output_path=output_path
        )
    finally:
        # The uploaded copy is never needed after cleaning.
        if input_path and os.path.exists(input_path):
            os.unlink(input_path)
        # The output file is kept only on success, because its path is handed
        # back to the caller; on failure it used to be leaked.
        if not cleaned and output_path and os.path.exists(output_path):
            os.unlink(output_path)
@app.get("/clean/download/{file_path:path}")
async def download_cleaned_file(file_path: str):
    """Download a cleaned file.

    Args:
        file_path: Path of the file to send, taken from the URL.

    Raises:
        HTTPException: 404 when the path is not an existing regular file.
    """
    # NOTE(review): file_path comes straight from the request URL and is not
    # confined to any directory, so any file readable by this process can be
    # downloaded. Restrict it to the directory that holds cleaned output if
    # this service is ever reachable by untrusted clients.

    # isfile() instead of exists(): a directory passes exists() but cannot be
    # sent as a file response, so it is answered with 404 as well.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="文件不存在")

    return FileResponse(
        file_path,
        filename=Path(file_path).name,
        media_type="application/octet-stream"
    )
@app.post("/category/infer", response_model=CategoryResponse)
async def infer_category_api(request: CategoryRequest):
    """
    Infer the category of a transaction.

    Passes the merchant name, product description and income/expense marker
    from the request to ``infer_category`` and returns its result.
    """
    label, certain = infer_category(
        merchant=request.merchant,
        product=request.product,
        income_expense=request.income_expense,
    )
    return CategoryResponse(category=label, is_certain=certain)
@app.get("/category/list")
async def list_categories():
    """Return all known categories, split into expense and income."""
    expense_categories = get_all_categories()
    income_categories = get_all_income_categories()
    return {"expense": expense_categories, "income": income_categories}
@app.post("/detect")
async def detect_bill_type_api(file: UploadFile = File(...)):
    """
    Detect the type of an uploaded bill.

    The upload is stored in a temporary file, inspected with
    ``detect_bill_type`` and removed again.

    Raises:
        HTTPException: 400 when the bill type cannot be recognised.
    """
    # filename may be missing on an upload; fall back to ".csv" in that case.
    suffix = Path(file.filename or "").suffix or ".csv"

    tmp_path = None
    try:
        # Created inside the try block so a failed copy cannot leak the file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp_path = tmp.name
            shutil.copyfileobj(file.file, tmp)

        bill_type = detect_bill_type(tmp_path)
        if bill_type is None:
            raise HTTPException(status_code=400, detail="无法识别账单类型")

        type_names = {"alipay": "支付宝", "wechat": "微信"}
        return {
            "bill_type": bill_type,
            "display_name": type_names[bill_type]
        }
    finally:
        if tmp_path and os.path.exists(tmp_path):
            os.unlink(tmp_path)
# =============================================================================
# 启动入口
# =============================================================================
if __name__ == "__main__":
    import uvicorn

    # Bind address and port come from the environment, with local defaults.
    host = os.getenv("ANALYZER_HOST", "0.0.0.0")
    port = int(os.getenv("ANALYZER_PORT", 8001))

    print(f"🚀 启动账单分析服务: http://{host}:{port}")
    uvicorn.run(app, host=host, port=port)