Documentation ¶
Index ¶
- func SetCorrelationId(correlationId *string) func(msg *azservicebus.Message) error
- func SetLoggerFunc(fn func(ctx context.Context) Logger)
- func SetMessageDelay(delay time.Duration) func(msg *azservicebus.Message) error
- func SetMessageId(messageId *string) func(msg *azservicebus.Message) error
- func SetMessageTTL(ttl time.Duration) func(msg *azservicebus.Message) error
- func SetScheduleAt(t time.Time) func(msg *azservicebus.Message) error
- func WithReceiverSpanNameFormatter(...) func(t *TracingHandlerOpts)
- func WithSpanStartOptions(options []trace.SpanStartOption) func(t *TracingHandlerOpts)
- func WithTracePropagation(ctx context.Context) func(msg *azservicebus.Message) error
- func WithTraceProvider(tp trace.TracerProvider) func(t *TracingHandlerOpts)
- type Abandon
- type AzServiceBusSender
- type Complete
- type ConstantDelayStrategy
- type DeadLetter
- type DefaultJSONMarshaller
- type DefaultProtoMarshaller
- type Defer
- type Handler
- type HandlerFunc
- func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFunc
- func NewPanicHandler(panicOptions *PanicHandlerOptions, handler Handler) HandlerFunc
- func NewRenewLockHandler(lockRenewer LockRenewer, interval *time.Duration, handler Handler) HandlerFunc (deprecated)
- func NewSettlementHandler(opts *SettlementHandlerOptions, handler Settler) HandlerFunc
- func NewTracingHandler(next Handler, options ...func(t *TracingHandlerOpts)) HandlerFunc
- type LockRenewalOptions
- type LockRenewer
- type Logger
- type ManagedSettler
- type ManagedSettlingFunc
- type ManagedSettlingHandler
- type ManagedSettlingOptions
- type Marshaller
- type MaxAttemptsRetryDecision
- type MessageBody
- type MessageSettler
- type NoOp
- type PanicHandlerOptions
- type Processor
- type ProcessorOptions
- type Receiver
- type RetryDecision
- type RetryDelayStrategy
- type Sender
- func (d *Sender) AzSender() AzServiceBusSender
- func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, ...) error
- func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.Message) error
- func (d *Sender) ToServiceBusMessage(ctx context.Context, mb MessageBody, ...) (*azservicebus.Message, error)
- type SenderOptions
- type Settlement
- type SettlementHandlerOptions
- type Settler
- type TracingHandlerOpts
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func SetCorrelationId ¶
func SetCorrelationId(correlationId *string) func(msg *azservicebus.Message) error
SetCorrelationId sets the ServiceBus message's correlation ID to a user-specified value
func SetLoggerFunc ¶ added in v2.4.0
SetLoggerFunc sets the function to be used to acquire a logger when go-shuttle logs.
func SetMessageDelay ¶
func SetMessageDelay(delay time.Duration) func(msg *azservicebus.Message) error
SetMessageDelay schedules a message in the future
func SetMessageId ¶
func SetMessageId(messageId *string) func(msg *azservicebus.Message) error
SetMessageId sets the ServiceBus message's ID to a user-specified value
func SetMessageTTL ¶
func SetMessageTTL(ttl time.Duration) func(msg *azservicebus.Message) error
SetMessageTTL sets the ServiceBus message's TimeToLive to a user-specified value
func SetScheduleAt ¶
func SetScheduleAt(t time.Time) func(msg *azservicebus.Message) error
SetScheduleAt schedules a message to be enqueued in the future
func WithReceiverSpanNameFormatter ¶ added in v2.6.0
func WithReceiverSpanNameFormatter(format func(defaultSpanName string, message *azservicebus.ReceivedMessage) string) func(t *TracingHandlerOpts)
WithReceiverSpanNameFormatter allows formatting name of the span started by the tracing handler in NewTracingHandler.
func WithSpanStartOptions ¶ added in v2.6.0
func WithSpanStartOptions(options []trace.SpanStartOption) func(t *TracingHandlerOpts)
WithSpanStartOptions allows setting custom span start options for the tracing handler in NewTracingHandler.
func WithTracePropagation ¶
func WithTracePropagation(ctx context.Context) func(msg *azservicebus.Message) error
WithTracePropagation is a sender option to inject the trace context into the message
func WithTraceProvider ¶ added in v2.6.0
func WithTraceProvider(tp trace.TracerProvider) func(t *TracingHandlerOpts)
WithTraceProvider allows setting a custom trace provider for the tracing handler in NewTracingHandler.
Types ¶
type Abandon ¶
type Abandon struct {
// contains filtered or unexported fields
}
Abandon settlement will cause a message to be available again from the queue or subscription. This will increment its delivery count, and potentially cause it to be dead-lettered depending on your queue or subscription's configuration.
func (*Abandon) Settle ¶
func (a *Abandon) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
type AzServiceBusSender ¶
type AzServiceBusSender interface { SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error SendMessageBatch(ctx context.Context, batch *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error NewMessageBatch(ctx context.Context, options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error) }
AzServiceBusSender is satisfied by *azservicebus.Sender
type Complete ¶
type Complete struct {
// contains filtered or unexported fields
}
Complete settlement completes a message, deleting it from the queue or subscription.
func (*Complete) Settle ¶
func (a *Complete) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
type ConstantDelayStrategy ¶
ConstantDelayStrategy delays the message retry by the given duration
type DeadLetter ¶
type DeadLetter struct {
// contains filtered or unexported fields
}
DeadLetter settlement moves the message to the dead letter queue for a queue or subscription. To process deadlettered messages, create a receiver with `Client.NewReceiverForQueue()` or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option.
func (*DeadLetter) Settle ¶
func (a *DeadLetter) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
type DefaultJSONMarshaller ¶
type DefaultJSONMarshaller struct { }
DefaultJSONMarshaller is the default marshaller for JSON messages
func (*DefaultJSONMarshaller) ContentType ¶
func (j *DefaultJSONMarshaller) ContentType() string
ContentType returns the content type for the JSON marshaller
func (*DefaultJSONMarshaller) Marshal ¶
func (j *DefaultJSONMarshaller) Marshal(mb MessageBody) (*azservicebus.Message, error)
Marshal marshals the user-input struct into a JSON string and returns a new message with the JSON string as the body
func (*DefaultJSONMarshaller) Unmarshal ¶
func (j *DefaultJSONMarshaller) Unmarshal(msg *azservicebus.Message, mb MessageBody) error
Unmarshal unmarshals the message body from a JSON string into the user-input struct
type DefaultProtoMarshaller ¶
type DefaultProtoMarshaller struct { }
DefaultProtoMarshaller is the default marshaller for protobuf messages
func (*DefaultProtoMarshaller) ContentType ¶
func (p *DefaultProtoMarshaller) ContentType() string
ContentType returns the content type for the protobuf marshaller
func (*DefaultProtoMarshaller) Marshal ¶
func (p *DefaultProtoMarshaller) Marshal(mb MessageBody) (*azservicebus.Message, error)
Marshal marshals the user-input struct into a protobuf message and returns a new ServiceBus message with the protobuf message as the body
func (*DefaultProtoMarshaller) Unmarshal ¶
func (p *DefaultProtoMarshaller) Unmarshal(msg *azservicebus.Message, mb MessageBody) error
Unmarshal unmarshals the protobuf message from the ServiceBus message into the user-input struct
type Defer ¶
type Defer struct {
// contains filtered or unexported fields
}
Defer settlement will cause a message to be deferred. Deferred messages are moved to a deferred queue. They can only be received using `Receiver.ReceiveDeferredMessages`.
func (*Defer) Settle ¶
func (a *Defer) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
type Handler ¶
type Handler interface {
Handle(context.Context, MessageSettler, *azservicebus.ReceivedMessage)
}
type HandlerFunc ¶
type HandlerFunc func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
HandlerFunc is a func to handle the message received from a subscription
func NewLockRenewalHandler ¶ added in v2.4.0
func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFunc
NewLockRenewalHandler returns a middleware handler that will renew the lock on the message at the specified interval.
func NewPanicHandler ¶
func NewPanicHandler(panicOptions *PanicHandlerOptions, handler Handler) HandlerFunc
NewPanicHandler recovers panics from downstream handlers
func NewRenewLockHandler
deprecated
func NewRenewLockHandler(lockRenewer LockRenewer, interval *time.Duration, handler Handler) HandlerFunc
Deprecated: use NewLockRenewalHandler instead. NewRenewLockHandler starts a lock renewal goroutine for each message received.
func NewSettlementHandler ¶
func NewSettlementHandler(opts *SettlementHandlerOptions, handler Settler) HandlerFunc
NewSettlementHandler creates a middleware to use the Settlement api in the message handler implementation.
Example ¶
package main import ( "context" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/Azure/go-shuttle/v2" ) func main() { tokenCredential, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { panic(err) } client, err := azservicebus.NewClient("myservicebus.servicebus.windows.net", tokenCredential, nil) if err != nil { panic(err) } receiver, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil) if err != nil { panic(err) } lockRenewalInterval := 10 * time.Second p := shuttle.NewProcessor(receiver, shuttle.NewPanicHandler(nil, shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval}, shuttle.NewSettlementHandler(nil, mySettlingHandler()))), &shuttle.ProcessorOptions{MaxConcurrency: 10}) ctx, cancel := context.WithCancel(context.Background()) err = p.Start(ctx) if err != nil { panic(err) } cancel() } func mySettlingHandler() shuttle.Settler { return func(ctx context.Context, message *azservicebus.ReceivedMessage) shuttle.Settlement { return &shuttle.Complete{} } }
Output:
func NewTracingHandler ¶
func NewTracingHandler(next Handler, options ...func(t *TracingHandlerOpts)) HandlerFunc
NewTracingHandler is a shuttle middleware that extracts the context from the message Application property if available, or from the existing context if not, and starts a span.
func (HandlerFunc) Handle ¶
func (f HandlerFunc) Handle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
type LockRenewalOptions ¶ added in v2.4.0
type LockRenewalOptions struct { // Interval defines the frequency at which we renew the lock on the message. Defaults to 10 seconds. Interval *time.Duration // CancelMessageContextOnStop will cancel the downstream message context when the renewal handler is stopped. // Defaults to true. CancelMessageContextOnStop *bool // MetricRecorder allows to pass a custom metric recorder for the LockRenewer. // Defaults to processor.Metric instance. MetricRecorder processor.Recorder }
LockRenewalOptions configures the lock renewal.
type LockRenewer ¶
type LockRenewer interface {
RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
}
LockRenewer abstracts the servicebus receiver client to only expose lock renewal
type ManagedSettler ¶
type ManagedSettler struct {
// contains filtered or unexported fields
}
ManagedSettler is a middleware that reduces the message handler signature to ManagedSettlingFunc
func NewManagedSettlingHandler ¶
func NewManagedSettlingHandler(opts *ManagedSettlingOptions, handler ManagedSettlingHandler) *ManagedSettler
NewManagedSettlingHandler allows configuring the retry decision logic and delay strategy. It also adapts the handler to let the user return an error from the handler instead of a settlement. The settlement is inferred from the handler's return value: error -> abandon, nil -> complete. The RetryDecision can be overridden and can inspect the returned error to decide whether to retry the message. This allows defining error types that should not be retried (and are moved directly to the deadletter queue).
Example ¶
package main import ( "context" "fmt" "time" "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/Azure/go-shuttle/v2" ) func main() { tokenCredential, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { panic(err) } client, err := azservicebus.NewClient("myservicebus.servicebus.windows.net", tokenCredential, nil) if err != nil { panic(err) } receiver, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil) if err != nil { panic(err) } lockRenewalInterval := 10 * time.Second p := shuttle.NewProcessor(receiver, shuttle.NewPanicHandler(nil, shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval}, shuttle.NewManagedSettlingHandler(&shuttle.ManagedSettlingOptions{ RetryDecision: &shuttle.MaxAttemptsRetryDecision{MaxAttempts: 2}, RetryDelayStrategy: &shuttle.ConstantDelayStrategy{Delay: 2 * time.Second}, OnAbandoned: func(ctx context.Context, message *azservicebus.ReceivedMessage, err error) { fmt.Printf("message abandoned due to error: %s\n", err) }, OnDeadLettered: func(ctx context.Context, message *azservicebus.ReceivedMessage, err error) { fmt.Printf("message deadlettered due to error: %s\n", err) }, }, myManagedSettlementHandler()))), &shuttle.ProcessorOptions{MaxConcurrency: 10}) ctx, cancel := context.WithCancel(context.Background()) err = p.Start(ctx) if err != nil { panic(err) } cancel() } func myManagedSettlementHandler() shuttle.ManagedSettlingHandler { count := 0 return shuttle.ManagedSettlingFunc(func(ctx context.Context, message *azservicebus.ReceivedMessage) error { count++ if count == 0 { // this will abandon the message return fmt.Errorf("this will abandon the message, and eventually move it to DLQ") } return nil // this will complete the message }) }
Output:
func (*ManagedSettler) Handle ¶
func (m *ManagedSettler) Handle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)
type ManagedSettlingFunc ¶
type ManagedSettlingFunc func(ctx context.Context, message *azservicebus.ReceivedMessage) error
ManagedSettlingFunc allows to convert a function with the signature func(context.Context, *azservicebus.ReceivedMessage) error to the ManagedSettlingHandler interface.
func (ManagedSettlingFunc) Handle ¶ added in v2.3.0
func (f ManagedSettlingFunc) Handle(ctx context.Context, message *azservicebus.ReceivedMessage) error
type ManagedSettlingHandler ¶ added in v2.3.0
type ManagedSettlingHandler interface {
Handle(context.Context, *azservicebus.ReceivedMessage) error
}
ManagedSettlingHandler is the message Handler interface for the ManagedSettler.
type ManagedSettlingOptions ¶
type ManagedSettlingOptions struct { // Allows to override the built-in error handling logic. // OnError is called before any message settling action is taken. // the ManagedSettlingOptions struct is passed as an argument so that the configuration // like RetryDecision, RetryDelayStrategy and the post-settlement hooks can be reused and composed differently OnError func(ctx context.Context, opts *ManagedSettlingOptions, settler MessageSettler, message *azservicebus.ReceivedMessage, handleErr error) // RetryDecision is invoked to decide whether an error should be retried. // the default is to retry 5 times before moving the message to the deadletter. RetryDecision RetryDecision // RetryDelayStrategy is invoked when a message handling does not complete successfully // and the RetryDecision decides to retry the message. // The handler will sleep for the time calculated by the delayStrategy before Abandoning the message. RetryDelayStrategy RetryDelayStrategy // OnAbandoned is invoked when the handler returns an error. It is invoked after the message is abandoned. OnAbandoned func(context.Context, *azservicebus.ReceivedMessage, error) // OnDeadLettered is invoked after the ManagedSettling dead-letters a message. // this occurs when the RetryDecision.CanRetry implementation returns false following an error returned by the handler // It is invoked after the message is dead-lettered. OnDeadLettered func(context.Context, *azservicebus.ReceivedMessage, error) // OnCompleted is a func that is invoked when the handler does not return any error. it is invoked after the message is completed. OnCompleted func(context.Context, *azservicebus.ReceivedMessage) }
ManagedSettlingOptions configures the ManagedSettling middleware
type Marshaller ¶
type Marshaller interface { Marshal(mb MessageBody) (*azservicebus.Message, error) Unmarshal(msg *azservicebus.Message, mb MessageBody) error ContentType() string }
type MaxAttemptsRetryDecision ¶
type MaxAttemptsRetryDecision struct {
MaxAttempts uint32
}
MaxAttemptsRetryDecision defines how many deliveries the handler allows before explicitly moving the message to the deadletter queue. This requires the MaxDeliveryCount from the queue or subscription to be higher than the MaxAttempts property. If the queue or subscription's MaxDeliveryCount is lower than MaxAttempts, service bus will move the message to the DLQ before the handler reaches the MaxAttempts.
func (*MaxAttemptsRetryDecision) CanRetry ¶
func (d *MaxAttemptsRetryDecision) CanRetry(_ error, message *azservicebus.ReceivedMessage) bool
type MessageBody ¶
type MessageBody any
MessageBody is a type to represent that an input message body can be of any type
type MessageSettler ¶
type MessageSettler interface { AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error DeadLetterMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.DeadLetterOptions) error DeferMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.DeferMessageOptions) error RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error }
MessageSettler is passed to the handlers. It exposes the message settling functionality from the receiver needed within the handler.
type NoOp ¶
type NoOp struct { }
NoOp settlement exits the handler without taking an action, letting the message's peek lock expire before incrementing the delivery count, or moving it to the deadletter, depending on the queue or subscription's configuration
func (*NoOp) Settle ¶
func (a *NoOp) Settle(ctx context.Context, _ MessageSettler, message *azservicebus.ReceivedMessage)
type PanicHandlerOptions ¶
type PanicHandlerOptions struct { OnPanicRecovered func( ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage, recovered any) }
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor encapsulates the message pump and concurrency handling of servicebus. It exposes a handler API that provides a middleware-based message processing pipeline.
Example ¶
tokenCredential, err := azidentity.NewDefaultAzureCredential(nil) if err != nil { panic(err) } client, err := azservicebus.NewClient("myservicebus.servicebus.windows.net", tokenCredential, nil) if err != nil { panic(err) } receiver, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil) if err != nil { panic(err) } lockRenewalInterval := 10 * time.Second p := shuttle.NewProcessor(receiver, shuttle.NewPanicHandler(nil, shuttle.NewRenewLockHandler(receiver, &lockRenewalInterval, MyHandler(0*time.Second))), &shuttle.ProcessorOptions{MaxConcurrency: 10}) ctx, cancel := context.WithCancel(context.Background()) err = p.Start(ctx) if err != nil { panic(err) } cancel()
Output:
func NewProcessor ¶
func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor
type ProcessorOptions ¶
ProcessorOptions configures the processor. MaxConcurrency defaults to 1. Not setting MaxConcurrency, or setting it to 0 or a negative value, will fall back to the default. ReceiveInterval defaults to 2 seconds if not set.
type Receiver ¶
type Receiver interface { ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error) MessageSettler }
type RetryDecision ¶
type RetryDecision interface { // CanRetry inspects the error returned from the message handler, and the message itself to decide if it should be retried or not. CanRetry(err error, message *azservicebus.ReceivedMessage) bool }
RetryDecision allows providing a custom retry decision.
type RetryDelayStrategy ¶
RetryDelayStrategy can be implemented to provide custom delay retry strategies.
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender contains an AzServiceBusSender used to send the message to the ServiceBus queue and a Marshaller used to marshal any struct into a ServiceBus message
func NewSender ¶
func NewSender(sender AzServiceBusSender, options *SenderOptions) *Sender
NewSender takes an AzServiceBusSender and SenderOptions (which carry the Marshaller) to create a new Sender that can send messages to the ServiceBus queue
func (*Sender) AzSender ¶ added in v2.2.0
func (d *Sender) AzSender() AzServiceBusSender
AzSender returns the underlying azservicebus.Sender instance.
func (*Sender) SendMessage ¶
func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, options ...func(msg *azservicebus.Message) error) error
SendMessage sends a payload on the bus. The MessageBody is marshalled and set as the message body.
func (*Sender) SendMessageBatch ¶ added in v2.2.0
SendMessageBatch sends the array of azservicebus messages as a batch.
func (*Sender) ToServiceBusMessage ¶ added in v2.2.0
func (d *Sender) ToServiceBusMessage( ctx context.Context, mb MessageBody, options ...func(msg *azservicebus.Message) error) (*azservicebus.Message, error)
ToServiceBusMessage transforms a MessageBody into an azservicebus.Message. It marshals the body using the sender's configured marshaller, and sets the bytes as the message.Body. The sender's configured options are applied to the azservicebus.Message before returning it.
type SenderOptions ¶
type SenderOptions struct { // Marshaller will be used to marshall the messageBody to the azservicebus.Message Body property // defaults to DefaultJSONMarshaller Marshaller Marshaller // EnableTracingPropagation automatically applies WithTracePropagation option on all message sent through this sender EnableTracingPropagation bool // SendTimeout is the timeout value used on the context that sends messages // Defaults to 30 seconds if not set or 0 // Disabled when set to a negative value SendTimeout time.Duration }
type Settlement ¶
type Settlement interface {
Settle(context.Context, MessageSettler, *azservicebus.ReceivedMessage)
}
Settlement represents an action to take on a message. Abandon, Complete, DeadLetter, Defer, NoOp
type SettlementHandlerOptions ¶
type SettlementHandlerOptions struct { // OnNilSettlement is a func that allows to handle cases where the downstream handler returns nil. // the default behavior is to panic. OnNilSettlement func() Settlement }
SettlementHandlerOptions configures the settlement handler created by NewSettlementHandler
type Settler ¶
type Settler func(ctx context.Context, message *azservicebus.ReceivedMessage) Settlement
func (Settler) Handle ¶
func (s Settler) Handle(ctx context.Context, message *azservicebus.ReceivedMessage) Settlement
type TracingHandlerOpts ¶ added in v2.6.0
type TracingHandlerOpts struct {
// contains filtered or unexported fields
}