Documentation
¶
Index ¶
- Constants
- Variables
- func ContextWithQueueService(ctx context.Context) context.Context
- func RegisterBackendImplementor(backend Backend)
- func RegisterIdempotentImplementor(idempotent Idempotent)
- func RegisterImplementor(s Queue)
- func StreamServerInterceptor() grpc.StreamServerInterceptor
- func UnaryServerInterceptor() grpc.UnaryServerInterceptor
- type Backend
- type ConsumeOffset
- type Consumer
- type ConsumerFunc
- type Idempotent
- type Message
- type MessageOperation
- type MessageWrapper
- type PingMessage
- func (m *PingMessage) Begin()
- func (m *PingMessage) Cancel()
- func (m *PingMessage) Content() []byte
- func (m *PingMessage) End()
- func (m *PingMessage) Fail()
- func (m *PingMessage) Group() string
- func (m *PingMessage) IsPing() bool
- func (m *PingMessage) Key() string
- func (m *PingMessage) NotBefore() time.Time
- func (m *PingMessage) Properties() map[string]string
- func (m *PingMessage) Requeue()
- func (m *PingMessage) Retry() int
- func (m *PingMessage) Timestamp() time.Time
- func (m *PingMessage) Topic() string
- type ProcessStatus
- type PublishOption
- type PublishOptions
- type Queue
- type SubscribeOption
- func WithConsumeComponent(component string) SubscribeOption
- func WithConsumeConcurrency(concurrency int) SubscribeOption
- func WithConsumeGroup(name string) SubscribeOption
- func WithConsumeIdempotent(impl Idempotent) SubscribeOption
- func WithConsumeRetry(retry int) SubscribeOption
- func WithInitOffset(offset ConsumeOffset) SubscribeOption
- type SubscribeOptions
Constants ¶
const ( DefaultGroup = "default" DefaultConcurrency = 1 DefaultMaxRetry = 3 )
Variables ¶
var EmptyPublishOptions = func() *PublishOptions { key, _ := snowflake.NextID() return &PublishOptions{ Context: ictx.Context, Sequence: key, Key: strconv.FormatUint(key, 10), Properties: map[string]string{}, } }
var EmptySubscribeOptions = func() *SubscribeOptions { return &SubscribeOptions{ Group: DefaultGroup, Concurrency: DefaultConcurrency, MaxRetry: DefaultMaxRetry, Idempotent: IdempotentImplementor(), } }
Functions ¶
func RegisterBackendImplementor ¶
func RegisterBackendImplementor(backend Backend)
RegisterBackendImplementor registers the queue backend service implementor.
func RegisterIdempotentImplementor ¶
func RegisterIdempotentImplementor(idempotent Idempotent)
RegisterIdempotentImplementor registers the idempotent service implementor.
func RegisterImplementor ¶
func RegisterImplementor(s Queue)
RegisterImplementor registers the queue service implementor.
func StreamServerInterceptor ¶
func StreamServerInterceptor() grpc.StreamServerInterceptor
StreamServerInterceptor returns a new streaming server interceptor for message queue service.
func UnaryServerInterceptor ¶
func UnaryServerInterceptor() grpc.UnaryServerInterceptor
UnaryServerInterceptor returns a new unary server interceptor for message queue service.
Types ¶
type Backend ¶
type Backend interface {
// Type returns backend type.
Type() string
// Ping connects the backend server if not connected.
// Will be called before every Read/Write operation.
Ping() error
// Read subscribes the message of the specified topic.
Read(topic string, ch chan<- MessageWrapper, opts *SubscribeOptions) error
// Write publishes content data to the specified queue.
Write(topic string, content []byte, opts *PublishOptions) error
}
Backend interface.
func BackendImplementor ¶
func BackendImplementor() Backend
BackendImplementor returns the queue backend service implementor.
type ConsumeOffset ¶
type ConsumeOffset int
const ( ConsumeFromLatest ConsumeOffset = iota ConsumeFromEarliest )
type ConsumerFunc ¶
type Idempotent ¶
type Idempotent interface {
// BeforeProcess should be invoked before process message.
// Returns true to continue the message processing.
// Returns false to invoke Cancel for the message.
BeforeProcess(Message) bool
// AfterProcess should be invoked after processing.
AfterProcess(Message, ProcessStatus)
}
Idempotent interface.
func IdempotentImplementor ¶
func IdempotentImplementor() Idempotent
IdempotentImplementor returns the idempotent service implementor.
type Message ¶
type Message interface {
// Topic name of this message.
Topic() string
// Group name of this message.
Group() string
// Key returns the unique key ID of this message.
Key() string
// Content returns the message body content.
Content() []byte
// Properties returns the properties of this message.
Properties() map[string]string
// Timestamp indicates the creation time of the message.
Timestamp() time.Time
// NotBefore indicates the message should not be processed before this timestamp.
NotBefore() time.Time
// Retry times.
Retry() int
// IsPing returns true for a ping message.
IsPing() bool
}
Message interface.
type MessageOperation ¶
type MessageOperation interface {
// Begin to process the message.
Begin()
// Cancel indicates the message should be ignored.
Cancel()
// End indicates a successful process.
End()
// Requeue indicates the message should be retried.
Requeue()
// Fail indicates a failed process.
Fail()
}
MessageOperation interface.
type MessageWrapper ¶
type MessageWrapper interface {
Message
MessageOperation
}
MessageWrapper interface.
type PingMessage ¶
type PingMessage struct{}
func (*PingMessage) Cancel ¶
func (m *PingMessage) Cancel()
Cancel indicates the message should be ignored.
func (*PingMessage) Content ¶
func (m *PingMessage) Content() []byte
Content returns the message body content.
func (*PingMessage) IsPing ¶
func (m *PingMessage) IsPing() bool
IsPing returns true for a ping message.
func (*PingMessage) Key ¶
func (m *PingMessage) Key() string
Key returns the unique key ID of this message.
func (*PingMessage) NotBefore ¶
func (m *PingMessage) NotBefore() time.Time
NotBefore indicates the message should not be processed before this timestamp.
func (*PingMessage) Properties ¶
func (m *PingMessage) Properties() map[string]string
Properties returns the properties of this message.
func (*PingMessage) Requeue ¶
func (m *PingMessage) Requeue()
Requeue indicates the message should be retried.
func (*PingMessage) Timestamp ¶
func (m *PingMessage) Timestamp() time.Time
Timestamp indicates the creation time of the message.
type ProcessStatus ¶
type ProcessStatus int
ProcessStatus type.
const ( Created ProcessStatus = iota Processing Canceled Succeeded Failed Requeued )
type PublishOption ¶
type PublishOption func(*PublishOptions)
func WithProperty ¶
func WithProperty(key, value string) PublishOption
func WithPublishContext ¶
func WithPublishContext(ctx context.Context) PublishOption
func WithPublishDelay ¶
func WithPublishDelay(delay time.Duration) PublishOption
func WithPublishSequence ¶
func WithPublishSequence(seq uint64) PublishOption
func WithPublishUniqueKey ¶
func WithPublishUniqueKey(key string) PublishOption
type PublishOptions ¶
type Queue ¶
type Queue interface {
// Publish writes a message body to the specified topic.
Publish(topic string, content []byte, opts ...PublishOption) error
// Subscribe consumes the messages of the specified topic.
Subscribe(topic string, handler Consumer, opts ...SubscribeOption) error
}
Queue interface.
func ContextQueueService ¶
type SubscribeOption ¶
type SubscribeOption func(*SubscribeOptions)
func WithConsumeComponent ¶
func WithConsumeComponent(component string) SubscribeOption
func WithConsumeConcurrency ¶
func WithConsumeConcurrency(concurrency int) SubscribeOption
func WithConsumeGroup ¶
func WithConsumeGroup(name string) SubscribeOption
func WithConsumeIdempotent ¶
func WithConsumeIdempotent(impl Idempotent) SubscribeOption
func WithConsumeRetry ¶
func WithConsumeRetry(retry int) SubscribeOption
func WithInitOffset ¶
func WithInitOffset(offset ConsumeOffset) SubscribeOption
type SubscribeOptions ¶
type SubscribeOptions struct {
Component string
Group string
Concurrency int
MaxRetry int
InitOffset ConsumeOffset
Idempotent Idempotent
}