189 lines
5.5 KiB
Python
189 lines
5.5 KiB
Python
"""
|
||
账单文件格式转换模块
|
||
|
||
支持:
|
||
- xlsx -> csv 转换
|
||
- GBK/GB2312 -> UTF-8 编码转换
|
||
- 账单类型自动检测
|
||
"""
|
||
import os
|
||
import csv
|
||
import tempfile
|
||
from pathlib import Path
|
||
from typing import Optional, Tuple
|
||
|
||
# 尝试导入 openpyxl,用于读取 xlsx 文件
|
||
try:
|
||
from openpyxl import load_workbook
|
||
HAS_OPENPYXL = True
|
||
except ImportError:
|
||
HAS_OPENPYXL = False
|
||
|
||
|
||
def detect_encoding(filepath: str) -> str:
|
||
"""
|
||
检测文件编码
|
||
|
||
Returns:
|
||
'utf-8', 'gbk', 或 'utf-8-sig'
|
||
"""
|
||
# 尝试读取前几行来检测编码
|
||
encodings = ['utf-8', 'utf-8-sig', 'gbk', 'gb2312', 'gb18030']
|
||
|
||
for encoding in encodings:
|
||
try:
|
||
with open(filepath, 'r', encoding=encoding) as f:
|
||
# 尝试读取前 10 行
|
||
for _ in range(10):
|
||
f.readline()
|
||
return encoding
|
||
except (UnicodeDecodeError, UnicodeError):
|
||
continue
|
||
|
||
# 默认使用 gbk
|
||
return 'gbk'
|
||
|
||
|
||
def detect_bill_type_from_content(content: str, filename: str = "") -> str:
|
||
"""
|
||
从内容和文件名检测账单类型
|
||
|
||
Returns:
|
||
'alipay', 'wechat', 或 ''
|
||
"""
|
||
# 从文件名检测
|
||
filename_lower = filename.lower()
|
||
if '支付宝' in filename or 'alipay' in filename_lower:
|
||
return 'alipay'
|
||
if '微信' in filename or 'wechat' in filename_lower:
|
||
return 'wechat'
|
||
|
||
# 从内容检测
|
||
# 支付宝特征: 有 "交易分类" 和 "对方账号" 列
|
||
if '交易分类' in content and '对方账号' in content:
|
||
return 'alipay'
|
||
|
||
# 微信特征: 有 "交易类型" 和 "金额(元)" 列
|
||
if '交易类型' in content and '金额(元)' in content:
|
||
return 'wechat'
|
||
|
||
return ''
|
||
|
||
|
||
def convert_xlsx_to_csv(xlsx_path: str, csv_path: str) -> Tuple[bool, str]:
|
||
"""
|
||
将 xlsx 文件转换为 csv 文件
|
||
|
||
Returns:
|
||
(success, message)
|
||
"""
|
||
if not HAS_OPENPYXL:
|
||
return False, "缺少 openpyxl 库,无法读取 xlsx 文件。请运行: pip install openpyxl"
|
||
|
||
try:
|
||
wb = load_workbook(xlsx_path, read_only=True, data_only=True)
|
||
ws = wb.active
|
||
|
||
with open(csv_path, 'w', encoding='utf-8', newline='') as f:
|
||
writer = csv.writer(f)
|
||
for row in ws.iter_rows(values_only=True):
|
||
# 跳过全空行
|
||
if all(cell is None for cell in row):
|
||
continue
|
||
# 将 None 转换为空字符串
|
||
writer.writerow(['' if cell is None else str(cell) for cell in row])
|
||
|
||
wb.close()
|
||
return True, "xlsx 转换成功"
|
||
|
||
except Exception as e:
|
||
return False, f"xlsx 转换失败: {str(e)}"
|
||
|
||
|
||
def convert_csv_encoding(input_path: str, output_path: str, source_encoding: str = 'auto') -> Tuple[bool, str]:
|
||
"""
|
||
将 csv 文件从 GBK/其他编码转换为 UTF-8
|
||
|
||
Returns:
|
||
(success, message)
|
||
"""
|
||
if source_encoding == 'auto':
|
||
source_encoding = detect_encoding(input_path)
|
||
|
||
# 如果已经是 UTF-8,直接复制
|
||
if source_encoding in ('utf-8', 'utf-8-sig'):
|
||
if input_path != output_path:
|
||
import shutil
|
||
shutil.copy(input_path, output_path)
|
||
return True, "文件已是 UTF-8 编码"
|
||
|
||
try:
|
||
with open(input_path, 'r', encoding=source_encoding) as f_in:
|
||
content = f_in.read()
|
||
|
||
with open(output_path, 'w', encoding='utf-8', newline='') as f_out:
|
||
f_out.write(content)
|
||
|
||
return True, f"编码转换成功: {source_encoding} -> utf-8"
|
||
|
||
except Exception as e:
|
||
return False, f"编码转换失败: {str(e)}"
|
||
|
||
|
||
def convert_bill_file(input_path: str, output_path: Optional[str] = None) -> Tuple[bool, str, str, str]:
|
||
"""
|
||
转换账单文件为标准 CSV 格式(UTF-8 编码)
|
||
|
||
支持:
|
||
- xlsx -> csv 转换
|
||
- GBK/GB2312 -> UTF-8 编码转换
|
||
|
||
Args:
|
||
input_path: 输入文件路径
|
||
output_path: 输出文件路径(可选,默认在同目录生成)
|
||
|
||
Returns:
|
||
(success, bill_type, output_path, message)
|
||
"""
|
||
input_path = Path(input_path)
|
||
|
||
if not input_path.exists():
|
||
return False, '', '', f"文件不存在: {input_path}"
|
||
|
||
# 确定输出路径
|
||
if output_path is None:
|
||
# 生成临时文件
|
||
suffix = '.csv'
|
||
fd, output_path = tempfile.mkstemp(suffix=suffix)
|
||
os.close(fd)
|
||
|
||
ext = input_path.suffix.lower()
|
||
bill_type = ''
|
||
|
||
if ext == '.xlsx':
|
||
# xlsx 转换
|
||
success, message = convert_xlsx_to_csv(str(input_path), output_path)
|
||
if not success:
|
||
return False, '', '', message
|
||
|
||
# 读取内容检测账单类型
|
||
with open(output_path, 'r', encoding='utf-8') as f:
|
||
content = f.read(2000) # 只读取前 2000 字符用于检测
|
||
bill_type = detect_bill_type_from_content(content, input_path.name)
|
||
|
||
elif ext == '.csv':
|
||
# CSV 编码转换
|
||
success, message = convert_csv_encoding(str(input_path), output_path)
|
||
if not success:
|
||
return False, '', '', message
|
||
|
||
# 读取内容检测账单类型
|
||
with open(output_path, 'r', encoding='utf-8') as f:
|
||
content = f.read(2000)
|
||
bill_type = detect_bill_type_from_content(content, input_path.name)
|
||
|
||
else:
|
||
return False, '', '', f"不支持的文件格式: {ext}"
|
||
|
||
return True, bill_type, output_path, "转换成功"
|