package ads

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"

	"github.com/gogf/gf/v2/database/gdb"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/glog"

	"server/internal/consts"
	"server/internal/dao"
	"server/internal/model"
	"server/internal/model/do"
	"server/internal/service"
	"server/utility/ecode"
	"server/utility/encrypt"
	"server/utility/mqtt"
	"server/utility/state_machine"
)

// sAds is the ads service implementation. It owns the state machine used to
// validate ad status transitions.
type sAds struct {
	stateMachine *state_machine.AdStateMachine
}

// New creates an ads service backed by a fresh ad state machine.
func New() service.IAds {
	return &sAds{
		stateMachine: state_machine.NewAdStateMachine(),
	}
}

// init registers the ads service implementation with the service registry.
func init() {
	service.RegisterAds(New())
}

// Upload processes an encrypted ad status report from a user and drives the
// ad's status flow.
//
// Steps:
//  1. Decrypt and validate the payload.
//  2. Inside a single transaction: locate the user's latest event record for
//     the same platform / category / app package, decide whether this report
//     starts a new lifecycle or continues the current one, validate the
//     transition with the state machine when continuing, insert or update the
//     event record, insert a transition record, and grant reward points when
//     the new state is StateWatched.
//  3. After the transaction succeeds, publish a notification message
//     (best effort; publish failures are only logged).
//
// It returns an ecode error when decryption, validation, or any database step
// fails, and the state machine's error when the transition is rejected.
func (s *sAds) Upload(ctx context.Context, in *model.AdsUploadIn) (out *model.AdsUploadOut, err error) {
	// Decrypt the ad payload.
	adsData, err := encrypt.DecryptAdsData(in.Data)
	if err != nil {
		glog.Warningf(ctx, "Failed to decrypt ads data for user %d: %v", in.UserId, err)
		return nil, ecode.Fail.Sub("data_decrypt_failed")
	}

	// Validate the payload.
	if adsData.AdsPlatId <= 0 || adsData.AdsCategoryId <= 0 || adsData.AppPackage == "" {
		return nil, ecode.Params.Sub("invalid_ads_data")
	}

	// All table writes happen in one transaction.
	err = dao.AdEventLogs.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error {
		var (
			fromState      consts.AdState // previous state; 0 means "no previous state"
			adEventID      int64          // id of the existing event record being continued
			isNewLifecycle bool           // whether this report starts a new lifecycle
		)
		toState := consts.AdState(adsData.Status)

		// 1. Decide whether this is a new lifecycle or a continuation.
		if toState == consts.StateFetchSuccess || toState == consts.StateFetchFailed {
			// Fetch success / fetch failure always start a new ad lifecycle.
			isNewLifecycle = true
			glog.Infof(ctx, "Starting new ad lifecycle with fetch state for user %d, app %s, state: %s",
				in.UserId, adsData.AppPackage, consts.GetStateDescription(toState))
		} else {
			// Same user, same platform, same ad category, same app package.
			// id is a secondary sort key so that rows sharing a created_at
			// value still resolve to the most recently inserted one.
			latestLog, err := dao.AdEventLogs.Ctx(ctx).TX(tx).
				Where(do.AdEventLogs{
					UserId:        in.UserId,
					AdsPlatId:     adsData.AdsPlatId,
					AdsCategoryId: adsData.AdsCategoryId,
					AppPackage:    adsData.AppPackage,
				}).
				Order("created_at DESC,id DESC").
				One()
			if err != nil && !errors.Is(err, sql.ErrNoRows) {
				// A real query failure must abort the transaction; treating
				// it as "no record" would create a bogus new lifecycle and
				// skip transition validation.
				glog.Errorf(ctx, "Failed to query latest ad event log: %v", err)
				return ecode.Fail.Sub("database_query_failed")
			}

			if err != nil || latestLog.IsEmpty() {
				// No record found: this is the first upload for this ad.
				isNewLifecycle = true
				glog.Infof(ctx, "First ad upload for user %d, app %s", in.UserId, adsData.AppPackage)
			} else {
				fromState = consts.AdState(latestLog["status"].Int())
				adEventID = latestLog["id"].Int64()

				if s.stateMachine.IsTerminalState(fromState) {
					// The previous lifecycle already ended; start a new one.
					isNewLifecycle = true
					fromState = 0
					glog.Infof(ctx, "Starting new ad lifecycle after terminal state for user %d, app %s",
						in.UserId, adsData.AppPackage)
				} else {
					glog.Infof(ctx, "Continuing ad lifecycle for user %d, app %s, from state %d to %d",
						in.UserId, adsData.AppPackage, fromState, adsData.Status)
				}
			}
		}

		// 2. Validate the transition only when continuing a lifecycle.
		if !isNewLifecycle {
			flowID := generateFlowID(in.UserId, adsData.AppPackage)
			if err := s.stateMachine.Transition(ctx, flowID, in.UserId, fromState, toState, "用户操作"); err != nil {
				glog.Warningf(ctx, "Invalid state transition: %s -> %s for user %d, app %s",
					consts.GetStateDescription(fromState), consts.GetStateDescription(toState),
					in.UserId, adsData.AppPackage)
				return err
			}
		}

		// 3. Insert a new event record or update the existing one.
		var eventID int64
		if isNewLifecycle {
			newID, err := dao.AdEventLogs.Ctx(ctx).TX(tx).Data(do.AdEventLogs{
				UserId:        in.UserId,
				AdsPlatId:     adsData.AdsPlatId,
				AdsCategoryId: adsData.AdsCategoryId,
				AppPackage:    adsData.AppPackage,
				Status:        adsData.Status,
				StatusDesc:    consts.GetStateDescription(toState),
			}).InsertAndGetId()
			if err != nil {
				glog.Errorf(ctx, "Failed to save ad event log: %v", err)
				return ecode.Fail.Sub("database_save_failed")
			}
			eventID = newID
		} else {
			_, err := dao.AdEventLogs.Ctx(ctx).TX(tx).
				Where("id", adEventID).
				Data(do.AdEventLogs{
					Status:     adsData.Status,
					StatusDesc: consts.GetStateDescription(toState),
				}).
				Update()
			if err != nil {
				glog.Errorf(ctx, "Failed to update ad event log %d: %v", adEventID, err)
				return ecode.Fail.Sub("database_update_failed")
			}
			eventID = adEventID
		}

		// 4. Record the transition. FromStatus stays nil when there is no
		// previous state (start of a lifecycle).
		var fromStatus interface{}
		if fromState != 0 {
			fromStatus = int(fromState)
		}
		_, err := dao.AdEventTransitions.Ctx(ctx).TX(tx).Data(do.AdEventTransitions{
			EventId:    eventID,
			FromStatus: fromStatus,
			ToStatus:   adsData.Status,
		}).Insert()
		if err != nil {
			glog.Errorf(ctx, "Failed to save ad event transition for event %d: %v", eventID, err)
			return ecode.Fail.Sub("transition_save_failed")
		}

		// 5. Grant reward points when the ad was watched to completion.
		if toState == consts.StateWatched {
			return s.grantWatchReward(ctx, tx, in.UserId, eventID)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	// Build the notification message.
	mqttMessage := map[string]interface{}{
		"type":        "ads",
		"node_uid":    in.NodeUid,
		"device_code": in.DeviceCode,
		"data": map[string]interface{}{
			"ads_plat_id":     adsData.AdsPlatId,
			"ads_category_id": adsData.AdsCategoryId,
			"app_package":     adsData.AppPackage,
			"status":          adsData.Status,
			"timestamp":       time.Now().Unix(),
		},
	}

	// Publish it; failures are logged inside publishToMQTT and do not fail
	// the upload.
	s.publishToMQTT(ctx, mqttMessage)

	return &model.AdsUploadOut{Success: true}, nil
}

// grantWatchReward credits the user with the reward of the active ad task,
// using the given transaction. It increments the user's points, writes a
// points log entry linked to eventID, and writes a task log entry.
//
// When no active ad task exists it logs a warning and returns nil; when the
// task's reward is zero it does nothing. Any database failure is logged and
// returned as an ecode error so the surrounding transaction rolls back.
func (s *sAds) grantWatchReward(ctx context.Context, tx gdb.TX, userID, eventID int64) error {
	const (
		taskTypeAd        = 2 // task type: ad
		taskStatusEnabled = 1 // task status: enabled
		changeTypeEarn    = 2 // points change type: income (earn)
	)

	taskInfo, err := dao.Tasks.Ctx(ctx).TX(tx).
		Fields("id, reward_points").
		Where("task_type", taskTypeAd).
		Where("status", taskStatusEnabled).
		One()
	if err != nil {
		glog.Errorf(ctx, "Failed to query ad task: %v", err)
		return ecode.Fail.Sub("task_query_failed")
	}
	if taskInfo.IsEmpty() {
		glog.Warningf(ctx, "No active ad task found")
		return nil
	}

	taskID := taskInfo["id"].Int64()
	rewardPoints := taskInfo["reward_points"].Uint()
	if rewardPoints == 0 {
		return nil
	}

	// Update the user's points.
	if _, err := dao.Users.Ctx(ctx).TX(tx).
		Where("id", userID).
		Increment("points", rewardPoints); err != nil {
		glog.Errorf(ctx, "Failed to update user points: %v", err)
		return ecode.Fail.Sub("user_points_update_failed")
	}

	// Record the points log.
	if _, err := dao.UserPointsLogs.Ctx(ctx).TX(tx).Data(do.UserPointsLogs{
		UserId:         userID,
		ChangeType:     changeTypeEarn,
		PointsChange:   int(rewardPoints),
		RelatedOrderId: eventID,
		Description:    "观看广告奖励",
	}).Insert(); err != nil {
		glog.Errorf(ctx, "Failed to create user points log: %v", err)
		return ecode.Fail.Sub("user_points_log_create_failed")
	}

	// Record the task log.
	if _, err := dao.TaskLogs.Ctx(ctx).TX(tx).Data(do.TaskLogs{
		TaskId:       taskID,
		UserId:       userID,
		RewardPoints: rewardPoints,
	}).Insert(); err != nil {
		glog.Errorf(ctx, "Failed to create task log: %v", err)
		return ecode.Fail.Sub("task_log_create_failed")
	}

	glog.Infof(ctx, "User %d earned %d points for watching ad", userID, rewardPoints)
	return nil
}

// generateFlowID builds a flow identifier from the user id, the app package
// and the current Unix timestamp in seconds.
func generateFlowID(userID int64, appPackage string) string {
	return fmt.Sprintf("flow_%d_%s_%d", userID, appPackage, time.Now().Unix())
}

// publishToMQTT publishes message to the queue named by the "sqs.ads"
// configuration key, using the client registered as "amazon_sqs". It is best
// effort: a missing client, a missing queue name, or a publish failure is
// logged and otherwise ignored.
func (s *sAds) publishToMQTT(ctx context.Context, message map[string]interface{}) {
	mqttClient := mqtt.GetMQTTClient("amazon_sqs")
	if mqttClient == nil {
		glog.Errorf(ctx, "MQTT client not initialized")
		return
	}

	queueName := g.Cfg().MustGet(ctx, "sqs.ads").String()
	if queueName == "" {
		glog.Errorf(ctx, "MQTT queue name not configured")
		return
	}

	if err := mqttClient.Publish(ctx, queueName, message); err != nil {
		glog.Errorf(ctx, "Failed to publish message to MQTT: %v", err)
		return
	}
	glog.Infof(ctx, "Message published to MQTT: %v", message)
}