servicebus

package module
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 6, 2018 License: MIT Imports: 31 Imported by: 0

README

Microsoft Azure Service Bus Client for Golang

Go Report Card godoc Build Status Coverage Status

Microsoft Azure Service Bus is a reliable cloud messaging service (MaaS) which simplifies enterprise cloud messaging. It enables developers to build scalable cloud solutions and implement complex messaging workflows over an efficient binary protocol called AMQP.

This library provides a simple interface for sending, receiving and managing Service Bus entities such as Queues, Topics and Subscriptions.

For more information about Service Bus, check out the Azure documentation.

This library is a pure Golang implementation of Azure Service Bus over AMQP.

Preview of Service Bus for Golang

This library is currently a preview. There may be breaking interface changes until it reaches semantic version v1.0.0. If you run into an issue, please don't hesitate to log a new issue or open a pull request.

Installing the library

To more reliably manage dependencies in your application we recommend golang/dep.

With dep:

dep ensure -add github.com/Azure/azure-service-bus-go

With go get:

go get -u github.com/Azure/azure-service-bus-go/...

If you need to install Go, follow the official instructions

Using Service Bus

In this section we'll cover some basics of the library to help you get started.

Quick start

Let's send and receive "hello, world!".

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"

	"github.com/Azure/azure-service-bus-go"
)

func main() {
	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		// handle error
	}

	// Initialize and create a Service Bus Queue named helloworld if it doesn't exist
	queueName := "helloworld"
	q, err := ns.NewQueue(context.Background(), queueName)
	if err != nil {
		// handle queue creation error
	}

	// Send message to the Queue named helloworld
	err = q.Send(context.Background(), servicebus.NewMessageFromString("Hello World!"))
	if err != nil {
		// handle message send error
	}

	// Receive message from queue named helloworld
	listenHandle, err := q.Receive(context.Background(),
		func(ctx context.Context, msg *servicebus.Message) servicebus.DispositionAction {
			fmt.Println(string(msg.Data))
			return msg.Accept()
		})
	if err != nil {
		// handle queue listener creation err
	}
	// Close the listener handle for the Service Bus Queue
	defer listenHandle.Close(context.Background())

	// Wait for a signal to quit:
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt, os.Kill)
	<-signalChan
}

Examples

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Documentation

Index

Constants

View Source
const (

	// Version is the semantic version number
	Version = "0.1.0"

	// Megabytes is a helper for specifying MaxSizeInMegabytes and equals 1024, thus 5 GB is 5 * Megabytes
	Megabytes = 1024
)

Variables

This section is empty.

Functions

This section is empty.

Types

type BaseEntityDescription

type BaseEntityDescription struct {
	InstanceMetadataSchema *string `xml:"xmlns:i,attr,omitempty"`
	ServiceBusSchema       *string `xml:"xmlns,attr,omitempty"`
}

BaseEntityDescription provides common fields which are part of Queues, Topics and Subscriptions

type CountDetails

type CountDetails struct {
	XMLName                        xml.Name `xml:"CountDetails"`
	ActiveMessageCount             *int32   `xml:"ActiveMessageCount,omitempty"`
	DeadLetterMessageCount         *int32   `xml:"DeadLetterMessageCount,omitempty"`
	ScheduledMessageCount          *int32   `xml:"ScheduledMessageCount,omitempty"`
	TransferDeadLetterMessageCount *int32   `xml:"TransferDeadLetterMessageCount,omitempty"`
	TransferMessageCount           *int32   `xml:"TransferMessageCount,omitempty"`
}

CountDetails has current active (and other) messages for queue/topic.

type DispositionAction

type DispositionAction func(ctx context.Context)

DispositionAction represents the action to notify Azure Service Bus of the Message's disposition

type Handler

type Handler func(context.Context, *Message) DispositionAction

Handler is the function signature for any receiver of AMQP messages

type ListenerHandle

type ListenerHandle struct {
	// contains filtered or unexported fields
}

ListenerHandle provides the ability to close or listen to the close of a Receiver

func (*ListenerHandle) Close

func (lc *ListenerHandle) Close(ctx context.Context) error

Close will close the listener

func (*ListenerHandle) Done

func (lc *ListenerHandle) Done() <-chan struct{}

Done will close the channel when the listener has stopped

func (*ListenerHandle) Err

func (lc *ListenerHandle) Err() error

Err will return the last error encountered

type Message

type Message struct {
	ContentType      string
	CorrelationID    string
	Data             []byte
	DeliveryCount    uint32
	GroupID          *string
	GroupSequence    *uint32
	ID               string
	Label            string
	ReplyTo          string
	ReplyToGroupID   string
	To               string
	TTL              *time.Duration
	LockToken        *uuid.UUID
	SystemProperties *SystemProperties
	UserProperties   map[string]interface{}
	// contains filtered or unexported fields
}

Message is an Service Bus message to be sent or received

func NewMessage

func NewMessage(data []byte) *Message

NewMessage builds an Message from a slice of data

func NewMessageFromString

func NewMessageFromString(message string) *Message

NewMessageFromString builds an Message from a string message

func (*Message) Abandon

func (m *Message) Abandon() DispositionAction

Abandon will notify Azure Service Bus the message failed but should be re-queued for delivery.

func (*Message) Complete

func (m *Message) Complete() DispositionAction

Complete will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue

func (*Message) DeadLetter

func (m *Message) DeadLetter(err error) DispositionAction

DeadLetter will notify Azure Service Bus the message failed and should not re-queued

func (*Message) DeadLetterWithInfo

func (m *Message) DeadLetterWithInfo(err error, condition MessageErrorCondition, additionalData map[string]string) DispositionAction

DeadLetterWithInfo will notify Azure Service Bus the message failed and should not be re-queued with additional context

func (*Message) ForeachKey

func (m *Message) ForeachKey(handler func(key, val string) error) error

ForeachKey implements the opentracing.TextMapReader and gets properties on the event to be propagated from the message broker

func (*Message) Set

func (m *Message) Set(key, value string)

Set implements opentracing.TextMapWriter and sets properties on the event to be propagated to the message broker

type MessageErrorCondition

type MessageErrorCondition string

MessageErrorCondition represents a well-known collection of AMQP errors

const (
	ErrorInternalError         MessageErrorCondition = "amqp:internal-error"
	ErrorNotFound              MessageErrorCondition = "amqp:not-found"
	ErrorUnauthorizedAccess    MessageErrorCondition = "amqp:unauthorized-access"
	ErrorDecodeError           MessageErrorCondition = "amqp:decode-error"
	ErrorResourceLimitExceeded MessageErrorCondition = "amqp:resource-limit-exceeded"
	ErrorNotAllowed            MessageErrorCondition = "amqp:not-allowed"
	ErrorInvalidField          MessageErrorCondition = "amqp:invalid-field"
	ErrorNotImplemented        MessageErrorCondition = "amqp:not-implemented"
	ErrorResourceLocked        MessageErrorCondition = "amqp:resource-locked"
	ErrorPreconditionFailed    MessageErrorCondition = "amqp:precondition-failed"
	ErrorResourceDeleted       MessageErrorCondition = "amqp:resource-deleted"
	ErrorIllegalState          MessageErrorCondition = "amqp:illegal-state"
)

Error Conditions

type MessageWithContext

type MessageWithContext struct {
	*Message
	Ctx context.Context
}

MessageWithContext is a Service Bus message with its context which propagates the distributed trace information

func (*MessageWithContext) Abandon

func (m *MessageWithContext) Abandon()

Abandon will notify Azure Service Bus the message failed but should be re-queued for delivery.

func (*MessageWithContext) Complete

func (m *MessageWithContext) Complete()

Complete will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue

func (*MessageWithContext) DeadLetter

func (m *MessageWithContext) DeadLetter(err error)

DeadLetter will notify Azure Service Bus the message failed and should not re-queued

func (*MessageWithContext) DeadLetterWithInfo

func (m *MessageWithContext) DeadLetterWithInfo(err error, condition MessageErrorCondition, additionalData map[string]string)

DeadLetterWithInfo will notify Azure Service Bus the message failed and should not be re-queued with additional context

type Namespace

type Namespace struct {
	Name          string
	TokenProvider auth.TokenProvider
	Environment   azure.Environment
}

Namespace provides a simplified facade over the AMQP implementation of Azure Service Bus and is the entry point for using Queues, Topics and Subscriptions

func NewNamespace

func NewNamespace(opts ...NamespaceOption) (*Namespace, error)

NewNamespace creates a new namespace configured through NamespaceOption(s)

func (*Namespace) NewQueue

func (ns *Namespace) NewQueue(ctx context.Context, name string, opts ...QueueOption) (*Queue, error)

NewQueue creates a new Queue Sender / Receiver

func (*Namespace) NewQueueManager

func (ns *Namespace) NewQueueManager() *QueueManager

NewQueueManager creates a new QueueManager for a Service Bus Namespace

func (*Namespace) NewSubscriptionManager

func (ns *Namespace) NewSubscriptionManager(ctx context.Context, topicName string) (*SubscriptionManager, error)

NewSubscriptionManager creates a new SubscriptionManger for a Service Bus Namespace

func (*Namespace) NewTopic

func (ns *Namespace) NewTopic(ctx context.Context, name string, opts ...TopicOption) (*Topic, error)

NewTopic creates a new Topic Sender

func (*Namespace) NewTopicManager

func (ns *Namespace) NewTopicManager() *TopicManager

NewTopicManager creates a new TopicManager for a Service Bus Namespace

type NamespaceOption

type NamespaceOption func(h *Namespace) error

NamespaceOption provides structure for configuring a new Service Bus namespace

func NamespaceWithConnectionString

func NamespaceWithConnectionString(connStr string) NamespaceOption

NamespaceWithConnectionString configures a namespace with the information provided in a Service Bus connection string

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents a Service Bus Queue entity, which offers First In, First Out (FIFO) message delivery to one or more competing consumers. That is, messages are typically expected to be received and processed by the receivers in the order in which they were added to the queue, and each message is received and processed by only one message consumer.

func (*Queue) Close

func (q *Queue) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*Queue) Receive

func (q *Queue) Receive(ctx context.Context, handler Handler) (*ListenerHandle, error)

Receive subscribes for messages sent to the Queue

func (*Queue) ReceiveOne

func (q *Queue) ReceiveOne(ctx context.Context) (*MessageWithContext, error)

ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.

func (*Queue) Send

func (q *Queue) Send(ctx context.Context, event *Message) error

Send sends messages to the Queue

type QueueDescription

type QueueDescription struct {
	XMLName xml.Name `xml:"QueueDescription"`
	BaseEntityDescription
	LockDuration                        *string                  `xml:"LockDuration,omitempty"`               // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
	MaxSizeInMegabytes                  *int32                   `xml:"MaxSizeInMegabytes,omitempty"`         // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024.
	RequiresDuplicateDetection          *bool                    `xml:"RequiresDuplicateDetection,omitempty"` // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection.
	RequiresSession                     *bool                    `xml:"RequiresSession,omitempty"`
	DefaultMessageTimeToLive            *string                  `xml:"DefaultMessageTimeToLive,omitempty"`            // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
	DeadLetteringOnMessageExpiration    *bool                    `xml:"DeadLetteringOnMessageExpiration,omitempty"`    // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires.
	DuplicateDetectionHistoryTimeWindow *string                  `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes.
	MaxDeliveryCount                    *int32                   `xml:"MaxDeliveryCount,omitempty"`                    // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10.
	EnableBatchedOperations             *bool                    `xml:"EnableBatchedOperations,omitempty"`             // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
	SizeInBytes                         *int64                   `xml:"SizeInBytes,omitempty"`                         // SizeInBytes - The size of the queue, in bytes.
	MessageCount                        *int64                   `xml:"MessageCount,omitempty"`                        // MessageCount - The number of messages in the queue.
	IsAnonymousAccessible               *bool                    `xml:"IsAnonymousAccessible,omitempty"`
	Status                              *servicebus.EntityStatus `xml:"Status,omitempty"`
	CreatedAt                           *date.Time               `xml:"CreatedAt,omitempty"`
	UpdatedAt                           *date.Time               `xml:"UpdatedAt,omitempty"`
	SupportOrdering                     *bool                    `xml:"SupportOrdering,omitempty"`
	AutoDeleteOnIdle                    *string                  `xml:"AutoDeleteOnIdle,omitempty"`
	EnablePartitioning                  *bool                    `xml:"EnablePartitioning,omitempty"`
	EnableExpress                       *bool                    `xml:"EnableExpress,omitempty"`
	CountDetails                        *CountDetails            `xml:"CountDetails,omitempty"`
}

QueueDescription is the content type for Queue management requests

type QueueEntity

type QueueEntity struct {
	*QueueDescription
	Name string
}

QueueEntity is the Azure Service Bus description of a Queue for management activities

type QueueManagementOption

type QueueManagementOption func(*QueueDescription) error

QueueManagementOption represents named configuration options for queue mutation

func QueueEntityWithAutoDeleteOnIdle

func QueueEntityWithAutoDeleteOnIdle(window *time.Duration) QueueManagementOption

QueueEntityWithAutoDeleteOnIdle configures the queue to automatically delete after the specified idle interval. The minimum duration is 5 minutes.

func QueueEntityWithDeadLetteringOnMessageExpiration

func QueueEntityWithDeadLetteringOnMessageExpiration() QueueManagementOption

QueueEntityWithDeadLetteringOnMessageExpiration will ensure the queue sends expired messages to the dead letter queue

func QueueEntityWithDuplicateDetection

func QueueEntityWithDuplicateDetection(window *time.Duration) QueueManagementOption

QueueEntityWithDuplicateDetection configures the queue to detect duplicates for a given time window. If window is not specified, then it uses the default of 10 minutes.

func QueueEntityWithLockDuration

func QueueEntityWithLockDuration(window *time.Duration) QueueManagementOption

QueueEntityWithLockDuration configures the queue to have a duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.

func QueueEntityWithMaxDeliveryCount

func QueueEntityWithMaxDeliveryCount(count int32) QueueManagementOption

QueueEntityWithMaxDeliveryCount configures the queue to have a maximum number of delivery attempts before dead-lettering the message

func QueueEntityWithMaxSizeInMegabytes

func QueueEntityWithMaxSizeInMegabytes(size int) QueueManagementOption

QueueEntityWithMaxSizeInMegabytes configures the maximum size of the queue in megabytes (1 * 1024 - 5 * 1024), which is the size of the memory allocated for the queue. Default is 1 MB (1 * 1024).

func QueueEntityWithMessageTimeToLive

func QueueEntityWithMessageTimeToLive(window *time.Duration) QueueManagementOption

QueueEntityWithMessageTimeToLive configures the queue to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.

func QueueEntityWithPartitioning

func QueueEntityWithPartitioning() QueueManagementOption

QueueEntityWithPartitioning ensure the created queue will be a partitioned queue. Partitioned queues offer increased storage and availability compared to non-partitioned queues with the trade-off of requiring the following to ensure FIFO message retrieval:

SessionId. If a message has the SessionId property set, then Service Bus uses the SessionId property as the partition key. This way, all messages that belong to the same session are assigned to the same fragment and handled by the same message broker. This allows Service Bus to guarantee message ordering as well as the consistency of session states.

PartitionKey. If a message has the PartitionKey property set but not the SessionId property, then Service Bus uses the PartitionKey property as the partition key. Use the PartitionKey property to send non-sessionful transactional messages. The partition key ensures that all messages that are sent within a transaction are handled by the same messaging broker.

MessageId. If the queue has the RequiresDuplicationDetection property set to true, then the MessageId property serves as the partition key if the SessionId or a PartitionKey properties are not set. This ensures that all copies of the same message are handled by the same message broker and, thus, allows Service Bus to detect and eliminate duplicate messages

func QueueEntityWithRequiredSessions

func QueueEntityWithRequiredSessions() QueueManagementOption

QueueEntityWithRequiredSessions will ensure the queue requires senders and receivers to have sessionIDs

type QueueManager

type QueueManager struct {
	// contains filtered or unexported fields
}

QueueManager provides CRUD functionality for Service Bus Queues

func (*QueueManager) Delete

func (qm *QueueManager) Delete(ctx context.Context, name string) error

Delete deletes a Service Bus Queue entity by name

func (QueueManager) Execute

func (em QueueManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader) (*http.Response, error)

Execute performs an HTTP request given a http method, path and body

func (*QueueManager) Get

func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, error)

Get fetches a Service Bus Queue entity by name

func (*QueueManager) List

func (qm *QueueManager) List(ctx context.Context) ([]*QueueEntity, error)

List fetches all of the queues for a Service Bus Namespace

func (QueueManager) Post

func (em QueueManager) Post(ctx context.Context, entityPath string, body []byte) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*QueueManager) Put

func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueManagementOption) (*QueueEntity, error)

Put creates or updates a Service Bus Queue

type QueueOption

type QueueOption func(*Queue) error

QueueOption represents named options for assisting Queue message handling

func QueueWithReceiveAndDelete

func QueueWithReceiveAndDelete() QueueOption

QueueWithReceiveAndDelete configures a queue to pop and delete messages off of the queue upon receiving the message. This differs from the default, PeekLock, where PeekLock receives a message, locks it for a period of time, then sends a disposition to the broker when the message has been processed.

type ReceiveMode

type ReceiveMode int

ReceiveMode represents the behavior when consuming a message from a queue

const (
	// PeekLockMode causes a receiver to peek at a message, lock it so no others can consume and have the queue wait for
	// the DispositionAction
	PeekLockMode ReceiveMode = 0
	// ReceiveAndDeleteMode causes a receiver to pop messages off of the queue without waiting for DispositionAction
	ReceiveAndDeleteMode ReceiveMode = 1
)

type SendOption

type SendOption func(event *Message) error

SendOption provides a way to customize a message on sending

type Subscription

type Subscription struct {
	Topic *Topic
	// contains filtered or unexported fields
}

Subscription represents a Service Bus Subscription entity which are used to receive topic messages. A topic subscription resembles a virtual queue that receives copies of the messages that are sent to the topic. Messages are received from a subscription identically to the way they are received from a queue.

func (*Subscription) Close

func (s *Subscription) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*Subscription) Receive

func (s *Subscription) Receive(ctx context.Context, handler Handler) (*ListenerHandle, error)

Receive subscribes for messages sent to the Subscription

func (*Subscription) ReceiveOne

func (s *Subscription) ReceiveOne(ctx context.Context) (*MessageWithContext, error)

ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.

type SubscriptionDescription

type SubscriptionDescription struct {
	XMLName xml.Name `xml:"SubscriptionDescription"`
	BaseEntityDescription
	LockDuration                              *string                  `xml:"LockDuration,omitempty"` // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
	RequiresSession                           *bool                    `xml:"RequiresSession,omitempty"`
	DefaultMessageTimeToLive                  *string                  `xml:"DefaultMessageTimeToLive,omitempty"`         // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
	DeadLetteringOnMessageExpiration          *bool                    `xml:"DeadLetteringOnMessageExpiration,omitempty"` // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires.
	DeadLetteringOnFilterEvaluationExceptions *bool                    `xml:"DeadLetteringOnFilterEvaluationExceptions,omitempty"`
	MessageCount                              *int64                   `xml:"MessageCount,omitempty"`            // MessageCount - The number of messages in the queue.
	MaxDeliveryCount                          *int32                   `xml:"MaxDeliveryCount,omitempty"`        // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10.
	EnableBatchedOperations                   *bool                    `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
	Status                                    *servicebus.EntityStatus `xml:"Status,omitempty"`
	CreatedAt                                 *date.Time               `xml:"CreatedAt,omitempty"`
	UpdatedAt                                 *date.Time               `xml:"UpdatedAt,omitempty"`
	AccessedAt                                *date.Time               `xml:"AccessedAt,omitempty"`
	AutoDeleteOnIdle                          *string                  `xml:"AutoDeleteOnIdle,omitempty"`
}

SubscriptionDescription is the content type for Subscription management requests

type SubscriptionEntity

type SubscriptionEntity struct {
	*SubscriptionDescription
	Name string
}

SubscriptionEntity is the Azure Service Bus description of a topic Subscription for management activities

type SubscriptionManagementOption

type SubscriptionManagementOption func(*SubscriptionDescription) error

SubscriptionManagementOption represents named options for assisting Subscription creation

func SubscriptionWithAutoDeleteOnIdle

func SubscriptionWithAutoDeleteOnIdle(window *time.Duration) SubscriptionManagementOption

SubscriptionWithAutoDeleteOnIdle configures the subscription to automatically delete after the specified idle interval. The minimum duration is 5 minutes.

func SubscriptionWithBatchedOperations

func SubscriptionWithBatchedOperations() SubscriptionManagementOption

SubscriptionWithBatchedOperations configures the subscription to batch server-side operations.

func SubscriptionWithDeadLetteringOnMessageExpiration

func SubscriptionWithDeadLetteringOnMessageExpiration() SubscriptionManagementOption

SubscriptionWithDeadLetteringOnMessageExpiration will ensure the Subscription sends expired messages to the dead letter queue

func SubscriptionWithLockDuration

func SubscriptionWithLockDuration(window *time.Duration) SubscriptionManagementOption

SubscriptionWithLockDuration configures the subscription to have a duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.

func SubscriptionWithMessageTimeToLive

func SubscriptionWithMessageTimeToLive(window *time.Duration) SubscriptionManagementOption

SubscriptionWithMessageTimeToLive configures the subscription to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.

func SubscriptionWithRequiredSessions

func SubscriptionWithRequiredSessions() SubscriptionManagementOption

SubscriptionWithRequiredSessions will ensure the subscription requires senders and receivers to have sessionIDs

type SubscriptionManager

type SubscriptionManager struct {
	Topic *Topic
	// contains filtered or unexported fields
}

SubscriptionManager provides CRUD functionality for Service Bus Subscription

func (*SubscriptionManager) Delete

func (sm *SubscriptionManager) Delete(ctx context.Context, name string) error

Delete deletes a Service Bus Topic entity by name

func (SubscriptionManager) Execute

func (em SubscriptionManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader) (*http.Response, error)

Execute performs an HTTP request given a http method, path and body

func (*SubscriptionManager) Get

Get fetches a Service Bus Topic entity by name

func (*SubscriptionManager) List

List fetches all of the Topics for a Service Bus Namespace

func (SubscriptionManager) Post

func (em SubscriptionManager) Post(ctx context.Context, entityPath string, body []byte) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*SubscriptionManager) Put

Put creates or updates a Service Bus Topic

type SubscriptionOption

type SubscriptionOption func(*Subscription) error

SubscriptionOption configures the Subscription Azure Service Bus client

func SubscriptionWithReceiveAndDelete

func SubscriptionWithReceiveAndDelete() SubscriptionOption

SubscriptionWithReceiveAndDelete configures a subscription to pop and delete messages off of the queue upon receiving the message. This differs from the default, PeekLock, where PeekLock receives a message, locks it for a period of time, then sends a disposition to the broker when the message has been processed.

type SystemProperties

type SystemProperties struct {
	LockedUntil            *time.Time `mapstructure:"x-opt-locked-until"`
	SequenceNumber         *int64     `mapstructure:"x-opt-sequence-number"`
	PartitionID            *int16     `mapstructure:"x-opt-partition-id"`
	PartitionKey           *string    `mapstructure:"x-opt-partition-key"`
	EnqueuedTime           *time.Time `mapstructure:"x-opt-enqueued-time"`
	DeadLetterSource       *string    `mapstructure:"x-opt-deadletter-source"`
	ScheduledEnqueueTime   *time.Time `mapstructure:"x-opt-scheduled-enqueue-time"`
	EnqueuedSequenceNumber *int64     `mapstructure:"x-opt-enqueue-sequence-number"`
	ViaPartitionKey        *string    `mapstructure:"x-opt-via-partition-key"`
}

SystemProperties are used to store properties that are set by the system.

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic in contrast to queues, in which each message is processed by a single consumer, topics and subscriptions provide a one-to-many form of communication, in a publish/subscribe pattern. Useful for scaling to very large numbers of recipients, each published message is made available to each subscription registered with the topic. Messages are sent to a topic and delivered to one or more associated subscriptions, depending on filter rules that can be set on a per-subscription basis. The subscriptions can use additional filters to restrict the messages that they want to receive. Messages are sent to a topic in the same way they are sent to a queue, but messages are not received from the topic directly. Instead, they are received from subscriptions. A topic subscription resembles a virtual queue that receives copies of the messages that are sent to the topic. Messages are received from a subscription identically to the way they are received from a queue.

func (*Topic) Close

func (t *Topic) Close(ctx context.Context) error

Close the underlying connection to Service Bus

func (*Topic) NewSubscription

func (t *Topic) NewSubscription(ctx context.Context, name string, opts ...SubscriptionOption) (*Subscription, error)

NewSubscription creates a new Topic Subscription client

func (*Topic) NewSubscriptionManager

func (t *Topic) NewSubscriptionManager() *SubscriptionManager

NewSubscriptionManager creates a new SubscriptionManager for a Service Bus Topic

func (*Topic) Send

func (t *Topic) Send(ctx context.Context, event *Message, opts ...SendOption) error

Send sends messages to the Topic

type TopicDescription

type TopicDescription struct {
	XMLName xml.Name `xml:"TopicDescription"`
	BaseEntityDescription
	DefaultMessageTimeToLive            *string                  `xml:"DefaultMessageTimeToLive,omitempty"`            // DefaultMessageTimeToLive - ISO 8601 default message time span to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself.
	MaxSizeInMegabytes                  *int32                   `xml:"MaxSizeInMegabytes,omitempty"`                  // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024.
	RequiresDuplicateDetection          *bool                    `xml:"RequiresDuplicateDetection,omitempty"`          // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection.
	DuplicateDetectionHistoryTimeWindow *string                  `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes.
	EnableBatchedOperations             *bool                    `xml:"EnableBatchedOperations,omitempty"`             // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled.
	SizeInBytes                         *int64                   `xml:"SizeInBytes,omitempty"`                         // SizeInBytes - The size of the queue, in bytes.
	FilteringMessagesBeforePublishing   *bool                    `xml:"FilteringMessagesBeforePublishing,omitempty"`
	IsAnonymousAccessible               *bool                    `xml:"IsAnonymousAccessible,omitempty"`
	Status                              *servicebus.EntityStatus `xml:"Status,omitempty"`
	CreatedAt                           *date.Time               `xml:"CreatedAt,omitempty"`
	UpdatedAt                           *date.Time               `xml:"UpdatedAt,omitempty"`
	SupportOrdering                     *bool                    `xml:"SupportOrdering,omitempty"`
	AutoDeleteOnIdle                    *string                  `xml:"AutoDeleteOnIdle,omitempty"`
	EnablePartitioning                  *bool                    `xml:"EnablePartitioning,omitempty"`
	EnableSubscriptionPartitioning      *bool                    `xml:"EnableSubscriptionPartitioning,omitempty"`
	EnableExpress                       *bool                    `xml:"EnableExpress,omitempty"`
	CountDetails                        *CountDetails            `xml:"CountDetails,omitempty"`
}

TopicDescription is the content type for Topic management requests

type TopicEntity

type TopicEntity struct {
	*TopicDescription
	Name string
}

TopicEntity is the Azure Service Bus description of a Topic for management activities

type TopicManagementOption

type TopicManagementOption func(*TopicDescription) error

TopicManagementOption represents named options for assisting Topic creation

func TopicWithAutoDeleteOnIdle

func TopicWithAutoDeleteOnIdle(window *time.Duration) TopicManagementOption

TopicWithAutoDeleteOnIdle configures the topic to automatically delete after the specified idle interval. The minimum duration is 5 minutes.

func TopicWithBatchedOperations

func TopicWithBatchedOperations() TopicManagementOption

TopicWithBatchedOperations configures the topic to batch server-side operations.

func TopicWithDuplicateDetection

func TopicWithDuplicateDetection(window *time.Duration) TopicManagementOption

TopicWithDuplicateDetection configures the topic to detect duplicates for a given time window. If window is not specified, then it uses the default of 10 minutes.

func TopicWithExpress

func TopicWithExpress() TopicManagementOption

TopicWithExpress configures the topic to hold a message in memory temporarily before writing it to persistent storage.

func TopicWithMaxSizeInMegabytes

func TopicWithMaxSizeInMegabytes(size int) TopicManagementOption

TopicWithMaxSizeInMegabytes configures the maximum size of the topic in megabytes (1 * 1024 - 5 * 1024), which is the size of the memory allocated for the topic. Default is 1 MB (1 * 1024).

func TopicWithMessageTimeToLive

func TopicWithMessageTimeToLive(window *time.Duration) TopicManagementOption

TopicWithMessageTimeToLive configures the topic to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.

func TopicWithOrdering

func TopicWithOrdering() TopicManagementOption

TopicWithOrdering configures the topic to support ordering of messages.

func TopicWithPartitioning

func TopicWithPartitioning() TopicManagementOption

TopicWithPartitioning configures the topic to be partitioned across multiple message brokers.

type TopicManager

type TopicManager struct {
	// contains filtered or unexported fields
}

TopicManager provides CRUD functionality for Service Bus Topics

func (*TopicManager) Delete

func (tm *TopicManager) Delete(ctx context.Context, name string) error

Delete deletes a Service Bus Topic entity by name

func (TopicManager) Execute

func (em TopicManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader) (*http.Response, error)

Execute performs an HTTP request given a http method, path and body

func (*TopicManager) Get

func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, error)

Get fetches a Service Bus Topic entity by name

func (*TopicManager) List

func (tm *TopicManager) List(ctx context.Context) ([]*TopicEntity, error)

List fetches all of the Topics for a Service Bus Namespace

func (TopicManager) Post

func (em TopicManager) Post(ctx context.Context, entityPath string, body []byte) (*http.Response, error)

Post performs an HTTP POST for a given entity path and body

func (*TopicManager) Put

func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicManagementOption) (*TopicEntity, error)

Put creates or updates a Service Bus Topic

type TopicOption

type TopicOption func(*Topic) error

TopicOption represents named options for assisting Topic message handling

Directories

Path Synopsis
_examples
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL