shuttle

Version: v2.6.3
Published: Apr 8, 2024 License: MIT Imports: 18 Imported by: 0

Documentation

Constants

This section is empty.

Variables

This section is empty.

Functions

func SetCorrelationId

func SetCorrelationId(correlationId *string) func(msg *azservicebus.Message) error

SetCorrelationId sets the ServiceBus message's correlation ID to a user-specified value

func SetLoggerFunc added in v2.4.0

func SetLoggerFunc(fn func(ctx context.Context) Logger)

SetLoggerFunc sets the function to be used to acquire a logger when go-shuttle logs.

func SetMessageDelay

func SetMessageDelay(delay time.Duration) func(msg *azservicebus.Message) error

SetMessageDelay schedules a message to be enqueued after the given delay

func SetMessageId

func SetMessageId(messageId *string) func(msg *azservicebus.Message) error

SetMessageId sets the ServiceBus message's ID to a user-specified value

func SetMessageTTL

func SetMessageTTL(ttl time.Duration) func(msg *azservicebus.Message) error

SetMessageTTL sets the ServiceBus message's TimeToLive to a user-specified value

func SetScheduleAt

func SetScheduleAt(t time.Time) func(msg *azservicebus.Message) error

SetScheduleAt schedules a message to be enqueued in the future

func WithReceiverSpanNameFormatter added in v2.6.0

func WithReceiverSpanNameFormatter(format func(defaultSpanName string, message *azservicebus.ReceivedMessage) string) func(t *TracingHandlerOpts)

WithReceiverSpanNameFormatter allows formatting the name of the span started by the tracing handler in NewTracingHandler.

func WithSpanStartOptions added in v2.6.0

func WithSpanStartOptions(options []trace.SpanStartOption) func(t *TracingHandlerOpts)

WithSpanStartOptions allows setting custom span start options for the tracing handler in NewTracingHandler.

func WithTracePropagation

func WithTracePropagation(ctx context.Context) func(msg *azservicebus.Message) error

WithTracePropagation is a sender option to inject the trace context into the message

func WithTraceProvider added in v2.6.0

func WithTraceProvider(tp trace.TracerProvider) func(t *TracingHandlerOpts)

WithTraceProvider allows setting a custom trace provider for the tracing handler in NewTracingHandler.

Types

type Abandon

type Abandon struct {
	// contains filtered or unexported fields
}

Abandon settlement will cause a message to be available again from the queue or subscription. This will increment its delivery count, and potentially cause it to be dead-lettered depending on your queue or subscription's configuration.

func (*Abandon) Settle

func (a *Abandon) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

type AzServiceBusSender

type AzServiceBusSender interface {
	SendMessage(ctx context.Context, message *azservicebus.Message, options *azservicebus.SendMessageOptions) error
	SendMessageBatch(ctx context.Context, batch *azservicebus.MessageBatch, options *azservicebus.SendMessageBatchOptions) error
	NewMessageBatch(ctx context.Context, options *azservicebus.MessageBatchOptions) (*azservicebus.MessageBatch, error)
}

AzServiceBusSender is satisfied by *azservicebus.Sender

type Complete

type Complete struct {
	// contains filtered or unexported fields
}

Complete settlement completes a message, deleting it from the queue or subscription.

func (*Complete) Settle

func (a *Complete) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

type ConstantDelayStrategy

type ConstantDelayStrategy struct {
	Delay time.Duration
}

ConstantDelayStrategy delays the message retry by the given duration

func (*ConstantDelayStrategy) GetDelay

func (s *ConstantDelayStrategy) GetDelay(_ uint32) time.Duration

type DeadLetter

type DeadLetter struct {
	// contains filtered or unexported fields
}

DeadLetter settlement moves the message to the dead letter queue for a queue or subscription. To process deadlettered messages, create a receiver with `Client.NewReceiverForQueue()` or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option.

func (*DeadLetter) Settle

func (a *DeadLetter) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

type DefaultJSONMarshaller

type DefaultJSONMarshaller struct {
}

DefaultJSONMarshaller is the default marshaller for JSON messages

func (*DefaultJSONMarshaller) ContentType

func (j *DefaultJSONMarshaller) ContentType() string

ContentType returns the content type for the JSON marshaller

func (*DefaultJSONMarshaller) Marshal

Marshal marshals the user-input struct into a JSON string and returns a new message with the JSON string as the body

func (*DefaultJSONMarshaller) Unmarshal

Unmarshal unmarshals the message body from a JSON string into the user-input struct

type DefaultProtoMarshaller

type DefaultProtoMarshaller struct {
}

DefaultProtoMarshaller is the default marshaller for protobuf messages

func (*DefaultProtoMarshaller) ContentType

func (p *DefaultProtoMarshaller) ContentType() string

ContentType returns the content type for the protobuf marshaller

func (*DefaultProtoMarshaller) Marshal

Marshal marshals the user-input struct into a protobuf message and returns a new ServiceBus message with the protobuf message as the body

func (*DefaultProtoMarshaller) Unmarshal

Unmarshal unmarshals the protobuf message from the ServiceBus message into the user-input struct

type Defer

type Defer struct {
	// contains filtered or unexported fields
}

Defer settlement will cause a message to be deferred. Deferred messages are moved to a deferred queue. They can only be received using `Receiver.ReceiveDeferredMessages`.

func (*Defer) Settle

func (a *Defer) Settle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

type Handler

type Handler interface {
	Handle(context.Context, MessageSettler, *azservicebus.ReceivedMessage)
}

type HandlerFunc

type HandlerFunc func(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

HandlerFunc is a func to handle the message received from a subscription

func NewLockRenewalHandler added in v2.4.0

func NewLockRenewalHandler(lockRenewer LockRenewer, options *LockRenewalOptions, handler Handler) HandlerFunc

NewLockRenewalHandler returns a middleware handler that will renew the lock on the message at the specified interval.

func NewPanicHandler

func NewPanicHandler(panicOptions *PanicHandlerOptions, handler Handler) HandlerFunc

NewPanicHandler recovers panics from downstream handlers

func NewRenewLockHandler deprecated

func NewRenewLockHandler(lockRenewer LockRenewer, interval *time.Duration, handler Handler) HandlerFunc

Deprecated: use NewLockRenewalHandler instead. NewRenewLockHandler starts a lock renewal goroutine for each message received.

func NewSettlementHandler

func NewSettlementHandler(opts *SettlementHandlerOptions, handler Settler) HandlerFunc

NewSettlementHandler creates a middleware that lets the message handler implementation use the Settlement API.

Example
package main

import (
	"context"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"

	"github.com/Azure/go-shuttle/v2"
)

func main() {
	tokenCredential, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}
	client, err := azservicebus.NewClient("myservicebus.servicebus.windows.net", tokenCredential, nil)
	if err != nil {
		panic(err)
	}
	receiver, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil)
	if err != nil {
		panic(err)
	}
	lockRenewalInterval := 10 * time.Second
	p := shuttle.NewProcessor(receiver,
		shuttle.NewPanicHandler(nil,
			shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval},
				shuttle.NewSettlementHandler(nil, mySettlingHandler()))), &shuttle.ProcessorOptions{MaxConcurrency: 10})

	ctx, cancel := context.WithCancel(context.Background())
	err = p.Start(ctx)
	if err != nil {
		panic(err)
	}
	cancel()
}

func mySettlingHandler() shuttle.Settler {
	return func(ctx context.Context, message *azservicebus.ReceivedMessage) shuttle.Settlement {
		return &shuttle.Complete{}
	}
}

func NewTracingHandler

func NewTracingHandler(next Handler, options ...func(t *TracingHandlerOpts)) HandlerFunc

NewTracingHandler is a shuttle middleware that extracts the trace context from the message's application properties if available, or uses the existing context if not, and starts a span.

func (HandlerFunc) Handle

func (f HandlerFunc) Handle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

type LockRenewalOptions added in v2.4.0

type LockRenewalOptions struct {
	// Interval defines the frequency at which we renew the lock on the message. Defaults to 10 seconds.
	Interval *time.Duration
	// CancelMessageContextOnStop will cancel the downstream message context when the renewal handler is stopped.
	// Defaults to true.
	CancelMessageContextOnStop *bool
	// MetricRecorder allows passing a custom metric recorder for the LockRenewer.
	// Defaults to the processor.Metric instance.
	MetricRecorder processor.Recorder
}

LockRenewalOptions configures the lock renewal.

type LockRenewer

type LockRenewer interface {
	RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
}

LockRenewer abstracts the servicebus receiver client to only expose lock renewal

type Logger

type Logger interface {
	Info(s string)
	Warn(s string)
	Error(s string)
}

type ManagedSettler

type ManagedSettler struct {
	// contains filtered or unexported fields
}

ManagedSettler is a middleware that reduces the message handler signature to ManagedSettlingFunc

func NewManagedSettlingHandler

func NewManagedSettlingHandler(opts *ManagedSettlingOptions, handler ManagedSettlingHandler) *ManagedSettler

NewManagedSettlingHandler configures the retry decision logic and the delay strategy. It also adapts the handler so that the user returns an error from the handler instead of a settlement. The settlement is inferred from the handler's return value:

error -> abandon
nil -> complete

The RetryDecision can be overridden and can inspect the returned error to decide whether to retry the message. This makes it possible to define error types that should not be retried and are moved directly to the dead-letter queue.

Example
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"

	"github.com/Azure/go-shuttle/v2"
)

func main() {
	tokenCredential, err := azidentity.NewDefaultAzureCredential(nil)
	if err != nil {
		panic(err)
	}
	client, err := azservicebus.NewClient("myservicebus.servicebus.windows.net", tokenCredential, nil)
	if err != nil {
		panic(err)
	}
	receiver, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil)
	if err != nil {
		panic(err)
	}
	lockRenewalInterval := 10 * time.Second
	p := shuttle.NewProcessor(receiver,
		shuttle.NewPanicHandler(nil,
			shuttle.NewLockRenewalHandler(receiver, &shuttle.LockRenewalOptions{Interval: &lockRenewalInterval},
				shuttle.NewManagedSettlingHandler(&shuttle.ManagedSettlingOptions{
					RetryDecision:      &shuttle.MaxAttemptsRetryDecision{MaxAttempts: 2},
					RetryDelayStrategy: &shuttle.ConstantDelayStrategy{Delay: 2 * time.Second},
					OnAbandoned: func(ctx context.Context, message *azservicebus.ReceivedMessage, err error) {
						fmt.Printf("message abandoned due to error: %s\n", err)
					},
					OnDeadLettered: func(ctx context.Context, message *azservicebus.ReceivedMessage, err error) {
						fmt.Printf("message deadlettered due to error: %s\n", err)
					},
				}, myManagedSettlementHandler()))), &shuttle.ProcessorOptions{MaxConcurrency: 10})

	ctx, cancel := context.WithCancel(context.Background())
	err = p.Start(ctx)
	if err != nil {
		panic(err)
	}
	cancel()
}

func myManagedSettlementHandler() shuttle.ManagedSettlingHandler {
	count := 0
	return shuttle.ManagedSettlingFunc(func(ctx context.Context, message *azservicebus.ReceivedMessage) error {
		count++
		if count == 1 {
			// Fail the first delivery: count is incremented before this check,
			// so comparing against 0 would never match.
			return fmt.Errorf("this will abandon the message, and eventually move it to DLQ")
		}
		return nil // this will complete the message
	})
}

func (*ManagedSettler) Handle

func (m *ManagedSettler) Handle(ctx context.Context, settler MessageSettler, message *azservicebus.ReceivedMessage)

type ManagedSettlingFunc

type ManagedSettlingFunc func(ctx context.Context, message *azservicebus.ReceivedMessage) error

ManagedSettlingFunc adapts a function with the signature func(context.Context, *azservicebus.ReceivedMessage) error to the ManagedSettlingHandler interface.

func (ManagedSettlingFunc) Handle added in v2.3.0

type ManagedSettlingHandler added in v2.3.0

type ManagedSettlingHandler interface {
	Handle(context.Context, *azservicebus.ReceivedMessage) error
}

ManagedSettlingHandler is the message Handler interface for the ManagedSettler.

type ManagedSettlingOptions

type ManagedSettlingOptions struct {
	// OnError overrides the built-in error handling logic.
	// It is called before any message settling action is taken.
	// The ManagedSettlingOptions struct is passed as an argument so that the configuration,
	// such as RetryDecision, RetryDelayStrategy and the post-settlement hooks, can be reused and composed differently.
	OnError func(ctx context.Context, opts *ManagedSettlingOptions, settler MessageSettler, message *azservicebus.ReceivedMessage, handleErr error)
	// RetryDecision is invoked to decide whether an error should be retried.
	// The default is to retry 5 times before moving the message to the dead-letter queue.
	RetryDecision RetryDecision
	// RetryDelayStrategy is invoked when message handling does not complete successfully
	// and the RetryDecision decides to retry the message.
	// The handler sleeps for the duration calculated by the strategy before abandoning the message.
	RetryDelayStrategy RetryDelayStrategy
	// OnAbandoned is invoked when the handler returns an error. It is invoked after the message is abandoned.
	OnAbandoned func(context.Context, *azservicebus.ReceivedMessage, error)
	// OnDeadLettered is invoked after the ManagedSettler dead-letters a message.
	// This occurs when the RetryDecision.CanRetry implementation returns false following an error returned by the handler.
	OnDeadLettered func(context.Context, *azservicebus.ReceivedMessage, error)
	// OnCompleted is invoked when the handler does not return an error. It is invoked after the message is completed.
	OnCompleted func(context.Context, *azservicebus.ReceivedMessage)
}

ManagedSettlingOptions configures the ManagedSettler middleware

type Marshaller

type Marshaller interface {
	Marshal(mb MessageBody) (*azservicebus.Message, error)
	Unmarshal(msg *azservicebus.Message, mb MessageBody) error
	ContentType() string
}

type MaxAttemptsRetryDecision

type MaxAttemptsRetryDecision struct {
	MaxAttempts uint32
}

MaxAttemptsRetryDecision defines how many deliveries the handler allows before explicitly moving the message to the dead-letter queue. This requires the MaxDeliveryCount of the queue or subscription to be higher than the MaxAttempts property. If the queue or subscription's MaxDeliveryCount is lower than MaxAttempts, Service Bus will move the message to the DLQ before the handler reaches MaxAttempts.

func (*MaxAttemptsRetryDecision) CanRetry

type MessageBody

type MessageBody any

MessageBody is a type to represent that an input message body can be of any type

type MessageSettler

type MessageSettler interface {
	AbandonMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.AbandonMessageOptions) error
	CompleteMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.CompleteMessageOptions) error
	DeadLetterMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.DeadLetterOptions) error
	DeferMessage(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.DeferMessageOptions) error
	RenewMessageLock(ctx context.Context, message *azservicebus.ReceivedMessage, options *azservicebus.RenewMessageLockOptions) error
}

MessageSettler is passed to the handlers. It exposes the message settling functionality of the receiver that is needed within a handler.

type NoOp

type NoOp struct {
}

NoOp settlement exits the handler without taking any action. The message's peek lock is left to expire, after which Service Bus increments the delivery count or moves the message to the dead-letter queue, depending on the queue or subscription's configuration.

func (*NoOp) Settle

func (a *NoOp) Settle(ctx context.Context, _ MessageSettler, message *azservicebus.ReceivedMessage)

type PanicHandlerOptions

type PanicHandlerOptions struct {
	OnPanicRecovered func(
		ctx context.Context,
		settler MessageSettler,
		message *azservicebus.ReceivedMessage,
		recovered any)
}

type Processor

type Processor struct {
	// contains filtered or unexported fields
}

Processor encapsulates the message pump and concurrency handling of Service Bus. It exposes a handler API that provides a middleware-based message processing pipeline.

Example
tokenCredential, err := azidentity.NewDefaultAzureCredential(nil)
if err != nil {
	panic(err)
}
client, err := azservicebus.NewClient("myservicebus.servicebus.windows.net", tokenCredential, nil)
if err != nil {
	panic(err)
}
receiver, err := client.NewReceiverForSubscription("topic-a", "sub-a", nil)
if err != nil {
	panic(err)
}
lockRenewalInterval := 10 * time.Second
p := shuttle.NewProcessor(receiver,
	shuttle.NewPanicHandler(nil,
		shuttle.NewRenewLockHandler(receiver, &lockRenewalInterval,
			MyHandler(0*time.Second))), &shuttle.ProcessorOptions{MaxConcurrency: 10})

ctx, cancel := context.WithCancel(context.Background())
err = p.Start(ctx)
if err != nil {
	panic(err)
}
cancel()

func NewProcessor

func NewProcessor(receiver Receiver, handler HandlerFunc, options *ProcessorOptions) *Processor

func (*Processor) Start

func (p *Processor) Start(ctx context.Context) error

Start starts the processor and blocks until an error occurs or the context is canceled.

type ProcessorOptions

type ProcessorOptions struct {
	MaxConcurrency  int
	ReceiveInterval *time.Duration
}

ProcessorOptions configures the processor. MaxConcurrency defaults to 1. Leaving MaxConcurrency unset, or setting it to 0 or a negative value, falls back to the default. ReceiveInterval defaults to 2 seconds if not set.

type Receiver

type Receiver interface {
	ReceiveMessages(ctx context.Context, maxMessages int, options *azservicebus.ReceiveMessagesOptions) ([]*azservicebus.ReceivedMessage, error)
	MessageSettler
}

type RetryDecision

type RetryDecision interface {
	// CanRetry inspects the error returned from the message handler, and the message itself to decide if it should be retried or not.
	CanRetry(err error, message *azservicebus.ReceivedMessage) bool
}

RetryDecision allows providing a custom retry decision.

type RetryDelayStrategy

type RetryDelayStrategy interface {
	GetDelay(deliveryCount uint32) time.Duration
}

RetryDelayStrategy can be implemented to provide custom delay retry strategies.

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender contains an SBSender used to send the message to the ServiceBus queue and a Marshaller used to marshal any struct into a ServiceBus message

func NewSender

func NewSender(sender AzServiceBusSender, options *SenderOptions) *Sender

NewSender takes an AzServiceBusSender and SenderOptions (which carry the Marshaller) and creates a new Sender that can send messages to the ServiceBus queue

func (*Sender) AzSender added in v2.2.0

func (d *Sender) AzSender() AzServiceBusSender

AzSender returns the underlying azservicebus.Sender instance.

func (*Sender) SendMessage

func (d *Sender) SendMessage(ctx context.Context, mb MessageBody, options ...func(msg *azservicebus.Message) error) error

SendMessage sends a payload on the bus. The MessageBody is marshalled and set as the message body.

func (*Sender) SendMessageBatch added in v2.2.0

func (d *Sender) SendMessageBatch(ctx context.Context, messages []*azservicebus.Message) error

SendMessageBatch sends the slice of azservicebus messages as a batch.

func (*Sender) ToServiceBusMessage added in v2.2.0

func (d *Sender) ToServiceBusMessage(
	ctx context.Context,
	mb MessageBody,
	options ...func(msg *azservicebus.Message) error) (*azservicebus.Message, error)

ToServiceBusMessage transforms a MessageBody into an azservicebus.Message. It marshals the body using the sender's configured marshaller and sets the bytes as the message Body. The sender's configured options are applied to the azservicebus.Message before returning it.

type SenderOptions

type SenderOptions struct {
	// Marshaller is used to marshal the MessageBody into the azservicebus.Message Body property.
	// Defaults to DefaultJSONMarshaller.
	Marshaller Marshaller
	// EnableTracingPropagation automatically applies the WithTracePropagation option to all messages sent through this sender.
	EnableTracingPropagation bool
	// SendTimeout is the timeout value used on the context that sends messages
	// Defaults to 30 seconds if not set or 0
	// Disabled when set to a negative value
	SendTimeout time.Duration
}

type Settlement

type Settlement interface {
	Settle(context.Context, MessageSettler, *azservicebus.ReceivedMessage)
}

Settlement represents an action to take on a message. The available settlements are Abandon, Complete, DeadLetter, Defer and NoOp.

type SettlementHandlerOptions

type SettlementHandlerOptions struct {
	// OnNilSettlement handles cases where the downstream handler returns a nil Settlement.
	// The default behavior is to panic.
	OnNilSettlement func() Settlement
}

SettlementHandlerOptions configures the handler created by NewSettlementHandler

type Settler

type Settler func(ctx context.Context, message *azservicebus.ReceivedMessage) Settlement

func (Settler) Handle

type TracingHandlerOpts added in v2.6.0

type TracingHandlerOpts struct {
	// contains filtered or unexported fields
}

Directories

Path Synopsis
Package metrics allows to configure, record and read go-shuttle metrics
Package metrics allows to configure, record and read go-shuttle metrics