Files
billai/analyzer/cleaners/alipay.py
2026-01-23 14:17:59 +08:00

258 lines
9.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
支付宝账单清理模块
"""
import csv
from decimal import Decimal
from .base import (
BaseCleaner, parse_amount, format_amount,
is_in_date_range, create_arg_parser
)
from category import reclassify_if_needed, get_platform_merchants
class AlipayCleaner(BaseCleaner):
"""支付宝账单清理器"""
def clean(self) -> None:
"""执行清理"""
self.print_header()
# 读取数据,跳过支付宝导出文件的头部信息
with open(self.input_file, "r", encoding="utf-8") as f:
reader = csv.reader(f)
header = None
rows = []
for row in reader:
# 跳过空行
if not row or not row[0].strip():
continue
# 查找实际的CSV头部行包含"交易时间"和"交易分类"
if header is None:
if len(row) >= 2 and "交易时间" in row[0] and "交易分类" in row[1]:
header = row
continue
# 跳过头部信息行
continue
# 收集数据行
rows.append(row)
# 确保找到了有效的头部
if header is None:
raise ValueError("无法找到有效的支付宝账单表头(需包含'交易时间''交易分类'列)")
self.stats["original_count"] = len(rows)
print(f"原始数据行数: {len(rows)}")
# 第一步:按日期范围筛选
rows_filtered = [
row for row in rows
if row and is_in_date_range(row[0], self.start_date, self.end_date)
]
self.stats["filtered_count"] = len(rows_filtered)
date_desc = f"{self.start_date} ~ {self.end_date}" if self.start_date or self.end_date else "全部"
print(f"筛选后数据行数: {len(rows_filtered)} ({date_desc})")
# 第二步:分离退款和非退款条目
refund_rows = []
expense_rows = []
for row in rows_filtered:
if len(row) > 1 and row[1] == "退款":
refund_rows.append(row)
else:
expense_rows.append(row)
print(f"退款条目数: {len(refund_rows)}")
print(f"非退款条目数: {len(expense_rows)}")
# 第三步:处理退款
order_refunds = self._aggregate_refunds(refund_rows)
print(f"有退款的订单数: {len(order_refunds)}")
# 第四步:处理每笔支出
final_rows = self._process_expenses(expense_rows, order_refunds)
print(f"\n处理结果:")
print(f" 全额退款删除: {self.stats['fully_refunded']}")
print(f" 部分退款调整: {self.stats['partially_refunded']}")
if self.stats.get("zero_amount", 0) > 0:
print(f" 0元记录过滤: {self.stats['zero_amount']}")
print(f" 最终保留行数: {len(final_rows)}")
# 第五步:重新分类并添加"需复核"标注
final_rows = self.reclassify(final_rows, header)
if self.stats["category_adjusted"] > 0:
print(f" 分类调整: {self.stats['category_adjusted']}")
self.stats["final_count"] = len(final_rows)
# 写入文件
self.write_output(header, final_rows)
print(f"\n清理后的数据已保存到: {self.output_file}")
def _aggregate_refunds(self, refund_rows: list) -> dict:
"""聚合退款金额"""
order_refunds = {}
for row in refund_rows:
if len(row) >= 11:
refund_order_no = row[9].strip()
refund_merchant_no = row[10].strip()
refund_amount = parse_amount(row[6])
original_order = refund_order_no.split("_")[0] if "_" in refund_order_no else refund_order_no
key = original_order if original_order else refund_merchant_no
if key:
if key not in order_refunds:
order_refunds[key] = Decimal("0")
order_refunds[key] += refund_amount
print(f" 退款记录: {row[0]} | {row[2]} | {refund_amount}")
return order_refunds
def _process_expenses(self, expense_rows: list, order_refunds: dict) -> list:
"""处理支出记录"""
final_rows = []
for row in expense_rows:
if len(row) >= 12:
order_no = row[9].strip()
merchant_no = row[10].strip()
expense_amount = parse_amount(row[6])
# 查找对应的退款
refund_amount = Decimal("0")
matched_key = None
for key, amount in order_refunds.items():
if key and (order_no == key or merchant_no == key or order_no.startswith(key)):
refund_amount = amount
matched_key = key
break
if matched_key:
if refund_amount >= expense_amount:
# 全额退款,删除
self.stats["fully_refunded"] += 1
print(f" 全额退款删除: {row[0]} | {row[2]} | {row[4][:25]}... | 原{expense_amount}")
else:
# 部分退款,保留差额
remaining = expense_amount - refund_amount
new_row = row.copy()
new_row[6] = format_amount(remaining)
original_remark = new_row[11] if len(new_row) > 11 else ""
new_row[11] = f"原金额{expense_amount}元,退款{refund_amount}{';' + original_remark if original_remark else ''}"
final_rows.append(new_row)
self.stats["partially_refunded"] += 1
print(f" 部分退款: {row[0]} | {row[2]} | 原{expense_amount}元 -> {format_amount(remaining)}")
else:
# 过滤掉金额为 0 的记录(预下单/加购物车等无效记录)
if expense_amount > 0:
final_rows.append(row)
else:
self.stats["zero_amount"] = self.stats.get("zero_amount", 0) + 1
else:
final_rows.append(row)
return final_rows
def _is_platform_merchant(self, merchant: str) -> bool:
"""判断是否为平台型商家(从配置文件读取)"""
platform_merchants = get_platform_merchants()
return any(platform in merchant for platform in platform_merchants)
def reclassify(self, rows: list, header: list) -> list:
"""
重新分类支付宝账单,并添加"复核等级"标注字段
只对平台型商家(美团、京东、抖音等)进行分类调整,
其他商家直接信任支付宝原分类。
复核等级:
空 = 无需复核
低 = 分类被调整,需确认调整是否正确
高 = 完全无法判断,需人工分类
字段索引:
0: 交易时间
1: 交易分类
2: 交易对方
4: 商品说明
5: 收/支
"""
# 添加"复核等级"字段到表头
if "复核等级" not in header:
header.append("复核等级")
review_low_count = 0
review_high_count = 0
for row in rows:
if len(row) >= 6:
original_category = row[1]
merchant = row[2]
product = row[4]
income_expense = row[5]
review_mark = ""
# 只对平台型商家进行重新分类
if self._is_platform_merchant(merchant):
new_category, changed, review_level = reclassify_if_needed(
original_category, merchant, product, income_expense
)
if changed:
row[1] = new_category
self.stats["category_adjusted"] += 1
print(f" 分类调整: {merchant[:15]}... | {original_category} -> {new_category}")
# 添加复核等级标注
if review_level == 1:
review_mark = "LOW"
review_low_count += 1
elif review_level == 2:
review_mark = "HIGH"
review_high_count += 1
# 非平台商家:直接信任支付宝原分类,无需复核
# 确保行长度足够
while len(row) < len(header) - 1:
row.append("")
row.append(review_mark)
if review_high_count > 0:
print(f" 高优先级复核: {review_high_count} 条(无法判断)")
if review_low_count > 0:
print(f" 低优先级复核: {review_low_count} 条(分类已调整)")
return rows
def main():
"""命令行入口"""
parser = create_arg_parser("清理支付宝交易明细数据")
args = parser.parse_args()
from .base import get_output_file, compute_date_range
cleaner = AlipayCleaner(args.input_file, args.output_file)
start_date, end_date = compute_date_range(args)
cleaner.set_date_range(start_date, end_date)
cleaner.clean()
if __name__ == "__main__":
main()