Documentation ¶
Overview ¶
Package azservicebus provides clients for sending and receiving messages with Azure Service Bus as well as modifying resources like Queues, Topics and Subscriptions.
Index ¶
- type Client
- func (client *Client) Close(ctx context.Context) error
- func (client *Client) NewProcessorForQueue(queue string, options *ProcessorOptions) (*Processor, error)
- func (client *Client) NewProcessorForSubscription(topic string, subscription string, options *ProcessorOptions) (*Processor, error)
- func (client *Client) NewReceiverForQueue(queue string, options *ReceiverOptions) (*Receiver, error)
- func (client *Client) NewReceiverForSubscription(topic string, subscription string, options *ReceiverOptions) (*Receiver, error)
- func (client *Client) NewSender(queueOrTopic string) (*Sender, error)
- type ClientOptions
- type DeadLetterOptions
- type Message
- type MessageBatch
- type MessageBatchOptions
- type PeekOptions
- type Processor
- func (p *Processor) AbandonMessage(ctx context.Context, message *ReceivedMessage) error
- func (p *Processor) Close(ctx context.Context) error
- func (p *Processor) CompleteMessage(ctx context.Context, message *ReceivedMessage) error
- func (p *Processor) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error
- func (p *Processor) DeferMessage(ctx context.Context, message *ReceivedMessage) error
- func (p *Processor) Start(ctx context.Context, handleMessage func(message *ReceivedMessage) error, ...) error
- type ProcessorOptions
- type ReceiveMode
- type ReceiveOptions
- type ReceivedMessage
- type Receiver
- func (r *Receiver) AbandonMessage(ctx context.Context, message *ReceivedMessage) error
- func (r *Receiver) Close(ctx context.Context) error
- func (r *Receiver) CompleteMessage(ctx context.Context, message *ReceivedMessage) error
- func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error
- func (r *Receiver) DeferMessage(ctx context.Context, message *ReceivedMessage) error
- func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekOptions) ([]*ReceivedMessage, error)
- func (r *Receiver) ReceiveDeferredMessage(ctx context.Context, sequenceNumber int64) (*ReceivedMessage, error)
- func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64) ([]*ReceivedMessage, error)
- func (r *Receiver) ReceiveMessage(ctx context.Context, options *ReceiveOptions) (*ReceivedMessage, error)
- func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveOptions) ([]*ReceivedMessage, error)
- type ReceiverOptions
- type SendableMessage
- type Sender
- type SubQueue
Examples ¶
- Client.NewProcessorForQueue
- Client.NewProcessorForSubscription
- Client.NewReceiverForQueue
- Client.NewReceiverForSubscription
- Client.NewSender
- NewClient
- NewClientWithConnectionString
- Processor.Close
- Processor.Start
- Receiver.ReceiveMessages
- Sender.SendMessage (Message)
- Sender.SendMessage (MessageBatch)
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client provides methods to create Sender, Receiver and Processor instances to send and receive messages from Service Bus.
func NewClient ¶
func NewClient(fullyQualifiedNamespace string, credential azcore.TokenCredential, options *ClientOptions) (*Client, error)
NewClient creates a new Client for a Service Bus namespace, using a TokenCredential. A Client allows you to create receivers (for queues or subscriptions) and senders (for queues and topics). fullyQualifiedNamespace is the Service Bus namespace name (ex: myservicebus.servicebus.windows.net). credential is one of the credentials in the `github.com/Azure/azure-sdk-for-go/sdk/azidentity` package.
Example ¶
// NOTE: If you'd like to authenticate using a Service Bus connection string // look at `NewClientWithConnectionString` instead. credential, err := azidentity.NewDefaultAzureCredential(nil) exitOnError("Failed to create a DefaultAzureCredential", err) client, err = azservicebus.NewClient("<ex: myservicebus.servicebus.windows.net>", credential, nil) exitOnError("Failed to create ServiceBusClient in example", err)
Output:
func NewClientWithConnectionString ¶
func NewClientWithConnectionString(connectionString string, options *ClientOptions) (*Client, error)
NewClientWithConnectionString creates a new Client for a Service Bus namespace, using a connection string. A Client allows you to create receivers (for queues or subscriptions) and senders (for queues and topics). connectionString is a Service Bus connection string for the namespace or for an entity.
Example ¶
// NOTE: If you'd like to authenticate via Azure Active Directory look at // the `NewClient` function instead. client, err = azservicebus.NewClientWithConnectionString(connectionString, nil) exitOnError("Failed to create ServiceBusClient in example", err)
Output:
func (*Client) Close ¶
Close closes the current connection to Service Bus as well as any Senders, Receivers or Processors created using this client.
func (*Client) NewProcessorForQueue ¶
func (client *Client) NewProcessorForQueue(queue string, options *ProcessorOptions) (*Processor, error)
NewProcessorForQueue creates a Processor for a queue.
Example ¶
processor, err = client.NewProcessorForQueue( queueName, &azservicebus.ProcessorOptions{ // NOTE: this is a parameter you'll want to tune. It controls the number of // active message `handleMessage` calls that the processor will allow at any time. MaxConcurrentCalls: 1, ReceiveMode: azservicebus.PeekLock, ManualComplete: false, }, ) exitOnError("Failed to create Processor", err)
Output:
func (*Client) NewProcessorForSubscription ¶
func (client *Client) NewProcessorForSubscription(topic string, subscription string, options *ProcessorOptions) (*Processor, error)
NewProcessorForSubscription creates a Processor for a subscription.
Example ¶
processor, err = client.NewProcessorForSubscription( topicName, subscriptionName, &azservicebus.ProcessorOptions{ // NOTE: this is a parameter you'll want to tune. It controls the number of // active message `handleMessage` calls that the processor will allow at any time. MaxConcurrentCalls: 1, ReceiveMode: azservicebus.PeekLock, ManualComplete: false, }, ) exitOnError("Failed to create Processor", err)
Output:
func (*Client) NewReceiverForQueue ¶
func (client *Client) NewReceiverForQueue(queue string, options *ReceiverOptions) (*Receiver, error)
NewReceiverForQueue creates a Receiver for a queue. A receiver allows you to receive messages.
Example ¶
receiver, err = client.NewReceiverForQueue( queueName, &azservicebus.ReceiverOptions{ ReceiveMode: azservicebus.PeekLock, }, ) exitOnError("Failed to create Receiver", err)
Output:
func (*Client) NewReceiverForSubscription ¶
func (client *Client) NewReceiverForSubscription(topic string, subscription string, options *ReceiverOptions) (*Receiver, error)
NewReceiverForSubscription creates a Receiver for a subscription. A receiver allows you to receive messages.
Example ¶
receiver, err = client.NewReceiverForSubscription( topicName, subscriptionName, &azservicebus.ReceiverOptions{ ReceiveMode: azservicebus.PeekLock, }, ) exitOnError("Failed to create receiver", err)
Output:
type ClientOptions ¶
type ClientOptions struct { // TLSConfig configures a client with a custom *tls.Config. TLSConfig *tls.Config }
ClientOptions contains options for the `NewClient` and `NewClientWithConnectionString` functions.
type DeadLetterOptions ¶
type DeadLetterOptions struct { // ErrorDescription that caused the dead lettering of the message. ErrorDescription *string // Reason for dead lettering the message. Reason *string // PropertiesToModify specifies properties to modify in the message when it is dead lettered. PropertiesToModify map[string]interface{} }
DeadLetterOptions describes the reason and error description for dead lettering a message using `Receiver.DeadLetterMessage()` or `Processor.DeadLetterMessage()`.
type Message ¶
type Message struct { ID string ContentType string CorrelationID string // Body corresponds to the first []byte array in the Data section of an AMQP message. Body []byte SessionID *string Subject string ReplyTo string ReplyToSessionID string To string TimeToLive *time.Duration PartitionKey *string TransactionPartitionKey *string ScheduledEnqueueTime *time.Time ApplicationProperties map[string]interface{} Format uint32 }
Message is a SendableMessage which can be sent using a Sender created with Client.NewSender().
func (*Message) GetKeyValues ¶
GetKeyValues implements tab.Carrier
type MessageBatch ¶
type MessageBatch struct {
// contains filtered or unexported fields
}
MessageBatch represents a batch of messages to send to Service Bus in a single message
func (*MessageBatch) Add ¶
func (mb *MessageBatch) Add(m *Message) (bool, error)
Add adds a message to the batch if the message will not exceed the max size of the batch. This function will return: (true, nil) if the message was added. (false, nil) if the message was too large to fit into the batch. (false, err) if an error occurs when adding the message.
func (*MessageBatch) Len ¶
func (mb *MessageBatch) Len() int
Len returns the # of messages in the batch.
func (*MessageBatch) Size ¶
func (mb *MessageBatch) Size() int
Size is the number of bytes in the message batch
type MessageBatchOptions ¶
type MessageBatchOptions struct { // MaxSizeInBytes overrides the max size (in bytes) for a batch. // By default NewMessageBatch will use the max message size provided by the service. MaxSizeInBytes int }
MessageBatchOptions contains options for the `Sender.NewMessageBatch` function.
type PeekOptions ¶
type PeekOptions struct { // FromSequenceNumber is the sequence number to start with when peeking messages. FromSequenceNumber *int64 }
PeekOptions contains options for the `Receiver.PeekMessages` function.
type Processor ¶
type Processor struct {
// contains filtered or unexported fields
}
Processor is a push-based receiver for Service Bus.
func (*Processor) AbandonMessage ¶
func (p *Processor) AbandonMessage(ctx context.Context, message *ReceivedMessage) error
AbandonMessage will cause a message to be returned to the queue or subscription. This will increment its delivery count, and potentially cause it to be dead lettered depending on your queue or subscription's configuration.
func (*Processor) Close ¶
Close will wait for any pending callbacks to complete. NOTE: Close() cannot be called synchronously in a message or error handler. You must run it asynchronously using `go processor.Close(ctx)` or similar.
Example ¶
err = processor.Close(context.TODO()) exitOnError("Processor failed to close", err)
Output:
func (*Processor) CompleteMessage ¶
func (p *Processor) CompleteMessage(ctx context.Context, message *ReceivedMessage) error
CompleteMessage completes a message, deleting it from the queue or subscription.
func (*Processor) DeadLetterMessage ¶
func (p *Processor) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error
DeadLetterMessage settles a message by moving it to the dead letter queue for a queue or subscription. To receive these messages create a processor with `Client.NewProcessorForQueue()` or `Client.NewProcessorForSubscription()` using the `ProcessorOptions.SubQueue` option.
func (*Processor) DeferMessage ¶
func (p *Processor) DeferMessage(ctx context.Context, message *ReceivedMessage) error
DeferMessage will cause a message to be deferred. Deferred messages can be received using `Receiver.ReceiveDeferredMessages`.
func (*Processor) Start ¶
func (p *Processor) Start(ctx context.Context, handleMessage func(message *ReceivedMessage) error, handleError func(err error)) error
Start will start receiving messages from the queue or subscription.
if err := processor.Start(context.TODO(), messageHandler, errorHandler); err != nil { log.Fatalf("Processor failed to start: %s", err.Error()) }
Any errors that occur (such as network disconnects, failures in handleMessage) will be sent to your handleError function. The processor will retry and restart as needed - no user intervention is required.
Example ¶
handleMessage := func(message *azservicebus.ReceivedMessage) error { // This is where your logic for handling messages goes yourLogicForProcessing(message) // Automatic settlement (enabled by default, and controlled by `ProcessorOptions.ManualComplete`) // will use this return value to determine how it should settle your message. // // Non-nil errors will cause your message to be Abandon()'d. // Nil errors will cause your message to be Complete'd. return nil } handleError := func(err error) { // handleError will be called on errors that are noteworthy // but the Processor internally will continue to attempt to // recover. // NOTE: errors returned from `handleMessage` above will also be // sent here, but do not affect the running of the Processor // itself. // We'll just print these out, as they're informational and // can indicate if there are longer lived problems that we might // want to resolve manually (for instance, longer term network // outages, or issues affecting your `handleMessage` handler) log.Printf("Error: %s", err.Error()) } err = processor.Start(context.TODO(), handleMessage, handleError) exitOnError("Failed to start Processor", err)
Output:
type ProcessorOptions ¶
type ProcessorOptions struct { // ReceiveMode controls when a message is deleted from Service Bus. // // `azservicebus.PeekLock` is the default. The message is locked, preventing multiple // receivers from processing the message at once. You control the lock state of the message // using one of the message settlement functions like processor.CompleteMessage(), which removes // it from Service Bus, or processor.AbandonMessage(), which makes it available again. // // `azservicebus.ReceiveAndDelete` causes Service Bus to remove the message as soon // as it's received. // // More information about receive modes: // https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations ReceiveMode ReceiveMode // SubQueue should be set to connect to the sub queue (ex: dead letter queue) // of the queue or subscription. SubQueue SubQueue // ManualComplete controls whether messages must be settled explicitly via the // settlement methods (ie, Complete, Abandon) or if the // processor will automatically settle messages. // // If true, no automatic settlement is done. // If false, the return value of your `handleMessage` function will control if the // message is abandoned (non-nil error return) or completed (nil error return). // // ManualComplete is false by default, so messages are settled automatically. ManualComplete bool // MaxConcurrentCalls controls the maximum number of message processing // goroutines that are active at any time. // Default is 1. MaxConcurrentCalls int }
ProcessorOptions contains options for the `Client.NewProcessorForQueue` or `Client.NewProcessorForSubscription` functions.
type ReceiveMode ¶
type ReceiveMode = internal.ReceiveMode
ReceiveMode represents the lock style to use for a receiver - either `PeekLock` or `ReceiveAndDelete`
const ( // PeekLock will lock messages as they are received and can be settled // using the Receiver or Processor's (Complete|Abandon|DeadLetter|Defer)Message // functions. PeekLock ReceiveMode = internal.PeekLock // ReceiveAndDelete will delete messages as they are received. ReceiveAndDelete ReceiveMode = internal.ReceiveAndDelete )
type ReceiveOptions ¶
type ReceiveOptions struct { // MaxWaitTime configures how long to wait for the first // message in a set of messages to arrive. // Default: 60 seconds MaxWaitTime time.Duration // contains filtered or unexported fields }
ReceiveOptions are options for the ReceiveMessages function.
type ReceivedMessage ¶
type ReceivedMessage struct { Message LockToken [16]byte DeliveryCount uint32 LockedUntil *time.Time // `mapstructure:"x-opt-locked-until"` SequenceNumber *int64 // :"x-opt-sequence-number"` EnqueuedSequenceNumber *int64 // :"x-opt-enqueue-sequence-number"` EnqueuedTime *time.Time // :"x-opt-enqueued-time"` DeadLetterSource *string // :"x-opt-deadletter-source"` // contains filtered or unexported fields }
ReceivedMessage is a message received using a Receiver or a Processor.
type Receiver ¶
type Receiver struct {
// contains filtered or unexported fields
}
Receiver receives messages using pull based functions (ReceiveMessages). For push-based receiving via callbacks look at the `Processor` type.
func (*Receiver) AbandonMessage ¶
func (r *Receiver) AbandonMessage(ctx context.Context, message *ReceivedMessage) error
AbandonMessage will cause a message to be returned to the queue or subscription. This will increment its delivery count, and potentially cause it to be dead lettered depending on your queue or subscription's configuration.
func (*Receiver) CompleteMessage ¶
func (r *Receiver) CompleteMessage(ctx context.Context, message *ReceivedMessage) error
CompleteMessage completes a message, deleting it from the queue or subscription.
func (*Receiver) DeadLetterMessage ¶
func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error
DeadLetterMessage settles a message by moving it to the dead letter queue for a queue or subscription. To receive these messages create a receiver with `Client.NewReceiverForQueue()` or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option.
func (*Receiver) DeferMessage ¶
func (r *Receiver) DeferMessage(ctx context.Context, message *ReceivedMessage) error
DeferMessage will cause a message to be deferred. Deferred messages can be received using `Receiver.ReceiveDeferredMessages`.
func (*Receiver) PeekMessages ¶
func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekOptions) ([]*ReceivedMessage, error)
PeekMessages will peek messages without locking or deleting messages. Messages that are peeked do not have lock tokens, so settlement methods like CompleteMessage, AbandonMessage, DeferMessage or DeadLetterMessage will not work with them.
func (*Receiver) ReceiveDeferredMessage ¶
func (r *Receiver) ReceiveDeferredMessage(ctx context.Context, sequenceNumber int64) (*ReceivedMessage, error)
ReceiveDeferredMessage receives a single message that was deferred using `Receiver.DeferMessage`.
func (*Receiver) ReceiveDeferredMessages ¶
func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64) ([]*ReceivedMessage, error)
ReceiveDeferredMessages receives messages that were deferred using `Receiver.DeferMessage`.
func (*Receiver) ReceiveMessage ¶
func (r *Receiver) ReceiveMessage(ctx context.Context, options *ReceiveOptions) (*ReceivedMessage, error)
ReceiveMessage receives a single message, waiting up to `ReceiveOptions.MaxWaitTime` (default: 60 seconds)
func (*Receiver) ReceiveMessages ¶
func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveOptions) ([]*ReceivedMessage, error)
ReceiveMessages receives a fixed number of messages, up to maxMessages. There are two timeouts involved in receiving messages:
- An explicit timeout set with `ReceiveOptions.MaxWaitTime` (default: 60 seconds)
- An implicit timeout (default: 1 second) that starts after the first message has been received.
Example ¶
// Receive a fixed set of messages. Note that the number of messages // to receive and the amount of time to wait are upper bounds. messages, err = receiver.ReceiveMessages(context.TODO(), // The number of messages to receive. Note this is merely an upper // bound. It is possible to get fewer message (or zero), depending // on the contents of the remote queue or subscription and network // conditions. 1, &azservicebus.ReceiveOptions{ // This configures the amount of time to wait for messages to arrive. // Note that this is merely an upper bound. It is possible to get messages // faster than the duration specified. MaxWaitTime: 60 * time.Second, }, ) exitOnError("Failed to receive messages", err) for _, message := range messages { err = receiver.CompleteMessage(context.TODO(), message) fmt.Printf("Received and completed message\n") exitOnError("Failed to complete message", err) }
Output:
type ReceiverOptions ¶
type ReceiverOptions struct { // ReceiveMode controls when a message is deleted from Service Bus. // // `azservicebus.PeekLock` is the default. The message is locked, preventing multiple // receivers from processing the message at once. You control the lock state of the message // using one of the message settlement functions like receiver.CompleteMessage(), which removes // it from Service Bus, or receiver.AbandonMessage(), which makes it available again. // // `azservicebus.ReceiveAndDelete` causes Service Bus to remove the message as soon // as it's received. // // More information about receive modes: // https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations ReceiveMode ReceiveMode // SubQueue should be set to connect to the sub queue (ex: dead letter queue) // of the queue or subscription. SubQueue SubQueue }
ReceiverOptions contains options for the `Client.NewReceiverForQueue` or `Client.NewReceiverForSubscription` functions.
type SendableMessage ¶
type SendableMessage interface {
// contains filtered or unexported methods
}
SendableMessage is implemented by types that can be sent using Sender.SendMessage. Message and MessageBatch implement this interface.
type Sender ¶
type Sender struct {
// contains filtered or unexported fields
}
Sender is used to send messages as well as schedule them to be delivered at a later date.
func (*Sender) NewMessageBatch ¶
func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptions) (*MessageBatch, error)
NewMessageBatch can be used to create a batch that contains multiple messages. Sending a batch of messages is more efficient than sending the messages one at a time.
func (*Sender) SendMessage ¶
func (s *Sender) SendMessage(ctx context.Context, message SendableMessage) error
SendMessage sends a message to a queue or topic. Message can be a MessageBatch (created using `Sender.NewMessageBatch`) or a Message.
Example (Message) ¶
message := &azservicebus.Message{ Body: []byte("hello, this is a message"), } err = sender.SendMessage(context.TODO(), message) exitOnError("Failed to send message", err)
Output:
Example (MessageBatch) ¶
client, err := azservicebus.NewClientWithConnectionString(connectionString, nil) exitOnError("Failed to create client", err) sender, err := client.NewSender(queueName) exitOnError("Failed to create sender", err) batch, err := sender.NewMessageBatch(context.TODO(), nil) exitOnError("Failed to create message batch", err) messagesToSend := []*azservicebus.Message{ {Body: []byte("hello world")}, {Body: []byte("hello world as well")}, } for i := 0; i < len(messagesToSend); i++ { added, err := batch.Add(messagesToSend[i]) if added { continue } if err == nil { // At this point you can do a few things: // 1. Ignore this message // 2. Send this batch (it's full) and create a new batch. // // The batch can still be used after this error. log.Fatal("Failed to add message to batch (batch is full)") } exitOnError("Error while trying to add message to batch", err) } // now let's send the batch err = sender.SendMessage(context.TODO(), batch) exitOnError("Failed to send message batch", err)
Output:
type SubQueue ¶
type SubQueue int
SubQueue allows you to target a subqueue of a queue or subscription. Ex: the dead letter queue (SubQueueDeadLetter).
const ( // SubQueueNone means no sub queue. SubQueueNone SubQueue = 0 // SubQueueDeadLetter targets the dead letter queue for a queue or subscription. SubQueueDeadLetter SubQueue = 1 // SubQueueTransfer targets the transfer dead letter queue for a queue or subscription. SubQueueTransfer SubQueue = 2 )