feat: 支持ZIP压缩包上传(含密码保护)
This commit is contained in:
188
analyzer/converter.py
Normal file
188
analyzer/converter.py
Normal file
@@ -0,0 +1,188 @@
|
||||
"""
|
||||
账单文件格式转换模块
|
||||
|
||||
支持:
|
||||
- xlsx -> csv 转换
|
||||
- GBK/GB2312 -> UTF-8 编码转换
|
||||
- 账单类型自动检测
|
||||
"""
|
||||
import os
|
||||
import csv
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple
|
||||
|
||||
# 尝试导入 openpyxl,用于读取 xlsx 文件
|
||||
try:
|
||||
from openpyxl import load_workbook
|
||||
HAS_OPENPYXL = True
|
||||
except ImportError:
|
||||
HAS_OPENPYXL = False
|
||||
|
||||
|
||||
def detect_encoding(filepath: str) -> str:
|
||||
"""
|
||||
检测文件编码
|
||||
|
||||
Returns:
|
||||
'utf-8', 'gbk', 或 'utf-8-sig'
|
||||
"""
|
||||
# 尝试读取前几行来检测编码
|
||||
encodings = ['utf-8', 'utf-8-sig', 'gbk', 'gb2312', 'gb18030']
|
||||
|
||||
for encoding in encodings:
|
||||
try:
|
||||
with open(filepath, 'r', encoding=encoding) as f:
|
||||
# 尝试读取前 10 行
|
||||
for _ in range(10):
|
||||
f.readline()
|
||||
return encoding
|
||||
except (UnicodeDecodeError, UnicodeError):
|
||||
continue
|
||||
|
||||
# 默认使用 gbk
|
||||
return 'gbk'
|
||||
|
||||
|
||||
def detect_bill_type_from_content(content: str, filename: str = "") -> str:
|
||||
"""
|
||||
从内容和文件名检测账单类型
|
||||
|
||||
Returns:
|
||||
'alipay', 'wechat', 或 ''
|
||||
"""
|
||||
# 从文件名检测
|
||||
filename_lower = filename.lower()
|
||||
if '支付宝' in filename or 'alipay' in filename_lower:
|
||||
return 'alipay'
|
||||
if '微信' in filename or 'wechat' in filename_lower:
|
||||
return 'wechat'
|
||||
|
||||
# 从内容检测
|
||||
# 支付宝特征: 有 "交易分类" 和 "对方账号" 列
|
||||
if '交易分类' in content and '对方账号' in content:
|
||||
return 'alipay'
|
||||
|
||||
# 微信特征: 有 "交易类型" 和 "金额(元)" 列
|
||||
if '交易类型' in content and '金额(元)' in content:
|
||||
return 'wechat'
|
||||
|
||||
return ''
|
||||
|
||||
|
||||
def convert_xlsx_to_csv(xlsx_path: str, csv_path: str) -> Tuple[bool, str]:
|
||||
"""
|
||||
将 xlsx 文件转换为 csv 文件
|
||||
|
||||
Returns:
|
||||
(success, message)
|
||||
"""
|
||||
if not HAS_OPENPYXL:
|
||||
return False, "缺少 openpyxl 库,无法读取 xlsx 文件。请运行: pip install openpyxl"
|
||||
|
||||
try:
|
||||
wb = load_workbook(xlsx_path, read_only=True, data_only=True)
|
||||
ws = wb.active
|
||||
|
||||
with open(csv_path, 'w', encoding='utf-8', newline='') as f:
|
||||
writer = csv.writer(f)
|
||||
for row in ws.iter_rows(values_only=True):
|
||||
# 跳过全空行
|
||||
if all(cell is None for cell in row):
|
||||
continue
|
||||
# 将 None 转换为空字符串
|
||||
writer.writerow(['' if cell is None else str(cell) for cell in row])
|
||||
|
||||
wb.close()
|
||||
return True, "xlsx 转换成功"
|
||||
|
||||
except Exception as e:
|
||||
return False, f"xlsx 转换失败: {str(e)}"
|
||||
|
||||
|
||||
def convert_csv_encoding(input_path: str, output_path: str, source_encoding: str = 'auto') -> Tuple[bool, str]:
|
||||
"""
|
||||
将 csv 文件从 GBK/其他编码转换为 UTF-8
|
||||
|
||||
Returns:
|
||||
(success, message)
|
||||
"""
|
||||
if source_encoding == 'auto':
|
||||
source_encoding = detect_encoding(input_path)
|
||||
|
||||
# 如果已经是 UTF-8,直接复制
|
||||
if source_encoding in ('utf-8', 'utf-8-sig'):
|
||||
if input_path != output_path:
|
||||
import shutil
|
||||
shutil.copy(input_path, output_path)
|
||||
return True, "文件已是 UTF-8 编码"
|
||||
|
||||
try:
|
||||
with open(input_path, 'r', encoding=source_encoding) as f_in:
|
||||
content = f_in.read()
|
||||
|
||||
with open(output_path, 'w', encoding='utf-8', newline='') as f_out:
|
||||
f_out.write(content)
|
||||
|
||||
return True, f"编码转换成功: {source_encoding} -> utf-8"
|
||||
|
||||
except Exception as e:
|
||||
return False, f"编码转换失败: {str(e)}"
|
||||
|
||||
|
||||
def convert_bill_file(input_path: str, output_path: Optional[str] = None) -> Tuple[bool, str, str, str]:
|
||||
"""
|
||||
转换账单文件为标准 CSV 格式(UTF-8 编码)
|
||||
|
||||
支持:
|
||||
- xlsx -> csv 转换
|
||||
- GBK/GB2312 -> UTF-8 编码转换
|
||||
|
||||
Args:
|
||||
input_path: 输入文件路径
|
||||
output_path: 输出文件路径(可选,默认在同目录生成)
|
||||
|
||||
Returns:
|
||||
(success, bill_type, output_path, message)
|
||||
"""
|
||||
input_path = Path(input_path)
|
||||
|
||||
if not input_path.exists():
|
||||
return False, '', '', f"文件不存在: {input_path}"
|
||||
|
||||
# 确定输出路径
|
||||
if output_path is None:
|
||||
# 生成临时文件
|
||||
suffix = '.csv'
|
||||
fd, output_path = tempfile.mkstemp(suffix=suffix)
|
||||
os.close(fd)
|
||||
|
||||
ext = input_path.suffix.lower()
|
||||
bill_type = ''
|
||||
|
||||
if ext == '.xlsx':
|
||||
# xlsx 转换
|
||||
success, message = convert_xlsx_to_csv(str(input_path), output_path)
|
||||
if not success:
|
||||
return False, '', '', message
|
||||
|
||||
# 读取内容检测账单类型
|
||||
with open(output_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read(2000) # 只读取前 2000 字符用于检测
|
||||
bill_type = detect_bill_type_from_content(content, input_path.name)
|
||||
|
||||
elif ext == '.csv':
|
||||
# CSV 编码转换
|
||||
success, message = convert_csv_encoding(str(input_path), output_path)
|
||||
if not success:
|
||||
return False, '', '', message
|
||||
|
||||
# 读取内容检测账单类型
|
||||
with open(output_path, 'r', encoding='utf-8') as f:
|
||||
content = f.read(2000)
|
||||
bill_type = detect_bill_type_from_content(content, input_path.name)
|
||||
|
||||
else:
|
||||
return False, '', '', f"不支持的文件格式: {ext}"
|
||||
|
||||
return True, bill_type, output_path, "转换成功"
|
||||
Reference in New Issue
Block a user