diff --git a/api/reward/reward.go b/api/reward/reward.go index 55a29d6..8379a54 100644 --- a/api/reward/reward.go +++ b/api/reward/reward.go @@ -23,4 +23,5 @@ type IRewardV1 interface { GetGoodsDetails(ctx context.Context, req *v1.GetGoodsDetailsReq) (res *v1.GetGoodsDetailsRes, err error) OperateTaskReward(ctx context.Context, req *v1.OperateTaskRewardReq) (res *v1.OperateTaskRewardRes, err error) GetUserRewardsCanClaimList(ctx context.Context, req *v1.GetUserRewardsCanClaimListReq) (res *v1.GetUserRewardsCanClaimListRes, err error) + NetfeeCallback(ctx context.Context, req *v1.NetfeeCallbackReq) (res *v1.NetfeeCallbackRes, err error) } diff --git a/api/reward/v1/reward.go b/api/reward/v1/reward.go index d303b8d..645a09b 100644 --- a/api/reward/v1/reward.go +++ b/api/reward/v1/reward.go @@ -314,3 +314,11 @@ type GetUserRewardsCanClaimListRes struct { List interface{} `json:"list"` Total int `json:"total"` } + +type NetfeeCallbackReq struct { + g.Meta `path:"/reward/netfeeCallback" method:"post" tags:"Reward" summary:"(8圈)回调"` + OrderId string `json:"order_id"` +} +type NetfeeCallbackRes struct { + Success bool `json:"success" dc:"是否成功"` +} diff --git a/internal/consts/emqx.go b/internal/consts/emqx.go new file mode 100644 index 0000000..9295b1d --- /dev/null +++ b/internal/consts/emqx.go @@ -0,0 +1,20 @@ +package consts + +// UP 上行消息 cmd 常量 +const ( + CmdMemberLevels = 1 // 会员等级数据 + CmdClientList = 2 // 客户机列表数据 + CmdClientUp = 104 // 上机记录 + CmdClientDown = 106 // 下机记录 +) + +// DOWN 下行消息 cmd 常量 +const ( + CmdDesktopSetting = 10001 // 桌面组件显示设置 + CmdUserFee = 10002 // 用户网费下发 +) + +const ( + UPDataTopic = "/+/up" + DOWNDataTopic = "/%d/down" +) diff --git a/internal/consts/rewardType.go b/internal/consts/rewardType.go new file mode 100644 index 0000000..809c876 --- /dev/null +++ b/internal/consts/rewardType.go @@ -0,0 +1,5 @@ +package consts + +const ( + NetfeeCode = "internet_fee" +) diff --git a/internal/controller/reward/reward_v1_netfee_callback.go b/internal/controller/reward/reward_v1_netfee_callback.go new file mode 100644 index 0000000..2931a6a --- /dev/null +++ b/internal/controller/reward/reward_v1_netfee_callback.go @@ -0,0 +1,17 @@ +package reward + +import ( + "context" + "server/internal/model" + "server/internal/service" + + "server/api/reward/v1" +) + +func (c *ControllerV1) NetfeeCallback(ctx context.Context, req *v1.NetfeeCallbackReq) (res *v1.NetfeeCallbackRes, err error) { + out, err := service.Reward().NetfeeCallback(ctx, &model.NetfeeCallbackIn{OrderId: req.OrderId}) + if err != nil { + return + } + return &v1.NetfeeCallbackRes{Success: out.Success}, nil +} diff --git a/internal/dao/internal/users.go b/internal/dao/internal/users.go index 55d0eda..e643d7f 100644 --- a/internal/dao/internal/users.go +++ b/internal/dao/internal/users.go @@ -37,6 +37,7 @@ type UsersColumns struct { RoleId string // 角色ID LastLoginStoreId string // 上次登录门店ID Quan8Uuid string // 8圈使用的 uuid + XyUserId string // 系统唯一用户ID } // usersColumns holds the columns for the table users. @@ -58,6 +59,7 @@ var usersColumns = UsersColumns{ RoleId: "role_id", LastLoginStoreId: "last_login_store_id", Quan8Uuid: "quan8_uuid", + XyUserId: "xy_user_id", } // NewUsersDao creates and returns a new DAO object for table data access. diff --git a/internal/logic/reward/reward.go b/internal/logic/reward/reward.go index 9cda271..2a43ac2 100644 --- a/internal/logic/reward/reward.go +++ b/internal/logic/reward/reward.go @@ -7,6 +7,7 @@ import ( "github.com/gogf/gf/v2/database/gdb" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/gtime" + "github.com/gogf/gf/v2/util/gconv" "github.com/gogf/gf/v2/util/guid" "server/internal/consts" "server/internal/dao" @@ -15,6 +16,8 @@ import ( "server/internal/service" "server/utility/ecode" "server/utility/gamelife" + "server/utility/mqtt" + "server/utility/mqtt/emqx" "strconv" "time" ) @@ -657,7 +660,7 @@ func (s *sReward) GetLift(ctx context.Context, in *model.GetRewardIn) (out *mode } } - if in.Source == 1 && in.RewradTypeId == 37 || in.Source == 2 { + if in.Source == 1 && in.RewradTypeId == 37 { // 增加奖励已领取数量 _, err = dao.Rewards.Ctx(ctx).Where(do.Rewards{Id: in.RewardId}).Increment(dao.Rewards.Columns().ReceivedNum, 1) if err != nil { @@ -704,6 +707,63 @@ func (s *sReward) GetLift(ctx context.Context, in *model.GetRewardIn) (out *mode } } else { // 门店奖励处理 + value, err := dao.RewardTypes.Ctx(ctx).WherePri(in.RewradTypeId).Value() + if err != nil { + return nil, ecode.Fail.Sub("获取奖励类型失败") + } + if value.IsEmpty() { + return nil, ecode.Params.Sub("奖励类型不存在") + } + switch value.String() { + case consts.NetfeeCode: + dao.UserTaskRewards.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { + xyUserId, err := dao.Users.Ctx(ctx).WherePri(in.UserId).Fields(dao.Users.Columns().XyUserId).Value() + if err != nil { + return err + } + if xyUserId.IsEmpty() { + return ecode.Params.Sub("该用户暂未绑定8圈账号,无法发放网费奖励") + } + + // 增加奖励已领取数量 + _, err = dao.Rewards.Ctx(ctx).Where(do.Rewards{Id: in.RewardId}).Increment(dao.Rewards.Columns().ReceivedNum, 1) + if err != nil { + return ecode.Fail.Sub("获取奖励领取记录异常") + } + + // 修改用户任务奖励记录状态 + _, err = dao.UserTaskRewards.Ctx(ctx).Where(do.UserTaskRewards{Id: in.Id}).Data(do.UserTaskRewards{ + Status: consts.RewardExchangeStatus, + }).Update() + if err != nil { + return ecode.Fail.Sub("修改用户任务奖励记录状态异常") + } + + client, b := mqtt.GetClient("emqx") + if !b { + return ecode.Fail.Sub("获取mqtt客户端异常") + } + downData := emqx.DownData{CMD: consts.CmdUserFee, Data: struct { + XyUserId string `json:"xy_user_id"` + Money int `json:"money"` + Note string `json:"note"` + OrderId string `json:"order_id"` + }{ + XyUserId: xyUserId.String(), + Money: in.GrantQuantity, + Note: fmt.Sprintf("用户领取 id 为 %d,下发记录 id 为 %d 的网费", in.RewardId, in.Id), + OrderId: gconv.String(in.Id), + }} + marshal, err := json.Marshal(downData) + if err != nil { + return ecode.Fail.Sub("json.Marshal异常") + } + if err = client.Publish(fmt.Sprintf(consts.DOWNDataTopic, in.StoreId), marshal); err != nil { + return ecode.Fail.Sub("Publish异常") + } + return nil + }) + } } return out, err } @@ -1115,3 +1175,41 @@ func (s *sReward) GetUserClaimList(ctx context.Context, in *model.GetUserClaimLi Total: total, }, nil } + +func (s *sReward) NetfeeCallback(ctx context.Context, in *model.NetfeeCallbackIn) (out *model.NetfeeCallbackOut, err error) { + if err = dao.UserTaskRewards.Transaction(ctx, func(ctx context.Context, tx gdb.TX) error { + value, err := dao.UserTaskRewards.Ctx(ctx).WherePri(in.OrderId).Fields(dao.UserTaskRewards.Columns().UserTaskId).Value() + if err != nil { + return ecode.Fail.Sub("查询用户任务奖励失败") + } + if value.IsEmpty() { + return ecode.Fail.Sub("查询用户任务奖励失败") + } + count, err := dao.UserTaskRewards.Ctx(ctx).Where(do.UserTaskRewards{UserTaskId: value.Int64()}).WhereIn(dao.UserTaskRewards.Columns().Status, []int{2, 3, 5}).Count() + if err != nil { + return ecode.Fail.Sub("查询用户任务奖励失败") + } + + if count == 1 { + // 修改任务记录状态2 + _, err = dao.UserTasks.Ctx(ctx).Where(do.UserTasks{Id: value.Int64()}).Data(do.UserTasks{ + Status: 2, + }).Update() + + if err != nil { + return ecode.Fail.Sub("修改用户任务状态失败") + } + } + if _, err := dao.UserTaskRewards.Ctx(ctx).Data(do.UserTaskRewards{ + Status: consts.RewardSuccessStatus, + }).Update(); err != nil { + return ecode.Fail.Sub("修改用户任务奖励状态失败") + } + + return nil + }); err != nil { + return nil, err + } + + return &model.NetfeeCallbackOut{Success: true}, nil +} diff --git a/internal/logic/storeDesktopSetting/storeDesktopSetting.go b/internal/logic/storeDesktopSetting/storeDesktopSetting.go index 681eada..0cbb8f7 100644 --- a/internal/logic/storeDesktopSetting/storeDesktopSetting.go +++ b/internal/logic/storeDesktopSetting/storeDesktopSetting.go @@ -5,11 +5,13 @@ import ( "encoding/json" "fmt" "github.com/gogf/gf/v2/errors/gerror" + "server/internal/consts" "server/internal/dao" "server/internal/model" "server/internal/model/do" "server/internal/service" "server/utility/mqtt" + "server/utility/mqtt/emqx" ) type sStoreDesktopSetting struct { @@ -63,15 +65,26 @@ func (s *sStoreDesktopSetting) Save(ctx context.Context, in model.SaveDesktopSet if err != nil { return nil, err } - marshal, err := json.Marshal(in) - if err != nil { - return nil, err - } client, b := mqtt.GetClient("emqx") if !b { return nil, gerror.New("获取MQTT客户端失败") } - err = client.Publish(fmt.Sprintf("/desktop/%d", in.StoreId), marshal) + downData := emqx.DownData{ + CMD: consts.CmdDesktopSetting, + StoreId: int(in.StoreId), + Data: struct { + RightComponentVisible int `json:"rightComponentVisible"` + TopComponentVisible int `json:"topComponentVisible"` + }{ + RightComponentVisible: in.RightComponentVisible, + TopComponentVisible: in.TopComponentVisible, + }, + } + marshal, err := json.Marshal(downData) + if err != nil { + return nil, err + } + err = client.Publish(fmt.Sprintf("/%d/down", in.StoreId), marshal) if err != nil { return nil, err } diff --git a/internal/logic/task/task.go b/internal/logic/task/task.go index 17978a3..29d0077 100644 --- a/internal/logic/task/task.go +++ b/internal/logic/task/task.go @@ -679,6 +679,7 @@ func (s *sTask) GetTaskList(ctx context.Context, in *model.GetTaskListV2In) (out } if int(v.UserTimes) >= v.TargetTimes { + completedTime := gtime.Now() // 判断当前用户完成情况,已完成根据任务、用户,任务类型检查是否存在用户任务记录 orm := dao.UserTasks.Ctx(ctx).Where(do.UserTasks{UserId: in.UserId, TaskId: v.TaskID}) if v.GameTaskConfig.TimeType == 1 { @@ -741,12 +742,17 @@ func (s *sTask) GetTaskList(ctx context.Context, in *model.GetTaskListV2In) (out // 拼装门店奖励数据 if len(result.TaskList[i].Rewards) > 0 { for _, reward := range result.TaskList[i].Rewards { + var quantity uint64 + quantity, err = CalculateNetfeeRewardQuantity(ctx, in.UserId, in.StoreId, &reward, completedTime) + if err != nil { + quantity = reward.GrantQuantity + } in := do.UserTaskRewards{ RewardId: reward.Id, UserTaskId: id, RewardName: reward.Name, Status: consts.RewardPendingStatus, - IssueQuantity: reward.GrantQuantity, + IssueQuantity: quantity, Source: 2, RewardTypeId: reward.RewardTypeId, } @@ -936,3 +942,95 @@ func (s *sTask) SyncTaskFromGamelife(ctx context.Context) (out *model.SyncTaskOu wg.Wait() return } +func CalculateNetfeeRewardQuantity(ctx context.Context, userId int64, storeId int64, reward *model.SimpleReward, completedTime *gtime.Time) (uint64, error) { + const rewardTypeCode = consts.NetfeeCode + + // 判断是否是门店网费奖励 + exist, err := dao.RewardTypes.Ctx(ctx). + WherePri(reward.RewardTypeId). + Where(do.RewardTypes{Code: rewardTypeCode}). + Exist() + if err != nil { + return 0, ecode.Fail.Sub("获取奖励类型失败") + } + if !exist { + // 不是网费奖励,返回当前奖励默认值 + return reward.GrantQuantity, nil + } + + // 获取当前小时 & 星期几(0=周日) + hour := completedTime.Hour() + weekday := int(completedTime.Weekday()) + + // 获取上机记录 + areaLevel, err := dao.StoreClientSessions.Ctx(ctx). + Where(do.StoreClientSessions{UserId: userId}). + WhereLTE(dao.StoreClientSessions.Columns().StartTime, completedTime). + Where("end_time IS NULL OR end_time >= ?", completedTime). + Fields("area_name,level_id"). + OrderDesc(dao.StoreClientSessions.Columns().StartTime). + One() + if err != nil { + return 0, ecode.Fail.Sub("获取用户上机记录失败") + } + + // 获取会员等级ID(内部ID) + levelId, err := dao.StoreMemberLevels.Ctx(ctx). + Where(do.StoreMemberLevels{LevelId: areaLevel["level_id"].Int64()}). + Fields("id"). + Value() + if err != nil { + return 0, ecode.Fail.Sub("获取会员等级失败") + } + if levelId.IsEmpty() { + return reward.GrantQuantity, nil + } + + // 获取区域ID + areaId, err := dao.StoreAreas.Ctx(ctx). + Where(do.StoreAreas{AreaName: areaLevel["area_name"].String()}). + Fields("id"). + Value() + if err != nil { + return 0, ecode.Fail.Sub("获取区域失败") + } + if areaId.IsEmpty() { + return reward.GrantQuantity, nil + } + + // 获取门店该区域、等级、奖励配置 + priceDataStr, err := dao.StoreNetfeeAreaLevel.Ctx(ctx). + Where(do.StoreNetfeeAreaLevel{ + StoreId: storeId, + AreaId: areaId.Int(), + MemberLevelId: levelId.Int(), + RewardId: reward.Id, + }). + Fields(dao.StoreNetfeeAreaLevel.Columns().PriceData). + Value() + if err != nil { + return 0, ecode.Fail.Sub("获取网费奖励价格配置失败") + } + + // 若配置为空,返回默认值 + if priceDataStr.IsEmpty() { + return reward.GrantQuantity, nil + } + + // 解析 priceData + var priceData [][]int + if err := json.Unmarshal([]byte(priceDataStr.String()), &priceData); err != nil { + return 0, ecode.Fail.Sub("网费价格配置解析失败") + } + + // 防止越界 + if weekday >= len(priceData) || hour >= len(priceData[weekday]) { + return 0, ecode.Fail.Sub("网费奖励价格配置不完整") + } + + grant := uint64(priceData[weekday][hour]) + + glog.Infof(ctx, "网费奖励金额为 %d(来源于门店配置)", grant) + + return grant, nil +} diff --git a/internal/model/do/users.go b/internal/model/do/users.go index d99cbca..c3bdc5f 100644 --- a/internal/model/do/users.go +++ b/internal/model/do/users.go @@ -29,4 +29,5 @@ type Users struct { RoleId interface{} // 角色ID LastLoginStoreId interface{} // 上次登录门店ID Quan8Uuid interface{} // 8圈使用的 uuid + XyUserId interface{} // 系统唯一用户ID } diff --git a/internal/model/entity/users.go b/internal/model/entity/users.go index 39770a9..91774ab 100644 --- a/internal/model/entity/users.go +++ b/internal/model/entity/users.go @@ -27,4 +27,5 @@ type Users struct { RoleId int64 `json:"roleId" orm:"role_id" description:"角色ID"` // 角色ID LastLoginStoreId int64 `json:"lastLoginStoreId" orm:"last_login_store_id" description:"上次登录门店ID"` // 上次登录门店ID Quan8Uuid string `json:"quan8Uuid" orm:"quan8_uuid" description:"8圈使用的 uuid"` // 8圈使用的 uuid + XyUserId string `json:"xyUserId" orm:"xy_user_id" description:"系统唯一用户ID"` // 系统唯一用户ID } diff --git a/internal/model/reward.go b/internal/model/reward.go index cbf0caa..7bec6dd 100644 --- a/internal/model/reward.go +++ b/internal/model/reward.go @@ -196,19 +196,21 @@ type CallbackData struct { } type GetRewardIn struct { - Id int - AreaId int - GameId int - GameCode string - RewradTypeId int - RewardId int - RoleIdx string - TaskId string - PopenId string - Source int - BindType int - UserTaskId int - UserId int + Id int + AreaId int + GameId int + GameCode string + RewradTypeId int + RewardId int + RoleIdx string + TaskId string + PopenId string + Source int + BindType int + UserTaskId int + UserId int + GrantQuantity int + StoreId int } type GetRewardOut struct { //List []GetRewardNewOut `json:"list"` diff --git a/internal/model/userTaskReward.go b/internal/model/userTaskReward.go index bb91252..c558169 100644 --- a/internal/model/userTaskReward.go +++ b/internal/model/userTaskReward.go @@ -44,3 +44,9 @@ type UserClaimReward struct { RewardName string `json:"rewardName" orm:"reward_name"` SimpleReward SimpleReward `json:"reward" orm:"with:id=reward_id"` } +type NetfeeCallbackIn struct { + OrderId string +} +type NetfeeCallbackOut struct { + Success bool +} diff --git a/internal/packed/packed.go b/internal/packed/packed.go index e61b667..d483800 100644 --- a/internal/packed/packed.go +++ b/internal/packed/packed.go @@ -4,7 +4,7 @@ import ( _ "github.com/gogf/gf/contrib/drivers/mysql/v2" _ "github.com/gogf/gf/contrib/nosql/redis/v2" _ "server/utility/gamelife" - //_ "server/utility/mqtt/emqx" + _ "server/utility/mqtt/emqx" _ "server/utility/myCasbin" _ "server/utility/oss/aliyun" _ "server/utility/rsa" diff --git a/internal/service/reward.go b/internal/service/reward.go index 09ca4b4..263e844 100644 --- a/internal/service/reward.go +++ b/internal/service/reward.go @@ -337,6 +337,7 @@ type ( // } CallBack(ctx context.Context, in *model.RewardCallbackIn) (out *model.RewardCallbackOut, err error) GetUserClaimList(ctx context.Context, in *model.GetUserClaimListIn) (out *model.GetUserClaimListOut, err error) + NetfeeCallback(ctx context.Context, in *model.NetfeeCallbackIn) (out *model.NetfeeCallbackOut, err error) } ) diff --git a/utility/mqtt/emqx/emqx.go b/utility/mqtt/emqx/emqx.go index ea5354e..412dbc8 100644 --- a/utility/mqtt/emqx/emqx.go +++ b/utility/mqtt/emqx/emqx.go @@ -2,11 +2,12 @@ package emqx import ( "context" + "encoding/json" "fmt" - "github.com/gogf/gf/v2/encoding/gjson" - "github.com/gogf/gf/v2/util/gconv" + "github.com/gogf/gf/v2/os/gtime" + "server/internal/consts" "server/internal/dao" - "strconv" + "server/internal/model/do" "sync" "time" @@ -88,12 +89,29 @@ func (e *emqxClient) Subscribe(topic string, handler func(topic string, payload return err } -type DeviceData struct { - NetbarAccount string `json:"netbarAccount"` - DeviceId string `json:"deviceId"` - DeviceName string `json:"deviceName"` - IP string `json:"ip"` - MACAddress string `json:"macAddress"` +type MemberLevelData struct { + LevelID int `json:"level_id"` // 会员等级ID(如0表示临时卡) + LevelName string `json:"level_name"` // 等级名称 + Level int `json:"level"` // 等级顺序,值越大等级越高 + OlType int8 `json:"ol_type"` // 在线类型,例如:1=线上 + Status int8 `json:"status"` // 状态:1=启用,2=禁用 +} + +type ClientData struct { + AreaName string `json:"area_name"` + ClientName string `json:"client_name"` + Status int8 `json:"status"` +} +type ClientSession struct { + AreaName string `json:"area_name"` + ClientName string `json:"client_name"` + CardId string `json:"card_id"` + XyUserId string `json:"xy_user_id"` + Level int `json:"level"` + LevelName string `json:"level_name"` + StartTime string `json:"start_time"` + EndTime string `json:"end_time"` + UsedTime int `json:"used_time"` } // init 注册emqx客户端 @@ -116,181 +134,215 @@ func init() { mqtt.Register("emqx", client) glog.Infof(ctx, "EMQX 客户端注册完成,broker=%s, clientID=%s", broker, clientId) - // 订阅设备上线消息 - // 订阅设备上线消息 + go func() { - ctx := context.Background() - err := client.Subscribe("/+/up", func(topic string, payload []byte) { - glog.Infof(ctx, "收到 MQTT 消息,topic=%s", topic) - - var data DeviceData - if err := gjson.Unmarshal(payload, &data); err != nil { - glog.Errorf(ctx, "[/up] 解析设备信息失败: %v", err) + // 监听数据上报通道 + client.Subscribe("/+/up", func(topic string, payload []byte) { + var base struct { + Cmd int `json:"cmd"` + StoreId int `json:"storeId"` + Data json.RawMessage `json:"data"` + } + if err := json.Unmarshal(payload, &base); err != nil { + glog.Errorf(ctx, "解析数据失败,topic=%s,错误:%v", topic, err) return } - - deviceId := data.DeviceId - netbarAccount := data.NetbarAccount - now := time.Now().Unix() - - // Redis 统一 key - onlineDevicesKey := "system_device:online_devices" - lastOnlineKey := fmt.Sprintf("system_device:last_online:%s", deviceId) - deviceInfoKey := fmt.Sprintf("system_device:info:%s", deviceId) - netbarDeviceCountKey := fmt.Sprintf("system_device:netbar_device_count:%s", netbarAccount) - - // 判断设备是否在线 - exists, err := g.Redis().HExists(ctx, onlineDevicesKey, deviceId) - if err != nil { - glog.Errorf(ctx, "查询设备在线状态失败 %s: %v", deviceId, err) - return - } - - // 获取上次上线时间,判断是否断线重连 - lastOnlineStr, err := g.Redis().Get(ctx, lastOnlineKey) - shouldSend := false - needIncr := false - - if exists != 1 { - // 设备不在线,首次上线 - shouldSend = true - needIncr = true - } else if err != nil || lastOnlineStr.IsEmpty() { - // 无上线时间记录,断线重连 - shouldSend = true - needIncr = true - } else { - lastOnline, err := strconv.ParseInt(lastOnlineStr.String(), 10, 64) - if err != nil || now-lastOnline > 10 { - // 超过10秒断线重连 - shouldSend = true - needIncr = true - } else { - // 在线且未断线,不加计数,不发送配置 - shouldSend = false - needIncr = false + switch base.Cmd { + case consts.CmdMemberLevels: + // 当前门店会员等级列表数据 + levels := make([]MemberLevelData, 0) + if err := json.Unmarshal(base.Data, &levels); err != nil { + glog.Errorf(ctx, "解析会员等级数据失败,topic=%s,错误:%v", topic, err) + return } - } - - // 更新上线时间并设置20秒过期 - if _, err := g.Redis().Set(ctx, lastOnlineKey, now); err != nil { - glog.Errorf(ctx, "更新上线时间失败 %s: %v", lastOnlineKey, err) - } - if _, err := g.Redis().Expire(ctx, lastOnlineKey, 20); err != nil { - glog.Errorf(ctx, "设置上线时间过期失败 %s: %v", lastOnlineKey, err) - } - - // 需要时增加在线设备计数 - if needIncr { - if _, err := g.Redis().Incr(ctx, netbarDeviceCountKey); err != nil { - glog.Errorf(ctx, "增加 Netbar 在线设备数失败 %s: %v", netbarDeviceCountKey, err) + saveOrUpdateLevels(ctx, base.StoreId, levels) + glog.Infof(ctx, "成功保存会员等级数据,store_id=%d", base.StoreId) + case consts.CmdClientList: + // 当前门店客户机列表数据 + clients := make([]ClientData, 0) + if err := json.Unmarshal(base.Data, &clients); err != nil { + glog.Errorf(ctx, "解析客户机数据失败,topic=%s,错误:%v", topic, err) + return } - } - - // 更新在线设备集合(时间戳) - if _, err := g.Redis().HSet(ctx, onlineDevicesKey, map[string]interface{}{ - deviceId: now, - }); err != nil { - glog.Errorf(ctx, "更新在线设备集合失败 %s: %v", onlineDevicesKey, err) - return - } - - // 更新设备基础信息,并设置1小时过期 - if _, err := g.Redis().HSet(ctx, deviceInfoKey, map[string]interface{}{ - "netbarAccount": netbarAccount, - "deviceName": data.DeviceName, - "ip": data.IP, - "macAddress": data.MACAddress, - }); err != nil { - glog.Errorf(ctx, "存储设备信息失败 %s: %v", deviceInfoKey, err) - } - if _, err := g.Redis().Expire(ctx, deviceInfoKey, 3600); err != nil { - glog.Errorf(ctx, "设置设备信息过期失败 %s: %v", deviceInfoKey, err) - } - - // 首次或断线重连发送配置 - if shouldSend { - one, err := dao.Stores.Ctx(ctx).InnerJoin(dao.StoreDesktopSettings.Table(), "sds", "sds.store_id = stores.id"). - Where("stores.netbar_account = ?", netbarAccount). - Fields("stores.netbar_account netbarAccount, sds.top_component_visible topComponentVisible, sds.right_component_visible rightComponentVisible").One() - if err != nil { - glog.Errorf(ctx, "获取门店信息失败 %s: %v", deviceInfoKey, err) - } - if err = client.Publish(fmt.Sprintf("/%s/down", netbarAccount), gconv.Bytes(one.Json())); err != nil { - glog.Errorf(ctx, "发布消息失败 %s: %v", deviceInfoKey, err) + saveOrUpdateClients(ctx, base.StoreId, clients) + case consts.CmdClientUp, consts.CmdClientDown: + session := ClientSession{} + if err := json.Unmarshal(base.Data, &session); err != nil { + glog.Errorf(ctx, "解析客户机数据失败,topic=%s,错误:%v", topic, err) + return } + saveOrUpdateClientSession(ctx, base.StoreId, session) } - }) - if err != nil { - glog.Errorf(ctx, "订阅 /+/up 失败: %v", err) - } }() +} - // 监控离线设备 - go func() { - ctx := context.Background() - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() +func saveOrUpdateClientSession(ctx context.Context, storeId int, session ClientSession) { + // 查询客户机信息 + client, err := dao.StoreClients.Ctx(ctx). + Where("store_id", storeId). + Where("client_name", session.ClientName). + One() + if err != nil || client.IsEmpty() { + glog.Errorf(ctx, "未找到客户机信息:store_id=%d client_name=%s", storeId, session.ClientName) + return + } + clientId := client["id"].Int64() + areaId := client["area_id"].Int64() + value, err := dao.Users.Ctx(ctx).Where(do.Users{XyUserId: session.XyUserId}).Fields("id").Value() + if err != nil || value.IsEmpty() { + glog.Errorf(ctx, "未找到用户信息:xy_user_id=%s", session.XyUserId) + return + } + // 上机逻辑(没有 end_time) + if session.EndTime == "" { + // 插入上机记录 + _, err := dao.StoreClientSessions.Ctx(ctx).Data(do.StoreClientSessions{ + UserId: value.Int64(), + StoreId: int64(storeId), + ClientId: clientId, + AreaId: areaId, + XyUserId: session.XyUserId, + LevelId: session.Level, + LevelName: session.LevelName, + StartTime: gtime.NewFromStr(session.StartTime), + AreaName: session.AreaName, + }).Insert() + if err != nil { + glog.Errorf(ctx, "插入上机记录失败:store_id=%d client_id=%d xy_user_id=%s,错误:%v", storeId, clientId, session.XyUserId, err) + return + } + // 更新客户机状态为 上机中(3) + _, err = dao.StoreClients.Ctx(ctx). + Where("id", clientId). + Data(g.Map{"status": 3}). + Update() + if err != nil { + glog.Errorf(ctx, "更新客户机状态为上机中失败:client_id=%d,错误:%v", clientId, err) + } + return + } - for { - select { - case <-ctx.Done(): - glog.Info(ctx, "停止离线设备监控") - return - case <-ticker.C: - onlineDevicesKey := "system_device:online_devices" - devicesVar, err := g.Redis().HGetAll(ctx, onlineDevicesKey) - if err != nil { - glog.Errorf(ctx, "获取在线设备失败: %v", err) - continue - } + // 下机逻辑(end_time 存在) + _, err = dao.StoreClientSessions.Ctx(ctx). + Where("store_id", storeId). + Where("client_id", clientId). + Where("xy_user_id", session.XyUserId). + Where("start_time", session.StartTime). + Data(g.Map{ + "end_time": session.EndTime, + "used_time": session.UsedTime, + }).Update() + if err != nil { + glog.Errorf(ctx, "更新下机记录失败:store_id=%d client_id=%d xy_user_id=%s,错误:%v", storeId, clientId, session.XyUserId, err) + return + } + // 更新客户机状态为 空闲(1) + _, err = dao.StoreClients.Ctx(ctx). + Where("id", clientId). + Data(g.Map{"status": 1}). + Update() + if err != nil { + glog.Errorf(ctx, "更新客户机状态为空闲失败:client_id=%d,错误:%v", clientId, err) + } +} - devices := devicesVar.MapStrStr() - if len(devices) == 0 { - continue - } +func saveOrUpdateClients(ctx context.Context, storeId int, clients []ClientData) { + for _, client := range clients { + var areaId int64 + // Step 1: 查询区域 ID + value, err := dao.StoreAreas.Ctx(ctx).Where(do.StoreAreas{StoreId: storeId, AreaName: client.AreaName}).Fields(dao.StoreAreas.Columns().Id).Value() + if err != nil { + glog.Errorf(ctx, "查询区域失败 store_id=%d area_name=%s err=%v", storeId, client.AreaName, err) + continue + } - now := time.Now().Unix() - for deviceId, timestampStr := range devices { - timestamp, err := strconv.ParseInt(timestampStr, 10, 64) - if err != nil { - glog.Errorf(ctx, "无效时间戳 for 设备 %s: %v", deviceId, err) - continue - } + if value.IsEmpty() { + // 不存在则插入数据 + id, err := dao.StoreAreas.Ctx(ctx).Data(do.StoreAreas{ + StoreId: storeId, + AreaName: client.AreaName, + }).InsertAndGetId() + if err != nil { + glog.Errorf(ctx, "插入区域失败 store_id=%d area_name=%s err=%v", storeId, client.AreaName, err) + continue + } + areaId = id + } else { + areaId = value.Int64() + } - if now-timestamp > 10 { - // 超过10秒未更新,认定离线 - deviceInfoKey := fmt.Sprintf("system_device:info:%s", deviceId) - dataVar, err := g.Redis().HGetAll(ctx, deviceInfoKey) - if err != nil { - glog.Errorf(ctx, "获取设备数据失败 %s: %v", deviceInfoKey, err) - continue - } + // Step 2: 查询客户机是否存在 + count, err := dao.StoreClients.Ctx(ctx).Where(do.StoreClients{StoreId: storeId, ClientName: client.ClientName, AreaId: areaId}).Count() + if err != nil { + glog.Errorf(ctx, "查询客户机失败 store_id=%d client_name=%s err=%v", storeId, client.ClientName, err) + continue + } - data := dataVar.MapStrStr() - netbarAccount, exists := data["netbarAccount"] - if !exists { - glog.Errorf(ctx, "设备 %s 缺少 netbarAccount", deviceId) - continue - } + data := g.Map{ + "store_id": storeId, + "client_name": client.ClientName, + "area_id": areaId, + "status": client.Status, + } - // 新增:system_device命名空间,按账号统计在线设备数 -1 - netbarDeviceCountKey := fmt.Sprintf("system_device:netbar_device_count:%s", netbarAccount) - if _, err := g.Redis().Decr(ctx, netbarDeviceCountKey); err != nil { - glog.Errorf(ctx, "减少 Netbar 在线设备数失败 %s: %v", netbarDeviceCountKey, err) - } - - // 从在线设备集合中删除 - if _, err := g.Redis().HDel(ctx, onlineDevicesKey, deviceId); err != nil { - glog.Errorf(ctx, "移除设备 %s 从在线集合失败: %v", deviceId, err) - } else { - glog.Infof(ctx, "设备 %s 已标记为离线", deviceId) - } - } - } + // Step 3: 更新 or 插入 + if count > 0 { + _, err = dao.StoreClients.Ctx(ctx). + Data(data). + Where("store_id", storeId). + Where("client_name", client.ClientName). + Update() + if err != nil { + glog.Errorf(ctx, "更新客户机失败 store_id=%d client_name=%s err=%v", storeId, client.ClientName, err) + } + } else { + _, err = dao.StoreClients.Ctx(ctx). + Data(data). + Insert() + if err != nil { + glog.Errorf(ctx, "插入客户机失败 store_id=%d client_name=%s err=%v", storeId, client.ClientName, err) } } - }() - + } +} + +func saveOrUpdateLevels(ctx context.Context, storeId int, levels []MemberLevelData) { + for _, level := range levels { + exists, err := dao.StoreMemberLevels.Ctx(ctx). + Where("store_id", storeId). + Where("level_id", level.LevelID). + Count() + if err != nil { + glog.Infof(ctx, "检查会员等级是否存在失败,store_id=%d level_id=%d", storeId, level.LevelID) + } + data := g.Map{ + "store_id": storeId, + "level_id": level.LevelID, + "level_name": level.LevelName, + "level": level.Level, + "ol_type": level.OlType, + "status": level.Status, + } + if exists > 0 { + _, err = dao.StoreMemberLevels.Ctx(ctx). + Where("store_id", storeId). + Where("level_id", level.LevelID). + Data(data). + Update() + } else { + _, err = dao.StoreMemberLevels.Ctx(ctx). + Data(data). + Insert() + } + if err != nil { + glog.Errorf(ctx, "保存会员等级数据失败,store_id=%d level_id=%d", storeId, level.LevelID) + } + } +} + +type DownData struct { + CMD int `json:"cmd"` + StoreId int `json:"storeId"` + Data interface{} `json:"data"` }