diff --git a/internal/consts/redis.go b/internal/consts/redis.go index 2ef0834..a7549e9 100644 --- a/internal/consts/redis.go +++ b/internal/consts/redis.go @@ -22,6 +22,6 @@ const ( ) const ( - NetbarOnlineNumberKey = "%s:online:device:%s" - NetbarOnlineDeviceSetKey = "%s:online_devices" + NetbarOnlineNumberKey = "%s:online_number" + OnlineDeviceSetKey = "online:devices" ) diff --git a/internal/controller/rewardType/rewardType_v1_list.go b/internal/controller/rewardType/rewardType_v1_list.go index c586be6..ea170b5 100644 --- a/internal/controller/rewardType/rewardType_v1_list.go +++ b/internal/controller/rewardType/rewardType_v1_list.go @@ -20,13 +20,11 @@ func (c *ControllerV1) List(ctx context.Context, req *v1.ListReq) (res *v1.ListR ) out, err := service.RewardType().List(ctx, &model.RewardTypeListIn{ - Page: req.Page, - Size: req.Size, - OperatorId: operatorId, - OperatorRole: operatorRole, - StoreId: req.StoreId, - Name: req.Name, - Source: req.Source, + Page: req.Page, + Size: req.Size, + StoreId: req.StoreId, + Name: req.Name, + Source: req.Source, }) if err != nil { diff --git a/internal/controller/statistic/statistic_v1_get_online_device.go b/internal/controller/statistic/statistic_v1_get_online_device.go index adbbf6e..2c2d859 100644 --- a/internal/controller/statistic/statistic_v1_get_online_device.go +++ b/internal/controller/statistic/statistic_v1_get_online_device.go @@ -10,6 +10,11 @@ import ( ) func (c *ControllerV1) GetOnlineDevice(ctx context.Context, req *v1.GetOnlineDeviceReq) (res *v1.GetOnlineDeviceRes, err error) { - card, err := g.Redis().SCard(ctx, fmt.Sprintf(consts.NetbarOnlineDeviceSetKey, req.NetbarAccount)) - return &v1.GetOnlineDeviceRes{Total: card}, err + get, err := g.Redis().Get(ctx, fmt.Sprintf(consts.NetbarOnlineNumberKey, req.NetbarAccount)) + if err != nil { + return nil, err + } + return &v1.GetOnlineDeviceRes{ + Total: get.Int64(), + }, nil } diff --git a/internal/logic/rewardType/rewardType.go b/internal/logic/rewardType/rewardType.go index f8b1a1a..9268a97 100644 --- a/internal/logic/rewardType/rewardType.go +++ b/internal/logic/rewardType/rewardType.go @@ -107,30 +107,6 @@ func (s *sRewardType) Delete(ctx context.Context, in *model.RewardTypeDeleteIn) func (s *sRewardType) List(ctx context.Context, in *model.RewardTypeListIn) (out *model.RewardTypeListOut, err error) { mod := dao.RewardTypes.Ctx(ctx).Where("deleted_at IS NULL") - switch in.OperatorRole { - case consts.AdminRoleCode: - // 系统管理员只能看系统奖励类型 - mod = mod.Where("source", 1) - - case consts.MerchantRoleCode: - // 校验商户是否对该门店有权限 - if err = checkRewardTypePermission(ctx, in.OperatorRole, in.OperatorId, 2, in.StoreId); err != nil { - return nil, err - } - // 只查询该门店的奖励类型(source=2且store_id=指定门店) - mod = mod.Where("source", 2).WhereIn("store_id", in.StoreId) - - case consts.StoreRoleCode: - // 校验门店权限 - if err = checkRewardTypePermission(ctx, in.OperatorRole, in.OperatorId, 2, in.StoreId); err != nil { - return nil, err - } - mod = mod.Where("source", 2).Where("store_id", in.StoreId) - - default: - return nil, ecode.Params.Sub("无效的操作角色") - } - // 其余过滤条件 if in.Name != "" { mod = mod.WhereLike("name", "%"+in.Name+"%") diff --git a/internal/logic/storeDesktopSetting/storeDesktopSetting.go b/internal/logic/storeDesktopSetting/storeDesktopSetting.go index 9a3a444..681eada 100644 --- a/internal/logic/storeDesktopSetting/storeDesktopSetting.go +++ b/internal/logic/storeDesktopSetting/storeDesktopSetting.go @@ -13,17 +13,10 @@ import ( ) type sStoreDesktopSetting struct { - mqttCLient mqtt.MqttClient } func New() service.IStoreDesktopSetting { - client, b := mqtt.GetClient("emqx") - if !b { - return nil - } - return &sStoreDesktopSetting{ - mqttCLient: client, - } + return &sStoreDesktopSetting{} } func init() { service.RegisterStoreDesktopSetting(New()) @@ -74,7 +67,11 @@ func (s *sStoreDesktopSetting) Save(ctx context.Context, in model.SaveDesktopSet if err != nil { return nil, err } - err = s.mqttCLient.Publish(fmt.Sprintf("/desktop/%d", in.StoreId), marshal) + client, b := mqtt.GetClient("emqx") + if !b { + return nil, gerror.New("获取MQTT客户端失败") + } + err = client.Publish(fmt.Sprintf("/desktop/%d", in.StoreId), marshal) if err != nil { return nil, err } diff --git a/internal/logic/task/task.go b/internal/logic/task/task.go index 08f4ff5..a410cf7 100644 --- a/internal/logic/task/task.go +++ b/internal/logic/task/task.go @@ -385,5 +385,19 @@ func (s *sTask) GetTask(ctx context.Context, in *model.GetTaskIn) (out *model.Ge } func (s *sTask) GetUserTaskRecordsList(ctx context.Context, in *model.UserTaskRecordsListIn) (out *model.UserTaskRecordsListOut, err error) { - return + list := make([]model.UserTask, 0) + var total int + orm := dao.UserTasks.Ctx(ctx).Where(dao.UserTasks.Columns().Id, in.UserId) + + if in.StoreId != 0 { + orm = orm.Where(dao.UserTasks.Columns().StoreId, in.StoreId) + } + err = orm.Page(in.Page, in.Size).ScanAndCount(&list, &total, false) + if err != nil { + return nil, ecode.Fail.Sub("获取用户任务列表失败") + } + return &model.UserTaskRecordsListOut{ + List: list, + Total: total, + }, nil } diff --git a/internal/model/rewardType.go b/internal/model/rewardType.go index dec3cf1..b36d766 100644 --- a/internal/model/rewardType.go +++ b/internal/model/rewardType.go @@ -6,13 +6,13 @@ import ( // RewardType 奖励类型表 type RewardType struct { - Id int64 `json:"id" dc:"奖励类型ID"` - Name string `json:"name" dc:"奖励类型名称(如积分、优惠券)"` - TencentTypeId int `json:"tencentTypeId" dc:"腾讯奖励类型ID(仅系统奖励有效)"` - Source int `json:"source" dc:"来源:1=腾讯系统,2=本系统,3=其他"` - CreatedAt *gtime.Time `json:"createdAt" dc:"创建时间"` - UpdatedAt *gtime.Time `json:"updatedAt" dc:"更新时间"` - DeletedAt *gtime.Time `json:"deletedAt" dc:"软删除时间戳"` + Id int64 `json:"id" orm:"id" description:"奖励类型ID"` // 奖励类型ID + Name string `json:"name" orm:"name" description:"类型名称"` // 类型名称 + Code string `json:"code" orm:"code" description:"唯一编码"` // 唯一编码 + IconUrl string `json:"iconUrl" orm:"icon_url" description:"图标链接地址"` // 图标链接地址 + CreatedAt *gtime.Time `json:"createdAt" orm:"created_at" description:"创建时间"` // 创建时间 + UpdatedAt *gtime.Time `json:"updatedAt" orm:"updated_at" description:"更新时间"` // 更新时间 + DeletedAt *gtime.Time `json:"deletedAt" orm:"deleted_at" description:"软删除时间"` // 软删除时间 } // RewardTypeCreateIn 创建奖励类型入参 @@ -59,14 +59,12 @@ type RewardTypeDeleteOut struct { // RewardTypeListIn 获取奖励类型列表入参 type RewardTypeListIn struct { - OperatorId int64 - OperatorRole string - Page int - Size int - Name string - StoreId int64 - Status int - Source int + Page int + Size int + Name string + StoreId int64 + Status int + Source int } // RewardTypeListOut 获取奖励类型列表出参 diff --git a/internal/packed/packed.go b/internal/packed/packed.go index e61b667..d483800 100644 --- a/internal/packed/packed.go +++ b/internal/packed/packed.go @@ -4,7 +4,7 @@ import ( _ "github.com/gogf/gf/contrib/drivers/mysql/v2" _ "github.com/gogf/gf/contrib/nosql/redis/v2" _ "server/utility/gamelife" - //_ "server/utility/mqtt/emqx" + _ "server/utility/mqtt/emqx" _ "server/utility/myCasbin" _ "server/utility/oss/aliyun" _ "server/utility/rsa" diff --git a/utility/mqtt/emqx/emqx.go b/utility/mqtt/emqx/emqx.go index b7b9de0..7e7e8a5 100644 --- a/utility/mqtt/emqx/emqx.go +++ b/utility/mqtt/emqx/emqx.go @@ -3,6 +3,9 @@ package emqx import ( "context" "fmt" + "github.com/gogf/gf/v2/encoding/gjson" + "server/internal/consts" + "strconv" "sync" "time" @@ -84,9 +87,20 @@ func (e *emqxClient) Subscribe(topic string, handler func(topic string, payload return err } +type DeviceData struct { + NetbarAccount string `json:"netbarAccount"` + DeviceId string `json:"deviceId"` + DeviceName string `json:"deviceName"` + IP string `json:"ip"` + MACAddress string `json:"macAddress"` +} + // init 注册emqx客户端 func init() { + + // 创建可取消的上下文 ctx := context.Background() + // 加载 MQTT 配置 cfg := g.Config() host := cfg.MustGet(ctx, "mqtt.emqx.host").String() port := cfg.MustGet(ctx, "mqtt.emqx.port").Int() @@ -97,6 +111,122 @@ func init() { broker := fmt.Sprintf("tcp://%s:%d", host, port) client := New(broker, clientId, username, password) + // 注册 MQTT 客户端 mqtt.Register("emqx", client) - glog.Infof(ctx, "EMQX客户端注册完成,broker=%s, clientID=%s", broker, clientId) + + glog.Infof(ctx, "EMQX 客户端注册完成,broker=%s, clientID=%s", broker, clientId) + // 订阅设备上线消息 + go func() { + ctx := context.Background() + err := client.Subscribe("/up", func(topic string, payload []byte) { + glog.Infof(ctx, "收到 MQTT 消息,topic=%s", topic) + + var data DeviceData + if err := gjson.Unmarshal(payload, &data); err != nil { + glog.Errorf(ctx, "[/up] 解析设备信息失败: %v", err) + return + } + + // 增加门店在线设备计数 + key := fmt.Sprintf(consts.NetbarOnlineNumberKey, data.NetbarAccount) + if _, err := g.Redis().Incr(ctx, key); err != nil { + glog.Errorf(ctx, "增加 Redis 计数失败 %s: %v", key, err) + return + } + + // 更新在线设备集合 + setKey := "system:online:devices" + if _, err := g.Redis().HSet(ctx, setKey, map[string]interface{}{ + data.DeviceId: time.Now().Unix(), + }); err != nil { + glog.Errorf(ctx, "更新在线设备集合失败 %s: %v", setKey, err) + return + } + + // 存储设备信息(用于离线检测时获取 NetbarAccount) + deviceKey := fmt.Sprintf("device:%s", data.DeviceId) + if _, err := g.Redis().HSet(ctx, deviceKey, map[string]interface{}{ + "netbarAccount": data.NetbarAccount, + "deviceName": data.DeviceName, + "ip": data.IP, + "macAddress": data.MACAddress, + }); err != nil { + glog.Errorf(ctx, "存储设备信息失败 %s: %v", deviceKey, err) + } + }) + if err != nil { + glog.Errorf(ctx, "订阅 /up 失败: %v", err) + } + }() + + // 监控离线设备 + go func() { + ctx := context.Background() + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + glog.Info(ctx, "停止离线设备监控") + return + case <-ticker.C: + setKey := "system:online:devices" + devicesVar, err := g.Redis().HGetAll(ctx, setKey) + if err != nil { + glog.Errorf(ctx, "获取在线设备失败: %v", err) + continue + } + + // 转换为 map[string]string + devices := devicesVar.MapStrStr() + if len(devices) == 0 { + continue + } + + now := time.Now().Unix() + for deviceId, timestampStr := range devices { + // 使用 strconv.ParseInt 替代 time.ParseInt + timestamp, err := strconv.ParseInt(timestampStr, 10, 64) + if err != nil { + glog.Errorf(ctx, "无效时间戳 for 设备 %s: %v", deviceId, err) + continue + } + + // 检查设备是否离线(超过 10 秒未更新) + if now-timestamp > 10 { + // 获取设备信息 + dataKey := fmt.Sprintf("device:%s", deviceId) + dataVar, err := g.Redis().HGetAll(ctx, dataKey) + if err != nil { + glog.Errorf(ctx, "获取设备数据失败 %s: %v", dataKey, err) + continue + } + + // 转换为 map[string]string + data := dataVar.MapStrStr() + netbarAccount, exists := data["netbarAccount"] + if !exists { + glog.Errorf(ctx, "设备 %s 缺少 netbarAccount", deviceId) + continue + } + + // 减少在线设备计数 + key := fmt.Sprintf(consts.NetbarOnlineNumberKey, netbarAccount) + if _, err := g.Redis().Decr(ctx, key); err != nil { + glog.Errorf(ctx, "减少 Redis 计数失败 %s: %v", key, err) + } + + // 从在线设备集合中移除 + if _, err := g.Redis().HDel(ctx, setKey, deviceId); err != nil { + glog.Errorf(ctx, "移除设备 %s 从在线集合失败: %v", deviceId, err) + } else { + glog.Infof(ctx, "设备 %s 已标记为离线", deviceId) + } + } + } + } + } + }() + }