Files
arenax-server/utility/mqtt/emqx/emqx.go

233 lines
6.5 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package emqx
import (
"context"
"fmt"
"github.com/gogf/gf/v2/encoding/gjson"
"server/internal/consts"
"strconv"
"sync"
"time"
mqttlib "github.com/eclipse/paho.mqtt.golang"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"server/utility/mqtt"
)
// emqxClient wraps a paho MQTT client. New returns a pointer to it as an
// mqtt.MqttClient.
type emqxClient struct {
// client is the underlying paho client used to connect, publish and subscribe.
client mqttlib.Client
// once guards connect so the connection attempt is made at most one time.
once sync.Once
}
// New builds an EMQX client for the given broker address and credentials and
// connects it before returning. Auto-reconnect is enabled and the connect
// timeout is five seconds.
//
// If the initial connection attempt fails, the error is logged and New
// panics, so a client is only ever returned after a successful connect.
func New(broker, clientID, username, password string) mqtt.MqttClient {
	ctx := context.Background()

	options := mqttlib.NewClientOptions()
	options.AddBroker(broker)
	options.SetClientID(clientID)
	options.SetUsername(username)
	options.SetPassword(password)
	options.SetAutoReconnect(true)
	options.SetConnectTimeout(5 * time.Second)

	instance := &emqxClient{client: mqttlib.NewClient(options)}

	// Connect immediately; callers never see a client whose first connect failed.
	if connectErr := instance.connect(); connectErr != nil {
		glog.Errorf(ctx, "连接EMQX失败: %v", connectErr)
		panic(fmt.Sprintf("连接EMQX失败: %v", connectErr))
	}

	glog.Infof(ctx, "EMQX客户端连接成功")
	return instance
}
// connect performs the initial broker connection. The attempt is guarded by
// e.once, so it is made at most one time per client.
//
// It returns the wrapped connection error when the attempt fails and nil when
// it succeeds. Because the error is local to the call that actually runs the
// attempt, any later call skips the attempt and returns nil regardless of the
// first outcome.
// NOTE(review): New is the only caller visible here and calls connect once —
// confirm before calling it from anywhere else.
func (e *emqxClient) connect() error {
	var err error
	e.once.Do(func() {
		ctx := context.Background()

		token := e.client.Connect()
		token.Wait() // block until the connect attempt has completed

		if connErr := token.Error(); connErr != nil {
			err = fmt.Errorf("EMQX连接失败: %w", connErr)
			// Pass the error as an argument, never as the format string: a '%'
			// inside the broker's error text would otherwise be parsed as a verb.
			glog.Errorf(ctx, "%v", err)
			return
		}
		glog.Infof(ctx, "EMQX连接成功")
	})
	return err
}
// Publish sends payload to topic with QoS 0 and the retained flag off, waits
// for the publish token to complete, and logs the outcome.
//
// It returns the token's error, or nil when the publish succeeded.
func (e *emqxClient) Publish(topic string, payload []byte) error {
	ctx := context.Background()

	pubToken := e.client.Publish(topic, 0, false, payload)
	pubToken.Wait()

	if pubErr := pubToken.Error(); pubErr != nil {
		glog.Errorf(ctx, "发布消息失败topic=%s错误%v", topic, pubErr)
		return pubErr
	}

	glog.Infof(ctx, "成功发布消息topic=%s消息大小=%d字节", topic, len(payload))
	return nil
}
// Subscribe registers handler for topic with QoS 0, waits for the subscribe
// token to complete, and logs the outcome.
//
// For every message delivered on the subscription, handler is called with the
// message's topic and payload. Subscribe returns the token's error, or nil
// when the subscription succeeded.
func (e *emqxClient) Subscribe(topic string, handler func(topic string, payload []byte)) error {
	ctx := context.Background()

	// Adapt the plain handler to the callback signature the paho client expects.
	onMessage := func(_ mqttlib.Client, m mqttlib.Message) {
		handler(m.Topic(), m.Payload())
	}

	subToken := e.client.Subscribe(topic, 0, onMessage)
	subToken.Wait()

	if subErr := subToken.Error(); subErr != nil {
		glog.Errorf(ctx, "订阅失败topic=%s错误%v", topic, subErr)
		return subErr
	}

	glog.Infof(ctx, "成功订阅主题topic=%s", topic)
	return nil
}
// DeviceData is the JSON payload decoded from messages received on the "/up"
// topic.
type DeviceData struct {
// NetbarAccount is formatted into consts.NetbarOnlineNumberKey to address the
// online-device counter, and is stored in the device's Redis hash.
NetbarAccount string `json:"netbarAccount"`
// DeviceId is the field name used in the "system:online:devices" hash and the
// suffix of the "device:<id>" hash key.
DeviceId string `json:"deviceId"`
// DeviceName is stored in the device's Redis hash.
DeviceName string `json:"deviceName"`
// IP is stored in the device's Redis hash.
IP string `json:"ip"`
// MACAddress is stored in the device's Redis hash.
MACAddress string `json:"macAddress"`
}
const (
	// onlineDevicesKey is the Redis hash mapping device ID to the Unix time of
	// the device's most recent "/up" message.
	onlineDevicesKey = "system:online:devices"
	// offlineAfterSeconds is how long a device may stay silent before the
	// sweep treats it as offline.
	offlineAfterSeconds = 10
	// offlineSweepInterval is how often the online-device hash is scanned.
	offlineSweepInterval = 10 * time.Second
)

// init reads the EMQX settings from the "mqtt.emqx.*" configuration keys,
// creates and connects the client (New panics if the connection fails),
// registers it under the name "emqx", and starts two background goroutines:
// one subscribes to "/up" to track devices coming online, the other
// periodically sweeps devices that have stopped reporting.
//
// Both goroutines run for the lifetime of the process; there is no shutdown
// signal available here, so the sweep simply loops over the ticker.
func init() {
	ctx := context.Background()

	// Load the MQTT configuration.
	cfg := g.Config()
	host := cfg.MustGet(ctx, "mqtt.emqx.host").String()
	port := cfg.MustGet(ctx, "mqtt.emqx.port").Int()
	username := cfg.MustGet(ctx, "mqtt.emqx.username").String()
	password := cfg.MustGet(ctx, "mqtt.emqx.password").String()
	clientID := cfg.MustGet(ctx, "mqtt.emqx.clientId").String()

	broker := fmt.Sprintf("tcp://%s:%d", host, port)
	client := New(broker, clientID, username, password)

	// Register the MQTT client.
	mqtt.Register("emqx", client)
	glog.Infof(ctx, "EMQX 客户端注册完成broker=%s, clientID=%s", broker, clientID)

	// Subscribe to device "up" messages.
	go func() {
		err := client.Subscribe("/up", func(topic string, payload []byte) {
			glog.Infof(ctx, "收到 MQTT 消息topic=%s", topic)
			handleDeviceUp(ctx, payload)
		})
		if err != nil {
			glog.Errorf(ctx, "订阅 /up 失败: %v", err)
		}
	}()

	// Monitor for devices that have gone offline.
	go func() {
		ticker := time.NewTicker(offlineSweepInterval)
		defer ticker.Stop()
		for range ticker.C {
			sweepOfflineDevices(ctx)
		}
	}()
}

// handleDeviceUp processes one "/up" payload: it stores the device's details,
// refreshes its last-seen timestamp in the online hash, and increments the
// netbar's online counter only when the device was not already online.
func handleDeviceUp(ctx context.Context, payload []byte) {
	var data DeviceData
	if err := gjson.Unmarshal(payload, &data); err != nil {
		glog.Errorf(ctx, "[/up] 解析设备信息失败: %v", err)
		return
	}
	// Without both identifiers the Redis keys below would be meaningless
	// ("device:" and a counter for an empty account), so reject the message.
	if data.DeviceId == "" || data.NetbarAccount == "" {
		glog.Errorf(ctx, "[/up] 设备信息缺少 deviceId 或 netbarAccount")
		return
	}

	// Store the device details first: the offline sweep needs netbarAccount to
	// know which counter to decrement, so a device must never be in the online
	// hash without them.
	deviceKey := fmt.Sprintf("device:%s", data.DeviceId)
	if _, err := g.Redis().HSet(ctx, deviceKey, map[string]interface{}{
		"netbarAccount": data.NetbarAccount,
		"deviceName":    data.DeviceName,
		"ip":            data.IP,
		"macAddress":    data.MACAddress,
	}); err != nil {
		glog.Errorf(ctx, "存储设备信息失败 %s: %v", deviceKey, err)
		return
	}

	// Refresh the last-seen timestamp. HSet reports how many fields were newly
	// created, which tells us whether this device just came online.
	added, err := g.Redis().HSet(ctx, onlineDevicesKey, map[string]interface{}{
		data.DeviceId: time.Now().Unix(),
	})
	if err != nil {
		glog.Errorf(ctx, "更新在线设备集合失败 %s: %v", onlineDevicesKey, err)
		return
	}
	if added == 0 {
		// Already online: this message is a heartbeat. Incrementing here would
		// inflate the counter, because going offline decrements it only once.
		return
	}

	// Count the newly online device against its netbar.
	key := fmt.Sprintf(consts.NetbarOnlineNumberKey, data.NetbarAccount)
	if _, err := g.Redis().Incr(ctx, key); err != nil {
		glog.Errorf(ctx, "增加 Redis 计数失败 %s: %v", key, err)
	}
}

// sweepOfflineDevices scans the online hash once and marks every device whose
// last-seen timestamp is older than offlineAfterSeconds as offline.
func sweepOfflineDevices(ctx context.Context) {
	devicesVar, err := g.Redis().HGetAll(ctx, onlineDevicesKey)
	if err != nil {
		glog.Errorf(ctx, "获取在线设备失败: %v", err)
		return
	}

	now := time.Now().Unix()
	for deviceID, lastSeenStr := range devicesVar.MapStrStr() {
		lastSeen, err := strconv.ParseInt(lastSeenStr, 10, 64)
		if err != nil {
			glog.Errorf(ctx, "无效时间戳 for 设备 %s: %v", deviceID, err)
			continue
		}
		if now-lastSeen <= offlineAfterSeconds {
			continue
		}
		markDeviceOffline(ctx, deviceID)
	}
}

// markDeviceOffline removes deviceID from the online hash and decrements the
// online counter of the netbar recorded in the device's stored details.
func markDeviceOffline(ctx context.Context, deviceID string) {
	// Look up which netbar the device belongs to.
	dataKey := fmt.Sprintf("device:%s", deviceID)
	dataVar, err := g.Redis().HGetAll(ctx, dataKey)
	if err != nil {
		glog.Errorf(ctx, "获取设备数据失败 %s: %v", dataKey, err)
		return
	}
	netbarAccount, ok := dataVar.MapStrStr()["netbarAccount"]
	if !ok {
		glog.Errorf(ctx, "设备 %s 缺少 netbarAccount", deviceID)
		return
	}

	// Remove the device before touching the counter. If removal fails the
	// device is retried on the next sweep without having been decremented, and
	// if nothing was removed the entry was already handled elsewhere.
	removed, err := g.Redis().HDel(ctx, onlineDevicesKey, deviceID)
	if err != nil {
		glog.Errorf(ctx, "移除设备 %s 从在线集合失败: %v", deviceID, err)
		return
	}
	if removed == 0 {
		return
	}

	key := fmt.Sprintf(consts.NetbarOnlineNumberKey, netbarAccount)
	if _, err := g.Redis().Decr(ctx, key); err != nil {
		glog.Errorf(ctx, "减少 Redis 计数失败 %s: %v", key, err)
	}
	glog.Infof(ctx, "设备 %s 已标记为离线", deviceID)
}