Files
arenax-server/utility/mqtt/emqx/emqx.go

297 lines
9.0 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package emqx
import (
"context"
"fmt"
"github.com/gogf/gf/v2/encoding/gjson"
"github.com/gogf/gf/v2/util/gconv"
"server/internal/dao"
"strconv"
"sync"
"time"
mqttlib "github.com/eclipse/paho.mqtt.golang"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"server/utility/mqtt"
)
// emqxClient wraps a paho MQTT client; New returns it as an mqtt.MqttClient.
type emqxClient struct {
// client is the underlying paho MQTT client used for connect, publish and subscribe.
client mqttlib.Client
// once guards connect so that the underlying Connect call is issued at most once.
once sync.Once
}
// New builds an EMQX client for the given broker URL and credentials and
// connects it before returning.
//
// The client is configured with auto-reconnect enabled and a 5 second connect
// timeout. Because the return type carries no error, a failed connection is
// logged and then reported by panicking.
//
// NOTE(review): auto-reconnect is enabled but no on-connect handler is
// registered here; confirm against the paho documentation whether
// subscriptions made through Subscribe survive an automatic reconnect.
func New(broker, clientID, username, password string) mqtt.MqttClient {
	ctx := context.Background()

	options := mqttlib.NewClientOptions()
	options.AddBroker(broker)
	options.SetClientID(clientID)
	options.SetUsername(username)
	options.SetPassword(password)
	options.SetAutoReconnect(true)
	options.SetConnectTimeout(5 * time.Second)

	instance := &emqxClient{client: mqttlib.NewClient(options)}

	// Connect eagerly so callers never receive an unconnected client.
	if connErr := instance.connect(); connErr != nil {
		glog.Errorf(ctx, "连接EMQX失败: %v", connErr)
		panic(fmt.Sprintf("连接EMQX失败: %v", connErr))
	}

	glog.Infof(ctx, "EMQX客户端连接成功")
	return instance
}
// connect establishes the broker connection. The underlying Connect call is
// issued at most once per client (guarded by e.once).
//
// It returns the wrapped connection error when the first attempt fails. Later
// calls do not redial; they return an error if the client is not connected,
// so a failed first attempt is never reported as success on a subsequent call.
func (e *emqxClient) connect() error {
	var err error
	e.once.Do(func() {
		token := e.client.Connect()
		token.Wait()
		if tokenErr := token.Error(); tokenErr != nil {
			err = fmt.Errorf("EMQX连接失败: %w", tokenErr)
			// Pass the message as an argument, not as the format string, so a
			// '%' inside the broker error is not interpreted as a verb.
			glog.Errorf(context.Background(), "%v", err)
			return
		}
		glog.Infof(context.Background(), "EMQX连接成功")
	})
	// err is only set on the call that actually ran the Once body; for every
	// other call fall back to the client's own connection state.
	if err == nil && !e.client.IsConnected() {
		return fmt.Errorf("EMQX连接失败: 客户端未连接")
	}
	return err
}
// Publish sends payload to topic (QoS 0, not retained) and blocks until the
// publish token completes.
//
// The outcome is logged either way; the token's error, if any, is returned.
func (e *emqxClient) Publish(topic string, payload []byte) error {
	ctx := context.Background()

	tok := e.client.Publish(topic, 0, false, payload)
	tok.Wait()

	if pubErr := tok.Error(); pubErr != nil {
		glog.Errorf(ctx, "发布消息失败topic=%s错误%v", topic, pubErr)
		return pubErr
	}

	glog.Infof(ctx, "成功发布消息topic=%s消息大小=%d字节", topic, len(payload))
	return nil
}
// Subscribe registers handler for messages matching topic (QoS 0) and blocks
// until the subscribe token completes.
//
// handler receives the concrete topic and payload of every delivered message.
// The outcome is logged either way; the token's error, if any, is returned.
func (e *emqxClient) Subscribe(topic string, handler func(topic string, payload []byte)) error {
	ctx := context.Background()

	// Adapt the paho callback signature to the simpler (topic, payload) handler.
	onMessage := func(_ mqttlib.Client, m mqttlib.Message) {
		handler(m.Topic(), m.Payload())
	}

	tok := e.client.Subscribe(topic, 0, onMessage)
	tok.Wait()

	if subErr := tok.Error(); subErr != nil {
		glog.Errorf(ctx, "订阅失败topic=%s错误%v", topic, subErr)
		return subErr
	}

	glog.Infof(ctx, "成功订阅主题topic=%s", topic)
	return nil
}
// DeviceData is the JSON payload decoded from messages received on the
// "/+/up" topic.
type DeviceData struct {
// NetbarAccount is used to build the per-account device counter key and the
// "/<account>/down" reply topic.
NetbarAccount string `json:"netbarAccount"`
// DeviceId is the field name in the online-devices hash and the suffix of the
// per-device Redis keys.
DeviceId string `json:"deviceId"`
// DeviceName is stored in the per-device info hash.
DeviceName string `json:"deviceName"`
// IP is stored in the per-device info hash.
IP string `json:"ip"`
// MACAddress is stored in the per-device info hash.
MACAddress string `json:"macAddress"`
}
// init 注册emqx客户端
func init() {
// 创建可取消的上下文
ctx := context.Background()
// 加载 MQTT 配置
cfg := g.Config()
host := cfg.MustGet(ctx, "mqtt.emqx.host").String()
port := cfg.MustGet(ctx, "mqtt.emqx.port").Int()
username := cfg.MustGet(ctx, "mqtt.emqx.username").String()
password := cfg.MustGet(ctx, "mqtt.emqx.password").String()
clientId := cfg.MustGet(ctx, "mqtt.emqx.clientId").String()
broker := fmt.Sprintf("tcp://%s:%d", host, port)
client := New(broker, clientId, username, password)
// 注册 MQTT 客户端
mqtt.Register("emqx", client)
glog.Infof(ctx, "EMQX 客户端注册完成broker=%s, clientID=%s", broker, clientId)
// 订阅设备上线消息
// 订阅设备上线消息
go func() {
ctx := context.Background()
err := client.Subscribe("/+/up", func(topic string, payload []byte) {
glog.Infof(ctx, "收到 MQTT 消息topic=%s", topic)
var data DeviceData
if err := gjson.Unmarshal(payload, &data); err != nil {
glog.Errorf(ctx, "[/up] 解析设备信息失败: %v", err)
return
}
deviceId := data.DeviceId
netbarAccount := data.NetbarAccount
now := time.Now().Unix()
// Redis 统一 key
onlineDevicesKey := "system_device:online_devices"
lastOnlineKey := fmt.Sprintf("system_device:last_online:%s", deviceId)
deviceInfoKey := fmt.Sprintf("system_device:info:%s", deviceId)
netbarDeviceCountKey := fmt.Sprintf("system_device:netbar_device_count:%s", netbarAccount)
// 判断设备是否在线
exists, err := g.Redis().HExists(ctx, onlineDevicesKey, deviceId)
if err != nil {
glog.Errorf(ctx, "查询设备在线状态失败 %s: %v", deviceId, err)
return
}
// 获取上次上线时间,判断是否断线重连
lastOnlineStr, err := g.Redis().Get(ctx, lastOnlineKey)
shouldSend := false
needIncr := false
if exists != 1 {
// 设备不在线,首次上线
shouldSend = true
needIncr = true
} else if err != nil || lastOnlineStr.IsEmpty() {
// 无上线时间记录,断线重连
shouldSend = true
needIncr = true
} else {
lastOnline, err := strconv.ParseInt(lastOnlineStr.String(), 10, 64)
if err != nil || now-lastOnline > 10 {
// 超过10秒断线重连
shouldSend = true
needIncr = true
} else {
// 在线且未断线,不加计数,不发送配置
shouldSend = false
needIncr = false
}
}
// 更新上线时间并设置20秒过期
if _, err := g.Redis().Set(ctx, lastOnlineKey, now); err != nil {
glog.Errorf(ctx, "更新上线时间失败 %s: %v", lastOnlineKey, err)
}
if _, err := g.Redis().Expire(ctx, lastOnlineKey, 20); err != nil {
glog.Errorf(ctx, "设置上线时间过期失败 %s: %v", lastOnlineKey, err)
}
// 需要时增加在线设备计数
if needIncr {
if _, err := g.Redis().Incr(ctx, netbarDeviceCountKey); err != nil {
glog.Errorf(ctx, "增加 Netbar 在线设备数失败 %s: %v", netbarDeviceCountKey, err)
}
}
// 更新在线设备集合(时间戳)
if _, err := g.Redis().HSet(ctx, onlineDevicesKey, map[string]interface{}{
deviceId: now,
}); err != nil {
glog.Errorf(ctx, "更新在线设备集合失败 %s: %v", onlineDevicesKey, err)
return
}
// 更新设备基础信息并设置1小时过期
if _, err := g.Redis().HSet(ctx, deviceInfoKey, map[string]interface{}{
"netbarAccount": netbarAccount,
"deviceName": data.DeviceName,
"ip": data.IP,
"macAddress": data.MACAddress,
}); err != nil {
glog.Errorf(ctx, "存储设备信息失败 %s: %v", deviceInfoKey, err)
}
if _, err := g.Redis().Expire(ctx, deviceInfoKey, 3600); err != nil {
glog.Errorf(ctx, "设置设备信息过期失败 %s: %v", deviceInfoKey, err)
}
// 首次或断线重连发送配置
if shouldSend {
one, err := dao.Stores.Ctx(ctx).InnerJoin(dao.StoreDesktopSettings.Table(), "sds", "sds.store_id = stores.id").
Where("stores.netbar_account = ?", netbarAccount).
Fields("stores.netbar_account netbarAccount, sds.top_component_visible topComponentVisible, sds.right_component_visible rightComponentVisible").One()
if err != nil {
glog.Errorf(ctx, "获取门店信息失败 %s: %v", deviceInfoKey, err)
}
if err = client.Publish(fmt.Sprintf("/%s/down", netbarAccount), gconv.Bytes(one.Json())); err != nil {
glog.Errorf(ctx, "发布消息失败 %s: %v", deviceInfoKey, err)
}
}
})
if err != nil {
glog.Errorf(ctx, "订阅 /+/up 失败: %v", err)
}
}()
// 监控离线设备
go func() {
ctx := context.Background()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
glog.Info(ctx, "停止离线设备监控")
return
case <-ticker.C:
onlineDevicesKey := "system_device:online_devices"
devicesVar, err := g.Redis().HGetAll(ctx, onlineDevicesKey)
if err != nil {
glog.Errorf(ctx, "获取在线设备失败: %v", err)
continue
}
devices := devicesVar.MapStrStr()
if len(devices) == 0 {
continue
}
now := time.Now().Unix()
for deviceId, timestampStr := range devices {
timestamp, err := strconv.ParseInt(timestampStr, 10, 64)
if err != nil {
glog.Errorf(ctx, "无效时间戳 for 设备 %s: %v", deviceId, err)
continue
}
if now-timestamp > 10 {
// 超过10秒未更新认定离线
deviceInfoKey := fmt.Sprintf("system_device:info:%s", deviceId)
dataVar, err := g.Redis().HGetAll(ctx, deviceInfoKey)
if err != nil {
glog.Errorf(ctx, "获取设备数据失败 %s: %v", deviceInfoKey, err)
continue
}
data := dataVar.MapStrStr()
netbarAccount, exists := data["netbarAccount"]
if !exists {
glog.Errorf(ctx, "设备 %s 缺少 netbarAccount", deviceId)
continue
}
// 新增system_device命名空间按账号统计在线设备数 -1
netbarDeviceCountKey := fmt.Sprintf("system_device:netbar_device_count:%s", netbarAccount)
if _, err := g.Redis().Decr(ctx, netbarDeviceCountKey); err != nil {
glog.Errorf(ctx, "减少 Netbar 在线设备数失败 %s: %v", netbarDeviceCountKey, err)
}
// 从在线设备集合中删除
if _, err := g.Redis().HDel(ctx, onlineDevicesKey, deviceId); err != nil {
glog.Errorf(ctx, "移除设备 %s 从在线集合失败: %v", deviceId, err)
} else {
glog.Infof(ctx, "设备 %s 已标记为离线", deviceId)
}
}
}
}
}
}()
}