完善功能
This commit is contained in:
60
utility/mqtt/amazon_sqs/amazon_sqs.go
Normal file
60
utility/mqtt/amazon_sqs/amazon_sqs.go
Normal file
@ -0,0 +1,60 @@
|
||||
package amazonsqs
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"server/utility/mqtt"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/service/sqs"
|
||||
)
|
||||
|
||||
type AmazonSQSClient struct {
|
||||
client *sqs.Client
|
||||
}
|
||||
|
||||
func NewAmazonSQSClient() *AmazonSQSClient {
|
||||
cfg, err := config.LoadDefaultConfig(context.TODO())
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return &AmazonSQSClient{
|
||||
client: sqs.NewFromConfig(cfg),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AmazonSQSClient) getQueueURL(ctx context.Context, queueName string) (string, error) {
|
||||
out, err := c.client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
|
||||
QueueName: aws.String(queueName),
|
||||
})
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return *out.QueueUrl, nil
|
||||
}
|
||||
|
||||
func (c *AmazonSQSClient) Publish(ctx context.Context, topic string, payload interface{}) error {
|
||||
queueURL, err := c.getQueueURL(ctx, topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
body, err := json.Marshal(payload)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = c.client.SendMessage(ctx, &sqs.SendMessageInput{
|
||||
QueueUrl: aws.String(queueURL),
|
||||
MessageBody: aws.String(string(body)),
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *AmazonSQSClient) Subscribe(topic string, handler func(payload interface{})) error {
|
||||
// 这里实现 SQS 的消息订阅逻辑
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
mqtt.RegisterClient("amazon_sqs", NewAmazonSQSClient())
|
||||
}
|
||||
18
utility/mqtt/mqtt.go
Normal file
18
utility/mqtt/mqtt.go
Normal file
@ -0,0 +1,18 @@
|
||||
package mqtt
|
||||
|
||||
import "context"
|
||||
|
||||
type MQTTClient interface {
|
||||
Publish(ctx context.Context, topic string, payload interface{}) error
|
||||
Subscribe(topic string, handler func(payload interface{})) error
|
||||
}
|
||||
|
||||
var clients = make(map[string]MQTTClient)
|
||||
|
||||
func RegisterClient(name string, client MQTTClient) {
|
||||
clients[name] = client
|
||||
}
|
||||
|
||||
func GetMQTTClient(name string) MQTTClient {
|
||||
return clients[name]
|
||||
}
|
||||
Reference in New Issue
Block a user