实现缓存+mqtt设备消息监听

This commit is contained in:
2025-06-26 16:31:43 +08:00
parent 8107b9a719
commit efed64ed9e
9 changed files with 180 additions and 62 deletions

View File

@ -3,6 +3,9 @@ package emqx
import (
"context"
"fmt"
"github.com/gogf/gf/v2/encoding/gjson"
"server/internal/consts"
"strconv"
"sync"
"time"
@ -84,9 +87,20 @@ func (e *emqxClient) Subscribe(topic string, handler func(topic string, payload
return err
}
type DeviceData struct {
NetbarAccount string `json:"netbarAccount"`
DeviceId string `json:"deviceId"`
DeviceName string `json:"deviceName"`
IP string `json:"ip"`
MACAddress string `json:"macAddress"`
}
// init 注册emqx客户端
func init() {
// 创建可取消的上下文
ctx := context.Background()
// 加载 MQTT 配置
cfg := g.Config()
host := cfg.MustGet(ctx, "mqtt.emqx.host").String()
port := cfg.MustGet(ctx, "mqtt.emqx.port").Int()
@ -97,6 +111,122 @@ func init() {
broker := fmt.Sprintf("tcp://%s:%d", host, port)
client := New(broker, clientId, username, password)
// 注册 MQTT 客户端
mqtt.Register("emqx", client)
glog.Infof(ctx, "EMQX客户端注册完成broker=%s, clientID=%s", broker, clientId)
glog.Infof(ctx, "EMQX 客户端注册完成broker=%s, clientID=%s", broker, clientId)
// 订阅设备上线消息
go func() {
ctx := context.Background()
err := client.Subscribe("/up", func(topic string, payload []byte) {
glog.Infof(ctx, "收到 MQTT 消息topic=%s", topic)
var data DeviceData
if err := gjson.Unmarshal(payload, &data); err != nil {
glog.Errorf(ctx, "[/up] 解析设备信息失败: %v", err)
return
}
// 增加门店在线设备计数
key := fmt.Sprintf(consts.NetbarOnlineNumberKey, data.NetbarAccount)
if _, err := g.Redis().Incr(ctx, key); err != nil {
glog.Errorf(ctx, "增加 Redis 计数失败 %s: %v", key, err)
return
}
// 更新在线设备集合
setKey := "system:online:devices"
if _, err := g.Redis().HSet(ctx, setKey, map[string]interface{}{
data.DeviceId: time.Now().Unix(),
}); err != nil {
glog.Errorf(ctx, "更新在线设备集合失败 %s: %v", setKey, err)
return
}
// 存储设备信息(用于离线检测时获取 NetbarAccount
deviceKey := fmt.Sprintf("device:%s", data.DeviceId)
if _, err := g.Redis().HSet(ctx, deviceKey, map[string]interface{}{
"netbarAccount": data.NetbarAccount,
"deviceName": data.DeviceName,
"ip": data.IP,
"macAddress": data.MACAddress,
}); err != nil {
glog.Errorf(ctx, "存储设备信息失败 %s: %v", deviceKey, err)
}
})
if err != nil {
glog.Errorf(ctx, "订阅 /up 失败: %v", err)
}
}()
// 监控离线设备
go func() {
ctx := context.Background()
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
glog.Info(ctx, "停止离线设备监控")
return
case <-ticker.C:
setKey := "system:online:devices"
devicesVar, err := g.Redis().HGetAll(ctx, setKey)
if err != nil {
glog.Errorf(ctx, "获取在线设备失败: %v", err)
continue
}
// 转换为 map[string]string
devices := devicesVar.MapStrStr()
if len(devices) == 0 {
continue
}
now := time.Now().Unix()
for deviceId, timestampStr := range devices {
// 使用 strconv.ParseInt 替代 time.ParseInt
timestamp, err := strconv.ParseInt(timestampStr, 10, 64)
if err != nil {
glog.Errorf(ctx, "无效时间戳 for 设备 %s: %v", deviceId, err)
continue
}
// 检查设备是否离线(超过 10 秒未更新)
if now-timestamp > 10 {
// 获取设备信息
dataKey := fmt.Sprintf("device:%s", deviceId)
dataVar, err := g.Redis().HGetAll(ctx, dataKey)
if err != nil {
glog.Errorf(ctx, "获取设备数据失败 %s: %v", dataKey, err)
continue
}
// 转换为 map[string]string
data := dataVar.MapStrStr()
netbarAccount, exists := data["netbarAccount"]
if !exists {
glog.Errorf(ctx, "设备 %s 缺少 netbarAccount", deviceId)
continue
}
// 减少在线设备计数
key := fmt.Sprintf(consts.NetbarOnlineNumberKey, netbarAccount)
if _, err := g.Redis().Decr(ctx, key); err != nil {
glog.Errorf(ctx, "减少 Redis 计数失败 %s: %v", key, err)
}
// 从在线设备集合中移除
if _, err := g.Redis().HDel(ctx, setKey, deviceId); err != nil {
glog.Errorf(ctx, "移除设备 %s 从在线集合失败: %v", deviceId, err)
} else {
glog.Infof(ctx, "设备 %s 已标记为离线", deviceId)
}
}
}
}
}
}()
}