Files
arenax-server/utility/mqtt/emqx/emqx.go

103 lines
2.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package emqx
import (
"context"
"fmt"
"sync"
"time"
mqttlib "github.com/eclipse/paho.mqtt.golang"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/os/glog"
"server/utility/mqtt"
)
// emqxClient wraps a paho MQTT client that talks to an EMQX broker.
type emqxClient struct {
// client is the underlying paho MQTT client used for all broker operations.
client mqttlib.Client
// once guards connect so the connection attempt is made at most once.
once sync.Once
}
// New creates an EMQX client for the given broker and credentials and
// connects it right away; it only returns once the connection succeeded.
//
// The client is configured with auto-reconnect enabled and a 5 second
// connect timeout. If the initial connection fails, the failure is logged
// and New panics, so callers never receive an unconnected client.
func New(broker, clientID, username, password string) mqtt.MqttClient {
	options := mqttlib.NewClientOptions()
	options.AddBroker(broker)
	options.SetClientID(clientID)
	options.SetUsername(username)
	options.SetPassword(password)
	options.SetAutoReconnect(true)
	options.SetConnectTimeout(5 * time.Second)

	instance := &emqxClient{client: mqttlib.NewClient(options)}
	ctx := context.Background()

	// Connect eagerly so a misconfigured broker is detected at construction.
	if err := instance.connect(); err != nil {
		glog.Errorf(ctx, "连接EMQX失败: %v", err)
		panic(fmt.Sprintf("连接EMQX失败: %v", err))
	}

	glog.Infof(ctx, "EMQX客户端连接成功")
	return instance
}
// connect opens the broker connection, making the attempt at most once per
// client instance.
//
// The first call waits on the connect token and returns the broker error
// wrapped with context, or nil on success. Because the attempt is guarded by
// e.once, later calls do not retry: they skip the connect and return nil
// regardless of how the first attempt ended.
func (e *emqxClient) connect() error {
	var err error
	e.once.Do(func() {
		token := e.client.Connect()
		if token.Wait() && token.Error() != nil {
			err = fmt.Errorf("EMQX连接失败: %w", token.Error())
			// Use an explicit verb: the error text originates from the
			// broker/network layer and must never be interpreted as a format
			// string (a stray '%' would garble the log line).
			glog.Errorf(context.Background(), "%v", err)
			return
		}
		glog.Infof(context.Background(), "EMQX连接成功")
	})
	return err
}
// Publish sends payload to topic (QoS 0, not retained) and waits for the
// publish token to complete. It logs the outcome and returns the token's
// error, or nil when the message was published.
func (e *emqxClient) Publish(topic string, payload []byte) error {
	ctx := context.Background()

	tok := e.client.Publish(topic, 0, false, payload)
	tok.Wait()

	if pubErr := tok.Error(); pubErr != nil {
		glog.Errorf(ctx, "发布消息失败topic=%s错误%v", topic, pubErr)
		return pubErr
	}

	glog.Infof(ctx, "成功发布消息topic=%s消息大小=%d字节", topic, len(payload))
	return nil
}
// Subscribe registers handler for messages arriving on topic (QoS 0) and
// waits for the subscribe token to complete. It logs the outcome and returns
// the token's error, or nil when the subscription was accepted.
//
// handler receives the topic and raw payload of every delivered message.
func (e *emqxClient) Subscribe(topic string, handler func(topic string, payload []byte)) error {
	// Adapt the caller's plain-function handler to paho's callback signature.
	onMessage := func(_ mqttlib.Client, m mqttlib.Message) {
		handler(m.Topic(), m.Payload())
	}

	ctx := context.Background()
	tok := e.client.Subscribe(topic, 0, onMessage)
	tok.Wait()

	if subErr := tok.Error(); subErr != nil {
		glog.Errorf(ctx, "订阅失败topic=%s错误%v", topic, subErr)
		return subErr
	}

	glog.Infof(ctx, "成功订阅主题topic=%s", topic)
	return nil
}
// init builds the EMQX client from configuration and registers it with the
// mqtt package under the name "emqx".
//
// It reads mqtt.emqx.host, mqtt.emqx.port, mqtt.emqx.username,
// mqtt.emqx.password and mqtt.emqx.clientId, and dials tcp://host:port.
// This runs when the package is initialised, and New panics if the broker
// cannot be reached.
func init() {
	var (
		bg   = context.Background()
		conf = g.Config()
	)

	var (
		brokerHost = conf.MustGet(bg, "mqtt.emqx.host").String()
		brokerPort = conf.MustGet(bg, "mqtt.emqx.port").Int()
		user       = conf.MustGet(bg, "mqtt.emqx.username").String()
		pass       = conf.MustGet(bg, "mqtt.emqx.password").String()
		clientID   = conf.MustGet(bg, "mqtt.emqx.clientId").String()
	)

	brokerURL := fmt.Sprintf("tcp://%s:%d", brokerHost, brokerPort)
	mqtt.Register("emqx", New(brokerURL, clientID, user, pass))

	glog.Infof(bg, "EMQX客户端注册完成broker=%s, clientID=%s", brokerURL, clientID)
}