package service import ( "context" "encoding/csv" "encoding/json" "fmt" "os" "strconv" "strings" "time" "go.mongodb.org/mongo-driver/bson" "billai-server/database" "billai-server/model" ) // SaveResult 存储结果 type SaveResult struct { RawCount int // 原始数据存储数量 CleanedCount int // 清洗后数据存储数量 DuplicateCount int // 重复数据跳过数量 } // checkDuplicate 检查记录是否重复 // 优先使用 transaction_id 判断,如果为空则使用 时间+金额+商户 组合判断 func checkDuplicate(ctx context.Context, bill *model.CleanedBill) bool { var filter bson.M if bill.TransactionID != "" { // 优先用交易订单号判断 filter = bson.M{"transaction_id": bill.TransactionID} } else { // 回退到 时间+金额+商户 组合判断 filter = bson.M{ "time": bill.Time.Time(), // 转换为 time.Time 用于 MongoDB 查询 "amount": bill.Amount, "merchant": bill.Merchant, } } count, err := database.CleanedBillCollection.CountDocuments(ctx, filter) if err != nil { return false // 查询出错时不认为是重复 } return count > 0 } // DeduplicateResult 去重结果 type DeduplicateResult struct { OriginalCount int // 原始记录数 DuplicateCount int // 重复记录数 NewCount int // 新记录数 DedupFilePath string // 去重后的文件路径(如果有去重则生成新文件) BillType string // 检测到的账单类型 } // DeduplicateRawFile 对原始文件进行去重检查,返回去重后的文件路径 // 如果全部重复,返回错误 func DeduplicateRawFile(filePath, uploadBatch string) (*DeduplicateResult, error) { file, err := os.Open(filePath) if err != nil { return nil, fmt.Errorf("打开文件失败: %w", err) } defer file.Close() reader := csv.NewReader(file) rows, err := reader.ReadAll() if err != nil { return nil, fmt.Errorf("读取 CSV 失败: %w", err) } if len(rows) < 2 { return nil, fmt.Errorf("文件没有数据行") } header := rows[0] dataRows := rows[1:] // 检测账单类型和去重字段 billType, idFieldIdx := detectBillTypeAndIdField(header) result := &DeduplicateResult{ OriginalCount: len(dataRows), BillType: billType, } // 如果找不到去重字段,不进行去重,直接返回原文件 if idFieldIdx < 0 { result.NewCount = len(dataRows) result.DedupFilePath = filePath return result, nil } // 创建上下文 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() // 检查每行是否重复 var newRows [][]string for _, row := range dataRows { if len(row) <= idFieldIdx { continue } transactionID := strings.TrimSpace(row[idFieldIdx]) if transactionID == "" { // 没有交易号的行,保留 newRows = append(newRows, row) continue } // 检查是否已存在 count, err := database.RawBillCollection.CountDocuments(ctx, bson.M{ "raw_data." + header[idFieldIdx]: transactionID, }) if err != nil { // 查询出错,保留该行 newRows = append(newRows, row) continue } if count == 0 { // 不重复,保留 newRows = append(newRows, row) } else { result.DuplicateCount++ } } result.NewCount = len(newRows) // 如果没有新数据 if len(newRows) == 0 { result.DedupFilePath = "" return result, nil } // 如果没有重复,直接返回原文件 if result.DuplicateCount == 0 { result.DedupFilePath = filePath return result, nil } // 有重复,生成去重后的新文件 dedupFilePath := strings.TrimSuffix(filePath, ".csv") + "_dedup.csv" dedupFile, err := os.Create(dedupFilePath) if err != nil { return nil, fmt.Errorf("创建去重文件失败: %w", err) } defer dedupFile.Close() writer := csv.NewWriter(dedupFile) writer.Write(header) // 写入表头 for _, row := range newRows { writer.Write(row) } writer.Flush() result.DedupFilePath = dedupFilePath return result, nil } // detectBillTypeAndIdField 检测账单类型和用于去重的字段索引 func detectBillTypeAndIdField(header []string) (billType string, idFieldIdx int) { idFieldIdx = -1 for i, col := range header { // 支付宝特征 if col == "交易分类" || col == "对方账号" { billType = "alipay" } // 微信特征 if col == "交易类型" || col == "金额(元)" { billType = "wechat" } // 查找去重字段(优先使用交易订单号/交易号) if col == "交易订单号" || col == "交易号" || col == "交易单号" { idFieldIdx = i } } // 如果没找到主要去重字段,尝试商户订单号 if idFieldIdx < 0 { for i, col := range header { if col == "商家订单号" || col == "商户单号" || col == "商户订单号" { idFieldIdx = i break } } } return billType, idFieldIdx } // SaveRawBillsFromFile 从原始上传文件读取数据并存入原始数据集合 func SaveRawBillsFromFile(filePath, billType, sourceFile, uploadBatch string) (int, error) { file, err := os.Open(filePath) if err != nil { return 0, fmt.Errorf("打开文件失败: %w", err) } defer file.Close() reader := csv.NewReader(file) rows, err := reader.ReadAll() if err != nil { return 0, fmt.Errorf("读取 CSV 失败: %w", err) } if len(rows) < 2 { return 0, nil // 没有数据行 } // 获取表头 header := rows[0] now := time.Now() // 构建原始数据文档 var rawBills []interface{} for rowIdx, row := range rows[1:] { rawData := make(map[string]interface{}) for colIdx, col := range header { if colIdx < len(row) { // 清理空白字符,确保去重查询能匹配 rawData[col] = strings.TrimSpace(row[colIdx]) } } rawBill := model.RawBill{ BillType: billType, SourceFile: sourceFile, UploadBatch: uploadBatch, RowIndex: rowIdx + 1, // 从1开始计数 RawData: rawData, CreatedAt: now, } rawBills = append(rawBills, rawBill) } if len(rawBills) == 0 { return 0, nil } // 批量插入原始数据集合 ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() result, err := database.RawBillCollection.InsertMany(ctx, rawBills) if err != nil { return 0, fmt.Errorf("插入原始数据失败: %w", err) } return len(result.InsertedIDs), nil } // SaveCleanedBillsFromFile 从清洗后的文件读取数据并存入清洗后数据集合 // 返回: (插入数量, 重复跳过数量, 错误) func SaveCleanedBillsFromFile(filePath, format, billType, sourceFile, uploadBatch string) (int, int, error) { if format == "json" { return saveCleanedBillsFromJSON(filePath, billType, sourceFile, uploadBatch) } return saveCleanedBillsFromCSV(filePath, billType, sourceFile, uploadBatch) } // saveCleanedBillsFromCSV 从 CSV 文件读取并存储清洗后账单 // 返回: (插入数量, 重复跳过数量, 错误) func saveCleanedBillsFromCSV(filePath, billType, sourceFile, uploadBatch string) (int, int, error) { file, err := os.Open(filePath) if err != nil { return 0, 0, fmt.Errorf("打开文件失败: %w", err) } defer file.Close() reader := csv.NewReader(file) rows, err := reader.ReadAll() if err != nil { return 0, 0, fmt.Errorf("读取 CSV 失败: %w", err) } if len(rows) < 2 { return 0, 0, nil // 没有数据行 } // 构建列索引映射 header := rows[0] colIdx := make(map[string]int) for i, col := range header { colIdx[col] = i } // 创建上下文用于去重检查 ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() // 解析数据行 var bills []interface{} duplicateCount := 0 now := time.Now() for _, row := range rows[1:] { bill := model.CleanedBill{ BillType: billType, SourceFile: sourceFile, UploadBatch: uploadBatch, CreatedAt: now, UpdatedAt: now, } // 提取字段 - 订单号(用于去重判断) if idx, ok := colIdx["交易订单号"]; ok && len(row) > idx { bill.TransactionID = strings.TrimSpace(row[idx]) } else if idx, ok := colIdx["交易单号"]; ok && len(row) > idx { bill.TransactionID = strings.TrimSpace(row[idx]) } if idx, ok := colIdx["商家订单号"]; ok && len(row) > idx { bill.MerchantOrderNo = strings.TrimSpace(row[idx]) } else if idx, ok := colIdx["商户单号"]; ok && len(row) > idx { bill.MerchantOrderNo = strings.TrimSpace(row[idx]) } if idx, ok := colIdx["交易时间"]; ok && len(row) > idx { bill.Time = parseTime(row[idx]) } if idx, ok := colIdx["交易分类"]; ok && len(row) > idx { bill.Category = row[idx] } else if idx, ok := colIdx["交易类型"]; ok && len(row) > idx { bill.Category = row[idx] } if idx, ok := colIdx["交易对方"]; ok && len(row) > idx { bill.Merchant = row[idx] } if idx, ok := colIdx["商品说明"]; ok && len(row) > idx { bill.Description = row[idx] } else if idx, ok := colIdx["商品"]; ok && len(row) > idx { bill.Description = row[idx] } if idx, ok := colIdx["收/支"]; ok && len(row) > idx { bill.IncomeExpense = row[idx] } if idx, ok := colIdx["金额"]; ok && len(row) > idx { bill.Amount = parseAmount(row[idx]) } else if idx, ok := colIdx["金额(元)"]; ok && len(row) > idx { bill.Amount = parseAmount(row[idx]) } if idx, ok := colIdx["收/付款方式"]; ok && len(row) > idx { bill.PayMethod = row[idx] } else if idx, ok := colIdx["支付方式"]; ok && len(row) > idx { bill.PayMethod = row[idx] } if idx, ok := colIdx["交易状态"]; ok && len(row) > idx { bill.Status = row[idx] } else if idx, ok := colIdx["当前状态"]; ok && len(row) > idx { bill.Status = row[idx] } if idx, ok := colIdx["备注"]; ok && len(row) > idx { bill.Remark = row[idx] } if idx, ok := colIdx["复核等级"]; ok && len(row) > idx { bill.ReviewLevel = row[idx] } // 检查是否重复 if checkDuplicate(ctx, &bill) { duplicateCount++ continue // 跳过重复记录 } bills = append(bills, bill) } if len(bills) == 0 { return 0, duplicateCount, nil } // 批量插入清洗后数据集合 result, err := database.CleanedBillCollection.InsertMany(ctx, bills) if err != nil { return 0, duplicateCount, fmt.Errorf("插入清洗后数据失败: %w", err) } return len(result.InsertedIDs), duplicateCount, nil } // saveCleanedBillsFromJSON 从 JSON 文件读取并存储清洗后账单 // 返回: (插入数量, 重复跳过数量, 错误) func saveCleanedBillsFromJSON(filePath, billType, sourceFile, uploadBatch string) (int, int, error) { file, err := os.Open(filePath) if err != nil { return 0, 0, fmt.Errorf("打开文件失败: %w", err) } defer file.Close() var data []map[string]interface{} decoder := json.NewDecoder(file) if err := decoder.Decode(&data); err != nil { return 0, 0, fmt.Errorf("解析 JSON 失败: %w", err) } if len(data) == 0 { return 0, 0, nil } // 创建上下文用于去重检查 ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() // 解析数据 var bills []interface{} duplicateCount := 0 now := time.Now() for _, item := range data { bill := model.CleanedBill{ BillType: billType, SourceFile: sourceFile, UploadBatch: uploadBatch, CreatedAt: now, UpdatedAt: now, } // 订单号(用于去重判断) if v, ok := item["交易订单号"].(string); ok { bill.TransactionID = strings.TrimSpace(v) } else if v, ok := item["交易号"].(string); ok { bill.TransactionID = strings.TrimSpace(v) } if v, ok := item["商家订单号"].(string); ok { bill.MerchantOrderNo = strings.TrimSpace(v) } else if v, ok := item["商户单号"].(string); ok { bill.MerchantOrderNo = strings.TrimSpace(v) } if v, ok := item["交易时间"].(string); ok { bill.Time = parseTime(v) } if v, ok := item["交易分类"].(string); ok { bill.Category = v } if v, ok := item["交易对方"].(string); ok { bill.Merchant = v } if v, ok := item["商品说明"].(string); ok { bill.Description = v } if v, ok := item["收/支"].(string); ok { bill.IncomeExpense = v } if v, ok := item["金额"]; ok { switch val := v.(type) { case string: bill.Amount = parseAmount(val) case float64: bill.Amount = val } } if v, ok := item["支付方式"].(string); ok { bill.PayMethod = v } if v, ok := item["交易状态"].(string); ok { bill.Status = v } if v, ok := item["备注"].(string); ok { bill.Remark = v } if v, ok := item["复核等级"].(string); ok { bill.ReviewLevel = v } // 检查是否重复 if checkDuplicate(ctx, &bill) { duplicateCount++ continue // 跳过重复记录 } bills = append(bills, bill) } if len(bills) == 0 { return 0, duplicateCount, nil } result, err := database.CleanedBillCollection.InsertMany(ctx, bills) if err != nil { return 0, duplicateCount, fmt.Errorf("插入清洗后数据失败: %w", err) } return len(result.InsertedIDs), duplicateCount, nil } // parseTime 解析时间字符串 // 使用本地时区解析,返回 model.LocalTime 类型 // 支持支付宝格式: 2026/1/13 20:08 (月份和日期可能没有前导零) func parseTime(s string) model.LocalTime { s = strings.TrimSpace(s) if s == "" { return model.LocalTime(time.Time{}) } // 先尝试标准化支付宝格式(将单数日期/月份补零) // 例如: "2026/1/13 20:08" -> "2026/01/13 20:08" if strings.Contains(s, "/") && !strings.Contains(s, "-") { // 匹配格式: YYYY/M/D 或 YYYY/M/D HH:mm 或 YYYY/M/D HH:mm:ss parts := strings.Split(s, " ") if len(parts) > 0 { datePart := parts[0] // 使用正则表达式将单数日期/月份补零 // 例如: "2026/1/13" -> "2026/01/13" dateParts := strings.Split(datePart, "/") if len(dateParts) == 3 { year := dateParts[0] month := dateParts[1] day := dateParts[2] // 补零 if len(month) == 1 { month = "0" + month } if len(day) == 1 { day = "0" + day } datePart = year + "/" + month + "/" + day if len(parts) > 1 { s = datePart + " " + strings.Join(parts[1:], " ") } else { s = datePart } } } } // 尝试多种时间格式(使用本地时区) formats := []string{ "2006-01-02 15:04:05", "2006/01/02 15:04:05", "2006-01-02 15:04", "2006/01/02 15:04", "2006-01-02", "2006/01/02", } for _, format := range formats { if t, err := time.ParseInLocation(format, s, time.Local); err == nil { return model.LocalTime(t) } } return model.LocalTime(time.Time{}) } // parseAmount 解析金额字符串 func parseAmount(s string) float64 { s = strings.TrimSpace(s) s = strings.ReplaceAll(s, ",", "") s = strings.ReplaceAll(s, "¥", "") s = strings.ReplaceAll(s, "¥", "") if amount, err := strconv.ParseFloat(s, 64); err == nil { return amount } return 0 } // GetCleanedBillsByBatch 根据批次获取清洗后账单 func GetCleanedBillsByBatch(uploadBatch string) ([]model.CleanedBill, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cursor, err := database.CleanedBillCollection.Find(ctx, bson.M{"upload_batch": uploadBatch}) if err != nil { return nil, fmt.Errorf("查询失败: %w", err) } defer cursor.Close(ctx) var bills []model.CleanedBill if err := cursor.All(ctx, &bills); err != nil { return nil, fmt.Errorf("解析结果失败: %w", err) } return bills, nil } // GetRawBillsByBatch 根据批次获取原始账单 func GetRawBillsByBatch(uploadBatch string) ([]model.RawBill, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() cursor, err := database.RawBillCollection.Find(ctx, bson.M{"upload_batch": uploadBatch}) if err != nil { return nil, fmt.Errorf("查询失败: %w", err) } defer cursor.Close(ctx) var bills []model.RawBill if err := cursor.All(ctx, &bills); err != nil { return nil, fmt.Errorf("解析结果失败: %w", err) } return bills, nil } // GetBillStats 获取账单统计信息 func GetBillStats() (map[string]interface{}, error) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() // 原始数据总数 rawTotal, err := database.RawBillCollection.CountDocuments(ctx, bson.M{}) if err != nil { return nil, err } // 清洗后数据总数 cleanedTotal, err := database.CleanedBillCollection.CountDocuments(ctx, bson.M{}) if err != nil { return nil, err } // 支出总额(从清洗后数据统计) expensePipeline := []bson.M{ {"$match": bson.M{"income_expense": "支出"}}, {"$group": bson.M{"_id": nil, "total": bson.M{"$sum": "$amount"}}}, } expenseCursor, err := database.CleanedBillCollection.Aggregate(ctx, expensePipeline) if err != nil { return nil, err } defer expenseCursor.Close(ctx) var expenseResult []bson.M expenseCursor.All(ctx, &expenseResult) totalExpense := 0.0 if len(expenseResult) > 0 { if v, ok := expenseResult[0]["total"].(float64); ok { totalExpense = v } } // 收入总额(从清洗后数据统计) incomePipeline := []bson.M{ {"$match": bson.M{"income_expense": "收入"}}, {"$group": bson.M{"_id": nil, "total": bson.M{"$sum": "$amount"}}}, } incomeCursor, err := database.CleanedBillCollection.Aggregate(ctx, incomePipeline) if err != nil { return nil, err } defer incomeCursor.Close(ctx) var incomeResult []bson.M incomeCursor.All(ctx, &incomeResult) totalIncome := 0.0 if len(incomeResult) > 0 { if v, ok := incomeResult[0]["total"].(float64); ok { totalIncome = v } } return map[string]interface{}{ "raw_records": rawTotal, "cleaned_records": cleanedTotal, "total_expense": totalExpense, "total_income": totalIncome, }, nil }