- 将 Python 代码移至 analyzer/ 目录(含 venv) - 拆分 Go 服务器代码为模块化结构: - config/: 配置加载 - model/: 请求/响应模型 - service/: 业务逻辑 - handler/: API处理器 - 添加 .gitignore 文件 - 删除旧的独立脚本文件
232 lines
8.6 KiB
Python
232 lines
8.6 KiB
Python
"""
|
|
支付宝账单清理模块
|
|
"""
|
|
import csv
|
|
from decimal import Decimal
|
|
|
|
from .base import (
|
|
BaseCleaner, parse_amount, format_amount,
|
|
is_in_date_range, create_arg_parser
|
|
)
|
|
from category import reclassify_if_needed, get_platform_merchants
|
|
|
|
|
|
class AlipayCleaner(BaseCleaner):
    """Cleaner for Alipay transaction exports.

    Filters rows by date, nets refunds against the expenses they belong to,
    re-categorises rows from platform-type merchants and adds a review-level
    column.

    Column indexes read by this class (0-based):
        0: transaction time
        1: transaction category
        2: counterparty (merchant)
        4: product description
        5: income/expense flag
        6: amount
        9: order number
        10: merchant number
        11: remark
    """

    def clean(self) -> None:
        """Run the whole cleaning pipeline and write the result.

        Steps: read the input CSV, keep rows inside the configured date
        range, separate refund rows from the rest, aggregate refunds per
        order, net them against matching expenses, re-categorise, and hand
        the rows to ``write_output``.  Progress is printed along the way and
        counters are recorded in ``self.stats``.
        """
        self.print_header()

        # Read the data.  newline="" is what the csv module asks for so that
        # quoted fields containing line breaks are parsed correctly.
        with open(self.input_file, "r", encoding="utf-8", newline="") as f:
            reader = csv.reader(f)
            header = next(reader)
            rows = list(reader)

        self.stats["original_count"] = len(rows)
        print(f"原始数据行数: {len(rows)}")

        # Step 1: keep only non-empty rows whose first column falls inside
        # the date range.
        rows_filtered = [
            row
            for row in rows
            if row and is_in_date_range(row[0], self.start_date, self.end_date)
        ]
        self.stats["filtered_count"] = len(rows_filtered)

        if self.start_date or self.end_date:
            date_desc = f"{self.start_date} ~ {self.end_date}"
        else:
            date_desc = "全部"
        print(f"筛选后数据行数: {len(rows_filtered)} ({date_desc})")

        # Step 2: split refund rows from everything else.
        refund_rows = []
        expense_rows = []
        for row in rows_filtered:
            is_refund = len(row) > 1 and row[1] == "退款"
            (refund_rows if is_refund else expense_rows).append(row)

        print(f"退款条目数: {len(refund_rows)}")
        print(f"非退款条目数: {len(expense_rows)}")

        # Step 3: total the refunds per order.
        order_refunds = self._aggregate_refunds(refund_rows)
        print(f"有退款的订单数: {len(order_refunds)}")

        # Step 4: net the refunds against each expense row.
        final_rows = self._process_expenses(expense_rows, order_refunds)

        print("\n处理结果:")
        print(f" 全额退款删除: {self.stats['fully_refunded']} 条")
        print(f" 部分退款调整: {self.stats['partially_refunded']} 条")
        print(f" 最终保留行数: {len(final_rows)}")

        # Step 5: re-categorise and add the review-level column
        # (this also extends ``header`` in place).
        final_rows = self.reclassify(final_rows, header)

        if self.stats["category_adjusted"] > 0:
            print(f" 分类调整: {self.stats['category_adjusted']} 条")

        self.stats["final_count"] = len(final_rows)

        # Write the output file.
        self.write_output(header, final_rows)

        print(f"\n清理后的数据已保存到: {self.output_file}")

    def _aggregate_refunds(self, refund_rows: list) -> dict:
        """Sum refund amounts per original order.

        Args:
            refund_rows: Rows already identified as refunds.

        Returns:
            Mapping of key -> total refunded amount.  The key is the refund
            row's order number with everything from the first ``_`` onward
            removed; when that is empty the merchant number is used instead.

        Rows with fewer than 11 columns, or with neither key available, are
        skipped and contribute nothing.
        """
        order_refunds = {}

        for row in refund_rows:
            if len(row) < 11:
                continue

            refund_order_no = row[9].strip()
            refund_merchant_no = row[10].strip()
            refund_amount = parse_amount(row[6])

            # The part before the first "_" is taken as the original order
            # number (partition returns the whole string when no "_" exists).
            original_order = refund_order_no.partition("_")[0]
            key = original_order if original_order else refund_merchant_no
            if not key:
                continue

            order_refunds[key] = order_refunds.get(key, Decimal("0")) + refund_amount
            print(f" 退款记录: {row[0]} | {row[2]} | {refund_amount}元")

        return order_refunds

    def _process_expenses(self, expense_rows: list, order_refunds: dict) -> list:
        """Apply aggregated refunds to the non-refund rows.

        Args:
            expense_rows: Rows that are not refunds.
            order_refunds: Mapping produced by ``_aggregate_refunds``.

        Returns:
            The rows to keep.  A row whose matched refund covers the whole
            amount is dropped; a partially refunded row is copied with the
            remaining amount in column 6 and an explanatory note prepended
            to the remark in column 11.  Rows without a matching refund, and
            rows with fewer than 12 columns, are kept unchanged.

        Updates ``self.stats['fully_refunded']`` and
        ``self.stats['partially_refunded']``.
        """
        final_rows = []

        for row in expense_rows:
            if len(row) < 12:
                # Too short to carry order/merchant numbers and a remark.
                final_rows.append(row)
                continue

            order_no = row[9].strip()
            merchant_no = row[10].strip()
            expense_amount = parse_amount(row[6])

            # Find the first refund key (in insertion order) that equals the
            # merchant number or is a prefix of the order number; an exact
            # order-number match is a special case of the prefix test.
            # NOTE(review): the aggregated total is not consumed, so every
            # expense row matching the same key is netted against the full
            # refund total - confirm that this is intended.
            matched_key = next(
                (
                    key
                    for key in order_refunds
                    if key and (merchant_no == key or order_no.startswith(key))
                ),
                None,
            )
            if matched_key is None:
                final_rows.append(row)
                continue

            refund_amount = order_refunds[matched_key]

            if refund_amount >= expense_amount:
                # Fully refunded: drop the row.
                self.stats["fully_refunded"] += 1
                print(f" 全额退款删除: {row[0]} | {row[2]} | {row[4][:25]}... | 原{expense_amount}元")
                continue

            # Partially refunded: keep the difference.
            remaining = expense_amount - refund_amount
            new_row = row.copy()
            new_row[6] = format_amount(remaining)

            original_remark = new_row[11]
            remark_suffix = ";" + original_remark if original_remark else ""
            new_row[11] = f"原金额{expense_amount}元,退款{refund_amount}元{remark_suffix}"

            final_rows.append(new_row)
            self.stats["partially_refunded"] += 1
            print(f" 部分退款: {row[0]} | {row[2]} | 原{expense_amount}元 -> {format_amount(remaining)}元")

        return final_rows

    def _is_platform_merchant(self, merchant: str) -> bool:
        """Return True if ``merchant`` contains any configured platform name.

        The platform names come from ``get_platform_merchants()``.
        """
        platform_merchants = get_platform_merchants()
        return any(platform in merchant for platform in platform_merchants)

    def reclassify(self, rows: list, header: list) -> list:
        """Re-categorise Alipay rows and fill the review-level column.

        Only rows whose counterparty is a platform-type merchant are passed
        to ``reclassify_if_needed``; every other row keeps its Alipay
        category and gets an empty review mark.

        Review marks written to the column:
            ""     - no review needed
            "LOW"  - ``reclassify_if_needed`` returned review level 1
            "HIGH" - ``reclassify_if_needed`` returned review level 2

        Args:
            rows: Data rows; modified in place.  Rows with fewer than 6
                columns are left untouched.
            header: Header row; the "复核等级" column is appended in place
                when it is not already present.

        Returns:
            The same ``rows`` list.

        Updates ``self.stats['category_adjusted']``.
        """
        review_column = "复核等级"
        column_preexisting = review_column in header
        if not column_preexisting:
            header.append(review_column)
        review_idx = header.index(review_column)

        review_low_count = 0
        review_high_count = 0

        for row in rows:
            if len(row) < 6:
                continue

            original_category = row[1]
            merchant = row[2]
            product = row[4]
            income_expense = row[5]

            review_mark = ""

            # Only platform-type merchants are re-categorised; for everyone
            # else the Alipay category is trusted as-is.
            if self._is_platform_merchant(merchant):
                new_category, changed, review_level = reclassify_if_needed(
                    original_category, merchant, product, income_expense
                )

                if changed:
                    row[1] = new_category
                    self.stats["category_adjusted"] += 1
                    print(f" 分类调整: {merchant[:15]}... | {original_category} -> {new_category}")

                # Translate the numeric review level into the column value.
                if review_level == 1:
                    review_mark = "LOW"
                    review_low_count += 1
                elif review_level == 2:
                    review_mark = "HIGH"
                    review_high_count += 1

            if column_preexisting:
                # The input already had a review column (e.g. a previously
                # cleaned file): write into that cell instead of appending a
                # second one, which would leave the row longer than the
                # header.  An existing mark is kept when this pass produces
                # none.
                while len(row) <= review_idx:
                    row.append("")
                if review_mark:
                    row[review_idx] = review_mark
            else:
                # Pad short rows so the mark lands under the new header cell.
                while len(row) < len(header) - 1:
                    row.append("")
                row.append(review_mark)

        if review_high_count > 0:
            print(f" 高优先级复核: {review_high_count} 条(无法判断)")
        if review_low_count > 0:
            print(f" 低优先级复核: {review_low_count} 条(分类已调整)")

        return rows
|
|
|
|
|
|
def main():
    """Command-line entry point: parse arguments and run the Alipay cleaner."""
    parser = create_arg_parser("清理支付宝交易明细数据")
    args = parser.parse_args()

    # NOTE(review): the original also imported ``get_output_file`` here but
    # never called it, so ``args.output_file`` is passed through unchanged.
    # Confirm whether a default output path was meant to be derived from it.
    from .base import compute_date_range

    cleaner = AlipayCleaner(args.input_file, args.output_file)
    start_date, end_date = compute_date_range(args)
    cleaner.set_date_range(start_date, end_date)
    cleaner.clean()
|
|
|
|
|
|
# Run the command-line entry point only when this module is executed
# directly, not when it is imported.
if __name__ == "__main__":
    main()
|
|
|