queue

package
v1.4.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 20, 2024 License: MIT Imports: 18 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ContextKeyAliMNSMessagePriority        = "gorich_mns_message_priority"
	ContextKeyAliMNSLongPollingWaitSeconds = "gorich_mns_long_polling_wait_seconds"

	AliMNSMessageDefaultPriority = 1
)
View Source
const (
	DefaultIdle       = 10 * time.Second // 即多长时间后未收到 ACK 的消息被认为是 Pending 状态需要被处理
	DefaultGlobalIdle = 30 * time.Second // 消费者会从全局 Pending 获取超过这个时间的消息
)

Variables

View Source
var (
	ErrBaseRedisQueueNameEmpty    = errors.New("standalone-redis queue name is empty")
	ErrClusterRedisQueueNameEmpty = errors.New("cluster-redis queue name is empty")
)
View Source
var (
	ErrStandaloneRedisQueueAddrEmpty          = errors.New("standalone-redis addr is empty")
	ErrStandaloneRedisQueueConsumerGroupEmpty = errors.New("standalone-redis queue consumer group name is empty")
	ErrClusterRedisQueueAddrsEmpty            = errors.New("cluster-redis addrs is empty")
	ErrClusterRedisQueueConsumerGroupEmpty    = errors.New("cluster-redis queue consumer group name is empty")
)
View Source
var (
	ErrTencentCloudQueueServiceTokenEmpty            = errors.New("token for tencentcloud queue service is empty")
	ErrTencentCloudQueueServiceURLEmpty              = errors.New("url for tencentcloud queue service is empty")
	ErrTencentCloudQueueServiceEmptySubscriptionName = errors.New("subscription name for tencentcloud queue service is empty")
	ErrTencentCloudQueueServiceEmptyTopic            = errors.New("topic name for tencentcloud queue service is empty")
)
View Source
var ErrAWSQueueNameEmpty = errors.New("aws queue name is empty")
View Source
var ErrAliMNSQueueNameEmpty = errors.New("mns queue name is empty")
View Source
var ErrQueueNameRequired = errors.New("queue_name is required")

Functions

func GenerateTopicAndSubName

func GenerateTopicAndSubName(topic, subscription string) string

Types

type AWSQueueMessage

type AWSQueueMessage struct {
	// contains filtered or unexported fields
}

func (*AWSQueueMessage) Body

func (message *AWSQueueMessage) Body() string

type AWSQueueService

type AWSQueueService struct {
	// contains filtered or unexported fields
}

func (*AWSQueueService) AckMessage

func (service *AWSQueueService) AckMessage(ctx context.Context, message Message) error

func (*AWSQueueService) Close

func (service *AWSQueueService) Close() error

func (*AWSQueueService) CreateConsumer

func (service *AWSQueueService) CreateConsumer() (Consumer, error)

func (*AWSQueueService) CreateProducer

func (service *AWSQueueService) CreateProducer() (Producer, error)

func (*AWSQueueService) ReceiveMessages

func (service *AWSQueueService) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)

func (*AWSQueueService) SendMessage

func (service *AWSQueueService) SendMessage(ctx context.Context, body string) error

type AliMNSClientOption added in v1.3.7

type AliMNSClientOption struct {
	EndPoint        string `json:"endpoint" yaml:"endpoint"`
	TimeoutSecond   int64  `json:"timeout_second" yaml:"timeout_second"`
	MaxConnsPerHost int    `json:"max_conns_per_host" yaml:"max_conns_per_host"`
	QueueQPS        int32  `json:"queue_qps" yaml:"queue_qps"`

	// MessagePriority is used to set message priority when sending messages.
	// message priority can also be set in ctx parameter when calling Producer's SendMessage method.
	MessagePriority int `json:"message_priority" yaml:"message_priority"`

	// ReceiveMessageLongPollingWaitSeconds is used to set long polling wait seconds.
	// ReceiveMessages will wait at most `ReceiveMessageLongPollingWaitSeconds` seconds before return
	// long polling period can also be set in ctx parameter when calling Consumer's ReceiveMessages method.
	// The priority is:
	// 1. value set when calling ReceiveMessages method
	// 2. value set in option (i.e. here)
	// 3. queue's long polling period configuration
	// If want to disable long polling, do not set the above 3 configurations.
	ReceiveMessageLongPollingWaitSeconds int `json:"receive_message_long_polling_wait_seconds" yaml:"receive_message_long_polling_wait_seconds"`

	CredentialType cloud.AliCloudCredentialType `json:"credential_type" yaml:"credential_type"`

	// required when CredentialType is AliCloudAccessKeyCredentialType, get from env if not provided
	AccessKeyId     string `json:"access_key_id" yaml:"access_key_id"`
	AccessKeySecret string `json:"access_key_secret" yaml:"access_key_secret"`

	// optional when CredentialType is AliCloudECSRamRoleCredentialType
	RoleName string `json:"role_name" yaml:"role_name"`

	// required when CredentialType is AliCloudOIDCRoleARNCredentialType, get from env if not provided
	RoleArn           string `json:"role_arn" yaml:"role_arn"`
	OIDCProviderArn   string `json:"oidc_provider_arn" yaml:"oidc_provider_arn"`
	OIDCTokenFilePath string `json:"oidc_token_file_path" yaml:"oidc_token_file_path"`

	// optional when CredentialType is AliCloudOIDCRoleARNCredentialType
	// RoleSessionName will get from env if not provided
	RoleSessionName       string `json:"role_session_name" yaml:"role_session_name"`
	Policy                string `json:"policy" yaml:"policy"`
	RoleSessionExpiration int    `json:"role_session_expiration" yaml:"role_session_expiration"`
	// contains filtered or unexported fields
}

func (AliMNSClientOption) CheckAWS added in v1.3.7

func (option AliMNSClientOption) CheckAWS() error

func (AliMNSClientOption) CheckAliCloudStorage added in v1.3.7

func (option AliMNSClientOption) CheckAliCloudStorage() error

func (AliMNSClientOption) CheckClusterRedis added in v1.3.7

func (option AliMNSClientOption) CheckClusterRedis() error

func (AliMNSClientOption) CheckStandaloneRedis added in v1.3.7

func (option AliMNSClientOption) CheckStandaloneRedis() error

func (AliMNSClientOption) CheckTencentCloud added in v1.3.7

func (option AliMNSClientOption) CheckTencentCloud() error

func (AliMNSClientOption) GetAssumeRegion added in v1.3.7

func (option AliMNSClientOption) GetAssumeRegion() string

func (AliMNSClientOption) GetAssumeRoleArn added in v1.3.7

func (option AliMNSClientOption) GetAssumeRoleArn() string

func (AliMNSClientOption) GetProvider added in v1.3.7

func (option AliMNSClientOption) GetProvider() cloud.Provider

func (AliMNSClientOption) GetRegion added in v1.3.7

func (option AliMNSClientOption) GetRegion() string

func (AliMNSClientOption) GetSecretID added in v1.3.7

func (option AliMNSClientOption) GetSecretID() string

func (AliMNSClientOption) GetSecretKey added in v1.3.7

func (option AliMNSClientOption) GetSecretKey() string

type AliMNSQueueMessage added in v1.3.7

type AliMNSQueueMessage struct {
	// contains filtered or unexported fields
}

func (*AliMNSQueueMessage) Body added in v1.3.7

func (message *AliMNSQueueMessage) Body() string

type AliMNSQueueService added in v1.3.7

type AliMNSQueueService struct {
	// contains filtered or unexported fields
}

func (*AliMNSQueueService) AckMessage added in v1.3.7

func (service *AliMNSQueueService) AckMessage(ctx context.Context, message Message) error

func (*AliMNSQueueService) Close added in v1.3.7

func (service *AliMNSQueueService) Close() error

func (*AliMNSQueueService) CreateConsumer added in v1.3.7

func (service *AliMNSQueueService) CreateConsumer() (Consumer, error)

func (*AliMNSQueueService) CreateProducer added in v1.3.7

func (service *AliMNSQueueService) CreateProducer() (Producer, error)

func (*AliMNSQueueService) ReceiveMessages added in v1.3.7

func (service *AliMNSQueueService) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)

func (*AliMNSQueueService) SendMessage added in v1.3.7

func (service *AliMNSQueueService) SendMessage(ctx context.Context, body string) error

type BaseRedisQueueConsumer added in v1.2.0

type BaseRedisQueueConsumer struct {
	// contains filtered or unexported fields
}

func (*BaseRedisQueueConsumer) AckMessage added in v1.2.0

func (c *BaseRedisQueueConsumer) AckMessage(ctx context.Context, message Message) error

func (*BaseRedisQueueConsumer) Close added in v1.2.0

func (c *BaseRedisQueueConsumer) Close() error

func (*BaseRedisQueueConsumer) ReceiveMessages added in v1.2.0

func (c *BaseRedisQueueConsumer) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)

ReceiveMessages 获取待消费消息

首先获取当前消费者 Pending 状态超过 X 秒的消息
而后获取全局所有的 Pending 状态超过 Y 秒的消息
最后获取 Stream 队列的消息

type BaseRedisQueueMessage added in v1.2.0

type BaseRedisQueueMessage struct {
	// contains filtered or unexported fields
}

func (*BaseRedisQueueMessage) Body added in v1.2.0

func (message *BaseRedisQueueMessage) Body() string

type BaseRedisQueueService added in v1.2.0

type BaseRedisQueueService struct {
	// contains filtered or unexported fields
}

func (*BaseRedisQueueService) Close added in v1.2.0

func (service *BaseRedisQueueService) Close() error

func (*BaseRedisQueueService) CreateConsumer added in v1.2.0

func (service *BaseRedisQueueService) CreateConsumer() (Consumer, error)

func (*BaseRedisQueueService) CreateProducer added in v1.2.0

func (service *BaseRedisQueueService) CreateProducer() (Producer, error)

func (*BaseRedisQueueService) IfCreateMkStream added in v1.2.0

func (service *BaseRedisQueueService) IfCreateMkStream(ctx context.Context) error

func (*BaseRedisQueueService) SendMessage added in v1.2.0

func (service *BaseRedisQueueService) SendMessage(ctx context.Context, body string) error

type ClusterRedisQueueOption added in v1.2.0

type ClusterRedisQueueOption struct {
	// redis cluster
	Addrs           []string       `json:"addrs" yaml:"addrs"`
	Password        string         `json:"password" yaml:"password"`
	DB              *int           `json:"db" yaml:"db"`
	MaxRetries      *int           `json:"max_retries" yaml:"max_retries"`
	PoolSize        *int           `json:"pool_size" yaml:"pool_size"`
	DialTimeout     *time.Duration `json:"dial_timeout" yaml:"dial_timeout"`
	ReadTimeout     *time.Duration `json:"read_timeout" yaml:"read_timeout"`
	WriteTimeout    *time.Duration `json:"write_timeout" yaml:"write_timeout"`
	MinIdleConns    *int           `json:"min_idle_conns" yaml:"min_idle_conns"`
	MaxIdleConns    *int           `json:"max_idle_conns" yaml:"max_idle_conns"`
	ConnMaxIdleTime *time.Duration `json:"conn_max_idle_time" yaml:"conn_max_idle_time"`
	ConnMaxLifetime *time.Duration `json:"conn_max_lifetime" yaml:"conn_max_lifetime"`

	// queue
	ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"`
	Idle          int    `json:"idle" yaml:"idle"`
	GlobalIdle    int    `json:"global_idle" yaml:"global_idle"`
}

func (ClusterRedisQueueOption) CheckAWS added in v1.2.0

func (option ClusterRedisQueueOption) CheckAWS() error

func (ClusterRedisQueueOption) CheckAliCloudStorage added in v1.3.0

func (option ClusterRedisQueueOption) CheckAliCloudStorage() error

func (ClusterRedisQueueOption) CheckClusterRedis added in v1.2.0

func (option ClusterRedisQueueOption) CheckClusterRedis() error

func (ClusterRedisQueueOption) CheckStandaloneRedis added in v1.2.0

func (option ClusterRedisQueueOption) CheckStandaloneRedis() error

func (ClusterRedisQueueOption) CheckTencentCloud added in v1.2.0

func (option ClusterRedisQueueOption) CheckTencentCloud() error

func (ClusterRedisQueueOption) GetAssumeRegion added in v1.2.0

func (option ClusterRedisQueueOption) GetAssumeRegion() string

func (ClusterRedisQueueOption) GetAssumeRoleArn added in v1.2.0

func (option ClusterRedisQueueOption) GetAssumeRoleArn() string

func (ClusterRedisQueueOption) GetProvider added in v1.2.0

func (option ClusterRedisQueueOption) GetProvider() cloud.Provider

func (ClusterRedisQueueOption) GetRegion added in v1.2.0

func (option ClusterRedisQueueOption) GetRegion() string

func (ClusterRedisQueueOption) GetSecretID added in v1.2.0

func (option ClusterRedisQueueOption) GetSecretID() string

func (ClusterRedisQueueOption) GetSecretKey added in v1.2.0

func (option ClusterRedisQueueOption) GetSecretKey() string

type ClusterRedisQueueOptionV7 added in v1.3.3

type ClusterRedisQueueOptionV7 struct {
	ClusterRedisQueueOption
}

func (ClusterRedisQueueOptionV7) GetProvider added in v1.3.3

func (option ClusterRedisQueueOptionV7) GetProvider() cloud.Provider

type Consumer

type Consumer interface {
	ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)
	AckMessage(ctx context.Context, message Message) error
	Close() error
}

type Message

type Message interface {
	Body() string
}

type Producer

type Producer interface {
	SendMessage(ctx context.Context, body string) error
	Close() error
}

type QueueOption added in v1.4.0

type QueueOption struct {
	Provider          cloud.Provider             `json:"provider" yaml:"provider"`
	QueueName         string                     `json:"queue_name" yaml:"queue_name"`
	SQS               cloud.AWSOption            `json:"sqs" yaml:"sqs"`
	MNS               AliMNSClientOption         `json:"mns" yaml:"mns"`
	StandaloneRedis   StandaloneRedisQueueOption `json:"standalone_redis" yaml:"standalone_redis"`
	ClusterRedis      ClusterRedisQueueOption    `json:"cluster_redis" yaml:"cluster_redis"`
	StandaloneRedisV7 ClusterRedisQueueOptionV7  `json:"standalone_redis_v7" yaml:"standalone_redis_v7"`
	ClusterRedisV7    ClusterRedisQueueOptionV7  `json:"cluster_redis_v7" yaml:"cluster_redis_v7"`
	Pulsar            TencentCloudQueueOption    `json:"pulsar" yaml:"pulsar"`
}

func (QueueOption) Check added in v1.4.0

func (option QueueOption) Check() error

type QueueService

type QueueService interface {
	CreateProducer() (Producer, error)
	CreateConsumer() (Consumer, error)
	Close() error
}

func GetAWSQueueService

func GetAWSQueueService(queueName string, option cloud.Option) (QueueService, error)

GetAWSQueueService is deprecated, use getAWSQueueService instead.

func GetClusterRedisQueueService added in v1.2.0

func GetClusterRedisQueueService(queueName string, option cloud.Option) (QueueService, error)

func GetQueueService

func GetQueueService(queueOrTopicSubName string, option cloud.Option) (QueueService, error)

GetQueueService is deprecated, use GetQueueServiceWithOption instead.

func GetQueueServiceWithOption added in v1.4.0

func GetQueueServiceWithOption(option QueueOption) (QueueService, error)

func GetStandaloneRedisQueueService added in v1.2.0

func GetStandaloneRedisQueueService(queueName string, option cloud.Option) (QueueService, error)

func GetTencentCloudQueueService

func GetTencentCloudQueueService(topicSubName string, option cloud.Option) (QueueService, error)

type RedisQueueConsumerV7 added in v1.3.3

type RedisQueueConsumerV7 struct {
	// contains filtered or unexported fields
}

func (*RedisQueueConsumerV7) AckMessage added in v1.3.3

func (c *RedisQueueConsumerV7) AckMessage(ctx context.Context, message Message) error

func (*RedisQueueConsumerV7) Close added in v1.3.3

func (c *RedisQueueConsumerV7) Close() error

func (*RedisQueueConsumerV7) ReceiveMessages added in v1.3.3

func (c *RedisQueueConsumerV7) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)

ReceiveMessages 获取待消费消息

首先获取当前消费者 Pending 状态超过 X 秒的消息
而后获取全局所有的 Pending 状态超过 Y 秒的消息
最后获取 Stream 队列的消息

type RedisQueueMessageV7 added in v1.3.3

type RedisQueueMessageV7 struct {
	// contains filtered or unexported fields
}

func (*RedisQueueMessageV7) Body added in v1.3.3

func (message *RedisQueueMessageV7) Body() string

type RedisQueueServiceV7 added in v1.3.3

type RedisQueueServiceV7 struct {
	// contains filtered or unexported fields
}

func (*RedisQueueServiceV7) Close added in v1.3.3

func (service *RedisQueueServiceV7) Close() error

func (*RedisQueueServiceV7) CreateConsumer added in v1.3.3

func (service *RedisQueueServiceV7) CreateConsumer() (Consumer, error)

func (*RedisQueueServiceV7) CreateProducer added in v1.3.3

func (service *RedisQueueServiceV7) CreateProducer() (Producer, error)

func (*RedisQueueServiceV7) IfCreateMkStream added in v1.3.3

func (service *RedisQueueServiceV7) IfCreateMkStream(ctx context.Context) error

func (*RedisQueueServiceV7) SendMessage added in v1.3.3

func (service *RedisQueueServiceV7) SendMessage(ctx context.Context, body string) error

type StandaloneRedisQueueOption added in v1.2.0

type StandaloneRedisQueueOption struct {
	Addr         string         `json:"addr" yaml:"addr"`
	Password     string         `json:"password" yaml:"password"`
	DB           *int           `json:"db" yaml:"db"`
	MaxRetries   *int           `json:"max_retries" yaml:"max_retries"`
	PoolSize     *int           `json:"pool_size" yaml:"pool_size"`
	DialTimeout  *time.Duration `json:"dial_timeout" yaml:"dial_timeout"`
	ReadTimeout  *time.Duration `json:"read_timeout" yaml:"read_timeout"`
	WriteTimeout *time.Duration `json:"write_timeout" yaml:"write_timeout"`
	MinIdleConns *int           `json:"min_idle_conns" yaml:"min_idle_conns"`

	// queue
	ConsumerGroup string `json:"consumer_group" yaml:"consumer_group"`
	Idle          int    `json:"idle" yaml:"idle"`
	GlobalIdle    int    `json:"global_idle" yaml:"global_idle"`
}

func (StandaloneRedisQueueOption) CheckAWS added in v1.2.0

func (option StandaloneRedisQueueOption) CheckAWS() error

func (StandaloneRedisQueueOption) CheckAliCloudStorage added in v1.3.0

func (option StandaloneRedisQueueOption) CheckAliCloudStorage() error

func (StandaloneRedisQueueOption) CheckClusterRedis added in v1.2.0

func (option StandaloneRedisQueueOption) CheckClusterRedis() error

func (StandaloneRedisQueueOption) CheckStandaloneRedis added in v1.2.0

func (option StandaloneRedisQueueOption) CheckStandaloneRedis() error

func (StandaloneRedisQueueOption) CheckTencentCloud added in v1.2.0

func (option StandaloneRedisQueueOption) CheckTencentCloud() error

func (StandaloneRedisQueueOption) GetAssumeRegion added in v1.2.0

func (option StandaloneRedisQueueOption) GetAssumeRegion() string

func (StandaloneRedisQueueOption) GetAssumeRoleArn added in v1.2.0

func (option StandaloneRedisQueueOption) GetAssumeRoleArn() string

func (StandaloneRedisQueueOption) GetProvider added in v1.2.0

func (option StandaloneRedisQueueOption) GetProvider() cloud.Provider

func (StandaloneRedisQueueOption) GetRegion added in v1.2.0

func (option StandaloneRedisQueueOption) GetRegion() string

func (StandaloneRedisQueueOption) GetSecretID added in v1.2.0

func (option StandaloneRedisQueueOption) GetSecretID() string

func (StandaloneRedisQueueOption) GetSecretKey added in v1.2.0

func (option StandaloneRedisQueueOption) GetSecretKey() string

type StandaloneRedisQueueOptionV7 added in v1.3.3

type StandaloneRedisQueueOptionV7 struct {
	StandaloneRedisQueueOption
}

func (StandaloneRedisQueueOptionV7) GetProvider added in v1.3.3

func (option StandaloneRedisQueueOptionV7) GetProvider() cloud.Provider

type TencentCloudQueueConsumer

type TencentCloudQueueConsumer struct {
	// contains filtered or unexported fields
}

func (*TencentCloudQueueConsumer) AckMessage

func (consumer *TencentCloudQueueConsumer) AckMessage(ctx context.Context, message Message) error

func (*TencentCloudQueueConsumer) Close

func (consumer *TencentCloudQueueConsumer) Close() error

func (*TencentCloudQueueConsumer) ReceiveMessages

func (consumer *TencentCloudQueueConsumer) ReceiveMessages(ctx context.Context, maxCount int) ([]Message, error)

type TencentCloudQueueMessage

type TencentCloudQueueMessage struct {
	// contains filtered or unexported fields
}

func (*TencentCloudQueueMessage) Body

func (message *TencentCloudQueueMessage) Body() string

type TencentCloudQueueOption

type TencentCloudQueueOption struct {
	Token string `json:"token" yaml:"token"`
	URL   string `json:"url" yaml:"url"`
}

func (TencentCloudQueueOption) CheckAWS

func (option TencentCloudQueueOption) CheckAWS() error

func (TencentCloudQueueOption) CheckAliCloudStorage added in v1.3.0

func (option TencentCloudQueueOption) CheckAliCloudStorage() error

func (TencentCloudQueueOption) CheckClusterRedis added in v1.2.0

func (option TencentCloudQueueOption) CheckClusterRedis() error

func (TencentCloudQueueOption) CheckStandaloneRedis added in v1.2.0

func (option TencentCloudQueueOption) CheckStandaloneRedis() error

func (TencentCloudQueueOption) CheckTencentCloud

func (option TencentCloudQueueOption) CheckTencentCloud() error

func (TencentCloudQueueOption) GetAssumeRegion added in v1.1.3

func (option TencentCloudQueueOption) GetAssumeRegion() string

func (TencentCloudQueueOption) GetAssumeRoleArn added in v1.1.3

func (option TencentCloudQueueOption) GetAssumeRoleArn() string

func (TencentCloudQueueOption) GetProvider

func (option TencentCloudQueueOption) GetProvider() cloud.Provider

func (TencentCloudQueueOption) GetRegion

func (option TencentCloudQueueOption) GetRegion() string

func (TencentCloudQueueOption) GetSecretID

func (option TencentCloudQueueOption) GetSecretID() string

func (TencentCloudQueueOption) GetSecretKey

func (option TencentCloudQueueOption) GetSecretKey() string

type TencentCloudQueueProducer

type TencentCloudQueueProducer struct {
	// contains filtered or unexported fields
}

func (*TencentCloudQueueProducer) Close

func (producer *TencentCloudQueueProducer) Close() error

func (*TencentCloudQueueProducer) SendMessage

func (producer *TencentCloudQueueProducer) SendMessage(ctx context.Context, body string) error

type TencentCloudQueueService

type TencentCloudQueueService struct {
	// contains filtered or unexported fields
}

func (*TencentCloudQueueService) Close

func (service *TencentCloudQueueService) Close() error

func (*TencentCloudQueueService) CreateConsumer

func (service *TencentCloudQueueService) CreateConsumer() (Consumer, error)

func (*TencentCloudQueueService) CreateProducer

func (service *TencentCloudQueueService) CreateProducer() (Producer, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL