Documentation
¶
Index ¶
- Constants
- Variables
- func ContextWithQueueService(ctx context.Context) context.Context
- func RegisterBackendImplementor(backend Backend)
- func RegisterIdempotentImplementor(idempotent Idempotent)
- func RegisterImplementor(s Queue)
- func StreamServerInterceptor() grpc.StreamServerInterceptor
- func UnaryServerInterceptor() grpc.UnaryServerInterceptor
- type Backend
- type Consumer
- type ConsumerFunc
- type Idempotent
- type Message
- type MessageOperation
- type MessageWrapper
- type PingMessage
- func (m *PingMessage) Begin()
- func (m *PingMessage) Cancel()
- func (m *PingMessage) Content() []byte
- func (m *PingMessage) End()
- func (m *PingMessage) Fail()
- func (m *PingMessage) IsPing() bool
- func (m *PingMessage) NotBefore() time.Time
- func (m *PingMessage) Queue() string
- func (m *PingMessage) Requeue()
- func (m *PingMessage) Retry() int
- func (m *PingMessage) Timestamp() time.Time
- func (m *PingMessage) Topic() string
- func (m *PingMessage) UniqueID() string
- type ProcessStatus
- type PublishOption
- type PublishOptions
- type Queue
- type SubscribeOption
- func WithConsumeComponent(component string) SubscribeOption
- func WithConsumeConcurrency(concurrency int) SubscribeOption
- func WithConsumeIdempotent(impl Idempotent) SubscribeOption
- func WithConsumeProduct(product string) SubscribeOption
- func WithConsumeRetry(retry int) SubscribeOption
- func WithConsumeTopic(topic string) SubscribeOption
- type SubscribeOptions
Constants ¶
// Default values that EmptySubscribeOptions assigns to Topic, Concurrency and MaxRetry.
const ( DefaultTopic = "default" DefaultConcurrency = 1 DefaultMaxRetry = 3 )
Variables ¶
// EmptyPublishOptions is a factory for the baseline publish options: each call
// returns a freshly allocated PublishOptions whose Context is
// context.Background().
var EmptyPublishOptions = func() *PublishOptions {
	base := new(PublishOptions)
	base.Context = context.Background()
	return base
}
// EmptySubscribeOptions is a factory for the baseline subscribe options: each
// call returns a new SubscribeOptions populated with DefaultTopic,
// DefaultConcurrency, DefaultMaxRetry and whatever IdempotentImplementor()
// returns at the time of the call.
var EmptySubscribeOptions = func() *SubscribeOptions {
	defaults := SubscribeOptions{
		Topic:       DefaultTopic,
		Concurrency: DefaultConcurrency,
		MaxRetry:    DefaultMaxRetry,
		Idempotent:  IdempotentImplementor(),
	}
	return &defaults
}
Functions ¶
func RegisterBackendImplementor ¶
func RegisterBackendImplementor(backend Backend)
RegisterBackendImplementor registers the queue backend service implementor.
func RegisterIdempotentImplementor ¶
func RegisterIdempotentImplementor(idempotent Idempotent)
RegisterIdempotentImplementor registers the idempotent service implementor.
func RegisterImplementor ¶
func RegisterImplementor(s Queue)
RegisterImplementor registers the queue service implementor.
func StreamServerInterceptor ¶
func StreamServerInterceptor() grpc.StreamServerInterceptor
StreamServerInterceptor returns a new streaming server interceptor for the message queue service.
func UnaryServerInterceptor ¶
func UnaryServerInterceptor() grpc.UnaryServerInterceptor
UnaryServerInterceptor returns a new unary server interceptor for the message queue service.
Types ¶
type Backend ¶
// Backend is the contract a queue storage backend fulfils: it reports its
// type and delay capability, lists queues/topics and their lengths, and
// reads and writes message content.
type Backend interface {
// Type returns the backend type.
Type() string
// Ping connects to the backend server if it is not already connected.
// It is called before every Read/Write operation.
Ping() error
// MaxDelay returns the maximum delay duration supported by the backend.
// A negative value means there is no limit.
// A zero value means the delay operation is not supported.
MaxDelay() time.Duration
// GetQueues returns the names of all queues in backend storage.
GetQueues() ([]string, error)
// GetTopics returns all queues and their topics in backend storage.
// NOTE(review): the map is presumably keyed by queue name with the topic
// names as values — confirm against an implementation.
GetTopics() (map[string][]string, error)
// GetQueueLength returns the length of every topic of the specified queue
// in backend storage.
// NOTE(review): the map is presumably keyed by topic name — confirm.
GetQueueLength(queue string) (map[string]int64, error)
// GetTopicLength returns the length of the specified queue/topic in
// backend storage.
GetTopicLength(queue, topic string) (int64, error)
// Read subscribes to the messages of the specified queue and topic.
// ch is a send-only channel of MessageWrapper; presumably each message
// read is delivered on it — confirm against an implementation.
Read(ctx context.Context, queue, topic string, ch chan<- MessageWrapper) error
// Write publishes content data to the specified queue.
// NOTE(review): delay presumably postpones delivery and is bounded by
// MaxDelay — confirm. Unlike Read, no topic parameter is taken.
Write(ctx context.Context, queue string, delay time.Duration, content []byte) error
}
Backend interface.
func BackendImplementor ¶
func BackendImplementor() Backend
BackendImplementor returns the queue backend service implementor.
type ConsumerFunc ¶
type Idempotent ¶
// Idempotent is the pair of hooks invoked around message processing.
// EmptySubscribeOptions defaults to the value returned by
// IdempotentImplementor; WithConsumeIdempotent accepts one per subscription.
type Idempotent interface {
// BeforeProcess should be invoked before a message is processed.
// Returning true continues processing of the message.
// Returning false causes Cancel to be invoked for the message.
BeforeProcess(Message) bool
// AfterProcess should be invoked after processing, with the message and
// its ProcessStatus.
AfterProcess(Message, ProcessStatus)
}
Idempotent interface.
func IdempotentImplementor ¶
func IdempotentImplementor() Idempotent
IdempotentImplementor returns the idempotent service implementor.
type Message ¶
// Message is the read-only view of a queued message: where it came from,
// its identity and body, and its timing and retry metadata.
type Message interface {
// Queue returns the queue name of this message.
Queue() string
// Topic returns the topic name of this message.
Topic() string
// UniqueID returns the unique ID of this message.
UniqueID() string
// Content returns the message body content.
Content() []byte
// Timestamp returns the creation time of the message.
Timestamp() time.Time
// NotBefore returns the time before which the message should not be
// processed.
NotBefore() time.Time
// Retry returns the retry count of this message.
// NOTE(review): unclear from here whether this is attempts already made
// or attempts remaining — confirm against an implementation.
Retry() int
// IsPing returns true for a ping message.
IsPing() bool
}
Message interface.
type MessageOperation ¶
// MessageOperation is the set of state transitions a consumer can apply to a
// message while handling it. None of the methods take arguments or return
// values.
type MessageOperation interface {
// Begin marks the start of processing for the message.
Begin()
// Cancel indicates the message should be ignored.
Cancel()
// End indicates the message was processed successfully.
End()
// Requeue indicates the message should be retried.
Requeue()
// Fail indicates processing of the message failed.
Fail()
}
MessageOperation interface.
type MessageWrapper ¶
// MessageWrapper combines Message and MessageOperation by embedding both; it
// is the element type of the channel passed to Backend.Read.
type MessageWrapper interface {
// Message supplies the read-only accessors (queue, topic, ID, content, timing, retry).
Message
// MessageOperation supplies the processing transitions (Begin, Cancel, End, Requeue, Fail).
MessageOperation
}
MessageWrapper interface.
type PingMessage ¶
// PingMessage is an empty struct. The methods declared on *PingMessage cover
// every method of Message and MessageOperation, so a *PingMessage can be used
// as a MessageWrapper.
type PingMessage struct{}
func (*PingMessage) Cancel ¶
func (m *PingMessage) Cancel()
Cancel indicates the message should be ignored.
func (*PingMessage) Content ¶
func (m *PingMessage) Content() []byte
Content returns the message body content.
func (*PingMessage) IsPing ¶
func (m *PingMessage) IsPing() bool
IsPing returns true for a ping message.
func (*PingMessage) NotBefore ¶
func (m *PingMessage) NotBefore() time.Time
NotBefore indicates the message should not be processed before this timestamp.
func (*PingMessage) Requeue ¶
func (m *PingMessage) Requeue()
Requeue indicates the message should be retried.
func (*PingMessage) Timestamp ¶
func (m *PingMessage) Timestamp() time.Time
Timestamp indicates the creation time of the message.
func (*PingMessage) UniqueID ¶
func (m *PingMessage) UniqueID() string
UniqueID returns the unique ID of this message.
type ProcessStatus ¶
// ProcessStatus is an integer status code for a message; it is the second
// argument of Idempotent.AfterProcess.
type ProcessStatus int
ProcessStatus type.
// ProcessStatus values, numbered from 0 (Created) via iota.
// NOTE(review): the names appear to mirror MessageOperation (Begin, Cancel, End, Fail, Requeue) — confirm the mapping.
const ( Created ProcessStatus = iota Processing Canceled Succeeded Failed Requeued )
type PublishOption ¶
// PublishOption is a functional option that receives a *PublishOptions;
// Queue.Publish accepts any number of them.
type PublishOption func(*PublishOptions)
func WithPublishContext ¶
func WithPublishContext(ctx context.Context) PublishOption
func WithPublishDelay ¶
func WithPublishDelay(delay time.Duration) PublishOption
type Queue ¶
// Queue is the service-level message queue API: publishing message bodies and
// subscribing handlers. An implementor is installed with RegisterImplementor.
type Queue interface {
// Publish writes a message body to the specified queue.
// opts are PublishOption values applied to the publish call.
Publish(queue string, content []byte, opts ...PublishOption) error
// Subscribe consumes the messages of the specified queue with handler.
// opts are SubscribeOption values applied to the subscription.
Subscribe(queue string, handler Consumer, opts ...SubscribeOption) error
}
func ContextQueueService ¶
type SubscribeOption ¶
// SubscribeOption is a functional option that receives a *SubscribeOptions;
// Queue.Subscribe accepts any number of them, and the WithConsume* functions
// return values of this type.
type SubscribeOption func(*SubscribeOptions)
func WithConsumeComponent ¶
func WithConsumeComponent(component string) SubscribeOption
func WithConsumeConcurrency ¶
func WithConsumeConcurrency(concurrency int) SubscribeOption
func WithConsumeIdempotent ¶
func WithConsumeIdempotent(impl Idempotent) SubscribeOption
func WithConsumeProduct ¶
func WithConsumeProduct(product string) SubscribeOption
func WithConsumeRetry ¶
func WithConsumeRetry(retry int) SubscribeOption
func WithConsumeTopic ¶
func WithConsumeTopic(topic string) SubscribeOption