azuresb

package
v0.13.0
Warning

This package is not in the latest version of its module.
Published: Apr 24, 2019 License: Apache-2.0 Imports: 22 Imported by: 25

Documentation

Overview

Package azuresb provides an implementation of pubsub using Azure Service Bus Topic and Subscription. See https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-messaging-overview for an overview.

URLs

For pubsub.OpenTopic and pubsub.OpenSubscription, azuresb registers for the scheme "azuresb". The default URL opener will use a Service Bus Connection String based on the environment variable "SERVICEBUS_CONNECTION_STRING". To customize the URL opener, or for more details on the URL format, see URLOpener. See https://godoc.org/gocloud.dev#hdr-URLs for background information.

Message Delivery Semantics

Azure Service Bus supports at-least-once semantics in the default Peek-Lock mode; applications must call Message.Ack after processing a message, or it will be redelivered. It also supports a Receive-and-Delete mode, which effectively auto-acks a message when it is delivered, resulting in at-most-once semantics. Use SubscriptionOptions.AckFuncForReceiveAndDelete to choose between the two. See https://godoc.org/gocloud.dev/pubsub#hdr-At_most_once_and_At_least_once_Delivery for more background.
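The difference between the two modes can be illustrated with a small in-memory simulation. This is only a sketch: the queue type and its methods are hypothetical stand-ins, not part of azuresb or the Azure SDK, and lock expiry is triggered manually rather than by a timer.

```go
package main

import "fmt"

// queue is a hypothetical in-memory stand-in for a Service Bus subscription.
type queue struct {
	pending          []string        // messages waiting to be delivered
	locked           map[string]bool // Peek-Lock only: delivered but not yet acked
	receiveAndDelete bool
}

func newQueue(receiveAndDelete bool, msgs ...string) *queue {
	return &queue{
		pending:          append([]string(nil), msgs...),
		locked:           map[string]bool{},
		receiveAndDelete: receiveAndDelete,
	}
}

// receive delivers the next message; ok is false when nothing is pending.
func (q *queue) receive() (msg string, ok bool) {
	if len(q.pending) == 0 {
		return "", false
	}
	msg, q.pending = q.pending[0], q.pending[1:]
	if !q.receiveAndDelete {
		q.locked[msg] = true // keep it until it is acked
	}
	return msg, true
}

// ack completes a locked message. It has no effect in Receive-and-Delete mode.
func (q *queue) ack(msg string) { delete(q.locked, msg) }

// expireLocks simulates a lock timeout: unacked messages become pending again.
func (q *queue) expireLocks() {
	for msg := range q.locked {
		q.pending = append(q.pending, msg)
		delete(q.locked, msg)
	}
}

// deliveries counts how often a single message is delivered across two
// receive attempts, optionally acking it after each delivery.
func deliveries(receiveAndDelete, ack bool) int {
	q := newQueue(receiveAndDelete, "m1")
	n := 0
	for i := 0; i < 2; i++ {
		if msg, ok := q.receive(); ok {
			n++
			if ack {
				q.ack(msg)
			}
		}
		q.expireLocks()
	}
	return n
}

func main() {
	fmt.Println("peek-lock, no ack:", deliveries(false, false))
	fmt.Println("peek-lock, ack:", deliveries(false, true))
	fmt.Println("receive-and-delete, no ack:", deliveries(true, false))
}
```

In this model an unacked message is delivered twice under Peek-Lock but only once under Receive-and-Delete, which is the at-least-once versus at-most-once distinction described above.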

As

azuresb exposes the following types for As:

  • Topic: *servicebus.Topic
  • Subscription: *servicebus.Subscription
  • Message: *servicebus.Message
  • Error: common.Retryable
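
As a rough illustration of how an As method can hand back a driver-specific type, the sketch below uses two hypothetical local types, nativeTopic and portableTopic, standing in for *servicebus.Topic and *pubsub.Topic. It shows the general pointer-to-pointer pattern under those assumptions, not the package's actual code.

```go
package main

import "fmt"

// nativeTopic is a hypothetical stand-in for a driver type such as *servicebus.Topic.
type nativeTopic struct{ Name string }

// portableTopic is a hypothetical stand-in for a portable wrapper such as *pubsub.Topic.
type portableTopic struct{ native *nativeTopic }

// As sets *i to the underlying native topic when i is a **nativeTopic
// and reports whether the conversion succeeded.
func (t *portableTopic) As(i interface{}) bool {
	p, ok := i.(**nativeTopic)
	if !ok {
		return false
	}
	*p = t.native
	return true
}

func main() {
	t := &portableTopic{native: &nativeTopic{Name: "mytopic"}}

	// Pass a pointer to a variable of the type you want filled in.
	var nt *nativeTopic
	if t.As(&nt) {
		fmt.Println("native topic:", nt.Name)
	}

	// A target of any other type is rejected and left untouched.
	var wrong *string
	fmt.Println("wrong type accepted:", t.As(&wrong))
}
```

With the real package, the target would be a variable of one of the types listed above, passed by pointer.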
Example (AckFuncForReceiveAndDelete)
package main

import (
	"context"
	"log"
	"os"

	servicebus "github.com/Azure/azure-service-bus-go"
	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/azuresb"
)

func main() {

	ctx := context.Background()
	// See the docs below for how to provision an Azure Service Bus Namespace and obtain the connection string.
	// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
	connString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	topicName := "test-topic"
	subscriptionName := "test-sub"

	if connString == "" {
		log.Fatal("Service Bus ConnectionString is not set")
	}

	// Construct a Service Bus Namespace from the connection string.
	// See https://godoc.org/github.com/Azure/azure-service-bus-go#Namespace.
	ns, err := azuresb.NewNamespaceFromConnectionString(connString)
	if err != nil {
		log.Fatal(err)
	}

	// Construct a Service Bus Topic for topicName in the Namespace.
	// See https://godoc.org/github.com/Azure/azure-service-bus-go#Topic.
	sbTopic, err := azuresb.NewTopic(ns, topicName, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer sbTopic.Close(ctx)

	// Configure the subscription to use Receive-and-Delete mode.
	// See https://godoc.org/github.com/Azure/azure-service-bus-go#SubscriptionWithReceiveAndDelete.
	var opts []servicebus.SubscriptionOption
	opts = append(opts, servicebus.SubscriptionWithReceiveAndDelete())

	// Construct a Service Bus Subscription which is a child to a Service Bus Topic.
	// See https://godoc.org/github.com/Azure/azure-service-bus-go#Topic.NewSubscription.
	sbSub, err := azuresb.NewSubscription(sbTopic, subscriptionName, opts)
	if err != nil {
		log.Fatal(err)
	}
	defer sbSub.Close(ctx)

	// pubsub accommodates both at-least-once and at-most-once providers. With an
	// at-least-once provider (Peek-Lock mode), the application must always call
	// Message.Ack. With an at-most-once provider (Receive-and-Delete mode),
	// calling Message.Ack is optional. Because this subscription uses
	// Receive-and-Delete mode, SubscriptionOptions.AckFuncForReceiveAndDelete
	// must be non-nil; a no-op is used here.
	//
	// For more information on Service Bus receive modes, see
	// https://godoc.org/github.com/Azure/azure-service-bus-go#SubscriptionWithReceiveAndDelete.
	noop := func() {}
	subOpts := &azuresb.SubscriptionOptions{
		AckFuncForReceiveAndDelete: noop,
	}
	// Construct a *pubsub.Subscription for the given Service Bus Namespace, Topic, and Subscription.
	s, err := azuresb.OpenSubscription(ctx, ns, sbTopic, sbSub, subOpts)
	if err != nil {
		log.Fatal(err)
	}
	defer s.Shutdown(ctx)

	// Construct a *pubsub.Topic.
	t, err := azuresb.OpenTopic(ctx, sbTopic, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer t.Shutdown(ctx)

	// Send *pubsub.Message from *pubsub.Topic backed by Azure Service Bus.
	err = t.Send(ctx, &pubsub.Message{
		Body: []byte("example message"),
		Metadata: map[string]string{
			"Priority": "1",
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Receive a message from the *pubsub.Subscription backed by Service Bus.
	msg, err := s.Receive(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Ack calls AckFuncForReceiveAndDelete if it is set; otherwise the driver's Ack is called.
	msg.Ack()
}

Example (OpenFromURL)
package main

import (
	"context"
	"log"

	"gocloud.dev/pubsub"
	// The blank import registers the "azuresb" scheme with pubsub's default URL mux.
	_ "gocloud.dev/pubsub/azuresb"
)

func main() {
	ctx := context.Background()

	// OpenTopic creates a *pubsub.Topic from a URL.
	// This URL will open the topic "mytopic" using a connection string
	// from the environment variable SERVICEBUS_CONNECTION_STRING.
	t, err := pubsub.OpenTopic(ctx, "azuresb://mytopic")
	if err != nil {
		log.Fatal(err)
	}
	defer t.Shutdown(ctx)

	// Similarly, OpenSubscription creates a *pubsub.Subscription from a URL.
	// This URL will open the subscription "mysub" for the topic "mytopic".
	s, err := pubsub.OpenSubscription(ctx, "azuresb://mytopic?subscription=mysub")
	if err != nil {
		log.Fatal(err)
	}
	defer s.Shutdown(ctx)
}

Constants

const Scheme = "azuresb"

Scheme is the URL scheme under which azuresb registers its URLOpeners on pubsub.DefaultURLMux.

Variables

Set holds Wire providers for this package.

Functions

func NewNamespaceFromConnectionString

func NewNamespaceFromConnectionString(connectionString string) (*servicebus.Namespace, error)

NewNamespaceFromConnectionString returns a *servicebus.Namespace from a Service Bus connection string. See https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues for how to obtain one.
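
For orientation, a Service Bus connection string is a semicolon-separated list of Key=Value fields such as Endpoint, SharedAccessKeyName, and SharedAccessKey. The sketch below splits one into a map so it can be sanity-checked before use. parseConnString is a hypothetical helper written for this illustration; it is not part of azuresb and is not how the SDK parses the string, and the values shown are placeholders.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseConnString splits a "Key=Value;Key=Value" connection string into a map.
// It is a hypothetical helper for illustration only.
func parseConnString(s string) (map[string]string, error) {
	fields := make(map[string]string)
	for _, part := range strings.Split(s, ";") {
		if part == "" {
			continue // tolerate empty segments such as a trailing semicolon
		}
		// Split on the first '=' only: base64 key values may end in '='.
		kv := strings.SplitN(part, "=", 2)
		if len(kv) != 2 || kv[0] == "" {
			return nil, fmt.Errorf("malformed field %q", part)
		}
		fields[kv[0]] = kv[1]
	}
	if fields["Endpoint"] == "" {
		return nil, errors.New("connection string has no Endpoint")
	}
	return fields, nil
}

func main() {
	// Placeholder values, not real credentials.
	cs := "Endpoint=sb://example.servicebus.windows.net/;" +
		"SharedAccessKeyName=RootManageSharedAccessKey;" +
		"SharedAccessKey=abc123="

	fields, err := parseConnString(cs)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println("endpoint:", fields["Endpoint"])
	fmt.Println("key name:", fields["SharedAccessKeyName"])
}
```

A check like this can give a clearer error than passing an empty or truncated environment variable straight to NewNamespaceFromConnectionString.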

func NewSubscription

func NewSubscription(parentTopic *servicebus.Topic, subscriptionName string, opts []servicebus.SubscriptionOption) (*servicebus.Subscription, error)

NewSubscription returns a *servicebus.Subscription associated with a Service Bus Topic.

func NewTopic

func NewTopic(ns *servicebus.Namespace, topicName string, opts []servicebus.TopicOption) (*servicebus.Topic, error)

NewTopic returns a *servicebus.Topic associated with a Service Bus Namespace.

func OpenSubscription

func OpenSubscription(ctx context.Context, parentNamespace *servicebus.Namespace, parentTopic *servicebus.Topic, sbSubscription *servicebus.Subscription, opts *SubscriptionOptions) (*pubsub.Subscription, error)

OpenSubscription initializes a pubsub Subscription on a given Service Bus Subscription and its parent Service Bus Topic.

Example
package main

import (
	"context"
	"log"
	"os"

	"gocloud.dev/pubsub/azuresb"
)

func main() {
	ctx := context.Background()
	// See the docs below for how to provision an Azure Service Bus Namespace and obtain the connection string.
	// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
	connString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	topicName := "test-topic"
	subscriptionName := "test-sub"

	if connString == "" {
		log.Fatal("Service Bus ConnectionString is not set")
	}

	// Construct a Service Bus Namespace from the connection string.
	// See https://godoc.org/github.com/Azure/azure-service-bus-go#Namespace.
	ns, err := azuresb.NewNamespaceFromConnectionString(connString)
	if err != nil {
		log.Fatal(err)
	}

	// Construct a Service Bus Topic for topicName in the Namespace.
	// See https://godoc.org/github.com/Azure/azure-service-bus-go#Topic.
	sbTopic, err := azuresb.NewTopic(ns, topicName, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer sbTopic.Close(ctx)

	// Construct a Service Bus Subscription which is a child to a Service Bus Topic.
	// See https://godoc.org/github.com/Azure/azure-service-bus-go#Topic.NewSubscription.
	sbSub, err := azuresb.NewSubscription(sbTopic, subscriptionName, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer sbSub.Close(ctx)

	// Construct a *pubsub.Subscription for the given Service Bus Namespace, Topic, and Subscription.
	s, err := azuresb.OpenSubscription(ctx, ns, sbTopic, sbSub, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer s.Shutdown(ctx)

	// Receive a message from the *pubsub.Subscription backed by Service Bus.
	msg, err := s.Receive(ctx)
	if err != nil {
		log.Fatal(err)
	}

	// Acknowledge the message. This issues a 'Complete' disposition on the Service Bus message.
	// See https://godoc.org/github.com/Azure/azure-service-bus-go#Message.Complete.
	msg.Ack()
}

func OpenTopic

func OpenTopic(ctx context.Context, sbTopic *servicebus.Topic, opts *TopicOptions) (*pubsub.Topic, error)

OpenTopic initializes a pubsub Topic on a given Service Bus Topic.

Example
package main

import (
	"context"
	"log"
	"os"

	"gocloud.dev/pubsub"
	"gocloud.dev/pubsub/azuresb"
)

func main() {

	ctx := context.Background()
	// See the docs below for how to provision an Azure Service Bus Namespace and obtain the connection string.
	// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
	connString := os.Getenv("SERVICEBUS_CONNECTION_STRING")
	topicName := "test-topic"

	if connString == "" {
		log.Fatal("Service Bus ConnectionString is not set")
	}

	// Construct a Service Bus Namespace from the connection string.
	// See https://godoc.org/github.com/Azure/azure-service-bus-go#Namespace.
	ns, err := azuresb.NewNamespaceFromConnectionString(connString)
	if err != nil {
		log.Fatal(err)
	}

	// Construct a Service Bus Topic for topicName in the Namespace.
	// See https://godoc.org/github.com/Azure/azure-service-bus-go#Topic.
	sbTopic, err := azuresb.NewTopic(ns, topicName, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer sbTopic.Close(ctx)

	// Construct a *pubsub.Topic.
	t, err := azuresb.OpenTopic(ctx, sbTopic, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer t.Shutdown(ctx)

	// Construct a *pubsub.Message.
	msg := &pubsub.Message{
		Body: []byte("example message"),
		Metadata: map[string]string{
			"Priority": "1",
		},
	}

	// Send *pubsub.Message from *pubsub.Topic backed by Azure Service Bus.
	err = t.Send(ctx, msg)
	if err != nil {
		log.Fatal(err)
	}
}

Types

type SubscriptionOptions

type SubscriptionOptions struct {
	// If nil, the subscription MUST be in Peek-Lock mode. The Ack method must be called on each message
	// to complete it, otherwise you run the risk of deadlettering messages.
	// If non-nil, the subscription MUST be in Receive-and-Delete mode, and this function will be called
	// whenever Ack is called on a message.
	// See the "At-most-once vs. At-least-once Delivery" section in the pubsub package documentation.
	AckFuncForReceiveAndDelete func()
}

SubscriptionOptions contains configuration for subscriptions.
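
The nil versus non-nil rule in the field comment can be sketched as a simple dispatch. The options type below is a local stand-in that mirrors the single field of SubscriptionOptions so the example needs no external modules, and ackAction is a hypothetical helper; neither reflects the driver's internals. The callback here counts invocations, one possible use besides a plain no-op.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// options is a local stand-in mirroring the single field of
// azuresb.SubscriptionOptions.
type options struct {
	AckFuncForReceiveAndDelete func()
}

// ackAction reports what acking one message would do under o and runs the
// callback when one is configured. It is a hypothetical helper.
func ackAction(o *options) string {
	if o == nil || o.AckFuncForReceiveAndDelete == nil {
		return "complete message (Peek-Lock)"
	}
	o.AckFuncForReceiveAndDelete()
	return "call AckFuncForReceiveAndDelete (Receive-and-Delete)"
}

func main() {
	var acked int64
	counting := &options{
		// Count acks atomically in case the callback runs concurrently.
		AckFuncForReceiveAndDelete: func() { atomic.AddInt64(&acked, 1) },
	}

	fmt.Println(ackAction(nil))
	fmt.Println(ackAction(counting))
	fmt.Println(ackAction(counting))
	fmt.Println("callback invocations:", atomic.LoadInt64(&acked))
}
```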

type TopicOptions

type TopicOptions struct{}

TopicOptions provides configuration options for an Azure SB Topic.

type URLOpener

type URLOpener struct {
	// ConnectionString is the Service Bus connection string (required).
	// https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-dotnet-get-started-with-queues
	ConnectionString string

	// Options passed when creating the ServiceBus Topic/Subscription.
	ServiceBusTopicOptions        []servicebus.TopicOption
	ServiceBusSubscriptionOptions []servicebus.SubscriptionOption

	// TopicOptions specifies the options to pass to OpenTopic.
	TopicOptions TopicOptions
	// SubscriptionOptions specifies the options to pass to OpenSubscription.
	SubscriptionOptions SubscriptionOptions
}

URLOpener opens Azure Service Bus URLs like "azuresb://mytopic" for topics or "azuresb://mytopic?subscription=mysubscription" for subscriptions.

  • The URL's host+path is used as the topic name.
  • For subscriptions, the subscription name must be provided in the "subscription" query parameter.

No other query parameters are supported.
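
The URL rules above can be sketched with the standard library alone. parseAzureSBURL is a hypothetical helper written for this illustration, assuming the topic name is the host joined with the path; it is not the package's own parsing code.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"path"
)

// parseAzureSBURL extracts the topic name and the optional subscription name
// from an azuresb URL. It is a hypothetical helper for illustration only.
func parseAzureSBURL(raw string) (string, string, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", "", err
	}
	if u.Scheme != "azuresb" {
		return "", "", fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	// The host and path together form the topic name.
	topic := path.Join(u.Host, u.Path)
	if topic == "" {
		return "", "", errors.New("missing topic name")
	}
	// Only the "subscription" query parameter is accepted.
	q := u.Query()
	for name := range q {
		if name != "subscription" {
			return "", "", fmt.Errorf("unsupported query parameter %q", name)
		}
	}
	return topic, q.Get("subscription"), nil
}

func main() {
	for _, raw := range []string{
		"azuresb://mytopic",
		"azuresb://mytopic?subscription=mysubscription",
		"azuresb://mytopic?foo=bar",
	} {
		topic, sub, err := parseAzureSBURL(raw)
		if err != nil {
			fmt.Println(raw, "->", "error:", err)
			continue
		}
		fmt.Printf("%s -> topic=%q subscription=%q\n", raw, topic, sub)
	}
}
```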

func (*URLOpener) OpenSubscriptionURL

func (o *URLOpener) OpenSubscriptionURL(ctx context.Context, u *url.URL) (*pubsub.Subscription, error)

OpenSubscriptionURL opens a pubsub.Subscription based on u.

func (*URLOpener) OpenTopicURL

func (o *URLOpener) OpenTopicURL(ctx context.Context, u *url.URL) (*pubsub.Topic, error)

OpenTopicURL opens a pubsub.Topic based on u.
