package emqx import ( "context" "encoding/json" "fmt" "github.com/gogf/gf/v2/os/gtime" "server/internal/consts" "server/internal/dao" "server/internal/model/do" "sync" "time" mqttlib "github.com/eclipse/paho.mqtt.golang" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "server/utility/mqtt" ) type emqxClient struct { client mqttlib.Client once sync.Once } // New 创建并连接 EMQX 客户端实例,连接成功才返回 func New(broker, clientID, username, password string) mqtt.MqttClient { opts := mqttlib.NewClientOptions(). AddBroker(broker). SetClientID(clientID). SetUsername(username). SetPassword(password). SetAutoReconnect(true). SetConnectTimeout(5 * time.Second) c := &emqxClient{ client: mqttlib.NewClient(opts), } // 立即连接 err := c.connect() if err != nil { glog.Errorf(context.Background(), "连接EMQX失败: %v", err) panic(fmt.Sprintf("连接EMQX失败: %v", err)) } glog.Infof(context.Background(), "EMQX客户端连接成功") return c } // connect 内部只调用一次连接 func (e *emqxClient) connect() error { var err error e.once.Do(func() { token := e.client.Connect() if token.Wait() && token.Error() != nil { err = fmt.Errorf("EMQX连接失败: %w", token.Error()) glog.Errorf(context.Background(), err.Error()) return } glog.Infof(context.Background(), "EMQX连接成功") }) return err } // Publish 实现接口 Publish func (e *emqxClient) Publish(topic string, payload []byte) error { token := e.client.Publish(topic, 0, false, payload) token.Wait() err := token.Error() if err != nil { glog.Errorf(context.Background(), "发布消息失败,topic=%s,错误:%v", topic, err) } else { glog.Infof(context.Background(), "成功发布消息,topic=%s,消息大小=%d字节", topic, len(payload)) } return err } // Subscribe 实现接口 Subscribe func (e *emqxClient) Subscribe(topic string, handler func(topic string, payload []byte)) error { token := e.client.Subscribe(topic, 0, func(client mqttlib.Client, msg mqttlib.Message) { handler(msg.Topic(), msg.Payload()) }) token.Wait() err := token.Error() if err != nil { glog.Errorf(context.Background(), "订阅失败,topic=%s,错误:%v", topic, err) } else { glog.Infof(context.Background(), "成功订阅主题,topic=%s", topic) } return err } type MemberLevelData struct { LevelID int `json:"level_id"` // 会员等级ID(如0表示临时卡) LevelName string `json:"level_name"` // 等级名称 Level int `json:"level"` // 等级顺序,值越大等级越高 OlType int8 `json:"ol_type"` // 在线类型,例如:1=线上 Status int8 `json:"status"` // 状态:1=启用,2=禁用 } type ClientData struct { AreaName string `json:"area_name"` ClientName string `json:"client_name"` Status int8 `json:"status"` } type ClientSession struct { AreaName string `json:"area_name"` ClientName string `json:"client_name"` CardId string `json:"card_id"` XyUserId string `json:"xy_user_id"` Level int `json:"level"` LevelName string `json:"level_name"` StartTime string `json:"start_time"` EndTime string `json:"end_time"` UsedTime int `json:"used_time"` } // init 注册emqx客户端 func init() { // 创建可取消的上下文 ctx := context.Background() // 加载 MQTT 配置 cfg := g.Config() host := cfg.MustGet(ctx, "mqtt.emqx.host").String() port := cfg.MustGet(ctx, "mqtt.emqx.port").Int() username := cfg.MustGet(ctx, "mqtt.emqx.username").String() password := cfg.MustGet(ctx, "mqtt.emqx.password").String() clientId := cfg.MustGet(ctx, "mqtt.emqx.clientId").String() broker := fmt.Sprintf("tcp://%s:%d", host, port) client := New(broker, clientId, username, password) // 注册 MQTT 客户端 mqtt.Register("emqx", client) glog.Infof(ctx, "EMQX 客户端注册完成,broker=%s, clientID=%s", broker, clientId) go func() { // 监听数据上报通道 client.Subscribe("/+/up", func(topic string, payload []byte) { var base struct { Cmd int `json:"cmd"` StoreId int `json:"storeId"` Data json.RawMessage `json:"data"` } if err := json.Unmarshal(payload, &base); err != nil { glog.Errorf(ctx, "解析数据失败,topic=%s,错误:%v", topic, err) return } switch base.Cmd { case consts.CmdMemberLevels: // 当前门店会员等级列表数据 levels := make([]MemberLevelData, 0) if err := json.Unmarshal(base.Data, &levels); err != nil { glog.Errorf(ctx, "解析会员等级数据失败,topic=%s,错误:%v", topic, err) return } saveOrUpdateLevels(ctx, base.StoreId, levels) glog.Infof(ctx, "成功保存会员等级数据,store_id=%d", base.StoreId) case consts.CmdClientList: // 当前门店客户机列表数据 clients := make([]ClientData, 0) if err := json.Unmarshal(base.Data, &clients); err != nil { glog.Errorf(ctx, "解析客户机数据失败,topic=%s,错误:%v", topic, err) return } saveOrUpdateClients(ctx, base.StoreId, clients) case consts.CmdClientUp, consts.CmdClientDown: session := ClientSession{} if err := json.Unmarshal(base.Data, &session); err != nil { glog.Errorf(ctx, "解析客户机数据失败,topic=%s,错误:%v", topic, err) return } saveOrUpdateClientSession(ctx, base.StoreId, session) } }) }() } func saveOrUpdateClientSession(ctx context.Context, storeId int, session ClientSession) { // 查询客户机信息 client, err := dao.StoreClients.Ctx(ctx). Where("store_id", storeId). Where("client_name", session.ClientName). One() if err != nil || client.IsEmpty() { glog.Errorf(ctx, "未找到客户机信息:store_id=%d client_name=%s", storeId, session.ClientName) return } clientId := client["id"].Int64() areaId := client["area_id"].Int64() value, err := dao.Users.Ctx(ctx).Where(do.Users{XyUserId: session.XyUserId}).Fields("id").Value() if err != nil || value.IsEmpty() { glog.Errorf(ctx, "未找到用户信息:xy_user_id=%s", session.XyUserId) return } // 上机逻辑(没有 end_time) if session.EndTime == "" { // 插入上机记录 _, err := dao.StoreClientSessions.Ctx(ctx).Data(do.StoreClientSessions{ UserId: value.Int64(), StoreId: int64(storeId), ClientId: clientId, AreaId: areaId, XyUserId: session.XyUserId, LevelId: session.Level, LevelName: session.LevelName, StartTime: gtime.NewFromStr(session.StartTime), AreaName: session.AreaName, }).Insert() if err != nil { glog.Errorf(ctx, "插入上机记录失败:store_id=%d client_id=%d xy_user_id=%s,错误:%v", storeId, clientId, session.XyUserId, err) return } // 更新客户机状态为 上机中(3) _, err = dao.StoreClients.Ctx(ctx). Where("id", clientId). Data(g.Map{"status": 3}). Update() if err != nil { glog.Errorf(ctx, "更新客户机状态为上机中失败:client_id=%d,错误:%v", clientId, err) } return } // 下机逻辑(end_time 存在) _, err = dao.StoreClientSessions.Ctx(ctx). Where("store_id", storeId). Where("client_id", clientId). Where("xy_user_id", session.XyUserId). Where("start_time", session.StartTime). Data(g.Map{ "end_time": session.EndTime, "used_time": session.UsedTime, }).Update() if err != nil { glog.Errorf(ctx, "更新下机记录失败:store_id=%d client_id=%d xy_user_id=%s,错误:%v", storeId, clientId, session.XyUserId, err) return } // 更新客户机状态为 空闲(1) _, err = dao.StoreClients.Ctx(ctx). Where("id", clientId). Data(g.Map{"status": 1}). Update() if err != nil { glog.Errorf(ctx, "更新客户机状态为空闲失败:client_id=%d,错误:%v", clientId, err) } } func saveOrUpdateClients(ctx context.Context, storeId int, clients []ClientData) { for _, client := range clients { var areaId int64 // Step 1: 查询区域 ID value, err := dao.StoreAreas.Ctx(ctx).Where(do.StoreAreas{StoreId: storeId, AreaName: client.AreaName}).Fields(dao.StoreAreas.Columns().Id).Value() if err != nil { glog.Errorf(ctx, "查询区域失败 store_id=%d area_name=%s err=%v", storeId, client.AreaName, err) continue } if value.IsEmpty() { // 不存在则插入数据 id, err := dao.StoreAreas.Ctx(ctx).Data(do.StoreAreas{ StoreId: storeId, AreaName: client.AreaName, }).InsertAndGetId() if err != nil { glog.Errorf(ctx, "插入区域失败 store_id=%d area_name=%s err=%v", storeId, client.AreaName, err) continue } areaId = id } else { areaId = value.Int64() } // Step 2: 查询客户机是否存在 count, err := dao.StoreClients.Ctx(ctx).Where(do.StoreClients{StoreId: storeId, ClientName: client.ClientName, AreaId: areaId}).Count() if err != nil { glog.Errorf(ctx, "查询客户机失败 store_id=%d client_name=%s err=%v", storeId, client.ClientName, err) continue } data := g.Map{ "store_id": storeId, "client_name": client.ClientName, "area_id": areaId, "status": client.Status, } // Step 3: 更新 or 插入 if count > 0 { _, err = dao.StoreClients.Ctx(ctx). Data(data). Where("store_id", storeId). Where("client_name", client.ClientName). Update() if err != nil { glog.Errorf(ctx, "更新客户机失败 store_id=%d client_name=%s err=%v", storeId, client.ClientName, err) } } else { _, err = dao.StoreClients.Ctx(ctx). Data(data). Insert() if err != nil { glog.Errorf(ctx, "插入客户机失败 store_id=%d client_name=%s err=%v", storeId, client.ClientName, err) } } } } func saveOrUpdateLevels(ctx context.Context, storeId int, levels []MemberLevelData) { for _, level := range levels { exists, err := dao.StoreMemberLevels.Ctx(ctx). Where("store_id", storeId). Where("level_id", level.LevelID). Count() if err != nil { glog.Infof(ctx, "检查会员等级是否存在失败,store_id=%d level_id=%d", storeId, level.LevelID) } data := g.Map{ "store_id": storeId, "level_id": level.LevelID, "level_name": level.LevelName, "level": level.Level, "ol_type": level.OlType, "status": level.Status, } if exists > 0 { _, err = dao.StoreMemberLevels.Ctx(ctx). Where("store_id", storeId). Where("level_id", level.LevelID). Data(data). Update() } else { _, err = dao.StoreMemberLevels.Ctx(ctx). Data(data). Insert() } if err != nil { glog.Errorf(ctx, "保存会员等级数据失败,store_id=%d level_id=%d", storeId, level.LevelID) } } } type DownData struct { CMD int `json:"cmd"` StoreId int `json:"storeId"` Data interface{} `json:"data"` }