""" 支付宝账单清理模块 """ import csv from decimal import Decimal from .base import ( BaseCleaner, parse_amount, format_amount, is_in_date_range, create_arg_parser ) from category import reclassify_if_needed, get_platform_merchants class AlipayCleaner(BaseCleaner): """支付宝账单清理器""" def clean(self) -> None: """执行清理""" self.print_header() # 读取数据,跳过支付宝导出文件的头部信息 with open(self.input_file, "r", encoding="utf-8") as f: reader = csv.reader(f) header = None rows = [] for row in reader: # 跳过空行 if not row or not row[0].strip(): continue # 查找实际的CSV头部行(包含"交易时间"和"交易分类") if header is None: if len(row) >= 2 and "交易时间" in row[0] and "交易分类" in row[1]: header = row continue # 跳过头部信息行 continue # 收集数据行 rows.append(row) # 确保找到了有效的头部 if header is None: raise ValueError("无法找到有效的支付宝账单表头(需包含'交易时间'和'交易分类'列)") self.stats["original_count"] = len(rows) print(f"原始数据行数: {len(rows)}") # 第一步:按日期范围筛选 rows_filtered = [ row for row in rows if row and is_in_date_range(row[0], self.start_date, self.end_date) ] self.stats["filtered_count"] = len(rows_filtered) date_desc = f"{self.start_date} ~ {self.end_date}" if self.start_date or self.end_date else "全部" print(f"筛选后数据行数: {len(rows_filtered)} ({date_desc})") # 第二步:分离退款和非退款条目 refund_rows = [] expense_rows = [] for row in rows_filtered: if len(row) > 1 and row[1] == "退款": refund_rows.append(row) else: expense_rows.append(row) print(f"退款条目数: {len(refund_rows)}") print(f"非退款条目数: {len(expense_rows)}") # 第三步:处理退款 order_refunds = self._aggregate_refunds(refund_rows) print(f"有退款的订单数: {len(order_refunds)}") # 第四步:处理每笔支出 final_rows = self._process_expenses(expense_rows, order_refunds) print(f"\n处理结果:") print(f" 全额退款删除: {self.stats['fully_refunded']} 条") print(f" 部分退款调整: {self.stats['partially_refunded']} 条") if self.stats.get("zero_amount", 0) > 0: print(f" 0元记录过滤: {self.stats['zero_amount']} 条") print(f" 最终保留行数: {len(final_rows)}") # 第五步:重新分类并添加"需复核"标注 final_rows = self.reclassify(final_rows, header) if self.stats["category_adjusted"] > 0: print(f" 分类调整: {self.stats['category_adjusted']} 条") self.stats["final_count"] = len(final_rows) # 写入文件 self.write_output(header, final_rows) print(f"\n清理后的数据已保存到: {self.output_file}") def _aggregate_refunds(self, refund_rows: list) -> dict: """聚合退款金额""" order_refunds = {} for row in refund_rows: if len(row) >= 11: refund_order_no = row[9].strip() refund_merchant_no = row[10].strip() refund_amount = parse_amount(row[6]) original_order = refund_order_no.split("_")[0] if "_" in refund_order_no else refund_order_no key = original_order if original_order else refund_merchant_no if key: if key not in order_refunds: order_refunds[key] = Decimal("0") order_refunds[key] += refund_amount print(f" 退款记录: {row[0]} | {row[2]} | {refund_amount}元") return order_refunds def _process_expenses(self, expense_rows: list, order_refunds: dict) -> list: """处理支出记录""" final_rows = [] for row in expense_rows: if len(row) >= 12: order_no = row[9].strip() merchant_no = row[10].strip() expense_amount = parse_amount(row[6]) # 查找对应的退款 refund_amount = Decimal("0") matched_key = None for key, amount in order_refunds.items(): if key and (order_no == key or merchant_no == key or order_no.startswith(key)): refund_amount = amount matched_key = key break if matched_key: if refund_amount >= expense_amount: # 全额退款,删除 self.stats["fully_refunded"] += 1 print(f" 全额退款删除: {row[0]} | {row[2]} | {row[4][:25]}... | 原{expense_amount}元") else: # 部分退款,保留差额 remaining = expense_amount - refund_amount new_row = row.copy() new_row[6] = format_amount(remaining) original_remark = new_row[11] if len(new_row) > 11 else "" new_row[11] = f"原金额{expense_amount}元,退款{refund_amount}元{';' + original_remark if original_remark else ''}" final_rows.append(new_row) self.stats["partially_refunded"] += 1 print(f" 部分退款: {row[0]} | {row[2]} | 原{expense_amount}元 -> {format_amount(remaining)}元") else: # 过滤掉金额为 0 的记录(预下单/加购物车等无效记录) if expense_amount > 0: final_rows.append(row) else: self.stats["zero_amount"] = self.stats.get("zero_amount", 0) + 1 else: final_rows.append(row) return final_rows def _is_platform_merchant(self, merchant: str) -> bool: """判断是否为平台型商家(从配置文件读取)""" platform_merchants = get_platform_merchants() return any(platform in merchant for platform in platform_merchants) def reclassify(self, rows: list, header: list) -> list: """ 重新分类支付宝账单,并添加"复核等级"标注字段 只对平台型商家(美团、京东、抖音等)进行分类调整, 其他商家直接信任支付宝原分类。 复核等级: 空 = 无需复核 低 = 分类被调整,需确认调整是否正确 高 = 完全无法判断,需人工分类 字段索引: 0: 交易时间 1: 交易分类 2: 交易对方 4: 商品说明 5: 收/支 """ # 添加"复核等级"字段到表头 if "复核等级" not in header: header.append("复核等级") review_low_count = 0 review_high_count = 0 for row in rows: if len(row) >= 6: original_category = row[1] merchant = row[2] product = row[4] income_expense = row[5] review_mark = "" # 只对平台型商家进行重新分类 if self._is_platform_merchant(merchant): new_category, changed, review_level = reclassify_if_needed( original_category, merchant, product, income_expense ) if changed: row[1] = new_category self.stats["category_adjusted"] += 1 print(f" 分类调整: {merchant[:15]}... | {original_category} -> {new_category}") # 添加复核等级标注 if review_level == 1: review_mark = "LOW" review_low_count += 1 elif review_level == 2: review_mark = "HIGH" review_high_count += 1 # 非平台商家:直接信任支付宝原分类,无需复核 # 确保行长度足够 while len(row) < len(header) - 1: row.append("") row.append(review_mark) if review_high_count > 0: print(f" 高优先级复核: {review_high_count} 条(无法判断)") if review_low_count > 0: print(f" 低优先级复核: {review_low_count} 条(分类已调整)") return rows def main(): """命令行入口""" parser = create_arg_parser("清理支付宝交易明细数据") args = parser.parse_args() from .base import get_output_file, compute_date_range cleaner = AlipayCleaner(args.input_file, args.output_file) start_date, end_date = compute_date_range(args) cleaner.set_date_range(start_date, end_date) cleaner.clean() if __name__ == "__main__": main()