alimns

package module
v2.1.7 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 1, 2020 License: MIT Imports: 33 Imported by: 0

README

aliyun-mns

aliyun-mns是对阿里云消息服务的封装

队列模型

具有以下特点:

  • 动态创建队列
  • 可以设置消费者数目
  • 消息处理时长自适应
  • 发送消息重试。目前基于网络错误、阿里云MNS错误码表InternalError重试
  • 监控报警
  • 优雅的关闭消费者
  • 处理函数处理最大时间限制
  • 队列消费者使用协程池,每一个消息队列独占自己的协程池
  • 发送消息失败保存进入redis,尝试重新发送,提高发送成功率
  • 业务需要自己做消息幂等,有可能出现同样消息内容发送多次,这种情况非常罕见
消费者
package main

import (
	"context"
	"log"

	"github.com/go-redis/redis/v7"
	"github.com/xiaojiaoyu100/aliyun-mns/v2"
)

// Builder is the example message handler registered on a queue. It satisfies
// the package's Builder interface through its Handle method.
type Builder struct{}

// Handle is the per-message processing function of the example consumer.
//
// The example does no work and reports success. A real handler would return
// one of the following instead:
//
//	return alimns.BackoffError{Err: err, N: 30} // error carrying a visibility timeout N
//	return err                                  // plain failure
//	return nil                                  // success
func (b *Builder) Handle(ctx context.Context) error {
	return nil
}

// Before is the example hook installed with client.SetBefore. It ignores the
// incoming message and supplies a placeholder context with no error.
func Before(m *alimns.M) (context.Context, error) {
	ctx := context.TODO()
	return ctx, nil
}

// After is the example hook installed with client.SetAfter. It receives the
// handler's context and intentionally does nothing.
func After(ctx context.Context) {
}

// main builds a Redis-backed MNS client, installs the Before/After hooks,
// registers the QueueTest1 queue with the example Builder and starts the
// consumer. Setup failures are logged and terminate the process with a
// non-zero status instead of returning silently.
func main() {
	redisClient := redis.NewClient(&redis.Options{
		Addr: "127.0.0.1:6379",
		DB:   0,
	})

	client, err := alimns.NewClient(alimns.Config{
		Cmdable:         redisClient,
		Endpoint:        "",
		QueuePrefix:     "", // May be left empty, meaning all message queues are fetched.
		AccessKeyID:     "",
		AccessKeySecret: "",
	})
	if err != nil {
		log.Fatalf("creating alimns client: %v", err)
	}

	client.SetBefore(Before)
	client.SetAfter(After)

	consumer := alimns.NewConsumer(client)
	err = consumer.AddQueue(&alimns.Queue{
		Name:    "QueueTest1",
		Builder: &Builder{},
	})
	if err != nil {
		log.Fatalf("adding queue %q: %v", "QueueTest1", err)
	}

	consumer.Run()
}
生产者
// Create a producer on top of an existing client.
producer := alimns.NewProducer(client)

// SendBase64EncodedJSONMessage requires the queue name and the message body;
// the body here is only a placeholder payload. The returned error must be checked.
if _, err := producer.SendBase64EncodedJSONMessage("QueueTest1", map[string]string{"hello": "world"}); err != nil {
	return
}

主题模型

支持以下主题api:

  • 支持主题的创建,删除
  • 支持订阅主题,取消主题订阅
  • 支持向主题发布消息
创建/订阅主题
// Queue-format endpoint that identifies the queue subscribed to the topic.
// The "xxx" values are placeholders to be replaced with real settings.
endpoint := QueueEndPoint{
	AccountID: "xxx",
	Region:    "xxx",
	QueueName: "xxx",
}

// Create the topic.
err := client.CreateTopic("topicName")
if err != nil {
	return
}

// Subscribe the endpoint to the topic under the given subscription name.
err = client.Subscribe("topicName", "subscriptionName", endpoint)
if err != nil {
	return
}

发布消息
// Publish a message to the topic. PublishMessage returns a string (named
// messageID here) and an error.
messageID, err := client.PublishMessage("topicName", "hello world")
if err != nil {
	return
}

Documentation

Index

Constants

View Source
const (
	// BackOffRetryStrategy 重试3次,每次重试的间隔时间是10秒到 20秒之间的随机值
	BackOffRetryStrategy = "BACKOFF_RETRY"
	// ExponentialDecayRetryStrategy 重试176次,每次重试的间隔时间指数递增至512秒,总计重试时间为1天;每次重试的具体间隔为:1,2,4,8,16,32,64,128,256,512,512 ... 512 秒(共167个512)
	ExponentialDecayRetryStrategy = "EXPONENTIAL_DECAY_RETRY"
	// XMLNotifyFormat 消息体为XML格式,包含消息正文和消息属性
	XMLNotifyFormat = "XML"
	// JSONNotifyFormat 消息体为JSON格式,包含消息正文和消息属性
	JSONNotifyFormat = "JSON"
	// SimplifiedNotifyFormat 消息体即用户发布的消息,不包含任何属性信息
	SimplifiedNotifyFormat = "SIMPLIFIED"
)

Variables

This section is empty.

Functions

func Base64Md5

func Base64Md5(s string) (string, error)

Base64Md5 md5值用base64编码

func DefaultHook added in v2.0.6

func DefaultHook(callback func(entryJson string, fieldsJson string)) func(entry Entry) error

DefaultHook 提供一个默认钩子

func HandleErrFrom added in v2.1.3

func HandleErrFrom(ctx context.Context) error

HandleErrFrom 返回handle的结果

func Hooks added in v2.0.6

func Hooks(hooks ...func(entry Entry) error) zap.Option

Hooks 回调钩子(warn级别以上回调)

func IsBase64

func IsBase64(s string) bool

IsBase64 返回字符串是否是base64编码

func IsCreateQueueConflict

func IsCreateQueueConflict(err error) bool

IsCreateQueueConflict 是否是createQueueConflictError

func IsCreateQueueNoContent

func IsCreateQueueNoContent(err error) bool

IsCreateQueueNoContent 是否是createQueueNoContentError

func IsHandleCrash

func IsHandleCrash(err error) bool

IsHandleCrash 是否是处理函数崩溃错误

func IsInternalError

func IsInternalError(err error) bool

IsInternalError 是否内部错误

func IsMessageBodyLimit

func IsMessageBodyLimit(err error) bool

IsMessageBodyLimit 是否超出范围

func IsMessageDelaySecondsOutOfRange

func IsMessageDelaySecondsOutOfRange(err error) bool

IsMessageDelaySecondsOutOfRange 延时时长是否超出范围

func IsSendMessageTimeout

func IsSendMessageTimeout(err error) bool

IsSendMessageTimeout 是否发送消息超时

func IsUnknown

func IsUnknown(err error) bool

IsUnknown 是否是未知错误

func IsVisibilityTimeout

func IsVisibilityTimeout(err error) bool

IsVisibilityTimeout 是否是可见时间不在范围

func Md5

func Md5(s string) ([]byte, error)

Md5 md5

func NewLogger added in v2.0.6

func NewLogger() (logger *zap.Logger, err error)

NewLogger 创建一个logger

func TimestampInMs

func TimestampInMs() int64

TimestampInMs 毫秒时间戳

Types

type After added in v2.1.1

type After func(ctx context.Context)

After 处理函数善后处理

type Backoff

type Backoff interface {
	Backoff() int
}

Backoff backoff

type BackoffError

type BackoffError struct {
	Err error
	// visibility timeout
	N int
}

BackoffError represents the backoff error.

func (BackoffError) Backoff

func (err BackoffError) Backoff() int

Backoff returns the message visibility time.

func (BackoffError) Error

func (err BackoffError) Error() string

Error implements the error interface.

func (BackoffError) Unwrap

func (err BackoffError) Unwrap() error

Unwrap returns the underlying error.

type BatchPeekMessageResponse

type BatchPeekMessageResponse struct {
	XMLName      xml.Name       `xml:"Messages"`
	XMLNs        string         `xml:"xmlns,attr"`
	PeekMessages []*PeekMessage `xml:"Message"`
}

BatchPeekMessageResponse 批量查看消息

type BatchReceiveMessageResponse

type BatchReceiveMessageResponse struct {
	XMLName         xml.Name          `xml:"Messages"`
	XMLNs           string            `xml:"xmlns,attr"`
	ReceiveMessages []*ReceiveMessage `xml:"Message"`
}

BatchReceiveMessageResponse 批量消费消息回复

type BatchSendMessageResponse

type BatchSendMessageResponse struct {
	XMLName      xml.Name       `xml:"Messages"`
	XMLNs        string         `xml:"xmlns,attr"`
	SendMessages []*SendMessage `xml:"Message"`
}

BatchSendMessageResponse 批量发送消息回复

type Before added in v2.1.1

type Before func(m *M) (context.Context, error)

Before 生成一个context

type Builder

type Builder interface {
	Handle(ctx context.Context) error
}

Builder 构造者

type ChangeVisibilityTimeoutResponse

type ChangeVisibilityTimeoutResponse struct {
	XMLName         xml.Name `xml:"ChangeVisibility"`
	XMLNs           string   `xml:"xmlns,attr"`
	ReceiptHandle   string   `xml:"ReceiptHandle"`
	NextVisibleTime int64    `xml:"NextVisibleTime"`
}

ChangeVisibilityTimeoutResponse 修改消息可见时长回复

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client 存储了阿里云的相关信息

func NewClient

func NewClient(config Config) (*Client, error)

NewClient 返回Client的实例

func (*Client) AddLogHook

func (c *Client) AddLogHook(f func(entry Entry) error)

AddLogHook add a log reporter.

func (*Client) BatchPeekMessage

func (c *Client) BatchPeekMessage(name string) (*BatchPeekMessageResponse, error)

BatchPeekMessage 批量查看消息

func (*Client) BatchReceiveMessage

func (c *Client) BatchReceiveMessage(name string, setters ...ReceiveMessageParamSetter) (*BatchReceiveMessageResponse, error)

BatchReceiveMessage 批量消费消息

func (*Client) BatchSendMessage

func (c *Client) BatchSendMessage(name string, messageList ...*Message) (*BatchSendMessageResponse, error)

BatchSendMessage 批量发送消息

func (*Client) ChangeVisibilityTimeout

func (c *Client) ChangeVisibilityTimeout(
	name,
	receiptHandle string,
	visibilityTimeout int) (*ChangeVisibilityTimeoutResponse, error)

ChangeVisibilityTimeout 修改消息可见时长

func (*Client) CreateQueue

func (c *Client) CreateQueue(name string, setters ...QueueAttributeSetter) (string, error)

CreateQueue 创建一个消息队列

func (*Client) CreateTopic

func (c *Client) CreateTopic(name string, setters ...TopicAttributeSetter) error

CreateTopic 创建一个主题

func (*Client) DeleteMessage

func (c *Client) DeleteMessage(name, receiptHandle string) error

DeleteMessage 删除消息

func (*Client) DeleteQueue

func (c *Client) DeleteQueue(name string) error

DeleteQueue 删除队列

func (*Client) DeleteTopic

func (c *Client) DeleteTopic(name string) error

DeleteTopic 删除主题

func (*Client) EnableDebug

func (c *Client) EnableDebug()

EnableDebug enables debug info.

func (*Client) GetQueueAttributes

func (c *Client) GetQueueAttributes(name string) (*QueueAttributeResponse, error)

GetQueueAttributes 获取消息队列属性

func (*Client) ListQueue

func (c *Client) ListQueue(request *ListQueueRequest) (*ListQueueResponse, error)

ListQueue 请求队列列表

func (*Client) PeekMessage

func (c *Client) PeekMessage(name string) (*PeekMessageResponse, error)

PeekMessage 查看消息

func (*Client) PublishMessage

func (c *Client) PublishMessage(topic, message string, setters ...PublishMessageParamSetter) (string, error)

PublishMessage 向指定的主题发布消息,消息发布到主题后随即会被推送给Endpoint消费

func (*Client) QueueMetaOverride

func (c *Client) QueueMetaOverride(name string, setters ...QueueAttributeSetter) error

QueueMetaOverride 修改队列属性

func (*Client) ReceiveMessage

func (c *Client) ReceiveMessage(name string, setters ...ReceiveMessageParamSetter) (*ReceiveMessageResponse, error)

ReceiveMessage 接收消息

func (*Client) SendBase64EncodedJSONMessage

func (c *Client) SendBase64EncodedJSONMessage(name string, messageBody interface{},
	setters ...MessageSetter) (*SendMessageResponse, error)

SendBase64EncodedJSONMessage 发送base64编码的json消息

func (*Client) SetAfter added in v2.1.2

func (c *Client) SetAfter(after After)

SetAfter 设置消息队列善后处理函数

func (*Client) SetBefore added in v2.1.2

func (c *Client) SetBefore(before Before)

SetBefore 设置环境

func (*Client) SetQueuePrefix

func (c *Client) SetQueuePrefix(prefix string)

SetQueuePrefix sets the query param for ListQueue.

func (*Client) SetTopicAttributes

func (c *Client) SetTopicAttributes(topic string, setters ...TopicAttributeSetter) error

SetTopicAttributes 修改主题的属性

func (*Client) Subscribe

func (c *Client) Subscribe(topic, subscription string, endpoint EndPointer, setters ...SubscribeParamSetter) error

Subscribe 订阅主题

func (*Client) Unsubscribe

func (c *Client) Unsubscribe(topic, subscription string) error

Unsubscribe 取消订阅主题

type Cmdable

type Cmdable interface {
	Pipeline() redis.Pipeliner
	RPush(key string, values ...interface{}) *redis.IntCmd
	LRem(key string, count int64, value interface{}) *redis.IntCmd
	SetNX(key string, value interface{}, expiration time.Duration) *redis.BoolCmd
	Eval(script string, keys []string, args ...interface{}) *redis.Cmd
	Expire(key string, expiration time.Duration) *redis.BoolCmd
}

Cmdable 封装了redis相关操作

type Codec

type Codec interface {
	Encode(interface{}) ([]byte, error)
	Decode([]byte, interface{}) error
}

Codec 编解码

type Config

type Config struct {
	Cmdable
	Endpoint        string
	QueuePrefix     string
	AccessKeyID     string
	AccessKeySecret string
}

Config 配置

type Consumer

type Consumer struct {
	*Client
	// contains filtered or unexported fields
}

Consumer 消费者

func NewConsumer

func NewConsumer(client *Client) *Consumer

NewConsumer 生成了一个消费者

func (*Consumer) AddQueue

func (c *Consumer) AddQueue(q *Queue) error

AddQueue 添加一个消息队列

func (*Consumer) BatchListQueue

func (c *Consumer) BatchListQueue() error

BatchListQueue 批量请求队列

func (*Consumer) ConsumeQueueMessage

func (c *Consumer) ConsumeQueueMessage(queue *Queue)

ConsumeQueueMessage 消费消息

func (*Consumer) CreateQueueList

func (c *Consumer) CreateQueueList(fetchQueueReady chan struct{}) chan struct{}

CreateQueueList 创建消息队列

func (*Consumer) LongPollQueueMessage

func (c *Consumer) LongPollQueueMessage(queue *Queue)

LongPollQueueMessage 长轮询消息

func (*Consumer) OnReceive

func (c *Consumer) OnReceive(queue *Queue, receiveMsg *ReceiveMessage)

OnReceive 消息队列处理函数

func (*Consumer) PeriodicallyFetchQueues

func (c *Consumer) PeriodicallyFetchQueues() chan struct{}

PeriodicallyFetchQueues 周期性拉取消息队列与内存的消息队列做比较

func (*Consumer) PopCount

func (c *Consumer) PopCount() int32

PopCount means the current number of running handlers.

func (*Consumer) Run

func (c *Consumer) Run()

Run 入口函数

func (*Consumer) Schedule

func (c *Consumer) Schedule(createQueueReady chan struct{})

Schedule 使消息队列开始运作起来

type EndPointer

type EndPointer interface {
	EndPoint() (string, error)
}

EndPointer 是用户订阅主题时,指定接收消息的终端地址; 当有消息发布到主题时,MNS会主动将消息推送到对应的 Endpoint; 多个Subscription可以指定同一个Endpoint。

type Entry added in v2.0.6

type Entry struct {
	Level      zapcore.Level `json:"-"`
	LevelStr   string        `json:"level"`
	LoggerName string        `json:"logger_name"`
	Message    string        `json:"message"`
	Caller     string        `json:"caller"`
	Stack      string        `json:"stack"`
	Fields     []zap.Field   `json:"-"` // 不包含error字段
	FieldsJSON string        `json:"-"`
	Err        error         `json:"-"`
	ErrStr     string        `json:"error"`
}

Entry 在zapcore.Entry基础上增加 error 与 []zap.Field 字段

func NewEntryFromZapEntry added in v2.0.6

func NewEntryFromZapEntry(ent zapcore.Entry) Entry

NewEntryFromZapEntry zapcore.Entry转换到Entry

type HTTPEndPoint

type HTTPEndPoint string

HTTPEndPoint HTTP格式的Endpoint

func (HTTPEndPoint) EndPoint

func (ep HTTPEndPoint) EndPoint() (string, error)

EndPoint 获取EndPoint的字符串

type Handle

type Handle func(ctx context.Context) error

Handle 消息处理函数模板

type JSONCodec

type JSONCodec struct {
}

JSONCodec json编解码

func (JSONCodec) Decode

func (codec JSONCodec) Decode(data []byte, value interface{}) error

Decode 解码

func (JSONCodec) Encode

func (codec JSONCodec) Encode(value interface{}) ([]byte, error)

Encode 编码

type ListQueueRequest

type ListQueueRequest struct {
	Marker    string
	RetNumber string
	Prefix    string
}

ListQueueRequest 获取队列列表参数

type ListQueueResponse

type ListQueueResponse struct {
	XMLName    xml.Name     `xml:"Queues"`
	XMLNs      string       `xml:"xmlns,attr"`
	Queues     []*QueueData `xml:"Queue"`
	NextMarker string       `xml:"NextMarker"`
}

ListQueueResponse 获取队列的回复

type M

type M struct {
	QueueName     string // 队列名
	MessageBody   string // 消息体
	EnqueueTime   int64  // 入队时间
	ReceiptHandle string // 消息句柄
	// contains filtered or unexported fields
}

M 消息内容,去掉其它字段是为了不要依赖消息其它字段,应该依赖数据库字段

func MFrom

func MFrom(ctx context.Context) (*M, error)

MFrom 拿出message

func (*M) Decode

func (m *M) Decode(v interface{}) error

Decode 解析消息

type Message

type Message struct {
	XMLName      xml.Name `xml:"Message"`
	XMLNs        string   `xml:"xmlns,attr"`
	MessageBody  string   `xml:"MessageBody,omitempty"` // 消息正文
	DelaySeconds int      `xml:"DelaySeconds,omitempty"`
	Priority     int      `xml:"Priority,omitempty"`
}

Message 代表了阿里云消息

func DefaultMessage

func DefaultMessage() Message

DefaultMessage 给出了默认的消息

type MessageSetter

type MessageSetter func(msg *Message) error

MessageSetter 设置消息的函数

func WithMessageDelaySeconds

func WithMessageDelaySeconds(delay int) MessageSetter

WithMessageDelaySeconds 设置消息延时

func WithMessagePriority

func WithMessagePriority(priority int) MessageSetter

WithMessagePriority 设置消息优先级

type MnsError

type MnsError string

MnsError 错误码

func (MnsError) Error

func (e MnsError) Error() string

type ModifiedAttribute

type ModifiedAttribute struct {
	XMLName                xml.Name `xml:"Queue"`
	DelaySeconds           *int     `xml:"DelaySeconds,omitempty"`
	MaximumMessageSize     int      `xml:"MaximumMessageSize,omitempty"`
	MessageRetentionPeriod int      `xml:"MessageRetentionPeriod,omitempty"`
	VisibilityTimeout      int      `xml:"VisibilityTimeout,omitempty"`
	PollingWaitSeconds     *int     `xml:"PollingWaitSeconds,omitempty"`
	LoggingEnabled         *bool    `xml:"LoggingEnabled,omitempty"`
}

ModifiedAttribute 修改消息队列属性

func DefaultQueueAttri

func DefaultQueueAttri() ModifiedAttribute

DefaultQueueAttri 返回默认的修改消息队列的参数

type PeekMessage

type PeekMessage struct {
	MessageID        string `xml:"MessageId"`
	MessageBody      string `xml:"MessageBody"`
	MessageBodyMD5   string `xml:"MessageBodyMD5"`
	EnqueueTime      int64  `xml:"EnqueueTime"`
	FirstDequeueTime int64  `xml:"FirstDequeueTime"`
	DequeueCount     int    `xml:"DequeueCount"`
	Priority         int    `xml:"Priority"`
}

PeekMessage 查看消息

type PeekMessageResponse

type PeekMessageResponse struct {
	PeekMessage
}

PeekMessageResponse 查看消息回复

type Producer

type Producer struct {
	*Client
}

Producer 生产者

func NewProducer

func NewProducer(client *Client) *Producer

NewProducer 产生生产者

type PublishMessageParam

type PublishMessageParam struct {
	XMLName           xml.Name `xml:"Message"`
	MessageBody       string   `xml:"MessageBody"`
	MessageTag        string   `xml:"MessageTag,omitempty"`
	MessageAttributes string   `xml:"MessageAttributes,omitempty"`
}

PublishMessageParam 发布消息的参数

type PublishMessageParamSetter

type PublishMessageParamSetter func(attri *PublishMessageParam) error

PublishMessageParamSetter 发布消息的参数设置函数模板

func WithMessageTag

func WithMessageTag(filterTag string) PublishMessageParamSetter

WithMessageTag 设置消息标签(用于消息过滤)

type Queue

type Queue struct {
	Name             string
	Parallel         int
	AttributeSetters []QueueAttributeSetter
	Builder
	PullWait bool // 等消息消費完再去拉取消息
	// contains filtered or unexported fields
}

Queue 消息队列

func (*Queue) Stop

func (q *Queue) Stop()

Stop 使消息队列拉取消息和消费消息停止

type QueueAttributeResponse

type QueueAttributeResponse struct {
	XMLName                xml.Name `xml:"Queue"`
	QueueName              string   `xml:"QueueName"`
	CreateTime             string   `xml:"CreateTime"`
	LastModifyTime         string   `xml:"LastModifyTime"`
	DelaySeconds           int      `xml:"DelaySeconds"`
	MaximumMessageSize     int      `xml:"MaximumMessageSize"`
	MessageRetentionPeriod int      `xml:"MessageRetentionPeriod"`
	VisibilityTimeout      int      `xml:"VisibilityTimeout"`
	PollingWaitSeconds     int      `xml:"PollingWaitSeconds"`
	Activemessages         string   `xml:"Activemessages"`
	InactiveMessages       string   `xml:"InactiveMessages"`
	DelayMessages          string   `xml:"DelayMessages"`
	LoggingEnabled         bool     `xml:"LoggingEnabled"`
}

QueueAttributeResponse 消息队列属性回复

type QueueAttributeSetter

type QueueAttributeSetter func(attri *ModifiedAttribute) error

QueueAttributeSetter 消息队列属性设置函数模板

func WithDelaySeconds

func WithDelaySeconds(s int) QueueAttributeSetter

WithDelaySeconds 设置延时时间

func WithLoggingEnabled

func WithLoggingEnabled(flag bool) QueueAttributeSetter

WithLoggingEnabled 设置日志开启

func WithMaximumMessageSize

func WithMaximumMessageSize(size int) QueueAttributeSetter

WithMaximumMessageSize 设置消息体长度

func WithMessageRetentionPeriod

func WithMessageRetentionPeriod(s int) QueueAttributeSetter

WithMessageRetentionPeriod 设置最长存活时间

func WithPollingWaitSeconds

func WithPollingWaitSeconds(s int) QueueAttributeSetter

WithPollingWaitSeconds 设置长轮询时间

func WithVisibilityTimeout

func WithVisibilityTimeout(s int) QueueAttributeSetter

WithVisibilityTimeout 设置可见时间

type QueueData

type QueueData struct {
	QueueURL string `xml:"QueueURL"`
}

QueueData 队列的属性

type QueueEndPoint

type QueueEndPoint struct {
	AccountID string
	Region    string
	QueueName string
}

QueueEndPoint 队列格式的endpoint

func (QueueEndPoint) EndPoint

func (ep QueueEndPoint) EndPoint() (string, error)

EndPoint 获取EndPoint的字符串

type ReceiveMessage

type ReceiveMessage struct {
	XMLName          xml.Name `xml:"Message"`
	XMLNs            string   `xml:"xmlns,attr"`
	MessageID        string   `xml:"MessageId"`
	ReceiptHandle    string   `xml:"ReceiptHandle"`
	MessageBody      string   `xml:"MessageBody"`
	MessageBodyMD5   string   `xml:"MessageBodyMD5"`
	EnqueueTime      int64    `xml:"EnqueueTime"`
	NextVisibleTime  int64    `xml:"NextVisibleTime"`
	FirstDequeueTime int64    `xml:"FirstDequeueTime"`
	DequeueCount     int      `xml:"DequeueCount"`
	Priority         int      `xml:"Priority"`
}

ReceiveMessage 收到消息

type ReceiveMessageParam

type ReceiveMessageParam struct {
	WaitSeconds   *int `url:"waitseconds,omitempty"`
	NumOfMessages int  `url:"numOfMessages,omitempty"`
}

ReceiveMessageParam 收到消息请求

func DefaultBatchReceiveMessage

func DefaultBatchReceiveMessage() ReceiveMessageParam

DefaultBatchReceiveMessage 返回默认的批量消费消息的参数

func DefaultReceiveMessage

func DefaultReceiveMessage() ReceiveMessageParam

DefaultReceiveMessage 默认的收到消息请求参数

type ReceiveMessageParamSetter

type ReceiveMessageParamSetter func(*ReceiveMessageParam) error

ReceiveMessageParamSetter 收到消息请求参数设置函数

func WithReceiveMessageNumOfMessages

func WithReceiveMessageNumOfMessages(num int) ReceiveMessageParamSetter

WithReceiveMessageNumOfMessages 设置请求消息数量

func WithReceiveMessageWaitSeconds

func WithReceiveMessageWaitSeconds(s int) ReceiveMessageParamSetter

WithReceiveMessageWaitSeconds 设置收到消息的long poll等待时长

type ReceiveMessageResponse

type ReceiveMessageResponse struct {
	ReceiveMessage
}

ReceiveMessageResponse 收到消息回复

type RespErr

type RespErr struct {
	XMLName   xml.Name `xml:"Error"`
	XMLNs     string   `xml:"xmlns,attr"`
	Code      string   `xml:"Code"`
	Message   string   `xml:"Message"`
	RequestID string   `xml:"RequestId"`
	HostID    string   `xml:"HostId"`
}

RespErr 阿里云回复错误

type SendMessage

type SendMessage struct {
	XMLName        xml.Name `xml:"Message"`
	XMLNs          string   `xml:"xmlns,attr"`
	ErrorCode      string   `xml:"ErrorCode"`
	ErrorMessage   string   `xml:"ErrorMessage"`
	MessageID      string   `xml:"MessageId"`
	MessageBodyMD5 string   `xml:"MessageBodyMD5"`
	ReceiptHandle  string   `xml:"ReceiptHandle"` // 发送延时消息才有返回
}

SendMessage 发送消息

type SendMessageResponse

type SendMessageResponse struct {
	SendMessage
}

SendMessageResponse 发送消息回复

type SubscribeParam

type SubscribeParam struct {
	XMLName             xml.Name `xml:"Subscription"`
	EndPoint            string   `xml:"Endpoint"`
	FilterTag           string   `xml:"FilterTag,omitempty"`
	NotifyStrategy      string   `xml:"NotifyStrategy"`
	NotifyContentFormat string   `xml:"NotifyContentFormat"`
}

SubscribeParam 订阅主题需要的参数

type SubscribeParamSetter

type SubscribeParamSetter func(attri *SubscribeParam) error

SubscribeParamSetter 订阅主题消息属性设置函数模板

func WithFilterTag

func WithFilterTag(filterTag string) SubscribeParamSetter

WithFilterTag 设置过滤标签

func WithNotifyContentFormat

func WithNotifyContentFormat(s string) SubscribeParamSetter

WithNotifyContentFormat 设置推送消息的内容格式(XML, JSON 或 SIMPLIFIED)

func WithNotifyStrategy

func WithNotifyStrategy(ns string) SubscribeParamSetter

WithNotifyStrategy 设置推送消息出现错误时的重试策略

type TopicAttribute

type TopicAttribute struct {
	XMLName            xml.Name `xml:"Topic"`
	MaximumMessageSize int      `xml:"MaximumMessageSize"`
	LoggingEnabled     bool     `xml:"LoggingEnabled"`
}

TopicAttribute 修改主题属性

func DefaultTopicAttr

func DefaultTopicAttr() TopicAttribute

DefaultTopicAttr 返回默认的主题属性

type TopicAttributeSetter

type TopicAttributeSetter func(attri *TopicAttribute) error

TopicAttributeSetter 主题属性设置函数模板

func TopicWithLoggingEnabled

func TopicWithLoggingEnabled(flag bool) TopicAttributeSetter

TopicWithLoggingEnabled 设置日志开启

func TopicWithMaximumMessageSize

func TopicWithMaximumMessageSize(size int) TopicAttributeSetter

TopicWithMaximumMessageSize 设置消息体长度

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL