Files
arenax-server/utility/mqtt/emqx/emqx.go

344 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package emqx
import (
"context"
"encoding/json"
"fmt"
"github.com/gogf/gf/v2/os/gtime"
"server/internal/consts"
"server/internal/dao"
"server/internal/model/do"
"sync"
"time"
mqttlib "github.com/eclipse/paho.mqtt.golang"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"server/utility/mqtt"
)
// emqxClient is the EMQX-backed MQTT client of this package. New returns it
// as an mqtt.MqttClient.
type emqxClient struct {
// client is the underlying paho MQTT client used to connect, publish and subscribe.
client mqttlib.Client
// once guards the connection attempt in connect so it is executed a single time.
once sync.Once
}
// New creates an EMQX client for the given broker address and credentials and
// connects it immediately. It only returns once the connection has succeeded;
// when the connection attempt fails the error is logged and New panics.
//
// The client is configured with auto-reconnect enabled and a 5 second connect
// timeout.
func New(broker, clientID, username, password string) mqtt.MqttClient {
	ctx := context.Background()

	options := mqttlib.NewClientOptions()
	options.AddBroker(broker)
	options.SetClientID(clientID)
	options.SetUsername(username)
	options.SetPassword(password)
	options.SetAutoReconnect(true)
	options.SetConnectTimeout(5 * time.Second)

	instance := &emqxClient{client: mqttlib.NewClient(options)}

	// Connect right away so callers never receive a client that is not connected.
	if connErr := instance.connect(); connErr != nil {
		glog.Errorf(ctx, "连接EMQX失败: %v", connErr)
		panic(fmt.Sprintf("连接EMQX失败: %v", connErr))
	}

	glog.Infof(ctx, "EMQX客户端连接成功")
	return instance
}
// connect establishes the broker connection. The attempt is wrapped in e.once,
// so only the first call actually connects; any later call skips the attempt
// and returns nil.
//
// It returns the wrapped connection error of the first attempt, or nil.
func (e *emqxClient) connect() error {
	var err error
	e.once.Do(func() {
		token := e.client.Connect()
		if token.Wait() && token.Error() != nil {
			err = fmt.Errorf("EMQX连接失败: %w", token.Error())
			// Pass the message as an argument rather than as the format string,
			// so a '%' inside the broker error is not interpreted as a verb.
			glog.Errorf(context.Background(), "%v", err)
			return
		}
		glog.Infof(context.Background(), "EMQX连接成功")
	})
	return err
}
// Publish sends payload to topic with QoS 0 and the retained flag off, waits
// for the publish token to complete, logs the outcome and returns the token's
// error (nil on success).
func (e *emqxClient) Publish(topic string, payload []byte) error {
	ctx := context.Background()

	pubToken := e.client.Publish(topic, 0, false, payload)
	pubToken.Wait()

	if pubErr := pubToken.Error(); pubErr != nil {
		glog.Errorf(ctx, "发布消息失败topic=%s错误%v", topic, pubErr)
		return pubErr
	}

	glog.Infof(ctx, "成功发布消息topic=%s消息大小=%d字节", topic, len(payload))
	return nil
}
// Subscribe registers handler for topic with QoS 0, waits for the subscribe
// token to complete, logs the outcome and returns the token's error (nil on
// success). For every received message handler is called with the message's
// topic and payload.
func (e *emqxClient) Subscribe(topic string, handler func(topic string, payload []byte)) error {
	ctx := context.Background()

	// Adapt the plain (topic, payload) callback to the paho handler signature.
	var onMessage mqttlib.MessageHandler = func(_ mqttlib.Client, m mqttlib.Message) {
		handler(m.Topic(), m.Payload())
	}

	subToken := e.client.Subscribe(topic, 0, onMessage)
	subToken.Wait()

	if subErr := subToken.Error(); subErr != nil {
		glog.Errorf(ctx, "订阅失败topic=%s错误%v", topic, subErr)
		return subErr
	}

	glog.Infof(ctx, "成功订阅主题topic=%s", topic)
	return nil
}
// MemberLevelData is one entry of a store's member level list, decoded from
// the data field of consts.CmdMemberLevels messages.
type MemberLevelData struct {
LevelID int `json:"level_id"` // Member level ID, e.g. 0 denotes a temporary card
LevelName string `json:"level_name"` // Level name
Level int `json:"level"` // Level order; a larger value means a higher level
OlType int8 `json:"ol_type"` // Online type, e.g. 1 = online
Status int8 `json:"status"` // Status: 1 = enabled, 2 = disabled
}
// ClientData is one entry of a store's client machine list, decoded from the
// data field of consts.CmdClientList messages.
type ClientData struct {
// AreaName is the name of the area the client belongs to; it is used to look up or create the area row.
AreaName string `json:"area_name"`
// ClientName is the client's name; together with the store ID it is used to find the client row.
ClientName string `json:"client_name"`
// Status is stored as-is in the client's status column.
Status int8 `json:"status"`
}
// ClientSession is a sign-on / sign-off report for one client machine, decoded
// from the data field of consts.CmdClientUp and consts.CmdClientDown messages.
type ClientSession struct {
AreaName string `json:"area_name"` // Area name, stored with the session row on sign-on
ClientName string `json:"client_name"` // Client name used to find the client row
CardId string `json:"card_id"` // NOTE(review): decoded but not read anywhere in this file — confirm whether it should be persisted
XyUserId string `json:"xy_user_id"` // User identifier stored with the session and used to match it on sign-off
Level int `json:"level"` // Written to the session's LevelId field on sign-on
LevelName string `json:"level_name"` // Level name stored with the session row
StartTime string `json:"start_time"` // Session start time as text; parsed with gtime.NewFromStr on sign-on
EndTime string `json:"end_time"` // Session end time; an empty value marks the report as a sign-on
UsedTime int `json:"used_time"` // Written to used_time on sign-off; unit is not visible here
}
// init reads the EMQX settings from configuration, connects a client, registers
// it under the name "emqx" and subscribes to the "/+/up" report topic.
//
// New panics when the broker cannot be reached, so a connection failure aborts
// package initialisation.
func init() {
	// Plain background context shared by config access, logging and handlers.
	ctx := context.Background()

	// Load the MQTT connection settings.
	conf := g.Config()
	var (
		host     = conf.MustGet(ctx, "mqtt.emqx.host").String()
		port     = conf.MustGet(ctx, "mqtt.emqx.port").Int()
		username = conf.MustGet(ctx, "mqtt.emqx.username").String()
		password = conf.MustGet(ctx, "mqtt.emqx.password").String()
		clientId = conf.MustGet(ctx, "mqtt.emqx.clientId").String()
	)
	broker := fmt.Sprintf("tcp://%s:%d", host, port)

	client := New(broker, clientId, username, password)

	// Make the client available through the mqtt registry.
	mqtt.Register("emqx", client)
	glog.Infof(ctx, "EMQX 客户端注册完成broker=%s, clientID=%s", broker, clientId)

	// Subscribe to the report topic from a separate goroutine. The returned
	// error is not inspected here; Subscribe logs a failure itself.
	go func() {
		client.Subscribe("/+/up", func(topic string, payload []byte) {
			dispatchUpMessage(ctx, topic, payload)
		})
	}()
}

// dispatchUpMessage decodes one report message and routes it by its cmd field:
// member level lists, client lists and client sign-on/sign-off reports are
// handed to the matching save function. Messages with any other cmd are
// ignored; decoding failures are logged and the message is dropped.
func dispatchUpMessage(ctx context.Context, topic string, payload []byte) {
	var envelope struct {
		Cmd     int             `json:"cmd"`
		StoreId int             `json:"storeId"`
		Data    json.RawMessage `json:"data"`
	}
	if err := json.Unmarshal(payload, &envelope); err != nil {
		glog.Errorf(ctx, "解析数据失败topic=%s错误%v", topic, err)
		return
	}

	switch envelope.Cmd {
	case consts.CmdMemberLevels:
		// Member level list of the reporting store.
		var levels []MemberLevelData
		if err := json.Unmarshal(envelope.Data, &levels); err != nil {
			glog.Errorf(ctx, "解析会员等级数据失败topic=%s错误%v", topic, err)
			return
		}
		saveOrUpdateLevels(ctx, envelope.StoreId, levels)
		glog.Infof(ctx, "成功保存会员等级数据store_id=%d", envelope.StoreId)

	case consts.CmdClientList:
		// Client machine list of the reporting store.
		var machines []ClientData
		if err := json.Unmarshal(envelope.Data, &machines); err != nil {
			glog.Errorf(ctx, "解析客户机数据失败topic=%s错误%v", topic, err)
			return
		}
		saveOrUpdateClients(ctx, envelope.StoreId, machines)

	case consts.CmdClientUp, consts.CmdClientDown:
		// Sign-on / sign-off report of a single client machine.
		var report ClientSession
		if err := json.Unmarshal(envelope.Data, &report); err != nil {
			glog.Errorf(ctx, "解析客户机数据失败topic=%s错误%v", topic, err)
			return
		}
		saveOrUpdateClientSession(ctx, envelope.StoreId, report)
	}
}
// saveOrUpdateClientSession applies a client sign-on or sign-off report for a
// store.
//
// The client row is looked up by store_id and client_name. When
// session.EndTime is empty the report is treated as a sign-on: a session row is
// inserted and the client is marked as in use (status 3). Otherwise it is a
// sign-off: the session row matching store, client, user and start time gets
// its end_time and used_time set and the client is marked as idle (status 1).
//
// Nothing is returned. Every failure is logged; a failed lookup, insert or
// session update aborts the remaining steps.
func saveOrUpdateClientSession(ctx context.Context, storeId int, session ClientSession) {
	// Status values this function writes to the client row.
	const (
		clientStatusIdle  = 1
		clientStatusInUse = 3
	)

	// Look up the client machine.
	client, err := dao.StoreClients.Ctx(ctx).
		Where("store_id", storeId).
		Where("client_name", session.ClientName).
		One()
	if err != nil {
		// A query failure is not the same as a missing row; keep the cause.
		glog.Errorf(ctx, "查询客户机信息失败 store_id=%d client_name=%s err=%v", storeId, session.ClientName, err)
		return
	}
	if client.IsEmpty() {
		glog.Errorf(ctx, "未找到客户机信息store_id=%d client_name=%s", storeId, session.ClientName)
		return
	}
	clientId := client["id"].Int64()
	areaId := client["area_id"].Int64()

	// Sign-on: the report carries no end_time.
	if session.EndTime == "" {
		_, err = dao.StoreClientSessions.Ctx(ctx).Data(do.StoreClientSessions{
			StoreId:   int64(storeId),
			ClientId:  clientId,
			AreaId:    areaId,
			XyUserId:  session.XyUserId,
			LevelId:   session.Level,
			LevelName: session.LevelName,
			StartTime: gtime.NewFromStr(session.StartTime),
			AreaName:  session.AreaName,
		}).Insert()
		if err != nil {
			glog.Errorf(ctx, "插入上机记录失败store_id=%d client_id=%d xy_user_id=%s错误%v", storeId, clientId, session.XyUserId, err)
			return
		}

		// Mark the client as in use.
		_, err = dao.StoreClients.Ctx(ctx).
			Where("id", clientId).
			Data(g.Map{"status": clientStatusInUse}).
			Update()
		if err != nil {
			glog.Errorf(ctx, "更新客户机状态为上机中失败client_id=%d错误%v", clientId, err)
		}
		return
	}

	// Sign-off: the report carries an end_time.
	_, err = dao.StoreClientSessions.Ctx(ctx).
		Where("store_id", storeId).
		Where("client_id", clientId).
		Where("xy_user_id", session.XyUserId).
		Where("start_time", session.StartTime).
		Data(g.Map{
			"end_time":  session.EndTime,
			"used_time": session.UsedTime,
		}).Update()
	if err != nil {
		glog.Errorf(ctx, "更新下机记录失败store_id=%d client_id=%d xy_user_id=%s错误%v", storeId, clientId, session.XyUserId, err)
		return
	}

	// Mark the client as idle.
	_, err = dao.StoreClients.Ctx(ctx).
		Where("id", clientId).
		Data(g.Map{"status": clientStatusIdle}).
		Update()
	if err != nil {
		glog.Errorf(ctx, "更新客户机状态为空闲失败client_id=%d错误%v", clientId, err)
	}
}
// saveOrUpdateClients synchronises the reported client machine list of a store.
//
// For every entry the area is resolved by store and area name (and inserted
// when it does not exist yet), then the client row identified by store_id and
// client_name is updated, or inserted when there is none.
//
// Nothing is returned. Failures are logged and the affected entry is skipped;
// the remaining entries are still processed.
func saveOrUpdateClients(ctx context.Context, storeId int, clients []ClientData) {
	for _, client := range clients {
		// Step 1: resolve the area ID, creating the area when it is unknown.
		var areaId int64
		value, err := dao.StoreAreas.Ctx(ctx).
			Where(do.StoreAreas{StoreId: storeId, AreaName: client.AreaName}).
			Fields(dao.StoreAreas.Columns().Id).
			Value()
		if err != nil {
			glog.Errorf(ctx, "查询区域失败 store_id=%d area_name=%s err=%v", storeId, client.AreaName, err)
			continue
		}
		if value.IsEmpty() {
			id, err := dao.StoreAreas.Ctx(ctx).Data(do.StoreAreas{
				StoreId:  storeId,
				AreaName: client.AreaName,
			}).InsertAndGetId()
			if err != nil {
				glog.Errorf(ctx, "插入区域失败 store_id=%d area_name=%s err=%v", storeId, client.AreaName, err)
				continue
			}
			areaId = id
		} else {
			areaId = value.Int64()
		}

		// Step 2: check whether the client already exists. The check uses the
		// same key as the update below (store_id + client_name). Including
		// area_id here would make a client that changed area look new and
		// insert a duplicate row instead of updating its area.
		count, err := dao.StoreClients.Ctx(ctx).
			Where("store_id", storeId).
			Where("client_name", client.ClientName).
			Count()
		if err != nil {
			glog.Errorf(ctx, "查询客户机失败 store_id=%d client_name=%s err=%v", storeId, client.ClientName, err)
			continue
		}

		data := g.Map{
			"store_id":    storeId,
			"client_name": client.ClientName,
			"area_id":     areaId,
			"status":      client.Status,
		}

		// Step 3: update the existing row or insert a new one.
		if count > 0 {
			_, err = dao.StoreClients.Ctx(ctx).
				Data(data).
				Where("store_id", storeId).
				Where("client_name", client.ClientName).
				Update()
			if err != nil {
				glog.Errorf(ctx, "更新客户机失败 store_id=%d client_name=%s err=%v", storeId, client.ClientName, err)
			}
			continue
		}

		_, err = dao.StoreClients.Ctx(ctx).
			Data(data).
			Insert()
		if err != nil {
			glog.Errorf(ctx, "插入客户机失败 store_id=%d client_name=%s err=%v", storeId, client.ClientName, err)
		}
	}
}
// saveOrUpdateLevels synchronises the reported member level list of a store.
//
// Each level is identified by store_id and level_id: an existing row is
// updated, otherwise a new row is inserted.
//
// Nothing is returned. Failures are logged together with the underlying error
// and the affected level is skipped; the remaining levels are still processed.
func saveOrUpdateLevels(ctx context.Context, storeId int, levels []MemberLevelData) {
	for _, level := range levels {
		exists, err := dao.StoreMemberLevels.Ctx(ctx).
			Where("store_id", storeId).
			Where("level_id", level.LevelID).
			Count()
		if err != nil {
			// Without a reliable count we cannot choose between update and
			// insert; inserting blindly could duplicate an existing level.
			glog.Errorf(ctx, "检查会员等级是否存在失败 store_id=%d level_id=%d err=%v", storeId, level.LevelID, err)
			continue
		}

		data := g.Map{
			"store_id":   storeId,
			"level_id":   level.LevelID,
			"level_name": level.LevelName,
			"level":      level.Level,
			"ol_type":    level.OlType,
			"status":     level.Status,
		}

		if exists > 0 {
			_, err = dao.StoreMemberLevels.Ctx(ctx).
				Where("store_id", storeId).
				Where("level_id", level.LevelID).
				Data(data).
				Update()
		} else {
			_, err = dao.StoreMemberLevels.Ctx(ctx).
				Data(data).
				Insert()
		}
		if err != nil {
			glog.Errorf(ctx, "保存会员等级数据失败 store_id=%d level_id=%d err=%v", storeId, level.LevelID, err)
		}
	}
}
// DownData is a message envelope with the same cmd/storeId/data JSON layout
// that incoming report messages are decoded from in this file.
// NOTE(review): presumably used for messages sent down to a store — it is not
// referenced in the visible part of the file; confirm against callers.
type DownData struct {
// CMD is the command code of the message.
CMD int `json:"cmd"`
// StoreId identifies the store the message relates to.
StoreId int `json:"storeId"`
// Data is the command-specific payload; any JSON-encodable value.
Data interface{} `json:"data"`
}