azeventhubs

package module
v0.6.0
Published: Mar 7, 2023 License: MIT Imports: 20 Imported by: 19

README

Azure Event Hubs Client Module for Go

Azure Event Hubs is a big data streaming platform and event ingestion service from Microsoft. For more information, see the Azure Event Hubs service documentation.

Use the client library github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs in your application to:

  • Send events to an event hub.
  • Consume events from an event hub.

NOTE: This library is currently a beta. There may be breaking changes until it reaches semantic version v1.0.0.

Getting started

Install the package

Install the Azure Event Hubs client module for Go with go get:

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs
Prerequisites
Authenticate the client

Event Hub clients are created using a credential from the Azure Identity package, such as DefaultAzureCredential. You can also create a client using a connection string.

Using a service principal
  • ConsumerClient: link
  • ProducerClient: link
Using a connection string
  • ConsumerClient: link
  • ProducerClient: link

Key concepts

An Event Hub namespace can have multiple event hubs. Each event hub, in turn, contains partitions which store events.

Events are published to an event hub using an event publisher. In this package, the event publisher is the ProducerClient.

Events can be consumed from an event hub using an event consumer. In this package there are two types for consuming events:

  • The basic event consumer is the PartitionClient, created from the ConsumerClient. This consumer is useful if you already know which partitions you want to receive from.
  • A distributed event consumer, which uses Azure Blobs for checkpointing and coordination. This is implemented in the Processor. It is useful when you want partition assignment to be chosen dynamically and balanced with other Processor instances.
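To make "balanced with other Processor instances" concrete, here is a minimal sketch of a round-robin split of partition IDs across owners. It is only an illustration of what a balanced assignment looks like and is not the Processor's actual algorithm. The function name assignRoundRobin and the owner names are hypothetical.

```go
package main

import "fmt"

// assignRoundRobin spreads partition IDs across owners so that no owner holds
// more than one partition above any other. Hypothetical helper for illustration;
// the real Processor coordinates ownership through a checkpoint store instead.
func assignRoundRobin(partitionIDs, owners []string) map[string][]string {
	assignment := make(map[string][]string, len(owners))
	if len(owners) == 0 {
		return assignment
	}
	for i, id := range partitionIDs {
		owner := owners[i%len(owners)]
		assignment[owner] = append(assignment[owner], id)
	}
	return assignment
}

func main() {
	got := assignRoundRobin(
		[]string{"0", "1", "2", "3", "4"},
		[]string{"processor-a", "processor-b"},
	)
	// Five partitions over two owners: one owner gets three, the other two.
	fmt.Println(len(got["processor-a"]), len(got["processor-b"]))
}
```

With the PartitionClient you would pick the partition IDs yourself. With the Processor, a split like this is worked out for you and revised as instances come and go.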

More information about Event Hubs features and terminology can be found in the Azure Event Hubs service documentation.

Examples

Examples for various scenarios can be found on pkg.go.dev or in the example*_test.go files in our GitHub repo for azeventhubs.

Troubleshooting

Logging

This module uses the classification-based logging implementation in azcore. To enable console logging for all SDK modules, set the environment variable AZURE_SDK_GO_LOGGING to all.

Use the azcore/log package to control log event output or to enable logs for azeventhubs only. For example:

import (
  "fmt"

  azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
  "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

// print log output to stdout
azlog.SetListener(func(event azlog.Event, s string) {
    fmt.Printf("[%s] %s\n", event, s)
})

// pick the set of events to log
azlog.SetEvents(
  azeventhubs.EventConn,
  azeventhubs.EventAuth,
  azeventhubs.EventProducer,
  azeventhubs.EventConsumer,
)
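The idea behind classification-based logging can be sketched with the standard library alone: every message carries an event classification, and only messages whose classification was selected reach the listener. This is a simplified model and not the azcore implementation. The type filteringLogger and the event values are hypothetical.

```go
package main

import "fmt"

// event classifies a log message, in the spirit of azlog.Event.
type event string

// Hypothetical classifications used only by this sketch.
const (
	eventConn event = "conn"
	eventAuth event = "auth"
)

// filteringLogger forwards a message to its listener only if the message's
// event was selected with setEvents. A nil selection lets everything through.
type filteringLogger struct {
	selected map[event]bool
	listener func(event, string)
}

// setEvents replaces the current selection with the given events.
func (l *filteringLogger) setEvents(events ...event) {
	l.selected = make(map[event]bool, len(events))
	for _, e := range events {
		l.selected[e] = true
	}
}

// write delivers msg to the listener if e passes the filter.
func (l *filteringLogger) write(e event, msg string) {
	if l.listener == nil || (l.selected != nil && !l.selected[e]) {
		return
	}
	l.listener(e, msg)
}

func main() {
	var lines []string
	logger := &filteringLogger{listener: func(e event, s string) {
		lines = append(lines, fmt.Sprintf("[%s] %s", e, s))
	}}
	logger.setEvents(eventConn)

	logger.write(eventConn, "link created")    // selected, reaches the listener
	logger.write(eventAuth, "token refreshed") // not selected, dropped
	fmt.Println(lines)
}
```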

Contributing

For details on contributing to this repository, see the contributing guide.

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.

Many people all over the world have helped make this project better.

Reporting security issues and security bugs

Security issues and bugs should be reported privately, via email, to the Microsoft Security Response Center (MSRC) secure@microsoft.com. You should receive a response within 24 hours. If for some reason you do not, please follow up via email to ensure we received your original message. Further information, including the MSRC PGP key, can be found in the Security TechCenter.

License

Azure SDK for Go is licensed under the MIT license.

Documentation

Overview

Example (ConsumingEventsUsingConsumerClient)

Shows how to start consuming events in partitions in an Event Hub.

If you have an Azure Storage account you can use the Processor type instead, which will handle distributing partitions between multiple consumers. See example_processor_test.go for an example of that type.

package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"sync"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

// Shows how to start consuming events in partitions in an Event Hub.
//
// If you have an Azure Storage account you can use the [Processor] type instead, which will handle
// distributing partitions between multiple consumers. See example_processor_test.go for usage of
// that type.
//
// For an example of that see [example_processor_test.go].
//
// [example_processor_test.go]: https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_processor_test.go
func main() {
	eventHubNamespace := os.Getenv("EVENTHUB_NAMESPACE") // <ex: myeventhubnamespace.servicebus.windows.net>
	eventHubName := os.Getenv("EVENTHUB_NAME")

	fmt.Printf("Event Hub Namespace: %s, hubname: %s\n", eventHubNamespace, eventHubName)

	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	// Can also use a connection string:
	//
	// consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, eventHubName, azeventhubs.DefaultConsumerGroup, nil)
	//
	consumerClient, err := azeventhubs.NewConsumerClient(eventHubNamespace, eventHubName, azeventhubs.DefaultConsumerGroup, defaultAzureCred, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// Consuming events from Azure Event Hubs requires an Event Hub AND a partition to receive from.
	// This differs from sending events where a partition ID is not required, although events are
	// assigned a partition when stored on the server side.
	//
	// For this example, we'll assume we don't know which partitions will have events, so we'll just
	// receive from all of them.
	eventHubProperties, err := consumerClient.GetEventHubProperties(context.Background(), nil)

	if err != nil {
		panic(err)
	}

	overallReceiveTime := 5 * time.Minute
	appCtx, cancelApp := context.WithTimeout(context.Background(), overallReceiveTime)
	defer cancelApp()

	fmt.Printf("Starting PartitionClients, will receive for %s...\n", overallReceiveTime)

	wg := sync.WaitGroup{}

	for _, tmpPartitionID := range eventHubProperties.PartitionIDs {
		wg.Add(1)

		go func(partitionID string) {
			defer wg.Done()

			// NOTE: If you're not changing any options for this function you can pass `nil` for the options
			// parameter.
			partitionClient, err := consumerClient.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
				// The default start position is Latest, which means this partition client will only start receiving
				// events stored AFTER the receiver has initialized.
				//
				// If you need to read events earlier than that or require a more deterministic start, you can use
				// the other fields in StartPosition, which allow you to choose a starting sequence number, offset
				// or even a timestamp.
				StartPosition: azeventhubs.StartPosition{
					Latest: to.Ptr(true),
				},
			})

			if err != nil {
				panic(err)
			}

			defer partitionClient.Close(context.TODO())

			fmt.Printf("[Partition: %s] Starting receive loop for partition\n", partitionID)

			for {
				// Using a context with a timeout will allow ReceiveEvents() to return with events it
				// collected in a minute, or earlier if it actually gets all 100 events we requested.
				receiveCtx, cancel := context.WithTimeout(appCtx, time.Minute)
				events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
				cancel()

				if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
					if appCtx.Err() != nil {
						// Gracefully exit. The 'defer partitionClient.Close(context.TODO())' above
						// will take care of closing the PartitionClient.
						fmt.Printf("[Partition: %s] Application is stopping, stopping receive for partition\n", partitionID)
						break
					}

					// We didn't get any events before our context cancelled. Let's loop again.
					fmt.Printf("[Partition: %s] No events arrived in 1m, trying to receive again\n", partitionID)
				} else if err != nil {
					panic(err)
				}

				processConsumedEvents(partitionID, events)
			}
		}(tmpPartitionID)
	}

	fmt.Printf("Waiting for %s, for events to arrive on any partition\n", overallReceiveTime)
	wg.Wait()
	fmt.Printf("Done receiving events\n")
}

func processConsumedEvents(partitionID string, events []*azeventhubs.ReceivedEventData) {
	for _, event := range events {
		// We're assuming the Body is a byte-encoded string. EventData.Body supports any payload
		// that can be encoded to []byte.
		fmt.Printf("  [Partition: %s] Event received with body '%s'\n", partitionID, string(event.Body))
	}
}
Output:

Example (ConsumingEventsUsingProcessor)

Shows how to use the Processor type.

The Processor type acts as a load balancer, ensuring that partitions are divided up amongst active Processor instances. You provide it with a ConsumerClient as well as a CheckpointStore.

You will loop, continually calling Processor.NextPartitionClient and using the ProcessorPartitionClient instances that are returned. This loop will run for the lifetime of your application, as ownership can change over time as other Processor instances start or stop.

As you process a partition using ProcessorPartitionClient.ReceiveEvents you will periodically call ProcessorPartitionClient.UpdateCheckpoint, which stores your checkpoint information inside of the CheckpointStore. In the common case, this means your checkpoint information will be stored in Azure Blob storage.

If you prefer to manually allocate partitions or to have more control over the process you can use the ConsumerClient type. See example_consuming_events_test.go for an example.
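As a rough mental model of checkpointing, the sketch below keeps the last processed sequence number per partition in memory and tells a new owner where to resume. It is not the CheckpointStore interface or the blob-backed store. The type memoryCheckpoints is hypothetical, and resuming at the sequence number after the checkpoint is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// memoryCheckpoints is a hypothetical in-memory stand-in for a checkpoint store.
type memoryCheckpoints struct {
	mu      sync.Mutex
	lastSeq map[string]int64 // partition ID -> last processed sequence number
}

func newMemoryCheckpoints() *memoryCheckpoints {
	return &memoryCheckpoints{lastSeq: map[string]int64{}}
}

// update records the sequence number of the last event processed in a partition.
func (c *memoryCheckpoints) update(partitionID string, seq int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.lastSeq[partitionID] = seq
}

// resumeFrom returns the sequence number a new owner should start at, and
// false if the partition has no checkpoint yet.
func (c *memoryCheckpoints) resumeFrom(partitionID string) (int64, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	seq, ok := c.lastSeq[partitionID]
	if !ok {
		return 0, false
	}
	return seq + 1, true
}

func main() {
	store := newMemoryCheckpoints()

	// Process a batch, then checkpoint only the last event in it.
	batch := []int64{10, 11, 12}
	store.update("0", batch[len(batch)-1])

	next, ok := store.resumeFrom("0")
	fmt.Println(next, ok)
}
```

Checkpointing once per batch, rather than once per event, keeps writes to the store infrequent at the cost of reprocessing part of a batch after a crash.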

package main

import (
	"context"
	"errors"
	"fmt"
	"os"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
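	// NOTE: Go only allows an internal package to be imported by code under the directory
	// that contains it (here, azeventhubs), so this import does not compile in your own module.
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/internal/exported"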
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

// Shows how to use the [Processor] type.
//
// The Processor type acts as a load balancer, ensuring that partitions are divided up amongst
// active Processor instances. You provide it with a [ConsumerClient] as well as a [CheckpointStore].
//
// You will loop, continually calling [Processor.NextPartitionClient] and using the [ProcessorPartitionClient]'s
// that are returned. This loop will run for the lifetime of your application, as ownership can change over
// time as new Processor instances are started, or die.
//
// As you process a partition using [ProcessorPartitionClient.ReceiveEvents] you will periodically
// call [ProcessorPartitionClient.UpdateCheckpoint], which stores your checkpoint information inside of
// the [CheckpointStore]. In the common case, this means your checkpoint information will be stored
// in Azure Blob storage.
//
// If you prefer to manually allocate partitions or to have more control over the process you can use
// the [ConsumerClient] type. See [example_consuming_events_test.go] for an example.
//
// [example_consuming_events_test.go]: https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/example_consuming_events_test.go
func main() {
	// The Processor makes it simpler to do distributed consumption of an Event Hub.
	// It automatically coordinates with other Processor instances to ensure balanced
	// allocation of partitions and tracks status, durably, in a CheckpointStore.
	//
	// The built-in checkpoint store (available in the `azeventhubs/checkpoints` package) uses
	// Azure Blob storage.

	ehCS := os.Getenv("EVENTHUB_CONNECTION_STRING")
	eventHubName := os.Getenv("EVENTHUB_NAME")

	storageCS := os.Getenv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING")
	containerName := os.Getenv("CHECKPOINTSTORE_STORAGE_CONTAINER_NAME")

	// Create the checkpoint store
	//
	// NOTE: the Blob container must exist before the checkpoint store can be used.
	azBlobContainerClient, err := container.NewClientFromConnectionString(storageCS, containerName, nil)

	if err != nil {
		panic(err)
	}

	checkpointStore, err := checkpoints.NewBlobStore(azBlobContainerClient, nil)

	if err != nil {
		panic(err)
	}

	// Create a ConsumerClient
	//
	// The Processor (created below) will use this to create any PartitionClient instances, as ownership
	// is assigned.
	consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(ehCS, eventHubName, azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}

	defer consumerClient.Close(context.TODO())

	// Create the Processor
	//
	// The Processor handles load balancing with other Processor instances, running in separate
	// processes or even on separate machines. Each one will use the checkpointStore to coordinate
	// state and ownership, dynamically.
	processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

	if err != nil {
		panic(err)
	}

	// This function will be launched in a goroutine and will loop and launch
	// processing of partitions, in parallel.
	dispatchPartitionClients := func() {
		// Our loop will terminate when:
		// - You cancel the passed in context
		// - The Processor.Run() call is cancelled or terminates.
		for {
			partitionClient := processor.NextPartitionClient(context.TODO())

			if partitionClient == nil {
				// this happens if Processor.Run terminates or if you cancel
				// the passed in context.
				break
			}

			// Each time you get a ProcessorPartitionClient, you are the
			// exclusive owner. The previous owner (if there was one) will get an
			// *azeventhubs.Error from their next call to PartitionClient.ReceiveEvents()
			// with a Code of ErrorCodeOwnershipLost.
			go func() {
				if err := processEvents(partitionClient); err != nil {
					panic(err)
				}
			}()
		}
	}

	go dispatchPartitionClients()

	// This context will control the lifetime of the Processor.Run call.
	// When it is cancelled the processor will stop running and also close
	// any ProcessorPartitionClient's it opened while running.
	processorCtx, processorCancel := context.WithCancel(context.TODO())
	defer processorCancel()

	// Run the load balancer. The dispatchPartitionClients goroutine, launched
	// above, will continually get new ProcessorPartitionClient's as partitions
	// are allocated.
	//
	// Stopping the processor is as simple as canceling the context that you passed
	// in to Run.
	if err := processor.Run(processorCtx); err != nil {
		panic(err)
	}
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
	// In other models of the Processor we have broken up the partition
	// lifecycle model.
	//
	// In Go, we model this as a function call, with a loop, using this structure:
	//
	// 1. [BEGIN] Initialize any partition specific resources.
	// 2. [CONTINUOUS] Run a loop, calling ReceiveEvents() and UpdateCheckpoint().
	// 3. [END] Close any resources associated with the processor.

	// [END] Do cleanup here, like shutting down database connections
	// or other resources used for processing this partition.
	defer closePartitionResources(partitionClient)

	// [BEGIN] Initialize any resources needed to process the partition
	if err := initializePartitionResources(partitionClient.PartitionID()); err != nil {
		return err
	}

	// [CONTINUOUS] Loop until you lose ownership or your own exit criteria are met,
	// checkpointing as needed using UpdateCheckpoint.
	for {
		// Using a context with a timeout will allow ReceiveEvents() to return with events it
		// collected in a minute, or earlier if it actually gets all 100 events we requested.
		receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
		events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
		receiveCtxCancel()

		// Timing out (context.DeadlineExceeded) is fine. We didn't receive our 100 events
		// but we might have received _some_ events.
		if err != nil && !errors.Is(err, context.DeadlineExceeded) {
			if eventHubError := (*azeventhubs.Error)(nil); errors.As(err, &eventHubError) && eventHubError.Code == exported.ErrorCodeOwnershipLost {
				// This means that the partition was "stolen" - this can happen as partitions are balanced between
				// consumers. We'll exit here and just let our "defer closePartitionResources" handle closing
				// resources, including the ProcessorPartitionClient.
				return nil
			}

			return err
		}

		fmt.Printf("Processing %d event(s)\n", len(events))

		for _, event := range events {
			// process the event in some way
			fmt.Printf("Event received with body %v\n", event.Body)
		}

		// it's possible to get zero events if the partition is empty, or if no new events have arrived
		// since your last receive.
		if len(events) != 0 {
			// Update the checkpoint with the last event received. If we lose ownership of this partition or
			// have to restart, the next owner will start from this point.
			if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
				return err
			}
		}
	}
}

func initializePartitionResources(partitionID string) error {
	// initialize things that might be partition specific, like a
	// database connection.
	return nil
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
	// Each PartitionClient holds onto an external resource and should be closed if you're
	// not processing them anymore.
	defer partitionClient.Close(context.TODO())
}
Output:

Example (EnableLogging)
package main

import (
	"fmt"

	azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	// print log output to stdout
	azlog.SetListener(printLoggedEvent)

	// pick the set of events to log
	azlog.SetEvents(
		azeventhubs.EventConn,
		azeventhubs.EventAuth,
		azeventhubs.EventProducer,
		azeventhubs.EventConsumer,
	)

	fmt.Printf("Logging enabled\n")

}

func printLoggedEvent(event azlog.Event, s string) {
	fmt.Printf("[%s] %s\n", event, s)
}
Output:

Logging enabled
Example (MigrateCheckpoints)

Shows how to migrate from the older `github.com/Azure/azure-event-hubs-go` checkpointer to the format used by this package, `github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints/BlobStore`

NOTE: This example is not safe to run while either the old or new checkpoint store is in use, as it doesn't respect locking or ownership.

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"strconv"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

type LegacyCheckpoint struct {
	PartitionID string `json:"partitionID"`
	Epoch       int    `json:"epoch"`
	Owner       string `json:"owner"`
	Checkpoint  struct {
		Offset         string `json:"offset"`
		SequenceNumber int64  `json:"sequenceNumber"`
		EnqueueTime    string `json:"enqueueTime"` // ex: "0001-01-01T00:00:00Z"
	} `json:"checkpoint"`
}

// Shows how to migrate from the older `github.com/Azure/azure-event-hubs-go` checkpointer to
// the format used by this package, `github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints/BlobStore`
//
// NOTE: This example is not safe to run while either the old or new checkpoint store is in-use as it doesn't
// respect locking or ownership.
func main() {
	// Fully qualified Azure Event Hubs namespace. You can get this from the Azure Portal.
	// For example: youreventhub.servicebus.windows.net
	var EventHubNamespace = os.Getenv("EVENTHUB_NAMESPACE")

	// Name of your Event Hub that these checkpoints reference.
	var EventHubName = os.Getenv("EVENTHUB_NAME")

	// Name of your Event Hub consumer group
	// Example: $Default
	var EventHubConsumerGroup = os.Getenv("EVENTHUB_CONSUMER_GROUP")

	// Azure Storage account connection string. You can get this from the Azure Portal.
	// For example: DefaultEndpointsProtocol=https;AccountName=accountname;AccountKey=account-key;EndpointSuffix=core.windows.net
	var StorageConnectionString = os.Getenv("STORAGE_CONNECTION_STRING")

	// Optional: If you used `eventhub.WithPrefixInBlobPath()` configuration option for your Event Processor Host
	// then you'll need to set this value.
	//
	// NOTE: This is no longer needed with the new checkpoint store as it automatically makes the path unique
	// for each combination of eventhub + hubname + consumergroup + partition.
	var BlobPrefix = os.Getenv("OLD_STORAGE_BLOB_PREFIX")

	// Name of the checkpoint store's Azure Storage container.
	var OldStorageContainerName = os.Getenv("OLD_STORAGE_CONTAINER_NAME")

	// Name of the Azure Storage container to place new checkpoints in.
	var NewStorageContainerName = os.Getenv("NEW_STORAGE_CONTAINER_NAME")

	if EventHubNamespace == "" || EventHubName == "" || EventHubConsumerGroup == "" ||
		StorageConnectionString == "" || OldStorageContainerName == "" || NewStorageContainerName == "" {
		fmt.Printf("Skipping migration, missing parameters\n")
		return
	}

	blobClient, err := azblob.NewClientFromConnectionString(StorageConnectionString, nil)

	if err != nil {
		panic(err)
	}

	oldCheckpoints, err := loadOldCheckpoints(blobClient, OldStorageContainerName, BlobPrefix)

	if err != nil {
		panic(err)
	}

	newCheckpointStore, err := checkpoints.NewBlobStore(blobClient.ServiceClient().NewContainerClient(NewStorageContainerName), nil)

	if err != nil {
		panic(err)
	}

	for _, oldCheckpoint := range oldCheckpoints {
		newCheckpoint := azeventhubs.Checkpoint{
			ConsumerGroup:           EventHubConsumerGroup,
			EventHubName:            EventHubName,
			FullyQualifiedNamespace: EventHubNamespace,
			PartitionID:             oldCheckpoint.PartitionID,
		}

		offset, err := strconv.ParseInt(oldCheckpoint.Checkpoint.Offset, 10, 64)

		if err != nil {
			panic(err)
		}

		newCheckpoint.Offset = &offset
		newCheckpoint.SequenceNumber = &oldCheckpoint.Checkpoint.SequenceNumber

		if err := newCheckpointStore.UpdateCheckpoint(context.Background(), newCheckpoint, nil); err != nil {
			panic(err)
		}
	}
}

func loadOldCheckpoints(blobClient *azblob.Client, containerName string, customBlobPrefix string) ([]*LegacyCheckpoint, error) {
	blobPrefix := &customBlobPrefix

	if customBlobPrefix == "" {
		blobPrefix = nil
	}

	pager := blobClient.NewListBlobsFlatPager(containerName, &container.ListBlobsFlatOptions{
		Prefix: blobPrefix,
	})

	var checkpoints []*LegacyCheckpoint

	for pager.More() {
		page, err := pager.NextPage(context.Background())

		if err != nil {
			return nil, err
		}

		for _, item := range page.Segment.BlobItems {
			buff := [4000]byte{}

			// n is the number of bytes downloaded into buff.
			n, err := blobClient.DownloadBuffer(context.Background(), containerName, *item.Name, buff[:], nil)

			if err != nil {
				return nil, err
			}

			var legacyCheckpoint *LegacyCheckpoint

			if err := json.Unmarshal(buff[0:n], &legacyCheckpoint); err != nil {
				return nil, err
			}
				return nil, err
			}

			checkpoints = append(checkpoints, legacyCheckpoint)
		}
	}

	return checkpoints, nil
}
Output:
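The conversion step of the migration can be tried without any Azure resources. The sketch below parses one legacy checkpoint blob and converts its string offset to an integer, using the JSON field names from LegacyCheckpoint above. The blob contents are made up for illustration, and the converted type is a hypothetical stand-in for the fields the new checkpoint needs.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// legacyCheckpoint mirrors the part of the legacy JSON layout that the migration reads.
type legacyCheckpoint struct {
	PartitionID string `json:"partitionID"`
	Checkpoint  struct {
		Offset         string `json:"offset"`
		SequenceNumber int64  `json:"sequenceNumber"`
	} `json:"checkpoint"`
}

// converted holds the values the new checkpoint format needs (hypothetical type).
type converted struct {
	PartitionID    string
	Offset         int64
	SequenceNumber int64
}

// convert parses a legacy checkpoint blob and converts its string offset to an int64.
func convert(blob []byte) (converted, error) {
	var old legacyCheckpoint
	if err := json.Unmarshal(blob, &old); err != nil {
		return converted{}, err
	}

	offset, err := strconv.ParseInt(old.Checkpoint.Offset, 10, 64)
	if err != nil {
		return converted{}, fmt.Errorf("offset %q is not an integer: %w", old.Checkpoint.Offset, err)
	}

	return converted{
		PartitionID:    old.PartitionID,
		Offset:         offset,
		SequenceNumber: old.Checkpoint.SequenceNumber,
	}, nil
}

func main() {
	// Hypothetical blob contents, invented for this sketch.
	blob := []byte(`{"partitionID":"0","epoch":1,"owner":"host-1",` +
		`"checkpoint":{"offset":"4096","sequenceNumber":42,"enqueueTime":"0001-01-01T00:00:00Z"}}`)

	c, err := convert(blob)
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", c)
}
```

Returning an error for a non-numeric offset, instead of panicking as the example does, lets a migration tool skip or report a bad blob and carry on.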

Example (ProducingEventsUsingProducerClient)
package main

import (
	"context"
	"errors"
	"os"

	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {
	eventHubNamespace := os.Getenv("EVENTHUB_NAMESPACE") // <ex: myeventhubnamespace.servicebus.windows.net>
	eventHubName := os.Getenv("EVENTHUB_NAME")

	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	// Can also use a connection string:
	//
	// producerClient, err := azeventhubs.NewProducerClientFromConnectionString(connectionString, eventHubName, nil)
	//
	producerClient, err := azeventhubs.NewProducerClient(eventHubNamespace, eventHubName, defaultAzureCred, nil)

	if err != nil {
		panic(err)
	}

	defer producerClient.Close(context.TODO())

	events := createEventsForSample()

	newBatchOptions := &azeventhubs.EventDataBatchOptions{
		// The options allow you to control the size of the batch, as well as the partition it will get sent to.

		// PartitionID can be used to target a specific partition ID.
		//
		// PartitionID: partitionID,

		// PartitionKey can be used to ensure that messages that have the same key
		// will go to the same partition without requiring your application to specify
		// that partition ID.
		//
		// PartitionKey: partitionKey,

		// Or, if you leave both PartitionID and PartitionKey nil, the service will choose a partition.
	}

	// Creates an EventDataBatch, which you can use to pack multiple events together, allowing for efficient transfer.
	batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

	if err != nil {
		panic(err)
	}

	for i := 0; i < len(events); i++ {
		err = batch.AddEventData(events[i], nil)

		if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
			if batch.NumEvents() == 0 {
				// This one event is too large for this batch, even on its own. No matter what we do it
				// will not be sendable at its current size.
				panic(err)
			}

			// This batch is full - we can send it and create a new one and continue
			// packaging and sending events.
			if err := producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
				panic(err)
			}

			// create the next batch we'll use for events, ensuring that we use the same options
			// each time so all the messages go to the same target.
			tmpBatch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

			if err != nil {
				panic(err)
			}

			batch = tmpBatch

			// rewind so we can retry adding this event to a batch
			i--
		} else if err != nil {
			panic(err)
		}
	}

	// if we have any events in the last batch, send it
	if batch.NumEvents() > 0 {
		if err := producerClient.SendEventDataBatch(context.TODO(), batch, nil); err != nil {
			panic(err)
		}
	}
}

func createEventsForSample() []*azeventhubs.EventData {
	return []*azeventhubs.EventData{
		{
			Body: []byte("hello"),
		},
		{
			Body: []byte("world"),
		},
	}
}
Output:

Constants

const (
	// EventConn is used whenever we create a connection or any links (ie: producers, consumers).
	EventConn log.Event = exported.EventConn

	// EventAuth is used when we're doing authentication/claims negotiation.
	EventAuth log.Event = exported.EventAuth

	// EventProducer represents operations that happen on Producers.
	EventProducer log.Event = exported.EventProducer

	// EventConsumer represents operations that happen on Consumers.
	EventConsumer log.Event = exported.EventConsumer
)
const DefaultConsumerGroup = "$Default"

DefaultConsumerGroup is the name of the default consumer group in the Event Hubs service.

Variables

var ErrEventDataTooLarge = errors.New("the EventData could not be added because it is too large for the batch")

ErrEventDataTooLarge is returned when a message cannot fit into a batch when using the azeventhubs.EventDataBatch.AddEventData function.

Functions

This section is empty.

Types

type AMQPAnnotatedMessage added in v0.2.0

type AMQPAnnotatedMessage struct {
	// ApplicationProperties corresponds to the "application-properties" section of an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	ApplicationProperties map[string]any

	// Body represents the body of an AMQP message.
	Body AMQPAnnotatedMessageBody

	// DeliveryAnnotations corresponds to the "delivery-annotations" section in an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	DeliveryAnnotations map[any]any

	// DeliveryTag corresponds to the delivery-tag property of the TRANSFER frame
	// for this message.
	DeliveryTag []byte

	// Footer is the transport footers for this AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	Footer map[any]any

	// Header is the transport headers for this AMQP message.
	Header *AMQPAnnotatedMessageHeader

	// MessageAnnotations corresponds to the message-annotations section of an AMQP message.
	//
	// The values of the map are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	MessageAnnotations map[any]any

	// Properties corresponds to the properties section of an AMQP message.
	Properties *AMQPAnnotatedMessageProperties
}

AMQPAnnotatedMessage represents the AMQP message, as received from Event Hubs. For details about these properties, refer to the AMQP specification:

https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format

Some fields in this struct are typed 'any', which means they will accept AMQP primitives, or in some cases slices and maps.

AMQP simple types include:

  • int (any size), uint (any size)
  • float (any size)
  • string
  • bool
  • time.Time

type AMQPAnnotatedMessageBody added in v0.2.0

type AMQPAnnotatedMessageBody struct {
	// Data is encoded/decoded as multiple data sections in the body.
	Data [][]byte

	// Sequence is encoded/decoded as one or more amqp-sequence sections in the body.
	//
	// The values of the slices are restricted to AMQP simple types, as listed in the comment for AMQPAnnotatedMessage.
	Sequence [][]any

	// Value is encoded/decoded as the amqp-value section in the body.
	//
	// The type of Value can be any of the AMQP simple types, as listed in the comment for AMQPAnnotatedMessage,
	// as well as slices or maps of AMQP simple types.
	Value any
}

AMQPAnnotatedMessageBody represents the body of an AMQP message. Only one of these fields can be used at a time. They are mutually exclusive.
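The mutual-exclusion rule can be checked up front. The sketch below mirrors the three body fields in a local struct and rejects a value with more than one of them set. The body type and validateBody are hypothetical, and whether the library itself returns an error in this situation is not stated here.

```go
package main

import (
	"errors"
	"fmt"
)

// body mirrors the three fields of AMQPAnnotatedMessageBody for this sketch.
type body struct {
	Data     [][]byte
	Sequence [][]any
	Value    any
}

// validateBody returns an error if more than one body field is set.
func validateBody(b body) error {
	set := 0
	if b.Data != nil {
		set++
	}
	if b.Sequence != nil {
		set++
	}
	if b.Value != nil {
		set++
	}
	if set > 1 {
		return errors.New("only one of Data, Sequence, or Value may be set")
	}
	return nil
}

func main() {
	fmt.Println(validateBody(body{Value: "hello"}))
	fmt.Println(validateBody(body{Data: [][]byte{[]byte("hello")}, Value: "hello"}))
}
```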

type AMQPAnnotatedMessageHeader added in v0.2.0

type AMQPAnnotatedMessageHeader struct {
	// DeliveryCount is the number of unsuccessful previous attempts to deliver this message.
	// It corresponds to the 'delivery-count' property.
	DeliveryCount uint32

	// Durable corresponds to the 'durable' property.
	Durable bool

	// FirstAcquirer corresponds to the 'first-acquirer' property.
	FirstAcquirer bool

	// Priority corresponds to the 'priority' property.
	Priority uint8

	// TTL corresponds to the 'ttl' property.
	TTL time.Duration
}

AMQPAnnotatedMessageHeader carries standard delivery details about the transfer of a message. See https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-header for more details.

type AMQPAnnotatedMessageProperties added in v0.2.0

type AMQPAnnotatedMessageProperties struct {
	// AbsoluteExpiryTime corresponds to the 'absolute-expiry-time' property.
	AbsoluteExpiryTime *time.Time

	// ContentEncoding corresponds to the 'content-encoding' property.
	ContentEncoding *string

	// ContentType corresponds to the 'content-type' property.
	ContentType *string

	// CorrelationID corresponds to the 'correlation-id' property.
	// The type of CorrelationID can be a uint64, UUID, []byte, or a string
	CorrelationID any

	// CreationTime corresponds to the 'creation-time' property.
	CreationTime *time.Time

	// GroupID corresponds to the 'group-id' property.
	GroupID *string

	// GroupSequence corresponds to the 'group-sequence' property.
	GroupSequence *uint32

	// MessageID corresponds to the 'message-id' property.
	// The type of MessageID can be a uint64, UUID, []byte, or string
	MessageID any

	// ReplyTo corresponds to the 'reply-to' property.
	ReplyTo *string

	// ReplyToGroupID corresponds to the 'reply-to-group-id' property.
	ReplyToGroupID *string

	// Subject corresponds to the 'subject' property.
	Subject *string

	// To corresponds to the 'to' property.
	To *string

	// UserID corresponds to the 'user-id' property.
	UserID []byte
}

AMQPAnnotatedMessageProperties represents the properties of an AMQP message. See here for more details: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-properties

type AddEventDataOptions

type AddEventDataOptions struct {
}

AddEventDataOptions contains optional parameters for the AddEventData function.

type Checkpoint added in v0.1.1

type Checkpoint struct {
	ConsumerGroup           string
	EventHubName            string
	FullyQualifiedNamespace string
	PartitionID             string

	Offset         *int64 // the last successfully processed Offset.
	SequenceNumber *int64 // the last successfully processed SequenceNumber.
}

Checkpoint tracks the last successfully processed event in a partition.

type CheckpointStore added in v0.1.1

type CheckpointStore interface {
	// ClaimOwnership attempts to claim ownership of the partitions in partitionOwnership and returns
	// the actual partitions that were claimed.
	ClaimOwnership(ctx context.Context, partitionOwnership []Ownership, options *ClaimOwnershipOptions) ([]Ownership, error)

	// ListCheckpoints lists all the available checkpoints.
	ListCheckpoints(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListCheckpointsOptions) ([]Checkpoint, error)

	// ListOwnership lists all ownerships.
	ListOwnership(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListOwnershipOptions) ([]Ownership, error)

	// UpdateCheckpoint updates a specific checkpoint with a sequence and offset.
	UpdateCheckpoint(ctx context.Context, checkpoint Checkpoint, options *UpdateCheckpointOptions) error
}

CheckpointStore is used by multiple consumers to coordinate progress and ownership for partitions.

type ClaimOwnershipOptions added in v0.1.1

type ClaimOwnershipOptions struct {
}

ClaimOwnershipOptions contains optional parameters for the ClaimOwnership function.

type ConnectionStringProperties added in v0.5.0

type ConnectionStringProperties = exported.ConnectionStringProperties

ConnectionStringProperties are the properties of a connection string as returned by ParseConnectionString.

func ParseConnectionString added in v0.5.0

func ParseConnectionString(connStr string) (ConnectionStringProperties, error)

ParseConnectionString takes a connection string from the Azure portal and returns the parsed representation.

There are two supported formats:

  1. Connection strings generated from the portal (or elsewhere) that contain an embedded key and keyname.
  2. A connection string with an embedded SharedAccessSignature: Endpoint=sb://<sb>.servicebus.windows.net;SharedAccessSignature=SharedAccessSignature sr=<sb>.servicebus.windows.net&sig=<base64-sig>&se=<expiry>&skn=<keyname>
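
Both formats are semicolon-separated `key=value` pairs, where a value may itself contain `=` characters (as the SharedAccessSignature does). A rough sketch of that shape, using only the standard library, is shown here. `splitConnectionString` is a hypothetical helper and is not how ParseConnectionString is implemented; it performs no validation.

```go
package main

import (
	"fmt"
	"strings"
)

// splitConnectionString breaks a connection string into its key/value pairs.
// Only the first '=' in each segment separates the key from the value, so
// values that contain '=' (such as a signature) are preserved.
func splitConnectionString(connStr string) map[string]string {
	props := map[string]string{}
	for _, part := range strings.Split(connStr, ";") {
		key, value, found := strings.Cut(part, "=")
		if !found || key == "" {
			continue // skip empty segments, such as the one after a trailing ';'
		}
		props[key] = value
	}
	return props
}

func main() {
	props := splitConnectionString("Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=name;SharedAccessKey=abc=;EntityPath=hub;")
	fmt.Println(props["Endpoint"])
	fmt.Println(props["EntityPath"])
}
```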

type ConsumerClient

type ConsumerClient struct {
	// contains filtered or unexported fields
}

ConsumerClient can create PartitionClient instances, which can read events from a partition.

func NewConsumerClient

func NewConsumerClient(fullyQualifiedNamespace string, eventHub string, consumerGroup string, credential azcore.TokenCredential, options *ConsumerClientOptions) (*ConsumerClient, error)

NewConsumerClient creates a ConsumerClient which uses an azcore.TokenCredential for authentication. You MUST call azeventhubs.ConsumerClient.Close on this client to avoid leaking resources.

The fullyQualifiedNamespace is the Event Hubs namespace name (ex: myeventhub.servicebus.windows.net). The credential is one of the credentials in the azidentity package.

Example
package main

import (
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	consumerClient, err = azeventhubs.NewConsumerClient("<ex: myeventhubnamespace.servicebus.windows.net>", "eventhub-name", azeventhubs.DefaultConsumerGroup, defaultAzureCred, nil)

	if err != nil {
		panic(err)
	}
}
Output:

func NewConsumerClientFromConnectionString

func NewConsumerClientFromConnectionString(connectionString string, eventHub string, consumerGroup string, options *ConsumerClientOptions) (*ConsumerClient, error)

NewConsumerClientFromConnectionString creates a ConsumerClient from a connection string. You MUST call azeventhubs.ConsumerClient.Close on this client to avoid leaking resources.

connectionString can be one of two formats - with or without an EntityPath key.

When the connection string does not have an entity path, as shown below, the eventHub parameter cannot be empty and should contain the name of your event hub.

Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>

When the connection string DOES have an entity path, as shown below, the eventHub parameter must be empty.

Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>;
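
The rule above (exactly one of EntityPath and the eventHub parameter supplies the event hub name) can be sketched as a small pre-flight check. `resolveEventHub` is a hypothetical helper written for this illustration; it only mirrors the documented rule and is not the library's own validation.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// resolveEventHub returns the event hub name implied by a connection string
// and an eventHub argument, or an error if the two conflict or both are missing.
func resolveEventHub(connStr, eventHub string) (string, error) {
	entityPath := ""
	for _, part := range strings.Split(connStr, ";") {
		if strings.HasPrefix(part, "EntityPath=") {
			entityPath = strings.TrimPrefix(part, "EntityPath=")
		}
	}
	switch {
	case entityPath != "" && eventHub != "":
		return "", errors.New("connection string has an EntityPath, so eventHub must be empty")
	case entityPath == "" && eventHub == "":
		return "", errors.New("connection string has no EntityPath, so eventHub must be set")
	case entityPath != "":
		return entityPath, nil
	default:
		return eventHub, nil
	}
}

func main() {
	withPath := "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=n;SharedAccessKey=k;EntityPath=hub-a"
	withoutPath := "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=n;SharedAccessKey=k"

	fmt.Println(resolveEventHub(withPath, ""))
	fmt.Println(resolveEventHub(withoutPath, "hub-b"))
	fmt.Println(resolveEventHub(withPath, "hub-b"))
}
```
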
Example
package main

import (
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient
var err error

func main() {
	// if the connection string contains an EntityPath
	//
	connectionString := "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>"
	consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, "", azeventhubs.DefaultConsumerGroup, nil)

	// or

	// if the connection string does not contain an EntityPath
	connectionString = "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
	consumerClient, err = azeventhubs.NewConsumerClientFromConnectionString(connectionString, "eventhub-name", azeventhubs.DefaultConsumerGroup, nil)

	if err != nil {
		panic(err)
	}
}
Output:

func (*ConsumerClient) Close

func (cc *ConsumerClient) Close(ctx context.Context) error

Close releases resources for this client.

func (*ConsumerClient) GetEventHubProperties

func (cc *ConsumerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)

GetEventHubProperties gets event hub properties, like the available partition IDs and when the Event Hub was created.

Example
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	eventHubProps, err := consumerClient.GetEventHubProperties(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	for _, partitionID := range eventHubProps.PartitionIDs {
		fmt.Printf("Partition ID: %s\n", partitionID)
	}
}
Output:

func (*ConsumerClient) GetPartitionProperties

func (cc *ConsumerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error)

GetPartitionProperties gets properties for a specific partition. This includes data like the last enqueued sequence number, the first sequence number and when an event was last enqueued to the partition.

Example
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	partitionProps, err := consumerClient.GetPartitionProperties(context.TODO(), "partition-id", nil)

	if err != nil {
		panic(err)
	}

	fmt.Printf("First sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.BeginningSequenceNumber)
	fmt.Printf("Last sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.LastEnqueuedSequenceNumber)
}
Output:

func (*ConsumerClient) InstanceID added in v0.6.0

func (cc *ConsumerClient) InstanceID() string

InstanceID is the identifier for this ConsumerClient.

func (*ConsumerClient) NewPartitionClient added in v0.1.1

func (cc *ConsumerClient) NewPartitionClient(partitionID string, options *PartitionClientOptions) (*PartitionClient, error)

NewPartitionClient creates a client that can receive events from a partition. By default it starts at the latest point in the partition. This can be changed using the options parameter. You MUST call azeventhubs.PartitionClient.Close on the returned client to avoid leaking resources.

Example (ConfiguringPrefetch)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	const partitionID = "0"

	// Prefetching configures the Event Hubs client to continually cache events, up to the configured size
	// in PartitionClientOptions.Prefetch. PartitionClient.ReceiveEvents will read from the cache first,
	// which can improve throughput in situations where you might normally be forced to request and wait
	// for more events.

	// By default, prefetch is enabled.
	partitionClient, err := consumerClient.NewPartitionClient(partitionID, nil)

	if err != nil {
		panic(err)
	}

	defer partitionClient.Close(context.TODO())

	// You can configure the prefetch buffer size as well. The default is 300.
	partitionClientWithCustomPrefetch, err := consumerClient.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
		Prefetch: 301,
	})

	if err != nil {
		panic(err)
	}

	defer partitionClientWithCustomPrefetch.Close(context.TODO())

	// And prefetch can be disabled if you prefer to manually control the flow of events. Excess
	// events (that arrive after your ReceiveEvents() call has completed) will still be
	// buffered internally, but they will not be automatically replenished.
	partitionClientWithPrefetchDisabled, err := consumerClient.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
		Prefetch: -1,
	})

	if err != nil {
		panic(err)
	}

	defer partitionClientWithPrefetchDisabled.Close(context.TODO())

	// Using a context with a timeout will allow ReceiveEvents() to return with events it
	// collected in a minute, or earlier if it actually gets all 100 events we requested.
	receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Minute)
	defer cancel()
	events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)

	if err != nil {
		panic(err)
	}

	for _, evt := range events {
		fmt.Printf("Body: %s\n", string(evt.Body))
	}
}
Output:

Example (ReceiveEvents)
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var consumerClient *azeventhubs.ConsumerClient

func main() {
	const partitionID = "0"

	partitionClient, err := consumerClient.NewPartitionClient(partitionID, nil)

	if err != nil {
		panic(err)
	}

	defer partitionClient.Close(context.TODO())

	// Using a context with a timeout will allow ReceiveEvents() to return with events it
	// collected in a minute, or earlier if it actually gets all 100 events we requested.
	receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Minute)
	defer cancel()
	events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)

	if err != nil {
		panic(err)
	}

	for _, evt := range events {
		fmt.Printf("Body: %s\n", string(evt.Body))
	}
}
Output:

type ConsumerClientOptions

type ConsumerClientOptions struct {
	// ApplicationID is used as the identifier when setting the User-Agent property.
	ApplicationID string

	// InstanceID is a unique name used to identify the consumer. This can help with
	// diagnostics as this name will be returned in error messages. By default,
	// an identifier will be automatically generated.
	InstanceID string

	// NewWebSocketConn is a function that can create a net.Conn for use with websockets.
	// For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go.
	NewWebSocketConn func(ctx context.Context, args WebSocketConnParams) (net.Conn, error)

	// RetryOptions controls how often operations are retried from this client and any
	// Receivers and Senders created from this client.
	RetryOptions RetryOptions

	// TLSConfig configures a client with a custom *tls.Config.
	TLSConfig *tls.Config
}

ConsumerClientOptions configures optional parameters for a ConsumerClient.

type Error added in v0.1.1

type Error = exported.Error

Error represents an Event Hub specific error. NOTE: the Code is considered part of the published API but the message that comes back from Error(), as well as the underlying wrapped error, are NOT and are subject to change.

type ErrorCode added in v0.2.0

type ErrorCode = exported.ErrorCode

ErrorCode is an error code that consuming code can inspect programmatically.

const (
	// ErrorCodeConnectionLost means our connection was lost and all retry attempts failed.
	// This typically reflects an extended outage or connection disruption and may
	// require manual intervention.
	ErrorCodeConnectionLost ErrorCode = exported.ErrorCodeConnectionLost

	// ErrorCodeOwnershipLost means that a partition that you were reading from was opened
	// by another link with a higher epoch/owner level.
	ErrorCodeOwnershipLost ErrorCode = exported.ErrorCodeOwnershipLost
)

type EventData

type EventData struct {
	// Properties can be used to store custom metadata for a message.
	Properties map[string]any

	// Body is the payload for a message.
	Body []byte

	// ContentType describes the payload of the message, with a descriptor following
	// the format of Content-Type, specified by RFC2045 (ex: "application/json").
	ContentType *string

	// CorrelationID is a client-specific id that can be used to mark or identify messages
	// between clients.
	// CorrelationID can be a uint64, UUID, []byte, or string
	CorrelationID any

	// MessageID is an application-defined value that uniquely identifies
	// the message and its payload. The identifier is a free-form string.
	//
	// If enabled, the duplicate detection feature identifies and removes further submissions
	// of messages with the same MessageId.
	MessageID *string
}

EventData is an event that can be sent, using the ProducerClient, to an Event Hub.

type EventDataBatch

type EventDataBatch struct {
	// contains filtered or unexported fields
}

EventDataBatch is used to efficiently pack up EventData before sending it to Event Hubs.

EventDataBatch instances are not meant to be created directly. Use azeventhubs.ProducerClient.NewEventDataBatch, which will create them with the proper size limit for your Event Hub.

func (*EventDataBatch) AddAMQPAnnotatedMessage added in v0.2.0

func (b *EventDataBatch) AddAMQPAnnotatedMessage(annotatedMessage *AMQPAnnotatedMessage, options *AddEventDataOptions) error

AddAMQPAnnotatedMessage adds an AMQPAnnotatedMessage to the batch, failing if the AMQPAnnotatedMessage would cause the EventDataBatch to be too large to send.

This size limit was set when the EventDataBatch was created, in options to azeventhubs.ProducerClient.NewEventDataBatch, or (by default) from Event Hubs itself.

Returns ErrEventDataTooLarge if the message cannot fit, or a non-nil error for other failures.

func (*EventDataBatch) AddEventData

func (b *EventDataBatch) AddEventData(ed *EventData, options *AddEventDataOptions) error

AddEventData adds an EventData to the batch, failing if the EventData would cause the EventDataBatch to be too large to send.

This size limit was set when the EventDataBatch was created, in options to azeventhubs.ProducerClient.NewEventDataBatch, or (by default) from Event Hubs itself.

Returns ErrEventDataTooLarge if the event cannot fit, or a non-nil error for other failures.

Example
package main

import (
	"context"
	"errors"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	// can be called multiple times with new messages until you
	// receive an azeventhubs.ErrEventDataTooLarge
	err = batch.AddEventData(&azeventhubs.EventData{
		Body: []byte("hello"),
	}, nil)

	if errors.Is(err, azeventhubs.ErrEventDataTooLarge) {
		// Message was too large to fit into this batch.
		//
		// At this point you'd usually just send the batch (using ProducerClient.SendEventDataBatch),
		// create a new one, and start filling up the batch again.
		//
		// If this is the _only_ message being added to the batch then it's too big in general, and
		// will need to be split or shrunk to fit.
		panic(err)
	} else if err != nil {
		panic(err)
	}

	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}
Output:

Example (RawAMQPMessages)
package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	// This is functionally equivalent to EventDataBatch.AddEventData(), just with a more
	// advanced message format.
	// See ExampleEventDataBatch_AddEventData for more details.

	err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{
		Body: azeventhubs.AMQPAnnotatedMessageBody{
			Data: [][]byte{
				[]byte("hello"),
				[]byte("world"),
			},
		},
	}, nil)

	if err != nil {
		panic(err)
	}

	err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{
		Body: azeventhubs.AMQPAnnotatedMessageBody{
			Sequence: [][]any{
				// let the AMQP stack encode your strings (or other primitives) for you, no need
				// to convert them to bytes manually.
				{"hello", "world"},
				{"howdy", "world"},
			},
		},
	}, nil)

	if err != nil {
		panic(err)
	}

	err = batch.AddAMQPAnnotatedMessage(&azeventhubs.AMQPAnnotatedMessage{
		Body: azeventhubs.AMQPAnnotatedMessageBody{
			// let the AMQP stack encode your string (or other primitives) for you, no need
			// to convert them to bytes manually.
			Value: "hello world",
		},
	}, nil)

	if err != nil {
		panic(err)
	}

	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}
Output:

func (*EventDataBatch) NumBytes

func (b *EventDataBatch) NumBytes() uint64

NumBytes is the number of bytes in the batch.

func (*EventDataBatch) NumEvents added in v0.2.0

func (b *EventDataBatch) NumEvents() int32

NumEvents returns the number of events in the batch.

type EventDataBatchOptions added in v0.2.0

type EventDataBatchOptions struct {
	// MaxBytes overrides the max size (in bytes) for a batch.
	// By default NewEventDataBatch will use the max message size provided by the service.
	MaxBytes uint64

	// PartitionKey is hashed to calculate the partition assignment. Messages and message
	// batches with the same PartitionKey are guaranteed to end up in the same partition.
	// Note that if you use this option then PartitionID cannot be set.
	PartitionKey *string

	// PartitionID is the ID of the partition to send these messages to.
	// Note that if you use this option then PartitionKey cannot be set.
	PartitionID *string
}

EventDataBatchOptions contains optional parameters for the NewEventDataBatch function.
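
To see why a shared PartitionKey keeps events together, consider hashing the key and reducing it modulo the partition count. The sketch below uses FNV-1a purely as a stand-in; the hash that Event Hubs applies is internal to the service and is not documented here, so `partitionFor` is a hypothetical illustration of the idea rather than a way to predict real assignments.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to one of n partitions using a stand-in hash.
// The caller must pass n > 0.
func partitionFor(key string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on an FNV hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	const partitions = 4
	for _, key := range []string{"device-1", "device-2", "device-1"} {
		fmt.Printf("key %q -> partition %d\n", key, partitionFor(key, partitions))
	}
}
```

Because the mapping depends only on the key and the partition count, repeated keys always land on the same partition in this model.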

type EventHubProperties

type EventHubProperties struct {
	CreatedOn    time.Time
	Name         string
	PartitionIDs []string
}

EventHubProperties represents properties of the Event Hub, like the number of partitions.

type GetEventHubPropertiesOptions

type GetEventHubPropertiesOptions struct {
}

GetEventHubPropertiesOptions contains optional parameters for the GetEventHubProperties function.

type GetPartitionPropertiesOptions

type GetPartitionPropertiesOptions struct {
}

GetPartitionPropertiesOptions are the options for the GetPartitionProperties function.

type ListCheckpointsOptions added in v0.1.1

type ListCheckpointsOptions struct {
}

ListCheckpointsOptions contains optional parameters for the ListCheckpoints function.

type ListOwnershipOptions added in v0.1.1

type ListOwnershipOptions struct {
}

ListOwnershipOptions contains optional parameters for the ListOwnership function.

type Ownership added in v0.1.1

type Ownership struct {
	ConsumerGroup           string
	EventHubName            string
	FullyQualifiedNamespace string
	PartitionID             string

	OwnerID          string       // the owner ID of the Processor
	LastModifiedTime time.Time    // used when calculating if ownership has expired
	ETag             *azcore.ETag // the ETag, used when attempting to claim or update ownership of a partition.
}

Ownership tracks which consumer owns a particular partition.

type PartitionClient added in v0.1.1

type PartitionClient struct {
	// contains filtered or unexported fields
}

PartitionClient is used to receive events from an Event Hub partition.

This type is instantiated from the ConsumerClient type, using ConsumerClient.NewPartitionClient.

func (*PartitionClient) Close added in v0.1.1

func (pc *PartitionClient) Close(ctx context.Context) error

Close releases resources for this client.

func (*PartitionClient) ReceiveEvents added in v0.1.1

func (pc *PartitionClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)

ReceiveEvents receives events until 'count' events have been received or the context has expired or been cancelled.

If your ReceiveEvents call appears to be stuck there are some common causes:

  1. The PartitionClientOptions.StartPosition defaults to "Latest" when the client is created. The connection is lazily initialized, so it's possible the link was initialized to a position after events you've sent. To make this deterministic, you can choose an explicit start point using a sequence number, an offset, or a timestamp. See the [PartitionClientOptions.StartPosition] field for more details.

  2. You might have sent the events to a different partition than intended. By default, batches that are created using ProducerClient.NewEventDataBatch do not target a specific partition. When a partition is not specified, Azure Event Hubs service will choose the partition the events will be sent to.

    To fix this, you can specify a PartitionID as part of your [EventDataBatchOptions.PartitionID] options or open multiple PartitionClient instances, one for each partition. You can get the full list of partitions at runtime using ConsumerClient.GetEventHubProperties. See the "example_consuming_events_test.go" for an example of this pattern.

  3. Network issues can cause internal retries. To see log messages related to this use the instructions in the example function "Example_enableLogging".

type PartitionClientOptions added in v0.2.0

type PartitionClientOptions struct {
	// StartPosition is the position we will start receiving events from,
	// either an offset (inclusive) with Offset, or events enqueued
	// after a specific time using EnqueuedTime.
	StartPosition StartPosition

	// OwnerLevel is the priority for this partition client, also known as the 'epoch' level.
	// When used, a partition client with a higher OwnerLevel will take ownership of a partition
	// from partition clients with a lower OwnerLevel.
	// Default is off.
	OwnerLevel *int64

	// Prefetch represents the size of the internal prefetch buffer. When set,
	// this client will attempt to always maintain an internal cache of events of
	// this size, asynchronously, increasing the odds that ReceiveEvents() will use
	// a locally stored cache of events, rather than having to wait for events to
	// arrive from the network.
	//
	// Defaults to 300 events if Prefetch == 0.
	// Disabled if Prefetch < 0.
	Prefetch int32
}

PartitionClientOptions provides options for the NewPartitionClient function.
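
The three Prefetch cases documented on the field (zero, negative, positive) can be summarized as a small normalization function. `effectivePrefetch` is a hypothetical helper written for illustration; it restates the field comment and is not taken from the library's source.

```go
package main

import "fmt"

// effectivePrefetch reports the buffer size implied by a Prefetch setting
// and whether prefetching is enabled at all.
func effectivePrefetch(prefetch int32) (size int32, enabled bool) {
	switch {
	case prefetch < 0:
		return 0, false // negative values disable prefetch
	case prefetch == 0:
		return 300, true // zero selects the documented default
	default:
		return prefetch, true
	}
}

func main() {
	for _, p := range []int32{0, 301, -1} {
		size, enabled := effectivePrefetch(p)
		fmt.Printf("Prefetch=%d -> size=%d enabled=%t\n", p, size, enabled)
	}
}
```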

type PartitionProperties

type PartitionProperties struct {
	// BeginningSequenceNumber is the first sequence number for a partition.
	BeginningSequenceNumber int64
	// EventHubName is the name of the Event Hub for this partition.
	EventHubName string

	// IsEmpty is true if the partition is empty, false otherwise.
	IsEmpty bool

	// LastEnqueuedOffset is the offset of latest enqueued event.
	LastEnqueuedOffset int64

	// LastEnqueuedOn is the date of latest enqueued event.
	LastEnqueuedOn time.Time

	// LastEnqueuedSequenceNumber is the sequence number of the latest enqueued event.
	LastEnqueuedSequenceNumber int64

	// PartitionID is the partition ID of this partition.
	PartitionID string
}

PartitionProperties are the properties for a single partition.

type Processor added in v0.1.1

type Processor struct {
	// contains filtered or unexported fields
}

Processor uses a ConsumerClient and CheckpointStore to provide automatic load balancing between multiple Processor instances, even in separate processes or on separate machines.

See example_processor_test.go for an example, and the function documentation for Processor.Run for a more detailed description of how load balancing works.

func NewProcessor added in v0.1.1

func NewProcessor(consumerClient *ConsumerClient, checkpointStore CheckpointStore, options *ProcessorOptions) (*Processor, error)

NewProcessor creates a Processor.

More information can be found in the documentation for the azeventhubs.Processor type. See example_processor_test.go for an example.

func (*Processor) NextPartitionClient added in v0.1.1

func (p *Processor) NextPartitionClient(ctx context.Context) *ProcessorPartitionClient

NextPartitionClient will get the next owned ProcessorPartitionClient if one is acquired, or will block until a new one arrives or Processor.Run is cancelled. You MUST call azeventhubs.ProcessorPartitionClient.Close on the returned client to avoid leaking resources.

This function is safe to call before Processor.Run has been called and will typically be executed in a goroutine in a loop.

See example_processor_test.go for an example of typical usage.

func (*Processor) Run added in v0.1.1

func (p *Processor) Run(ctx context.Context) error

Run handles the load balancing loop, blocking until the passed in context is cancelled or it encounters an unrecoverable error. On cancellation, it will return a nil error.

This function should run for the lifetime of your application, or for as long as you want to continue to claim partitions.

As partitions are claimed, new ProcessorPartitionClient instances will be returned from Processor.NextPartitionClient. This can happen at any time, as new Processor instances come online or other Processors exit.

ProcessorPartitionClient instances are used like a PartitionClient but provide a ProcessorPartitionClient.UpdateCheckpoint function that stores a checkpoint in the CheckpointStore. If the client crashes or is restarted, it will pick up from that point.

See example_processor_test.go for an example of typical usage.

type ProcessorOptions added in v0.2.0

type ProcessorOptions struct {
	// LoadBalancingStrategy dictates how concurrent Processor instances distribute
	// ownership of partitions between them.
	// The default strategy is ProcessorStrategyBalanced.
	LoadBalancingStrategy ProcessorStrategy

	// UpdateInterval controls how often the Processor attempts to claim partitions.
	// The default value is 10 seconds.
	UpdateInterval time.Duration

	// PartitionExpirationDuration is the amount of time before a partition is considered
	// unowned.
	// The default value is 60 seconds.
	PartitionExpirationDuration time.Duration

	// StartPositions are the default start positions (configurable per partition, or with an overall
	// default value) if a checkpoint is not found in the CheckpointStore.
	// The default position is Latest.
	StartPositions StartPositions

	// Prefetch represents the size of the internal prefetch buffer for each ProcessorPartitionClient
	// created by this Processor. When set, this client will attempt to always maintain
	// an internal cache of events of this size, asynchronously, increasing the odds that
	// ReceiveEvents() will use a locally stored cache of events, rather than having to
	// wait for events to arrive from the network.
	//
	// Defaults to 300 events if Prefetch == 0.
	// Disabled if Prefetch < 0.
	Prefetch int32
}

ProcessorOptions are the options for the NewProcessor function.

type ProcessorPartitionClient added in v0.1.1

type ProcessorPartitionClient struct {
	// contains filtered or unexported fields
}

ProcessorPartitionClient allows you to receive events, similar to a PartitionClient, with integration into a checkpoint store for tracking progress.

This type is instantiated from Processor.NextPartitionClient, which handles dynamic load balancing.

See example_processor_test.go for an example of typical usage.

NOTE: If you do NOT want to use dynamic load balancing, and would prefer to track state and ownership manually, use the ConsumerClient type instead.

func (*ProcessorPartitionClient) Close added in v0.1.1

Close releases resources for the partition client. This does not close the ConsumerClient that the Processor was started with.

func (*ProcessorPartitionClient) PartitionID added in v0.1.1

func (p *ProcessorPartitionClient) PartitionID() string

PartitionID is the partition ID of the partition we're receiving from. This will not change during the lifetime of this ProcessorPartitionClient.

func (*ProcessorPartitionClient) ReceiveEvents added in v0.1.1

func (c *ProcessorPartitionClient) ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)

ReceiveEvents receives events until 'count' events have been received or the context has expired or been cancelled.

See PartitionClient.ReceiveEvents for more information, including troubleshooting.

func (*ProcessorPartitionClient) UpdateCheckpoint added in v0.1.1

func (p *ProcessorPartitionClient) UpdateCheckpoint(ctx context.Context, latestEvent *ReceivedEventData) error

UpdateCheckpoint updates the checkpoint store. This ensures that if the Processor is restarted, it will start from after this point.

type ProcessorStrategy added in v0.1.1

type ProcessorStrategy string

ProcessorStrategy specifies the load balancing strategy used by the Processor.

const (
	// ProcessorStrategyBalanced will attempt to claim a single partition at a time, until each active
	// owner has an equal share of partitions.
	// This is the default strategy.
	ProcessorStrategyBalanced ProcessorStrategy = "balanced"

	// ProcessorStrategyGreedy will attempt to claim as many partitions at a time as it can, ignoring
	// balance.
	ProcessorStrategyGreedy ProcessorStrategy = "greedy"
)
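
The difference between the two strategies comes down to how many partitions a Processor tries to claim in one load balancing round. The sketch below is one reading of the comments above, not the library's algorithm: `claimsThisRound` and its parameters are hypothetical, and the fair share is assumed to be the partition count divided by the number of active owners, rounded up.

```go
package main

import "fmt"

// claimsThisRound returns how many partitions a processor would try to claim
// in a single round. owned is what it already holds, available is how many
// partitions are currently claimable, and total/owners describe the event hub
// and the number of active processors.
func claimsThisRound(strategy string, owned, available, total, owners int) int {
	if owners <= 0 || available <= 0 {
		return 0
	}
	if strategy == "greedy" {
		return available // take everything claimable, ignoring balance
	}
	// balanced: one partition per round until the fair share is reached
	fairShare := (total + owners - 1) / owners
	if owned < fairShare {
		return 1
	}
	return 0
}

func main() {
	fmt.Println(claimsThisRound("balanced", 0, 4, 4, 2))
	fmt.Println(claimsThisRound("balanced", 2, 2, 4, 2))
	fmt.Println(claimsThisRound("greedy", 0, 4, 4, 2))
}
```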

type ProducerClient

type ProducerClient struct {
	// contains filtered or unexported fields
}

ProducerClient can be used to send events to an Event Hub.

func NewProducerClient

func NewProducerClient(fullyQualifiedNamespace string, eventHub string, credential azcore.TokenCredential, options *ProducerClientOptions) (*ProducerClient, error)

NewProducerClient creates a ProducerClient which uses an azcore.TokenCredential for authentication. You MUST call azeventhubs.ProducerClient.Close on this client to avoid leaking resources.

The fullyQualifiedNamespace is the Event Hubs namespace name (ex: myeventhub.servicebus.windows.net). The credential is one of the credentials in the azidentity package.

Example
package main

import (
	"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	defaultAzureCred, err := azidentity.NewDefaultAzureCredential(nil)

	if err != nil {
		panic(err)
	}

	producerClient, err = azeventhubs.NewProducerClient("<ex: myeventhubnamespace.servicebus.windows.net>", "eventhub-name", defaultAzureCred, nil)

	if err != nil {
		panic(err)
	}
}
Output:

func NewProducerClientFromConnectionString

func NewProducerClientFromConnectionString(connectionString string, eventHub string, options *ProducerClientOptions) (*ProducerClient, error)

NewProducerClientFromConnectionString creates a ProducerClient from a connection string. You MUST call azeventhubs.ProducerClient.Close on this client to avoid leaking resources.

connectionString can be one of two formats - with or without an EntityPath key.

When the connection string does not have an entity path, as shown below, the eventHub parameter cannot be empty and should contain the name of your event hub.

Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>

When the connection string DOES have an entity path, as shown below, the eventHub parameter must be empty.

Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>;
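
The two rules above can be checked before a client is constructed. The following is a minimal sketch using only the standard library; `entityPath` and `checkEventHubArg` are hypothetical helpers, not part of azeventhubs, and the parsing is deliberately simplified (it does not validate the other keys).

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// entityPath returns the EntityPath value in a connection string,
// or "" if the key is absent. Hypothetical helper for illustration.
func entityPath(connectionString string) string {
	for _, part := range strings.Split(connectionString, ";") {
		// Cut at the first '=' only: key values may themselves contain '='.
		key, value, found := strings.Cut(part, "=")
		if found && strings.EqualFold(strings.TrimSpace(key), "EntityPath") {
			return value
		}
	}
	return ""
}

// checkEventHubArg applies the two documented rules for the eventHub parameter.
func checkEventHubArg(connectionString, eventHub string) error {
	hasEntityPath := entityPath(connectionString) != ""
	switch {
	case hasEntityPath && eventHub != "":
		return errors.New("connection string has an EntityPath, so eventHub must be empty")
	case !hasEntityPath && eventHub == "":
		return errors.New("connection string has no EntityPath, so eventHub cannot be empty")
	}
	return nil
}

func main() {
	cs := "Endpoint=sb://example.servicebus.windows.net/;SharedAccessKeyName=name;SharedAccessKey=key=;EntityPath=my-hub"
	fmt.Println(checkEventHubArg(cs, ""))       // accepted
	fmt.Println(checkEventHubArg(cs, "my-hub")) // rejected: both are set
}
```
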
Example
// if the connection string contains an EntityPath
connectionString := "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<entity path>"
producerClient, err := azeventhubs.NewProducerClientFromConnectionString(connectionString, "", nil)

if err != nil {
	panic(err)
}

// or

// if the connection string does not contain an EntityPath
connectionString = "Endpoint=sb://<your-namespace>.servicebus.windows.net/;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"
producerClient, err = azeventhubs.NewProducerClientFromConnectionString(connectionString, "eventhub-name", nil)

if err != nil {
	panic(err)
}
Output:

func (*ProducerClient) Close

func (pc *ProducerClient) Close(ctx context.Context) error

Close releases resources for this client.

func (*ProducerClient) GetEventHubProperties

func (pc *ProducerClient) GetEventHubProperties(ctx context.Context, options *GetEventHubPropertiesOptions) (EventHubProperties, error)

GetEventHubProperties gets event hub properties, like the available partition IDs and when the Event Hub was created.

Example
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	eventHubProps, err := producerClient.GetEventHubProperties(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	for _, partitionID := range eventHubProps.PartitionIDs {
		fmt.Printf("Partition ID: %s\n", partitionID)
	}
}
Output:

func (*ProducerClient) GetPartitionProperties

func (pc *ProducerClient) GetPartitionProperties(ctx context.Context, partitionID string, options *GetPartitionPropertiesOptions) (PartitionProperties, error)

GetPartitionProperties gets properties for a specific partition. This includes data like the last enqueued sequence number, the first sequence number and when an event was last enqueued to the partition.

Example
package main

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	partitionProps, err := producerClient.GetPartitionProperties(context.TODO(), "partition-id", nil)

	if err != nil {
		panic(err)
	}

	fmt.Printf("First sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.BeginningSequenceNumber)
	fmt.Printf("Last sequence number for partition ID %s: %d\n", partitionProps.PartitionID, partitionProps.LastEnqueuedSequenceNumber)
}
Output:

func (*ProducerClient) NewEventDataBatch

func (pc *ProducerClient) NewEventDataBatch(ctx context.Context, options *EventDataBatchOptions) (*EventDataBatch, error)

NewEventDataBatch can be used to create an EventDataBatch, which can contain multiple events.

EventDataBatch contains logic to make sure that it doesn't exceed the maximum size for the Event Hubs link, using its azeventhubs.EventDataBatch.AddEventData function. A lower size limit can also be configured through the options.

If the operation fails it can return an azeventhubs.Error type if the failure is actionable.
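
The size check shapes how callers typically fill batches: add events until the batch reports it is full, hand that batch off, then continue with a fresh one. The sketch below models this with stand-in types only. `batch`, `errTooLarge` and `fillBatches` are hypothetical, and the size accounting counts body bytes alone, whereas the real EventDataBatch measures against the link's limit.

```go
package main

import (
	"errors"
	"fmt"
)

// errTooLarge is a stand-in for the error a full batch reports.
var errTooLarge = errors.New("event is too large for the batch")

// batch is a stand-in for EventDataBatch that only counts body bytes.
type batch struct {
	maxBytes int
	size     int
	events   [][]byte
}

// add appends body, or returns errTooLarge if it would exceed maxBytes.
func (b *batch) add(body []byte) error {
	if b.size+len(body) > b.maxBytes {
		return errTooLarge
	}
	b.size += len(body)
	b.events = append(b.events, body)
	return nil
}

// fillBatches packs bodies into batches in order, starting a new batch
// whenever the current one is full.
func fillBatches(bodies [][]byte, maxBytes int) ([]*batch, error) {
	var full []*batch
	current := &batch{maxBytes: maxBytes}
	for _, body := range bodies {
		err := current.add(body)
		if errors.Is(err, errTooLarge) {
			if len(current.events) == 0 {
				// Even an empty batch cannot hold this event.
				return nil, fmt.Errorf("event of %d bytes exceeds the %d byte limit", len(body), maxBytes)
			}
			// With the real client, this is where the full batch would be sent.
			full = append(full, current)
			current = &batch{maxBytes: maxBytes}
			err = current.add(body)
		}
		if err != nil {
			return nil, err
		}
	}
	if len(current.events) > 0 {
		full = append(full, current)
	}
	return full, nil
}

func main() {
	bodies := [][]byte{[]byte("aaaa"), []byte("bbbb"), []byte("cc")}
	batches, err := fillBatches(bodies, 8)
	if err != nil {
		panic(err)
	}
	for i, b := range batches {
		fmt.Printf("batch %d: %d events, %d bytes\n", i, len(b.events), b.size)
	}
}
```

With the real client, each full batch would be passed to SendEventDataBatch and a new one created with NewEventDataBatch. An event that does not fit even in an empty batch has to be handled separately, since retrying cannot succeed.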

func (*ProducerClient) SendEventDataBatch added in v0.3.0

func (pc *ProducerClient) SendEventDataBatch(ctx context.Context, batch *EventDataBatch, options *SendEventDataBatchOptions) error

SendEventDataBatch sends an event data batch to Event Hubs.

Example
package main

import (
	"context"

	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

var producerClient *azeventhubs.ProducerClient

func main() {
	batch, err := producerClient.NewEventDataBatch(context.TODO(), nil)

	if err != nil {
		panic(err)
	}

	// See ExampleEventDataBatch_AddEventData for more information.
	err = batch.AddEventData(&azeventhubs.EventData{Body: []byte("hello")}, nil)

	if err != nil {
		panic(err)
	}

	err = producerClient.SendEventDataBatch(context.TODO(), batch, nil)

	if err != nil {
		panic(err)
	}
}
Output:

type ProducerClientOptions

type ProducerClientOptions struct {
	// Application ID that will be passed to the namespace.
	ApplicationID string

	// NewWebSocketConn is a function that can create a net.Conn for use with websockets.
	// For an example, see ExampleNewClient_usingWebsockets() function in example_client_test.go.
	NewWebSocketConn func(ctx context.Context, params WebSocketConnParams) (net.Conn, error)

	// RetryOptions controls how often operations are retried from this client and any
	// Receivers and Senders created from this client.
	RetryOptions RetryOptions

	// TLSConfig configures a client with a custom *tls.Config.
	TLSConfig *tls.Config
}

ProducerClientOptions contains options for the `NewProducerClient` and `NewProducerClientFromConnectionString` functions.

type ReceiveEventsOptions

type ReceiveEventsOptions struct {
}

ReceiveEventsOptions contains optional parameters for the ReceiveEvents function.

type ReceivedEventData

type ReceivedEventData struct {
	EventData

	// EnqueuedTime is the UTC time when the message was accepted and stored by Event Hubs.
	EnqueuedTime *time.Time

	// PartitionKey is used with a partitioned entity and enables assigning related messages
	// to the same internal partition. This ensures that the submission sequence order is correctly
	// recorded. The partition is chosen by a hash function in Event Hubs and cannot be chosen
	// directly.
	PartitionKey *string

	// Offset is the offset of the event.
	Offset *int64

	// RawAMQPMessage is the AMQP message, as received by the client. This can be useful to get access
	// to properties that are not exposed by ReceivedEventData such as payloads encoded into the
	// Value or Sequence section, payloads sent as multiple Data sections, as well as Footer
	// and Header fields.
	RawAMQPMessage *AMQPAnnotatedMessage

	// SequenceNumber is a unique number assigned to a message by Event Hubs.
	SequenceNumber int64

	// Properties set by the Event Hubs service.
	SystemProperties map[string]any
}

ReceivedEventData is an event that has been received using the ConsumerClient.

type RetryOptions

type RetryOptions = exported.RetryOptions

RetryOptions represent the options for retries.

type SendEventDataBatchOptions added in v0.3.0

type SendEventDataBatchOptions struct {
}

SendEventDataBatchOptions contains optional parameters for the SendEventDataBatch function.

type StartPosition

type StartPosition struct {
	// Offset will start the consumer after the specified offset. Can be exclusive
	// or inclusive, based on the Inclusive property.
	// NOTE: offsets are not stable values, and might refer to different events over time
	// as the Event Hub events reach their age limit and are discarded.
	Offset *int64

	// SequenceNumber will start the consumer after the specified sequence number. Can be exclusive
	// or inclusive, based on the Inclusive property.
	SequenceNumber *int64

	// EnqueuedTime will start the consumer before events that were enqueued on or after EnqueuedTime.
	// Can be exclusive or inclusive, based on the Inclusive property.
	EnqueuedTime *time.Time

	// Inclusive configures whether the events directly at Offset, SequenceNumber or EnqueuedTime will be included (true)
	// or excluded (false).
	Inclusive bool

	// Earliest will start the consumer at the earliest event.
	Earliest *bool

	// Latest will start the consumer after the last event.
	Latest *bool
}

StartPosition indicates the position to start receiving events within a partition. The default position is Latest.

You can set this in the PartitionClientOptions passed to ConsumerClient.NewPartitionClient.
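
The effect of Inclusive is easiest to see with sequence numbers. The sketch below uses a cut-down stand-in type, not azeventhubs.StartPosition, and models only the SequenceNumber case. `delivered` is a hypothetical helper that filters a list of sequence numbers the way the field comments describe.

```go
package main

import "fmt"

// startPosition is a cut-down stand-in for azeventhubs.StartPosition,
// keeping only the two fields this sketch needs.
type startPosition struct {
	SequenceNumber *int64
	Inclusive      bool
}

// delivered returns the sequence numbers a consumer starting at pos would
// see, out of the events already in the partition.
func delivered(seqs []int64, pos startPosition) []int64 {
	if pos.SequenceNumber == nil {
		return nil // other kinds of start position are not modelled here
	}
	var out []int64
	for _, s := range seqs {
		if s > *pos.SequenceNumber || (pos.Inclusive && s == *pos.SequenceNumber) {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	three := int64(3)
	seqs := []int64{1, 2, 3, 4, 5}

	fmt.Println(delivered(seqs, startPosition{SequenceNumber: &three, Inclusive: true})) // [3 4 5]
	fmt.Println(delivered(seqs, startPosition{SequenceNumber: &three}))                  // [4 5]
}
```

Because Inclusive defaults to false, a zero-value Inclusive field skips the event at the given position.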

type StartPositions added in v0.1.1

type StartPositions struct {
	// PerPartition controls the start position for a specific partition,
	// by partition ID. If a partition is not configured here it will default
	// to Default start position.
	PerPartition map[string]StartPosition

	// Default is used if the partition is not found in the PerPartition map.
	Default StartPosition
}

StartPositions are used if there is no checkpoint for a partition in the checkpoint store.
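
The lookup order implied above (checkpoint first, then PerPartition, then Default) can be sketched as follows. The types are local stand-ins rather than the package's own, the checkpoint store is reduced to a map of partition ID to sequence number, and `resolve` is a hypothetical helper, not a function in azeventhubs.

```go
package main

import "fmt"

// startPosition is a stand-in that only carries a label for printing.
type startPosition struct{ Name string }

// startPositions mirrors the shape of azeventhubs.StartPositions.
type startPositions struct {
	PerPartition map[string]startPosition
	Default      startPosition
}

// resolve describes where a partition would start: a checkpoint wins,
// then a per-partition entry, then the default.
func resolve(partitionID string, checkpoints map[string]int64, sp startPositions) string {
	if seq, ok := checkpoints[partitionID]; ok {
		return fmt.Sprintf("after sequence number %d (checkpoint)", seq)
	}
	if pos, ok := sp.PerPartition[partitionID]; ok {
		return pos.Name
	}
	return sp.Default.Name
}

func main() {
	checkpoints := map[string]int64{"0": 42}
	sp := startPositions{
		PerPartition: map[string]startPosition{"1": {Name: "earliest"}},
		Default:      startPosition{Name: "latest"},
	}

	for _, id := range []string{"0", "1", "2"} {
		fmt.Printf("partition %s: %s\n", id, resolve(id, checkpoints, sp))
	}
}
```
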

type UpdateCheckpointOptions added in v0.1.1

type UpdateCheckpointOptions struct {
}

UpdateCheckpointOptions contains optional parameters for the UpdateCheckpoint function.

type WebSocketConnParams added in v0.2.0

type WebSocketConnParams = exported.WebSocketConnParams

WebSocketConnParams are passed to your web socket creation function (for example, ProducerClientOptions.NewWebSocketConn).

Directories

Path Synopsis
amqpwrap
Package amqpwrap has some simple wrappers to make it easier to abstract the go-amqp types.
auth
Package auth provides an abstraction over claims-based security for Azure Event Hub and Service Bus.
eh
mock
Package mock is a generated GoMock package.
sas
