Documentation ¶
Constants ¶
This section is empty.
Variables ¶
var DefaultMessageBatchHandlerConfig = &BatchMessageHandlerConfig{
	DelayThreshold:    10 * time.Millisecond,
	CountThreshold:    100,
	ByteThreshold:     1e6,
	NumGoroutines:     10,
	BufferedByteLimit: 10 * pubsub.MaxPublishRequestBytes,
}
Functions ¶
This section is empty.
Types ¶
type BatchError ¶ added in v0.3.0
BatchError is used to report an error for each message. The key is the message ID.
func (BatchError) Error ¶ added in v0.3.0
func (b BatchError) Error() string
type BatchMessageHandlerConfig ¶ added in v0.3.0
type BatchMessageHandlerConfig struct {
	// Process a non-empty batch after this delay has passed.
	// Defaults to DefaultMessageBatchHandlerConfig.DelayThreshold.
	DelayThreshold time.Duration

	// Process a batch when it has this many messages.
	// Defaults to DefaultMessageBatchHandlerConfig.CountThreshold.
	CountThreshold int

	// Process a batch when its size in bytes reaches this value.
	// Defaults to DefaultMessageBatchHandlerConfig.ByteThreshold.
	ByteThreshold int

	// The number of goroutines.
	// Defaults to DefaultMessageBatchHandlerConfig.NumGoroutines.
	NumGoroutines int

	// Defaults to DefaultMessageBatchHandlerConfig.BufferedByteLimit.
	BufferedByteLimit int
}
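The field comments say each value "defaults to" the matching field of DefaultMessageBatchHandlerConfig. One plausible reading is that zero-valued fields are replaced by the defaults. The sketch below illustrates that reading with local stand-in types; `batchConfig`, `defaults`, `withDefaults`, and `assumedMaxPublishRequestBytes` are hypothetical names, and the package's actual defaulting logic may differ.

```go
package main

import "time"

// batchConfig is a local stand-in that mirrors the fields of
// BatchMessageHandlerConfig. It is not the package's type.
type batchConfig struct {
	DelayThreshold    time.Duration
	CountThreshold    int
	ByteThreshold     int
	NumGoroutines     int
	BufferedByteLimit int
}

// assumedMaxPublishRequestBytes is a placeholder for
// pubsub.MaxPublishRequestBytes. The value here is an assumption.
const assumedMaxPublishRequestBytes = 10_000_000

// defaults mirrors DefaultMessageBatchHandlerConfig.
var defaults = batchConfig{
	DelayThreshold:    10 * time.Millisecond,
	CountThreshold:    100,
	ByteThreshold:     1e6,
	NumGoroutines:     10,
	BufferedByteLimit: 10 * assumedMaxPublishRequestBytes,
}

// withDefaults returns a copy of c in which every zero-valued field
// is replaced by the corresponding default.
func withDefaults(c batchConfig) batchConfig {
	if c.DelayThreshold == 0 {
		c.DelayThreshold = defaults.DelayThreshold
	}
	if c.CountThreshold == 0 {
		c.CountThreshold = defaults.CountThreshold
	}
	if c.ByteThreshold == 0 {
		c.ByteThreshold = defaults.ByteThreshold
	}
	if c.NumGoroutines == 0 {
		c.NumGoroutines = defaults.NumGoroutines
	}
	if c.BufferedByteLimit == 0 {
		c.BufferedByteLimit = defaults.BufferedByteLimit
	}
	return c
}
```

Under this reading, a caller who only cares about batch size could pass `batchConfig{CountThreshold: 5}` and leave the remaining fields unset.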
type MessageBatchHandler ¶ added in v0.3.0
MessageBatchHandler defines the batch message handler. By default, when a non-nil error is returned, all messages in the batch are treated as failed in the MessageHandler. To report an error for each message individually, return a BatchError.
type MessageHandler ¶
MessageHandler defines the message handler invoked by SubscriptionInterceptor to complete the normal message handling.
func NewBatchMessageHandler ¶ added in v0.3.0
func NewBatchMessageHandler(handler MessageBatchHandler, config BatchMessageHandlerConfig) MessageHandler
NewBatchMessageHandler initializes a MessageHandler for batch message processing with the given config.
type MessagePublisher ¶
type MessagePublisher = func(ctx context.Context, topic *pubsub.Topic, m *pubsub.Message) *pubsub.PublishResult
MessagePublisher defines the message publisher invoked by PublishInterceptor to complete the normal message publishing.
type PublishInterceptor ¶
type PublishInterceptor = func(next MessagePublisher) MessagePublisher
PublishInterceptor provides a hook to intercept the publishing of a message.
type Publisher ¶
Publisher is a wrapper around the Pub/Sub client focused on publishing.
func NewPublisher ¶
func NewPublisher(pubsubClient *pubsub.Client, opt ...PublisherOption) *Publisher
NewPublisher initializes new Publisher.
type PublisherOption ¶
type PublisherOption interface {
// contains filtered or unexported methods
}
PublisherOption is an option to change publisher configuration.
func WithPublishInterceptor ¶
func WithPublishInterceptor(interceptors ...PublishInterceptor) PublisherOption
WithPublishInterceptor sets publish interceptors.
type Subscriber ¶
type Subscriber struct {
// contains filtered or unexported fields
}
Subscriber represents a wrapper of Pub/Sub client mainly focusing on pull subscription.
func NewSubscriber ¶
func NewSubscriber(pubsubClient *pubsub.Client, opt ...SubscriberOption) *Subscriber
NewSubscriber initializes new Subscriber.
func (*Subscriber) Close ¶
func (s *Subscriber) Close()
Close closes running subscriptions gracefully.
func (*Subscriber) HandleSubscriptionFunc ¶
func (s *Subscriber) HandleSubscriptionFunc(subscription *pubsub.Subscription, f MessageHandler) error
HandleSubscriptionFunc registers a message handler for the given subscription. It returns an error if the subscription does not exist.
func (*Subscriber) HandleSubscriptionFuncMap ¶ added in v0.1.2
func (s *Subscriber) HandleSubscriptionFuncMap(funcMap map[*pubsub.Subscription]MessageHandler) error
HandleSubscriptionFuncMap registers multiple subscription handlers at once. It takes a map whose keys are subscriptions and whose values are the corresponding message handlers.
func (*Subscriber) Run ¶
func (s *Subscriber) Run(ctx context.Context)
Run starts running registered pull subscriptions.
type SubscriberOption ¶
type SubscriberOption interface {
// contains filtered or unexported methods
}
SubscriberOption is an option to change subscriber configuration.
func WithSubscriptionInterceptor ¶
func WithSubscriptionInterceptor(interceptors ...SubscriptionInterceptor) SubscriberOption
WithSubscriptionInterceptor sets subscription interceptors.
type SubscriptionInfo ¶ added in v0.1.1
SubscriptionInfo contains various info about the subscription.
type SubscriptionInterceptor ¶
type SubscriptionInterceptor = func(info *SubscriptionInfo, next MessageHandler) MessageHandler
SubscriptionInterceptor provides a hook to intercept the execution of a message handling.