""" 微信账单清理模块 """ import csv import re from decimal import Decimal from .base import ( BaseCleaner, parse_amount, format_amount, is_in_date_range, create_arg_parser ) from category import infer_category # 与支付宝对齐的表头(包含"复核等级"字段) ALIGNED_HEADER = [ "交易时间", "交易分类", "交易对方", "对方账号", "商品说明", "收/支", "金额", "收/付款方式", "交易状态", "交易订单号", "商家订单号", "备注", "复核等级" ] class WechatCleaner(BaseCleaner): """微信账单清理器""" def clean(self) -> None: """执行清理""" self.print_header() # 读取数据 with open(self.input_file, "r", encoding="utf-8") as f: reader = csv.reader(f) header = next(reader) rows = list(reader) self.stats["original_count"] = len(rows) print(f"原始数据行数: {len(rows)}") # 第一步:按日期范围筛选 rows_filtered = [ row for row in rows if row and is_in_date_range(row[0], self.start_date, self.end_date) ] self.stats["filtered_count"] = len(rows_filtered) date_desc = f"{self.start_date} ~ {self.end_date}" if self.start_date or self.end_date else "全部" print(f"筛选后数据行数: {len(rows_filtered)} ({date_desc})") # 第二步:分离退款、支出、收入 refund_rows, expense_rows, income_rows = self._separate_rows(rows_filtered) print(f"退款条目数: {len(refund_rows)}") print(f"支出条目数: {len(expense_rows)}") print(f"其他收入条目数: {len(income_rows)}") # 第三步:处理退款(包括转账退款) final_expense_rows, income_rows = self._process_refunds(expense_rows, income_rows) print(f"\n处理结果:") print(f" 全额退款删除: {self.stats['fully_refunded']} 条") print(f" 部分退款调整: {self.stats['partially_refunded']} 条") if self.stats.get("zero_amount", 0) > 0: print(f" 0元记录过滤: {self.stats['zero_amount']} 条") print(f" 保留支出条目: {len(final_expense_rows)} 条") print(f" 保留收入条目: {len(income_rows)} 条") # 第四步:转换为对齐格式并重新分类 aligned_expense = [self._convert_and_reclassify(r, remark) for r, remark in final_expense_rows] aligned_income = [self._convert_and_reclassify((r, None), None) for r in income_rows] # 合并并排序 final_rows = aligned_expense + aligned_income final_rows.sort(key=lambda x: x[0], reverse=True) # 统计复核数量 review_high_count = sum(1 for row in final_rows if row[-1] == "HIGH") self.stats["final_count"] = len(final_rows) print(f" 最终保留行数: {len(final_rows)}") if review_high_count > 0: print(f" 高优先级复核: {review_high_count} 条(无法判断)") # 写入文件 self.write_output(ALIGNED_HEADER, final_rows) print(f"\n清理后的数据已保存到: {self.output_file}") # 统计支出 self._print_expense_summary(aligned_expense) def _separate_rows(self, rows: list) -> tuple[list, list, list]: """分离退款、支出、收入记录""" refund_rows = [] expense_rows = [] income_rows = [] for row in rows: if len(row) < 6: continue transaction_type = row[1] income_expense = row[4] if "-退款" in transaction_type: refund_rows.append(row) elif income_expense == "支出": expense_rows.append(row) elif income_expense == "收入" and "-退款" not in transaction_type: income_rows.append(row) return refund_rows, expense_rows, income_rows def _process_refunds(self, expense_rows: list, income_rows: list) -> tuple[list, list]: """ 处理退款(包括转账退款) 微信的退款有两种形式: 1. 状态标注:支出记录的"当前状态"列标注"已退款" 2. 转账退款:同一交易对方有收入记录(转账退回) """ # 3.1 识别转账退款 transfer_refunds = {} transfer_refund_rows = [] for row in income_rows: merchant = row[2].strip() amount = parse_amount(row[5]) # 检查是否有对应的支出记录 has_matching_expense = any(exp[2].strip() == merchant for exp in expense_rows) if has_matching_expense: if merchant not in transfer_refunds: transfer_refunds[merchant] = Decimal("0") transfer_refunds[merchant] += amount transfer_refund_rows.append(row) # 从收入中移除已识别的转账退款 for row in transfer_refund_rows: income_rows.remove(row) if transfer_refunds: print(f" 识别到转账退款: {len(transfer_refunds)} 笔") # 3.2 处理支出记录 final_expense_rows = [] for row in expense_rows: status = row[7] merchant = row[2].strip() original_amount = parse_amount(row[5]) # 计算总退款金额 status_refund = Decimal("0") transfer_refund = transfer_refunds.get(merchant, Decimal("0")) if "已全额退款" in status: self.stats["fully_refunded"] += 1 print(f" 全额退款删除: {row[0]} | {row[2]} | {row[3][:25]}... | {row[5]}") continue elif "已退款" in status: status_refund = self._extract_refund_amount(status) or Decimal("0") total_refund = status_refund + transfer_refund if total_refund > 0: if total_refund >= original_amount: self.stats["fully_refunded"] += 1 print(f" 全额退款删除: {row[0]} | {row[2]} | {row[3][:25]}... | {row[5]}") else: remaining = original_amount - total_refund new_row = row.copy() new_row[5] = f"¥{format_amount(remaining)}" remark = f"原金额{row[5]},退款¥{total_refund}" final_expense_rows.append((new_row, remark)) self.stats["partially_refunded"] += 1 print(f" 部分退款: {row[0]} | {row[2]} | 原{row[5]} -> ¥{format_amount(remaining)}") if merchant in transfer_refunds: del transfer_refunds[merchant] else: # 过滤掉金额为 0 的记录(预下单/加购物车等无效记录) if original_amount > 0: final_expense_rows.append((row, None)) else: self.stats["zero_amount"] = self.stats.get("zero_amount", 0) + 1 return final_expense_rows, income_rows def _extract_refund_amount(self, status: str) -> Decimal | None: """从状态中提取已退款金额""" match = re.search(r'已退款[((]?¥?([\d.]+)[))]?', status) if match: return Decimal(match.group(1)) if "已全额退款" in status: return None return Decimal("0") def _convert_and_reclassify(self, row_tuple: tuple, remark_override: str | None) -> list: """ 转换为对齐格式并重新分类 微信原始字段: 0: 交易时间, 1: 交易类型, 2: 交易对方, 3: 商品, 4: 收/支, 5: 金额(元), 6: 支付方式, 7: 当前状态, 8: 交易单号, 9: 商户单号, 10: 备注 对齐后字段: 交易时间, 交易分类, 交易对方, 对方账号, 商品说明, 收/支, 金额, 收/付款方式, 交易状态, 交易订单号, 商家订单号, 备注, 需复核 """ if isinstance(row_tuple, tuple): row, remark = row_tuple else: row, remark = row_tuple, None remark = remark_override if remark_override else remark transaction_time = row[0] merchant = row[2] product = row[3] income_expense = row[4] amount = parse_amount(row[5]) payment_method = row[6] status = row[7] order_no = row[8] merchant_order_no = row[9] if len(row) > 9 else "" final_remark = remark if remark else (row[10] if len(row) > 10 else "/") # 重新分类(微信原始的"交易类型"太笼统) category, is_certain = infer_category(merchant, product, income_expense) # 复核等级: 空=无需复核, HIGH=无法判断 review_mark = "" if is_certain else "HIGH" return [ transaction_time, category, merchant, "/", # 对方账号(微信无此字段) product, income_expense, format_amount(amount), payment_method, status, order_no, merchant_order_no, final_remark, review_mark ] def reclassify(self, rows: list) -> list: """ 重新分类微信账单 微信账单在 _convert_and_reclassify 中已完成分类 此方法为接口兼容保留 """ return rows def _print_expense_summary(self, expense_rows: list): """打印支出统计""" total = Decimal("0") categories = {} for row in expense_rows: if row[5] == "支出": amt = Decimal(row[6]) total += amt cat = row[1] categories[cat] = categories.get(cat, Decimal("0")) + amt print(f"清理后支出总额: ¥{total}") print("\n=== 按分类统计 ===") for cat, amt in sorted(categories.items(), key=lambda x: -x[1]): print(f" {cat}: ¥{amt}") def main(): """命令行入口""" parser = create_arg_parser("清理微信支付账单数据") args = parser.parse_args() from .base import compute_date_range cleaner = WechatCleaner(args.input_file, args.output_file) start_date, end_date = compute_date_range(args) cleaner.set_date_range(start_date, end_date) cleaner.clean() if __name__ == "__main__": main()