azbus

package module
v0.0.4 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 21, 2024 License: MIT Imports: 12 Imported by: 0

Documentation

Overview

Package azbus provides a simple wrapper for Azure Service Bus.

Index

Constants

This section is empty.

Variables

View Source
var (
	// ErrQueueNotFound is returned when the queue is not found.
	ErrQueueNotFound = errors.New("queue not found")
	// ErrTopicNotFound is returned when the topic is not found.
	ErrTopicNotFound = errors.New("topic not found")
	// ErrSubscriptionNotFound is returned when the subscription is not found.
	ErrSubscriptionNotFound = errors.New("subscription not found")
	// ErrConflict is returned when the resource already exists.
	ErrConflict = errors.New("already exists")
	// ErrInvalidMessage is returned when the message is invalid.
	ErrInvalidMessage = errors.New("invalid message")
	// ErrInvalidSequenceNumber is returned when the sequence number is invalid.
	ErrInvalidSequenceNumber = errors.New("invalid sequence number")
	// ErrInvalidScheduleTime is returned when the schedule time is invalid.
	ErrInvalidScheduleTime = errors.New("invalid schedule time")
)

Functions

func GetFromNameRule

func GetFromNameRule(name string) *admin.RuleProperties

GetFromNameRule returns a rule that filters messages by the given name.

func GetToNameRule

func GetToNameRule(name string) *admin.RuleProperties

GetToNameRule returns a rule that filters messages by the given name.

func SetLog

func SetLog(l *zap.SugaredLogger)

SetLog sets the logger from outside the package.

func SetReceiveBatchSize added in v0.0.2

func SetReceiveBatchSize(size int)

SetReceiveBatchSize sets the batch size for receiving messages.

Types

type Attributes

type Attributes map[string]interface{}

Attributes represents the attributes of a message.

type Manager

type Manager struct {
	// contains filtered or unexported fields
}

Manager represents an admin client of Azure Service Bus.

func NewManager

func NewManager(connectionString string) (*Manager, error)

NewManager creates a new instance of Manager.

func (*Manager) CreateQueue

func (m *Manager) CreateQueue(queueName string) (*Queue, error)

CreateQueue creates a Queue with the given name if it does not already exist, and returns the queue instance.

func (*Manager) CreateSubscription

func (m *Manager) CreateSubscription(topicName, subName string) (*Subscription, error)

CreateSubscription creates a Subscription with the given name if it does not already exist, and returns the subscription instance.

func (*Manager) CreateSubscriptionWithRule

func (m *Manager) CreateSubscriptionWithRule(topicName, subName string, rule *admin.RuleProperties) (*Subscription, error)

CreateSubscriptionWithRule creates a Subscription with the given name and rule if it does not already exist, and returns the subscription instance.

func (*Manager) CreateTopic

func (m *Manager) CreateTopic(topicName string) (*Topic, error)

CreateTopic creates a Topic with the given name if it does not already exist, and returns the topic instance.

func (*Manager) DeleteQueue

func (m *Manager) DeleteQueue(queueName string) error

DeleteQueue deletes the Queue with the given name if it exists.

func (*Manager) DeleteSubscription

func (m *Manager) DeleteSubscription(topicName, subName string) error

DeleteSubscription deletes the Subscription with the given name if it exists.

func (*Manager) DeleteTopic

func (m *Manager) DeleteTopic(topicName string) error

DeleteTopic deletes the Topic with the given name if it exists.

func (*Manager) SetLockDuration

func (m *Manager) SetLockDuration(s string)

SetLockDuration sets the default lock duration, which will be used when creating a new queue or subscription.

func (*Manager) SetMaxSizeMB

func (m *Manager) SetMaxSizeMB(i int32)

SetMaxSizeMB sets the default max size in megabytes, which will be used when creating a new queue or topic.

func (*Manager) SetMessageTimeToLive

func (m *Manager) SetMessageTimeToLive(s string)

SetMessageTimeToLive sets the default message time to live, which will be used when creating a new queue, topic or subscription.

func (*Manager) String

func (m *Manager) String() string

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message represents a message received from Azure Service Bus.

func (*Message) GetAttributes

func (m *Message) GetAttributes() Attributes

GetAttributes returns the attributes of the message.

func (*Message) GetContent

func (m *Message) GetContent() string

GetContent returns the content of the message in string.

func (*Message) GetData

func (m *Message) GetData() []byte

GetData returns the data of the message in bytes.

func (*Message) GetID

func (m *Message) GetID() string

GetID returns the ID of the message.

func (*Message) GetTags

func (m *Message) GetTags() Tags

GetTags returns the local tags of the message.

func (*Message) SetTags

func (m *Message) SetTags(tag Tags)

SetTags sets the local tags of the message. It will not be sent to the server, and only used locally.

func (*Message) String

func (m *Message) String() string

String returns the string representation of the message.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue represents an instance of Azure Service Bus Queue.

func NewQueue

func NewQueue(connectionString, queueName string) (*Queue, error)

NewQueue creates a new instance of Queue.

func NewQueueWithPeekLockMode

func NewQueueWithPeekLockMode(connectionString, queueName string) (*Queue, error)

NewQueueWithPeekLockMode creates a new instance of Queue with PeekLock mode.

func NewQueueWithReceiveAndDeleteMode

func NewQueueWithReceiveAndDeleteMode(connectionString, queueName string) (*Queue, error)

NewQueueWithReceiveAndDeleteMode creates a new instance of Queue with ReceiveAndDelete mode.

func (*Queue) Abandon

func (q *Queue) Abandon(m *Message) error

Abandon marks a message as not taken, so it will be returned immediately.

func (*Queue) Close

func (q *Queue) Close() error

Close closes the sender and receiver of the queue.

func (*Queue) Complete

func (q *Queue) Complete(m *Message) error

Complete marks a message as completed, so it will be deleted from the queue.

func (*Queue) GetStatus

func (q *Queue) GetStatus() (*Status, error)

GetStatus returns the status of the queue.

func (*Queue) Receive

func (q *Queue) Receive() ([]*Message, error)

Receive returns at most 10 messages from the queue. It returns no error if no message is available. Don't call Receive() concurrently, or the calls will block on the mutex.

func (*Queue) Schedule

func (q *Queue) Schedule(data []byte, attrs Attributes, sched time.Time) (int64, error)

Schedule schedules a message to be enqueued at the given time, and returns the sequence number of the message.

func (*Queue) Send

func (q *Queue) Send(data []byte, attrs Attributes) error

Send sends a message to the queue immediately. If attrs is nil and data is empty, an error will be returned.

func (*Queue) String

func (q *Queue) String() string

func (*Queue) Take

func (q *Queue) Take(m *Message) error

Take marks a message as taken, so it will not be returned before the visibility timeout. It is usually used to extend the time to process the message.

type Status

type Status struct {
	SizeInBytes                    int64 `json:"size_bytes,omitempty"`
	TotalMessageCount              int64 `json:"total_message,omitempty"`
	ActiveMessageCount             int32 `json:"active_message,omitempty"`
	DeadLetterMessageCount         int32 `json:"dead_letter_message,omitempty"`
	ScheduledMessageCount          int32 `json:"scheduled_message,omitempty"`
	SubscriptionCount              int32 `json:"subscription,omitempty"`
	TransferMessageCount           int32 `json:"transfer_message,omitempty"`
	TransferDeadLetterMessageCount int32 `json:"transfer_dead_letter_message,omitempty"`
}

Status represents the status of a queue or topic.

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription wraps the Azure Service Bus subscription of a topic.

func (*Subscription) Abandon

func (s *Subscription) Abandon(m *Message) error

Abandon marks a message as not taken, so it will be returned immediately.

func (*Subscription) Close

func (s *Subscription) Close() error

Close closes the receiver of the subscription.

func (*Subscription) Complete

func (s *Subscription) Complete(m *Message) error

Complete marks a message as completed, so it will be deleted from the subscription.

func (*Subscription) GetStatus

func (s *Subscription) GetStatus() (*Status, error)

GetStatus returns the status of the subscription.

func (*Subscription) Receive

func (s *Subscription) Receive() ([]*Message, error)

Receive returns at most 10 messages from the subscription. It returns no error if no message is available. Don't call Receive() concurrently, or the calls will block on the mutex.

func (*Subscription) String

func (s *Subscription) String() string

func (*Subscription) Take

func (s *Subscription) Take(m *Message) error

Take marks a message as taken, so it will not be returned before the visibility timeout. It is usually used to extend the time to process the message.

type Tags

type Tags map[string]interface{}

Tags represents the tags of a message.

type Topic

type Topic struct {
	// contains filtered or unexported fields
}

Topic wraps the Azure Service Bus topic and its related subscriptions.

func NewTopic

func NewTopic(connectionString, topicName string) (*Topic, error)

NewTopic creates a new topic instance with the given connection string and topic name.

func (*Topic) Close

func (t *Topic) Close() error

Close closes the topic and its related subscriptions.

func (*Topic) GetStatus

func (t *Topic) GetStatus() (*Status, error)

GetStatus returns the status of the topic.

func (*Topic) NewSubscription

func (t *Topic) NewSubscription(subName string) (*Subscription, error)

NewSubscription returns the subscription with the given name of the topic. It returns error if the subscription does not exist.

func (*Topic) NewSubscriptionWithPeekLockMode

func (t *Topic) NewSubscriptionWithPeekLockMode(subName string) (*Subscription, error)

NewSubscriptionWithPeekLockMode returns the subscription with the given name of the topic with PeekLock mode. It returns error if the subscription does not exist.

func (*Topic) NewSubscriptionWithReceiveAndDeleteMode

func (t *Topic) NewSubscriptionWithReceiveAndDeleteMode(subName string) (*Subscription, error)

NewSubscriptionWithReceiveAndDeleteMode returns the subscription with the given name of the topic with ReceiveAndDelete mode. It returns error if the subscription does not exist.

func (*Topic) Schedule

func (t *Topic) Schedule(data []byte, attrs Attributes, sched time.Time) (int64, error)

Schedule schedules a message to be enqueued at the given time, and returns the sequence number of the message.

func (*Topic) Send

func (t *Topic) Send(data []byte, attrs Attributes) error

Send sends a message to the topic immediately. If attrs is nil and data is empty, an error will be returned.

func (*Topic) String

func (t *Topic) String() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL