azeventhubs

package module
v1.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: MIT Imports: 22 Imported by: 19

README

Azure Event Hubs Client Module for Go

Azure Event Hubs is a big data streaming platform and event ingestion service from Microsoft. For more information about Event Hubs see: link.

Use the client library github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs in your application to:

  • Send events to an event hub.
  • Consume events from an event hub.

Key links:

Getting started

Install the package

Install the Azure Event Hubs client module for Go with go get:

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Prerequisites
Authenticate the client

Event Hub clients are created using a TokenCredential from the Azure Identity package, like DefaultAzureCredential. You can also create a client using a connection string.

Using a service principal
  • ConsumerClient: link
  • ProducerClient: link
Using a connection string
  • ConsumerClient: link
  • ProducerClient: link

Key concepts

An Event Hub namespace can have multiple event hubs. Each event hub, in turn, contains partitions which store events.

Events are published to an event hub using an event publisher. In this package, the event publisher is the ProducerClient

Events can be consumed from an event hub using an event consumer. In this package there are two types for consuming events:

  • The basic event consumer is the, in the ConsumerClient. This consumer is useful if you already known which partitions you want to receive from.
  • A distributed event consumer, which uses Azure Blobs for checkpointing and coordination. This is implemented in the Processor. This is useful when you want to have the partition assignment be dynamically chosen, and balanced with other Processor instances.

For more information about Event Hubs features and terminology can be found here: link

Examples

Examples for various scenarios can be found on pkg.go.dev or in the example*_test.go files in our GitHub repo for azeventhubs.

Troubleshooting

Logging

This module uses the classification-based logging implementation in azcore. To enable console logging for all SDK modules, set the environment variable AZURE_SDK_GO_LOGGING to all.

Use the azcore/log package to control log event output or to enable logs for azeventhubs only. For example:

import (
  "fmt"
  azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
)

// print log output to stdout
azlog.SetListener(func(event azlog.Event, s string) {
    fmt.Printf("[%s] %s\n", event, s)
})

// pick the set of events to log
azlog.SetEvents(
  azeventhubs.EventConn,
  azeventhubs.EventAuth,
  azeventhubs.EventProducer,
  azeventhubs.EventConsumer,
)

Contributing

For details on contributing to this repository, see the contributing guide.

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Many people all over the world have helped make this project better. You'll want to check out:

Reporting security issues and security bugs

Security issues and bugs should be reported privately, via email, to the Microsoft Security Response Center (MSRC) secure@microsoft.com. You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Further information, including the MSRC PGP key, can be found in the Security TechCenter.

License

Azure SDK for Go is licensed under the MIT license.

Impressions

Documentation

Overview

Example (ConsumingEventsUsingConsumerClient)

Shows how to start consuming events in partitions in an Event Hub using the ConsumerClient.

If you have an Azure Storage account you can use the Processor type instead, which will handle distributing partitions between multiple consumers and storing progress using checkpoints. See example_consuming_with_checkpoints_test.go for an example.

package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	eventHubNamespace := os.Getenv("EVENTHUB_NAMESPACE") // <ex: myeventhubnamespace.servicebus.windows.net>
	eventHubName := os.Getenv("EVENTHUB_NAME")
	partitionID := os.Getenv("EVENTHUB_PARTITION_ID")

	fmt.Printf("Event Hub Namespace: %s, hubname: %s\n", eventHubNamespace, eventHubName)

	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	// Can also use a connection string:
	//
	// consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, eventHubName, azeventhubs.DefaultConsumerGroup, nil)
	//
	consumerClient, err := azeventhubs.NewConsumerClient(eventHubNamespace, eventHubName, azeventhubs.DefaultConsumerGroup, defaultAzureCred, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	partitionClient, err := consumerClient.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
		StartPosition: azeventhubs.StartPosition{
			Earliest: to.Ptr(true),
		},
	})

	if err != nil {
		panic(err)
	}

	defer partitionClient.Close(context.TODO())

	// Will wait up to 1 minute for 100 events. If the context is cancelled (or expires)
	// you'll get any events that have been collected up to that point.
	receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Minute)
	events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
	cancel()

	if err != nil && !errors.Is(err, context.DeadlineExceeded) {
		panic(err)
	}

	for _, event := range events {
		// We're assuming the Body is a byte-encoded string. EventData.Body supports any payload
		// that can be encoded to []byte.
		fmt.Printf("Event received with body '%s'\n", string(event.Body))
	}

	fmt.Printf("Done receiving events\n")
}
Output:

Example (ConsumingEventsWithCheckpoints)

Shows how to use the Processor type, using a ConsumerClient and CheckpointStore.

The Processor type acts as a load balancer, ensuring that partitions are divided up evenly amongst active Processor instances. It also allows storing (and restoring) checkpoints of progress.

NOTE: If you want to manually allocate partitions or to have more control over the process you can use the ConsumerClient. See example_consuming_events_test.go for an example.

package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

// Shows how to use the [Processor] type, using a [ConsumerClient] and [CheckpointStore].
//
// The Processor type acts as a load balancer, ensuring that partitions are divided up evenly
// amongst active Processor instances. It also allows storing (and restoring) checkpoints of progress.
//
// NOTE: If you want to manually allocate partitions or to have more control over the process you can use
// the [ConsumerClient]. See [example_consuming_events_test.go] for an example.
//
// [example_consuming_events_test.go]: https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_events_test.go
func main() {
	// The Processor makes it simpler to do distributed consumption of an Event Hub.
	// It automatically coordinates with other Processor instances to ensure balanced
	// allocation of partitions and tracks status, durably, in a CheckpointStore.
	//
	// The built-in checkpoint store (available in the `azeventhubs/checkpoints` package) uses
	// Azure Blob storage.

	eventHubConnectionString := os.Getenv("EVENTHUB_CONNECTION_STRING")
	eventHubName := os.Getenv("EVENTHUB_NAME")

	storageConnectionString := os.Getenv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING")
	storageContainerName := os.Getenv("CHECKPOINTSTORE_STORAGE_CONTAINER_NAME")

	consumerClient, checkpointStore, err := createClientsForExample(eventHubConnectionString, eventHubName, storageConnectionString, storageContainerName)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// Create the Processor
	//
	// The Processor handles load balancing with other Processor instances, running in separate
	// processes or even on separate machines. Each one will use the checkpointStore to coordinate
	// state and ownership, dynamically.
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	// Run in the background, launching goroutines to process each partition
	go dispatchPartitionClients(processor)

	// Run the load balancer. The dispatchPartitionClients goroutine (launched above)
	// will receive and dispatch ProcessorPartitionClients as partitions are claimed.
	//
	// Stopping the processor is as simple as canceling the context that you passed
	// in to Run.
	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func dispatchPartitionClients(processor *azeventhubs.Processor) {
	for {
		processorPartitionClient := processor.NextPartitionClient(context.TODO())

		if processorPartitionClient == nil {
			// Processor has stopped
			break
		}

		go func() {
			if err := processEventsForPartition(processorPartitionClient); err != nil {
				panic(err)
			}
		}()
	}
}

// processEventsForPartition shows the typical pattern for processing a partition.
func processEventsForPartition(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	// 1. [BEGIN] Initialize any partition specific resources for your application.
	// 2. [CONTINUOUS] Loop, calling ReceiveEvents() and UpdateCheckpoint().
	// 3. [END] Cleanup any resources.

	defer func() {
		// 3/3 [END] Do cleanup here, like shutting down database clients
		// or other resources used for processing this partition.
		shutdownPartitionResources(partitionClient)
	}()

	// 1/3 [BEGIN] Initialize any partition specific resources for your application.
	if err := initializePartitionResources(partitionClient.PartitionID()); err != nil {
		return err
	}

	// 2/3 [CONTINUOUS] Receive events, checkpointing as needed using UpdateCheckpoint.
	for {
		// Wait up to a minute for 100 events, otherwise returns whatever we collected during that time.
		receiveCtx, cancelReceive := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		cancelReceive()

		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			var eventHubError *azeventhubs.Error

			if errors.As(err, &eventHubError) && eventHubError.Code == azeventhubs.ErrorCodeOwnershipLost {
				return nil
			}

			return err
		}

		if len(events) == 0 {
			continue
		}

		fmt.Printf("Received %d event(s)\n", len(events))

		for _, event := range events {
			fmt.Printf("Event received with body %v\n", event.Body)
		}

		// Updates the checkpoint with the latest event received. If processing needs to restart
		// it will restart from this point, automatically.
		if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1], nil); err != nil {
			return err
		}
	}
}

func initializePartitionResources(partitionID string) error {
	// initialize things that might be partition specific, like a
	// database connection.
	return nil
}

func shutdownPartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	// Each PartitionClient holds onto an external resource and should be closed if you're
	// not processing them anymore.
	defer partitionClient.Close(context.TODO())
}

func createClientsForExample(eventHubConnectionString, eventHubName, storageConnectionString, storageContainerName string) (*azeventhubs.ConsumerClient, azeventhubs.CheckpointStore, error) {
	// NOTE: the storageContainerName must exist before the checkpoint store can be used.
	azBlobContainerClient, err := container.NewClientFromConnectionString(storageConnectionString, storageContainerName, nil)

	if err != nil {
		return nil, nil, err
	}

	checkpointStore, err := checkpoints.NewBlobStore(azBlobContainerClient, nil)

	if err != nil {
		return nil, nil, err
	}

	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(eventHubConnectionString, eventHubName, azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		return nil, nil, err
	}

	return consumerClient, checkpointStore, nil
}
Output:

Example (EnableLogging)
package main

import (
	"fmt"

	azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// print log output to stdout
	azlog.SetListener(printLoggedEvent)

	// pick the set of events to log
	azlog.SetEvents(
		azeventhubs.EventConn,
		azeventhubs.EventAuth,
		azeventhubs.EventProducer,
		azeventhubs.EventConsumer,
	)

	fmt.Printf("Logging enabled\n")

}

func printLoggedEvent(event azlog.Event, s string) {
	fmt.Printf("[%s] %s\n", event, s)
}
Output:

Logging enabled
Example (MigrateCheckpoints)

Shows how to migrate from the older `github.com/Azure/azure-event-hubs-go` checkpointer to to the format used by this package, `github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints/BlobStore`

NOTE: This example is not safe to run while either the old or new checkpoint store is in-use as it doesn't respect locking or ownership.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"strconv"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

type LegacyCheckpoint struct {
	PartitionID string `json:"partitionID"`
	Epoch       int    `json:"epoch"`
	Owner       string `json:"owner"`
	Checkpoint  struct {
		Offset         string `json:"offset"`
		SequenceNumber int64  `json:"sequenceNumber"`
		EnqueueTime    string `json:"enqueueTime"` // ": "0001-01-01T00:00:00Z"
	} `json:"checkpoint"`
}

// Shows how to migrate from the older `github.com/Azure/azure-event-hubs-go` checkpointer to to
// the format used by this package, `github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints/BlobStore`
//
// NOTE: This example is not safe to run while either the old or new checkpoint store is in-use as it doesn't
// respect locking or ownership.
func main() {
	// Azure Event Hubs connection string. You can get this from the Azure Portal.
	// For example: youreventhub.servicebus.windows.net
	var EventHubNamespace = os.Getenv("EVENTHUB_NAMESPACE")

	// Name of your Event Hub that these checkpoints reference.
	var EventHubName = os.Getenv("EVENTHUB_NAME")

	// Name of your Event Hub consumer group
	// Example: $Default
	var EventHubConsumerGroup = os.Getenv("EVENTHUB_CONSUMER_GROUP")

	// Azure Storage account connection string. You can get this from the Azure Portal.
	// For example: DefaultEndpointsProtocol=https;AccountName=accountname;AccountKey=account-key;EndpointSuffix=core.windows.net
	var StorageConnectionString = os.Getenv("STORAGE_CONNECTION_STRING")

	// Optional: If you used `eventhub.WithPrefixInBlobPath()` configuration option for your Event Processor Host
	// then you'll need to set this value.
	//
	// NOTE: This is no longer needed with the new checkpoint store as it automatically makes the path unique
	// for each combination of eventhub + hubname + consumergroup + partition.
	var BlobPrefix = os.Getenv("OLD_STORAGE_BLOB_PREFIX")

	// Name of the checkpoint store's Azure Storage container.
	var OldStorageContainerName = os.Getenv("OLD_STORAGE_CONTAINER_NAME")

	// Name of the Azure Storage container to place new checkpoints in.
	var NewStorageContainerName = os.Getenv("NEW_STORAGE_CONTAINER_NAME")

	if EventHubNamespace == "" || EventHubName == "" || EventHubConsumerGroup == "" ||
		StorageConnectionString == "" || OldStorageContainerName == "" || NewStorageContainerName == "" {
		fmt.Printf("Skipping migration, missing parameters\n")
		return
	}

	blobClient, err := azblob.NewClientFromConnectionString(StorageConnectionString, nil)

	if err != nil {
		panic(err)
	}

	oldCheckpoints, err := loadOldCheckpoints(blobClient, OldStorageContainerName, BlobPrefix)

	if err != nil {
		panic(err)
	}

	newCheckpointStore, err := checkpoints.NewBlobStore(blobClient.ServiceClient().NewContainerClient(NewStorageContainerName), nil)

	if err != nil {
		panic(err)
	}

	for _, oldCheckpoint := range oldCheckpoints {
		newCheckpoint := azeventhubs.Checkpoint{
			ConsumerGroup:           EventHubConsumerGroup,
			EventHubName:            EventHubName,
			FullyQualifiedNamespace: EventHubNamespace,
			PartitionID:             oldCheckpoint.PartitionID,
		}

		offset, err := strconv.ParseInt(oldCheckpoint.Checkpoint.Offset, 10, 64)

		if err != nil {
			panic(err)
		}

		newCheckpoint.Offset = &offset
		newCheckpoint.SequenceNumber = &oldCheckpoint.Checkpoint.SequenceNumber

		if err := newCheckpointStore.SetCheckpoint(context.Background(), newCheckpoint, nil); err != nil {
			panic(err)
		}
	}
}

func loadOldCheckpoints(blobClient *azblob.Client, containerName string, customBlobPrefix string) ([]*LegacyCheckpoint, error) {
	blobPrefix := &customBlobPrefix

	if customBlobPrefix == "" {
		blobPrefix = nil
	}

	pager := blobClient.NewListBlobsFlatPager(containerName, &container.ListBlobsFlatOptions{
		Prefix: blobPrefix,
	})

	var checkpoints []*LegacyCheckpoint

	for pager.More() {
		page, err := pager.NextPage(context.Background())

		if err != nil {
			return nil, err
		}

		for _, item := range page.Segment.BlobItems {
			buff := [4000]byte{}

			len, err := blobClient.DownloadBuffer(context.Background(), containerName, *item.Name, buff[:], nil)

			if err != nil {
				return nil, err
			}

			var legacyCheckpoint *LegacyCheckpoint

			if err := json.Unmarshal(buff[0:len], &legacyCheckpoint); err != nil {
				return nil, err
			}

			checkpoints = append(checkpoints, legacyCheckpoint)
		}
	}

	return checkpoints, nil
}
Output:

Example (ProducingEventsUsingProducerClient)

Shows how to send events to an Event Hub partition using the ProducerClient and EventDataBatch.

package main

import (
	"context"
	"errors"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

// Shows how to send events to an Event Hub partition using the [ProducerClient]
// and [EventDataBatch].
func main() {
	eventHubNamespace := os.Getenv("EVENTHUB_NAMESPACE") // <ex: myeventhubnamespace.servicebus.windows.net>
	eventHubName := os.Getenv("EVENTHUB_NAME")

	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	// Can also use a connection string:
	//
	// producerClient, err := azeventhubs.NewProducerClientFromConnectionString(connectionString, eventHubName, nil)
	//
	producerClient, err := azeventhubs.NewProducerClient(eventHubNamespace, eventHubName, defaultAzureCred, nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	events := createEventsForSample()

	newBatchOptions := &azeventhubs.EventDataBatchOptions{
		// The options allow you to control the size of the batch, as well as the partition it will get sent to.

		// PartitionID can be used to target a specific partition ID.
		// specific partition ID.
		//
		// PartitionID: partitionID,

		// PartitionKey can be used to ensure that messages that have the same key
		// will go to the same partition without requiring your application to specify
		// that partition ID.
		//
		// PartitionKey: partitionKey,

		//
		// Or, if you leave both PartitionID and PartitionKey nil, the service will choose a partition.
	}

	// Creates an EventDataBatch, which you can use to pack multiple events together, allowing for efficient transfer.
	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
			if batch.NumEvents() == 0 {
				// This one event is too large for this batch, even on its own. No matter what we do it
				// will not be sendable at its current size.
				panic(err)
			}

			// This batch is full - we can send it and create a new one and continue
			// packaging and sending events.
			if err := producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
				panic(err)
			}

			// create the next batch we'll use for events, ensuring that we use the same options
			// each time so all the messages go the same target.
			tmpBatch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

			if err != nil {
				panic(err)
			}

			batch = tmpBatch

			// rewind so we can retry adding this event to a batch
			i--
		} else if err != nil {
			panic(err)
		}
	}

	// if we have any events in the last batch, send it
	if batch.NumEvents() > 0 {
		if err := producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
			panic(err)
		}
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}
Output:

Index

Examples

Constants

View Source
const (
	// EventConn is used whenever we create a connection or any links (ie: producers, consumers).
	EventConn log.Event = exported.EventConn

	// EventAuth is used when we're doing authentication/claims negotiation.
	EventAuth log.Event = exported.EventAuth

	// EventProducer represents operations that happen on Producers.
	EventProducer log.Event = exported.EventProducer

	// EventConsumer represents operations that happen on Consumers.
	EventConsumer log.Event = exported.EventConsumer
)
View Source
const DefaultConsumerGroup = "$Default"

DefaultConsumerGroup is the name of the default consumer group in the Event Hubs service.

Variables

View Source
var ErrEventDataTooLarge = errors.New("the EventData could not be added because it is too large for the batch")

ErrEventDataTooLarge is returned when a message cannot fit into a batch when using the azeventhubs.EventDataBatch.AddEventData function.

Functions

This section is empty.

Types

type AMQPAnnotatedMessage added in v0.2.0

type AMQPAnnotatedMessage struct {
	// ApplicationProperties corresponds to the "application-properties" section of an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	ApplicationProperties map[string]any

	// Body represents the body of an AMQP message.
	Body AMQPAnnotatedMessageBody

	// DeliveryAnnotations corresponds to the "delivery-annotations" section in an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	DeliveryAnnotations map[any]any

	// DeliveryTag corresponds to the delivery-tag property of the TRANSFER frame
	// for this message.
	DeliveryTag []byte

	// Footer is the transport footers for this AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	Footer map[any]any

	// Header is the transport headers for this AMQP message.
	Header *AMQPAnnotatedMessageHeader

	// MessageAnnotations corresponds to the message-annotations section of an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	MessageAnnotations map[any]any

	// Properties corresponds to the properties section of an AMQP message.
	Properties *AMQPAnnotatedMessageProperties
}

AMQPAnnotatedMessage represents the AMQP message, as received from Event Hubs. For details about these properties, refer to the AMQP specification:

https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format

Some fields in this struct are typed 'any', which means they will accept AMQP primitives, or in some cases slices and maps.

AMQP simple types include: - int (any size), uint (any size) - float (any size) - string - bool - time.Time

type AMQPAnnotatedMessageBody added in v0.2.0

type AMQPAnnotatedMessageBody struct {
	// Data is encoded/decoded as multiple data sections in the body.
	Data [][]byte

	// Sequence is encoded/decoded as one or more amqp-sequence sections in the body.
	//
	// The values of the slices are are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	Sequence [][]any

	// Value is encoded/decoded as the amqp-value section in the body.
	//
	// The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPAnnotatedMessage,
	// as well as slices or maps of AMQP simple types.
	Value any
}

AMQPAnnotatedMessageBody represents the body of an AMQP message. Only one of these fields can be used a a time. They are mutually exclusive.

type AMQPAnnotatedMessageHeader added in v0.2.0

type AMQPAnnotatedMessageHeader struct {
	// DeliveryCount is the number of unsuccessful previous attempts to deliver this message.
	// It corresponds to the 'delivery-count' property.
	DeliveryCount uint32

	// Durable corresponds to the 'durable' property.
	Durable bool

	// FirstAcquirer corresponds to the 'first-acquirer' property.
	FirstAcquirer bool

	// Priority corresponds to the 'priority' property.
	Priority uint8

	// TTL corresponds to the 'ttl' property.
	TTL time.Duration
}

AMQPAnnotatedMessageHeader carries standard delivery details about the transfer of a message. See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header for more details.

type AMQPAnnotatedMessageProperties added in v0.2.0

type AMQPAnnotatedMessageProperties struct {
	// AbsoluteExpiryTime corresponds to the 'absolute-expiry-time' property.
	AbsoluteExpiryTime *time.Time

	// ContentEncoding corresponds to the 'content-encoding' property.
	ContentEncoding *string

	// ContentType corresponds to the 'content-type' property
	ContentType *string

	// CorrelationID corresponds to the 'correlation-id' property.
	// The type of CorrelationID can be a uint64, UUID, []byte, or a string
	CorrelationID any

	// CreationTime corresponds to the 'creation-time' property.
	CreationTime *time.Time

	// GroupID corresponds to the 'group-id' property.
	GroupID *string

	// GroupSequence corresponds to the 'group-sequence' property.
	GroupSequence *uint32

	// MessageID corresponds to the 'message-id' property.
	// The type of MessageID can be a uint64, UUID, []byte, or string
	MessageID any

	// ReplyTo corresponds to the 'reply-to' property.
	ReplyTo *string

	// ReplyToGroupID corresponds to the 'reply-to-group-id' property.
	ReplyToGroupID *string

	// Subject corresponds to the 'subject' property.
	Subject *string

	// To corresponds to the 'to' property.
	To *string

	// UserID corresponds to the 'user-id' property.
	UserID []byte
}

AMQPAnnotatedMessageProperties represents the properties of an AMQP message. See here for more details: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties

type AddEventDataOptions

type AddEventDataOptions struct {
}

AddEventDataOptions contains optional parameters for the AddEventData function.

type Checkpoint added in v0.1.1

type Checkpoint struct {
	ConsumerGroup           string
	EventHubName            string
	FullyQualifiedNamespace string
	PartitionID             string

	Offset         *int64 // the last succesfully processed Offset.
	SequenceNumber *int64 // the last succesfully processed SequenceNumber.
}

Checkpoint tracks the last succesfully processed event in a partition.

type CheckpointStore added in v0.1.1

type CheckpointStore interface {
	// ClaimOwnership attempts to claim ownership of the partitions in partitionOwnership and returns
	// the actual partitions that were claimed.
	ClaimOwnership(ctx context.Context, partitionOwnership []Ownership, options *ClaimOwnershipOptions) ([]Ownership, error)

	// ListCheckpoints lists all the available checkpoints.
	ListCheckpoints(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListCheckpointsOptions) ([]Checkpoint, error)

	// ListOwnership lists all ownerships.
	ListOwnership(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListOwnershipOptions) ([]Ownership, error)

	// SetCheckpoint updates a specific checkpoint with a sequence and offset.
	SetCheckpoint(ctx context.Context, checkpoint Checkpoint, options *SetCheckpointOptions) error
}

CheckpointStore is used by multiple consumers to coordinate progress and ownership for partitions.

type ClaimOwnershipOptions added in v0.1.1

type ClaimOwnershipOptions struct {
}

ClaimOwnershipOptions contains optional parameters for the ClaimOwnership function

type ConnectionStringProperties added in v0.5.0

type ConnectionStringProperties = exported.ConnectionStringProperties

ConnectionStringProperties are the properties of a connection string as returned by ParseConnectionString.

func ParseConnectionString added in v0.5.0

func ParseConnectionString(connStr string) (ConnectionStringProperties, error)

ParseConnectionString takes a connection string from the Azure portal and returns the parsed representation.

There are two supported formats:

  1. Connection strings generated from the portal (or elsewhere) that contain an embedded key and keyname.
  2. A connection string with an embedded SharedAccessSignature: Endpoint=sb://<sb>.servicebus.windows.net;SharedAccessSignature=SharedAccessSignature sr=<sb>.servicebus.windows.net&sig=<base64-sig>&se=<expiry>&skn=<keyname>"

type ConsumerClient

type ConsumerClient struct {
	// contains filtered or unexported fields
}

ConsumerClient can create PartitionClient instances, which can read events from a partition.

func NewConsumerClient

func NewConsumerClient(fullyQualifiedNamespace string, eventHub string, consumerGroup string, credential azcore.TokenCredential, options *ConsumerClientOptions) (*ConsumerClient, error)

NewConsumerClient creates a ConsumerClient which uses an azcore.TokenCredential for authentication. You MUST call ConsumerClient.Close on this client to avoid leaking resources.

The fullyQualifiedNamespace is the Event Hubs namespace name (ex: myeventhub.servicebus.windows.net) The credential is one of the credentials in the azidentity package.

Example
package main

import (
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	// `DefaultAzureCredential` tries several common credential types. For more credential types
	// see this link: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types.
	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	consumerClient, err = azeventhubs.NewConsumerClient("<ex: myeventhubnamespace.servicebus.windows.net>", "eventhub-name", azeventhubs.DefaultConsumerGroup, defaultAzureCred, nil)

	if err != nil {
		panic(err)
	}
}
Output:

func NewConsumerClientFromConnectionString

func NewConsumerClientFromConnectionString(connectionString string, eventHub string, consumerGroup string, options *ConsumerClientOptions) (*ConsumerClient, error)

NewConsumerClientFromConnectionString creates a ConsumerClient from a connection string. You MUST call ConsumerClient.Close on this client to avoid leaking resources.

connectionString can be one of two formats - with or without an EntityPath key.

When the connection string does not have an entity path, as shown below, the eventHub parameter cannot be empty and should contain the name of your event hub.

Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>

When the connection string DOES have an entity path, as shown below, the eventHub parameter must be empty.

Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>;
Example
package main

import (
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient
var err error

func main() {
	// if the connection string contains an EntityPath
	//
	connectionString := "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>"
	consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, "", azeventhubs.DefaultConsumerGroup, nil)

	// or

	// if the connection string does not contain an EntityPath
	connectionString = "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
	consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, "eventhub-name", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}
}
Output:

func (*ConsumerClient) Close

func (cc *ConsumerClient) Close(ctx context.Context) error

Close releases resources for this client.

func (*ConsumerClient) GetEventHubProperties

func (cc *ConsumerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)

GetEventHubProperties gets event hub properties, like the available partition IDs and when the Event Hub was created.

Example
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	eventHubProps, err := consumerClient.GetEventHubProperties(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	for _, partitionID := range eventHubProps.PartitionIDs {
		fmt.Printf("Partition ID: %s\n", partitionID)
	}
}
Output:

func (*ConsumerClient) GetPartitionProperties

func (cc *ConsumerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error)

GetPartitionProperties gets properties for a specific partition. This includes data like the last enqueued sequence number, the first sequence number and when an event was last enqueued to the partition.

Example
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	partitionProps, err := consumerClient.GetPartitionProperties(context.TODO(), "partition-id", nil)

	if err != nil {
		panic(err)
	}

	fmt.Printf("First sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.BeginningSequenceNumber)
	fmt.Printf("Last sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.LastEnqueuedSequenceNumber)
}
Output:

func (*ConsumerClient) InstanceID added in v0.6.0

func (cc *ConsumerClient) InstanceID() string

InstanceID is the identifier for this ConsumerClient.

func (*ConsumerClient) NewPartitionClient added in v0.1.1

func (cc *ConsumerClient) NewPartitionClient(partitionID string, options *PartitionClientOptions) (*PartitionClient, error)

NewPartitionClient creates a client that can receive events from a partition. By default it starts at the latest point in the partition. This can be changed using the options parameter. You MUST call azeventhubs.PartitionClient.Close on the returned client to avoid leaking resources.

Example (ConfiguringPrefetch)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	const partitionID = "0"

	// Prefetching configures the Event Hubs client to continually cache events, up to the configured size
	// in PartitionClientOptions.Prefetch. PartitionClient.ReceiveEvents will read from the cache first,
	// which can improve throughput in situations where you might normally be forced to request and wait
	// for more events.

	// By default, prefetch is enabled.
	partitionClient, err := consumerClient.NewPartitionClient(partitionID, nil)

	if err != nil {
		panic(err)
	}

	defer partitionClient.Close(context.TODO())

	// You can configure the prefetch buffer size as well. The default is 300.
	partitionClientWithCustomPrefetch, err := consumerClient.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
		Prefetch: 301,
	})

	if err != nil {
		panic(err)
	}

	defer partitionClientWithCustomPrefetch.Close(context.TODO())

	// And prefetch can be disabled if you prefer to manually control the flow of events. Excess
	// events (that arrive after your ReceiveEvents() call has completed) will still be
	// buffered internally, but they will not be automatically replenished.
	partitionClientWithPrefetchDisabled, err := consumerClient.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
		Prefetch: -1,
	})

	if err != nil {
		panic(err)
	}

	defer partitionClientWithPrefetchDisabled.Close(context.TODO())

	// Using a context with a timeout will allow ReceiveEvents() to return with events it
	// collected in a minute, or earlier if it actually gets all 100 events we requested.
	receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Minute)
	defer cancel()
	events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)

	if err != nil {
		panic(err)
	}

	for _, evt := range events {
		fmt.Printf("Body: %s\n", string(evt.Body))
	}
}
Output:

Example (ReceiveEvents)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	const partitionID = "0"

	partitionClient, err := consumerClient.NewPartitionClient(partitionID, nil)

	if err != nil {
		panic(err)
	}

	defer partitionClient.Close(context.TODO())

	// Using a context with a timeout will allow ReceiveEvents() to return with events it
	// collected in a minute, or earlier if it actually gets all 100 events we requested.
	receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Minute)
	defer cancel()
	events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)

	if err != nil {
		panic(err)
	}

	for _, evt := range events {
		fmt.Printf("Body: %s\n", string(evt.Body))
	}
}
Output:

type ConsumerClientOptions

type ConsumerClientOptions struct {
	// ApplicationID is used as the identifier when setting the User-Agent property.
	ApplicationID string

	// InstanceID is a unique name used to identify the consumer. This can help with
	// diagnostics as this name will be returned in error messages. By default,
	// an identifier will be automatically generated.
	InstanceID string

	// NewWebSocketConn is a function that can create a net.Conn for use with websockets.
	// For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go.
	NewWebSocketConn func(ctx context.Context, args WebSocketConnParams) (net.Conn, error)

	// RetryOptions controls how often operations are retried from this client and any
	// Receivers and Senders created from this client.
	RetryOptions RetryOptions

	// TLSConfig configures a client with a custom *tls.Config.
	TLSConfig *tls.Config
}

ConsumerClientOptions configures optional parameters for a ConsumerClient.

type Error added in v0.1.1

type Error = exported.Error

Error represents an Event Hub specific error. NOTE: the Code is considered part of the published API but the message that comes back from Error(), as well as the underlying wrapped error, are NOT and are subject to change.

type ErrorCode added in v0.2.0

type ErrorCode = exported.ErrorCode

ErrorCode is an error code, usable by consuming code to work with programatically.

const (
	// ErrorCodeUnauthorizedAccess means the credentials provided are not valid for use with
	// a particular entity, or have expired.
	ErrorCodeUnauthorizedAccess ErrorCode = exported.ErrorCodeUnauthorizedAccess

	// ErrorCodeConnectionLost means our connection was lost and all retry attempts failed.
	// This typically reflects an extended outage or connection disruption and may
	// require manual intervention.
	ErrorCodeConnectionLost ErrorCode = exported.ErrorCodeConnectionLost

	// ErrorCodeOwnershipLost means that a partition that you were reading from was opened
	// by another link with a higher epoch/owner level.
	ErrorCodeOwnershipLost ErrorCode = exported.ErrorCodeOwnershipLost
)

type EventData

type EventData struct {
	// Properties can be used to store custom metadata for a message.
	Properties map[string]any

	// Body is the payload for a message.
	Body []byte

	// ContentType describes the payload of the message, with a descriptor following
	// the format of Content-Type, specified by RFC2045 (ex: "application/json").
	ContentType *string

	// CorrelationID is a client-specific id that can be used to mark or identify messages
	// between clients.
	// CorrelationID can be a uint64, UUID, []byte, or string
	CorrelationID any

	// MessageID is an application-defined value that uniquely identifies
	// the message and its payload. The identifier is a free-form string.
	//
	// If enabled, the duplicate detection feature identifies and removes further submissions
	// of messages with the same MessageId.
	MessageID *string
}

EventData is an event that can be sent, using the ProducerClient, to an Event Hub.

type EventDataBatch

type EventDataBatch struct {
	// contains filtered or unexported fields
}

EventDataBatch is used to efficiently pack up EventData before sending it to Event Hubs.

EventDataBatch's are not meant to be created directly. Use ProducerClient.NewEventDataBatch, which will create them with the proper size limit for your Event Hub.

func (*EventDataBatch) AddAMQPAnnotatedMessage added in v0.2.0

func (b *EventDataBatch) AddAMQPAnnotatedMessage(annotatedMessage *AMQPAnnotatedMessage, options *AddEventDataOptions) error

AddAMQPAnnotatedMessage adds an AMQPAnnotatedMessage to the batch, failing if the AMQPAnnotatedMessage would cause the EventDataBatch to be too large to send.

This size limit was set when the EventDataBatch was created, in options to ProducerClient.NewEventDataBatch, or (by default) from Event Hubs itself.

Returns ErrMessageTooLarge if the message cannot fit, or a non-nil error for other failures.

func (*EventDataBatch) AddEventData

func (b *EventDataBatch) AddEventData(ed *EventData, options *AddEventDataOptions) error

AddEventData adds an EventData to the batch, failing if the EventData would cause the EventDataBatch to be too large to send.

This size limit was set when the EventDataBatch was created, in options to ProducerClient.NewEventDataBatch, or (by default) from Event Hubs itself.

Returns ErrMessageTooLarge if the event cannot fit, or a non-nil error for other failures.

Example
package main

import (
	"context"
	"errors"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	// can be called multiple times with new messages until you
	// receive an azeventhubs.ErrMessageTooLarge
	err = batch.AddEventData(&azeventhubs.EventData{
		Body: []byte("hello"),
	}, nil)

	if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
		// Message was too large to fit into this batch.
		//
		// At this point you'd usually just send the batch (using ProducerClient.SendEventDataBatch),
		// create a new one, and start filling up the batch again.
		//
		// If this is the _only_ message being added to the batch then it's too big in general, and
		// will need to be split or shrunk to fit.
		panic(err)
	} else if err != nil {
		panic(err)
	}

	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}
Output:

Example (RawAMQPMessages)
package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	// This is functionally equivalent to EventDataBatch.AddEventData(), just with a more
	// advanced message format.
	// See ExampleEventDataBatch_AddEventData for more details.

	err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{
		Body: azeventhubs.AMQPAnnotatedMessageBody{
			Data: [][]byte{
				[]byte("hello"),
				[]byte("world"),
			},
		},
	}, nil)

	if err != nil {
		panic(err)
	}

	err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{
		Body: azeventhubs.AMQPAnnotatedMessageBody{
			Sequence: [][]any{
				// let the AMQP stack encode your strings (or other primitives) for you, no need
				// to convert them to bytes manually.
				{"hello", "world"},
				{"howdy", "world"},
			},
		},
	}, nil)

	if err != nil {
		panic(err)
	}

	err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{
		Body: azeventhubs.AMQPAnnotatedMessageBody{
			// let the AMQP stack encode your string (or other primitives) for you, no need
			// to convert them to bytes manually.
			Value: "hello world",
		},
	}, nil)

	if err != nil {
		panic(err)
	}

	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}
Output:

func (*EventDataBatch) NumBytes

func (b *EventDataBatch) NumBytes() uint64

NumBytes is the number of bytes in the batch.

func (*EventDataBatch) NumEvents added in v0.2.0

func (b *EventDataBatch) NumEvents() int32

NumEvents returns the number of events in the batch.

type EventDataBatchOptions added in v0.2.0

type EventDataBatchOptions struct {
	// MaxBytes overrides the max size (in bytes) for a batch.
	// By default NewEventDataBatch will use the max message size provided by the service.
	MaxBytes uint64

	// PartitionKey is hashed to calculate the partition assignment. Messages and message
	// batches with the same PartitionKey are guaranteed to end up in the same partition.
	// Note that if you use this option then PartitionID cannot be set.
	PartitionKey *string

	// PartitionID is the ID of the partition to send these messages to.
	// Note that if you use this option then PartitionKey cannot be set.
	PartitionID *string
}

EventDataBatchOptions contains optional parameters for the ProducerClient.NewEventDataBatch function.

If both PartitionKey and PartitionID are nil, Event Hubs will choose an arbitrary partition for any events in this EventDataBatch.

type EventHubProperties

type EventHubProperties struct {
	CreatedOn    time.Time
	Name         string
	PartitionIDs []string
}

EventHubProperties represents properties of the Event Hub, like the number of partitions.

type GetEventHubPropertiesOptions

type GetEventHubPropertiesOptions struct {
}

GetEventHubPropertiesOptions contains optional parameters for the GetEventHubProperties function

type GetPartitionPropertiesOptions

type GetPartitionPropertiesOptions struct {
}

GetPartitionPropertiesOptions are the options for the GetPartitionProperties function.

type ListCheckpointsOptions added in v0.1.1

type ListCheckpointsOptions struct {
}

ListCheckpointsOptions contains optional parameters for the ListCheckpoints function

type ListOwnershipOptions added in v0.1.1

type ListOwnershipOptions struct {
}

ListOwnershipOptions contains optional parameters for the ListOwnership function

type Ownership added in v0.1.1

type Ownership struct {
	ConsumerGroup           string
	EventHubName            string
	FullyQualifiedNamespace string
	PartitionID             string

	OwnerID          string       // the owner ID of the Processor
	LastModifiedTime time.Time    // used when calculating if ownership has expired
	ETag             *azcore.ETag // the ETag, used when attempting to claim or update ownership of a partition.
}

Ownership tracks which consumer owns a particular partition.

type PartitionClient added in v0.1.1

type PartitionClient struct {
	// contains filtered or unexported fields
}

PartitionClient is used to receive events from an Event Hub partition.

This type is instantiated from the ConsumerClient type, using ConsumerClient.NewPartitionClient.

func (*PartitionClient) Close added in v0.1.1

func (pc *PartitionClient) Close(ctx context.Context) error

Close releases resources for this client.

func (*PartitionClient) ReceiveEvents added in v0.1.1

func (pc *PartitionClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)

ReceiveEvents receives events until 'count' events have been received or the context has expired or been cancelled.

If your ReceiveEvents call appears to be stuck there are some common causes:

  1. The PartitionClientOptions.StartPosition defaults to "Latest" when the client is created. The connection is lazily initialized, so it's possible the link was initialized to a position after events you've sent. To make this deterministic, you can choose an explicit start point using sequence number, offset or a timestamp. See the [PartitionClientOptions.StartPosition] field for more details.

  2. You might have sent the events to a different partition than intended. By default, batches that are created using ProducerClient.NewEventDataBatch do not target a specific partition. When a partition is not specified, Azure Event Hubs service will choose the partition the events will be sent to.

    To fix this, you can specify a PartitionID as part of your [EventDataBatchOptions.PartitionID] options or open multiple PartitionClient instances, one for each partition. You can get the full list of partitions at runtime using ConsumerClient.GetEventHubProperties. See the "example_consuming_events_test.go" for an example of this pattern.

  3. Network issues can cause internal retries. To see log messages related to this use the instructions in the example function "Example_enableLogging".

type PartitionClientOptions added in v0.2.0

type PartitionClientOptions struct {
	// StartPosition is the position we will start receiving events from,
	// either an offset (inclusive) with Offset, or receiving events received
	// after a specific time using EnqueuedTime.
	//
	// NOTE: you can also use the [Processor], which will automatically manage the start
	// value using a [CheckpointStore]. See [example_consuming_with_checkpoints_test.go] for an
	// example.
	//
	// [example_consuming_with_checkpoints_test.go]: https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_with_checkpoints_test.go
	StartPosition StartPosition

	// OwnerLevel is the priority for this partition client, also known as the 'epoch' level.
	// When used, a partition client with a higher OwnerLevel will take ownership of a partition
	// from partition clients with a lower OwnerLevel.
	// Default is off.
	OwnerLevel *int64

	// Prefetch represents the size of the internal prefetch buffer. When set,
	// this client will attempt to always maintain an internal cache of events of
	// this size, asynchronously, increasing the odds that ReceiveEvents() will use
	// a locally stored cache of events, rather than having to wait for events to
	// arrive from the network.
	//
	// Defaults to 300 events if Prefetch == 0.
	// Disabled if Prefetch < 0.
	Prefetch int32
}

PartitionClientOptions provides options for the NewPartitionClient function.

type PartitionProperties

type PartitionProperties struct {
	// BeginningSequenceNumber is the first sequence number for a partition.
	BeginningSequenceNumber int64
	// EventHubName is the name of the Event Hub for this partition.
	EventHubName string

	// IsEmpty is true if the partition is empty, false otherwise.
	IsEmpty bool

	// LastEnqueuedOffset is the offset of latest enqueued event.
	LastEnqueuedOffset int64

	// LastEnqueuedOn is the date of latest enqueued event.
	LastEnqueuedOn time.Time

	// LastEnqueuedSequenceNumber is the sequence number of the latest enqueued event.
	LastEnqueuedSequenceNumber int64

	// PartitionID is the partition ID of this partition.
	PartitionID string
}

PartitionProperties are the properties for a single partition.

type Processor added in v0.1.1

type Processor struct {
	// contains filtered or unexported fields
}

Processor uses a ConsumerClient and CheckpointStore to provide automatic load balancing between multiple Processor instances, even in separate processes or on separate machines.

See example_consuming_with_checkpoints_test.go for an example, and the function documentation for [Run] for a more detailed description of how load balancing works.

func NewProcessor added in v0.1.1

func NewProcessor(consumerClient *ConsumerClient, checkpointStore CheckpointStore, options *ProcessorOptions) (*Processor, error)

NewProcessor creates a Processor.

More information can be found in the documentation for the Processor type or the example_consuming_with_checkpoints_test.go for an example.

func (*Processor) NextPartitionClient added in v0.1.1

func (p *Processor) NextPartitionClient(ctx context.Context) *ProcessorPartitionClient

NextPartitionClient will get the next owned ProcessorPartitionClient if one is acquired or will block until a new one arrives or Processor.Run is cancelled. When the Processor stops running this function will return nil.

NOTE: You MUST call ProcessorPartitionClient.Close on the returned client to avoid leaking resources.

See example_consuming_with_checkpoints_test.go for an example of typical usage.

func (*Processor) Run added in v0.1.1

func (p *Processor) Run(ctx context.Context) error

Run handles the load balancing loop, blocking until the passed in context is cancelled or it encounters an unrecoverable error. On cancellation, it will return a nil error.

This function should run for the lifetime of your application, or for as long as you want to continue to claim and process partitions.

As partitions are claimed new ProcessorPartitionClient instances will be returned from Processor.NextPartitionClient. This can happen at any time, based on new Processor instances coming online, as well as other Processors exiting.

ProcessorPartitionClient are used like a PartitionClient but provide an ProcessorPartitionClient.UpdateCheckpoint function that will store a checkpoint into the CheckpointStore. If the client were to crash, or be restarted it will pick up from the last checkpoint.

See example_consuming_with_checkpoints_test.go for an example of typical usage.

type ProcessorOptions added in v0.2.0

type ProcessorOptions struct {
	// LoadBalancingStrategy dictates how concurrent Processor instances distribute
	// ownership of partitions between them.
	// The default strategy is ProcessorStrategyBalanced.
	LoadBalancingStrategy ProcessorStrategy

	// UpdateInterval controls how often attempt to claim partitions.
	// The default value is 10 seconds.
	UpdateInterval time.Duration

	// PartitionExpirationDuration is the amount of time before a partition is considered
	// unowned.
	// The default value is 60 seconds.
	PartitionExpirationDuration time.Duration

	// StartPositions are the default start positions (configurable per partition, or with an overall
	// default value) if a checkpoint is not found in the CheckpointStore.
	// The default position is Latest.
	StartPositions StartPositions

	// Prefetch represents the size of the internal prefetch buffer for each ProcessorPartitionClient
	// created by this Processor. When set, this client will attempt to always maintain
	// an internal cache of events of this size, asynchronously, increasing the odds that
	// ReceiveEvents() will use a locally stored cache of events, rather than having to
	// wait for events to arrive from the network.
	//
	// Defaults to 300 events if Prefetch == 0.
	// Disabled if Prefetch < 0.
	Prefetch int32
}

ProcessorOptions are the options for the NewProcessor function.

type ProcessorPartitionClient added in v0.1.1

type ProcessorPartitionClient struct {
	// contains filtered or unexported fields
}

ProcessorPartitionClient allows you to receive events, similar to a PartitionClient, with a checkpoint store for tracking progress.

This type is instantiated from Processor.NextPartitionClient, which handles load balancing of partition ownership between multiple Processor instances.

See example_consuming_with_checkpoints_test.go for an example.

NOTE: If you do NOT want to use dynamic load balancing, and would prefer to track state and ownership manually, use the ConsumerClient instead.

func (*ProcessorPartitionClient) Close added in v0.1.1

Close releases resources for the partition client. This does not close the ConsumerClient that the Processor was started with.

func (*ProcessorPartitionClient) PartitionID added in v0.1.1

func (p *ProcessorPartitionClient) PartitionID() string

PartitionID is the partition ID of the partition we're receiving from. This will not change during the lifetime of this ProcessorPartitionClient.

func (*ProcessorPartitionClient) ReceiveEvents added in v0.1.1

func (c *ProcessorPartitionClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)

ReceiveEvents receives events until 'count' events have been received or the context has been cancelled.

See PartitionClient.ReceiveEvents for more information, including troubleshooting.

func (*ProcessorPartitionClient) UpdateCheckpoint added in v0.1.1

func (p *ProcessorPartitionClient) UpdateCheckpoint(ctx context.Context, latestEvent *ReceivedEventData, options *UpdateCheckpointOptions) error

UpdateCheckpoint updates the checkpoint in the CheckpointStore. New Processors will resume after this checkpoint for this partition.

type ProcessorStrategy added in v0.1.1

type ProcessorStrategy string

ProcessorStrategy specifies the load balancing strategy used by the Processor.

const (
	// ProcessorStrategyBalanced will attempt to claim a single partition at a time, until each active
	// owner has an equal share of partitions.
	// This is the default strategy.
	ProcessorStrategyBalanced ProcessorStrategy = "balanced"

	// ProcessorStrategyGreedy will attempt to claim as many partitions at a time as it can, ignoring
	// balance.
	ProcessorStrategyGreedy ProcessorStrategy = "greedy"
)

type ProducerClient

type ProducerClient struct {
	// contains filtered or unexported fields
}

ProducerClient can be used to send events to an Event Hub.

func NewProducerClient

func NewProducerClient(fullyQualifiedNamespace string, eventHub string, credential azcore.TokenCredential, options *ProducerClientOptions) (*ProducerClient, error)

NewProducerClient creates a ProducerClient which uses an azcore.TokenCredential for authentication. You MUST call ProducerClient.Close on this client to avoid leaking resources.

The fullyQualifiedNamespace is the Event Hubs namespace name (ex: myeventhub.servicebus.windows.net) The credential is one of the credentials in the azidentity package.

Example
package main

import (
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	// `DefaultAzureCredential` tries several common credential types. For more credential types
	// see this link: https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types.
	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	producerClient, err = azeventhubs.NewProducerClient("<ex: myeventhubnamespace.servicebus.windows.net>", "eventhub-name", defaultAzureCred, nil)

	if err != nil {
		panic(err)
	}
}
Output:

func NewProducerClientFromConnectionString

func NewProducerClientFromConnectionString(connectionString string, eventHub string, options *ProducerClientOptions) (*ProducerClient, error)

NewProducerClientFromConnectionString creates a ProducerClient from a connection string. You MUST call ProducerClient.Close on this client to avoid leaking resources.

connectionString can be one of two formats - with or without an EntityPath key.

When the connection string does not have an entity path, as shown below, the eventHub parameter cannot be empty and should contain the name of your event hub.

Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>

When the connection string DOES have an entity path, as shown below, the eventHub parameter must be empty.

Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>;
Example
// if the connection string contains an EntityPath
//
connectionString := "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>"
producerClient, err = azeventhubs.NewProducerClientFromConnectionString(connectionString, "", nil)

// or

// if the connection string does not contain an EntityPath
connectionString = "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
producerClient, err = azeventhubs.NewProducerClientFromConnectionString(connectionString, "eventhub-name", nil)

if err != nil {
	panic(err)
}
Output:

func (*ProducerClient) Close

func (pc *ProducerClient) Close(ctx context.Context) error

Close releases resources for this client.

func (*ProducerClient) GetEventHubProperties

func (pc *ProducerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)

GetEventHubProperties gets event hub properties, like the available partition IDs and when the Event Hub was created.

Example
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	eventHubProps, err := producerClient.GetEventHubProperties(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	for _, partitionID := range eventHubProps.PartitionIDs {
		fmt.Printf("Partition ID: %s\n", partitionID)
	}
}
Output:

func (*ProducerClient) GetPartitionProperties

func (pc *ProducerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error)

GetPartitionProperties gets properties for a specific partition. This includes data like the last enqueued sequence number, the first sequence number and when an event was last enqueued to the partition.

Example
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	partitionProps, err := producerClient.GetPartitionProperties(context.TODO(), "partition-id", nil)

	if err != nil {
		panic(err)
	}

	fmt.Printf("First sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.BeginningSequenceNumber)
	fmt.Printf("Last sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.LastEnqueuedSequenceNumber)
}
Output:

func (*ProducerClient) NewEventDataBatch

func (pc *ProducerClient) NewEventDataBatch(ctx context.Context, options *EventDataBatchOptions) (*EventDataBatch, error)

NewEventDataBatch can be used to create an EventDataBatch, which can contain multiple events.

EventDataBatch contains logic to make sure that the it doesn't exceed the maximum size for the Event Hubs link, using it's azeventhubs.EventDataBatch.AddEventData function. A lower size limit can also be configured through the options.

NOTE: if options is nil or empty, Event Hubs will choose an arbitrary partition for any events in this EventDataBatch.

If the operation fails it can return an azeventhubs.Error type if the failure is actionable.

func (*ProducerClient) SendEventDataBatch added in v0.3.0

func (pc *ProducerClient) SendEventDataBatch(ctx context.Context, batch *EventDataBatch, options *SendEventDataBatchOptions) error

SendEventDataBatch sends an event data batch to Event Hubs.

Example
package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	// See ExampleProducerClient_AddEventData for more information.
	err = batch.AddEventData(&azeventhubs.EventData{Body: []byte("hello")}, nil)

	if err != nil {
		panic(err)
	}

	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}
Output:

type ProducerClientOptions

type ProducerClientOptions struct {
	// Application ID that will be passed to the namespace.
	ApplicationID string

	// NewWebSocketConn is a function that can create a net.Conn for use with websockets.
	// For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go.
	NewWebSocketConn func(ctx context.Context, params WebSocketConnParams) (net.Conn, error)

	// RetryOptions controls how often operations are retried from this client and any
	// Receivers and Senders created from this client.
	RetryOptions RetryOptions

	// TLSConfig configures a client with a custom *tls.Config.
	TLSConfig *tls.Config
}

ProducerClientOptions contains options for the `NewProducerClient` and `NewProducerClientFromConnectionString` functions.

type ReceiveEventsOptions

type ReceiveEventsOptions struct {
}

ReceiveEventsOptions contains optional parameters for the ReceiveEvents function

type ReceivedEventData

type ReceivedEventData struct {
	EventData

	// EnqueuedTime is the UTC time when the message was accepted and stored by Event Hubs.
	EnqueuedTime *time.Time

	// PartitionKey is used with a partitioned entity and enables assigning related messages
	// to the same internal partition. This ensures that the submission sequence order is correctly
	// recorded. The partition is chosen by a hash function in Event Hubs and cannot be chosen
	// directly.
	PartitionKey *string

	// Offset is the offset of the event.
	Offset int64

	// RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
	// to properties that are not exposed by ReceivedEventData such as payloads encoded into the
	// Value or Sequence section, payloads sent as multiple Data sections, as well as Footer
	// and Header fields.
	RawAMQPMessage *AMQPAnnotatedMessage

	// SequenceNumber is a unique number assigned to a message by Event Hubs.
	SequenceNumber int64

	// Properties set by the Event Hubs service.
	SystemProperties map[string]any
}

ReceivedEventData is an event that has been received using the ConsumerClient.

type RetryOptions

type RetryOptions = exported.RetryOptions

RetryOptions represent the options for retries.

type SendEventDataBatchOptions added in v0.3.0

type SendEventDataBatchOptions struct {
}

SendEventDataBatchOptions contains optional parameters for the SendEventDataBatch function

type SetCheckpointOptions added in v1.0.0

type SetCheckpointOptions struct {
}

SetCheckpointOptions contains optional parameters for the UpdateCheckpoint function

type StartPosition

type StartPosition struct {
	// Offset will start the consumer after the specified offset. Can be exclusive
	// or inclusive, based on the Inclusive property.
	// NOTE: offsets are not stable values, and might refer to different events over time
	// as the Event Hub events reach their age limit and are discarded.
	Offset *int64

	// SequenceNumber will start the consumer after the specified sequence number. Can be exclusive
	// or inclusive, based on the Inclusive property.
	SequenceNumber *int64

	// EnqueuedTime will start the consumer before events that were enqueued on or after EnqueuedTime.
	// Can be exclusive or inclusive, based on the Inclusive property.
	EnqueuedTime *time.Time

	// Inclusive configures whether the events directly at Offset, SequenceNumber or EnqueuedTime will be included (true)
	// or excluded (false).
	Inclusive bool

	// Earliest will start the consumer at the earliest event.
	Earliest *bool

	// Latest will start the consumer after the last event.
	Latest *bool
}

StartPosition indicates the position to start receiving events within a partition. The default position is Latest.

You can set this in the options for ConsumerClient.NewPartitionClient.

type StartPositions added in v0.1.1

type StartPositions struct {
	// PerPartition controls the start position for a specific partition,
	// by partition ID. If a partition is not configured here it will default
	// to Default start position.
	PerPartition map[string]StartPosition

	// Default is used if the partition is not found in the PerPartition map.
	Default StartPosition
}

StartPositions are used if there is no checkpoint for a partition in the checkpoint store.

type UpdateCheckpointOptions added in v0.1.1

type UpdateCheckpointOptions struct {
}

UpdateCheckpointOptions contains optional parameters for the ProcessorPartitionClient.UpdateCheckpoint function.

type WebSocketConnParams added in v0.2.0

type WebSocketConnParams = exported.WebSocketConnParams

WebSocketConnParams are passed to your web socket creation function (ClientOptions.NewWebSocketConn)

Directories

Path Synopsis
amqpwrap
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
auth
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus.
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus.
eh
mock
Package mock is a generated GoMock package.
Package mock is a generated GoMock package.
sas

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL