103 lines
2.8 KiB
Go
103 lines
2.8 KiB
Go
package emqx
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"sync"
|
||
"time"
|
||
|
||
mqttlib "github.com/eclipse/paho.mqtt.golang"
|
||
"github.com/gogf/gf/v2/frame/g"
|
||
"github.com/gogf/gf/v2/os/glog"
|
||
"server/utility/mqtt"
|
||
)
|
||
|
||
type emqxClient struct {
|
||
client mqttlib.Client
|
||
once sync.Once
|
||
}
|
||
|
||
// New 创建并连接 EMQX 客户端实例,连接成功才返回
|
||
func New(broker, clientID, username, password string) mqtt.MqttClient {
|
||
opts := mqttlib.NewClientOptions().
|
||
AddBroker(broker).
|
||
SetClientID(clientID).
|
||
SetUsername(username).
|
||
SetPassword(password).
|
||
SetAutoReconnect(true).
|
||
SetConnectTimeout(5 * time.Second)
|
||
|
||
c := &emqxClient{
|
||
client: mqttlib.NewClient(opts),
|
||
}
|
||
|
||
// 立即连接
|
||
err := c.connect()
|
||
if err != nil {
|
||
glog.Errorf(context.Background(), "连接EMQX失败: %v", err)
|
||
panic(fmt.Sprintf("连接EMQX失败: %v", err))
|
||
}
|
||
glog.Infof(context.Background(), "EMQX客户端连接成功")
|
||
return c
|
||
}
|
||
|
||
// connect 内部只调用一次连接
|
||
func (e *emqxClient) connect() error {
|
||
var err error
|
||
e.once.Do(func() {
|
||
token := e.client.Connect()
|
||
if token.Wait() && token.Error() != nil {
|
||
err = fmt.Errorf("EMQX连接失败: %w", token.Error())
|
||
glog.Errorf(context.Background(), err.Error())
|
||
return
|
||
}
|
||
glog.Infof(context.Background(), "EMQX连接成功")
|
||
})
|
||
return err
|
||
}
|
||
|
||
// Publish 实现接口 Publish
|
||
func (e *emqxClient) Publish(topic string, payload []byte) error {
|
||
token := e.client.Publish(topic, 0, false, payload)
|
||
token.Wait()
|
||
err := token.Error()
|
||
if err != nil {
|
||
glog.Errorf(context.Background(), "发布消息失败,topic=%s,错误:%v", topic, err)
|
||
} else {
|
||
glog.Infof(context.Background(), "成功发布消息,topic=%s,消息大小=%d字节", topic, len(payload))
|
||
}
|
||
return err
|
||
}
|
||
|
||
// Subscribe 实现接口 Subscribe
|
||
func (e *emqxClient) Subscribe(topic string, handler func(topic string, payload []byte)) error {
|
||
token := e.client.Subscribe(topic, 0, func(client mqttlib.Client, msg mqttlib.Message) {
|
||
handler(msg.Topic(), msg.Payload())
|
||
})
|
||
token.Wait()
|
||
err := token.Error()
|
||
if err != nil {
|
||
glog.Errorf(context.Background(), "订阅失败,topic=%s,错误:%v", topic, err)
|
||
} else {
|
||
glog.Infof(context.Background(), "成功订阅主题,topic=%s", topic)
|
||
}
|
||
return err
|
||
}
|
||
|
||
// init 注册emqx客户端
|
||
func init() {
|
||
ctx := context.Background()
|
||
cfg := g.Config()
|
||
host := cfg.MustGet(ctx, "mqtt.emqx.host").String()
|
||
port := cfg.MustGet(ctx, "mqtt.emqx.port").Int()
|
||
username := cfg.MustGet(ctx, "mqtt.emqx.username").String()
|
||
password := cfg.MustGet(ctx, "mqtt.emqx.password").String()
|
||
clientId := cfg.MustGet(ctx, "mqtt.emqx.clientId").String()
|
||
|
||
broker := fmt.Sprintf("tcp://%s:%d", host, port)
|
||
client := New(broker, clientId, username, password)
|
||
|
||
mqtt.Register("emqx", client)
|
||
glog.Infof(ctx, "EMQX客户端注册完成,broker=%s, clientID=%s", broker, clientId)
|
||
}
|