Documentation ¶
Overview ¶
Example (AutoForward) ¶
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	servicebus "github.com/Azure/azure-service-bus-go"
)

// MessagePrinter is a message handler that writes each message body to stdout.
type MessagePrinter struct{}

// Handle prints msg.Data as a string and then completes the message.
func (mp MessagePrinter) Handle(ctx context.Context, msg *servicebus.Message) error {
	fmt.Println(string(msg.Data))
	return msg.Complete(ctx)
}

// main creates a target queue and a source queue that auto-forwards to it,
// sends one message to the source and receives it from the target.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
		return
	}

	// Create a client to communicate with a Service Bus Namespace.
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		fmt.Println(err)
		return
	}

	qm := ns.NewQueueManager()
	target, err := ensureQueue(ctx, qm, "AutoForwardTargetQueue")
	if err != nil {
		fmt.Println(err)
		return
	}

	// The source queue is created with the auto-forward option pointing at the target entity.
	source, err := ensureQueue(ctx, qm, "AutoForwardSourceQueue", servicebus.QueueEntityWithAutoForward(target))
	if err != nil {
		fmt.Println(err)
		return
	}

	sourceQueue, err := ns.NewQueue(source.Name)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		_ = sourceQueue.Close(ctx)
	}()

	if err := sourceQueue.Send(ctx, servicebus.NewMessageFromString("forward me to target!")); err != nil {
		fmt.Println(err)
		return
	}

	targetQueue, err := ns.NewQueue(target.Name)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		_ = targetQueue.Close(ctx)
	}()

	if err := targetQueue.ReceiveOne(ctx, MessagePrinter{}); err != nil {
		fmt.Println(err)
		return
	}
}

// ensureQueue puts a queue with the given name and options and returns the
// resulting entity. When a Get for the name succeeds first, a delete is
// attempted before the Put so the queue is created fresh.
//
// The error from Put is returned to the caller without being printed here;
// main prints it, so printing in both places would report it twice.
func ensureQueue(ctx context.Context, qm *servicebus.QueueManager, name string, opts ...servicebus.QueueManagementOption) (*servicebus.QueueEntity, error) {
	if _, err := qm.Get(ctx, name); err == nil {
		// Best effort: a failed delete is deliberately ignored and the Put below is still attempted.
		_ = qm.Delete(ctx, name)
	}

	qe, err := qm.Put(ctx, name, opts...)
	if err != nil {
		return nil, err
	}
	return qe, nil
}
Output: forward me to target!
Example (BatchingMessages) ¶
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connectionString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connectionString == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

// Build the namespace client from the connection string.
namespace, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
if err != nil {
	fmt.Println(err)
	return
}

// Make sure the example queue exists before opening a client for it.
entity, err := ensureQueue(ctx, namespace.NewQueueManager(), "MessageBatchingExample")
if err != nil {
	fmt.Println(err)
	return
}

queue, err := namespace.NewQueue(entity.Name)
if err != nil {
	fmt.Println(err)
	return
}
defer func() {
	_ = queue.Close(ctx)
}()

// Prepare ten messages, "foo 0" through "foo 9".
batch := make([]*servicebus.Message, 10)
for i := range batch {
	batch[i] = servicebus.NewMessageFromString(fmt.Sprintf("foo %d", i))
}

// Send all of them through a batch iterator bounded by the standard max message size.
iterator := servicebus.NewMessageBatchIterator(servicebus.StandardMaxMessageSizeInBytes, batch...)
if err := queue.SendBatch(ctx, iterator); err != nil {
	fmt.Println(err)
	return
}

// Receive and print one message per message sent.
for received := 0; received < 10; received++ {
	if err := queue.ReceiveOne(ctx, MessagePrinter{}); err != nil {
		fmt.Println(err)
		return
	}
}
Output: foo 0 foo 1 foo 2 foo 3 foo 4 foo 5 foo 6 foo 7 foo 8 foo 9
Example (DeadletterQueues) ¶
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
	fmt.Println(err)
	return
}

qm := ns.NewQueueManager()
qe, err := ensureQueue(ctx, qm, "DeadletterExample")
if err != nil {
	fmt.Println(err)
	return
}

q, err := ns.NewQueue(qe.Name)
if err != nil {
	fmt.Println(err)
	return
}
defer func() {
	_ = q.Close(ctx)
}()

if err := q.Send(ctx, servicebus.NewMessageFromString("foo")); err != nil {
	fmt.Println(err)
	return
}

// Abandon the message 10 times simulating attempting to process the message 10 times. After the 10th time, the
// message will be placed in the Deadletter Queue.
for count := 0; count < 10; count++ {
	err = q.ReceiveOne(ctx, servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
		fmt.Printf("count: %d\n", count+1)
		return msg.Abandon(ctx)
	}))
	if err != nil {
		fmt.Println(err)
		return
	}
}

// Receive one from the queue's deadletter queue. It should be the foo message.
qdl := q.NewDeadLetter()
// Register the close before receiving so the dead-letter receiver is also
// released when ReceiveOne fails and this function returns early.
defer func() {
	_ = qdl.Close(ctx)
}()

if err := qdl.ReceiveOne(ctx, MessagePrinter{}); err != nil {
	fmt.Println(err)
	return
}
Output: count: 1 count: 2 count: 3 count: 4 count: 5 count: 6 count: 7 count: 8 count: 9 count: 10 foo
Example (DeferMessages) ¶
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"math/rand"
	"os"
	"time"

	"github.com/Azure/azure-service-bus-go"
)

// RecipeStep is the JSON payload used by this example: a step number and its title.
type RecipeStep struct {
	Step  int    `json:"step,omitempty"`
	Title string `json:"title,omitempty"`
}

// main sends the recipe steps after random delays, defers every received
// message while recording its sequence number, and then receives the
// deferred messages back in step order.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connectionString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connectionString == "" {
		fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
		return
	}

	// Build the namespace client from the connection string.
	namespace, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
	if err != nil {
		fmt.Println(err)
		return
	}

	entity, err := ensureQueue(ctx, namespace.NewQueueManager(), "DeferExample")
	if err != nil {
		fmt.Println(err)
		return
	}

	queue, err := namespace.NewQueue(entity.Name)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		_ = queue.Close(ctx)
	}()

	steps := []RecipeStep{
		{Step: 1, Title: "Shop"},
		{Step: 2, Title: "Unpack"},
		{Step: 3, Title: "Prepare"},
		{Step: 4, Title: "Cook"},
		{Step: 5, Title: "Eat"},
	}

	// One goroutine per step; each sleeps a random time (under two seconds)
	// before sending, so the steps can arrive out of order.
	for _, step := range steps {
		go func(s RecipeStep) {
			payload, err := json.Marshal(s)
			if err != nil {
				fmt.Println(err)
				return
			}

			outgoing := &servicebus.Message{
				Data:        payload,
				ContentType: "application/json",
				Label:       "RecipeStep",
			}

			time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)
			if err := queue.Send(ctx, outgoing); err != nil {
				fmt.Println(err)
				return
			}
		}(step)
	}

	// Step number -> sequence number of the deferred message carrying that step.
	sequenceByStep := map[int]int64{}

	// recordAndDefer remembers the message's sequence number under its step number and defers it.
	recordAndDefer := servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
		var received RecipeStep
		if err := json.Unmarshal(msg.Data, &received); err != nil {
			return err
		}
		sequenceByStep[received.Step] = *msg.SystemProperties.SequenceNumber
		return msg.Defer(ctx)
	})

	// printAndComplete prints the step carried by the message and completes it.
	printAndComplete := servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
		var received RecipeStep
		if err := json.Unmarshal(msg.Data, &received); err != nil {
			return err
		}
		fmt.Printf("step: %d, %s\n", received.Step, received.Title)
		return msg.Complete(ctx)
	})

	// Collect and defer one message per step.
	for range steps {
		if err := queue.ReceiveOne(ctx, recordAndDefer); err != nil {
			fmt.Println(err)
			return
		}
	}

	// Pull the deferred messages back by sequence number, in step order 1..len(steps).
	for number := 1; number <= len(steps); number++ {
		if err := queue.ReceiveDeferred(ctx, printAndComplete, sequenceByStep[number]); err != nil {
			fmt.Println(err)
			return
		}
	}
}
Output: step: 1, Shop step: 2, Unpack step: 3, Prepare step: 4, Cook step: 5, Eat
Example (DuplicateMessageDetection) ¶
ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
defer cancel()

connectionString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connectionString == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

// Build the namespace client from the connection string.
namespace, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connectionString))
if err != nil {
	fmt.Println(err)
	return
}

// Create the queue with duplicate detection enabled over a 30 second window.
window := 30 * time.Second
entity, err := ensureQueue(ctx, namespace.NewQueueManager(), "DuplicateDetectionExample", servicebus.QueueEntityWithDuplicateDetection(&window))
if err != nil {
	fmt.Println(err)
	return
}

queue, err := namespace.NewQueue(entity.Name)
if err != nil {
	fmt.Println(err)
	return
}
defer func() {
	_ = queue.Close(ctx)
}()

messageID, err := uuid.NewV4()
if err != nil {
	fmt.Println(err)
	return
}

duplicate := servicebus.NewMessageFromString("foo")
duplicate.ID = messageID.String()

// Send the very same message, and therefore the same ID, two times.
for attempt := 0; attempt < 2; attempt++ {
	if err := queue.Send(ctx, duplicate); err != nil {
		fmt.Println(err)
		return
	}
}

// Listen in the background; only a single message is expected to be printed.
// A "context canceled" error is treated as a normal shutdown and not reported.
go func() {
	err := queue.Receive(ctx, MessagePrinter{})
	if err != nil && err.Error() != "context canceled" {
		fmt.Println(err)
	}
}()

// Give the background receiver two seconds before returning.
time.Sleep(2 * time.Second)
Output: foo
Example (MessageBrowse) ¶
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"time"

	"github.com/Azure/azure-service-bus-go"
)

type (
	// Scientist is the JSON payload sent and peeked by this example.
	Scientist struct {
		Surname   string `json:"surname,omitempty"`
		FirstName string `json:"firstname,omitempty"`
	}
)

// main sends a list of scientists to a queue in one goroutine and peeks them
// in another, then waits for the shared 10 second send/peek context to finish.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
		return
	}

	// Create a client to communicate with a Service Bus Namespace.
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		fmt.Println(err)
		return
	}

	qm := ns.NewQueueManager()
	qEntity, err := ensureQueue(ctx, qm, "MessageBrowseExample")
	if err != nil {
		fmt.Println(err)
		return
	}

	q, err := ns.NewQueue(qEntity.Name)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the queue client on exit. It is closed with the outer ctx
	// because txRxCtx below is already done by the time main returns.
	defer func() {
		_ = q.Close(ctx)
	}()

	// A distinct name for the second cancel func avoids confusing it with the outer one.
	txRxCtx, txRxCancel := context.WithTimeout(ctx, 10*time.Second)
	defer txRxCancel()

	go sendMessages(txRxCtx, q)
	time.Sleep(1 * time.Second) // wait a second to ensure a message has landed in the queue
	go peekMessages(txRxCtx, q)

	<-txRxCtx.Done() // wait for the context to finish
}

// sendMessages marshals each scientist to JSON and sends it to q with a two
// minute TTL. It prints the error and stops at the first failure.
func sendMessages(ctx context.Context, q *servicebus.Queue) {
	scientists := []Scientist{
		{Surname: "Einstein", FirstName: "Albert"},
		{Surname: "Heisenberg", FirstName: "Werner"},
		{Surname: "Curie", FirstName: "Marie"},
		{Surname: "Hawking", FirstName: "Steven"},
		{Surname: "Newton", FirstName: "Isaac"},
		{Surname: "Bohr", FirstName: "Niels"},
		{Surname: "Faraday", FirstName: "Michael"},
		{Surname: "Galilei", FirstName: "Galileo"},
		{Surname: "Kepler", FirstName: "Johannes"},
		{Surname: "Kopernikus", FirstName: "Nikolaus"},
	}

	for _, scientist := range scientists {
		bits, err := json.Marshal(scientist)
		if err != nil {
			fmt.Println(err)
			return
		}

		ttl := 2 * time.Minute
		msg := servicebus.NewMessage(bits)
		msg.ContentType = "application/json"
		msg.TTL = &ttl
		if err := q.Send(ctx, msg); err != nil {
			fmt.Println(err)
			return
		}
	}
}

// peekMessages peeks messages from q one at a time and prints the scientist
// each one carries. It stops when ctx is done, when PeekOne reports
// servicebus.ErrNoMessages, or (after printing it) on any other error.
func peekMessages(ctx context.Context, q *servicebus.Queue) {
	var opts []servicebus.PeekOption
	for {
		select {
		case <-ctx.Done():
			return
		default:
			msg, err := q.PeekOne(ctx, opts...)
			if err != nil {
				switch err.(type) {
				case servicebus.ErrNoMessages:
					// all done
					return
				default:
					fmt.Println(err)
					return
				}
			}

			var scientist Scientist
			if err := json.Unmarshal(msg.Data, &scientist); err != nil {
				fmt.Println(err)
				return
			}

			// The next peek is positioned using the sequence number of the message just peeked.
			opts = []servicebus.PeekOption{servicebus.PeekFromSequenceNumber(*msg.SystemProperties.SequenceNumber)}
			fmt.Printf("Firstname: %s, Surname: %s\n", scientist.FirstName, scientist.Surname)
		}
	}
}
Output: Firstname: Albert, Surname: Einstein Firstname: Werner, Surname: Heisenberg Firstname: Marie, Surname: Curie Firstname: Steven, Surname: Hawking Firstname: Isaac, Surname: Newton Firstname: Niels, Surname: Bohr Firstname: Michael, Surname: Faraday Firstname: Galileo, Surname: Galilei Firstname: Johannes, Surname: Kepler Firstname: Nikolaus, Surname: Kopernikus
Example (MessageSessions) ¶
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"time"

	"github.com/Azure/azure-service-bus-go"
)

// StepSessionHandler is a session handler that prints the recipe steps
// received within one message session.
type StepSessionHandler struct {
	messageSession *servicebus.MessageSession
}

// Start is called when a new session is started
func (ssh *StepSessionHandler) Start(ms *servicebus.MessageSession) error {
	ssh.messageSession = ms
	fmt.Println("Begin session: ", *ssh.messageSession.SessionID())
	return nil
}

// Handle is called when a new session message is received
func (ssh *StepSessionHandler) Handle(ctx context.Context, msg *servicebus.Message) error {
	var step RecipeStep
	if err := json.Unmarshal(msg.Data, &step); err != nil {
		fmt.Println(err)
		return err
	}

	fmt.Printf(" Step: %d, %s\n", step.Step, step.Title)

	// Step 5 is the last step sent per session, so the session is closed once it is seen.
	if step.Step == 5 {
		ssh.messageSession.Close()
	}
	return msg.Complete(ctx)
}

// End is called when the message session is closed. Service Bus will not automatically end your message session. Be
// sure to know when to terminate your own session.
func (ssh *StepSessionHandler) End() {
	fmt.Println("End session: ", *ssh.messageSession.SessionID())
	fmt.Println("")
}

// main creates a session-required queue, sends the recipe steps once per
// session ID and then receives each session in turn.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
		return
	}

	// Create a client to communicate with a Service Bus Namespace.
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		fmt.Println(err)
		return
	}

	// Create a Service Bus Queue with required sessions enabled. This will ensure that all messages sent and received
	// are bound to a session.
	qm := ns.NewQueueManager()
	qEntity, err := ensureQueue(ctx, qm, "MessageSessionsExample", servicebus.QueueEntityWithRequiredSessions())
	if err != nil {
		fmt.Println(err)
		return
	}

	q, err := ns.NewQueue(qEntity.Name)
	if err != nil {
		fmt.Println(err)
		return
	}
	// Release the queue client on every exit path.
	defer func() {
		_ = q.Close(ctx)
	}()

	sessions := []string{"foo", "bar", "bazz", "buzz"}
	for _, session := range sessions {
		// send recipe steps
		// note that order is preserved within a given session
		sendSessionRecipeSteps(ctx, session, q)
	}

	// receive messages for each session
	// you can also call q.NewSession(nil) to receive from any available session
	for _, session := range sessions {
		// Copy the range variable so each session gets a pointer to its own
		// string (before Go 1.22 the range variable is shared across iterations).
		sessionID := session
		queueSession := q.NewSession(&sessionID)
		err := queueSession.ReceiveOne(ctx, new(StepSessionHandler))
		if err != nil {
			fmt.Println(err)
			return
		}

		if err := queueSession.Close(ctx); err != nil {
			fmt.Println(err)
			return
		}
	}
}

// sendSessionRecipeSteps sends the five recipe steps to q as JSON messages
// tagged with sessionID. It prints the error and stops at the first failure.
func sendSessionRecipeSteps(ctx context.Context, sessionID string, q *servicebus.Queue) {
	steps := []RecipeStep{
		{Step: 1, Title: "Shop"},
		{Step: 2, Title: "Unpack"},
		{Step: 3, Title: "Prepare"},
		{Step: 4, Title: "Cook"},
		{Step: 5, Title: "Eat"},
	}

	for _, step := range steps {
		bits, err := json.Marshal(step)
		if err != nil {
			fmt.Println(err)
			return
		}

		msg := servicebus.NewMessage(bits)
		msg.ContentType = "application/json"
		msg.SessionID = &sessionID
		if err := q.Send(ctx, msg); err != nil {
			fmt.Println(err)
			return
		}
	}
}
Output: Begin session: foo Step: 1, Shop Step: 2, Unpack Step: 3, Prepare Step: 4, Cook Step: 5, Eat End session: foo Begin session: bar Step: 1, Shop Step: 2, Unpack Step: 3, Prepare Step: 4, Cook Step: 5, Eat End session: bar Begin session: bazz Step: 1, Shop Step: 2, Unpack Step: 3, Prepare Step: 4, Cook Step: 5, Eat End session: bazz Begin session: buzz Step: 1, Shop Step: 2, Unpack Step: 3, Prepare Step: 4, Cook Step: 5, Eat End session: buzz
Example (Prefetch) ¶
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
	fmt.Println(err)
	return
}

qm := ns.NewQueueManager()
prefetch1, err := ensureQueue(ctx, qm, "Prefetch1Example")
if err != nil {
	fmt.Println(err)
	return
}

prefetch1000, err := ensureQueue(ctx, qm, "Prefetch1000Example")
if err != nil {
	fmt.Println(err)
	return
}

// sendAndReceive will send to the queue and read from the queue
sendAndReceive := func(ctx context.Context, name string, opt servicebus.QueueOption) error {
	messageCount := 200
	q, err := ns.NewQueue(name, opt, servicebus.QueueWithReceiveAndDelete())
	if err != nil {
		return err
	}

	// Every message carries the same 1000 byte random payload.
	buffer := make([]byte, 1000)
	if _, err := rand.Read(buffer); err != nil {
		return err
	}

	for i := 0; i < messageCount; i++ {
		if err := q.Send(ctx, servicebus.NewMessage(buffer)); err != nil {
			return err
		}
	}

	innerCtx, cancel := context.WithCancel(ctx)
	// Always release the context, including when Receive fails before the handler cancels it.
	defer cancel()

	count := 0
	err = q.Receive(innerCtx, servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
		count++
		// NOTE(review): count is incremented before this check, so the receive is
		// cancelled after message messageCount-1, not the last one. Confirm whether
		// that is intended before changing it.
		if count == messageCount-1 {
			defer cancel()
		}
		return msg.Complete(ctx)
	}))
	// Cancellation is the expected way for Receive to stop; anything else is a failure.
	if err != nil && err.Error() != "context canceled" {
		return err
	}
	return nil
}

// timedRun runs sendAndReceive in its own goroutine and reports the elapsed
// time on the returned channel. On failure the error is printed and the
// channel is closed without a value, so the receiver below never blocks forever.
timedRun := func(name string, opt servicebus.QueueOption) <-chan time.Duration {
	elapsed := make(chan time.Duration, 1)
	go func() {
		defer close(elapsed)
		start := time.Now()
		if err := sendAndReceive(ctx, name, opt); err != nil {
			fmt.Println(err)
			return
		}
		elapsed <- time.Since(start)
	}()
	return elapsed
}

// run send and receive concurrently and compare the times
totalPrefetch1 := timedRun(prefetch1.Name, servicebus.QueueWithPrefetchCount(1))
totalPrefetch1000 := timedRun(prefetch1000.Name, servicebus.QueueWithPrefetchCount(1000))

tp1, ok1 := <-totalPrefetch1
tp2, ok2 := <-totalPrefetch1000
if !ok1 || !ok2 {
	// One of the runs failed and already printed its error; there is nothing to compare.
	return
}

if tp1 > tp2 {
	fmt.Println("prefetch of 1000 took less time!")
}
Output: prefetch of 1000 took less time!
Example (PrioritySubscriptions) ¶
package main

import (
	"context"
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	servicebus "github.com/Azure/azure-service-bus-go"
)

// PrioritySubscription describes one subscription to create: its name, the
// SQL filter expression for its rule and how many messages to receive from it.
type PrioritySubscription struct {
	Name         string
	Expression   string
	MessageCount int
}

// PriorityMessage is a message body together with the priority sent as a user property.
type PriorityMessage struct {
	Body     string
	Priority int
}

// PriorityPrinter is a message handler that prints messages prefixed with a subscription name.
type PriorityPrinter struct {
	SubName string
}

// Handle prints "<SubName>_<body>_<priority>" and completes the message. When
// the "Priority" user property is not an int64, a notice is printed and the
// priority is shown as 0.
func (pp PriorityPrinter) Handle(ctx context.Context, msg *servicebus.Message) error {
	i, ok := msg.UserProperties["Priority"].(int64)
	if !ok {
		fmt.Println("Priority is not an int64")
	}

	fmt.Println(strings.Join([]string{pp.SubName, string(msg.Data), strconv.Itoa(int(i))}, "_"))
	return msg.Complete(ctx)
}

// main creates a topic with three filtered subscriptions, sends four messages
// with different priorities and receives the expected number from each subscription.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 40*time.Second)
	defer cancel()

	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
		return
	}

	// Create a client to communicate with a Service Bus Namespace.
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		fmt.Println(err)
		return
	}

	// build the topic for sending priority messages
	tm := ns.NewTopicManager()
	topicEntity, err := ensureTopic(ctx, tm, "PrioritySubscriptionsTopic")
	if err != nil {
		fmt.Println(err)
		return
	}

	sm, err := ns.NewSubscriptionManager(topicEntity.Name)
	if err != nil {
		fmt.Println(err)
		return
	}

	// build each priority subscription providing each with a SQL like expression to filter messages from the topic
	prioritySubs := []PrioritySubscription{
		{Name: "Priority1", Expression: "user.Priority=1", MessageCount: 1},
		{Name: "Priority2", Expression: "user.Priority=2", MessageCount: 1},
		{Name: "PriorityGreaterThan2", Expression: "user.Priority>2", MessageCount: 2},
	}

	for _, s := range prioritySubs {
		subEntity, err := ensureSubscription(ctx, sm, s.Name)
		if err != nil {
			fmt.Println(err)
			return
		}

		// remove the default rule, which is the "TrueFilter" that accepts all messages
		err = sm.DeleteRule(ctx, subEntity.Name, "$Default")
		if err != nil {
			fmt.Println(err)
			return
		}

		_, err = sm.PutRule(ctx, subEntity.Name, s.Name+"Rule", servicebus.SQLFilter{Expression: s.Expression})
		if err != nil {
			fmt.Println(err)
			return
		}
	}

	priorityMessages := []PriorityMessage{
		{Body: "foo", Priority: 1},
		{Body: "bar", Priority: 2},
		{Body: "bazz", Priority: 3},
		{Body: "buzz", Priority: 4},
	}

	topic, err := ns.NewTopic(topicEntity.Name)
	if err != nil {
		fmt.Println(err)
		return
	}
	defer func() {
		_ = topic.Close(ctx)
	}()

	for _, pMessage := range priorityMessages {
		msg := servicebus.NewMessageFromString(pMessage.Body)
		msg.UserProperties = map[string]interface{}{"Priority": pMessage.Priority}
		if err := topic.Send(ctx, msg); err != nil {
			fmt.Println(err)
			return
		}
	}

	for _, s := range prioritySubs {
		sub, err := topic.NewSubscription(s.Name)
		if err != nil {
			fmt.Println(err)
			return
		}

		for i := 0; i < s.MessageCount; i++ {
			err := sub.ReceiveOne(ctx, PriorityPrinter{SubName: sub.Name})
			if err != nil {
				fmt.Println(err)
				return
			}
		}

		err = sub.Close(ctx)
		if err != nil {
			fmt.Println(err)
			return
		}
	}
}

// ensureTopic puts a topic with the given name and options and returns the
// resulting entity. When a Get for the name succeeds first, a delete is
// attempted before the Put so the topic is created fresh.
//
// The error from Put is returned without being printed here; main prints it,
// so printing in both places would report it twice.
func ensureTopic(ctx context.Context, tm *servicebus.TopicManager, name string, opts ...servicebus.TopicManagementOption) (*servicebus.TopicEntity, error) {
	if _, err := tm.Get(ctx, name); err == nil {
		// Best effort: a failed delete is deliberately ignored and the Put below is still attempted.
		_ = tm.Delete(ctx, name)
	}

	te, err := tm.Put(ctx, name, opts...)
	if err != nil {
		return nil, err
	}
	return te, nil
}

// ensureSubscription puts a subscription with the given name and options and
// returns the resulting entity. When a Get for the name succeeds first, a
// delete is attempted before the Put so the subscription is created fresh.
//
// The error from Put is returned without being printed here; main prints it,
// so printing in both places would report it twice.
func ensureSubscription(ctx context.Context, sm *servicebus.SubscriptionManager, name string, opts ...servicebus.SubscriptionManagementOption) (*servicebus.SubscriptionEntity, error) {
	if _, err := sm.Get(ctx, name); err == nil {
		// Best effort: a failed delete is deliberately ignored and the Put below is still attempted.
		_ = sm.Delete(ctx, name)
	}

	subEntity, err := sm.Put(ctx, name, opts...)
	if err != nil {
		return nil, err
	}
	return subEntity, nil
}
Output: Priority1_foo_1 Priority2_bar_2 PriorityGreaterThan2_bazz_3 PriorityGreaterThan2_buzz_4
Example (QueueSendAndReceive) ¶
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
	fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
	return
}

// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
	fmt.Println(err)
	return
}

// Create a client to communicate with the queue. (The queue must have already been created, see `QueueManager`)
q, err := ns.NewQueue("helloworld")
if err != nil {
	fmt.Println("FATAL: ", err)
	return
}
// Release the queue client on every exit path; the close error is ignored (best effort).
defer func() {
	_ = q.Close(ctx)
}()

err = q.Send(ctx, servicebus.NewMessageFromString("Hello, World!!!"))
if err != nil {
	fmt.Println("FATAL: ", err)
	return
}

// Receive a single message, print its body and complete it.
err = q.ReceiveOne(
	ctx,
	servicebus.HandlerFunc(func(ctx context.Context, message *servicebus.Message) error {
		fmt.Println(string(message.Data))
		return message.Complete(ctx)
	}))
if err != nil {
	fmt.Println("FATAL: ", err)
	return
}
Output: Hello, World!!!
Example (ScheduledMessage) ¶
package main

import (
	"context"
	"fmt"
	"os"
	"time"

	"github.com/Azure/azure-service-bus-go"
)

// main schedules a message one minute into the future and reports whether it
// was received within 20 seconds of the scheduled time.
func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
	defer cancel()

	connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	if connStr == "" {
		fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
		return
	}

	// Create a client to communicate with a Service Bus Namespace.
	ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
	if err != nil {
		fmt.Println("FATAL: ", err)
		return
	}

	// Create a client to communicate with the queue. (The queue must have already been created, see `QueueManager`)
	client, err := ns.NewQueue("scheduledmessages")
	if err != nil {
		fmt.Println("FATAL: ", err)
		return
	}
	// Release the queue client on every exit path; the close error is ignored (best effort).
	defer func() {
		_ = client.Close(ctx)
	}()

	// purge all of the existing messages in the queue
	purgeMessages(ns)

	// The delay that we should schedule a message for.
	const waitTime = 1 * time.Minute

	// Service Bus guarantees roughly a one minute window. So that our tests aren't flaky, we'll buffer our expectations
	// on either side.
	const buffer = 20 * time.Second

	expectedTime := time.Now().Add(waitTime)
	msg := servicebus.NewMessageFromString("to the future!!")
	msg.ScheduleAt(expectedTime)

	err = client.Send(ctx, msg)
	if err != nil {
		fmt.Println("FATAL: ", err)
		return
	}

	err = client.ReceiveOne(
		ctx,
		servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
			received := time.Now()
			if received.Before(expectedTime.Add(buffer)) && received.After(expectedTime.Add(-buffer)) {
				fmt.Println("Received when expected!")
			} else {
				fmt.Println("Received outside the expected window.")
			}
			return msg.Complete(ctx)
		}))
	if err != nil {
		fmt.Println("FATAL: ", err)
		return
	}
}

// purgeMessages drains the "scheduledmessages" queue by receiving and
// completing messages for up to 10 seconds. If the queue client cannot be
// created, the error is printed and nothing is purged.
func purgeMessages(ns *servicebus.Namespace) {
	client, err := ns.NewQueue("scheduledmessages")
	if err != nil {
		// Without a client there is nothing to receive from or to close.
		fmt.Println(err)
		return
	}
	defer func() {
		// Close with a fresh context: the purge context is cancelled or expired
		// by the time this deferred function runs.
		closeCtx, closeCancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer closeCancel()
		_ = client.Close(closeCtx)
	}()

	purgeCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	// The receive error is ignored on purpose; presumably it is just the purge
	// context expiring. NOTE(review): confirm no other errors need surfacing.
	_ = client.Receive(purgeCtx, servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) error {
		return msg.Complete(ctx)
	}))
}
Output: Received when expected!
Index ¶
- Constants
- func IsErrNotFound(err error) bool
- type ActionDescriber
- type ActionDescription
- type BaseEntityDescription
- type BatchDispositionError
- type BatchDispositionIterator
- type BatchIterator
- type BatchOptions
- type Closer
- type CorrelationFilter
- type CountDetails
- type DeadLetter
- type DeadLetterBuilder
- type DefaultRuleDescription
- type DispositionAction
- type DispositionError
- type Entity
- type EntityManagementAddresser
- type EntityStatus
- type ErrAMQP
- type ErrConnectionClosed
- type ErrIncorrectType
- type ErrMalformedMessage
- type ErrMissingField
- type ErrNoMessages
- type ErrNotFound
- type FalseFilter
- type FilterDescriber
- type FilterDescription
- type Handler
- type HandlerFunc
- type ListQueuesOption
- type ListQueuesOptions
- type ListSubscriptionsOption
- type ListSubscriptionsOptions
- type ListTopicsOption
- type ListTopicsOptions
- type ListenerHandle
- type MaxMessageSizeInBytes
- type Message
- func (m *Message) Abandon(ctx context.Context) error
- func (m *Message) AbandonAction() DispositionAction
- func (m *Message) Complete(ctx context.Context) error
- func (m *Message) CompleteAction() DispositionAction
- func (m *Message) DeadLetter(ctx context.Context, err error) error
- func (m *Message) DeadLetterAction(err error) DispositionAction
- func (m *Message) DeadLetterWithInfo(ctx context.Context, err error, condition MessageErrorCondition, ...) error
- func (m *Message) DeadLetterWithInfoAction(err error, condition MessageErrorCondition, additionalData map[string]string) DispositionAction
- func (m *Message) Defer(ctx context.Context) error
- func (m *Message) GetKeyValues() map[string]interface{}
- func (m *Message) ScheduleAt(t time.Time)
- func (m *Message) Set(key string, value interface{})
- type MessageBatch
- type MessageBatchIterator
- type MessageErrorCondition
- type MessageIterator
- type MessageSession
- func (ms *MessageSession) Close()
- func (ms *MessageSession) ListSessions(ctx context.Context) ([]byte, error)
- func (ms *MessageSession) LockedUntil() time.Time
- func (ms *MessageSession) RenewLock(ctx context.Context) error
- func (ms *MessageSession) SessionID() *string
- func (ms *MessageSession) SetState(ctx context.Context, state []byte) error
- func (ms *MessageSession) State(ctx context.Context) ([]byte, error)
- type MessageSliceIterator
- type MessageStatus
- type MiddlewareFunc
- type Namespace
- func (ns *Namespace) NewQueue(name string, opts ...QueueOption) (*Queue, error)
- func (ns *Namespace) NewQueueManager() *QueueManager
- func (ns *Namespace) NewReceiver(ctx context.Context, entityPath string, opts ...ReceiverOption) (*Receiver, error)
- func (ns *Namespace) NewSender(ctx context.Context, entityPath string, opts ...SenderOption) (*Sender, error)
- func (ns *Namespace) NewSubscriptionManager(topicName string) (*SubscriptionManager, error)
- func (ns *Namespace) NewTopic(name string, opts ...TopicOption) (*Topic, error)
- func (ns *Namespace) NewTopicManager() *TopicManager
- type NamespaceOption
- func NamespaceWithAzureEnvironment(namespaceName, environmentName string) NamespaceOption
- func NamespaceWithConnectionString(connStr string) NamespaceOption
- func NamespaceWithEnvironmentBinding(name string) NamespaceOption
- func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
- func NamespaceWithTokenProvider(provider auth.TokenProvider) NamespaceOption
- func NamespaceWithUserAgent(userAgent string) NamespaceOption
- func NamespaceWithWebSocket() NamespaceOption
- type PeekOption
- type Queue
- func (q *Queue) Close(ctx context.Context) error
- func (q *Queue) NewDeadLetter() *DeadLetter
- func (q *Queue) NewDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
- func (q *Queue) NewReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error)
- func (q *Queue) NewSender(ctx context.Context, opts ...SenderOption) (*Sender, error)
- func (q *Queue) NewSession(sessionID *string) *QueueSession
- func (q *Queue) NewTransferDeadLetter() *TransferDeadLetter
- func (q *Queue) NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
- func (q *Queue) Receive(ctx context.Context, handler Handler) error
- func (q *Queue) ReceiveOne(ctx context.Context, handler Handler) error
- func (q *Queue) Send(ctx context.Context, msg *Message) error
- func (q *Queue) SendBatch(ctx context.Context, iterator BatchIterator) error
- type QueueDescription
- type QueueEntity
- type QueueManagementOption
- func QueueEntityWithAutoDeleteOnIdle(window *time.Duration) QueueManagementOption
- func QueueEntityWithAutoForward(target Targetable) QueueManagementOption
- func QueueEntityWithDeadLetteringOnMessageExpiration() QueueManagementOption
- func QueueEntityWithDuplicateDetection(window *time.Duration) QueueManagementOption
- func QueueEntityWithForwardDeadLetteredMessagesTo(target Targetable) QueueManagementOption
- func QueueEntityWithLockDuration(window *time.Duration) QueueManagementOption
- func QueueEntityWithMaxDeliveryCount(count int32) QueueManagementOption
- func QueueEntityWithMaxSizeInMegabytes(size int) QueueManagementOption
- func QueueEntityWithMessageTimeToLive(window *time.Duration) QueueManagementOption
- func QueueEntityWithPartitioning() QueueManagementOption
- func QueueEntityWithRequiredSessions() QueueManagementOption
- type QueueManager
- func (qm *QueueManager) Delete(ctx context.Context, name string) error
- func (em QueueManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, ...) (*http.Response, error)
- func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, error)
- func (qm *QueueManager) List(ctx context.Context, options ...ListQueuesOption) ([]*QueueEntity, error)
- func (em QueueManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)
- func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueManagementOption) (*QueueEntity, error)
- func (em QueueManager) TokenProvider() auth.TokenProvider
- func (em QueueManager) Use(mw ...MiddlewareFunc)
- type QueueOption
- type QueueSession
- func (qs *QueueSession) Close(ctx context.Context) error
- func (qs *QueueSession) ManagementPath() string
- func (qs *QueueSession) ReceiveDeferred(ctx context.Context, handler Handler, mode ReceiveMode, ...) error
- func (qs *QueueSession) ReceiveOne(ctx context.Context, handler SessionHandler) error
- func (qs *QueueSession) Send(ctx context.Context, msg *Message) error
- func (qs *QueueSession) SessionID() *string
- type ReceiveBuilder
- type ReceiveMode
- type ReceiveOner
- type Receiver
- type ReceiverBuilder
- type ReceiverOption
- type RestHandler
- type RuleDescription
- type RuleEntity
- type SQLAction
- type SQLFilter
- type SendAndReceiveBuilder
- type SendOption
- type Sender
- type SenderBuilder
- type SenderOption
- type SessionHandler
- type Subscription
- func (s *Subscription) Close(ctx context.Context) error
- func (s *Subscription) NewDeadLetter() *DeadLetter
- func (s *Subscription) NewDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
- func (s *Subscription) NewReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error)
- func (s *Subscription) NewSession(sessionID *string) *SubscriptionSession
- func (s *Subscription) NewTransferDeadLetter() *TransferDeadLetter
- func (s *Subscription) NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
- func (re Subscription) Peek(ctx context.Context, options ...PeekOption) (MessageIterator, error)
- func (re Subscription) PeekOne(ctx context.Context, options ...PeekOption) (*Message, error)
- func (s *Subscription) Receive(ctx context.Context, handler Handler) error
- func (re Subscription) ReceiveDeferred(ctx context.Context, handler Handler, sequenceNumbers ...int64) error
- func (re Subscription) ReceiveDeferredWithMode(ctx context.Context, handler Handler, mode ReceiveMode, ...) error
- func (s *Subscription) ReceiveOne(ctx context.Context, handler Handler) error
- func (re Subscription) RenewLocks(ctx context.Context, messages ...*Message) error
- func (re Subscription) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error
- type SubscriptionDescription
- type SubscriptionEntity
- type SubscriptionManagementOption
- func SubscriptionWithAutoDeleteOnIdle(window *time.Duration) SubscriptionManagementOption
- func SubscriptionWithAutoForward(target Targetable) SubscriptionManagementOption
- func SubscriptionWithBatchedOperations() SubscriptionManagementOption
- func SubscriptionWithDeadLetteringOnMessageExpiration() SubscriptionManagementOption
- func SubscriptionWithDefaultRuleDescription(filter FilterDescriber, name string) SubscriptionManagementOption
- func SubscriptionWithForwardDeadLetteredMessagesTo(target Targetable) SubscriptionManagementOption
- func SubscriptionWithLockDuration(window *time.Duration) SubscriptionManagementOption
- func SubscriptionWithMessageTimeToLive(window *time.Duration) SubscriptionManagementOption
- func SubscriptionWithRequiredSessions() SubscriptionManagementOption
- type SubscriptionManager
- func (sm *SubscriptionManager) Delete(ctx context.Context, name string) error
- func (sm *SubscriptionManager) DeleteRule(ctx context.Context, subscriptionName, ruleName string) error
- func (em SubscriptionManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, ...) (*http.Response, error)
- func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*SubscriptionEntity, error)
- func (sm *SubscriptionManager) List(ctx context.Context, options ...ListSubscriptionsOption) ([]*SubscriptionEntity, error)
- func (sm *SubscriptionManager) ListRules(ctx context.Context, subscriptionName string) ([]*RuleEntity, error)
- func (em SubscriptionManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)
- func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...SubscriptionManagementOption) (*SubscriptionEntity, error)
- func (sm *SubscriptionManager) PutRule(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber) (*RuleEntity, error)
- func (sm *SubscriptionManager) PutRuleWithAction(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber, ...) (*RuleEntity, error)
- func (em SubscriptionManager) TokenProvider() auth.TokenProvider
- func (em SubscriptionManager) Use(mw ...MiddlewareFunc)
- type SubscriptionOption
- type SubscriptionSession
- func (ss *SubscriptionSession) Close(ctx context.Context) error
- func (ss *SubscriptionSession) ManagementPath() string
- func (ss *SubscriptionSession) ReceiveDeferred(ctx context.Context, handler Handler, mode ReceiveMode, ...) error
- func (ss *SubscriptionSession) ReceiveOne(ctx context.Context, handler SessionHandler) error
- func (ss *SubscriptionSession) SessionID() *string
- type SystemProperties
- type Targetable
- type Topic
- func (se Topic) CancelScheduled(ctx context.Context, seq ...int64) error
- func (t *Topic) Close(ctx context.Context) error
- func (t *Topic) NewSender(ctx context.Context, opts ...SenderOption) (*Sender, error)
- func (t *Topic) NewSession(sessionID *string) *TopicSession
- func (t *Topic) NewSubscription(name string, opts ...SubscriptionOption) (*Subscription, error)
- func (t *Topic) NewSubscriptionManager() *SubscriptionManager
- func (t *Topic) NewTransferDeadLetter() *TransferDeadLetter
- func (t *Topic) NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
- func (se Topic) ScheduleAt(ctx context.Context, enqueueTime time.Time, messages ...*Message) ([]int64, error)
- func (t *Topic) Send(ctx context.Context, event *Message, opts ...SendOption) error
- func (t *Topic) SendBatch(ctx context.Context, iterator BatchIterator) error
- type TopicDescription
- type TopicEntity
- type TopicManagementOption
- func TopicWithAutoDeleteOnIdle(window *time.Duration) TopicManagementOption
- func TopicWithBatchedOperations() TopicManagementOption
- func TopicWithDuplicateDetection(window *time.Duration) TopicManagementOption
- func TopicWithExpress() TopicManagementOption
- func TopicWithMaxSizeInMegabytes(size int) TopicManagementOption
- func TopicWithMessageTimeToLive(window *time.Duration) TopicManagementOption
- func TopicWithOrdering() TopicManagementOption
- func TopicWithPartitioning() TopicManagementOption
- type TopicManager
- func (tm *TopicManager) Delete(ctx context.Context, name string) error
- func (em TopicManager) Execute(ctx context.Context, method string, entityPath string, body io.Reader, ...) (*http.Response, error)
- func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, error)
- func (tm *TopicManager) List(ctx context.Context, options ...ListTopicsOption) ([]*TopicEntity, error)
- func (em TopicManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)
- func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicManagementOption) (*TopicEntity, error)
- func (em TopicManager) TokenProvider() auth.TokenProvider
- func (em TopicManager) Use(mw ...MiddlewareFunc)
- type TopicOption
- type TopicSession
- type TransferDeadLetter
- type TransferDeadLetterBuilder
- type TrueFilter
Examples ¶
- Package (AutoForward)
- Package (BatchingMessages)
- Package (DeadletterQueues)
- Package (DeferMessages)
- Package (DuplicateMessageDetection)
- Package (MessageBrowse)
- Package (MessageSessions)
- Package (Prefetch)
- Package (PrioritySubscriptions)
- Package (QueueSendAndReceive)
- Package (ScheduledMessage)
- MessageIterator
- NamespaceWithWebSocket
- Queue (GetOrBuildQueue)
- Queue (ScheduleAndCancelMessages)
- Queue (SessionsRoundTrip)
- Queue.Receive
- Queue.Receive (Second)
- Queue.Send
Constants ¶
const ( // PeekLockMode causes a Receiver to peek at a message, lock it so no others can consume and have the queue wait for // the DispositionAction PeekLockMode ReceiveMode = 0 // ReceiveAndDeleteMode causes a Receiver to pop messages off of the queue without waiting for DispositionAction ReceiveAndDeleteMode ReceiveMode = 1 // DeadLetterQueueName is the name of the dead letter queue to be appended to the entity path DeadLetterQueueName = "$DeadLetterQueue" // TransferDeadLetterQueueName is the name of the transfer dead letter queue which is appended to the entity name to // build the full address of the transfer dead letter queue. TransferDeadLetterQueueName = "$Transfer/" + DeadLetterQueueName )
const (
// Version is the semantic version number
Version = "0.10.16"
)
Variables ¶
This section is empty.
Functions ¶
func IsErrNotFound ¶ added in v0.3.0
IsErrNotFound returns true if the error argument is an ErrNotFound type
Types ¶
type ActionDescriber ¶ added in v0.2.0
type ActionDescriber interface {
ToActionDescription() ActionDescription
}
ActionDescriber can transform itself into an ActionDescription
type ActionDescription ¶ added in v0.2.0
type ActionDescription struct { Type string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"` SQLExpression string `xml:"SqlExpression"` RequiresPreprocessing bool `xml:"RequiresPreprocessing"` CompatibilityLevel int `xml:"CompatibilityLevel,omitempty"` }
ActionDescription describes an action upon a message that matches a filter
With SQL filter conditions, you can define an action that can annotate the message by adding, removing, or replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL UPDATE statement syntax. The action is performed on the message after it has been matched and before the message is selected into the subscription. The changes to the message properties are private to the message copied into the subscription.
type BaseEntityDescription ¶
type BaseEntityDescription struct { InstanceMetadataSchema *string `xml:"xmlns:i,attr,omitempty"` ServiceBusSchema *string `xml:"xmlns,attr,omitempty"` }
BaseEntityDescription provides common fields which are part of Queues, Topics and Subscriptions
type BatchDispositionError ¶ added in v0.7.0
type BatchDispositionError struct {
Errors []DispositionError
}
BatchDispositionError is an error which returns a collection of DispositionError.
func (BatchDispositionError) Error ¶ added in v0.7.0
func (bde BatchDispositionError) Error() string
type BatchDispositionIterator ¶ added in v0.3.0
type BatchDispositionIterator struct { LockTokenIDs []*uuid.UUID Status MessageStatus // contains filtered or unexported fields }
BatchDispositionIterator provides an iterator over LockTokenIDs
func (*BatchDispositionIterator) Done ¶ added in v0.3.0
func (bdi *BatchDispositionIterator) Done() bool
Done communicates whether there are more messages remaining to be iterated over.
func (*BatchDispositionIterator) Next ¶ added in v0.3.0
func (bdi *BatchDispositionIterator) Next() (uuid *uuid.UUID)
Next iterates to the next LockToken
type BatchIterator ¶ added in v0.3.0
type BatchIterator interface { Done() bool Next(messageID string, opts *BatchOptions) (*MessageBatch, error) }
BatchIterator offers a simple mechanism for batching a list of messages
type BatchOptions ¶ added in v0.3.0
type BatchOptions struct {
SessionID *string
}
BatchOptions are optional information to add to a batch of messages
type CorrelationFilter ¶ added in v0.2.0
type CorrelationFilter struct { CorrelationID *string `xml:"CorrelationId,omitempty"` MessageID *string `xml:"MessageId,omitempty"` To *string `xml:"To,omitempty"` ReplyTo *string `xml:"ReplyTo,omitempty"` Label *string `xml:"Label,omitempty"` SessionID *string `xml:"SessionId,omitempty"` ReplyToSessionID *string `xml:"ReplyToSessionId,omitempty"` ContentType *string `xml:"ContentType,omitempty"` Properties map[string]interface{} `xml:"Properties,omitempty"` }
CorrelationFilter holds a set of conditions that are matched against one or more of an arriving message's user and system properties. A common use is to match against the CorrelationId property, but the application can also choose to match against ContentType, Label, MessageId, ReplyTo, ReplyToSessionId, SessionId, To, and any user-defined properties. A match exists when an arriving message's value for a property is equal to the value specified in the correlation filter. For string expressions, the comparison is case-sensitive. When specifying multiple match properties, the filter combines them as a logical AND condition, meaning for the filter to match, all conditions must match.
func (CorrelationFilter) ToFilterDescription ¶ added in v0.2.0
func (cf CorrelationFilter) ToFilterDescription() FilterDescription
ToFilterDescription will transform the CorrelationFilter into a FilterDescription
type CountDetails ¶
type CountDetails struct { XMLName xml.Name `xml:"CountDetails"` ActiveMessageCount *int32 `xml:"ActiveMessageCount,omitempty"` DeadLetterMessageCount *int32 `xml:"DeadLetterMessageCount,omitempty"` ScheduledMessageCount *int32 `xml:"ScheduledMessageCount,omitempty"` TransferDeadLetterMessageCount *int32 `xml:"TransferDeadLetterMessageCount,omitempty"` TransferMessageCount *int32 `xml:"TransferMessageCount,omitempty"` }
CountDetails has current active (and other) messages for queue/topic.
type DeadLetter ¶ added in v0.2.0
type DeadLetter struct {
// contains filtered or unexported fields
}
DeadLetter represents a dead letter queue in Azure Service Bus.
Azure Service Bus queues, topics and subscriptions provide a secondary sub-queue, called a dead-letter queue (DLQ). The dead-letter queue does not need to be explicitly created and cannot be deleted or otherwise managed independent of the main entity.
The purpose of the dead-letter queue is to hold messages that cannot be delivered to any receiver, or messages that could not be processed. Messages can then be removed from the DLQ and inspected. An application might, with help of an operator, correct issues and resubmit the message, log the fact that there was an error, and take corrective action.
From an API and protocol perspective, the DLQ is mostly similar to any other queue, except that messages can only be submitted via the dead-letter operation of the parent entity. In addition, time-to-live is not observed, and you can't dead-letter a message from a DLQ. The dead-letter queue fully supports peek-lock delivery and transactional operations.
Note that there is no automatic cleanup of the DLQ. Messages remain in the DLQ until you explicitly retrieve them from the DLQ and call Complete() on the dead-letter message.
func NewDeadLetter ¶ added in v0.2.0
func NewDeadLetter(builder DeadLetterBuilder) *DeadLetter
NewDeadLetter constructs an instance of DeadLetter which represents a dead letter queue in Azure Service Bus
func (*DeadLetter) Close ¶ added in v0.2.0
func (dl *DeadLetter) Close(ctx context.Context) error
Close the underlying connection to Service Bus
func (*DeadLetter) ReceiveOne ¶ added in v0.2.0
func (dl *DeadLetter) ReceiveOne(ctx context.Context, handler Handler) error
ReceiveOne will receive one message from the dead letter queue
type DeadLetterBuilder ¶ added in v0.2.0
type DeadLetterBuilder interface {
NewDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
}
DeadLetterBuilder provides the ability to create a new receiver addressed to a given entity's dead letter queue.
type DefaultRuleDescription ¶ added in v0.10.3
type DefaultRuleDescription struct { XMLName xml.Name `xml:"DefaultRuleDescription"` Filter FilterDescription `xml:"Filter"` Name *string `xml:"Name,omitempty"` }
DefaultRuleDescription is the content type for Subscription Rule management requests
type DispositionAction ¶
DispositionAction represents the action to notify Azure Service Bus of the Message's disposition
type DispositionError ¶ added in v0.7.0
DispositionError is an error associated with a LockTokenID.
func (DispositionError) Error ¶ added in v0.7.0
func (de DispositionError) Error() string
func (DispositionError) UnWrap ¶ added in v0.7.0
func (de DispositionError) UnWrap() error
UnWrap will return the private error.
type Entity ¶ added in v0.2.0
Entity represents the most basic form of an Azure Service Bus entity.
type EntityManagementAddresser ¶ added in v0.2.0
type EntityManagementAddresser interface {
ManagementPath() string
}
EntityManagementAddresser describes the ability of an entity to provide an addressable path to its management endpoint
type EntityStatus ¶ added in v0.2.0
type EntityStatus string
EntityStatus enumerates the values for entity status.
const ( // Active ... Active EntityStatus = "Active" // Creating ... Creating EntityStatus = "Creating" // Deleting ... Deleting EntityStatus = "Deleting" // Disabled ... Disabled EntityStatus = "Disabled" // ReceiveDisabled ... ReceiveDisabled EntityStatus = "ReceiveDisabled" // Renaming ... Renaming EntityStatus = "Renaming" // Restoring ... Restoring EntityStatus = "Restoring" // SendDisabled ... SendDisabled EntityStatus = "SendDisabled" // Unknown ... Unknown EntityStatus = "Unknown" )
type ErrAMQP ¶ added in v0.2.0
ErrAMQP indicates that the server communicated an AMQP error with a particular
type ErrConnectionClosed ¶ added in v0.10.1
type ErrConnectionClosed string
ErrConnectionClosed indicates that the connection has been closed.
func (ErrConnectionClosed) Error ¶ added in v0.10.1
func (e ErrConnectionClosed) Error() string
type ErrIncorrectType ¶ added in v0.2.0
ErrIncorrectType indicates that type assertion failed. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func (ErrIncorrectType) Error ¶ added in v0.2.0
func (e ErrIncorrectType) Error() string
type ErrMalformedMessage ¶ added in v0.2.0
type ErrMalformedMessage string
ErrMalformedMessage indicates that a message that was expected in the form of []byte was not a []byte. This is likely a bug and should be reported.
func (ErrMalformedMessage) Error ¶ added in v0.2.0
func (e ErrMalformedMessage) Error() string
type ErrMissingField ¶ added in v0.2.0
type ErrMissingField string
ErrMissingField indicates that an expected property was missing from an AMQP message. This should only be encountered when there is an error with this library, or the server has altered its behavior unexpectedly.
func (ErrMissingField) Error ¶ added in v0.2.0
func (e ErrMissingField) Error() string
type ErrNoMessages ¶ added in v0.2.0
type ErrNoMessages struct{}
ErrNoMessages is returned when an operation returned no messages. It is not indicative that there will not be more messages in the future.
func (ErrNoMessages) Error ¶ added in v0.2.0
func (e ErrNoMessages) Error() string
type ErrNotFound ¶ added in v0.3.0
type ErrNotFound struct {
EntityPath string
}
ErrNotFound is returned when an entity is not found (404)
func (ErrNotFound) Error ¶ added in v0.3.0
func (e ErrNotFound) Error() string
type FalseFilter ¶ added in v0.2.0
type FalseFilter struct{}
FalseFilter represents an always-false SQL expression which will deny all messages
func (FalseFilter) ToFilterDescription ¶ added in v0.2.0
func (ff FalseFilter) ToFilterDescription() FilterDescription
ToFilterDescription will transform the FalseFilter into a FilterDescription
type FilterDescriber ¶ added in v0.2.0
type FilterDescriber interface {
ToFilterDescription() FilterDescription
}
FilterDescriber can transform itself into a FilterDescription
type FilterDescription ¶ added in v0.2.0
type FilterDescription struct { XMLName xml.Name `xml:"Filter"` CorrelationFilter Type string `xml:"http://www.w3.org/2001/XMLSchema-instance type,attr"` SQLExpression *string `xml:"SqlExpression,omitempty"` CompatibilityLevel int `xml:"CompatibilityLevel,omitempty"` }
FilterDescription describes a filter which can be applied to a subscription to filter messages from the topic.
Subscribers can define which messages they want to receive from a topic. These messages are specified in the form of one or more named subscription rules. Each rule consists of a condition that selects particular messages and an action that annotates the selected message. For each matching rule condition, the subscription produces a copy of the message, which may be differently annotated for each matching rule.
Each newly created topic subscription has an initial default subscription rule. If you don't explicitly specify a filter condition for the rule, the applied filter is the true filter that enables all messages to be selected into the subscription. The default rule has no associated annotation action.
type HandlerFunc ¶ added in v0.2.0
HandlerFunc is a type converter that allows a func to be used as a `Handler`
type ListQueuesOption ¶ added in v0.10.14
type ListQueuesOption func(*ListQueuesOptions) error
ListQueuesOption represents named options for listing queues
func ListQueuesWithSkip ¶ added in v0.10.14
func ListQueuesWithSkip(skip int) ListQueuesOption
ListQueuesWithSkip will skip the specified number of entities
func ListQueuesWithTop ¶ added in v0.10.14
func ListQueuesWithTop(top int) ListQueuesOption
ListQueuesWithTop will return at most `top` results
type ListQueuesOptions ¶ added in v0.10.14
type ListQueuesOptions struct {
// contains filtered or unexported fields
}
ListQueuesOptions provides options for List() to control things like page size. NOTE: Use the ListQueuesWith* methods to specify this.
type ListSubscriptionsOption ¶ added in v0.10.14
type ListSubscriptionsOption func(*ListSubscriptionsOptions) error
ListSubscriptionsOption represents named options for listing subscriptions
func ListSubscriptionsWithSkip ¶ added in v0.10.14
func ListSubscriptionsWithSkip(skip int) ListSubscriptionsOption
ListSubscriptionsWithSkip will skip the specified number of entities
func ListSubscriptionsWithTop ¶ added in v0.10.14
func ListSubscriptionsWithTop(top int) ListSubscriptionsOption
ListSubscriptionsWithTop will return at most `top` results
type ListSubscriptionsOptions ¶ added in v0.10.14
type ListSubscriptionsOptions struct {
// contains filtered or unexported fields
}
ListSubscriptionsOptions provides options for List() to control things like page size. NOTE: Use the ListSubscriptionsWith* methods to specify this.
type ListTopicsOption ¶ added in v0.10.14
type ListTopicsOption func(*ListTopicsOptions) error
ListTopicsOption represents named options for listing topics
func ListTopicsWithSkip ¶ added in v0.10.14
func ListTopicsWithSkip(skip int) ListTopicsOption
ListTopicsWithSkip will skip the specified number of entities
func ListTopicsWithTop ¶ added in v0.10.14
func ListTopicsWithTop(top int) ListTopicsOption
ListTopicsWithTop will return at most `top` results
type ListTopicsOptions ¶ added in v0.10.14
type ListTopicsOptions struct {
// contains filtered or unexported fields
}
ListTopicsOptions provides options for List() to control things like page size. NOTE: Use the ListTopicsWith* methods to specify this.
type ListenerHandle ¶
type ListenerHandle struct {
// contains filtered or unexported fields
}
ListenerHandle provides the ability to close or listen to the close of a Receiver
func (*ListenerHandle) Close ¶
func (lc *ListenerHandle) Close(ctx context.Context) error
Close will close the listener
func (*ListenerHandle) Done ¶
func (lc *ListenerHandle) Done() <-chan struct{}
Done will close the channel when the listener has stopped
func (*ListenerHandle) Err ¶
func (lc *ListenerHandle) Err() error
Err will return the last error encountered
type MaxMessageSizeInBytes ¶ added in v0.3.0
type MaxMessageSizeInBytes int
MaxMessageSizeInBytes is the max number of bytes allowed by Azure Service Bus
const ( // StandardMaxMessageSizeInBytes is the maximum number of bytes in a message for the Standard tier StandardMaxMessageSizeInBytes MaxMessageSizeInBytes = 256000 // PremiumMaxMessageSizeInBytes is the maximum number of bytes in a message for the Premium tier PremiumMaxMessageSizeInBytes MaxMessageSizeInBytes = 1000000 )
type Message ¶
type Message struct { ContentType string CorrelationID string Data []byte DeliveryCount uint32 SessionID *string GroupSequence *uint32 ID string Label string ReplyTo string ReplyToGroupID string To string TTL *time.Duration LockToken *uuid.UUID SystemProperties *SystemProperties UserProperties map[string]interface{} Format uint32 // contains filtered or unexported fields }
Message is a Service Bus message to be sent or received
func NewMessage ¶
NewMessage builds a Message from a slice of data
func NewMessageFromString ¶
NewMessageFromString builds a Message from a string message
func (*Message) Abandon ¶
Abandon will notify Azure Service Bus the message failed but should be re-queued for delivery.
func (*Message) AbandonAction ¶ added in v0.2.0
func (m *Message) AbandonAction() DispositionAction
AbandonAction will notify Azure Service Bus the message failed but should be re-queued for delivery.
func (*Message) Complete ¶
Complete will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue
func (*Message) CompleteAction ¶ added in v0.2.0
func (m *Message) CompleteAction() DispositionAction
CompleteAction will notify Azure Service Bus that the message was successfully handled and should be deleted from the queue
func (*Message) DeadLetter ¶
DeadLetter will notify Azure Service Bus the message failed and should not be re-queued
func (*Message) DeadLetterAction ¶ added in v0.2.0
func (m *Message) DeadLetterAction(err error) DispositionAction
DeadLetterAction will notify Azure Service Bus the message failed and should not be re-queued
func (*Message) DeadLetterWithInfo ¶
func (m *Message) DeadLetterWithInfo(ctx context.Context, err error, condition MessageErrorCondition, additionalData map[string]string) error
DeadLetterWithInfo will notify Azure Service Bus the message failed and should not be re-queued with additional context
func (*Message) DeadLetterWithInfoAction ¶ added in v0.2.0
func (m *Message) DeadLetterWithInfoAction(err error, condition MessageErrorCondition, additionalData map[string]string) DispositionAction
DeadLetterWithInfoAction will notify Azure Service Bus the message failed and should not be re-queued with additional context
func (*Message) Defer ¶ added in v0.2.0
Defer will set aside the message for later processing
When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.
Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.
A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.
Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.
func (*Message) GetKeyValues ¶ added in v0.8.0
GetKeyValues implements tab.Carrier
func (*Message) ScheduleAt ¶ added in v0.2.0
ScheduleAt will ensure Azure Service Bus delivers the message after the time specified (usually within 1 minute after the specified time)
type MessageBatch ¶ added in v0.3.0
type MessageBatch struct { *Message MaxSize MaxMessageSizeInBytes // contains filtered or unexported fields }
MessageBatch represents a batch of messages to send to Service Bus in a single message
func NewMessageBatch ¶ added in v0.3.0
func NewMessageBatch(maxSize MaxMessageSizeInBytes, messageID string, opts *BatchOptions) *MessageBatch
NewMessageBatch builds a new message batch with a default standard max message size
func (*MessageBatch) Add ¶ added in v0.3.0
func (mb *MessageBatch) Add(m *Message) (bool, error)
Add adds a message to the batch if the message will not exceed the max size of the batch
func (*MessageBatch) Clear ¶ added in v0.3.0
func (mb *MessageBatch) Clear()
Clear will zero out the batch size and clear the buffered messages
func (*MessageBatch) Size ¶ added in v0.3.0
func (mb *MessageBatch) Size() int
Size is the number of bytes in the message batch
type MessageBatchIterator ¶ added in v0.3.0
type MessageBatchIterator struct { Messages []*Message Cursor int MaxSize MaxMessageSizeInBytes }
MessageBatchIterator provides an easy way to iterate over a slice of messages to reliably create batches
func NewMessageBatchIterator ¶ added in v0.3.0
func NewMessageBatchIterator(maxBatchSize MaxMessageSizeInBytes, msgs ...*Message) *MessageBatchIterator
NewMessageBatchIterator wraps a slice of Message pointers to allow it to be made into a BatchIterator.
func (*MessageBatchIterator) Done ¶ added in v0.3.0
func (mbi *MessageBatchIterator) Done() bool
Done communicates whether there are more messages remaining to be iterated over.
func (*MessageBatchIterator) Next ¶ added in v0.3.0
func (mbi *MessageBatchIterator) Next(messageID string, opts *BatchOptions) (*MessageBatch, error)
Next fetches the batch of messages in the message slice at a position one larger than the last one accessed.
type MessageErrorCondition ¶
type MessageErrorCondition string
MessageErrorCondition represents a well-known collection of AMQP errors
const ( ErrorInternalError MessageErrorCondition = "amqp:internal-error" ErrorNotFound MessageErrorCondition = "amqp:not-found" ErrorDecodeError MessageErrorCondition = "amqp:decode-error" ErrorResourceLimitExceeded MessageErrorCondition = "amqp:resource-limit-exceeded" ErrorNotAllowed MessageErrorCondition = "amqp:not-allowed" ErrorInvalidField MessageErrorCondition = "amqp:invalid-field" ErrorNotImplemented MessageErrorCondition = "amqp:not-implemented" ErrorResourceLocked MessageErrorCondition = "amqp:resource-locked" ErrorPreconditionFailed MessageErrorCondition = "amqp:precondition-failed" ErrorResourceDeleted MessageErrorCondition = "amqp:resource-deleted" ErrorIllegalState MessageErrorCondition = "amqp:illegal-state" )
Error Conditions
type MessageIterator ¶ added in v0.2.0
MessageIterator offers a simple mechanism for iterating over a list of messages.
Example ¶
subject := servicebus.AsMessageSliceIterator([]*servicebus.Message{ servicebus.NewMessageFromString("hello"), servicebus.NewMessageFromString("world"), }) for !subject.Done() { cursor, err := subject.Next(context.Background()) if err != nil { fmt.Println(err) return } fmt.Println(string(cursor.Data)) }
Output: hello world
type MessageSession ¶ added in v0.2.0
type MessageSession struct { *Receiver // contains filtered or unexported fields }
MessageSession represents and allows for interaction with a Service Bus Session.
func (*MessageSession) Close ¶ added in v0.2.0
func (ms *MessageSession) Close()
Close communicates that Handler receiving messages should no longer continue to be executed. This can happen when:
- A Handler recognizes that no further messages will come to this session.
- A Handler has given up on receiving more messages before a session. Future messages should be delegated to the next available session client.
func (*MessageSession) ListSessions ¶ added in v0.2.0
func (ms *MessageSession) ListSessions(ctx context.Context) ([]byte, error)
ListSessions will list all of the sessions available
func (*MessageSession) LockedUntil ¶ added in v0.2.0
func (ms *MessageSession) LockedUntil() time.Time
LockedUntil fetches the moment in time when the Session lock held by this Receiver will expire.
func (*MessageSession) RenewLock ¶ added in v0.2.0
func (ms *MessageSession) RenewLock(ctx context.Context) error
RenewLock requests that the Service Bus Server renews this client's lock on an existing Session.
func (*MessageSession) SessionID ¶ added in v0.2.0
func (ms *MessageSession) SessionID() *string
SessionID gets the unique identifier of the session being interacted with by this MessageSession.
func (*MessageSession) SetState ¶ added in v0.2.0
func (ms *MessageSession) SetState(ctx context.Context, state []byte) error
SetState updates the current State associated with this Session.
func (*MessageSession) State ¶ added in v0.2.0
func (ms *MessageSession) State(ctx context.Context) ([]byte, error)
State retrieves the current State associated with this Session. https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response#get-session-state
type MessageSliceIterator ¶ added in v0.2.0
MessageSliceIterator is a wrapper, which lets any slice of Message pointers be used as a MessageIterator.
func AsMessageSliceIterator ¶ added in v0.2.0
func AsMessageSliceIterator(target []*Message) *MessageSliceIterator
AsMessageSliceIterator wraps a slice of Message pointers to allow it to be made into a MessageIterator.
func (MessageSliceIterator) Done ¶ added in v0.2.0
func (ms MessageSliceIterator) Done() bool
Done communicates whether there are more messages remaining to be iterated over.
type MessageStatus ¶ added in v0.3.0
type MessageStatus dispositionStatus
MessageStatus defines an acceptable Message disposition status.
const ( // Complete exposes completedDisposition Complete MessageStatus = MessageStatus(completedDisposition) // Abort exposes abandonedDisposition Abort MessageStatus = MessageStatus(abandonedDisposition) )
type MiddlewareFunc ¶ added in v0.2.0
type MiddlewareFunc func(next RestHandler) RestHandler
MiddlewareFunc allows a consumer of the entity manager to inject handlers within the request / response pipeline
The example below adds the atom xml content type to the request, calls the next middleware and returns the result.
addAtomXMLContentType MiddlewareFunc = func(next RestHandler) RestHandler { return func(ctx context.Context, req *http.Request) (res *http.Response, e error) { if req.Method != http.MethodGet && req.Method != http.MethodHead { req.Header.Add("content-Type", "application/atom+xml;type=entry;charset=utf-8") } return next(ctx, req) } }
func TraceReqAndResponseMiddleware ¶ added in v0.2.0
func TraceReqAndResponseMiddleware() MiddlewareFunc
TraceReqAndResponseMiddleware will print the dump of the management request and response.
This should only be used for debugging or educational purposes.
type Namespace ¶
type Namespace struct { Name string Suffix string TokenProvider auth.TokenProvider Environment azure.Environment // contains filtered or unexported fields }
Namespace provides a simplified facade over the AMQP implementation of Azure Service Bus and is the entry point for using Queues, Topics and Subscriptions
func NewNamespace ¶
func NewNamespace(opts ...NamespaceOption) (*Namespace, error)
NewNamespace creates a new namespace configured through NamespaceOption(s)
func (*Namespace) NewQueue ¶
func (ns *Namespace) NewQueue(name string, opts ...QueueOption) (*Queue, error)
NewQueue creates a new Queue Sender / Receiver
func (*Namespace) NewQueueManager ¶
func (ns *Namespace) NewQueueManager() *QueueManager
NewQueueManager creates a new QueueManager for a Service Bus Namespace
func (*Namespace) NewReceiver ¶ added in v0.2.0
func (ns *Namespace) NewReceiver(ctx context.Context, entityPath string, opts ...ReceiverOption) (*Receiver, error)
NewReceiver creates a new Service Bus message listener given an AMQP client and an entity path
func (*Namespace) NewSender ¶ added in v0.2.0
func (ns *Namespace) NewSender(ctx context.Context, entityPath string, opts ...SenderOption) (*Sender, error)
NewSender creates a new Service Bus message Sender given an AMQP client and entity path
func (*Namespace) NewSubscriptionManager ¶
func (ns *Namespace) NewSubscriptionManager(topicName string) (*SubscriptionManager, error)
NewSubscriptionManager creates a new SubscriptionManager for a Service Bus Namespace
func (*Namespace) NewTopic ¶
func (ns *Namespace) NewTopic(name string, opts ...TopicOption) (*Topic, error)
NewTopic creates a new Topic Sender
func (*Namespace) NewTopicManager ¶
func (ns *Namespace) NewTopicManager() *TopicManager
NewTopicManager creates a new TopicManager for a Service Bus Namespace
type NamespaceOption ¶
NamespaceOption provides structure for configuring a new Service Bus namespace
func NamespaceWithAzureEnvironment ¶ added in v0.10.7
func NamespaceWithAzureEnvironment(namespaceName, environmentName string) NamespaceOption
NamespaceWithAzureEnvironment sets the namespace's Environment, Suffix and ResourceURI parameters according to the Azure Environment defined in the "github.com/Azure/go-autorest/autorest/azure" package. This allows the library to be configured for use in the different Azure clouds. environmentName is the name of the cloud as defined in autorest: https://github.com/Azure/go-autorest/blob/b076c1437d051bf4c328db428b70f4fe22ad38b0/autorest/azure/environments.go#L34-L39
func NamespaceWithConnectionString ¶
func NamespaceWithConnectionString(connStr string) NamespaceOption
NamespaceWithConnectionString configures a namespace with the information provided in a Service Bus connection string
func NamespaceWithEnvironmentBinding ¶ added in v0.10.0
func NamespaceWithEnvironmentBinding(name string) NamespaceOption
NamespaceWithEnvironmentBinding configures a namespace using the environment details. It uses one of the following methods:
- Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and "AZURE_CLIENT_SECRET"
- Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID", "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
- Managed Identity (MI): attempt to authenticate via the MI assigned to the Azure resource
The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
func NamespaceWithTLSConfig ¶ added in v0.6.0
func NamespaceWithTLSConfig(tlsConfig *tls.Config) NamespaceOption
NamespaceWithTLSConfig appends to the TLS config.
func NamespaceWithTokenProvider ¶ added in v0.10.7
func NamespaceWithTokenProvider(provider auth.TokenProvider) NamespaceOption
NamespaceWithTokenProvider sets the token provider on the namespace
func NamespaceWithUserAgent ¶ added in v0.3.0
func NamespaceWithUserAgent(userAgent string) NamespaceOption
NamespaceWithUserAgent appends to the root user-agent value.
func NamespaceWithWebSocket ¶ added in v0.5.0
func NamespaceWithWebSocket() NamespaceOption
NamespaceWithWebSocket configures the namespace and all entities to use wss:// rather than amqps://
Example ¶
const queueName = "wssQueue" connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING") if connStr == "" { fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set") return } // Create a Service Bus Namespace using a connection string over wss:// on port 443 ns, err := servicebus.NewNamespace( servicebus.NamespaceWithConnectionString(connStr), servicebus.NamespaceWithWebSocket(), ) if err != nil { fmt.Println(err) return } // Create a context to limit how long we will try to send, then push the message over the wire. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() qm := ns.NewQueueManager() if _, err := ensureQueue(ctx, qm, queueName); err != nil { fmt.Println(err) return } client, err := ns.NewQueue(queueName) if err != nil { fmt.Println(err) return } // Send a message to the queue if err := client.Send(ctx, servicebus.NewMessageFromString("Hello World!!!")); err != nil { fmt.Println(err) } // Receive the message from the queue if err := client.ReceiveOne(ctx, MessagePrinter{}); err != nil { fmt.Println(err) }
Output: Hello World!!!
type PeekOption ¶ added in v0.2.0
type PeekOption func(*peekIterator) error
PeekOption allows customization of parameters when querying a Service Bus entity for messages without committing to processing them.
func PeekFromSequenceNumber ¶ added in v0.2.0
func PeekFromSequenceNumber(seq int64) PeekOption
PeekFromSequenceNumber adds a filter to the Peek operation, so that no messages with a Sequence Number less than 'seq' are returned.
func PeekWithPageSize ¶ added in v0.2.0
func PeekWithPageSize(pageSize int) PeekOption
PeekWithPageSize adjusts how many messages are fetched at once while peeking from the server.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue represents a Service Bus Queue entity, which offers First In, First Out (FIFO) message delivery to one or more competing consumers. That is, messages are typically expected to be received and processed by the receivers in the order in which they were added to the queue, and each message is received and processed by only one message consumer.
Example (GetOrBuildQueue) ¶
const queueName = "myqueue" connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING") if connStr == "" { fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set") return } ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr)) if err != nil { fmt.Println(err) return } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() qm := ns.NewQueueManager() qe, err := qm.Get(ctx, queueName) if err != nil && !servicebus.IsErrNotFound(err) { fmt.Println(err) return } if qe == nil { _, err := qm.Put(ctx, queueName) if err != nil { fmt.Println(err) return } } q, err := ns.NewQueue(queueName) if err != nil { fmt.Println(err) return } fmt.Println(q.Name)
Output: myqueue
Example (ScheduleAndCancelMessages) ¶
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute+40*time.Second) defer cancel() connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING") if connStr == "" { fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set") return } // Create a client to communicate with a Service Bus Namespace. ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr)) if err != nil { fmt.Println("FATAL: ", err) return } client, err := ns.NewQueue("schedulewithqueue") if err != nil { fmt.Println("FATAL: ", err) return } // The delay that we should schedule a message for. const waitTime = 1 * time.Minute expectedTime := time.Now().Add(waitTime) msg := servicebus.NewMessageFromString("to the future!!") scheduled, err := client.ScheduleAt(ctx, expectedTime, msg) if err != nil { fmt.Println("FATAL: ", err) return } err = client.CancelScheduled(ctx, scheduled...) if err != nil { fmt.Println("FATAL: ", err) return } fmt.Println("All Messages Scheduled and Cancelled")
Output: All Messages Scheduled and Cancelled
Example (SessionsRoundTrip) ¶
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Setup the required clients for communicating with Service Bus. // //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING") if connStr == "" { fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set") return } ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr)) if err != nil { fmt.Println("FATAL: ", err) return } client, err := ns.NewQueue("receivesession") if err != nil { fmt.Println("FATAL: ", err) return } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Publish five session's worth of data. // // // // The sessions are deliberately interleaved to demonstrate consumption semantics. 
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// const numSessions = 5 adjectives := []string{"Doltish", "Foolish", "Juvenile"} nouns := []string{"Automaton", "Luddite", "Monkey", "Neanderthal"} // seed chosen arbitrarily, see https://en.wikipedia.org/wiki/Taxicab_number generator := rand.New(rand.NewSource(1729)) sessionIDs := make([]string, numSessions) // Establish a set of sessions for i := 0; i < numSessions; i++ { if rawSessionID, err := uuid.NewV4(); err == nil { sessionIDs[i] = rawSessionID.String() } else { fmt.Println("FATAL: ", err) return } } // Publish an adjective for each session for i := 0; i < numSessions; i++ { adj := adjectives[generator.Intn(len(adjectives))] msg := servicebus.NewMessageFromString(adj) msg.SessionID = &sessionIDs[i] if err := client.Send(ctx, msg); err != nil { fmt.Println("FATAL: ", err) return } } // Publish a noun for each session for i := 0; i < numSessions; i++ { noun := nouns[generator.Intn(len(nouns))] msg := servicebus.NewMessageFromString(noun) msg.SessionID = &sessionIDs[i] if err := client.Send(ctx, msg); err != nil { fmt.Println("FATAL: ", err) return } } // Publish a numeric suffix for each session for i := 0; i < numSessions; i++ { suffix := fmt.Sprintf("%02d", generator.Intn(100)) msg := servicebus.NewMessageFromString(suffix) msg.SessionID = &sessionIDs[i] if err := client.Send(ctx, msg); err != nil { fmt.Println("FATAL: ", err) return } } //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// // Receive and process the previously published sessions. // // // // The order the sessions are received in is not guaranteed, so the expected output must be "Unordered output". 
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// for i := 0; i < numSessions; i++ { handler := &SessionPrinter{} qs := client.NewSession(nil) if err := qs.ReceiveOne(ctx, handler); err != nil { fmt.Println("FATAL: ", err) return } }
Output: FoolishMonkey63 FoolishLuddite05 JuvenileMonkey80 JuvenileLuddite84 FoolishLuddite68
func (*Queue) NewDeadLetter ¶ added in v0.2.0
func (q *Queue) NewDeadLetter() *DeadLetter
NewDeadLetter creates an entity that represents the dead letter sub queue of the queue
Azure Service Bus queues and topic subscriptions provide a secondary sub-queue, called a dead-letter queue (DLQ). The dead-letter queue does not need to be explicitly created and cannot be deleted or otherwise managed independent of the main entity.
The purpose of the dead-letter queue is to hold messages that cannot be delivered to any receiver, or messages that could not be processed. Messages can then be removed from the DLQ and inspected. An application might, with help of an operator, correct issues and resubmit the message, log the fact that there was an error, and take corrective action.
From an API and protocol perspective, the DLQ is mostly similar to any other queue, except that messages can only be submitted via the dead-letter operation of the parent entity. In addition, time-to-live is not observed, and you can't dead-letter a message from a DLQ. The dead-letter queue fully supports peek-lock delivery and transactional operations.
Note that there is no automatic cleanup of the DLQ. Messages remain in the DLQ until you explicitly retrieve them from the DLQ and call Complete() on the dead-letter message.
func (*Queue) NewDeadLetterReceiver ¶ added in v0.2.0
func (q *Queue) NewDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
NewDeadLetterReceiver builds a receiver for the Queue's dead letter queue
func (*Queue) NewReceiver ¶ added in v0.2.0
NewReceiver will create a new Receiver for receiving messages off of a queue
func (*Queue) NewSender ¶ added in v0.2.0
NewSender will create a new Sender for sending messages to the queue
func (*Queue) NewSession ¶ added in v0.2.0
func (q *Queue) NewSession(sessionID *string) *QueueSession
NewSession will create a new session based receiver and sender for the queue
Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.
func (*Queue) NewTransferDeadLetter ¶ added in v0.2.0
func (q *Queue) NewTransferDeadLetter() *TransferDeadLetter
NewTransferDeadLetter creates an entity that represents the transfer dead letter sub queue of the queue
Messages will be sent to the transfer dead-letter queue under the following conditions:
- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.
func (*Queue) NewTransferDeadLetterReceiver ¶ added in v0.2.0
func (q *Queue) NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
NewTransferDeadLetterReceiver builds a receiver for the Queue's transfer dead letter queue
Messages will be sent to the transfer dead-letter queue under the following conditions:
- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.
func (*Queue) Receive ¶
Receive subscribes for messages sent to the Queue. If the messages are not within a session, messages will arrive unordered.
Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.
If the handler returns an error, the receive loop will be terminated.
Example ¶
// Define a function that should be executed when a message is received. var printMessage servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error { fmt.Println(string(msg.Data)) return msg.Complete(ctx) } // Instantiate the clients needed to communicate with a Service Bus Queue. ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>")) if err != nil { return } client, err := ns.NewQueue("myqueue") if err != nil { return } // Define a context to limit how long we will block to receive messages, then start serving our function. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() if err := client.Receive(ctx, printMessage); err != nil { fmt.Println("FATAL: ", err) }
Output:
Example (Second) ¶
// Set concurrent number const concurrentNum = 5 // Define msg chan msgChan := make(chan *servicebus.Message, concurrentNum) // Define a function that should be executed when a message is received. var concurrentHandler servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error { msgChan <- msg return nil } // Define msg workers for i := 0; i < concurrentNum; i++ { go func() { for msg := range msgChan { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond) defer cancel() fmt.Println(string(msg.Data)) msg.Complete(ctx) } }() } // Instantiate the clients needed to communicate with a Service Bus Queue. ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>")) if err != nil { close(msgChan) return } // Init queue client with prefetch count client, err := ns.NewQueue("myqueue", servicebus.QueueWithPrefetchCount(concurrentNum)) if err != nil { close(msgChan) return } // Define a context to limit how long we will block to receive messages, then start serving our function. ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) defer cancel() if err := client.Receive(ctx, concurrentHandler); err != nil { fmt.Println("FATAL: ", err) } // Close the message chan close(msgChan)
Output:
func (*Queue) ReceiveOne ¶
ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.
Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.
func (*Queue) Send ¶
Send sends messages to the Queue
Example ¶
// Instantiate the clients needed to communicate with a Service Bus Queue. ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>")) if err != nil { return } client, err := ns.NewQueue("myqueue") if err != nil { return } // Create a context to limit how long we will try to send, then push the message over the wire. ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() if err := client.Send(ctx, servicebus.NewMessageFromString("Hello World!!!")); err != nil { fmt.Println("FATAL: ", err) }
Output:
type QueueDescription ¶
type QueueDescription struct { XMLName xml.Name `xml:"QueueDescription"` BaseEntityDescription LockDuration *string `xml:"LockDuration,omitempty"` // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute. MaxSizeInMegabytes *int32 `xml:"MaxSizeInMegabytes,omitempty"` // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024. RequiresDuplicateDetection *bool `xml:"RequiresDuplicateDetection,omitempty"` // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection. RequiresSession *bool `xml:"RequiresSession,omitempty"` DefaultMessageTimeToLive *string `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. DeadLetteringOnMessageExpiration *bool `xml:"DeadLetteringOnMessageExpiration,omitempty"` // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires. DuplicateDetectionHistoryTimeWindow *string `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes. MaxDeliveryCount *int32 `xml:"MaxDeliveryCount,omitempty"` // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10. EnableBatchedOperations *bool `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled. 
SizeInBytes *int64 `xml:"SizeInBytes,omitempty"` // SizeInBytes - The size of the queue, in bytes. MessageCount *int64 `xml:"MessageCount,omitempty"` // MessageCount - The number of messages in the queue. IsAnonymousAccessible *bool `xml:"IsAnonymousAccessible,omitempty"` Status *EntityStatus `xml:"Status,omitempty"` CreatedAt *date.Time `xml:"CreatedAt,omitempty"` UpdatedAt *date.Time `xml:"UpdatedAt,omitempty"` SupportOrdering *bool `xml:"SupportOrdering,omitempty"` AutoDeleteOnIdle *string `xml:"AutoDeleteOnIdle,omitempty"` EnablePartitioning *bool `xml:"EnablePartitioning,omitempty"` EnableExpress *bool `xml:"EnableExpress,omitempty"` CountDetails *CountDetails `xml:"CountDetails,omitempty"` ForwardTo *string `xml:"ForwardTo,omitempty"` ForwardDeadLetteredMessagesTo *string `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages }
QueueDescription is the content type for Queue management requests
type QueueEntity ¶
type QueueEntity struct { *QueueDescription *Entity }
QueueEntity is the Azure Service Bus description of a Queue for management activities
type QueueManagementOption ¶
type QueueManagementOption func(*QueueDescription) error
QueueManagementOption represents named configuration options for queue mutation
func QueueEntityWithAutoDeleteOnIdle ¶
func QueueEntityWithAutoDeleteOnIdle(window *time.Duration) QueueManagementOption
QueueEntityWithAutoDeleteOnIdle configures the queue to automatically delete after the specified idle interval. The minimum duration is 5 minutes.
func QueueEntityWithAutoForward ¶ added in v0.2.0
func QueueEntityWithAutoForward(target Targetable) QueueManagementOption
QueueEntityWithAutoForward configures the queue to automatically forward messages to the specified target.
The ability to AutoForward to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.
func QueueEntityWithDeadLetteringOnMessageExpiration ¶
func QueueEntityWithDeadLetteringOnMessageExpiration() QueueManagementOption
QueueEntityWithDeadLetteringOnMessageExpiration will ensure the queue sends expired messages to the dead letter queue
func QueueEntityWithDuplicateDetection ¶
func QueueEntityWithDuplicateDetection(window *time.Duration) QueueManagementOption
QueueEntityWithDuplicateDetection configures the queue to detect duplicates for a given time window. If window is not specified, then it uses the default of 10 minutes.
func QueueEntityWithForwardDeadLetteredMessagesTo ¶ added in v0.2.0
func QueueEntityWithForwardDeadLetteredMessagesTo(target Targetable) QueueManagementOption
QueueEntityWithForwardDeadLetteredMessagesTo configures the queue to automatically forward dead letter messages to the specified target.
The ability to forward dead letter messages to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.
func QueueEntityWithLockDuration ¶
func QueueEntityWithLockDuration(window *time.Duration) QueueManagementOption
QueueEntityWithLockDuration configures the queue to have a duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
func QueueEntityWithMaxDeliveryCount ¶
func QueueEntityWithMaxDeliveryCount(count int32) QueueManagementOption
QueueEntityWithMaxDeliveryCount configures the queue to have a maximum number of delivery attempts before dead-lettering the message
func QueueEntityWithMaxSizeInMegabytes ¶
func QueueEntityWithMaxSizeInMegabytes(size int) QueueManagementOption
QueueEntityWithMaxSizeInMegabytes configures the maximum size of the queue in megabytes (1 * 1024 - 5 * 1024), which is the size of the memory allocated for the queue. Default is 1 GB (1 * 1024 MB).
size must be between 1024 and 5 * 1024 for the Standard sku and up to 80 * 1024 for Premium sku
func QueueEntityWithMessageTimeToLive ¶
func QueueEntityWithMessageTimeToLive(window *time.Duration) QueueManagementOption
QueueEntityWithMessageTimeToLive configures the queue to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.
func QueueEntityWithPartitioning ¶
func QueueEntityWithPartitioning() QueueManagementOption
QueueEntityWithPartitioning ensures the created queue will be a partitioned queue. Partitioned queues offer increased storage and availability compared to non-partitioned queues with the trade-off of requiring the following to ensure FIFO message retrieval:
SessionId. If a message has the SessionId property set, then Service Bus uses the SessionId property as the partition key. This way, all messages that belong to the same session are assigned to the same fragment and handled by the same message broker. This allows Service Bus to guarantee message ordering as well as the consistency of session states.
PartitionKey. If a message has the PartitionKey property set but not the SessionId property, then Service Bus uses the PartitionKey property as the partition key. Use the PartitionKey property to send non-sessionful transactional messages. The partition key ensures that all messages that are sent within a transaction are handled by the same messaging broker.
MessageId. If the queue has the RequiresDuplicateDetection property set to true, then the MessageId property serves as the partition key if neither the SessionId nor the PartitionKey property is set. This ensures that all copies of the same message are handled by the same message broker and, thus, allows Service Bus to detect and eliminate duplicate messages.
func QueueEntityWithRequiredSessions ¶
func QueueEntityWithRequiredSessions() QueueManagementOption
QueueEntityWithRequiredSessions will ensure the queue requires senders and receivers to have sessionIDs
type QueueManager ¶
type QueueManager struct {
// contains filtered or unexported fields
}
QueueManager provides CRUD functionality for Service Bus Queues
func (*QueueManager) Delete ¶
func (qm *QueueManager) Delete(ctx context.Context, name string) error
Delete deletes a Service Bus Queue entity by name
func (*QueueManager) Get ¶
func (qm *QueueManager) Get(ctx context.Context, name string) (*QueueEntity, error)
Get fetches a Service Bus Queue entity by name
func (*QueueManager) List ¶
func (qm *QueueManager) List(ctx context.Context, options ...ListQueuesOption) ([]*QueueEntity, error)
List fetches all of the queues for a Service Bus Namespace
func (QueueManager) Post ¶
func (em QueueManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)
Post performs an HTTP POST for a given entity path and body
func (*QueueManager) Put ¶
func (qm *QueueManager) Put(ctx context.Context, name string, opts ...QueueManagementOption) (*QueueEntity, error)
Put creates or updates a Service Bus Queue
func (QueueManager) TokenProvider ¶ added in v0.2.0
func (em QueueManager) TokenProvider() auth.TokenProvider
TokenProvider generates authorization tokens for communicating with the Service Bus management API
func (QueueManager) Use ¶ added in v0.2.0
func (em QueueManager) Use(mw ...MiddlewareFunc)
Use adds middleware to the middleware stack
type QueueOption ¶
QueueOption represents named options for assisting Queue message handling
func QueueWithPrefetchCount ¶ added in v0.2.0
func QueueWithPrefetchCount(prefetch uint32) QueueOption
QueueWithPrefetchCount configures the queue to attempt to fetch the number of messages specified by the prefetch count at one time.
The default is 1 message at a time.
Caution: Using PeekLock, messages have a set lock timeout, which can be renewed. By setting a high prefetch count, a local queue of messages could build up and cause message locks to expire before the message lands in the handler. If this happens, the message disposition will fail and will be re-queued and processed again.
func QueueWithReceiveAndDelete ¶
func QueueWithReceiveAndDelete() QueueOption
QueueWithReceiveAndDelete configures a queue to pop and delete messages off of the queue upon receiving the message. This differs from the default, PeekLock, where PeekLock receives a message, locks it for a period of time, then sends a disposition to the broker when the message has been processed.
type QueueSession ¶ added in v0.2.0
type QueueSession struct {
// contains filtered or unexported fields
}
QueueSession wraps Service Bus session functionality over a Queue
func NewQueueSession ¶ added in v0.2.0
func NewQueueSession(builder SendAndReceiveBuilder, sessionID *string) *QueueSession
NewQueueSession creates a new session sender and receiver to communicate with a Service Bus queue.
Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.
func (*QueueSession) Close ¶ added in v0.2.0
func (qs *QueueSession) Close(ctx context.Context) error
Close the underlying connection to Service Bus
func (*QueueSession) ManagementPath ¶ added in v0.9.0
func (qs *QueueSession) ManagementPath() string
ManagementPath provides an addressable path to the Entity management endpoint
func (*QueueSession) ReceiveDeferred ¶ added in v0.9.0
func (qs *QueueSession) ReceiveDeferred(ctx context.Context, handler Handler, mode ReceiveMode, sequenceNumbers ...int64) error
ReceiveDeferred will receive and handle a set of deferred messages
When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.
Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.
A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.
Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.
func (*QueueSession) ReceiveOne ¶ added in v0.2.0
func (qs *QueueSession) ReceiveOne(ctx context.Context, handler SessionHandler) error
ReceiveOne waits for the lock on a particular session to become available, takes it, then processes the session. The session can contain multiple messages. ReceiveOne will receive all messages within that session.
Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.
If the handler returns an error, the receive loop will be terminated.
func (*QueueSession) Send ¶ added in v0.2.0
func (qs *QueueSession) Send(ctx context.Context, msg *Message) error
Send the message to the queue within a session
func (*QueueSession) SessionID ¶ added in v0.2.0
func (qs *QueueSession) SessionID() *string
SessionID is the identifier for the Service Bus session
type ReceiveBuilder ¶ added in v0.2.0
type ReceiveBuilder interface { ReceiverBuilder // contains filtered or unexported methods }
ReceiveBuilder is a ReceiverBuilder and EntityManagementAddresser
type ReceiveMode ¶
type ReceiveMode int
ReceiveMode represents the behavior when consuming a message from a queue
type ReceiveOner ¶ added in v0.2.0
ReceiveOner provides the ability to receive and handle events
type Receiver ¶ added in v0.2.0
type Receiver struct { Name string DefaultDisposition DispositionAction Closed bool // contains filtered or unexported fields }
Receiver provides connection, session and link handling for receiving from an entity path
func (*Receiver) Listen ¶ added in v0.2.0
func (r *Receiver) Listen(ctx context.Context, handler Handler) *ListenerHandle
Listen starts a listener for messages sent to the entity path
func (*Receiver) ReceiveOne ¶ added in v0.2.0
ReceiveOne will receive one message from the link
type ReceiverBuilder ¶ added in v0.2.0
type ReceiverBuilder interface {
NewReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error)
}
ReceiverBuilder describes the ability of an entity to build receiver links
type ReceiverOption ¶ added in v0.2.0
ReceiverOption provides a structure for configuring receivers
func ReceiverWithPrefetchCount ¶ added in v0.2.0
func ReceiverWithPrefetchCount(prefetch uint32) ReceiverOption
ReceiverWithPrefetchCount configures the receiver to attempt to fetch the number of messages specified by the prefetch count at one time.
The default is 1 message at a time.
Caution: Using PeekLock, messages have a set lock timeout, which can be renewed. By setting a high prefetch count, a local queue of messages could build up and cause message locks to expire before the message lands in the handler. If this happens, the message disposition will fail and will be re-queued and processed again.
func ReceiverWithReceiveMode ¶ added in v0.2.0
func ReceiverWithReceiveMode(mode ReceiveMode) ReceiverOption
ReceiverWithReceiveMode configures a Receiver to use the specified receive mode
func ReceiverWithSession ¶ added in v0.2.0
func ReceiverWithSession(sessionID *string) ReceiverOption
ReceiverWithSession configures a Receiver to use a session
type RestHandler ¶ added in v0.2.0
RestHandler is used to transform a request and response within the http pipeline
type RuleDescription ¶ added in v0.2.0
type RuleDescription struct { XMLName xml.Name `xml:"RuleDescription"` BaseEntityDescription CreatedAt *date.Time `xml:"CreatedAt,omitempty"` Filter FilterDescription `xml:"Filter"` Action *ActionDescription `xml:"Action,omitempty"` }
RuleDescription is the content type for Subscription Rule management requests
type RuleEntity ¶ added in v0.2.0
type RuleEntity struct { *RuleDescription *Entity }
RuleEntity is the Azure Service Bus description of a Subscription Rule for management activities
type SQLAction ¶ added in v0.2.0
type SQLAction struct {
Expression string
}
SQLAction represents a SQL language-based action expression that is evaluated against a BrokeredMessage. A SQLAction supports a subset of the SQL-92 standard.
With SQL filter conditions, you can define an action that can annotate the message by adding, removing, or replacing properties and their values. The action uses a SQL-like expression that loosely leans on the SQL UPDATE statement syntax. The action is performed on the message after it has been matched and before the message is selected into the subscription. The changes to the message properties are private to the message copied into the subscription.
see: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter
func (SQLAction) ToActionDescription ¶ added in v0.2.0
func (sf SQLAction) ToActionDescription() ActionDescription
ToActionDescription will transform the SQLAction into an ActionDescription
type SQLFilter ¶ added in v0.2.0
type SQLFilter struct {
Expression string
}
SQLFilter represents a SQL language-based filter expression that is evaluated against a BrokeredMessage. A SQLFilter supports a subset of the SQL-92 standard.
see: https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-sql-filter
func (SQLFilter) ToFilterDescription ¶ added in v0.2.0
func (sf SQLFilter) ToFilterDescription() FilterDescription
ToFilterDescription will transform the SQLFilter into a FilterDescription
type SendAndReceiveBuilder ¶ added in v0.2.0
type SendAndReceiveBuilder interface { ReceiveBuilder SenderBuilder }
SendAndReceiveBuilder is a ReceiverBuilder, SenderBuilder and EntityManagementAddresser
type SendOption ¶
SendOption provides a way to customize a message on sending
type Sender ¶ added in v0.2.0
type Sender struct { Name string // contains filtered or unexported fields }
Sender provides connection, session and link handling for sending to an entity path
func (*Sender) Close ¶ added in v0.2.0
Close will close the AMQP connection, session and link of the Sender
func (*Sender) Recover ¶ added in v0.2.0
Recover will attempt to close the current session and link, then rebuild them
type SenderBuilder ¶ added in v0.2.0
type SenderBuilder interface {
NewSender(ctx context.Context, opts ...SenderOption) (*Sender, error)
}
SenderBuilder describes the ability of an entity to build sender links
type SenderOption ¶ added in v0.2.0
SenderOption provides a way to customize a Sender
func SenderWithSession ¶ added in v0.2.0
func SenderWithSession(sessionID *string) SenderOption
SenderWithSession configures the message to send with a specific session and sequence. By default, a Sender has a default session (uuid.NewV4()) and sequence generator.
type SessionHandler ¶ added in v0.2.0
type SessionHandler interface { Handler // Start is called when a Receiver is informed that has acquired a lock on a Service Bus Session. Start(*MessageSession) error // End is called when a Receiver is informed that the last message of a Session has been passed to it. End() }
SessionHandler exposes a manner of handling a group of messages together. Instances of SessionHandler should be passed to a Receiver such as a Queue or Subscription.
func NewSessionHandler ¶ added in v0.2.0
func NewSessionHandler(base Handler, start func(*MessageSession) error, end func()) SessionHandler
NewSessionHandler is a type converter that allows three funcs to be tied together into a type that fulfills the SessionHandler interface.
type Subscription ¶
type Subscription struct { Topic *Topic // contains filtered or unexported fields }
Subscription represents a Service Bus Subscription entity, which is used to receive topic messages. A topic subscription resembles a virtual queue that receives copies of the messages that are sent to the topic. Messages are received from a subscription identically to the way they are received from a queue.
func (*Subscription) Close ¶
func (s *Subscription) Close(ctx context.Context) error
Close the underlying connection to Service Bus
func (*Subscription) NewDeadLetter ¶ added in v0.2.0
func (s *Subscription) NewDeadLetter() *DeadLetter
NewDeadLetter creates an entity that represents the dead letter sub queue of the subscription
Azure Service Bus queues and topic subscriptions provide a secondary sub-queue, called a dead-letter queue (DLQ). The dead-letter queue does not need to be explicitly created and cannot be deleted or otherwise managed independent of the main entity.
The purpose of the dead-letter queue is to hold messages that cannot be delivered to any receiver, or messages that could not be processed. Messages can then be removed from the DLQ and inspected. An application might, with help of an operator, correct issues and resubmit the message, log the fact that there was an error, and take corrective action.
From an API and protocol perspective, the DLQ is mostly similar to any other queue, except that messages can only be submitted via the dead-letter operation of the parent entity. In addition, time-to-live is not observed, and you can't dead-letter a message from a DLQ. The dead-letter queue fully supports peek-lock delivery and transactional operations.
Note that there is no automatic cleanup of the DLQ. Messages remain in the DLQ until you explicitly retrieve them from the DLQ and call Complete() on the dead-letter message.
func (*Subscription) NewDeadLetterReceiver ¶ added in v0.2.0
func (s *Subscription) NewDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
NewDeadLetterReceiver builds a receiver for the Subscription's dead letter queue
func (*Subscription) NewReceiver ¶ added in v0.2.0
func (s *Subscription) NewReceiver(ctx context.Context, opts ...ReceiverOption) (*Receiver, error)
NewReceiver will create a new Receiver for receiving messages off of the subscription
func (*Subscription) NewSession ¶ added in v0.2.0
func (s *Subscription) NewSession(sessionID *string) *SubscriptionSession
NewSession will create a new session based receiver for the subscription
Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.
func (*Subscription) NewTransferDeadLetter ¶ added in v0.2.0
func (s *Subscription) NewTransferDeadLetter() *TransferDeadLetter
NewTransferDeadLetter creates an entity that represents the transfer dead letter sub queue of the subscription
Messages will be sent to the transfer dead-letter queue under the following conditions:
- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.
func (*Subscription) NewTransferDeadLetterReceiver ¶ added in v0.2.0
func (s *Subscription) NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
NewTransferDeadLetterReceiver builds a receiver for the Subscription's transfer dead letter queue
Messages will be sent to the transfer dead-letter queue under the following conditions:
- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.
func (Subscription) Peek ¶ added in v0.2.0
func (re Subscription) Peek(ctx context.Context, options ...PeekOption) (MessageIterator, error)
Peek fetches a list of Messages from the Service Bus broker without acquiring a lock or committing to a disposition. The messages are delivered as close to sequence order as possible.
The MessageIterator that is returned has the following properties: - Messages are fetched from the server in pages. Page size is configurable with PeekOptions. - The MessageIterator will always return "false" for Done(). - When Next() is called, it will return either: a slice of messages and no error, nil with an error related to being unable to complete the operation, or an empty slice of messages and an instance of "ErrNoMessages" signifying that there are currently no messages in the queue with a sequence ID larger than previously viewed ones.
func (Subscription) PeekOne ¶ added in v0.2.0
func (re Subscription) PeekOne(ctx context.Context, options ...PeekOption) (*Message, error)
PeekOne fetches a single Message from the Service Bus broker without acquiring a lock or committing to a disposition.
func (*Subscription) Receive ¶
func (s *Subscription) Receive(ctx context.Context, handler Handler) error
Receive subscribes for messages sent to the Subscription
Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.
If the handler returns an error, the receive loop will be terminated.
func (Subscription) ReceiveDeferred ¶ added in v0.2.0
func (re Subscription) ReceiveDeferred(ctx context.Context, handler Handler, sequenceNumbers ...int64) error
ReceiveDeferred will receive and handle a set of deferred messages
When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.
Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.
A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.
Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.
func (Subscription) ReceiveDeferredWithMode ¶ added in v0.9.0
func (re Subscription) ReceiveDeferredWithMode(ctx context.Context, handler Handler, mode ReceiveMode, sequenceNumbers ...int64) error
ReceiveDeferredWithMode will receive and handle a set of deferred messages
When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.
Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.
A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.
Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.
func (*Subscription) ReceiveOne ¶
func (s *Subscription) ReceiveOne(ctx context.Context, handler Handler) error
ReceiveOne will listen to receive a single message. ReceiveOne will only wait as long as the context allows.
Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.
func (Subscription) RenewLocks ¶ added in v0.2.0
RenewLocks renews the locks on messages provided
func (Subscription) SendBatchDisposition ¶ added in v0.3.0
func (re Subscription) SendBatchDisposition(ctx context.Context, iterator BatchDispositionIterator) error
SendBatchDisposition updates the LockTokenIDs to the disposition status.
type SubscriptionDescription ¶
type SubscriptionDescription struct { XMLName xml.Name `xml:"SubscriptionDescription"` BaseEntityDescription LockDuration *string `xml:"LockDuration,omitempty"` // LockDuration - ISO 8601 timespan duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute. RequiresSession *bool `xml:"RequiresSession,omitempty"` DefaultMessageTimeToLive *string `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message timespan to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. DefaultRuleDescription *DefaultRuleDescription `xml:"DefaultRuleDescription,omitempty"` DeadLetteringOnMessageExpiration *bool `xml:"DeadLetteringOnMessageExpiration,omitempty"` // DeadLetteringOnMessageExpiration - A value that indicates whether this queue has dead letter support when a message expires. DeadLetteringOnFilterEvaluationExceptions *bool `xml:"DeadLetteringOnFilterEvaluationExceptions,omitempty"` MessageCount *int64 `xml:"MessageCount,omitempty"` // MessageCount - The number of messages in the queue. MaxDeliveryCount *int32 `xml:"MaxDeliveryCount,omitempty"` // MaxDeliveryCount - The maximum delivery count. A message is automatically deadlettered after this number of deliveries. default value is 10. EnableBatchedOperations *bool `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled. 
Status *EntityStatus `xml:"Status,omitempty"` CreatedAt *date.Time `xml:"CreatedAt,omitempty"` UpdatedAt *date.Time `xml:"UpdatedAt,omitempty"` AccessedAt *date.Time `xml:"AccessedAt,omitempty"` AutoDeleteOnIdle *string `xml:"AutoDeleteOnIdle,omitempty"` ForwardTo *string `xml:"ForwardTo,omitempty"` // ForwardTo - absolute URI of the entity to forward messages ForwardDeadLetteredMessagesTo *string `xml:"ForwardDeadLetteredMessagesTo,omitempty"` // ForwardDeadLetteredMessagesTo - absolute URI of the entity to forward dead letter messages CountDetails *CountDetails `xml:"CountDetails,omitempty"` }
SubscriptionDescription is the content type for Subscription management requests
type SubscriptionEntity ¶
type SubscriptionEntity struct { *SubscriptionDescription *Entity }
SubscriptionEntity is the Azure Service Bus description of a topic Subscription for management activities
type SubscriptionManagementOption ¶
type SubscriptionManagementOption func(*SubscriptionDescription) error
SubscriptionManagementOption represents named options for assisting Subscription creation
func SubscriptionWithAutoDeleteOnIdle ¶
func SubscriptionWithAutoDeleteOnIdle(window *time.Duration) SubscriptionManagementOption
SubscriptionWithAutoDeleteOnIdle configures the subscription to automatically delete after the specified idle interval. The minimum duration is 5 minutes.
func SubscriptionWithAutoForward ¶ added in v0.2.0
func SubscriptionWithAutoForward(target Targetable) SubscriptionManagementOption
SubscriptionWithAutoForward configures the subscription to automatically forward messages to the specified entity path
The ability to AutoForward to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.
func SubscriptionWithBatchedOperations ¶
func SubscriptionWithBatchedOperations() SubscriptionManagementOption
SubscriptionWithBatchedOperations configures the subscription to batch server-side operations.
func SubscriptionWithDeadLetteringOnMessageExpiration ¶
func SubscriptionWithDeadLetteringOnMessageExpiration() SubscriptionManagementOption
SubscriptionWithDeadLetteringOnMessageExpiration will ensure the Subscription sends expired messages to the dead letter queue
func SubscriptionWithDefaultRuleDescription ¶ added in v0.10.3
func SubscriptionWithDefaultRuleDescription(filter FilterDescriber, name string) SubscriptionManagementOption
SubscriptionWithDefaultRuleDescription configures the subscription to set a default rule
func SubscriptionWithForwardDeadLetteredMessagesTo ¶ added in v0.2.0
func SubscriptionWithForwardDeadLetteredMessagesTo(target Targetable) SubscriptionManagementOption
SubscriptionWithForwardDeadLetteredMessagesTo configures the subscription to automatically forward dead letter messages to the specified target entity.
The ability to forward dead letter messages to a target requires the connection have management authorization. If the connection string or Azure Active Directory identity used does not have management authorization, an unauthorized error will be returned on the PUT.
func SubscriptionWithLockDuration ¶
func SubscriptionWithLockDuration(window *time.Duration) SubscriptionManagementOption
SubscriptionWithLockDuration configures the subscription to have a duration of a peek-lock; that is, the amount of time that the message is locked for other receivers. The maximum value for LockDuration is 5 minutes; the default value is 1 minute.
func SubscriptionWithMessageTimeToLive ¶
func SubscriptionWithMessageTimeToLive(window *time.Duration) SubscriptionManagementOption
SubscriptionWithMessageTimeToLive configures the subscription to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.
func SubscriptionWithRequiredSessions ¶
func SubscriptionWithRequiredSessions() SubscriptionManagementOption
SubscriptionWithRequiredSessions will ensure the subscription requires senders and receivers to have sessionIDs
type SubscriptionManager ¶
type SubscriptionManager struct { Topic *Topic // contains filtered or unexported fields }
SubscriptionManager provides CRUD functionality for Service Bus Subscription
func (*SubscriptionManager) Delete ¶
func (sm *SubscriptionManager) Delete(ctx context.Context, name string) error
Delete deletes a Service Bus Subscription entity by name
func (*SubscriptionManager) DeleteRule ¶ added in v0.2.0
func (sm *SubscriptionManager) DeleteRule(ctx context.Context, subscriptionName, ruleName string) error
DeleteRule will delete a rule on the subscription
func (*SubscriptionManager) Get ¶
func (sm *SubscriptionManager) Get(ctx context.Context, name string) (*SubscriptionEntity, error)
Get fetches a Service Bus Subscription entity by name
func (*SubscriptionManager) List ¶
func (sm *SubscriptionManager) List(ctx context.Context, options ...ListSubscriptionsOption) ([]*SubscriptionEntity, error)
List fetches all of the Subscriptions for a Service Bus Topic
func (*SubscriptionManager) ListRules ¶ added in v0.2.0
func (sm *SubscriptionManager) ListRules(ctx context.Context, subscriptionName string) ([]*RuleEntity, error)
ListRules returns the slice of subscription filter rules
By default when the subscription is created, there exists a single "true" filter which matches all messages.
func (SubscriptionManager) Post ¶
func (em SubscriptionManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)
Post performs an HTTP POST for a given entity path and body
func (*SubscriptionManager) Put ¶
func (sm *SubscriptionManager) Put(ctx context.Context, name string, opts ...SubscriptionManagementOption) (*SubscriptionEntity, error)
Put creates or updates a Service Bus Subscription
func (*SubscriptionManager) PutRule ¶ added in v0.2.0
func (sm *SubscriptionManager) PutRule(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber) (*RuleEntity, error)
PutRule creates a new Subscription rule to filter messages from the topic
func (*SubscriptionManager) PutRuleWithAction ¶ added in v0.2.0
func (sm *SubscriptionManager) PutRuleWithAction(ctx context.Context, subscriptionName, ruleName string, filter FilterDescriber, action ActionDescriber) (*RuleEntity, error)
PutRuleWithAction creates a new Subscription rule to filter messages from the topic and then perform an action
func (SubscriptionManager) TokenProvider ¶ added in v0.2.0
func (em SubscriptionManager) TokenProvider() auth.TokenProvider
TokenProvider generates authorization tokens for communicating with the Service Bus management API
func (SubscriptionManager) Use ¶ added in v0.2.0
func (em SubscriptionManager) Use(mw ...MiddlewareFunc)
Use adds middleware to the middleware mwStack
type SubscriptionOption ¶
type SubscriptionOption func(*Subscription) error
SubscriptionOption configures the Subscription Azure Service Bus client
func SubscriptionWithPrefetchCount ¶ added in v0.2.0
func SubscriptionWithPrefetchCount(prefetch uint32) SubscriptionOption
SubscriptionWithPrefetchCount configures the subscription to attempt to fetch the number of messages specified by the prefetch count at one time.
The default is 1 message at a time.
Caution: Using PeekLock, messages have a set lock timeout, which can be renewed. By setting a high prefetch count, a local queue of messages could build up and cause message locks to expire before the message lands in the handler. If this happens, the message disposition will fail and will be re-queued and processed again.
func SubscriptionWithReceiveAndDelete ¶
func SubscriptionWithReceiveAndDelete() SubscriptionOption
SubscriptionWithReceiveAndDelete configures a subscription to pop and delete messages off of the subscription upon receiving the message. This differs from the default, PeekLock, where PeekLock receives a message, locks it for a period of time, then sends a disposition to the broker when the message has been processed.
type SubscriptionSession ¶ added in v0.2.0
type SubscriptionSession struct {
// contains filtered or unexported fields
}
SubscriptionSession wraps Service Bus session functionality over a Subscription
func NewSubscriptionSession ¶ added in v0.2.0
func NewSubscriptionSession(builder ReceiveBuilder, sessionID *string) *SubscriptionSession
NewSubscriptionSession creates a new session receiver to receive from a Service Bus subscription.
Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.
func (*SubscriptionSession) Close ¶ added in v0.2.0
func (ss *SubscriptionSession) Close(ctx context.Context) error
Close the underlying connection to Service Bus
func (*SubscriptionSession) ManagementPath ¶ added in v0.9.0
func (ss *SubscriptionSession) ManagementPath() string
ManagementPath provides an addressable path to the Entity management endpoint
func (*SubscriptionSession) ReceiveDeferred ¶ added in v0.9.0
func (ss *SubscriptionSession) ReceiveDeferred(ctx context.Context, handler Handler, mode ReceiveMode, sequenceNumbers ...int64) error
ReceiveDeferred will receive and handle a set of deferred messages
When a queue or subscription client receives a message that it is willing to process, but for which processing is not currently possible due to special circumstances inside of the application, it has the option of "deferring" retrieval of the message to a later point. The message remains in the queue or subscription, but it is set aside.
Deferral is a feature specifically created for workflow processing scenarios. Workflow frameworks may require certain operations to be processed in a particular order, and may have to postpone processing of some received messages until prescribed prior work that is informed by other messages has been completed.
A simple illustrative example is an order processing sequence in which a payment notification from an external payment provider appears in a system before the matching purchase order has been propagated from the store front to the fulfillment system. In that case, the fulfillment system might defer processing the payment notification until there is an order with which to associate it. In rendezvous scenarios, where messages from different sources drive a workflow forward, the real-time execution order may indeed be correct, but the messages reflecting the outcomes may arrive out of order.
Ultimately, deferral aids in reordering messages from the arrival order into an order in which they can be processed, while leaving those messages safely in the message store for which processing needs to be postponed.
func (*SubscriptionSession) ReceiveOne ¶ added in v0.2.0
func (ss *SubscriptionSession) ReceiveOne(ctx context.Context, handler SessionHandler) error
ReceiveOne waits for the lock on a particular session to become available, takes it, then processes the session. The session can contain multiple messages. ReceiveOne will receive all messages within that session.
Handler must call a disposition action such as Complete, Abandon, Deadletter on the message. If the message does not have a disposition set, the Queue's DefaultDisposition will be used.
If the handler returns an error, the receive loop will be terminated.
func (*SubscriptionSession) SessionID ¶ added in v0.2.0
func (ss *SubscriptionSession) SessionID() *string
SessionID is the identifier for the Service Bus session
type SystemProperties ¶
type SystemProperties struct { LockedUntil *time.Time `mapstructure:"x-opt-locked-until"` SequenceNumber *int64 `mapstructure:"x-opt-sequence-number"` PartitionID *int16 `mapstructure:"x-opt-partition-id"` PartitionKey *string `mapstructure:"x-opt-partition-key"` EnqueuedTime *time.Time `mapstructure:"x-opt-enqueued-time"` DeadLetterSource *string `mapstructure:"x-opt-deadletter-source"` ScheduledEnqueueTime *time.Time `mapstructure:"x-opt-scheduled-enqueue-time"` EnqueuedSequenceNumber *int64 `mapstructure:"x-opt-enqueue-sequence-number"` ViaPartitionKey *string `mapstructure:"x-opt-via-partition-key"` Annotations map[string]interface{} `mapstructure:"-"` }
SystemProperties are used to store properties that are set by the system.
type Targetable ¶ added in v0.2.0
type Targetable interface {
TargetURI() string
}
Targetable provides the ability to forward messages to the entity
type Topic ¶
type Topic struct {
// contains filtered or unexported fields
}
Topic represents a Service Bus topic. In contrast to queues, in which each message is processed by a single consumer, topics and subscriptions provide a one-to-many form of communication, in a publish/subscribe pattern. Useful for scaling to very large numbers of recipients, each published message is made available to each subscription registered with the topic. Messages are sent to a topic and delivered to one or more associated subscriptions, depending on filter rules that can be set on a per-subscription basis. The subscriptions can use additional filters to restrict the messages that they want to receive. Messages are sent to a topic in the same way they are sent to a queue, but messages are not received from the topic directly. Instead, they are received from subscriptions. A topic subscription resembles a virtual queue that receives copies of the messages that are sent to the topic. Messages are received from a subscription identically to the way they are received from a queue.
func (Topic) CancelScheduled ¶ added in v0.9.0
CancelScheduled allows for removal of messages that have been handed to the Service Bus broker for later delivery, but have not yet been enqueued.
func (*Topic) NewSender ¶ added in v0.2.0
NewSender will create a new Sender for sending messages to the topic
func (*Topic) NewSession ¶ added in v0.2.0
func (t *Topic) NewSession(sessionID *string) *TopicSession
NewSession will create a new session based sender for the topic
Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.
func (*Topic) NewSubscription ¶
func (t *Topic) NewSubscription(name string, opts ...SubscriptionOption) (*Subscription, error)
NewSubscription creates a new Topic Subscription client
func (*Topic) NewSubscriptionManager ¶
func (t *Topic) NewSubscriptionManager() *SubscriptionManager
NewSubscriptionManager creates a new SubscriptionManager for a Service Bus Topic
func (*Topic) NewTransferDeadLetter ¶ added in v0.2.0
func (t *Topic) NewTransferDeadLetter() *TransferDeadLetter
NewTransferDeadLetter creates an entity that represents the transfer dead letter sub queue of the topic
Messages will be sent to the transfer dead-letter queue under the following conditions:
- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.
func (*Topic) NewTransferDeadLetterReceiver ¶ added in v0.2.0
func (t *Topic) NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
NewTransferDeadLetterReceiver builds a receiver for the Topic's transfer dead letter queue
Messages will be sent to the transfer dead-letter queue under the following conditions:
- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.
func (Topic) ScheduleAt ¶ added in v0.9.0
func (se Topic) ScheduleAt(ctx context.Context, enqueueTime time.Time, messages ...*Message) ([]int64, error)
ScheduleAt will send a batch of messages to a Topic, schedule them to be enqueued, and return the sequence numbers that can be used to cancel each message.
type TopicDescription ¶
type TopicDescription struct { XMLName xml.Name `xml:"TopicDescription"` BaseEntityDescription DefaultMessageTimeToLive *string `xml:"DefaultMessageTimeToLive,omitempty"` // DefaultMessageTimeToLive - ISO 8601 default message time span to live value. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. MaxSizeInMegabytes *int32 `xml:"MaxSizeInMegabytes,omitempty"` // MaxSizeInMegabytes - The maximum size of the queue in megabytes, which is the size of memory allocated for the queue. Default is 1024. RequiresDuplicateDetection *bool `xml:"RequiresDuplicateDetection,omitempty"` // RequiresDuplicateDetection - A value indicating if this queue requires duplicate detection. DuplicateDetectionHistoryTimeWindow *string `xml:"DuplicateDetectionHistoryTimeWindow,omitempty"` // DuplicateDetectionHistoryTimeWindow - ISO 8601 timeSpan structure that defines the duration of the duplicate detection history. The default value is 10 minutes. EnableBatchedOperations *bool `xml:"EnableBatchedOperations,omitempty"` // EnableBatchedOperations - Value that indicates whether server-side batched operations are enabled. SizeInBytes *int64 `xml:"SizeInBytes,omitempty"` // SizeInBytes - The size of the queue, in bytes. 
FilteringMessagesBeforePublishing *bool `xml:"FilteringMessagesBeforePublishing,omitempty"` IsAnonymousAccessible *bool `xml:"IsAnonymousAccessible,omitempty"` Status *EntityStatus `xml:"Status,omitempty"` CreatedAt *date.Time `xml:"CreatedAt,omitempty"` UpdatedAt *date.Time `xml:"UpdatedAt,omitempty"` SupportOrdering *bool `xml:"SupportOrdering,omitempty"` AutoDeleteOnIdle *string `xml:"AutoDeleteOnIdle,omitempty"` EnablePartitioning *bool `xml:"EnablePartitioning,omitempty"` EnableSubscriptionPartitioning *bool `xml:"EnableSubscriptionPartitioning,omitempty"` EnableExpress *bool `xml:"EnableExpress,omitempty"` CountDetails *CountDetails `xml:"CountDetails,omitempty"` }
TopicDescription is the content type for Topic management requests
type TopicEntity ¶
type TopicEntity struct { *TopicDescription *Entity }
TopicEntity is the Azure Service Bus description of a Topic for management activities
type TopicManagementOption ¶
type TopicManagementOption func(*TopicDescription) error
TopicManagementOption represents named options for assisting Topic creation
func TopicWithAutoDeleteOnIdle ¶
func TopicWithAutoDeleteOnIdle(window *time.Duration) TopicManagementOption
TopicWithAutoDeleteOnIdle configures the topic to automatically delete after the specified idle interval. The minimum duration is 5 minutes.
func TopicWithBatchedOperations ¶
func TopicWithBatchedOperations() TopicManagementOption
TopicWithBatchedOperations configures the topic to batch server-side operations.
func TopicWithDuplicateDetection ¶
func TopicWithDuplicateDetection(window *time.Duration) TopicManagementOption
TopicWithDuplicateDetection configures the topic to detect duplicates for a given time window. If window is not specified, then it uses the default of 10 minutes.
func TopicWithExpress ¶
func TopicWithExpress() TopicManagementOption
TopicWithExpress configures the topic to hold a message in memory temporarily before writing it to persistent storage.
func TopicWithMaxSizeInMegabytes ¶
func TopicWithMaxSizeInMegabytes(size int) TopicManagementOption
TopicWithMaxSizeInMegabytes configures the maximum size of the topic in megabytes (1 * 1024 - 5 * 1024), which is the size of the memory allocated for the topic. Default is 1 GB (1 * 1024 MB).
size must be between 1024 and 5 * 1024 for the Standard sku and up to 80 * 1024 for Premium sku
func TopicWithMessageTimeToLive ¶
func TopicWithMessageTimeToLive(window *time.Duration) TopicManagementOption
TopicWithMessageTimeToLive configures the topic to set a time to live on messages. This is the duration after which the message expires, starting from when the message is sent to Service Bus. This is the default value used when TimeToLive is not set on a message itself. If nil, defaults to 14 days.
func TopicWithOrdering ¶
func TopicWithOrdering() TopicManagementOption
TopicWithOrdering configures the topic to support ordering of messages.
func TopicWithPartitioning ¶
func TopicWithPartitioning() TopicManagementOption
TopicWithPartitioning configures the topic to be partitioned across multiple message brokers.
type TopicManager ¶
type TopicManager struct {
// contains filtered or unexported fields
}
TopicManager provides CRUD functionality for Service Bus Topics
func (*TopicManager) Delete ¶
func (tm *TopicManager) Delete(ctx context.Context, name string) error
Delete deletes a Service Bus Topic entity by name
func (*TopicManager) Get ¶
func (tm *TopicManager) Get(ctx context.Context, name string) (*TopicEntity, error)
Get fetches a Service Bus Topic entity by name
func (*TopicManager) List ¶
func (tm *TopicManager) List(ctx context.Context, options ...ListTopicsOption) ([]*TopicEntity, error)
List fetches all of the Topics for a Service Bus Namespace
func (TopicManager) Post ¶
func (em TopicManager) Post(ctx context.Context, entityPath string, body []byte, mw ...MiddlewareFunc) (*http.Response, error)
Post performs an HTTP POST for a given entity path and body
func (*TopicManager) Put ¶
func (tm *TopicManager) Put(ctx context.Context, name string, opts ...TopicManagementOption) (*TopicEntity, error)
Put creates or updates a Service Bus Topic
func (TopicManager) TokenProvider ¶ added in v0.2.0
func (em TopicManager) TokenProvider() auth.TokenProvider
TokenProvider generates authorization tokens for communicating with the Service Bus management API
func (TopicManager) Use ¶ added in v0.2.0
func (em TopicManager) Use(mw ...MiddlewareFunc)
Use adds middleware to the middleware mwStack
type TopicOption ¶
TopicOption represents named options for assisting Topic message handling
type TopicSession ¶ added in v0.2.0
type TopicSession struct {
// contains filtered or unexported fields
}
TopicSession wraps Service Bus session functionality over a Topic
func NewTopicSession ¶ added in v0.2.0
func NewTopicSession(builder SenderBuilder, sessionID *string) *TopicSession
NewTopicSession creates a new session sender to send to a Service Bus topic.
Microsoft Azure Service Bus sessions enable joint and ordered handling of unbounded sequences of related messages. To realize a FIFO guarantee in Service Bus, use Sessions. Service Bus is not prescriptive about the nature of the relationship between the messages, and also does not define a particular model for determining where a message sequence starts or ends.
func (*TopicSession) Close ¶ added in v0.2.0
func (ts *TopicSession) Close(ctx context.Context) error
Close the underlying connection to Service Bus
func (*TopicSession) Send ¶ added in v0.2.0
func (ts *TopicSession) Send(ctx context.Context, msg *Message) error
Send the message to the topic within a session
func (*TopicSession) SessionID ¶ added in v0.2.0
func (ts *TopicSession) SessionID() *string
SessionID is the identifier for the Service Bus session
type TransferDeadLetter ¶ added in v0.2.0
type TransferDeadLetter struct {
// contains filtered or unexported fields
}
TransferDeadLetter represents a transfer dead letter queue in Azure Service Bus.
Messages will be sent to the transfer dead-letter queue under the following conditions:
- A message passes through more than 3 queues or topics that are chained together.
- The destination queue or topic is disabled or deleted.
- The destination queue or topic exceeds the maximum entity size.
func NewTransferDeadLetter ¶ added in v0.2.0
func NewTransferDeadLetter(builder TransferDeadLetterBuilder) *TransferDeadLetter
NewTransferDeadLetter constructs an instance of TransferDeadLetter which represents a transfer dead letter queue in Azure Service Bus
func (*TransferDeadLetter) Close ¶ added in v0.2.0
func (dl *TransferDeadLetter) Close(ctx context.Context) error
Close the underlying connection to Service Bus
func (*TransferDeadLetter) ReceiveOne ¶ added in v0.2.0
func (dl *TransferDeadLetter) ReceiveOne(ctx context.Context, handler Handler) error
ReceiveOne will receive one message from the transfer dead letter queue
type TransferDeadLetterBuilder ¶ added in v0.2.0
type TransferDeadLetterBuilder interface {
NewTransferDeadLetterReceiver(ctx context.Context, opts ...ReceiverOption) (ReceiveOner, error)
}
TransferDeadLetterBuilder provides the ability to create a new receiver addressed to a given entity's transfer dead letter queue.
type TrueFilter ¶ added in v0.2.0
type TrueFilter struct{}
TrueFilter represents an always-true SQL expression which will accept all messages
func (TrueFilter) ToFilterDescription ¶ added in v0.2.0
func (tf TrueFilter) ToFilterDescription() FilterDescription
ToFilterDescription will transform the TrueFilter into a FilterDescription
Source Files ¶
- action.go
- amqphandler.go
- batch.go
- batch_disposition.go
- deadletter.go
- entity.go
- errors.go
- filter.go
- handler.go
- iterator.go
- message.go
- message_session.go
- mgmt.go
- namespace.go
- operation_constants.go
- queue.go
- queue_manager.go
- receiver.go
- rpc.go
- sender.go
- session.go
- subscription.go
- subscription_manager.go
- topic.go
- topic_manager.go
- tracing.go