azservicebus

package module
v0.3.3
Warning

This package is not in the latest version of its module.

Published: Jan 12, 2022 License: MIT Imports: 16 Imported by: 50

README

Azure Service Bus Client Module for Go

Azure Service Bus is a highly-reliable cloud messaging service from Microsoft.

Use the client library github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus in your application to:

  • Send messages to an Azure Service Bus Queue or Topic
  • Receive messages from an Azure Service Bus Queue or Subscription

NOTE: This library is currently a preview. There may be breaking interface changes until it reaches semantic version v1.0.0.

Getting started

Install the package

Install the Azure Service Bus client module for Go with go get:

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
Prerequisites
Authenticate the client

The Service Bus Client can be created using a Service Bus connection string or a credential from the Azure Identity package, like DefaultAzureCredential.

Using a Service Principal
package main

import (
  "context"

  "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
  "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
  // For more information about the DefaultAzureCredential:
  // https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#NewDefaultAzureCredential
  credential, err := azidentity.NewDefaultAzureCredential(nil)

  if err != nil {
    panic(err)
  }

  // The service principal specified by the credential needs to be added to the appropriate Service Bus roles for your
  // resource. More information about Service Bus roles can be found here:
  // https://docs.microsoft.com/azure/service-bus-messaging/service-bus-managed-service-identity#azure-built-in-roles-for-azure-service-bus
  client, err := azservicebus.NewClient("<ex: myservicebus.servicebus.windows.net>", credential, nil)

  if err != nil {
    panic(err)
  }

  // Close the client, and any Senders or Receivers created from it, when done.
  defer client.Close(context.TODO())
}
Using a connection string
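package main

import (
  "context"

  "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

func main() {
  // See here for instructions on how to get a Service Bus connection string:
  // https://docs.microsoft.com/azure/service-bus-messaging/service-bus-quickstart-portal#get-the-connection-string
  connectionString := "<Service Bus connection string>" // placeholder, replace with your own

  client, err := azservicebus.NewClientFromConnectionString(connectionString, nil)

  if err != nil {
    panic(err)
  }

  // Close the client, and any Senders or Receivers created from it, when done.
  defer client.Close(context.TODO())
}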

Key concepts

Once you've created a Client, you can interact with resources within a Service Bus Namespace:

  • Queues: Allow for sending and receiving messages. Often used for point-to-point communication.
  • Topics: As opposed to Queues, Topics are better suited to publish/subscribe scenarios. Messages are sent to a topic, but consuming them requires a subscription; a topic can have multiple subscriptions in parallel.
  • Subscriptions: The mechanism to consume from a Topic. Each subscription is independent, and receives a copy of each message sent to the topic. Rules and Filters can be used to tailor which messages are received by a specific subscription.

For more information about these resources, see What is Azure Service Bus?.
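
The relationship between topics and subscriptions can be illustrated with a small in-memory model. This is only a sketch of the semantics described above, not the Service Bus client API: the `topic` type and its `subscribe` and `send` methods are hypothetical names made up for the illustration, and rules and filters are left out.

```go
package main

import "fmt"

// topic is a hypothetical in-memory stand-in for a Service Bus topic.
// It is not part of azservicebus; it only models the fan-out behavior.
type topic struct {
	subscriptions map[string][]string
}

func newTopic() *topic {
	return &topic{subscriptions: map[string][]string{}}
}

// subscribe registers an independent subscription on the topic.
func (tp *topic) subscribe(name string) {
	if _, ok := tp.subscriptions[name]; !ok {
		tp.subscriptions[name] = []string{}
	}
}

// send delivers a copy of the message to every subscription.
func (tp *topic) send(body string) {
	for name := range tp.subscriptions {
		tp.subscriptions[name] = append(tp.subscriptions[name], body)
	}
}

func main() {
	orders := newTopic()
	orders.subscribe("billing")
	orders.subscribe("audit")
	orders.send("order created")

	// Each subscription holds its own copy of the message.
	fmt.Println(len(orders.subscriptions["billing"]), len(orders.subscriptions["audit"]))
}
```

A queue, by contrast, would hand each message to only one of its receivers.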

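Using a Client you can do the following:

  • Send messages to a queue or topic, using a Sender created with Client.NewSender().
  • Receive messages from a queue or subscription, using a Receiver created with Client.NewReceiverForQueue() or Client.NewReceiverForSubscription().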

Please note that the Queues, Topics and Subscriptions should be created prior to using this library.

Examples

The following sections provide examples that cover common tasks using Azure Service Bus:

Send messages

Once you've created a Client you can create a Sender, which will allow you to send messages.

NOTE: Creating an azservicebus.Client is covered in the "Authenticate the client" section of the readme, using a Service Principal or a Service Bus connection string.

sender, err := client.NewSender("<queue or topic>", nil)

if err != nil {
  panic(err)
}

// send a single message
err = sender.SendMessage(context.TODO(), &azservicebus.Message{
  Body: []byte("hello world!"),
})

if err != nil {
  panic(err)
}

You can also send messages in batches, which can be more efficient than sending them individually.

// Create a message batch. It will automatically be sized for the Service Bus
// Namespace's maximum message size.
messageBatch, err := sender.NewMessageBatch(context.TODO(), nil)

if err != nil {
  panic(err)
}

// Add a message to our message batch. This can be called multiple times.
err = messageBatch.AddMessage(&azservicebus.Message{
  Body: []byte("hello world"),
})

if err == azservicebus.ErrMessageTooLarge {
  fmt.Printf("Message batch is full. We should send it and create a new one.\n")

  // send what we have since the batch is full
  err = sender.SendMessageBatch(context.TODO(), messageBatch)

  if err != nil {
    panic(err)
  }

  // Create a new batch, add this message and start again.
} else if err != nil {
  panic(err)
}
Receive messages

Once you've created a Client you can create a Receiver, which will allow you to receive messages.

NOTE: Creating an azservicebus.Client is covered in the "Authenticate the client" section of the readme, using a Service Principal or a Service Bus connection string.

receiver, err := client.NewReceiverForQueue(
  "<queue>",
  nil,
)
// or
// client.NewReceiverForSubscription("<topic>", "<subscription>", nil)

if err != nil {
  panic(err)
}

// ReceiveMessages respects the passed in context, and will gracefully stop
// receiving when 'ctx' is cancelled.
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
defer cancel()

messages, err := receiver.ReceiveMessages(ctx,
  // The number of messages to receive. Note this is merely an upper
  // bound. It is possible to get fewer messages (or zero), depending
  // on the contents of the remote queue or subscription and network
  // conditions.
  1,
  nil,
)

if err != nil {
  panic(err)
}

for _, message := range messages {
  // For more information about settling messages:
  // https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
  err = receiver.CompleteMessage(context.TODO(), message)

  if err != nil {
    panic(err)
  }

  fmt.Printf("Received and completed the message\n")
}
Dead letter queue

The dead letter queue is a sub-queue. Each queue or subscription has its own dead letter queue. Dead letter queues store messages that have been explicitly dead lettered using the Receiver.DeadLetterMessage function.

Opening a dead letter queue is just a configuration option when creating a Receiver.

NOTE: Creating an azservicebus.Client is covered in the "Authenticate the client" section of the readme, using a Service Principal or a Service Bus connection string.

deadLetterReceiver, err := client.NewReceiverForQueue("<queue>",
  &azservicebus.ReceiverOptions{
    SubQueue: azservicebus.SubQueueDeadLetter,
  })
// or 
// client.NewReceiverForSubscription("<topic>", "<subscription>", 
//   &azservicebus.ReceiverOptions{
//     SubQueue: azservicebus.SubQueueDeadLetter,
//   })

To see some example code for receiving messages using the Receiver see the "Receive messages" sample.

Next steps

Please take a look at the samples for detailed examples on how to use this library to send and receive messages to/from Service Bus Queues, Topics and Subscriptions.

Contributing

If you'd like to contribute to this library, please read the contributing guide to learn more about how to build and test the code.

Documentation

Overview

Package azservicebus provides clients for sending and receiving messages with Azure Service Bus. NOTE: for creating and managing entities, use the `Client` in the `github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin` package.

Constants

This section is empty.

Variables

var ErrMessageTooLarge = errors.New("the message could not be added because it is too large for the batch")

ErrMessageTooLarge is returned when a message cannot fit into a batch when using MessageBatch.AddMessage().

Functions

This section is empty.

Types

type AbandonMessageOptions added in v0.3.0

type AbandonMessageOptions struct {
	// PropertiesToModify specifies properties to modify in the message when it is abandoned.
	PropertiesToModify map[string]interface{}
}

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client provides methods to create Sender and Receiver instances to send and receive messages from Service Bus.

func NewClient

func NewClient(fullyQualifiedNamespace string, credential azcore.TokenCredential, options *ClientOptions) (*Client, error)

NewClient creates a new Client for a Service Bus namespace, using a TokenCredential. A Client allows you to create receivers (for queues or subscriptions) and senders (for queues and topics). fullyQualifiedNamespace is the Service Bus namespace name (ex: myservicebus.servicebus.windows.net). credential is one of the credentials in the `github.com/Azure/azure-sdk-for-go/sdk/azidentity` package.

Example
// NOTE: If you'd like to authenticate using a Service Bus connection string
// look at `NewClientFromConnectionString` instead.

// For more information about the DefaultAzureCredential:
// https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#NewDefaultAzureCredential
credential, err := azidentity.NewDefaultAzureCredential(nil)

if err != nil {
	panic(err)
}

client, err = azservicebus.NewClient("<ex: myservicebus.servicebus.windows.net>", credential, nil)

if err != nil {
	panic(err)
}

Example (UsingWebsockets)
// NOTE: If you'd like to authenticate via Azure Active Directory look at
// the `NewClient` function instead.
client, err = azservicebus.NewClientFromConnectionString(connectionString, &azservicebus.ClientOptions{
	NewWebSocketConn: func(ctx context.Context, args azservicebus.NewWebSocketConnArgs) (net.Conn, error) {
		opts := &websocket.DialOptions{Subprotocols: []string{"amqp"}}
		wssConn, _, err := websocket.Dial(ctx, args.Host, opts)

		if err != nil {
			return nil, err
		}

		return websocket.NetConn(context.Background(), wssConn, websocket.MessageBinary), nil
	},
})

if err != nil {
	panic(err)
}

func NewClientFromConnectionString added in v0.2.0

func NewClientFromConnectionString(connectionString string, options *ClientOptions) (*Client, error)

NewClientFromConnectionString creates a new Client for a Service Bus namespace using a connection string. A Client allows you to create receivers (for queues or subscriptions) and senders (for queues and topics). connectionString is a Service Bus connection string for the namespace or for an entity.

Example
// NOTE: If you'd like to authenticate via Azure Active Directory look at
// the `NewClient` function instead.

client, err = azservicebus.NewClientFromConnectionString(connectionString, nil)

if err != nil {
	panic(err)
}

func (*Client) AcceptNextSessionForQueue added in v0.2.0

func (client *Client) AcceptNextSessionForQueue(ctx context.Context, queueName string, options *SessionReceiverOptions) (*SessionReceiver, error)

AcceptNextSessionForQueue accepts the next available session from a queue. NOTE: this receiver is initialized immediately, not lazily.

Example
sessionReceiver, err := client.AcceptNextSessionForQueue(context.TODO(), "exampleSessionQueue", nil)
exitOnError("Failed to create session receiver", err)

fmt.Printf("Session receiver was assigned session ID \"%s\"", sessionReceiver.SessionID())

func (*Client) AcceptNextSessionForSubscription added in v0.2.0

func (client *Client) AcceptNextSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, options *SessionReceiverOptions) (*SessionReceiver, error)

AcceptNextSessionForSubscription accepts the next available session from a subscription. NOTE: this receiver is initialized immediately, not lazily.

func (*Client) AcceptSessionForQueue added in v0.2.0

func (client *Client) AcceptSessionForQueue(ctx context.Context, queueName string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error)

AcceptSessionForQueue accepts a session from a queue with a specific session ID. NOTE: this receiver is initialized immediately, not lazily.

Example
sessionReceiver, err := client.AcceptSessionForQueue(context.TODO(), "exampleSessionQueue", "Example Session ID", nil)
exitOnError("Failed to create session receiver", err)

// session receivers function the same as any other receiver
messages, err := sessionReceiver.ReceiveMessages(context.TODO(), 5, nil)
exitOnError("Failed to receive a message", err)

for _, message := range messages {
	err = sessionReceiver.CompleteMessage(context.TODO(), message)
	exitOnError("Failed to complete message", err)

	fmt.Printf("Received message from session ID \"%s\" and completed it", *message.SessionID)
}

func (*Client) AcceptSessionForSubscription added in v0.2.0

func (client *Client) AcceptSessionForSubscription(ctx context.Context, topicName string, subscriptionName string, sessionID string, options *SessionReceiverOptions) (*SessionReceiver, error)

AcceptSessionForSubscription accepts a session from a subscription with a specific session ID. NOTE: this receiver is initialized immediately, not lazily.

func (*Client) Close

func (client *Client) Close(ctx context.Context) error

Close closes the current connection to Service Bus as well as any Senders or Receivers created using this client.

func (*Client) NewReceiverForQueue

func (client *Client) NewReceiverForQueue(queueName string, options *ReceiverOptions) (*Receiver, error)

NewReceiverForQueue creates a Receiver for a queue. A receiver allows you to receive messages.

Example
receiver, err = client.NewReceiverForQueue(
	"exampleQueue",
	&azservicebus.ReceiverOptions{
		ReceiveMode: azservicebus.ReceiveModePeekLock,
	},
)
exitOnError("Failed to create Receiver", err)

Example (DeadLetterQueue)
receiver, err = client.NewReceiverForQueue(
	"exampleQueue",
	&azservicebus.ReceiverOptions{
		ReceiveMode: azservicebus.ReceiveModePeekLock,
		SubQueue:    azservicebus.SubQueueDeadLetter,
	},
)
exitOnError("Failed to create Receiver for DeadLetterQueue", err)

func (*Client) NewReceiverForSubscription

func (client *Client) NewReceiverForSubscription(topicName string, subscriptionName string, options *ReceiverOptions) (*Receiver, error)

NewReceiverForSubscription creates a Receiver for a subscription. A receiver allows you to receive messages.

Example
receiver, err = client.NewReceiverForSubscription(
	"exampleTopic",
	"exampleSubscription",
	&azservicebus.ReceiverOptions{
		ReceiveMode: azservicebus.ReceiveModePeekLock,
	},
)
exitOnError("Failed to create Receiver", err)

Example (DeadLetterQueue)
receiver, err = client.NewReceiverForSubscription(
	"exampleTopic",
	"exampleSubscription",
	&azservicebus.ReceiverOptions{
		ReceiveMode: azservicebus.ReceiveModePeekLock,
		SubQueue:    azservicebus.SubQueueDeadLetter,
	},
)
exitOnError("Failed to create Receiver for DeadLetterQueue", err)

func (*Client) NewSender

func (client *Client) NewSender(queueOrTopic string, options *NewSenderOptions) (*Sender, error)

NewSender creates a Sender, which allows you to send messages or schedule messages.

Example
sender, err = client.NewSender("<queue or topic>", nil)

if err != nil {
	panic(err)
}

type ClientOptions

type ClientOptions struct {
	// TLSConfig configures a client with a custom *tls.Config.
	TLSConfig *tls.Config

	// Application ID that will be passed to the namespace.
	ApplicationID string

	// NewWebSocketConn is a function that can create a net.Conn for use with websockets.
	// For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go.
	NewWebSocketConn func(ctx context.Context, args NewWebSocketConnArgs) (net.Conn, error)
}

ClientOptions contains options for the `NewClient` and `NewClientFromConnectionString` functions.

type DeadLetterOptions

type DeadLetterOptions struct {
	// ErrorDescription that caused the dead lettering of the message.
	ErrorDescription *string

	// Reason for dead lettering the message.
	Reason *string

	// PropertiesToModify specifies properties to modify in the message when it is dead lettered.
	PropertiesToModify map[string]interface{}
}

DeadLetterOptions describes the reason and error description for dead lettering a message using `Receiver.DeadLetterMessage()`.

type DeferMessageOptions added in v0.3.0

type DeferMessageOptions struct {
	// PropertiesToModify specifies properties to modify in the message when it is deferred
	PropertiesToModify map[string]interface{}
}

type Message

type Message struct {
	MessageID *string

	ContentType   *string
	CorrelationID *string
	// Body corresponds to the first []byte array in the Data section of an AMQP message.
	Body             []byte
	SessionID        *string
	Subject          *string
	ReplyTo          *string
	ReplyToSessionID *string
	To               *string
	TimeToLive       *time.Duration

	PartitionKey            *string
	TransactionPartitionKey *string
	ScheduledEnqueueTime    *time.Time

	ApplicationProperties map[string]interface{}
}

Message is a message with a body and commonly used properties.

type MessageBatch

type MessageBatch struct {
	// contains filtered or unexported fields
}

MessageBatch represents a batch of messages to send to Service Bus in a single message.

func (*MessageBatch) AddMessage added in v0.3.0

func (mb *MessageBatch) AddMessage(m *Message) error

AddMessage adds a message to the batch if the message will not exceed the max size of the batch. Returns:

  • ErrMessageTooLarge if the message cannot fit
  • a non-nil error for other failures
  • nil, otherwise

func (*MessageBatch) NumBytes added in v0.3.0

func (mb *MessageBatch) NumBytes() uint64

NumBytes is the number of bytes in the message batch.

func (*MessageBatch) NumMessages added in v0.3.0

func (mb *MessageBatch) NumMessages() int32

NumMessages returns the number of messages in the batch.

type MessageBatchOptions

type MessageBatchOptions struct {
	// MaxBytes overrides the max size (in bytes) for a batch.
	// By default NewMessageBatch will use the max message size provided by the service.
	MaxBytes uint64
}

MessageBatchOptions contains options for the `Sender.NewMessageBatch` function.

type NewSenderOptions added in v0.3.0

type NewSenderOptions struct {
}

type NewWebSocketConnArgs added in v0.3.2

type NewWebSocketConnArgs = internal.NewWebSocketConnArgs

NewWebSocketConnArgs are passed to your web socket creation function (ClientOptions.NewWebSocketConn)

type PeekMessagesOptions added in v0.2.0

type PeekMessagesOptions struct {
	// FromSequenceNumber is the sequence number to start with when peeking messages.
	FromSequenceNumber *int64
}

PeekMessagesOptions contains options for the `Receiver.PeekMessages` function.

type ReceiveMessagesOptions added in v0.3.0

type ReceiveMessagesOptions struct {
}

ReceiveMessagesOptions are options for the ReceiveMessages function.

type ReceiveMode

type ReceiveMode = internal.ReceiveMode

ReceiveMode represents the lock style to use for a receiver, either `ReceiveModePeekLock` or `ReceiveModeReceiveAndDelete`.

const (
	// ReceiveModePeekLock will lock messages as they are received and can be settled
	// using the Receiver's (Complete|Abandon|DeadLetter|Defer)Message
	// functions.
	ReceiveModePeekLock ReceiveMode = internal.PeekLock
	// ReceiveModeReceiveAndDelete will delete messages as they are received.
	ReceiveModeReceiveAndDelete ReceiveMode = internal.ReceiveAndDelete
)

type ReceivedMessage

type ReceivedMessage struct {
	MessageID string

	ContentType      *string
	CorrelationID    *string
	SessionID        *string
	Subject          *string
	ReplyTo          *string
	ReplyToSessionID *string
	To               *string

	TimeToLive *time.Duration

	PartitionKey            *string
	TransactionPartitionKey *string
	ScheduledEnqueueTime    *time.Time

	ApplicationProperties map[string]interface{}

	LockToken              [16]byte
	DeliveryCount          uint32
	LockedUntil            *time.Time
	SequenceNumber         *int64
	EnqueuedSequenceNumber *int64
	EnqueuedTime           *time.Time
	ExpiresAt              *time.Time

	DeadLetterErrorDescription *string
	DeadLetterReason           *string
	DeadLetterSource           *string
	// contains filtered or unexported fields
}

ReceivedMessage is a message received by a Receiver created with Client.NewReceiverForQueue() or Client.NewReceiverForSubscription().

func (*ReceivedMessage) Body added in v0.3.0

func (rm *ReceivedMessage) Body() ([]byte, error)

Body returns the body for this received message. If the body is not compatible with ReceivedMessage this function will return an error.

type Receiver

type Receiver struct {
	// contains filtered or unexported fields
}

Receiver receives messages using pull based functions (ReceiveMessages).

func (*Receiver) AbandonMessage

func (r *Receiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error

AbandonMessage will cause a message to be returned to the queue or subscription. This will increment its delivery count, and potentially cause it to be dead lettered depending on your queue or subscription's configuration.
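
The interaction between abandoning and dead lettering can be sketched with an in-memory model. The `queue` and `message` types below are hypothetical and exist only for this illustration. `maxDeliveryCount` stands for the maximum delivery count configured on the queue or subscription, and incrementing the count inside `abandon` is a simplification that follows the description above.

```go
package main

import "fmt"

// message and queue are hypothetical in-memory types used only to illustrate
// how abandoning interacts with the delivery count; they are not azservicebus types.
type message struct {
	body          string
	deliveryCount int
}

type queue struct {
	maxDeliveryCount int
	active           []*message
	deadLetter       []*message
}

// receive hands out the next available message, or nil if there is none.
func (q *queue) receive() *message {
	if len(q.active) == 0 {
		return nil
	}
	m := q.active[0]
	q.active = q.active[1:]
	return m
}

// abandon increments the delivery count and returns the message to the queue,
// or dead letters it once the count has reached the configured maximum.
func (q *queue) abandon(m *message) {
	m.deliveryCount++
	if m.deliveryCount >= q.maxDeliveryCount {
		q.deadLetter = append(q.deadLetter, m)
		return
	}
	q.active = append(q.active, m)
}

func main() {
	q := &queue{maxDeliveryCount: 3, active: []*message{{body: "hello"}}}

	// Keep abandoning the message until the queue stops redelivering it.
	for m := q.receive(); m != nil; m = q.receive() {
		q.abandon(m)
		fmt.Printf("abandoned, delivery count is now %d\n", m.deliveryCount)
	}

	fmt.Println(len(q.active), len(q.deadLetter))
}
```

In this model a message that is always abandoned ends up in the dead letter list after three attempts, which is why a consumer that abandons on every failure still terminates.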

func (*Receiver) Close

func (r *Receiver) Close(ctx context.Context) error

Close permanently closes the receiver.

func (*Receiver) CompleteMessage

func (r *Receiver) CompleteMessage(ctx context.Context, message *ReceivedMessage) error

CompleteMessage completes a message, deleting it from the queue or subscription.

func (*Receiver) DeadLetterMessage

func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error

DeadLetterMessage settles a message by moving it to the dead letter queue for a queue or subscription. To receive these messages create a receiver with `Client.NewReceiverForQueue()` or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option.

func (*Receiver) DeferMessage

func (r *Receiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error

DeferMessage will cause a message to be deferred. Deferred messages can be received using `Receiver.ReceiveDeferredMessages`.

func (*Receiver) PeekMessages

func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)

PeekMessages will peek messages without locking or deleting messages. Messages that are peeked do not have lock tokens, so settlement methods like CompleteMessage, AbandonMessage, DeferMessage or DeadLetterMessage will not work with them.

func (*Receiver) ReceiveDeferredMessages

func (r *Receiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64) ([]*ReceivedMessage, error)

ReceiveDeferredMessages receives messages that were deferred using `Receiver.DeferMessage`.

func (*Receiver) ReceiveMessages

func (r *Receiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)

ReceiveMessages receives up to maxMessages messages. There are two ways to stop receiving messages:

  1. Cancelling the `ctx` parameter.
  2. An implicit timeout (default: 1 second) that starts after the first message has been received.
Example
// ReceiveMessages respects the passed in context, and will gracefully stop
// receiving when 'ctx' is cancelled.
ctx, cancel := context.WithTimeout(context.TODO(), 60*time.Second)
defer cancel()

messages, err = receiver.ReceiveMessages(ctx,
	// The number of messages to receive. Note this is merely an upper
	// bound. It is possible to get fewer messages (or zero), depending
	// on the contents of the remote queue or subscription and network
	// conditions.
	1,
	nil,
)

if err != nil {
	panic(err)
}

for _, message := range messages {
	// For more information about settling messages:
	// https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
	err = receiver.CompleteMessage(context.TODO(), message)

	if err != nil {
		panic(err)
	}

	fmt.Printf("Received and completed the message\n")
}

func (*Receiver) RenewMessageLock added in v0.2.0

func (r *Receiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage) error

RenewMessageLock renews the lock on a message, updating the `LockedUntil` field on `msg`.

type ReceiverOptions

type ReceiverOptions struct {
	// ReceiveMode controls when a message is deleted from Service Bus.
	//
	// `azservicebus.ReceiveModePeekLock` is the default. The message is locked, preventing multiple
	// receivers from processing the message at once. You control the lock state of the message
	// using one of the message settlement functions like Receiver.CompleteMessage(), which removes
	// it from Service Bus, or Receiver.AbandonMessage(), which makes it available again.
	//
	// `azservicebus.ReceiveModeReceiveAndDelete` causes Service Bus to remove the message as soon
	// as it's received.
	//
	// More information about receive modes:
	// https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
	ReceiveMode ReceiveMode

	// SubQueue should be set to connect to the sub queue (ex: dead letter queue)
	// of the queue or subscription.
	SubQueue SubQueue
}

ReceiverOptions contains options for the `Client.NewReceiverForQueue` or `Client.NewReceiverForSubscription` functions.

type Sender

type Sender struct {
	// contains filtered or unexported fields
}

Sender is used to send messages as well as schedule them to be delivered at a later date.

func (*Sender) CancelScheduledMessages added in v0.2.0

func (s *Sender) CancelScheduledMessages(ctx context.Context, sequenceNumber []int64) error

CancelScheduledMessages cancels multiple messages that were scheduled.

func (*Sender) Close

func (s *Sender) Close(ctx context.Context) error

Close permanently closes the Sender.

func (*Sender) NewMessageBatch

func (s *Sender) NewMessageBatch(ctx context.Context, options *MessageBatchOptions) (*MessageBatch, error)

NewMessageBatch can be used to create a batch that contains multiple messages. Sending a batch of messages is more efficient than sending the messages one at a time.

func (*Sender) ScheduleMessages added in v0.2.0

func (s *Sender) ScheduleMessages(ctx context.Context, messages []*Message, scheduledEnqueueTime time.Time) ([]int64, error)

ScheduleMessages schedules a slice of Messages to appear on a Service Bus Queue/Subscription at a later time. Returns the sequence numbers of the messages that were scheduled. Messages that haven't been delivered can be cancelled using `Sender.CancelScheduledMessages`.

Example
// there are two ways of scheduling messages:
// 1. Using the `Sender.ScheduleMessages()` function.
// 2. Setting the `Message.ScheduledEnqueueTime` field on a message.

// schedule the message to be delivered in an hour.
sequenceNumbers, err := sender.ScheduleMessages(context.TODO(),
	[]*azservicebus.Message{
		{Body: []byte("hello world")},
	}, time.Now().Add(time.Hour))
exitOnError("Failed to schedule messages", err)

err = sender.CancelScheduledMessages(context.TODO(), sequenceNumbers)
exitOnError("Failed to cancel scheduled messages", err)

// or you can set the `ScheduledEnqueueTime` field on a message when you send it
future := time.Now().Add(time.Hour)

err = sender.SendMessage(context.TODO(),
	&azservicebus.Message{
		Body: []byte("hello world"),
		// schedule the message to be delivered in an hour.
		ScheduledEnqueueTime: &future,
	})
exitOnError("Failed to schedule messages using SendMessage", err)

func (*Sender) SendMessage

func (s *Sender) SendMessage(ctx context.Context, message *Message) error

SendMessage sends a Message to a queue or topic.

Example (Message)
message := &azservicebus.Message{
	Body: []byte("hello, this is a message"),
}

err = sender.SendMessage(context.TODO(), message)
exitOnError("Failed to send message", err)

Example (MessageBatch)
batch, err := sender.NewMessageBatch(context.TODO(), nil)
exitOnError("Failed to create message batch", err)

// By calling AddMessage multiple times you can add multiple messages into a
// batch. This can help with message throughput, as you can send multiple
// messages in a single send.
err = batch.AddMessage(&azservicebus.Message{Body: []byte("hello world")})

if err != nil {
	switch err {
	case azservicebus.ErrMessageTooLarge:
		// At this point you can do a few things:
		// 1. Ignore this message
		// 2. Send this batch (it's full) and create a new batch.
		//
		// The batch can still be used after this error if you have
		// smaller messages you'd still like to add in.
		fmt.Printf("Failed to add message to batch\n")
	default:
		exitOnError("Error while trying to add message to batch", err)
	}
}

// After you add all the messages to the batch you send it using
// Sender.SendMessageBatch()
err = sender.SendMessageBatch(context.TODO(), batch)
exitOnError("Failed to send message batch", err)

func (*Sender) SendMessageBatch added in v0.2.0

func (s *Sender) SendMessageBatch(ctx context.Context, batch *MessageBatch) error

SendMessageBatch sends a MessageBatch to a queue or topic. Message batches can be created using `Sender.NewMessageBatch`.

type SessionReceiver added in v0.2.0

type SessionReceiver struct {
	// contains filtered or unexported fields
}

SessionReceiver is a Receiver that handles sessions.

func (*SessionReceiver) AbandonMessage added in v0.2.0

func (r *SessionReceiver) AbandonMessage(ctx context.Context, message *ReceivedMessage, options *AbandonMessageOptions) error

AbandonMessage will cause a message to be returned to the queue or subscription. This will increment its delivery count, and potentially cause it to be dead lettered depending on your queue or subscription's configuration.

func (*SessionReceiver) Close added in v0.2.0

func (r *SessionReceiver) Close(ctx context.Context) error

Close permanently closes the receiver.

func (*SessionReceiver) CompleteMessage added in v0.2.0

func (r *SessionReceiver) CompleteMessage(ctx context.Context, message *ReceivedMessage) error

CompleteMessage completes a message, deleting it from the queue or subscription.

func (*SessionReceiver) DeadLetterMessage added in v0.2.0

func (r *SessionReceiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessage, options *DeadLetterOptions) error

DeadLetterMessage settles a message by moving it to the dead letter queue for a queue or subscription. To receive these messages create a receiver with `Client.NewReceiverForQueue()` or `Client.NewReceiverForSubscription()` using the `ReceiverOptions.SubQueue` option.

func (*SessionReceiver) DeferMessage added in v0.2.0

func (r *SessionReceiver) DeferMessage(ctx context.Context, message *ReceivedMessage, options *DeferMessageOptions) error

DeferMessage will cause a message to be deferred. Deferred messages can be received using `SessionReceiver.ReceiveDeferredMessages`.

func (*SessionReceiver) GetSessionState added in v0.2.0

func (sr *SessionReceiver) GetSessionState(ctx context.Context) ([]byte, error)

GetSessionState retrieves state associated with the session.

func (*SessionReceiver) LockedUntil added in v0.2.0

func (sr *SessionReceiver) LockedUntil() time.Time

LockedUntil is the time the lock on this session expires. The lock can be renewed using `SessionReceiver.RenewSessionLock`.

func (*SessionReceiver) PeekMessages added in v0.2.0

func (r *SessionReceiver) PeekMessages(ctx context.Context, maxMessageCount int, options *PeekMessagesOptions) ([]*ReceivedMessage, error)

PeekMessages will peek messages without locking or deleting messages. Messages that are peeked do not have lock tokens, so settlement methods like CompleteMessage, AbandonMessage, DeferMessage or DeadLetterMessage will not work with them.

func (*SessionReceiver) ReceiveDeferredMessages added in v0.2.0

func (r *SessionReceiver) ReceiveDeferredMessages(ctx context.Context, sequenceNumbers []int64) ([]*ReceivedMessage, error)

ReceiveDeferredMessages receives messages that were deferred using `SessionReceiver.DeferMessage`.

func (*SessionReceiver) ReceiveMessages added in v0.2.0

func (r *SessionReceiver) ReceiveMessages(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error)

ReceiveMessages receives up to maxMessages messages. There are two ways to stop receiving messages:

  1. Cancelling the `ctx` parameter.
  2. An implicit timeout (default: 1 second) that starts after the first message has been received.

func (*SessionReceiver) RenewMessageLock added in v0.2.0

func (r *SessionReceiver) RenewMessageLock(ctx context.Context, msg *ReceivedMessage) error

RenewMessageLock renews the lock on a message, updating the `LockedUntil` field on `msg`.

func (*SessionReceiver) RenewSessionLock added in v0.2.0

func (sr *SessionReceiver) RenewSessionLock(ctx context.Context) error

RenewSessionLock renews this session's lock. The new expiration time is available using `LockedUntil`.

func (*SessionReceiver) SessionID added in v0.2.0

func (sr *SessionReceiver) SessionID() string

SessionID is the session ID for this SessionReceiver.

func (*SessionReceiver) SetSessionState added in v0.2.0

func (sr *SessionReceiver) SetSessionState(ctx context.Context, state []byte) error

SetSessionState sets the state associated with the session.
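
Session state is an opaque `[]byte`, so the application decides how to encode it. A minimal sketch, assuming JSON as the encoding, is shown below. The `checkpoint` type and the `encodeState` and `decodeState` helpers are hypothetical. The bytes produced by `encodeState` are what would be passed to `SetSessionState`, and `decodeState` would be applied to the result of `GetSessionState`.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// checkpoint is a hypothetical application-defined state value.
type checkpoint struct {
	LastSequenceNumber int64 `json:"lastSequenceNumber"`
	ItemsProcessed     int   `json:"itemsProcessed"`
}

// encodeState turns the checkpoint into the []byte form that
// SetSessionState accepts.
func encodeState(c checkpoint) ([]byte, error) {
	return json.Marshal(c)
}

// decodeState parses bytes such as those returned by GetSessionState.
// Empty state is treated as a zero checkpoint (an assumption of this sketch).
func decodeState(state []byte) (checkpoint, error) {
	var c checkpoint
	if len(state) == 0 {
		return c, nil
	}
	err := json.Unmarshal(state, &c)
	return c, err
}

func main() {
	state, err := encodeState(checkpoint{LastSequenceNumber: 42, ItemsProcessed: 7})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(state))

	restored, err := decodeState(state)
	if err != nil {
		panic(err)
	}
	fmt.Println(restored.LastSequenceNumber, restored.ItemsProcessed)
}
```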

type SessionReceiverOptions added in v0.2.0

type SessionReceiverOptions struct {
	// ReceiveMode controls when a message is deleted from Service Bus.
	//
	// `azservicebus.ReceiveModePeekLock` is the default. The message is locked, preventing multiple
	// receivers from processing the message at once. You control the lock state of the message
	// using one of the message settlement functions like SessionReceiver.CompleteMessage(), which removes
	// it from Service Bus, or SessionReceiver.AbandonMessage(), which makes it available again.
	//
	// `azservicebus.ReceiveModeReceiveAndDelete` causes Service Bus to remove the message as soon
	// as it's received.
	//
	// More information about receive modes:
	// https://docs.microsoft.com/azure/service-bus-messaging/message-transfers-locks-settlement#settling-receive-operations
	ReceiveMode ReceiveMode
}

SessionReceiverOptions contains options for the `Client.AcceptSessionForQueue/Subscription` or `Client.AcceptNextSessionForQueue/Subscription` functions.

type SubQueue

type SubQueue int

SubQueue allows you to target a subqueue of a queue or subscription. Ex: the dead letter queue (SubQueueDeadLetter).

const (
	// SubQueueDeadLetter targets the dead letter queue for a queue or subscription.
	SubQueueDeadLetter SubQueue = 1
	// SubQueueTransfer targets the transfer dead letter queue for a queue or subscription.
	SubQueueTransfer SubQueue = 2
)

Directories

Path Synopsis
Package admin provides `Client`, which can create and manage Queues, Topics and Subscriptions.
