feat: server connect mongo

This commit is contained in:
CHE LIANG ZHAO
2026-01-08 23:42:01 +08:00
parent ccd2d0386a
commit c1ffe2e822
17 changed files with 1455 additions and 338 deletions

BIN
server/billai-server.exe Normal file

Binary file not shown.

View File

@@ -7,7 +7,7 @@ server:
# Python 配置
python:
# Python 解释器路径(相对于项目根目录或绝对路径)
path: analyzer/venv/bin/python
path: analyzer/venv/Scripts/python.exe
# 分析脚本路径(相对于项目根目录)
script: analyzer/clean_bill.py
@@ -16,3 +16,16 @@ directories:
upload: server/uploads
output: server/outputs
# MongoDB 配置
mongodb:
# MongoDB 连接 URI带认证
uri: mongodb://admin:password@localhost:27017
# 数据库名称
database: billai
# 集合名称
collections:
# 原始数据集合
raw: bills_raw
# 清洗后数据集合
cleaned: bills_cleaned

View File

@@ -17,6 +17,12 @@ type Config struct {
CleanScript string // 清理脚本路径
UploadDir string // 上传文件目录
OutputDir string // 输出文件目录
// MongoDB 配置
MongoURI string // MongoDB 连接 URI
MongoDatabase string // 数据库名称
MongoRawCollection string // 原始数据集合名称
MongoCleanedCollection string // 清洗后数据集合名称
}
// configFile YAML 配置文件结构
@@ -32,6 +38,14 @@ type configFile struct {
Upload string `yaml:"upload"`
Output string `yaml:"output"`
} `yaml:"directories"`
MongoDB struct {
URI string `yaml:"uri"`
Database string `yaml:"database"`
Collections struct {
Raw string `yaml:"raw"`
Cleaned string `yaml:"cleaned"`
} `yaml:"collections"`
} `yaml:"mongodb"`
}
// Global 全局配置实例
@@ -102,6 +116,12 @@ func Load() {
Global.UploadDir = "server/uploads"
Global.OutputDir = "server/outputs"
// MongoDB 默认值
Global.MongoURI = getEnvOrDefault("MONGO_URI", "mongodb://localhost:27017")
Global.MongoDatabase = getEnvOrDefault("MONGO_DATABASE", "billai")
Global.MongoRawCollection = getEnvOrDefault("MONGO_RAW_COLLECTION", "bills_raw")
Global.MongoCleanedCollection = getEnvOrDefault("MONGO_CLEANED_COLLECTION", "bills_cleaned")
// 查找配置文件
configPath := configFilePath
if !filepath.IsAbs(configPath) {
@@ -128,6 +148,19 @@ func Load() {
if cfg.Directories.Output != "" {
Global.OutputDir = cfg.Directories.Output
}
// MongoDB 配置
if cfg.MongoDB.URI != "" {
Global.MongoURI = cfg.MongoDB.URI
}
if cfg.MongoDB.Database != "" {
Global.MongoDatabase = cfg.MongoDB.Database
}
if cfg.MongoDB.Collections.Raw != "" {
Global.MongoRawCollection = cfg.MongoDB.Collections.Raw
}
if cfg.MongoDB.Collections.Cleaned != "" {
Global.MongoCleanedCollection = cfg.MongoDB.Collections.Cleaned
}
}
// 环境变量覆盖
@@ -140,6 +173,19 @@ func Load() {
if root := os.Getenv("BILLAI_ROOT"); root != "" {
Global.ProjectRoot = root
}
// MongoDB 环境变量覆盖
if uri := os.Getenv("MONGO_URI"); uri != "" {
Global.MongoURI = uri
}
if db := os.Getenv("MONGO_DATABASE"); db != "" {
Global.MongoDatabase = db
}
if rawColl := os.Getenv("MONGO_RAW_COLLECTION"); rawColl != "" {
Global.MongoRawCollection = rawColl
}
if cleanedColl := os.Getenv("MONGO_CLEANED_COLLECTION"); cleanedColl != "" {
Global.MongoCleanedCollection = cleanedColl
}
}
// ResolvePath 解析路径(相对路径转为绝对路径)

72
server/database/mongo.go Normal file
View File

@@ -0,0 +1,72 @@
package database
import (
"context"
"fmt"
"time"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"billai-server/config"
)
var (
	// Client is the package-level MongoDB client; Connect assigns it after a successful ping.
	Client *mongo.Client
	// DB is the database handle derived from Client in Connect.
	DB *mongo.Database
	// RawBillCollection is the collection that holds raw bill data.
	RawBillCollection *mongo.Collection
	// CleanedBillCollection is the collection that holds cleaned bill data.
	CleanedBillCollection *mongo.Collection
)
// Connect opens a MongoDB connection using config.Global.MongoURI, verifies it
// with a ping, and then publishes the client, database and collection handles
// through the package-level variables.
//
// Connecting and pinging share one 10-second deadline. The package-level
// variables are only assigned once the ping has succeeded. If the ping fails,
// the newly created client is disconnected before the error is returned so a
// failed Connect does not leave a half-open client behind.
func Connect() error {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Build the client options from the configured URI.
	clientOptions := options.Client().ApplyURI(config.Global.MongoURI)

	client, err := mongo.Connect(ctx, clientOptions)
	if err != nil {
		return fmt.Errorf("连接 MongoDB 失败: %w", err)
	}

	// Verify the server is actually reachable.
	if err := client.Ping(ctx, nil); err != nil {
		// ctx may already have expired, so clean up under a fresh deadline.
		closeCtx, closeCancel := context.WithTimeout(context.Background(), 5*time.Second)
		defer closeCancel()
		// Best effort: the ping failure is the error worth reporting.
		_ = client.Disconnect(closeCtx)
		return fmt.Errorf("MongoDB Ping 失败: %w", err)
	}

	// Publish the handles for the rest of the program.
	Client = client
	DB = client.Database(config.Global.MongoDatabase)
	RawBillCollection = DB.Collection(config.Global.MongoRawCollection)
	CleanedBillCollection = DB.Collection(config.Global.MongoCleanedCollection)

	fmt.Printf("🍃 MongoDB 连接成功: %s\n", config.Global.MongoDatabase)
	fmt.Printf(" 📄 原始数据集合: %s\n", config.Global.MongoRawCollection)
	fmt.Printf(" 📄 清洗数据集合: %s\n", config.Global.MongoCleanedCollection)
	return nil
}
// Disconnect closes the package-level MongoDB client, allowing up to ten
// seconds for the shutdown. It does nothing and returns nil when Client is nil.
func Disconnect() error {
	if Client != nil {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()

		err := Client.Disconnect(ctx)
		if err != nil {
			return fmt.Errorf("断开 MongoDB 连接失败: %w", err)
		}
		fmt.Println("🍃 MongoDB 连接已断开")
	}
	return nil
}

View File

@@ -4,6 +4,7 @@ go 1.21
require (
github.com/gin-gonic/gin v1.9.1
go.mongodb.org/mongo-driver v1.13.1
gopkg.in/yaml.v3 v3.0.1
)
@@ -16,18 +17,26 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.14.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.13.6 // indirect
github.com/klauspost/cpuid/v2 v2.2.4 // indirect
github.com/leodido/go-urn v1.2.4 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.11 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
golang.org/x/arch v0.3.0 // indirect
golang.org/x/crypto v0.9.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect

View File

@@ -24,11 +24,16 @@ github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QX
github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5 h1:Khx7svrCpmxxtHBq5j2mp/xVjsi8hQMfNLvJFAlrGgU=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/klauspost/compress v1.13.6 h1:P76CopJELS0TiO2mebmnzgWaajssP/EszplttgQxcgc=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk=
github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY=
@@ -41,6 +46,8 @@ github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/pelletier/go-toml/v2 v2.0.8 h1:0ctb6s9mE31h0/lhu+J6OPmVeDxJn+kYnJc2jZR9tGQ=
github.com/pelletier/go-toml/v2 v2.0.8/go.mod h1:vuYfssBdrU2XDZ9bYydBu6t+6a6PYNcZljzZR9VXg+4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
@@ -60,19 +67,59 @@ github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go/codec v1.2.11 h1:BMaWp1Bb6fHwEtbplGBGJ498wD+LKlNSl25MjdZY4dU=
github.com/ugorji/go/codec v1.2.11/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c=
github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI=
github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY=
github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4=
github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8=
github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA=
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
go.mongodb.org/mongo-driver v1.13.1 h1:YIc7HTYsKndGK4RFzJ3covLz1byri52x0IoMB0Pt/vk=
go.mongodb.org/mongo-driver v1.13.1/go.mod h1:wcDf1JBCXy2mOW0bWHwO/IOYqdca1MPCwDtFu/Z9+eo=
golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/arch v0.3.0 h1:02VY4/ZcO/gBOH6PUaoiptASxtXU10jazRCP865E97k=
golang.org/x/arch v0.3.0/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.9.0 h1:LF6fAI+IutBocDJ2OT0Q1g8plpYljMZ4+lty+dsqw3g=
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=

View File

@@ -5,7 +5,6 @@ import (
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"strings"
"time"
@@ -14,6 +13,7 @@ import (
"billai-server/config"
"billai-server/model"
"billai-server/service"
)
// Upload 处理账单上传和清理请求
@@ -53,7 +53,45 @@ func Upload(c *gin.Context) {
defer dst.Close()
io.Copy(dst, file)
// 4. 构建输出文件路径
// 4. 对原始数据进行去重检查
fmt.Printf("📋 开始去重检查...\n")
dedupResult, dedupErr := service.DeduplicateRawFile(inputPath, timestamp)
if dedupErr != nil {
c.JSON(http.StatusInternalServerError, model.UploadResponse{
Result: false,
Message: "去重检查失败: " + dedupErr.Error(),
})
return
}
// 账单类型从去重结果获取
billType := dedupResult.BillType
fmt.Printf(" 原始记录: %d 条\n", dedupResult.OriginalCount)
if dedupResult.DuplicateCount > 0 {
fmt.Printf(" 重复记录: %d 条(已跳过)\n", dedupResult.DuplicateCount)
}
fmt.Printf(" 新增记录: %d 条\n", dedupResult.NewCount)
// 如果全部重复,返回提示
if dedupResult.NewCount == 0 {
c.JSON(http.StatusOK, model.UploadResponse{
Result: true,
Message: fmt.Sprintf("文件中的 %d 条记录全部已存在,无需重复导入", dedupResult.OriginalCount),
Data: &model.UploadData{
BillType: billType,
RawCount: 0,
CleanedCount: 0,
DuplicateCount: dedupResult.DuplicateCount,
},
})
return
}
// 使用去重后的文件路径进行后续处理
processFilePath := dedupResult.DedupFilePath
// 5. 构建输出文件路径
baseName := strings.TrimSuffix(header.Filename, filepath.Ext(header.Filename))
outputExt := ".csv"
if req.Format == "json" {
@@ -63,57 +101,65 @@ func Upload(c *gin.Context) {
outputDirAbs := config.ResolvePath(config.Global.OutputDir)
outputPath := filepath.Join(outputDirAbs, outputFileName)
// 5. 构建命令参数
cleanScriptAbs := config.ResolvePath(config.Global.CleanScript)
args := []string{cleanScriptAbs, inputPath, outputPath}
if req.Year != "" {
args = append(args, "--year", req.Year)
// 6. 执行 Python 清洗脚本
cleanOpts := &service.CleanOptions{
Year: req.Year,
Month: req.Month,
Start: req.Start,
End: req.End,
Format: req.Format,
}
if req.Month != "" {
args = append(args, "--month", req.Month)
}
if req.Start != "" {
args = append(args, "--start", req.Start)
}
if req.End != "" {
args = append(args, "--end", req.End)
}
if req.Format != "" {
args = append(args, "--format", req.Format)
}
// 6. 执行 Python 脚本
pythonPathAbs := config.ResolvePath(config.Global.PythonPath)
cmd := exec.Command(pythonPathAbs, args...)
cmd.Dir = config.Global.ProjectRoot
output, err := cmd.CombinedOutput()
outputStr := string(output)
if err != nil {
cleanResult, cleanErr := service.RunCleanScript(processFilePath, outputPath, cleanOpts)
if cleanErr != nil {
c.JSON(http.StatusInternalServerError, model.UploadResponse{
Result: false,
Message: "处理失败: " + err.Error(),
Message: cleanErr.Error(),
})
return
}
// 7. 检测账单类型
billType := ""
if strings.Contains(outputStr, "支付宝") {
billType = "alipay"
} else if strings.Contains(outputStr, "微信") {
billType = "wechat"
// 7. 如果去重检测没有识别出类型,从 Python 输出中检测
if billType == "" {
billType = cleanResult.BillType
}
// 8. 将去重后的原始数据存入 MongoDB原始数据集合
rawCount, rawErr := service.SaveRawBillsFromFile(processFilePath, billType, header.Filename, timestamp)
if rawErr != nil {
fmt.Printf("⚠️ 存储原始数据到 MongoDB 失败: %v\n", rawErr)
} else {
fmt.Printf("✅ 已存储 %d 条原始账单记录到 MongoDB\n", rawCount)
}
// 9. 将清洗后的数据存入 MongoDB清洗后数据集合
cleanedCount, _, cleanedErr := service.SaveCleanedBillsFromFile(outputPath, req.Format, billType, header.Filename, timestamp)
if cleanedErr != nil {
fmt.Printf("⚠️ 存储清洗后数据到 MongoDB 失败: %v\n", cleanedErr)
} else {
fmt.Printf("✅ 已存储 %d 条清洗后账单记录到 MongoDB\n", cleanedCount)
}
// 10. 清理临时的去重文件(如果生成了的话)
if dedupResult.DedupFilePath != inputPath && dedupResult.DedupFilePath != "" {
os.Remove(dedupResult.DedupFilePath)
}
// 11. 返回成功响应
message := fmt.Sprintf("处理成功,新增 %d 条记录", cleanedCount)
if dedupResult.DuplicateCount > 0 {
message = fmt.Sprintf("处理成功,新增 %d 条,跳过 %d 条重复记录", cleanedCount, dedupResult.DuplicateCount)
}
// 8. 返回成功响应
c.JSON(http.StatusOK, model.UploadResponse{
Result: true,
Message: "处理成功",
Message: message,
Data: &model.UploadData{
BillType: billType,
FileURL: fmt.Sprintf("/download/%s", outputFileName),
FileName: outputFileName,
BillType: billType,
FileURL: fmt.Sprintf("/download/%s", outputFileName),
FileName: outputFileName,
RawCount: rawCount,
CleanedCount: cleanedCount,
DuplicateCount: dedupResult.DuplicateCount,
},
})
}

View File

@@ -4,10 +4,13 @@ import (
"fmt"
"net/http"
"os"
"os/signal"
"syscall"
"github.com/gin-gonic/gin"
"billai-server/config"
"billai-server/database"
"billai-server/handler"
)
@@ -33,12 +36,32 @@ func main() {
fmt.Println(" 请在配置文件中指定正确的 Python 路径")
}
// 连接 MongoDB
if err := database.Connect(); err != nil {
fmt.Printf("⚠️ 警告: MongoDB 连接失败: %v\n", err)
fmt.Println(" 账单数据将不会存储到数据库")
os.Exit(1)
} else {
// 优雅关闭时断开连接
defer database.Disconnect()
}
// 创建路由
r := gin.Default()
// 注册路由
setupRoutes(r, outputDirAbs, pythonPathAbs)
// 监听系统信号
go func() {
quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
fmt.Println("\n🛑 正在关闭服务...")
database.Disconnect()
os.Exit(0)
}()
// 启动服务
printAPIInfo()
r.Run(":" + config.Global.Port)
@@ -74,6 +97,7 @@ func printBanner(pythonPath, uploadDir, outputDir string) {
fmt.Printf("🐍 Python路径: %s\n", pythonPath)
fmt.Printf("📂 上传目录: %s\n", uploadDir)
fmt.Printf("📂 输出目录: %s\n", outputDir)
fmt.Printf("🍃 MongoDB: %s/%s\n", config.Global.MongoURI, config.Global.MongoDatabase)
fmt.Println("━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━")
}

40
server/model/bill.go Normal file
View File

@@ -0,0 +1,40 @@
package model
import (
"time"
"go.mongodb.org/mongo-driver/bson/primitive"
)
// RawBill is one raw bill record, i.e. a row of the uploaded source data
// stored as-is.
type RawBill struct {
	ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"`
	BillType string `bson:"bill_type" json:"bill_type"` // bill type: alipay/wechat
	SourceFile string `bson:"source_file" json:"source_file"` // name of the source file
	UploadBatch string `bson:"upload_batch" json:"upload_batch"` // upload batch (timestamp)
	RowIndex int `bson:"row_index" json:"row_index"` // original row number
	RawData map[string]interface{} `bson:"raw_data" json:"raw_data"` // raw field data
	CreatedAt time.Time `bson:"created_at" json:"created_at"` // creation time
}
// CleanedBill is one cleaned bill record, i.e. the normalized form of a
// transaction.
type CleanedBill struct {
	ID primitive.ObjectID `bson:"_id,omitempty" json:"id,omitempty"`
	BillType string `bson:"bill_type" json:"bill_type"` // bill type: alipay/wechat
	TransactionID string `bson:"transaction_id" json:"transaction_id"` // transaction order number (used for deduplication)
	MerchantOrderNo string `bson:"merchant_order_no" json:"merchant_order_no"` // merchant order number (originally noted as a deduplication key)
	Time time.Time `bson:"time" json:"time"` // transaction time
	Category string `bson:"category" json:"category"` // transaction category
	Merchant string `bson:"merchant" json:"merchant"` // counterparty
	Description string `bson:"description" json:"description"` // item description
	IncomeExpense string `bson:"income_expense" json:"income_expense"` // income/expense marker
	Amount float64 `bson:"amount" json:"amount"` // amount
	PayMethod string `bson:"pay_method" json:"pay_method"` // payment method
	Status string `bson:"status" json:"status"` // transaction status
	Remark string `bson:"remark" json:"remark"` // remark
	ReviewLevel string `bson:"review_level" json:"review_level"` // review level: HIGH/LOW/empty
	CreatedAt time.Time `bson:"created_at" json:"created_at"` // creation time
	UpdatedAt time.Time `bson:"updated_at" json:"updated_at"` // last update time
	SourceFile string `bson:"source_file" json:"source_file"` // name of the source file
	UploadBatch string `bson:"upload_batch" json:"upload_batch"` // upload batch (timestamp)
}

View File

@@ -2,9 +2,12 @@ package model
// UploadData 上传响应数据
type UploadData struct {
BillType string `json:"bill_type,omitempty"` // alipay/wechat
FileURL string `json:"file_url,omitempty"` // 下载链接
FileName string `json:"file_name,omitempty"` // 文件名
BillType string `json:"bill_type,omitempty"` // alipay/wechat
FileURL string `json:"file_url,omitempty"` // 下载链接
FileName string `json:"file_name,omitempty"` // 文件名
RawCount int `json:"raw_count,omitempty"` // 存储到原始数据集合的记录数
CleanedCount int `json:"cleaned_count,omitempty"` // 存储到清洗后数据集合的记录数
DuplicateCount int `json:"duplicate_count,omitempty"` // 重复跳过的记录数
}
// UploadResponse 上传响应

621
server/service/bill.go Normal file
View File

@@ -0,0 +1,621 @@
package service
import (
"context"
"encoding/csv"
"encoding/json"
"fmt"
"os"
"strconv"
"strings"
"time"
"go.mongodb.org/mongo-driver/bson"
"billai-server/database"
"billai-server/model"
)
// SaveResult summarizes the outcome of a store operation.
type SaveResult struct {
	RawCount int // number of raw records stored
	CleanedCount int // number of cleaned records stored
	DuplicateCount int // number of duplicate records skipped
}
// checkDuplicate reports whether the cleaned-bill collection already holds a
// record matching bill. A non-empty TransactionID is matched on
// "transaction_id"; otherwise the combination of time, amount and merchant is
// used. A failed lookup is reported as "not a duplicate".
func checkDuplicate(ctx context.Context, bill *model.CleanedBill) bool {
	// Start with the fallback key: time + amount + merchant.
	filter := bson.M{
		"time":     bill.Time,
		"amount":   bill.Amount,
		"merchant": bill.Merchant,
	}
	// The transaction order number takes precedence when present.
	if bill.TransactionID != "" {
		filter = bson.M{"transaction_id": bill.TransactionID}
	}

	matches, err := database.CleanedBillCollection.CountDocuments(ctx, filter)
	return err == nil && matches > 0
}
// DeduplicateResult describes the outcome of deduplicating a raw file.
type DeduplicateResult struct {
	OriginalCount int // number of data rows in the original file
	DuplicateCount int // number of rows found to be duplicates
	NewCount int // number of new rows
	DedupFilePath string // path of the file to process next (a new file is generated only when duplicates were removed)
	BillType string // detected bill type
}
// DeduplicateRawFile checks the data rows of a raw CSV file against the
// raw-bill collection and reports how many of them are new.
//
// The first CSV record is the header; detectBillTypeAndIdField picks the bill
// type and the column used as the transaction ID. Outcomes:
//   - no ID column found: no deduplication, DedupFilePath is filePath;
//   - every row is a duplicate: NewCount is 0, DedupFilePath is empty and the
//     error is nil (callers must check NewCount);
//   - no duplicates: DedupFilePath is filePath;
//   - some duplicates: the surviving rows are written to a sibling
//     "<name>_dedup.csv" file whose path is returned in DedupFilePath.
//
// An error is returned when the file cannot be opened or parsed, has fewer
// than two records, or the deduplicated file cannot be created or fully
// written. In the last case the partial file is removed.
//
// uploadBatch is accepted for interface compatibility and is not used.
func DeduplicateRawFile(filePath, uploadBatch string) (*DeduplicateResult, error) {
	file, err := os.Open(filePath)
	if err != nil {
		return nil, fmt.Errorf("打开文件失败: %w", err)
	}
	defer file.Close()

	rows, err := csv.NewReader(file).ReadAll()
	if err != nil {
		return nil, fmt.Errorf("读取 CSV 失败: %w", err)
	}
	if len(rows) < 2 {
		return nil, fmt.Errorf("文件没有数据行")
	}

	header, dataRows := rows[0], rows[1:]

	// Detect the bill type and the column used for deduplication.
	billType, idFieldIdx := detectBillTypeAndIdField(header)

	result := &DeduplicateResult{
		OriginalCount: len(dataRows),
		BillType:      billType,
	}

	// Without an ID column nothing can be compared; hand back the original file.
	if idFieldIdx < 0 {
		result.NewCount = len(dataRows)
		result.DedupFilePath = filePath
		return result, nil
	}

	// One deadline covers all lookups below.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Raw documents store each CSV column under raw_data.<header name>.
	idKey := "raw_data." + header[idFieldIdx]

	newRows := make([][]string, 0, len(dataRows))
	for _, row := range dataRows {
		// Rows too short to contain the ID column are dropped: they are
		// neither kept nor counted as duplicates.
		if len(row) <= idFieldIdx {
			continue
		}

		transactionID := strings.TrimSpace(row[idFieldIdx])
		if transactionID == "" {
			// Rows without a transaction ID cannot be compared; keep them.
			newRows = append(newRows, row)
			continue
		}

		count, err := database.RawBillCollection.CountDocuments(ctx, bson.M{idKey: transactionID})
		if err != nil {
			// A failed lookup (including an expired deadline) keeps the row.
			newRows = append(newRows, row)
			continue
		}
		if count == 0 {
			newRows = append(newRows, row)
		} else {
			result.DuplicateCount++
		}
	}

	result.NewCount = len(newRows)

	// Nothing new: no file to process.
	if len(newRows) == 0 {
		result.DedupFilePath = ""
		return result, nil
	}

	// Nothing removed: the original file can be used as-is.
	if result.DuplicateCount == 0 {
		result.DedupFilePath = filePath
		return result, nil
	}

	// Some rows were removed: write the survivors to a new file.
	dedupFilePath := strings.TrimSuffix(filePath, ".csv") + "_dedup.csv"
	dedupFile, err := os.Create(dedupFilePath)
	if err != nil {
		return nil, fmt.Errorf("创建去重文件失败: %w", err)
	}

	records := make([][]string, 0, len(newRows)+1)
	records = append(records, header)
	records = append(records, newRows...)

	// WriteAll flushes and reports any write error; the close error matters
	// too because it can surface a failed final write.
	writeErr := csv.NewWriter(dedupFile).WriteAll(records)
	if closeErr := dedupFile.Close(); writeErr == nil {
		writeErr = closeErr
	}
	if writeErr != nil {
		// Do not leave a truncated file behind for later processing.
		_ = os.Remove(dedupFilePath)
		return nil, fmt.Errorf("写入去重文件失败: %w", writeErr)
	}

	result.DedupFilePath = dedupFilePath
	return result, nil
}
// detectBillTypeAndIdField inspects a CSV header row and returns the detected
// bill type ("alipay", "wechat", or "" when no marker column is present) and
// the index of the column to use for deduplication (-1 when none is found).
//
// Column names are compared after removing a leading UTF-8 byte order mark
// and surrounding whitespace, so padded header cells and a BOM on the first
// cell still match. The returned index refers to the header slice as given.
//
// A transaction-number column is preferred (the last one wins if several are
// present); otherwise the first merchant-order-number column is used.
func detectBillTypeAndIdField(header []string) (billType string, idFieldIdx int) {
	idFieldIdx = -1
	fallbackIdx := -1

	for i, raw := range header {
		col := strings.TrimSpace(strings.TrimPrefix(raw, "\ufeff"))
		switch col {
		case "交易分类", "对方账号":
			// Alipay marker columns.
			billType = "alipay"
		case "交易类型", "金额(元)":
			// WeChat marker columns.
			billType = "wechat"
		case "交易订单号", "交易号", "交易单号":
			// Primary deduplication column.
			idFieldIdx = i
		case "商家订单号", "商户单号", "商户订单号":
			// Secondary deduplication column: remember only the first one.
			if fallbackIdx < 0 {
				fallbackIdx = i
			}
		}
	}

	// Fall back to the merchant order number when no primary column exists.
	if idFieldIdx < 0 {
		idFieldIdx = fallbackIdx
	}

	return billType, idFieldIdx
}
// SaveRawBillsFromFile reads a raw CSV file and inserts one RawBill document
// per data row into the raw-bill collection. The first record is the header;
// each cell is stored, whitespace-trimmed, under its header name. It returns
// the number of inserted documents, or 0 with a nil error when the file has no
// data rows.
func SaveRawBillsFromFile(filePath, billType, sourceFile, uploadBatch string) (int, error) {
	f, err := os.Open(filePath)
	if err != nil {
		return 0, fmt.Errorf("打开文件失败: %w", err)
	}
	defer f.Close()

	records, err := csv.NewReader(f).ReadAll()
	if err != nil {
		return 0, fmt.Errorf("读取 CSV 失败: %w", err)
	}
	if len(records) < 2 {
		// Header only (or nothing at all): no data rows to store.
		return 0, nil
	}

	columns, body := records[0], records[1:]
	createdAt := time.Now()

	docs := make([]interface{}, 0, len(body))
	for i := range body {
		fields := make(map[string]interface{})
		for j, name := range columns {
			if j >= len(body[i]) {
				continue
			}
			// Values are trimmed so later deduplication lookups can match them.
			fields[name] = strings.TrimSpace(body[i][j])
		}

		docs = append(docs, model.RawBill{
			BillType:    billType,
			SourceFile:  sourceFile,
			UploadBatch: uploadBatch,
			RowIndex:    i + 1, // data rows are numbered from 1
			RawData:     fields,
			CreatedAt:   createdAt,
		})
	}

	if len(docs) == 0 {
		return 0, nil
	}

	// Insert everything in one batch.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	inserted, err := database.RawBillCollection.InsertMany(ctx, docs)
	if err != nil {
		return 0, fmt.Errorf("插入原始数据失败: %w", err)
	}
	return len(inserted.InsertedIDs), nil
}
// SaveCleanedBillsFromFile loads a cleaned output file and stores its records
// in the cleaned-bill collection. A format of "json" selects the JSON loader;
// any other value selects the CSV loader.
// It returns (inserted count, skipped duplicate count, error).
func SaveCleanedBillsFromFile(filePath, format, billType, sourceFile, uploadBatch string) (int, int, error) {
	switch format {
	case "json":
		return saveCleanedBillsFromJSON(filePath, billType, sourceFile, uploadBatch)
	default:
		return saveCleanedBillsFromCSV(filePath, billType, sourceFile, uploadBatch)
	}
}
// saveCleanedBillsFromCSV reads a cleaned CSV file, maps each data row onto a
// CleanedBill by header name, skips rows that checkDuplicate reports as
// already stored, and inserts the rest in one batch.
// It returns (inserted count, skipped duplicate count, error).
func saveCleanedBillsFromCSV(filePath, billType, sourceFile, uploadBatch string) (int, int, error) {
	f, err := os.Open(filePath)
	if err != nil {
		return 0, 0, fmt.Errorf("打开文件失败: %w", err)
	}
	defer f.Close()

	records, err := csv.NewReader(f).ReadAll()
	if err != nil {
		return 0, 0, fmt.Errorf("读取 CSV 失败: %w", err)
	}
	if len(records) < 2 {
		// No data rows.
		return 0, 0, nil
	}

	// Header name -> column position.
	position := make(map[string]int)
	for i, name := range records[0] {
		position[name] = i
	}

	// lookup returns the cell under the named column. The second result is
	// false when the column is absent from the header or the record is too
	// short to contain it.
	lookup := func(record []string, name string) (string, bool) {
		i, known := position[name]
		if !known || i >= len(record) {
			return "", false
		}
		return record[i], true
	}

	// This context covers both the duplicate checks and the final insert.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	var docs []interface{}
	skipped := 0
	stamp := time.Now()

	for _, record := range records[1:] {
		entry := model.CleanedBill{
			BillType:    billType,
			SourceFile:  sourceFile,
			UploadBatch: uploadBatch,
			CreatedAt:   stamp,
			UpdatedAt:   stamp,
		}

		// Order numbers (used for the duplicate check).
		if v, ok := lookup(record, "交易订单号"); ok {
			entry.TransactionID = strings.TrimSpace(v)
		} else if v, ok := lookup(record, "交易号"); ok {
			entry.TransactionID = strings.TrimSpace(v)
		}
		if v, ok := lookup(record, "商家订单号"); ok {
			entry.MerchantOrderNo = strings.TrimSpace(v)
		} else if v, ok := lookup(record, "商户单号"); ok {
			entry.MerchantOrderNo = strings.TrimSpace(v)
		}

		if v, ok := lookup(record, "交易时间"); ok {
			entry.Time = parseTime(v)
		}
		if v, ok := lookup(record, "金额"); ok {
			entry.Amount = parseAmount(v)
		}

		// Plain text columns: a missing column leaves the field empty.
		entry.Category, _ = lookup(record, "交易分类")
		entry.Merchant, _ = lookup(record, "交易对方")
		entry.Description, _ = lookup(record, "商品说明")
		entry.IncomeExpense, _ = lookup(record, "收/支")
		entry.PayMethod, _ = lookup(record, "支付方式")
		entry.Status, _ = lookup(record, "交易状态")
		entry.Remark, _ = lookup(record, "备注")
		entry.ReviewLevel, _ = lookup(record, "复核等级")

		if checkDuplicate(ctx, &entry) {
			skipped++
			continue
		}
		docs = append(docs, entry)
	}

	if len(docs) == 0 {
		return 0, skipped, nil
	}

	inserted, err := database.CleanedBillCollection.InsertMany(ctx, docs)
	if err != nil {
		return 0, skipped, fmt.Errorf("插入清洗后数据失败: %w", err)
	}
	return len(inserted.InsertedIDs), skipped, nil
}
// saveCleanedBillsFromJSON decodes a JSON array of objects from the file, maps
// each object onto a CleanedBill by key, skips entries that checkDuplicate
// reports as already stored, and inserts the rest in one batch.
// It returns (inserted count, skipped duplicate count, error).
func saveCleanedBillsFromJSON(filePath, billType, sourceFile, uploadBatch string) (int, int, error) {
	f, err := os.Open(filePath)
	if err != nil {
		return 0, 0, fmt.Errorf("打开文件失败: %w", err)
	}
	defer f.Close()

	var items []map[string]interface{}
	if err := json.NewDecoder(f).Decode(&items); err != nil {
		return 0, 0, fmt.Errorf("解析 JSON 失败: %w", err)
	}
	if len(items) == 0 {
		return 0, 0, nil
	}

	// text returns the value under key when it is present and is a string.
	text := func(item map[string]interface{}, key string) (string, bool) {
		s, ok := item[key].(string)
		return s, ok
	}

	// This context covers both the duplicate checks and the final insert.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()

	var docs []interface{}
	skipped := 0
	stamp := time.Now()

	for _, item := range items {
		entry := model.CleanedBill{
			BillType:    billType,
			SourceFile:  sourceFile,
			UploadBatch: uploadBatch,
			CreatedAt:   stamp,
			UpdatedAt:   stamp,
		}

		// Order numbers (used for the duplicate check).
		if v, ok := text(item, "交易订单号"); ok {
			entry.TransactionID = strings.TrimSpace(v)
		} else if v, ok := text(item, "交易号"); ok {
			entry.TransactionID = strings.TrimSpace(v)
		}
		if v, ok := text(item, "商家订单号"); ok {
			entry.MerchantOrderNo = strings.TrimSpace(v)
		} else if v, ok := text(item, "商户单号"); ok {
			entry.MerchantOrderNo = strings.TrimSpace(v)
		}

		if v, ok := text(item, "交易时间"); ok {
			entry.Time = parseTime(v)
		}

		// The amount may arrive as a string or as a JSON number.
		if raw, present := item["金额"]; present {
			if s, isString := raw.(string); isString {
				entry.Amount = parseAmount(s)
			} else if n, isNumber := raw.(float64); isNumber {
				entry.Amount = n
			}
		}

		// Plain text fields: a missing or non-string value leaves the field empty.
		entry.Category, _ = text(item, "交易分类")
		entry.Merchant, _ = text(item, "交易对方")
		entry.Description, _ = text(item, "商品说明")
		entry.IncomeExpense, _ = text(item, "收/支")
		entry.PayMethod, _ = text(item, "支付方式")
		entry.Status, _ = text(item, "交易状态")
		entry.Remark, _ = text(item, "备注")
		entry.ReviewLevel, _ = text(item, "复核等级")

		if checkDuplicate(ctx, &entry) {
			skipped++
			continue
		}
		docs = append(docs, entry)
	}

	if len(docs) == 0 {
		return 0, skipped, nil
	}

	inserted, err := database.CleanedBillCollection.InsertMany(ctx, docs)
	if err != nil {
		return 0, skipped, fmt.Errorf("插入清洗后数据失败: %w", err)
	}
	return len(inserted.InsertedIDs), skipped, nil
}
// parseTime converts a bill timestamp string into a time.Time.
//
// Leading and trailing whitespace is ignored. The supported layouts are tried
// in the order listed below and the first one that parses wins. Parsing goes
// through time.Parse, so no time zone is read from the input. An empty or
// unrecognized string yields the zero time.Time.
func parseTime(s string) time.Time {
	var zero time.Time

	trimmed := strings.TrimSpace(s)
	if len(trimmed) == 0 {
		return zero
	}

	// Most specific layouts first; date-only layouts last.
	layouts := [...]string{
		"2006-01-02 15:04:05",
		"2006/01/02 15:04:05",
		"2006-01-02 15:04",
		"2006/01/02 15:04",
		"2006-01-02",
		"2006/01/02",
	}
	for i := range layouts {
		parsed, err := time.Parse(layouts[i], trimmed)
		if err != nil {
			continue
		}
		return parsed
	}
	return zero
}
// amountNoise removes the characters that decorate a monetary amount but carry
// no numeric meaning: the thousands separator and both yen/yuan sign variants,
// the half-width "¥" (U+00A5) and the full-width "￥" (U+FFE5). The signs are
// written as escapes so the two look-alike characters cannot be confused.
var amountNoise = strings.NewReplacer(",", "", "\u00a5", "", "\uffe5", "")

// parseAmount converts an amount string such as "¥1,234.50" into a float64.
//
// Thousands separators and currency signs are removed first, and whitespace is
// trimmed afterwards so that a space between the sign and the digits
// ("¥ 3.5") does not break parsing. Any input that still is not a valid
// floating-point number, including the empty string, yields 0.
func parseAmount(s string) float64 {
	cleaned := strings.TrimSpace(amountNoise.Replace(s))

	amount, err := strconv.ParseFloat(cleaned, 64)
	if err != nil {
		// This function has no error return; unparseable input is reported as 0.
		return 0
	}
	return amount
}
// GetCleanedBillsByBatch loads the cleaned bills that belong to one upload
// batch.
//
// It queries database.CleanedBillCollection for documents whose
// "upload_batch" field equals uploadBatch, with the whole operation bounded by
// a 10-second timeout. A failure to run the query or to decode its results is
// returned as a wrapped error together with a nil slice.
func GetCleanedBillsByBatch(uploadBatch string) ([]model.CleanedBill, error) {
	queryCtx, stop := context.WithTimeout(context.Background(), 10*time.Second)
	defer stop()

	filter := bson.M{"upload_batch": uploadBatch}
	cur, findErr := database.CleanedBillCollection.Find(queryCtx, filter)
	if findErr != nil {
		return nil, fmt.Errorf("查询失败: %w", findErr)
	}
	defer cur.Close(queryCtx)

	var cleaned []model.CleanedBill
	decodeErr := cur.All(queryCtx, &cleaned)
	if decodeErr != nil {
		return nil, fmt.Errorf("解析结果失败: %w", decodeErr)
	}
	return cleaned, nil
}
// GetRawBillsByBatch loads the raw (uncleaned) bills that belong to one upload
// batch.
//
// It queries database.RawBillCollection for documents whose "upload_batch"
// field equals uploadBatch, with the whole operation bounded by a 10-second
// timeout. A failure to run the query or to decode its results is returned as
// a wrapped error together with a nil slice.
func GetRawBillsByBatch(uploadBatch string) ([]model.RawBill, error) {
	const queryTimeout = 10 * time.Second

	lookupCtx, release := context.WithTimeout(context.Background(), queryTimeout)
	defer release()

	byBatch := bson.M{"upload_batch": uploadBatch}
	rows, queryErr := database.RawBillCollection.Find(lookupCtx, byBatch)
	if queryErr != nil {
		return nil, fmt.Errorf("查询失败: %w", queryErr)
	}
	defer rows.Close(lookupCtx)

	var raw []model.RawBill
	if decodeErr := rows.All(lookupCtx, &raw); decodeErr != nil {
		return nil, fmt.Errorf("解析结果失败: %w", decodeErr)
	}
	return raw, nil
}
// GetBillStats reports collection-wide bill statistics.
//
// The returned map has four keys:
//   - "raw_records":     document count of database.RawBillCollection
//   - "cleaned_records": document count of database.CleanedBillCollection
//   - "total_expense":   sum of "amount" over cleaned bills whose
//     "income_expense" is "支出" (float64)
//   - "total_income":    the same sum for "收入" (float64)
//
// All queries share a single 10-second timeout. Any query or decode error
// aborts the call and is returned as-is; no partial statistics are produced.
func GetBillStats() (map[string]interface{}, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// Total number of raw records.
	rawTotal, err := database.RawBillCollection.CountDocuments(ctx, bson.M{})
	if err != nil {
		return nil, err
	}

	// Total number of cleaned records.
	cleanedTotal, err := database.CleanedBillCollection.CountDocuments(ctx, bson.M{})
	if err != nil {
		return nil, err
	}

	// Expense and income totals are both computed from the cleaned data.
	totalExpense, err := sumCleanedAmount(ctx, "支出")
	if err != nil {
		return nil, err
	}
	totalIncome, err := sumCleanedAmount(ctx, "收入")
	if err != nil {
		return nil, err
	}

	return map[string]interface{}{
		"raw_records":     rawTotal,
		"cleaned_records": cleanedTotal,
		"total_expense":   totalExpense,
		"total_income":    totalIncome,
	}, nil
}

// sumCleanedAmount totals the "amount" field of every cleaned bill whose
// "income_expense" field equals direction. It returns 0 when the aggregation
// produces no rows.
func sumCleanedAmount(ctx context.Context, direction string) (float64, error) {
	pipeline := []bson.M{
		{"$match": bson.M{"income_expense": direction}},
		{"$group": bson.M{"_id": nil, "total": bson.M{"$sum": "$amount"}}},
	}

	cursor, err := database.CleanedBillCollection.Aggregate(ctx, pipeline)
	if err != nil {
		return 0, err
	}
	defer cursor.Close(ctx)

	var rows []bson.M
	// A decode failure (or an expired context) must surface as an error
	// instead of being mistaken for "no matching bills".
	if err := cursor.All(ctx, &rows); err != nil {
		return 0, err
	}
	if len(rows) == 0 {
		return 0, nil
	}

	switch total := rows[0]["total"].(type) {
	case float64:
		return total, nil
	// NOTE(review): $sum presumably yields an integer type when every summed
	// value is integral; accept those too rather than reporting 0. Confirm
	// against the stored data.
	case int32:
		return float64(total), nil
	case int64:
		return float64(total), nil
	}
	return 0, nil
}

84
server/service/cleaner.go Normal file
View File

@@ -0,0 +1,84 @@
package service
import (
"fmt"
"os/exec"
"strings"
"billai-server/config"
)
// CleanOptions holds the optional filters and output settings that
// RunCleanScript forwards to the Python cleaning script as command-line flags.
// A field left as the empty string is not passed to the script.
type CleanOptions struct {
Year string // Year filter; forwarded as --year
Month string // Month filter; forwarded as --month
Start string // Start date; forwarded as --start
End string // End date; forwarded as --end
Format string // Output format: csv/json; forwarded as --format
}
// CleanResult is what RunCleanScript returns when the cleaning script
// succeeds.
type CleanResult struct {
BillType string // Bill type detected from the script output: alipay/wechat
Output string // Output captured from the Python script
}
// RunCleanScript runs the Python cleaning script on a bill file.
//
// The script at config.Global.CleanScript is executed with the interpreter at
// config.Global.PythonPath (both passed through config.ResolvePath), using
// config.Global.ProjectRoot as the working directory.
//
// Parameters:
//   - inputPath:  path of the file to clean, passed as the first script argument
//   - outputPath: path the script should write to, passed as the second argument
//   - opts:       optional filters; may be nil. Each non-empty field is appended
//     as its corresponding command-line flag.
//
// On success the script's combined stdout and stderr is returned together with
// the bill type detected from that output. If the command fails, the result is
// nil and the error carries the captured output.
func RunCleanScript(inputPath, outputPath string, opts *CleanOptions) (*CleanResult, error) {
	scriptPath := config.ResolvePath(config.Global.CleanScript)
	argv := []string{scriptPath, inputPath, outputPath}

	if opts != nil {
		// Flags are emitted in a fixed order: year, month, start, end, format.
		optional := [...]struct{ name, value string }{
			{"--year", opts.Year},
			{"--month", opts.Month},
			{"--start", opts.Start},
			{"--end", opts.End},
			{"--format", opts.Format},
		}
		for _, o := range optional {
			if o.value == "" {
				continue
			}
			argv = append(argv, o.name, o.value)
		}
	}

	// Announce the run on stdout before launching the interpreter.
	fmt.Printf("🐍 执行清洗脚本...\n")

	interpreter := config.ResolvePath(config.Global.PythonPath)
	run := exec.Command(interpreter, argv...)
	run.Dir = config.Global.ProjectRoot

	raw, runErr := run.CombinedOutput()
	captured := string(raw)
	if runErr != nil {
		return nil, fmt.Errorf("清洗脚本执行失败: %w\n输出: %s", runErr, captured)
	}

	// The bill type is inferred from the script's own output.
	result := &CleanResult{Output: captured}
	result.BillType = DetectBillTypeFromOutput(captured)
	return result, nil
}
// DetectBillTypeFromOutput infers the bill type from the text printed by the
// Python cleaning script.
//
// It returns "alipay" when output contains "支付宝", otherwise "wechat" when
// output contains "微信", and the empty string when neither marker is present.
// The Alipay marker takes precedence if both appear.
func DetectBillTypeFromOutput(output string) string {
	switch {
	case strings.Contains(output, "支付宝"):
		return "alipay"
	case strings.Contains(output, "微信"):
		return "wechat"
	default:
		return ""
	}
}