package emqx import ( "context" "fmt" "sync" "time" mqttlib "github.com/eclipse/paho.mqtt.golang" "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/os/glog" "server/utility/mqtt" ) type emqxClient struct { client mqttlib.Client once sync.Once } // New 创建并连接 EMQX 客户端实例,连接成功才返回 func New(broker, clientID, username, password string) mqtt.MqttClient { opts := mqttlib.NewClientOptions(). AddBroker(broker). SetClientID(clientID). SetUsername(username). SetPassword(password). SetAutoReconnect(true). SetConnectTimeout(5 * time.Second) c := &emqxClient{ client: mqttlib.NewClient(opts), } // 立即连接 err := c.connect() if err != nil { glog.Errorf(context.Background(), "连接EMQX失败: %v", err) panic(fmt.Sprintf("连接EMQX失败: %v", err)) } glog.Infof(context.Background(), "EMQX客户端连接成功") return c } // connect 内部只调用一次连接 func (e *emqxClient) connect() error { var err error e.once.Do(func() { token := e.client.Connect() if token.Wait() && token.Error() != nil { err = fmt.Errorf("EMQX连接失败: %w", token.Error()) glog.Errorf(context.Background(), err.Error()) return } glog.Infof(context.Background(), "EMQX连接成功") }) return err } // Publish 实现接口 Publish func (e *emqxClient) Publish(topic string, payload []byte) error { token := e.client.Publish(topic, 0, false, payload) token.Wait() err := token.Error() if err != nil { glog.Errorf(context.Background(), "发布消息失败,topic=%s,错误:%v", topic, err) } else { glog.Infof(context.Background(), "成功发布消息,topic=%s,消息大小=%d字节", topic, len(payload)) } return err } // Subscribe 实现接口 Subscribe func (e *emqxClient) Subscribe(topic string, handler func(topic string, payload []byte)) error { token := e.client.Subscribe(topic, 0, func(client mqttlib.Client, msg mqttlib.Message) { handler(msg.Topic(), msg.Payload()) }) token.Wait() err := token.Error() if err != nil { glog.Errorf(context.Background(), "订阅失败,topic=%s,错误:%v", topic, err) } else { glog.Infof(context.Background(), "成功订阅主题,topic=%s", topic) } return err } // init 注册emqx客户端 func init() { ctx := context.Background() cfg := g.Config() host := cfg.MustGet(ctx, "mqtt.emqx.host").String() port := cfg.MustGet(ctx, "mqtt.emqx.port").Int() username := cfg.MustGet(ctx, "mqtt.emqx.username").String() password := cfg.MustGet(ctx, "mqtt.emqx.password").String() clientId := cfg.MustGet(ctx, "mqtt.emqx.clientId").String() broker := fmt.Sprintf("tcp://%s:%d", host, port) client := New(broker, clientId, username, password) mqtt.Register("emqx", client) glog.Infof(ctx, "EMQX客户端注册完成,broker=%s, clientID=%s", broker, clientId) }