package service import ( "context" "fmt" "opencatd-open/team/dao" dto "opencatd-open/team/dto/team" "opencatd-open/team/model" "time" "log" "gorm.io/gorm" "gorm.io/gorm/clause" ) var _ UsageService = (*usageService)(nil) type UsageService interface { // AsyncProcessUsage 异步处理使用记录 AsyncProcessUsage(usage *model.Usage) ListByUserID(ctx context.Context, userID int64, limit, offset int) ([]*model.Usage, error) ListByCapability(ctx context.Context, capability string, limit, offset int) ([]*model.Usage, error) ListByDateRange(ctx context.Context, start, end time.Time, filters map[string]interface{}) ([]*dto.UsageInfo, error) Delete(ctx context.Context, id int64) error } type usageService struct { db *gorm.DB usageDAO dao.UsageRepository dailyUsageDAO dao.DailyUsageRepository usageChan chan *model.Usage // 用于异步处理的channel ctx context.Context } func NewUsageService(ctx context.Context, db *gorm.DB, usageRepo dao.UsageRepository, dailyUsageRepo dao.DailyUsageRepository) UsageService { srv := &usageService{ db: db, usageDAO: usageRepo, dailyUsageDAO: dailyUsageRepo, usageChan: make(chan *model.Usage, 1000), // 设置合适的缓冲区大小 ctx: ctx, } // 启动异步处理goroutine go srv.processUsageWorker() return srv } func (s *usageService) AsyncProcessUsage(usage *model.Usage) { select { case s.usageChan <- usage: // 成功发送到channel default: // channel已满,记录错误日志 log.Println("usage channel is full, skip processing") } } func (s *usageService) processUsageWorker() { for { select { case usage := <-s.usageChan: err := s.processUsage(usage) if err != nil { log.Println("processUsage error:", err) } case <-s.ctx.Done(): log.Println("processUsageWorker is exiting") return } } } // processUsageWorker 异步处理worker func (s *usageService) processUsage(usage *model.Usage) error { err := s.db.Transaction(func(tx *gorm.DB) error { // 1. 记录使用记录 if err := tx.WithContext(s.ctx).Create(usage).Error; err != nil { return fmt.Errorf("create usage error: %w", err) } // 2. 更新每日统计(upsert 操作) dailyUsage := model.DailyUsage{ UserID: usage.UserID, TokenID: usage.TokenID, Capability: usage.Capability, Date: time.Date(usage.Date.Year(), usage.Date.Month(), usage.Date.Day(), 0, 0, 0, 0, usage.Date.Location()), Model: usage.Model, Stream: usage.Stream, PromptTokens: usage.PromptTokens, CompletionTokens: usage.CompletionTokens, TotalTokens: usage.TotalTokens, Cost: usage.Cost, } // 使用 OnConflict 实现 upsert if err := tx.WithContext(s.ctx).Clauses(clause.OnConflict{ Columns: []clause.Column{{Name: "user_id"}, {Name: "token_id"}, {Name: "capability"}, {Name: "date"}}, // 唯一键 DoUpdates: clause.Assignments(map[string]interface{}{ "prompt_tokens": gorm.Expr("prompt_tokens + ?", usage.PromptTokens), "completion_tokens": gorm.Expr("completion_tokens + ?", usage.CompletionTokens), "total_tokens": gorm.Expr("total_tokens + ?", usage.TotalTokens), "cost": gorm.Expr("cost + ?", usage.Cost), }), }).Create(&dailyUsage).Error; err != nil { return fmt.Errorf("upsert daily usage error: %w", err) } // 3. 更新用户额度 if err := tx.WithContext(s.ctx).Model(&model.User{}).Where("id = ?", usage.UserID).Update("quota", gorm.Expr("quota - ?", usage.Cost)).Error; err != nil { return fmt.Errorf("update user quota error: %w", err) } return nil }) return err } func (s *usageService) ListByUserID(ctx context.Context, userID int64, limit int, offset int) ([]*model.Usage, error) { return s.usageDAO.ListByUserID(ctx, userID, limit, offset) } func (s *usageService) ListByCapability(ctx context.Context, capability string, limit, offset int) ([]*model.Usage, error) { return s.usageDAO.ListByCapability(ctx, capability, limit, offset) } func (s *usageService) ListByDateRange(ctx context.Context, start, end time.Time, filters map[string]interface{}) ([]*dto.UsageInfo, error) { return s.dailyUsageDAO.StatUserUsages(ctx, start, end, filters) } func (s *usageService) Delete(ctx context.Context, id int64) error { return s.usageDAO.Delete(ctx, id) }