// Package amazonsqs provides a message client backed by Amazon SQS and
// registers it with the mqtt client registry under the name "amazon_sqs".
package amazonsqs

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/sqs"

	"server/utility/mqtt"
)

// AmazonSQSClient publishes messages to Amazon SQS queues through the AWS SDK
// for Go v2.
type AmazonSQSClient struct {
	client *sqs.Client
}

// NewAmazonSQSClient builds a client from the SDK's default configuration
// chain (config.LoadDefaultConfig).
//
// It panics if the default configuration cannot be loaded. Because init in
// this package calls it, such a failure surfaces when the package is imported.
func NewAmazonSQSClient() *AmazonSQSClient {
	cfg, err := config.LoadDefaultConfig(context.TODO())
	if err != nil {
		// Still a panic, as before, but with a hint about what was being done.
		panic(fmt.Errorf("amazonsqs: loading default AWS config: %w", err))
	}
	return &AmazonSQSClient{
		client: sqs.NewFromConfig(cfg),
	}
}

// getQueueURL looks up the URL of the queue named queueName via GetQueueUrl.
//
// It returns an error if the lookup fails or if the response carries no URL.
func (c *AmazonSQSClient) getQueueURL(ctx context.Context, queueName string) (string, error) {
	out, err := c.client.GetQueueUrl(ctx, &sqs.GetQueueUrlInput{
		QueueName: aws.String(queueName),
	})
	if err != nil {
		return "", fmt.Errorf("resolving queue URL for %q: %w", queueName, err)
	}
	// QueueUrl is a pointer field; guard against a nil dereference instead of
	// assuming the service always populates it.
	if out == nil || out.QueueUrl == nil {
		return "", fmt.Errorf("resolving queue URL for %q: response contained no URL", queueName)
	}
	return *out.QueueUrl, nil
}

// Publish JSON-encodes payload and sends it as the message body to the SQS
// queue whose name is topic.
//
// The queue URL is resolved on every call. The returned error wraps the
// underlying cause (queue lookup, JSON encoding, or send), so callers can
// still inspect it with errors.Is / errors.As.
func (c *AmazonSQSClient) Publish(ctx context.Context, topic string, payload interface{}) error {
	queueURL, err := c.getQueueURL(ctx, topic)
	if err != nil {
		// getQueueURL already describes the failing step and queue.
		return err
	}

	body, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("encoding payload for queue %q: %w", topic, err)
	}

	_, err = c.client.SendMessage(ctx, &sqs.SendMessageInput{
		QueueUrl:    aws.String(queueURL),
		MessageBody: aws.String(string(body)),
	})
	if err != nil {
		return fmt.Errorf("sending message to queue %q: %w", topic, err)
	}
	return nil
}

// Subscribe is a placeholder: it does not subscribe to anything and always
// returns nil, so handler is never invoked.
//
// NOTE(review): callers currently get a success result even though no
// messages will be delivered — confirm whether this should return an error
// until the receive loop is implemented.
func (c *AmazonSQSClient) Subscribe(topic string, handler func(payload interface{})) error {
	// TODO: implement the SQS message subscription logic here.
	return nil
}

// init registers a freshly constructed client with the mqtt registry under
// the name "amazon_sqs".
func init() {
	mqtt.RegisterClient("amazon_sqs", NewAmazonSQSClient())
}